mithril_client/
feedback.rs

1//! # Long task feedbacks
2//!
//! Even with a fast computer and network, some tasks can take more than a few
//! seconds to run (or even more than an hour for a snapshot download).
5//!
//! Those tasks are:
//! - Snapshot download
//! - Cardano database download
//! - Certificate chain validation
9//!
//! To receive feedback while those tasks are running, a dedicated mechanism is available.
11//!
12//! Define your feedback receiver and implement the [FeedbackReceiver] trait to receive
13//! [events][MithrilEvent] with the [`handle_event`][FeedbackReceiver::handle_event] method.
14//! Then pass an instance of your receiver when building your `Client` using
15//! [`ClientBuilder::add_feedback_receiver`][crate::ClientBuilder::add_feedback_receiver] method.
16//!
17//! # Example
18//!
19//! Using the provided [SlogFeedbackReceiver] to log the events using a [slog] logger.
20//!
21//! ```no_run
22//! use std::sync::Arc;
23//! # async fn run() -> mithril_client::MithrilResult<()> {
24//! use mithril_client::{ClientBuilder, MessageBuilder, feedback::SlogFeedbackReceiver};
25//!
26//! let client = ClientBuilder::aggregator("YOUR_AGGREGATOR_ENDPOINT", "YOUR_GENESIS_VERIFICATION_KEY")
27//!     .add_feedback_receiver(Arc::new(SlogFeedbackReceiver::new(build_logger())))
28//!     .build()?;
29//!
30//! let _ = client.certificate().verify_chain("CERTIFICATE_HASH").await?;
31//! #
32//! #    Ok(())
33//! # }
34//!
35//! pub fn build_logger() -> slog::Logger {
36//!   use slog::Drain;
37//!   let decorator = slog_term::TermDecorator::new().build();
38//!   let drain = slog_term::FullFormat::new(decorator).build().fuse();
39//!   let drain = slog_async::Async::new(drain).build().fuse();
40//!
41//!   slog::Logger::root(Arc::new(drain), slog::o!())
42//! }
43//! ```
44//!
45//! Running this code should yield the following logs (example run on _pre-release-preview_):
46//!
47//! ```shell
48//! Nov 08 14:41:40.436 INFO Certificate chain validation started, certificate_chain_validation_id: ab623989-b0ac-4031-8522-1370958bbb4e
49//! Nov 08 14:41:40.626 INFO Certificate validated, certificate_chain_validation_id: ab623989-b0ac-4031-8522-1370958bbb4e, certificate_hash: dd4d4299cfb817b5ee5987c3de7cf5f13bdcda69c968ef087effd550470dc081
50//! Nov 08 14:42:05.477 INFO Certificate validated, certificate_chain_validation_id: ab623989-b0ac-4031-8522-1370958bbb4e, certificate_hash: 660b3d426a95303254bb255a56bed443616ea63c4d721ea77433b920d7ebdf62
51//! Nov 08 14:42:05.477 INFO Certificate chain validated, certificate_chain_validation_id: ab623989-b0ac-4031-8522-1370958bbb4e
52//! ```
53
54use async_trait::async_trait;
55use mithril_common::entities::ImmutableFileNumber;
56use serde::Serialize;
57use slog::{Logger, info};
58use std::sync::{Arc, RwLock};
59use strum::Display;
60use uuid::Uuid;
61
/// Cardano database related events, carried by the [`MithrilEvent::CardanoDatabase`] variant
/// and delivered to a [FeedbackReceiver].
///
/// Every variant holds the `download_id` of the Cardano database download it belongs to.
// NOTE: `#[serde(untagged)]` means the variant name is not part of the serialized output,
// only the fields of the variant are.
#[derive(Debug, Clone, Eq, PartialEq, Display, Serialize)]
#[strum(serialize_all = "PascalCase")]
#[serde(untagged)]
pub enum MithrilEventCardanoDatabase {
    /// Cardano Database download sequence started
    Started {
        /// Unique identifier used to track a cardano database download
        download_id: String,
        /// Total number of immutable files
        total_immutable_files: u64,
        /// Flag telling whether the ancillary files are included in the download
        include_ancillary: bool,
    },
    /// Cardano Database download sequence completed
    Completed {
        /// Unique identifier used to track a cardano database download
        download_id: String,
    },
    /// An immutable archive file download has started
    ImmutableDownloadStarted {
        /// Immutable file number downloaded
        immutable_file_number: ImmutableFileNumber,
        /// Unique identifier used to track a cardano database download
        download_id: String,
        /// Size of the downloaded archive
        size: u64,
    },
    /// An immutable archive file download is in progress
    ImmutableDownloadProgress {
        /// Immutable file number downloaded
        immutable_file_number: ImmutableFileNumber,
        /// Unique identifier used to track a cardano database download
        download_id: String,
        /// Number of bytes that have been downloaded
        downloaded_bytes: u64,
        /// Size of the downloaded archive
        size: u64,
    },
    /// An immutable archive file download has completed
    ImmutableDownloadCompleted {
        /// Immutable file number downloaded
        immutable_file_number: ImmutableFileNumber,
        /// Unique identifier used to track a cardano database download
        download_id: String,
    },
    /// An ancillary archive file download has started
    AncillaryDownloadStarted {
        /// Unique identifier used to track a cardano database download
        download_id: String,
        /// Size of the downloaded archive
        size: u64,
    },
    /// An ancillary archive file download is in progress
    AncillaryDownloadProgress {
        /// Unique identifier used to track a cardano database download
        download_id: String,
        /// Number of bytes that have been downloaded
        downloaded_bytes: u64,
        /// Size of the downloaded archive
        size: u64,
    },
    /// An ancillary archive file download has completed
    AncillaryDownloadCompleted {
        /// Unique identifier used to track a cardano database download
        download_id: String,
    },
    /// A digest file download has started
    DigestDownloadStarted {
        /// Unique identifier used to track a cardano database download
        download_id: String,
        /// Size of the downloaded archive
        size: u64,
    },
    /// A digest file download is in progress
    DigestDownloadProgress {
        /// Unique identifier used to track a cardano database download
        download_id: String,
        /// Number of bytes that have been downloaded
        downloaded_bytes: u64,
        /// Size of the downloaded archive
        size: u64,
    },
    /// A digest file download has completed
    DigestDownloadCompleted {
        /// Unique identifier used to track a cardano database download
        download_id: String,
    },
}
151
/// Event that can be reported by a [FeedbackReceiver].
///
/// Download related variants carry a `download_id`, certificate related variants carry a
/// `certificate_chain_validation_id`; both identify the long running task the event
/// belongs to.
// NOTE: `#[serde(untagged)]` means the variant name is not part of the serialized output,
// only the fields of the variant are.
#[derive(Debug, Clone, Eq, PartialEq, Display, Serialize)]
#[strum(serialize_all = "PascalCase")]
#[serde(untagged)]
pub enum MithrilEvent {
    /// A snapshot download has started
    SnapshotDownloadStarted {
        /// Digest of the downloaded snapshot
        digest: String,
        /// Unique identifier used to track this specific snapshot download
        download_id: String,
        /// Size of the downloaded archive
        size: u64,
    },
    /// A snapshot download is in progress
    SnapshotDownloadProgress {
        /// Unique identifier used to track this specific snapshot download
        download_id: String,
        /// Number of bytes that have been downloaded
        downloaded_bytes: u64,
        /// Size of the downloaded archive
        size: u64,
    },
    /// A snapshot download has completed
    SnapshotDownloadCompleted {
        /// Unique identifier used to track this specific snapshot download
        download_id: String,
    },
    /// A snapshot ancillary download has started
    SnapshotAncillaryDownloadStarted {
        /// Unique identifier used to track this specific snapshot ancillary download
        download_id: String,
        /// Size of the downloaded archive
        size: u64,
    },
    /// A snapshot ancillary download is in progress
    SnapshotAncillaryDownloadProgress {
        /// Unique identifier used to track this specific snapshot ancillary download
        download_id: String,
        /// Number of bytes that have been downloaded
        downloaded_bytes: u64,
        /// Size of the downloaded archive
        size: u64,
    },
    /// A snapshot ancillary download has completed
    SnapshotAncillaryDownloadCompleted {
        /// Unique identifier used to track this specific snapshot ancillary download
        download_id: String,
    },

