mithril_client_cli/utils/
feedback_receiver.rs

1use async_trait::async_trait;
2use indicatif::{ProgressBar, ProgressDrawTarget};
3use slog::Logger;
4use tokio::sync::RwLock;
5
6use super::{
7    DownloadProgressReporter, DownloadProgressReporterParams, MultiDownloadProgressReporter,
8    ProgressBarKind, ProgressOutputType,
9};
10
11use mithril_client::feedback::{FeedbackReceiver, MithrilEvent, MithrilEventCardanoDatabase};
12
13/// Custom [FeedbackReceiver] for Cardano DB to handle events sent
14/// by the `mithril-client` library
15pub struct IndicatifFeedbackReceiver {
16    download_progress_reporter: RwLock<Option<DownloadProgressReporter>>,
17    ancillary_download_progress_reporter: RwLock<Option<DownloadProgressReporter>>,
18    certificate_validation_pb: RwLock<Option<ProgressBar>>,
19    cardano_database_multi_pb: RwLock<Option<MultiDownloadProgressReporter>>,
20    output_type: ProgressOutputType,
21    logger: Logger,
22}
23
24impl IndicatifFeedbackReceiver {
25    /// [IndicatifFeedbackReceiver] constructor
26    pub fn new(output_type: ProgressOutputType, logger: Logger) -> Self {
27        Self {
28            download_progress_reporter: RwLock::new(None),
29            ancillary_download_progress_reporter: RwLock::new(None),
30            certificate_validation_pb: RwLock::new(None),
31            cardano_database_multi_pb: RwLock::new(None),
32            output_type,
33            logger,
34        }
35    }
36}
37
38#[async_trait]
39impl FeedbackReceiver for IndicatifFeedbackReceiver {
40    async fn handle_event(&self, event: MithrilEvent) {
41        match event {
42            MithrilEvent::SnapshotDownloadStarted {
43                digest: _,
44                download_id: _,
45                size,
46            } => {
47                let pb = if self.output_type == ProgressOutputType::Tty {
48                    ProgressBar::new(size)
49                } else {
50                    ProgressBar::with_draw_target(Some(size), ProgressDrawTarget::hidden())
51                };
52                let mut download_progress_reporter = self.download_progress_reporter.write().await;
53                *download_progress_reporter = Some(DownloadProgressReporter::new(
54                    pb,
55                    DownloadProgressReporterParams {
56                        label: "Snapshot".to_string(),
57                        output_type: self.output_type,
58                        progress_bar_kind: ProgressBarKind::Bytes,
59                        include_label_in_tty: false,
60                    },
61                    self.logger.clone(),
62                ));
63            }
64            MithrilEvent::SnapshotDownloadProgress {
65                download_id: _,
66                downloaded_bytes,
67                size: _,
68            } => {
69                let download_progress_reporter = self.download_progress_reporter.read().await;
70                if let Some(progress_reporter) = download_progress_reporter.as_ref() {
71                    progress_reporter.report(downloaded_bytes);
72                }
73            }
74            MithrilEvent::SnapshotDownloadCompleted { download_id: _ } => {
75                let mut download_progress_reporter = self.download_progress_reporter.write().await;
76                if let Some(progress_reporter) = download_progress_reporter.as_ref() {
77                    progress_reporter.finish("Cardano DB download completed");
78                }
79                *download_progress_reporter = None;
80            }
81            MithrilEvent::SnapshotAncillaryDownloadStarted { size, .. } => {
82                let pb = if self.output_type == ProgressOutputType::Tty {
83                    ProgressBar::new(size)
84                } else {
85                    ProgressBar::with_draw_target(Some(size), ProgressDrawTarget::hidden())
86                };
87                let mut ancillary_download_progress_reporter =
88                    self.ancillary_download_progress_reporter.write().await;
89                *ancillary_download_progress_reporter = Some(DownloadProgressReporter::new(
90                    pb,
91                    DownloadProgressReporterParams {
92                        label: "Snapshot ancillary".to_string(),
93                        output_type: self.output_type,
94                        progress_bar_kind: ProgressBarKind::Bytes,
95                        include_label_in_tty: false,
96                    },
97                    self.logger.clone(),
98                ));
99            }
100            MithrilEvent::SnapshotAncillaryDownloadProgress {
101                downloaded_bytes, ..
102            } => {
103                let ancillary_download_progress_reporter =
104                    self.ancillary_download_progress_reporter.read().await;
105                if let Some(progress_reporter) = ancillary_download_progress_reporter.as_ref() {
106                    progress_reporter.report(downloaded_bytes);
107                }
108            }
109            MithrilEvent::SnapshotAncillaryDownloadCompleted { .. } => {
110                let mut ancillary_download_progress_reporter =
111                    self.ancillary_download_progress_reporter.write().await;
112                if let Some(progress_reporter) = ancillary_download_progress_reporter.as_ref() {
113                    progress_reporter.finish("Cardano DB ancillary download completed");
114                }
115                *ancillary_download_progress_reporter = None;
116            }
117            MithrilEvent::CardanoDatabase(cardano_database_event) => match cardano_database_event {
118                MithrilEventCardanoDatabase::Started {
119                    download_id: _,
120                    total_immutable_files,
121                    include_ancillary,
122                } => {
123                    let multi_pb = MultiDownloadProgressReporter::new(
124                        total_immutable_files + if include_ancillary { 1 } else { 0 },
125                        self.output_type,
126                        self.logger.clone(),
127                    );
128                    let mut cardano_database_multi_pb =
129                        self.cardano_database_multi_pb.write().await;
130                    *cardano_database_multi_pb = Some(multi_pb);
131                }
132                MithrilEventCardanoDatabase::Completed { download_id: _ } => {
133                    let mut cardano_database_multi_pb =
134                        self.cardano_database_multi_pb.write().await;
135
136                    if let Some(multi_pb) = cardano_database_multi_pb.as_ref() {
137                        multi_pb.finish_all("Cardano DB download completed").await;
138                        *cardano_database_multi_pb = None;
139                    }
140                }
141                MithrilEventCardanoDatabase::DigestDownloadStarted { .. }
142                | MithrilEventCardanoDatabase::DigestDownloadProgress { .. }
143                | MithrilEventCardanoDatabase::DigestDownloadCompleted { .. }
144                | MithrilEventCardanoDatabase::ImmutableDownloadStarted { .. }
145                | MithrilEventCardanoDatabase::ImmutableDownloadProgress { .. } => {
146                    // Ignore those events as those downloads are fast enough that we don't need to show progress bars
147                }
148                MithrilEventCardanoDatabase::ImmutableDownloadCompleted {
149                    immutable_file_number: _,
150                    download_id: _,
151                } => {
152                    if let Some(cardano_database_multi_pb) =
153                        self.cardano_database_multi_pb.read().await.as_ref()
154                    {
155                        cardano_database_multi_pb.bump_main_bar_progress();
156                    }
157                }
158                MithrilEventCardanoDatabase::AncillaryDownloadStarted {
159                    download_id: _,
160                    size,
161                } => {
162                    if let Some(cardano_database_multi_pb) =
163                        self.cardano_database_multi_pb.read().await.as_ref()
164                    {
165                        cardano_database_multi_pb
166                            .add_child_bar("Ancillary", ProgressBarKind::Bytes, size)
167                            .await;
168                    }
169                }
170                MithrilEventCardanoDatabase::AncillaryDownloadProgress {
171                    download_id: _,
172                    downloaded_bytes,
173                    size: _,
174                } => {
175                    if let Some(cardano_database_multi_pb) =
176                        self.cardano_database_multi_pb.read().await.as_ref()
177                    {
178                        cardano_database_multi_pb
179                            .progress_child_bar("Ancillary", downloaded_bytes)
180                            .await;
181                    }
182                }
183                MithrilEventCardanoDatabase::AncillaryDownloadCompleted { download_id: _ } => {
184                    if let Some(cardano_database_multi_pb) =
185                        self.cardano_database_multi_pb.read().await.as_ref()
186                    {
187                        cardano_database_multi_pb.finish_child_bar("Ancillary").await;
188                    }
189                }
190            },
191            MithrilEvent::CertificateChainValidationStarted {
192                certificate_chain_validation_id: _,
193            } => {
194                let pb = if self.output_type == ProgressOutputType::Tty {
195                    ProgressBar::new_spinner()
196                } else {
197                    ProgressBar::hidden()
198                };
199                let mut certificate_validation_pb = self.certificate_validation_pb.write().await;
200                *certificate_validation_pb = Some(pb);
201            }
202            MithrilEvent::CertificateValidated {
203                certificate_chain_validation_id: _,
204                certificate_hash,
205            } => {
206                let certificate_validation_pb = self.certificate_validation_pb.read().await;
207                if let Some(progress_bar) = certificate_validation_pb.as_ref() {
208                    progress_bar.set_message(format!("Certificate '{certificate_hash}' is valid"));
209                    progress_bar.inc(1);
210                }
211            }
212            MithrilEvent::CertificateFetchedFromCache {
213                certificate_chain_validation_id: _,
214                certificate_hash,
215            } => {
216                let certificate_validation_pb = self.certificate_validation_pb.read().await;
217                if let Some(progress_bar) = certificate_validation_pb.as_ref() {
218                    progress_bar.set_message(format!("Cached '{certificate_hash}'"));
219                    progress_bar.inc(1);
220                }
221            }
222            MithrilEvent::CertificateChainValidated {
223                certificate_chain_validation_id: _,
224            } => {
225                let mut certificate_validation_pb = self.certificate_validation_pb.write().await;
226                if let Some(progress_bar) = certificate_validation_pb.as_ref() {
227                    progress_bar.finish_with_message("Certificate chain validated");
228                }
229                *certificate_validation_pb = None;
230            }
231        }
232    }
233}
234
235#[cfg(test)]
236mod tests {
237    use slog::o;
238    use std::sync::Arc;
239
240    use mithril_client::feedback::FeedbackSender;
241
242    use super::*;
243
244    const DOWNLOAD_ID: &str = "id";
245
246    macro_rules! send_event {
247        (cardano_db, dl_started => $sender:expr, total:$total_immutable:expr, ancillary:$include_ancillary:expr) => {
248            $sender
249                .send_event(MithrilEvent::CardanoDatabase(
250                    MithrilEventCardanoDatabase::Started {
251                        download_id: DOWNLOAD_ID.to_string(),
252                        total_immutable_files: $total_immutable,
253                        include_ancillary: $include_ancillary,
254                    },
255                ))
256                .await;
257        };
258        (cardano_db, dl_completed => $sender:expr) => {
259            $sender
260                .send_event(MithrilEvent::CardanoDatabase(
261                    MithrilEventCardanoDatabase::Completed {
262                        download_id: DOWNLOAD_ID.to_string(),
263                    },
264                ))
265                .await;
266        };
267        (cardano_db, immutable_dl, started => $sender:expr, immutable:$immutable_file_number:expr, size:$size:expr) => {
268            $sender
269                .send_event(MithrilEvent::CardanoDatabase(
270                    MithrilEventCardanoDatabase::ImmutableDownloadStarted {
271                        immutable_file_number: $immutable_file_number,
272                        download_id: DOWNLOAD_ID.to_string(),
273                        size: $size,
274                    },
275                ))
276                .await;
277        };
278        (cardano_db, immutable_dl, progress => $sender:expr, immutable:$immutable_file_number:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
279            $sender
280                .send_event(MithrilEvent::CardanoDatabase(
281                    MithrilEventCardanoDatabase::ImmutableDownloadProgress {
282                        immutable_file_number: $immutable_file_number,
283                        download_id: DOWNLOAD_ID.to_string(),
284                        downloaded_bytes: $downloaded_bytes,
285                        size: $size,
286                    },
287                ))
288                .await;
289        };
290        (cardano_db, immutable_dl, completed => $sender:expr, immutable:$immutable_file_number:expr) => {
291            $sender
292                .send_event(MithrilEvent::CardanoDatabase(
293                    MithrilEventCardanoDatabase::ImmutableDownloadCompleted {
294                        immutable_file_number: $immutable_file_number,
295                        download_id: DOWNLOAD_ID.to_string(),
296                    },
297                ))
298                .await;
299        };
300        (cardano_db, ancillary_dl, started => $sender:expr, size:$size:expr) => {
301            $sender
302                .send_event(MithrilEvent::CardanoDatabase(
303                    MithrilEventCardanoDatabase::AncillaryDownloadStarted {
304                        download_id: DOWNLOAD_ID.to_string(),
305                        size: $size,
306                    },
307                ))
308                .await;
309        };
310        (cardano_db, ancillary_dl, progress => $sender:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
311            $sender
312                .send_event(MithrilEvent::CardanoDatabase(
313                    MithrilEventCardanoDatabase::AncillaryDownloadProgress {
314                        download_id: DOWNLOAD_ID.to_string(),
315                        downloaded_bytes: $downloaded_bytes,
316                        size: $size,
317                    },
318                ))
319                .await;
320        };
321        (cardano_db, ancillary_dl, completed => $sender:expr) => {
322            $sender
323                .send_event(MithrilEvent::CardanoDatabase(
324                    MithrilEventCardanoDatabase::AncillaryDownloadCompleted {
325                        download_id: DOWNLOAD_ID.to_string(),
326                    },
327                ))
328                .await;
329        };
330        (cardano_db, digests_dl, started => $sender:expr, size:$size:expr) => {
331            $sender
332                .send_event(MithrilEvent::CardanoDatabase(
333                    MithrilEventCardanoDatabase::DigestDownloadStarted {
334                        download_id: DOWNLOAD_ID.to_string(),
335                        size: $size,
336                    },
337                ))
338                .await;
339        };
340        (cardano_db, digests_dl, progress => $sender:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
341            $sender
342                .send_event(MithrilEvent::CardanoDatabase(
343                    MithrilEventCardanoDatabase::DigestDownloadProgress {
344                        download_id: DOWNLOAD_ID.to_string(),
345                        downloaded_bytes: $downloaded_bytes,
346                        size: $size,
347                    },
348                ))
349                .await;
350        };
351        (cardano_db, digests_dl, completed => $sender:expr) => {
352            $sender
353                .send_event(MithrilEvent::CardanoDatabase(
354                    MithrilEventCardanoDatabase::DigestDownloadCompleted {
355                        download_id: DOWNLOAD_ID.to_string(),
356                    },
357                ))
358                .await;
359        };
360        (cardano_db_v1, full_immutables_dl, started => $sender:expr, digest:$digest:expr, size:$size:expr) => {
361            $sender
362                .send_event(MithrilEvent::SnapshotDownloadStarted {
363                    download_id: DOWNLOAD_ID.to_string(),
364                    digest: $digest.into(),
365                    size: $size,
366                })
367                .await;
368        };
369        (cardano_db_v1, full_immutables_dl, progress => $sender:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
370            $sender
371                .send_event(MithrilEvent::SnapshotDownloadProgress {
372                    download_id: DOWNLOAD_ID.to_string(),
373                    downloaded_bytes: $downloaded_bytes,
374                    size: $size,
375                })
376                .await;
377        };
378        (cardano_db_v1, full_immutables_dl, completed => $sender:expr) => {
379            $sender
380                .send_event(MithrilEvent::SnapshotDownloadCompleted {
381                    download_id: DOWNLOAD_ID.to_string(),
382                })
383                .await;
384        };
385        (cardano_db_v1, ancillary_dl, started => $sender:expr, size:$size:expr) => {
386            $sender
387                .send_event(MithrilEvent::SnapshotAncillaryDownloadStarted {
388                    download_id: DOWNLOAD_ID.to_string(),
389                    size: $size,
390                })
391                .await;
392        };
393        (cardano_db_v1, ancillary_dl, progress => $sender:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
394            $sender
395                .send_event(MithrilEvent::SnapshotAncillaryDownloadProgress {
396                    download_id: DOWNLOAD_ID.to_string(),
397                    downloaded_bytes: $downloaded_bytes,
398                    size: $size,
399                })
400                .await;
401        };
402        (cardano_db_v1, ancillary_dl, completed => $sender:expr) => {
403            $sender
404                .send_event(MithrilEvent::SnapshotAncillaryDownloadCompleted {
405                    download_id: DOWNLOAD_ID.to_string(),
406                })
407                .await;
408        };
409    }
410
411    fn build_feedback_receiver(output_type: ProgressOutputType) -> Arc<IndicatifFeedbackReceiver> {
412        Arc::new(IndicatifFeedbackReceiver::new(
413            output_type,
414            slog::Logger::root(slog::Discard, o!()),
415        ))
416    }
417
418    mod cardano_database_v1 {
419        use super::*;
420
421        #[tokio::test]
422        async fn starting_full_immutables_and_ancillary_together_spawn_two_progress_bars() {
423            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
424            let sender = FeedbackSender::new(&[receiver.clone()]);
425
426            send_event!(cardano_db_v1, full_immutables_dl, started => sender, digest:"digest", size:123);
427            send_event!(cardano_db_v1, ancillary_dl, started =>  sender, size:456);
428
429            assert!(receiver.download_progress_reporter.read().await.is_some());
430            assert!(receiver.ancillary_download_progress_reporter.read().await.is_some());
431        }
432
433        #[tokio::test]
434        async fn start_and_progress_ancillary_download_with_a_size_of_zero_should_not_crash() {
435            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
436            let sender = FeedbackSender::new(&[receiver.clone()]);
437
438            send_event!(cardano_db_v1, ancillary_dl, started => sender, size:0);
439            send_event!(cardano_db_v1, ancillary_dl, progress => sender, bytes:124, size:0);
440
441            assert!(receiver.ancillary_download_progress_reporter.read().await.is_some());
442        }
443
444        #[tokio::test]
445        async fn start_then_complete_should_remove_immutables_progress_bar() {
446            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
447            let sender = FeedbackSender::new(&[receiver.clone()]);
448
449            send_event!(cardano_db_v1, full_immutables_dl, started => sender, digest:"digest", size:123);
450            send_event!(cardano_db_v1, full_immutables_dl, completed => sender);
451
452            assert!(receiver.cardano_database_multi_pb.read().await.is_none());
453        }
454
455        #[tokio::test]
456        async fn start_then_complete_should_remove_ancillary_progress_bar() {
457            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
458            let sender = FeedbackSender::new(&[receiver.clone()]);
459
460            send_event!(cardano_db_v1, ancillary_dl, started =>  sender, size:456);
461            send_event!(cardano_db_v1, ancillary_dl, completed =>  sender);
462
463            assert!(receiver.ancillary_download_progress_reporter.read().await.is_none());
464        }
465    }
466
467    mod cardano_database_v2 {
468        use super::*;
469
470        #[tokio::test]
471        async fn starting_should_add_multi_progress_bar() {
472            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
473            let sender = FeedbackSender::new(&[receiver.clone()]);
474
475            send_event!(cardano_db, dl_started => sender, total:99, ancillary:false);
476
477            assert!(receiver.cardano_database_multi_pb.read().await.is_some());
478        }
479
480        #[tokio::test]
481        async fn start_then_complete_should_remove_multi_progress_bar() {
482            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
483            let sender = FeedbackSender::new(&[receiver.clone()]);
484
485            send_event!(cardano_db, dl_started => sender, total:99, ancillary:false);
486            send_event!(cardano_db, dl_completed => sender);
487
488            assert!(receiver.cardano_database_multi_pb.read().await.is_none());
489        }
490
491        #[tokio::test]
492        async fn start_including_ancillary_add_one_to_total_downloads() {
493            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
494            let sender = FeedbackSender::new(&[receiver.clone()]);
495
496            send_event!(cardano_db, dl_started => sender, total:99, ancillary:true);
497
498            assert_eq!(
499                receiver
500                    .cardano_database_multi_pb
501                    .read()
502                    .await
503                    .as_ref()
504                    .map(|pb| pb.total_downloads()),
505                Some(100)
506            );
507        }
508
509        #[tokio::test]
510        async fn starting_twice_should_supersede_first_multi_progress_bar() {
511            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
512            let sender = FeedbackSender::new(&[receiver.clone()]);
513
514            send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
515            send_event!(cardano_db, dl_started => sender, total:99, ancillary:false);
516
517            assert_eq!(
518                receiver
519                    .cardano_database_multi_pb
520                    .read()
521                    .await
522                    .as_ref()
523                    .map(|pb| pb.total_downloads()),
524                Some(99)
525            );
526        }
527
528        #[tokio::test]
529        async fn complete_without_start_should_not_panic() {
530            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
531            let sender = FeedbackSender::new(&[receiver.clone()]);
532
533            send_event!(cardano_db, dl_completed => sender);
534        }
535
536        #[tokio::test]
537        async fn starting_immutable_downloads_should_not_add_progress_bars() {
538            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
539            let sender = FeedbackSender::new(&[receiver.clone()]);
540
541            send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
542
543            let multi_pb_option = receiver.cardano_database_multi_pb.read().await;
544            let multi_pb = multi_pb_option.as_ref().unwrap();
545
546            assert_eq!(multi_pb.number_of_active_downloads().await, 0);
547            send_event!(cardano_db, immutable_dl, started => sender, immutable:1, size:123);
548            assert_eq!(multi_pb.number_of_active_downloads().await, 0);
549        }
550
551        #[tokio::test]
552        async fn completed_immutable_downloads_bump_progress() {
553            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
554            let sender = FeedbackSender::new(&[receiver.clone()]);
555
556            send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
557
558            let multi_pb_option = receiver.cardano_database_multi_pb.read().await;
559            let multi_pb = multi_pb_option.as_ref().unwrap();
560
561            assert_eq!(multi_pb.position(), 0);
562            send_event!(cardano_db, immutable_dl, completed => sender, immutable:24);
563            assert_eq!(multi_pb.position(), 1);
564        }
565
566        #[tokio::test]
567        async fn starting_digests_downloads_should_not_add_progress_bars() {
568            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
569            let sender = FeedbackSender::new(&[receiver.clone()]);
570
571            send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
572
573            let multi_pb_option = receiver.cardano_database_multi_pb.read().await;
574            let multi_pb = multi_pb_option.as_ref().unwrap();
575
576            assert_eq!(multi_pb.number_of_active_downloads().await, 0);
577            send_event!(cardano_db, digests_dl, started => sender, size:789);
578            assert_eq!(multi_pb.number_of_active_downloads().await, 0);
579        }
580
581        #[tokio::test]
582        async fn starting_ancillary_downloads_should_add_a_progress_bar() {
583            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
584            let sender = FeedbackSender::new(&[receiver.clone()]);
585
586            send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
587
588            let multi_pb_option = receiver.cardano_database_multi_pb.read().await;
589            let multi_pb = multi_pb_option.as_ref().unwrap();
590
591            assert_eq!(multi_pb.number_of_active_downloads().await, 0);
592            send_event!(cardano_db, ancillary_dl, started => sender, size:456);
593            assert_eq!(multi_pb.number_of_active_downloads().await, 1);
594        }
595    }
596}