mithril_client/
feedback.rs

1//! # Long task feedbacks
2//!
3//! Even with fast computer and network, some tasks can take more than a few
4//! seconds to run (or even more than an hour for a snapshot download).
5//!
6//! Those tasks are:
7//! - Snapshot download
8//! - Certificate chain validation
9//!
10//! In order to have feedbacks for those tasks, a mechanism is available.
11//!
12//! Define your feedback receiver and implement the [FeedbackReceiver] trait to receive
13//! [events][MithrilEvent] with the [`handle_event`][FeedbackReceiver::handle_event] method.
14//! Then pass an instance of your receiver when building your `Client` using
15//! [`ClientBuilder::add_feedback_receiver`][crate::ClientBuilder::add_feedback_receiver] method.
16//!
17//! # Example
18//!
19//! Using the provided [SlogFeedbackReceiver] to log the events using a [slog] logger.
20//!
21//! ```no_run
22//! use std::sync::Arc;
23//! # async fn run() -> mithril_client::MithrilResult<()> {
24//! use mithril_client::{ClientBuilder, MessageBuilder, feedback::SlogFeedbackReceiver};
25//!
26//! let client = ClientBuilder::aggregator("YOUR_AGGREGATOR_ENDPOINT", "YOUR_GENESIS_VERIFICATION_KEY")
27//!     .add_feedback_receiver(Arc::new(SlogFeedbackReceiver::new(build_logger())))
28//!     .build()?;
29//!
30//! let _ = client.certificate().verify_chain("CERTIFICATE_HASH").await?;
31//! #
32//! #    Ok(())
33//! # }
34//!
35//! pub fn build_logger() -> slog::Logger {
36//!   use slog::Drain;
37//!   let decorator = slog_term::TermDecorator::new().build();
38//!   let drain = slog_term::FullFormat::new(decorator).build().fuse();
39//!   let drain = slog_async::Async::new(drain).build().fuse();
40//!
41//!   slog::Logger::root(Arc::new(drain), slog::o!())
42//! }
43//! ```
44//!
45//! Running this code should yield the following logs (example run on _pre-release-preview_):
46//!
47//! ```shell
48//! Nov 08 14:41:40.436 INFO Certificate chain validation started, certificate_chain_validation_id: ab623989-b0ac-4031-8522-1370958bbb4e
49//! Nov 08 14:41:40.626 INFO Certificate validated, certificate_chain_validation_id: ab623989-b0ac-4031-8522-1370958bbb4e, certificate_hash: dd4d4299cfb817b5ee5987c3de7cf5f13bdcda69c968ef087effd550470dc081
50//! Nov 08 14:42:05.477 INFO Certificate validated, certificate_chain_validation_id: ab623989-b0ac-4031-8522-1370958bbb4e, certificate_hash: 660b3d426a95303254bb255a56bed443616ea63c4d721ea77433b920d7ebdf62
51//! Nov 08 14:42:05.477 INFO Certificate chain validated, certificate_chain_validation_id: ab623989-b0ac-4031-8522-1370958bbb4e
52//! ```
53
54use async_trait::async_trait;
55use mithril_common::entities::ImmutableFileNumber;
56use serde::Serialize;
57use slog::{info, Logger};
58use std::sync::{Arc, RwLock};
59use strum::Display;
60use uuid::Uuid;
61
62/// Event that can be reported by a [FeedbackReceiver] for Cardano database related events.
63#[derive(Debug, Clone, Eq, PartialEq, Display, Serialize)]
64#[strum(serialize_all = "PascalCase")]
65#[serde(untagged)]
66pub enum MithrilEventCardanoDatabase {
67    /// Cardano Database download sequence started
68    Started {
69        /// Unique identifier used to track a cardano database download
70        download_id: String,
71        /// Total number of immutable files
72        total_immutable_files: u64,
73        /// Total number of ancillary files
74        include_ancillary: bool,
75    },
76    /// Cardano Database download sequence completed
77    Completed {
78        /// Unique identifier used to track a cardano database download
79        download_id: String,
80    },
81    /// An immutable archive file download has started
82    ImmutableDownloadStarted {
83        /// Immutable file number downloaded
84        immutable_file_number: ImmutableFileNumber,
85        /// Unique identifier used to track a cardano database download
86        download_id: String,
87        /// Size of the downloaded archive
88        size: u64,
89    },
90    /// An immutable archive file download is in progress
91    ImmutableDownloadProgress {
92        /// Immutable file number downloaded
93        immutable_file_number: ImmutableFileNumber,
94        /// Unique identifier used to track a cardano database download
95        download_id: String,
96        /// Number of bytes that have been downloaded
97        downloaded_bytes: u64,
98        /// Size of the downloaded archive
99        size: u64,
100    },
101    /// An immutable archive file download has completed
102    ImmutableDownloadCompleted {
103        /// Immutable file number downloaded
104        immutable_file_number: ImmutableFileNumber,
105        /// Unique identifier used to track a cardano database download
106        download_id: String,
107    },
108    /// An ancillary archive file download has started
109    AncillaryDownloadStarted {
110        /// Unique identifier used to track a cardano database download
111        download_id: String,
112        /// Size of the downloaded archive
113        size: u64,
114    },
115    /// An ancillary archive file download is in progress
116    AncillaryDownloadProgress {
117        /// Unique identifier used to track a cardano database download
118        download_id: String,
119        /// Number of bytes that have been downloaded
120        downloaded_bytes: u64,
121        /// Size of the downloaded archive
122        size: u64,
123    },
124    /// An ancillary archive file download has completed
125    AncillaryDownloadCompleted {
126        /// Unique identifier used to track a cardano database download
127        download_id: String,
128    },
129    /// A digest file download has started
130    DigestDownloadStarted {
131        /// Unique identifier used to track a cardano database download
132        download_id: String,
133        /// Size of the downloaded archive
134        size: u64,
135    },
136    /// A digest file download is in progress
137    DigestDownloadProgress {
138        /// Unique identifier used to track a cardano database download
139        download_id: String,
140        /// Number of bytes that have been downloaded
141        downloaded_bytes: u64,
142        /// Size of the downloaded archive
143        size: u64,
144    },
145    /// A digest file download has completed
146    DigestDownloadCompleted {
147        /// Unique identifier used to track a cardano database download
148        download_id: String,
149    },
150}
151
152/// Event that can be reported by a [FeedbackReceiver].
153#[derive(Debug, Clone, Eq, PartialEq, Display, Serialize)]
154#[strum(serialize_all = "PascalCase")]
155#[serde(untagged)]
156pub enum MithrilEvent {
157    /// A snapshot download has started
158    SnapshotDownloadStarted {
159        /// Digest of the downloaded snapshot
160        digest: String,
161        /// Unique identifier used to track this specific snapshot download
162        download_id: String,
163        /// Size of the downloaded archive
164        size: u64,
165    },
166    /// A snapshot download is in progress
167    SnapshotDownloadProgress {
168        /// Unique identifier used to track this specific snapshot download
169        download_id: String,
170        /// Number of bytes that have been downloaded
171        downloaded_bytes: u64,
172        /// Size of the downloaded archive
173        size: u64,
174    },
175    /// A snapshot download has completed
176    SnapshotDownloadCompleted {
177        /// Unique identifier used to track this specific snapshot download
178        download_id: String,
179    },
180    /// A snapshot ancillary download has started
181    SnapshotAncillaryDownloadStarted {
182        /// Unique identifier used to track this specific snapshot ancillary download
183        download_id: String,
184        /// Size of the downloaded archive
185        size: u64,
186    },
187    /// A snapshot ancillary download is in progress
188    SnapshotAncillaryDownloadProgress {
189        /// Unique identifier used to track this specific snapshot ancillary download
190        download_id: String,
191        /// Number of bytes that have been downloaded
192        downloaded_bytes: u64,
193        /// Size of the downloaded archive
194        size: u64,
195    },
196    /// A snapshot ancillary download has completed
197    SnapshotAncillaryDownloadCompleted {
198        /// Unique identifier used to track this specific snapshot ancillary download
199        download_id: String,
200    },
201
202    /// Cardano database related events
203    CardanoDatabase(MithrilEventCardanoDatabase),
204
205    /// A certificate chain validation has started
206    CertificateChainValidationStarted {
207        /// Unique identifier used to track this specific certificate chain validation
208        certificate_chain_validation_id: String,
209    },
210    /// An individual certificate of a chain have been validated.
211    CertificateValidated {
212        /// Unique identifier used to track this specific certificate chain validation
213        certificate_chain_validation_id: String,
214        /// The validated certificate hash
215        certificate_hash: String,
216    },
217    /// An individual certificate of a chain have been fetched from the cache.
218    CertificateFetchedFromCache {
219        /// Unique identifier used to track this specific certificate chain validation
220        certificate_chain_validation_id: String,
221        /// The fetched certificate hash
222        certificate_hash: String,
223    },
224    /// The whole certificate chain is valid.
225    CertificateChainValidated {
226        /// Unique identifier used to track this specific certificate chain validation
227        certificate_chain_validation_id: String,
228    },
229}
230
231impl MithrilEvent {
232    /// Generate a random unique identifier to identify a snapshot download
233    pub fn new_snapshot_download_id() -> String {
234        Uuid::new_v4().to_string()
235    }
236
237    /// Generate a random unique identifier to identify a Cardano download
238    pub fn new_cardano_database_download_id() -> String {
239        Uuid::new_v4().to_string()
240    }
241
242    /// Generate a random unique identifier to identify a certificate chain validation
243    pub fn new_certificate_chain_validation_id() -> String {
244        Uuid::new_v4().to_string()
245    }
246
247    #[cfg(test)]
248    pub(crate) fn event_id(&self) -> &str {
249        match self {
250            MithrilEvent::SnapshotDownloadStarted { download_id, .. } => download_id,
251            MithrilEvent::SnapshotDownloadProgress { download_id, .. } => download_id,
252            MithrilEvent::SnapshotDownloadCompleted { download_id } => download_id,
253            MithrilEvent::SnapshotAncillaryDownloadStarted { download_id, .. } => download_id,
254            MithrilEvent::SnapshotAncillaryDownloadProgress { download_id, .. } => download_id,
255            MithrilEvent::SnapshotAncillaryDownloadCompleted { download_id } => download_id,
256            MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::Started {
257                download_id,
258                ..
259            }) => download_id,
260            MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::Completed {
261                download_id,
262                ..
263            }) => download_id,
264            MithrilEvent::CardanoDatabase(
265                MithrilEventCardanoDatabase::ImmutableDownloadStarted { download_id, .. },
266            ) => download_id,
267            MithrilEvent::CardanoDatabase(
268                MithrilEventCardanoDatabase::ImmutableDownloadProgress { download_id, .. },
269            ) => download_id,
270            MithrilEvent::CardanoDatabase(
271                MithrilEventCardanoDatabase::ImmutableDownloadCompleted { download_id, .. },
272            ) => download_id,
273            MithrilEvent::CardanoDatabase(
274                MithrilEventCardanoDatabase::AncillaryDownloadStarted { download_id, .. },
275            ) => download_id,
276            MithrilEvent::CardanoDatabase(
277                MithrilEventCardanoDatabase::AncillaryDownloadProgress { download_id, .. },
278            ) => download_id,
279            MithrilEvent::CardanoDatabase(
280                MithrilEventCardanoDatabase::AncillaryDownloadCompleted { download_id, .. },
281            ) => download_id,
282            MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::DigestDownloadStarted {
283                download_id,
284                ..
285            }) => download_id,
286            MithrilEvent::CardanoDatabase(
287                MithrilEventCardanoDatabase::DigestDownloadProgress { download_id, .. },
288            ) => download_id,
289            MithrilEvent::CardanoDatabase(
290                MithrilEventCardanoDatabase::DigestDownloadCompleted { download_id, .. },
291            ) => download_id,
292            MithrilEvent::CertificateChainValidationStarted {
293                certificate_chain_validation_id,
294            } => certificate_chain_validation_id,
295            MithrilEvent::CertificateValidated {
296                certificate_chain_validation_id,
297                ..
298            } => certificate_chain_validation_id,
299            MithrilEvent::CertificateFetchedFromCache {
300                certificate_chain_validation_id,
301                ..
302            } => certificate_chain_validation_id,
303            MithrilEvent::CertificateChainValidated {
304                certificate_chain_validation_id,
305            } => certificate_chain_validation_id,
306        }
307    }
308}
309
310/// A sender of [MithrilEvent].
311///
312/// It uses Arc internally so it can be cloned at will.
313#[derive(Clone)]
314pub struct FeedbackSender {
315    receivers: Vec<Arc<dyn FeedbackReceiver>>,
316}
317
318impl FeedbackSender {
319    /// Create a new [FeedbackSender].
320    pub fn new(receivers: &[Arc<dyn FeedbackReceiver>]) -> FeedbackSender {
321        Self {
322            receivers: receivers.to_vec(),
323        }
324    }
325
326    /// Send the given event to the known receivers.
327    pub async fn send_event(&self, event: MithrilEvent) {
328        for receiver in &self.receivers {
329            receiver.handle_event(event.clone()).await;
330        }
331    }
332}
333
334/// A receiver of [MithrilEvent].
335#[cfg_attr(target_family = "wasm", async_trait(?Send))]
336#[cfg_attr(not(target_family = "wasm"), async_trait)]
337pub trait FeedbackReceiver: Sync + Send {
338    /// Callback called by a [FeedbackSender] when it needs to send an [event][MithrilEvent].
339    async fn handle_event(&self, event: MithrilEvent);
340}
341
342/// A [FeedbackReceiver] that writes the event it receives in a [slog logger][Logger].
343pub struct SlogFeedbackReceiver {
344    logger: Logger,
345}
346
347impl SlogFeedbackReceiver {
348    /// Create a new [SlogFeedbackReceiver].
349    pub fn new(logger: Logger) -> SlogFeedbackReceiver {
350        Self { logger }
351    }
352}
353
354#[cfg_attr(target_family = "wasm", async_trait(?Send))]
355#[cfg_attr(not(target_family = "wasm"), async_trait)]
356impl FeedbackReceiver for SlogFeedbackReceiver {
357    async fn handle_event(&self, event: MithrilEvent) {
358        match event {
359            MithrilEvent::SnapshotDownloadStarted {
360                digest,
361                download_id,
362                size,
363            } => {
364                info!(
365                    self.logger, "Snapshot download started";
366                    "size" => size, "digest" => digest, "download_id" => download_id,
367                );
368            }
369            MithrilEvent::SnapshotDownloadProgress {
370                download_id,
371                downloaded_bytes,
372                size,
373            } => {
374                info!(
375                    self.logger, "Snapshot download in progress ...";
376                    "downloaded_bytes" => downloaded_bytes, "size" => size, "download_id" => download_id,
377                );
378            }
379            MithrilEvent::SnapshotDownloadCompleted { download_id } => {
380                info!(self.logger, "Snapshot download completed"; "download_id" => download_id);
381            }
382            MithrilEvent::SnapshotAncillaryDownloadStarted { download_id, size } => {
383                info!(
384                    self.logger, "Snapshot ancillary download started";
385                    "size" => size, "download_id" => download_id,
386                );
387            }
388            MithrilEvent::SnapshotAncillaryDownloadProgress {
389                download_id,
390                downloaded_bytes,
391                size,
392            } => {
393                info!(
394                    self.logger, "Snapshot ancillary download in progress ...";
395                    "downloaded_bytes" => downloaded_bytes, "size" => size, "download_id" => download_id,
396                );
397            }
398            MithrilEvent::SnapshotAncillaryDownloadCompleted { download_id } => {
399                info!(self.logger, "Snapshot ancillary download completed"; "download_id" => download_id);
400            }
401            MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::Started {
402                download_id,
403                total_immutable_files,
404                include_ancillary,
405            }) => {
406                info!(
407                    self.logger, "Cardano database download started"; "download_id" => download_id, "total_immutable_files" => total_immutable_files, "include_ancillary" => include_ancillary,
408                );
409            }
410            MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::Completed {
411                download_id,
412            }) => {
413                info!(
414                    self.logger, "Cardano database download completed"; "download_id" => download_id,
415                );
416            }
417            MithrilEvent::CardanoDatabase(
418                MithrilEventCardanoDatabase::ImmutableDownloadStarted {
419                    immutable_file_number,
420                    download_id,
421                    size,
422                },
423            ) => {
424                info!(
425                    self.logger, "Immutable download started";
426                    "immutable_file_number" => immutable_file_number, "download_id" => download_id, "size" => size
427                );
428            }
429            MithrilEvent::CardanoDatabase(
430                MithrilEventCardanoDatabase::ImmutableDownloadProgress {
431                    immutable_file_number,
432                    download_id,
433                    downloaded_bytes,
434                    size,
435                },
436            ) => {
437                info!(
438                    self.logger, "Immutable download in progress ...";
439                    "immutable_file_number" => immutable_file_number, "downloaded_bytes" => downloaded_bytes, "size" => size, "download_id" => download_id,
440                );
441            }
442            MithrilEvent::CardanoDatabase(
443                MithrilEventCardanoDatabase::ImmutableDownloadCompleted {
444                    immutable_file_number,
445                    download_id,
446                },
447            ) => {
448                info!(self.logger, "Immutable download completed"; "immutable_file_number" => immutable_file_number, "download_id" => download_id);
449            }
450            MithrilEvent::CardanoDatabase(
451                MithrilEventCardanoDatabase::AncillaryDownloadStarted { download_id, size },
452            ) => {
453                info!(
454                    self.logger, "Ancillary download started";
455                    "download_id" => download_id,
456                    "size" => size,
457                );
458            }
459            MithrilEvent::CardanoDatabase(
460                MithrilEventCardanoDatabase::AncillaryDownloadProgress {
461                    download_id,
462                    downloaded_bytes,
463                    size,
464                },
465            ) => {
466                info!(
467                    self.logger, "Ancillary download in progress ...";
468                    "downloaded_bytes" => downloaded_bytes, "size" => size, "download_id" => download_id,
469                );
470            }
471            MithrilEvent::CardanoDatabase(
472                MithrilEventCardanoDatabase::AncillaryDownloadCompleted { download_id },
473            ) => {
474                info!(self.logger, "Ancillary download completed"; "download_id" => download_id);
475            }
476            MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::DigestDownloadStarted {
477                download_id,
478                size,
479            }) => {
480                info!(
481                    self.logger, "Digest download started";
482                    "download_id" => download_id,
483                    "size" => size,
484                );
485            }
486            MithrilEvent::CardanoDatabase(
487                MithrilEventCardanoDatabase::DigestDownloadProgress {
488                    download_id,
489                    downloaded_bytes,
490                    size,
491                },
492            ) => {
493                info!(
494                    self.logger, "Digest download in progress ...";
495                    "downloaded_bytes" => downloaded_bytes, "size" => size, "download_id" => download_id,
496                );
497            }
498            MithrilEvent::CardanoDatabase(
499                MithrilEventCardanoDatabase::DigestDownloadCompleted { download_id },
500            ) => {
501                info!(self.logger, "Digest download completed"; "download_id" => download_id);
502            }
503            MithrilEvent::CertificateChainValidationStarted {
504                certificate_chain_validation_id,
505            } => {
506                info!(
507                    self.logger, "Certificate chain validation started";
508                    "certificate_chain_validation_id" => certificate_chain_validation_id,
509                );
510            }
511            MithrilEvent::CertificateValidated {
512                certificate_hash,
513                certificate_chain_validation_id,
514            } => {
515                info!(
516                    self.logger, "Certificate validated";
517                    "certificate_hash" => certificate_hash,
518                    "certificate_chain_validation_id" => certificate_chain_validation_id,
519                );
520            }
521            MithrilEvent::CertificateFetchedFromCache {
522                certificate_hash,
523                certificate_chain_validation_id,
524            } => {
525                info!(
526                    self.logger, "Cached";
527                    "certificate_hash" => certificate_hash,
528                    "certificate_chain_validation_id" => certificate_chain_validation_id,
529                );
530            }
531            MithrilEvent::CertificateChainValidated {
532                certificate_chain_validation_id,
533            } => {
534                info!(
535                    self.logger, "Certificate chain validated";
536                    "certificate_chain_validation_id" => certificate_chain_validation_id,
537                );
538            }
539        };
540    }
541}
542
543/// A [FeedbackReceiver] that stacks the events that it receives in a vec.
544///
545/// Use it only for tests purpose.
546pub struct StackFeedbackReceiver {
547    stacked_events: RwLock<Vec<MithrilEvent>>,
548}
549
550impl StackFeedbackReceiver {
551    /// Create a new [StackFeedbackReceiver].
552    pub fn new() -> StackFeedbackReceiver {
553        Self {
554            stacked_events: RwLock::new(vec![]),
555        }
556    }
557
558    /// Returns a copy of the stored stacked events.
559    ///
560    /// Will crash if it can't access the stored events.
561    pub fn stacked_events(&self) -> Vec<MithrilEvent> {
562        let events = self.stacked_events.read().unwrap();
563        events.clone()
564    }
565}
566
567impl Default for StackFeedbackReceiver {
568    fn default() -> Self {
569        Self::new()
570    }
571}
572
573#[cfg_attr(target_family = "wasm", async_trait(?Send))]
574#[cfg_attr(not(target_family = "wasm"), async_trait)]
575impl FeedbackReceiver for StackFeedbackReceiver {
576    async fn handle_event(&self, event: MithrilEvent) {
577        let mut events = self.stacked_events.write().unwrap();
578        events.push(event);
579    }
580}
581
582#[cfg(test)]
583mod tests {
584    use super::*;
585    use crate::feedback::MithrilEvent::{SnapshotDownloadCompleted, SnapshotDownloadStarted};
586    use crate::test_utils::TestLogger;
587    use std::time::Duration;
588    use tokio::task::JoinSet;
589
590    #[tokio::test]
591    async fn send_event_same_thread() {
592        let receiver = Arc::new(StackFeedbackReceiver::new());
593        let sender = FeedbackSender::new(&[receiver.clone()]);
594
595        sender
596            .send_event(SnapshotDownloadStarted {
597                digest: "digest".to_string(),
598                download_id: "download_id".to_string(),
599                size: 10,
600            })
601            .await;
602        sender
603            .send_event(SnapshotDownloadCompleted {
604                download_id: "download_id".to_string(),
605            })
606            .await;
607
608        assert_eq!(
609            receiver.stacked_events(),
610            vec![
611                SnapshotDownloadStarted {
612                    digest: "digest".to_string(),
613                    download_id: "download_id".to_string(),
614                    size: 10
615                },
616                SnapshotDownloadCompleted {
617                    download_id: "download_id".to_string()
618                }
619            ]
620        );
621    }
622
623    #[tokio::test]
624    async fn send_event_multiple_thread() {
625        let receiver = Arc::new(StackFeedbackReceiver::new());
626        let sender = FeedbackSender::new(&[
627            receiver.clone(),
628            Arc::new(SlogFeedbackReceiver::new(TestLogger::stdout())),
629        ]);
630        let sender2 = sender.clone();
631        let mut join_set = JoinSet::new();
632
633        join_set.spawn(async move {
634            // Step 1:
635            sender
636                .send_event(SnapshotDownloadStarted {
637                    digest: "digest1".to_string(),
638                    download_id: "download1".to_string(),
639                    size: 1,
640                })
641                .await;
642            tokio::time::sleep(Duration::from_millis(2)).await;
643            // Step 3:
644            sender
645                .send_event(SnapshotDownloadCompleted {
646                    download_id: "download3".to_string(),
647                })
648                .await;
649            sender
650                .send_event(SnapshotDownloadStarted {
651                    digest: "digest2".to_string(),
652                    download_id: "download2".to_string(),
653                    size: 2,
654                })
655                .await;
656        });
657
658        join_set.spawn(async move {
659            // Step 2:
660            sender2
661                .send_event(SnapshotDownloadCompleted {
662                    download_id: "download1".to_string(),
663                })
664                .await;
665            sender2
666                .send_event(SnapshotDownloadStarted {
667                    digest: "digest3".to_string(),
668                    download_id: "download3".to_string(),
669                    size: 3,
670                })
671                .await;
672            tokio::time::sleep(Duration::from_millis(5)).await;
673            // Step 4:
674            sender2
675                .send_event(SnapshotDownloadCompleted {
676                    download_id: "download2".to_string(),
677                })
678                .await;
679        });
680
681        while let Some(res) = join_set.join_next().await {
682            res.unwrap();
683        }
684
685        assert_eq!(
686            receiver.stacked_events(),
687            vec![
688                SnapshotDownloadStarted {
689                    digest: "digest1".to_string(),
690                    download_id: "download1".to_string(),
691                    size: 1
692                },
693                SnapshotDownloadCompleted {
694                    download_id: "download1".to_string()
695                },
696                SnapshotDownloadStarted {
697                    digest: "digest3".to_string(),
698                    download_id: "download3".to_string(),
699                    size: 3
700                },
701                SnapshotDownloadCompleted {
702                    download_id: "download3".to_string()
703                },
704                SnapshotDownloadStarted {
705                    digest: "digest2".to_string(),
706                    download_id: "download2".to_string(),
707                    size: 2
708                },
709                SnapshotDownloadCompleted {
710                    download_id: "download2".to_string()
711                },
712            ]
713        );
714    }
715
716    #[tokio::test]
717    async fn send_event_in_one_thread_and_receive_in_another_thread() {
718        let receiver = Arc::new(StackFeedbackReceiver::new());
719        let receiver2 = receiver.clone();
720        let sender = FeedbackSender::new(&[receiver.clone()]);
721        let mut join_set = JoinSet::new();
722
723        join_set.spawn(async move {
724            // Step 1:
725            sender
726                .send_event(SnapshotDownloadStarted {
727                    digest: "digest1".to_string(),
728                    download_id: "download1".to_string(),
729                    size: 1,
730                })
731                .await;
732            tokio::time::sleep(Duration::from_millis(10)).await;
733
734            // Step 2:
735            sender
736                .send_event(SnapshotDownloadCompleted {
737                    download_id: "download1".to_string(),
738                })
739                .await;
740            sender
741                .send_event(SnapshotDownloadStarted {
742                    digest: "digest2".to_string(),
743                    download_id: "download2".to_string(),
744                    size: 2,
745                })
746                .await;
747            tokio::time::sleep(Duration::from_millis(10)).await;
748
749            // Step 3:
750            sender
751                .send_event(SnapshotDownloadCompleted {
752                    download_id: "download2".to_string(),
753                })
754                .await;
755            sender
756                .send_event(SnapshotDownloadStarted {
757                    digest: "digest3".to_string(),
758                    download_id: "download3".to_string(),
759                    size: 3,
760                })
761                .await;
762            tokio::time::sleep(Duration::from_millis(10)).await;
763
764            // Final step:
765            sender
766                .send_event(SnapshotDownloadCompleted {
767                    download_id: "download3".to_string(),
768                })
769                .await;
770        });
771
772        join_set.spawn(async move {
773            // Little sleep to wait for step 1 completion
774            tokio::time::sleep(Duration::from_millis(3)).await;
775            assert_eq!(
776                receiver2.stacked_events(),
777                vec![SnapshotDownloadStarted {
778                    digest: "digest1".to_string(),
779                    download_id: "download1".to_string(),
780                    size: 1
781                },]
782            );
783
784            // Wait for step 2 completion
785            tokio::time::sleep(Duration::from_millis(10)).await;
786            assert_eq!(
787                receiver2.stacked_events(),
788                vec![
789                    SnapshotDownloadStarted {
790                        digest: "digest1".to_string(),
791                        download_id: "download1".to_string(),
792                        size: 1
793                    },
794                    SnapshotDownloadCompleted {
795                        download_id: "download1".to_string()
796                    },
797                    SnapshotDownloadStarted {
798                        digest: "digest2".to_string(),
799                        download_id: "download2".to_string(),
800                        size: 2
801                    },
802                ]
803            );
804
805            // Wait for step 3 completion
806            tokio::time::sleep(Duration::from_millis(10)).await;
807            assert_eq!(
808                receiver2.stacked_events(),
809                vec![
810                    SnapshotDownloadStarted {
811                        digest: "digest1".to_string(),
812                        download_id: "download1".to_string(),
813                        size: 1
814                    },
815                    SnapshotDownloadCompleted {
816                        download_id: "download1".to_string()
817                    },
818                    SnapshotDownloadStarted {
819                        digest: "digest2".to_string(),
820                        download_id: "download2".to_string(),
821                        size: 2
822                    },
823                    SnapshotDownloadCompleted {
824                        download_id: "download2".to_string()
825                    },
826                    SnapshotDownloadStarted {
827                        digest: "digest3".to_string(),
828                        download_id: "download3".to_string(),
829                        size: 3
830                    },
831                ]
832            );
833        });
834
835        while let Some(res) = join_set.join_next().await {
836            res.unwrap();
837        }
838
839        assert_eq!(
840            receiver.stacked_events(),
841            vec![
842                SnapshotDownloadStarted {
843                    digest: "digest1".to_string(),
844                    download_id: "download1".to_string(),
845                    size: 1
846                },
847                SnapshotDownloadCompleted {
848                    download_id: "download1".to_string()
849                },
850                SnapshotDownloadStarted {
851                    digest: "digest2".to_string(),
852                    download_id: "download2".to_string(),
853                    size: 2
854                },
855                SnapshotDownloadCompleted {
856                    download_id: "download2".to_string()
857                },
858                SnapshotDownloadStarted {
859                    digest: "digest3".to_string(),
860                    download_id: "download3".to_string(),
861                    size: 3
862                },
863                SnapshotDownloadCompleted {
864                    download_id: "download3".to_string()
865                },
866            ]
867        );
868    }
869}