    /// Cardano database related events, see [MithrilEventCardanoDatabase]
    CardanoDatabase(MithrilEventCardanoDatabase),

    /// A certificate chain validation has started
    CertificateChainValidationStarted {
        /// Unique identifier used to track this specific certificate chain validation
        certificate_chain_validation_id: String,
    },
    /// An individual certificate of a chain has been validated.
    CertificateValidated {
        /// Unique identifier used to track this specific certificate chain validation
        certificate_chain_validation_id: String,
        /// The validated certificate hash
        certificate_hash: String,
    },
    /// An individual certificate of a chain has been fetched from the cache.
    CertificateFetchedFromCache {
        /// Unique identifier used to track this specific certificate chain validation
        certificate_chain_validation_id: String,
        /// The fetched certificate hash
        certificate_hash: String,
    },
    /// The whole certificate chain is valid.
    CertificateChainValidated {
        /// Unique identifier used to track this specific certificate chain validation
        certificate_chain_validation_id: String,
    },
}
230
231impl MithrilEvent {
232    /// Generate a random unique identifier to identify a snapshot download
233    pub fn new_snapshot_download_id() -> String {
234        Uuid::new_v4().to_string()
235    }
236
237    /// Generate a random unique identifier to identify a Cardano download
238    pub fn new_cardano_database_download_id() -> String {
239        Uuid::new_v4().to_string()
240    }
241
242    /// Generate a random unique identifier to identify a certificate chain validation
243    pub fn new_certificate_chain_validation_id() -> String {
244        Uuid::new_v4().to_string()
245    }
246
247    #[cfg(test)]
248    pub(crate) fn event_id(&self) -> &str {
249        match self {
250            MithrilEvent::SnapshotDownloadStarted { download_id, .. } => download_id,
251            MithrilEvent::SnapshotDownloadProgress { download_id, .. } => download_id,
252            MithrilEvent::SnapshotDownloadCompleted { download_id } => download_id,
253            MithrilEvent::SnapshotAncillaryDownloadStarted { download_id, .. } => download_id,
254            MithrilEvent::SnapshotAncillaryDownloadProgress { download_id, .. } => download_id,
255            MithrilEvent::SnapshotAncillaryDownloadCompleted { download_id } => download_id,
256            MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::Started {
257                download_id,
258                ..
259            }) => download_id,
260            MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::Completed {
261                download_id,
262                ..
263            }) => download_id,
264            MithrilEvent::CardanoDatabase(
265                MithrilEventCardanoDatabase::ImmutableDownloadStarted { download_id, .. },
266            ) => download_id,
267            MithrilEvent::CardanoDatabase(
268                MithrilEventCardanoDatabase::ImmutableDownloadProgress { download_id, .. },
269            ) => download_id,
270            MithrilEvent::CardanoDatabase(
271                MithrilEventCardanoDatabase::ImmutableDownloadCompleted { download_id, .. },
272            ) => download_id,
273            MithrilEvent::CardanoDatabase(
274                MithrilEventCardanoDatabase::AncillaryDownloadStarted { download_id, .. },
275            ) => download_id,
276            MithrilEvent::CardanoDatabase(
277                MithrilEventCardanoDatabase::AncillaryDownloadProgress { download_id, .. },
278            ) => download_id,
279            MithrilEvent::CardanoDatabase(
280                MithrilEventCardanoDatabase::AncillaryDownloadCompleted { download_id, .. },
281            ) => download_id,
282            MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::DigestDownloadStarted {
283                download_id,
284                ..
285            }) => download_id,
286            MithrilEvent::CardanoDatabase(
287                MithrilEventCardanoDatabase::DigestDownloadProgress { download_id, .. },
288            ) => download_id,
289            MithrilEvent::CardanoDatabase(
290                MithrilEventCardanoDatabase::DigestDownloadCompleted { download_id, .. },
291            ) => download_id,
292            MithrilEvent::CertificateChainValidationStarted {
293                certificate_chain_validation_id,
294            } => certificate_chain_validation_id,
295            MithrilEvent::CertificateValidated {
296                certificate_chain_validation_id,
297                ..
298            } => certificate_chain_validation_id,
299            MithrilEvent::CertificateFetchedFromCache {
300                certificate_chain_validation_id,
301                ..
302            } => certificate_chain_validation_id,
303            MithrilEvent::CertificateChainValidated {
304                certificate_chain_validation_id,
305            } => certificate_chain_validation_id,
306        }
307    }
308}
309
310/// A sender of [MithrilEvent].
311///
312/// It uses Arc internally so it can be cloned at will.
313#[derive(Clone)]
314pub struct FeedbackSender {
315    receivers: Vec<Arc<dyn FeedbackReceiver>>,
316}
317
318impl FeedbackSender {
319    /// Create a new [FeedbackSender].
320    pub fn new(receivers: &[Arc<dyn FeedbackReceiver>]) -> FeedbackSender {
321        Self {
322            receivers: receivers.to_vec(),
323        }
324    }
325
326    /// Send the given event to the known receivers.
327    pub async fn send_event(&self, event: MithrilEvent) {
328        for receiver in &self.receivers {
329            receiver.handle_event(event.clone()).await;
330        }
331    }
332}
333
/// A receiver of [MithrilEvent].
///
/// Implementors must be `Sync + Send`. A [FeedbackSender] holds its receivers as
/// `Arc<dyn FeedbackReceiver>` and calls [`handle_event`][FeedbackReceiver::handle_event]
/// on each of them for every event it sends.
// The `async_trait` macro is applied in its `?Send` form on `wasm` targets and in its
// default form on every other target.
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
pub trait FeedbackReceiver: Sync + Send {
    /// Callback called by a [FeedbackSender] when it needs to send an [event][MithrilEvent].
    ///
    /// The receiver gets its own owned copy of the event.
    async fn handle_event(&self, event: MithrilEvent);
}
341
342/// A [FeedbackReceiver] that writes the event it receives in a [slog logger][Logger].
343pub struct SlogFeedbackReceiver {
344    logger: Logger,
345}
346
347impl SlogFeedbackReceiver {
348    /// Create a new [SlogFeedbackReceiver].
349    pub fn new(logger: Logger) -> SlogFeedbackReceiver {
350        Self { logger }
351    }
352}
353
354#[cfg_attr(target_family = "wasm", async_trait(?Send))]
355#[cfg_attr(not(target_family = "wasm"), async_trait)]
356impl FeedbackReceiver for SlogFeedbackReceiver {
357    async fn handle_event(&self, event: MithrilEvent) {
358        match event {
359            MithrilEvent::SnapshotDownloadStarted {
360                digest,
361                download_id,
362                size,
363            } => {
364                info!(
365                    self.logger, "Snapshot download started";
366                    "size" => size, "digest" => digest, "download_id" => download_id,
367                );
368            }
369            MithrilEvent::SnapshotDownloadProgress {
370                download_id,
371                downloaded_bytes,
372                size,
373            } => {
374                info!(
375                    self.logger, "Snapshot download in progress ...";
376                    "downloaded_bytes" => downloaded_bytes, "size" => size, "download_id" => download_id,
377                );
378            }
379            MithrilEvent::SnapshotDownloadCompleted { download_id } => {
380                info!(self.logger, "Snapshot download completed"; "download_id" => download_id);
381            }
382            MithrilEvent::SnapshotAncillaryDownloadStarted { download_id, size } => {
383                info!(
384                    self.logger, "Snapshot ancillary download started";
385                    "size" => size, "download_id" => download_id,
386                );
387            }
388            MithrilEvent::SnapshotAncillaryDownloadProgress {
389                download_id,
390                downloaded_bytes,
391                size,
392            } => {
393                info!(
394                    self.logger, "Snapshot ancillary download in progress ...";
395                    "downloaded_bytes" => downloaded_bytes, "size" => size, "download_id" => download_id,
396                );
397            }
398            MithrilEvent::SnapshotAncillaryDownloadCompleted { download_id } => {
399                info!(self.logger, "Snapshot ancillary download completed"; "download_id" => download_id);
400            }
401            MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::Started {
402                download_id,
403                total_immutable_files,
404                include_ancillary,
405            }) => {
406                info!(
407                    self.logger, "Cardano database download started"; "download_id" => download_id, "total_immutable_files" => total_immutable_files, "include_ancillary" => include_ancillary,
408                );
409            }
410            MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::Completed {
411                download_id,
412            }) => {
413                info!(
414                    self.logger, "Cardano database download completed"; "download_id" => download_id,
415                );
416            }
417            MithrilEvent::CardanoDatabase(
418                MithrilEventCardanoDatabase::ImmutableDownloadStarted {
419                    immutable_file_number,
420                    download_id,
421                    size,
422                },
423            ) => {
424                info!(
425                    self.logger, "Immutable download started";
426                    "immutable_file_number" => immutable_file_number, "download_id" => download_id, "size" => size
427                );
428            }
429            MithrilEvent::CardanoDatabase(
430                MithrilEventCardanoDatabase::ImmutableDownloadProgress {
431                    immutable_file_number,
432                    download_id,
433                    downloaded_bytes,
434                    size,
435                },
436            ) => {
437                info!(
438                    self.logger, "Immutable download in progress ...";
439                    "immutable_file_number" => immutable_file_number, "downloaded_bytes" => downloaded_bytes, "size" => size, "download_id" => download_id,
440                );
441            }
442            MithrilEvent::CardanoDatabase(
443                MithrilEventCardanoDatabase::ImmutableDownloadCompleted {
444                    immutable_file_number,
445                    download_id,
446                },
447            ) => {
448                info!(self.logger, "Immutable download completed"; "immutable_file_number" => immutable_file_number, "download_id" => download_id);
449            }
450            MithrilEvent::CardanoDatabase(
451                MithrilEventCardanoDatabase::AncillaryDownloadStarted { download_id, size },
452            ) => {
453                info!(
454                    self.logger, "Ancillary download started";
455                    "download_id" => download_id,
456                    "size" => size,
457                );
458            }
459            MithrilEvent::CardanoDatabase(
460                MithrilEventCardanoDatabase::AncillaryDownloadProgress {
461                    download_id,
462                    downloaded_bytes,
463                    size,
464                },
465            ) => {
466                info!(
467                    self.logger, "Ancillary download in progress ...";
468                    "downloaded_bytes" => downloaded_bytes, "size" => size, "download_id" => download_id,
469                );
470            }
471            MithrilEvent::CardanoDatabase(
472                MithrilEventCardanoDatabase::AncillaryDownloadCompleted { download_id },
473            ) => {
474                info!(self.logger, "Ancillary download completed"; "download_id" => download_id);
475            }
476            MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::DigestDownloadStarted {
477                download_id,
478                size,
479            }) => {
480                info!(
481                    self.logger, "Digest download started";
482                    "download_id" => download_id,
483                    "size" => size,
484                );
485            }
486            MithrilEvent::CardanoDatabase(
487                MithrilEventCardanoDatabase::DigestDownloadProgress {
488                    download_id,
489                    downloaded_bytes,
490                    size,
491                },
492            ) => {
493                info!(
494                    self.logger, "Digest download in progress ...";
495                    "downloaded_bytes" => downloaded_bytes, "size" => size, "download_id" => download_id,
496                );
497            }
498            MithrilEvent::CardanoDatabase(
499                MithrilEventCardanoDatabase::DigestDownloadCompleted { download_id },
500            ) => {
501                info!(self.logger, "Digest download completed"; "download_id" => download_id);
502            }
503            MithrilEvent::CertificateChainValidationStarted {
504                certificate_chain_validation_id,
505            } => {
506                info!(
507                    self.logger, "Certificate chain validation started";
508                    "certificate_chain_validation_id" => certificate_chain_validation_id,
509                );
510            }
511            MithrilEvent::CertificateValidated {
512                certificate_hash,
513                certificate_chain_validation_id,
514            } => {
515                info!(
516                    self.logger, "Certificate validated";
517                    "certificate_hash" => certificate_hash,
518                    "certificate_chain_validation_id" => certificate_chain_validation_id,
519                );
520            }
521            MithrilEvent::CertificateFetchedFromCache {
522                certificate_hash,
523                certificate_chain_validation_id,
524            } => {
525                info!(
526                    self.logger, "Cached";
527                    "certificate_hash" => certificate_hash,
528                    "certificate_chain_validation_id" => certificate_chain_validation_id,
529                );
530            }
531            MithrilEvent::CertificateChainValidated {
532                certificate_chain_validation_id,
533            } => {
534                info!(
535                    self.logger, "Certificate chain validated";
536                    "certificate_chain_validation_id" => certificate_chain_validation_id,
537                );
538            }
539        };
540    }
541}
542
543/// A [FeedbackReceiver] that stacks the events that it receives in a vec.
544///
545/// Use it only for tests purpose.
546pub struct StackFeedbackReceiver {
547    stacked_events: RwLock<Vec<MithrilEvent>>,
548}
549
550impl StackFeedbackReceiver {
551    /// Create a new [StackFeedbackReceiver].
552    pub fn new() -> StackFeedbackReceiver {
553        Self {
554            stacked_events: RwLock::new(vec![]),
555        }
556    }
557
558    /// Returns a copy of the stored stacked events.
559    ///
560    /// Will crash if it can't access the stored events.
561    pub fn stacked_events(&self) -> Vec<MithrilEvent> {
562        let events = self.stacked_events.read().unwrap();
563        events.clone()
564    }
565}
566
567impl Default for StackFeedbackReceiver {
568    fn default() -> Self {
569        Self::new()
570    }
571}
572
573#[cfg_attr(target_family = "wasm", async_trait(?Send))]
574#[cfg_attr(not(target_family = "wasm"), async_trait)]
575impl FeedbackReceiver for StackFeedbackReceiver {
576    async fn handle_event(&self, event: MithrilEvent) {
577        let mut events = self.stacked_events.write().unwrap();
578        events.push(event);
579    }
580}
581
582#[cfg(test)]
583mod tests {
584    use super::*;
585    use crate::feedback::MithrilEvent::{SnapshotDownloadCompleted, SnapshotDownloadStarted};
586    use crate::test_utils::TestLogger;
587    use std::time::Duration;
588    use tokio::task::JoinSet;
589
590    #[tokio::test]
591    async fn send_event_same_thread() {
592        let receiver = Arc::new(StackFeedbackReceiver::new());
593        let receiver_clone = receiver.clone() as Arc<dyn FeedbackReceiver>;
594        let sender = FeedbackSender::new(&[receiver_clone]);
595
596        sender
597            .send_event(SnapshotDownloadStarted {
598                digest: "digest".to_string(),
599                download_id: "download_id".to_string(),
600                size: 10,
601            })
602            .await;
603        sender
604            .send_event(SnapshotDownloadCompleted {
605                download_id: "download_id".to_string(),
606            })
607            .await;
608
609        assert_eq!(
610            receiver.stacked_events(),
611            vec![
612                SnapshotDownloadStarted {
613                    digest: "digest".to_string(),
614                    download_id: "download_id".to_string(),
615                    size: 10
616                },
617                SnapshotDownloadCompleted {
618                    download_id: "download_id".to_string()
619                }
620            ]
621        );
622    }
623
    /// Two spawned tasks, each owning a clone of the same sender, send events that all end
    /// up in the single shared [StackFeedbackReceiver].
    ///
    /// NOTE(review): the expected ordering relies on the sleeps below (2 ms in the first
    /// task, 5 ms in the second) to interleave the "Step" sections; it is timing-sensitive.
    #[tokio::test]
    async fn send_event_multiple_thread() {
        let receiver = Arc::new(StackFeedbackReceiver::new());
        // A slog receiver is registered alongside the stack receiver; only the stack
        // receiver content is asserted at the end.
        let sender = FeedbackSender::new(&[
            receiver.clone(),
            Arc::new(SlogFeedbackReceiver::new(TestLogger::stdout())),
        ]);
        let sender2 = sender.clone();
        let mut join_set = JoinSet::new();

        join_set.spawn(async move {
            // Step 1:
            sender
                .send_event(SnapshotDownloadStarted {
                    digest: "digest1".to_string(),
                    download_id: "download1".to_string(),
                    size: 1,
                })
                .await;
            tokio::time::sleep(Duration::from_millis(2)).await;
            // Step 3:
            sender
                .send_event(SnapshotDownloadCompleted {
                    download_id: "download3".to_string(),
                })
                .await;
            sender
                .send_event(SnapshotDownloadStarted {
                    digest: "digest2".to_string(),
                    download_id: "download2".to_string(),
                    size: 2,
                })
                .await;
        });

        join_set.spawn(async move {
            // Step 2:
            sender2
                .send_event(SnapshotDownloadCompleted {
                    download_id: "download1".to_string(),
                })
                .await;
            sender2
                .send_event(SnapshotDownloadStarted {
                    digest: "digest3".to_string(),
                    download_id: "download3".to_string(),
                    size: 3,
                })
                .await;
            tokio::time::sleep(Duration::from_millis(5)).await;
            // Step 4:
            sender2
                .send_event(SnapshotDownloadCompleted {
                    download_id: "download2".to_string(),
                })
                .await;
        });

        // Wait for both tasks, propagating any panic that happened inside them.
        while let Some(res) = join_set.join_next().await {
            res.unwrap();
        }

        // Expected order: step 1, step 2, step 3 then step 4.
        assert_eq!(
            receiver.stacked_events(),
            vec![
                SnapshotDownloadStarted {
                    digest: "digest1".to_string(),
                    download_id: "download1".to_string(),
                    size: 1
                },
                SnapshotDownloadCompleted {
                    download_id: "download1".to_string()
                },
                SnapshotDownloadStarted {
                    digest: "digest3".to_string(),
                    download_id: "download3".to_string(),
                    size: 3
                },
                SnapshotDownloadCompleted {
                    download_id: "download3".to_string()
                },
                SnapshotDownloadStarted {
                    digest: "digest2".to_string(),
                    download_id: "download2".to_string(),
                    size: 2
                },
                SnapshotDownloadCompleted {
                    download_id: "download2".to_string()
                },
            ]
        );
    }
716
    /// One spawned task sends events in steps separated by 10 ms sleeps while a second
    /// spawned task reads the shared [StackFeedbackReceiver] between those steps.
    ///
    /// NOTE(review): the intermediate assertions rely on the reader sleeps (3 ms, then
    /// 10 ms twice) falling between the sender steps; the test is timing-sensitive.
    #[tokio::test]
    async fn send_event_in_one_thread_and_receive_in_another_thread() {
        let receiver = Arc::new(StackFeedbackReceiver::new());
        let receiver_clone = receiver.clone() as Arc<dyn FeedbackReceiver>;
        // Second handle on the same receiver, moved into the reader task.
        let receiver2 = receiver.clone();
        let sender = FeedbackSender::new(&[receiver_clone]);
        let mut join_set = JoinSet::new();

        // Sender task
        join_set.spawn(async move {
            // Step 1:
            sender
                .send_event(SnapshotDownloadStarted {
                    digest: "digest1".to_string(),
                    download_id: "download1".to_string(),
                    size: 1,
                })
                .await;
            tokio::time::sleep(Duration::from_millis(10)).await;

            // Step 2:
            sender
                .send_event(SnapshotDownloadCompleted {
                    download_id: "download1".to_string(),
                })
                .await;
            sender
                .send_event(SnapshotDownloadStarted {
                    digest: "digest2".to_string(),
                    download_id: "download2".to_string(),
                    size: 2,
                })
                .await;
            tokio::time::sleep(Duration::from_millis(10)).await;

            // Step 3:
            sender
                .send_event(SnapshotDownloadCompleted {
                    download_id: "download2".to_string(),
                })
                .await;
            sender
                .send_event(SnapshotDownloadStarted {
                    digest: "digest3".to_string(),
                    download_id: "download3".to_string(),
                    size: 3,
                })
                .await;
            tokio::time::sleep(Duration::from_millis(10)).await;

            // Final step:
            sender
                .send_event(SnapshotDownloadCompleted {
                    download_id: "download3".to_string(),
                })
                .await;
        });

        // Reader task: checks the events stacked so far after each sender step.
        join_set.spawn(async move {
            // Little sleep to wait for step 1 completion
            tokio::time::sleep(Duration::from_millis(3)).await;
            assert_eq!(
                receiver2.stacked_events(),
                vec![SnapshotDownloadStarted {
                    digest: "digest1".to_string(),
                    download_id: "download1".to_string(),
                    size: 1
                },]
            );

            // Wait for step 2 completion
            tokio::time::sleep(Duration::from_millis(10)).await;
            assert_eq!(
                receiver2.stacked_events(),
                vec![
                    SnapshotDownloadStarted {
                        digest: "digest1".to_string(),
                        download_id: "download1".to_string(),
                        size: 1
                    },
                    SnapshotDownloadCompleted {
                        download_id: "download1".to_string()
                    },
                    SnapshotDownloadStarted {
                        digest: "digest2".to_string(),
                        download_id: "download2".to_string(),
                        size: 2
                    },
                ]
            );

            // Wait for step 3 completion
            tokio::time::sleep(Duration::from_millis(10)).await;
            assert_eq!(
                receiver2.stacked_events(),
                vec![
                    SnapshotDownloadStarted {
                        digest: "digest1".to_string(),
                        download_id: "download1".to_string(),
                        size: 1
                    },
                    SnapshotDownloadCompleted {
                        download_id: "download1".to_string()
                    },
                    SnapshotDownloadStarted {
                        digest: "digest2".to_string(),
                        download_id: "download2".to_string(),
                        size: 2
                    },
                    SnapshotDownloadCompleted {
                        download_id: "download2".to_string()
                    },
                    SnapshotDownloadStarted {
                        digest: "digest3".to_string(),
                        download_id: "download3".to_string(),
                        size: 3
                    },
                ]
            );
        });

        // Wait for both tasks; a failed assertion in the reader task surfaces here
        // through the `unwrap` on the join result.
        while let Some(res) = join_set.join_next().await {
            res.unwrap();
        }

        // Once both tasks are done, all the events of every step must be stacked.
        assert_eq!(
            receiver.stacked_events(),
            vec![
                SnapshotDownloadStarted {
                    digest: "digest1".to_string(),
                    download_id: "download1".to_string(),
                    size: 1
                },
                SnapshotDownloadCompleted {
                    download_id: "download1".to_string()
                },
                SnapshotDownloadStarted {
                    digest: "digest2".to_string(),
                    download_id: "download2".to_string(),
                    size: 2
                },
                SnapshotDownloadCompleted {
                    download_id: "download2".to_string()
                },
                SnapshotDownloadStarted {
                    digest: "digest3".to_string(),
                    download_id: "download3".to_string(),
                    size: 3
                },
                SnapshotDownloadCompleted {
                    download_id: "download3".to_string()
                },
            ]
        );
    }
871}