mithril_client_cli/utils/
feedback_receiver.rs

1use async_trait::async_trait;
2use indicatif::{ProgressBar, ProgressDrawTarget};
3use slog::Logger;
4use tokio::sync::RwLock;
5
6use super::{
7    DownloadProgressReporter, DownloadProgressReporterParams, MultiDownloadProgressReporter,
8    ProgressBarKind, ProgressOutputType,
9};
10
11use mithril_client::feedback::{FeedbackReceiver, MithrilEvent, MithrilEventCardanoDatabase};
12
13/// Custom [FeedbackReceiver] for Cardano DB to handle events sent
14/// by the `mithril-client` library
15pub struct IndicatifFeedbackReceiver {
16    download_progress_reporter: RwLock<Option<DownloadProgressReporter>>,
17    ancillary_download_progress_reporter: RwLock<Option<DownloadProgressReporter>>,
18    certificate_validation_pb: RwLock<Option<ProgressBar>>,
19    cardano_database_multi_pb: RwLock<Option<MultiDownloadProgressReporter>>,
20    output_type: ProgressOutputType,
21    logger: Logger,
22}
23
24impl IndicatifFeedbackReceiver {
25    /// [IndicatifFeedbackReceiver] constructor
26    pub fn new(output_type: ProgressOutputType, logger: Logger) -> Self {
27        Self {
28            download_progress_reporter: RwLock::new(None),
29            ancillary_download_progress_reporter: RwLock::new(None),
30            certificate_validation_pb: RwLock::new(None),
31            cardano_database_multi_pb: RwLock::new(None),
32            output_type,
33            logger,
34        }
35    }
36}
37
38#[async_trait]
39impl FeedbackReceiver for IndicatifFeedbackReceiver {
40    async fn handle_event(&self, event: MithrilEvent) {
41        match event {
42            MithrilEvent::SnapshotDownloadStarted {
43                digest: _,
44                download_id: _,
45                size,
46            } => {
47                let pb = if self.output_type == ProgressOutputType::Tty {
48                    ProgressBar::new(size)
49                } else {
50                    ProgressBar::with_draw_target(Some(size), ProgressDrawTarget::hidden())
51                };
52                let mut download_progress_reporter = self.download_progress_reporter.write().await;
53                *download_progress_reporter = Some(DownloadProgressReporter::new(
54                    pb,
55                    DownloadProgressReporterParams {
56                        label: "Snapshot".to_string(),
57                        output_type: self.output_type,
58                        progress_bar_kind: ProgressBarKind::Bytes,
59                        include_label_in_tty: false,
60                    },
61                    self.logger.clone(),
62                ));
63            }
64            MithrilEvent::SnapshotDownloadProgress {
65                download_id: _,
66                downloaded_bytes,
67                size: _,
68            } => {
69                let download_progress_reporter = self.download_progress_reporter.read().await;
70                if let Some(progress_reporter) = download_progress_reporter.as_ref() {
71                    progress_reporter.report(downloaded_bytes);
72                }
73            }
74            MithrilEvent::SnapshotDownloadCompleted { download_id: _ } => {
75                let mut download_progress_reporter = self.download_progress_reporter.write().await;
76                if let Some(progress_reporter) = download_progress_reporter.as_ref() {
77                    progress_reporter.finish("Cardano DB download completed");
78                }
79                *download_progress_reporter = None;
80            }
81            MithrilEvent::SnapshotAncillaryDownloadStarted { size, .. } => {
82                let pb = if self.output_type == ProgressOutputType::Tty {
83                    ProgressBar::new(size)
84                } else {
85                    ProgressBar::with_draw_target(Some(size), ProgressDrawTarget::hidden())
86                };
87                let mut ancillary_download_progress_reporter =
88                    self.ancillary_download_progress_reporter.write().await;
89                *ancillary_download_progress_reporter = Some(DownloadProgressReporter::new(
90                    pb,
91                    DownloadProgressReporterParams {
92                        label: "Snapshot ancillary".to_string(),
93                        output_type: self.output_type,
94                        progress_bar_kind: ProgressBarKind::Bytes,
95                        include_label_in_tty: false,
96                    },
97                    self.logger.clone(),
98                ));
99            }
100            MithrilEvent::SnapshotAncillaryDownloadProgress {
101                downloaded_bytes, ..
102            } => {
103                let ancillary_download_progress_reporter =
104                    self.ancillary_download_progress_reporter.read().await;
105                if let Some(progress_reporter) = ancillary_download_progress_reporter.as_ref() {
106                    progress_reporter.report(downloaded_bytes);
107                }
108            }
109            MithrilEvent::SnapshotAncillaryDownloadCompleted { .. } => {
110                let mut ancillary_download_progress_reporter =
111                    self.ancillary_download_progress_reporter.write().await;
112                if let Some(progress_reporter) = ancillary_download_progress_reporter.as_ref() {
113                    progress_reporter.finish("Cardano DB ancillary download completed");
114                }
115                *ancillary_download_progress_reporter = None;
116            }
117            MithrilEvent::CardanoDatabase(cardano_database_event) => match cardano_database_event {
118                MithrilEventCardanoDatabase::Started {
119                    download_id: _,
120                    total_immutable_files,
121                    include_ancillary,
122                } => {
123                    let multi_pb = MultiDownloadProgressReporter::new(
124                        total_immutable_files + if include_ancillary { 1 } else { 0 },
125                        self.output_type,
126                        self.logger.clone(),
127                    );
128                    let mut cardano_database_multi_pb =
129                        self.cardano_database_multi_pb.write().await;
130                    *cardano_database_multi_pb = Some(multi_pb);
131                }
132                MithrilEventCardanoDatabase::Completed { download_id: _ } => {
133                    let mut cardano_database_multi_pb =
134                        self.cardano_database_multi_pb.write().await;
135
136                    if let Some(multi_pb) = cardano_database_multi_pb.as_ref() {
137                        multi_pb.finish_all("Cardano DB download completed").await;
138                        *cardano_database_multi_pb = None;
139                    }
140                }
141                MithrilEventCardanoDatabase::DigestDownloadStarted { .. }
142                | MithrilEventCardanoDatabase::DigestDownloadProgress { .. }
143                | MithrilEventCardanoDatabase::DigestDownloadCompleted { .. }
144                | MithrilEventCardanoDatabase::ImmutableDownloadStarted { .. }
145                | MithrilEventCardanoDatabase::ImmutableDownloadProgress { .. } => {
146                    // Ignore those events as those downloads are fast enough that we don't need to show progress bars
147                }
148                MithrilEventCardanoDatabase::ImmutableDownloadCompleted {
149                    immutable_file_number: _,
150                    download_id: _,
151                } => {
152                    if let Some(cardano_database_multi_pb) =
153                        self.cardano_database_multi_pb.read().await.as_ref()
154                    {
155                        cardano_database_multi_pb.bump_main_bar_progress();
156                    }
157                }
158                MithrilEventCardanoDatabase::AncillaryDownloadStarted {
159                    download_id: _,
160                    size,
161                } => {
162                    if let Some(cardano_database_multi_pb) =
163                        self.cardano_database_multi_pb.read().await.as_ref()
164                    {
165                        cardano_database_multi_pb
166                            .add_child_bar("Ancillary", ProgressBarKind::Bytes, size)
167                            .await;
168                    }
169                }
170                MithrilEventCardanoDatabase::AncillaryDownloadProgress {
171                    download_id: _,
172                    downloaded_bytes,
173                    size: _,
174                } => {
175                    if let Some(cardano_database_multi_pb) =
176                        self.cardano_database_multi_pb.read().await.as_ref()
177                    {
178                        cardano_database_multi_pb
179                            .progress_child_bar("Ancillary", downloaded_bytes)
180                            .await;
181                    }
182                }
183                MithrilEventCardanoDatabase::AncillaryDownloadCompleted { download_id: _ } => {
184                    if let Some(cardano_database_multi_pb) =
185                        self.cardano_database_multi_pb.read().await.as_ref()
186                    {
187                        cardano_database_multi_pb
188                            .finish_child_bar("Ancillary")
189                            .await;
190                    }
191                }
192            },
193            MithrilEvent::CertificateChainValidationStarted {
194                certificate_chain_validation_id: _,
195            } => {
196                let pb = if self.output_type == ProgressOutputType::Tty {
197                    ProgressBar::new_spinner()
198                } else {
199                    ProgressBar::hidden()
200                };
201                let mut certificate_validation_pb = self.certificate_validation_pb.write().await;
202                *certificate_validation_pb = Some(pb);
203            }
204            MithrilEvent::CertificateValidated {
205                certificate_chain_validation_id: _,
206                certificate_hash,
207            } => {
208                let certificate_validation_pb = self.certificate_validation_pb.read().await;
209                if let Some(progress_bar) = certificate_validation_pb.as_ref() {
210                    progress_bar.set_message(format!("Certificate '{certificate_hash}' is valid"));
211                    progress_bar.inc(1);
212                }
213            }
214            MithrilEvent::CertificateFetchedFromCache {
215                certificate_chain_validation_id: _,
216                certificate_hash,
217            } => {
218                let certificate_validation_pb = self.certificate_validation_pb.read().await;
219                if let Some(progress_bar) = certificate_validation_pb.as_ref() {
220                    progress_bar.set_message(format!("Cached '{certificate_hash}'"));
221                    progress_bar.inc(1);
222                }
223            }
224            MithrilEvent::CertificateChainValidated {
225                certificate_chain_validation_id: _,
226            } => {
227                let mut certificate_validation_pb = self.certificate_validation_pb.write().await;
228                if let Some(progress_bar) = certificate_validation_pb.as_ref() {
229                    progress_bar.finish_with_message("Certificate chain validated");
230                }
231                *certificate_validation_pb = None;
232            }
233        }
234    }
235}
236
237#[cfg(test)]
238mod tests {
239    use slog::o;
240    use std::sync::Arc;
241
242    use mithril_client::feedback::FeedbackSender;
243
244    use super::*;
245
246    const DOWNLOAD_ID: &str = "id";
247
248    macro_rules! send_event {
249        (cardano_db, dl_started => $sender:expr, total:$total_immutable:expr, ancillary:$include_ancillary:expr) => {
250            $sender
251                .send_event(MithrilEvent::CardanoDatabase(
252                    MithrilEventCardanoDatabase::Started {
253                        download_id: DOWNLOAD_ID.to_string(),
254                        total_immutable_files: $total_immutable,
255                        include_ancillary: $include_ancillary,
256                    },
257                ))
258                .await;
259        };
260        (cardano_db, dl_completed => $sender:expr) => {
261            $sender
262                .send_event(MithrilEvent::CardanoDatabase(
263                    MithrilEventCardanoDatabase::Completed {
264                        download_id: DOWNLOAD_ID.to_string(),
265                    },
266                ))
267                .await;
268        };
269        (cardano_db, immutable_dl, started => $sender:expr, immutable:$immutable_file_number:expr, size:$size:expr) => {
270            $sender
271                .send_event(MithrilEvent::CardanoDatabase(
272                    MithrilEventCardanoDatabase::ImmutableDownloadStarted {
273                        immutable_file_number: $immutable_file_number,
274                        download_id: DOWNLOAD_ID.to_string(),
275                        size: $size,
276                    },
277                ))
278                .await;
279        };
280        (cardano_db, immutable_dl, progress => $sender:expr, immutable:$immutable_file_number:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
281            $sender
282                .send_event(MithrilEvent::CardanoDatabase(
283                    MithrilEventCardanoDatabase::ImmutableDownloadProgress {
284                        immutable_file_number: $immutable_file_number,
285                        download_id: DOWNLOAD_ID.to_string(),
286                        downloaded_bytes: $downloaded_bytes,
287                        size: $size,
288                    },
289                ))
290                .await;
291        };
292        (cardano_db, immutable_dl, completed => $sender:expr, immutable:$immutable_file_number:expr) => {
293            $sender
294                .send_event(MithrilEvent::CardanoDatabase(
295                    MithrilEventCardanoDatabase::ImmutableDownloadCompleted {
296                        immutable_file_number: $immutable_file_number,
297                        download_id: DOWNLOAD_ID.to_string(),
298                    },
299                ))
300                .await;
301        };
302        (cardano_db, ancillary_dl, started => $sender:expr, size:$size:expr) => {
303            $sender
304                .send_event(MithrilEvent::CardanoDatabase(
305                    MithrilEventCardanoDatabase::AncillaryDownloadStarted {
306                        download_id: DOWNLOAD_ID.to_string(),
307                        size: $size,
308                    },
309                ))
310                .await;
311        };
312        (cardano_db, ancillary_dl, progress => $sender:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
313            $sender
314                .send_event(MithrilEvent::CardanoDatabase(
315                    MithrilEventCardanoDatabase::AncillaryDownloadProgress {
316                        download_id: DOWNLOAD_ID.to_string(),
317                        downloaded_bytes: $downloaded_bytes,
318                        size: $size,
319                    },
320                ))
321                .await;
322        };
323        (cardano_db, ancillary_dl, completed => $sender:expr) => {
324            $sender
325                .send_event(MithrilEvent::CardanoDatabase(
326                    MithrilEventCardanoDatabase::AncillaryDownloadCompleted {
327                        download_id: DOWNLOAD_ID.to_string(),
328                    },
329                ))
330                .await;
331        };
332        (cardano_db, digests_dl, started => $sender:expr, size:$size:expr) => {
333            $sender
334                .send_event(MithrilEvent::CardanoDatabase(
335                    MithrilEventCardanoDatabase::DigestDownloadStarted {
336                        download_id: DOWNLOAD_ID.to_string(),
337                        size: $size,
338                    },
339                ))
340                .await;
341        };
342        (cardano_db, digests_dl, progress => $sender:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
343            $sender
344                .send_event(MithrilEvent::CardanoDatabase(
345                    MithrilEventCardanoDatabase::DigestDownloadProgress {
346                        download_id: DOWNLOAD_ID.to_string(),
347                        downloaded_bytes: $downloaded_bytes,
348                        size: $size,
349                    },
350                ))
351                .await;
352        };
353        (cardano_db, digests_dl, completed => $sender:expr) => {
354            $sender
355                .send_event(MithrilEvent::CardanoDatabase(
356                    MithrilEventCardanoDatabase::DigestDownloadCompleted {
357                        download_id: DOWNLOAD_ID.to_string(),
358                    },
359                ))
360                .await;
361        };
362        (cardano_db_v1, full_immutables_dl, started => $sender:expr, digest:$digest:expr, size:$size:expr) => {
363            $sender
364                .send_event(MithrilEvent::SnapshotDownloadStarted {
365                    download_id: DOWNLOAD_ID.to_string(),
366                    digest: $digest.into(),
367                    size: $size,
368                })
369                .await;
370        };
371        (cardano_db_v1, full_immutables_dl, progress => $sender:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
372            $sender
373                .send_event(MithrilEvent::SnapshotDownloadProgress {
374                    download_id: DOWNLOAD_ID.to_string(),
375                    downloaded_bytes: $downloaded_bytes,
376                    size: $size,
377                })
378                .await;
379        };
380        (cardano_db_v1, full_immutables_dl, completed => $sender:expr) => {
381            $sender
382                .send_event(MithrilEvent::SnapshotDownloadCompleted {
383                    download_id: DOWNLOAD_ID.to_string(),
384                })
385                .await;
386        };
387        (cardano_db_v1, ancillary_dl, started => $sender:expr, size:$size:expr) => {
388            $sender
389                .send_event(MithrilEvent::SnapshotAncillaryDownloadStarted {
390                    download_id: DOWNLOAD_ID.to_string(),
391                    size: $size,
392                })
393                .await;
394        };
395        (cardano_db_v1, ancillary_dl, progress => $sender:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
396            $sender
397                .send_event(MithrilEvent::SnapshotAncillaryDownloadProgress {
398                    download_id: DOWNLOAD_ID.to_string(),
399                    downloaded_bytes: $downloaded_bytes,
400                    size: $size,
401                })
402                .await;
403        };
404        (cardano_db_v1, ancillary_dl, completed => $sender:expr) => {
405            $sender
406                .send_event(MithrilEvent::SnapshotAncillaryDownloadCompleted {
407                    download_id: DOWNLOAD_ID.to_string(),
408                })
409                .await;
410        };
411    }
412
413    fn build_feedback_receiver(output_type: ProgressOutputType) -> Arc<IndicatifFeedbackReceiver> {
414        Arc::new(IndicatifFeedbackReceiver::new(
415            output_type,
416            slog::Logger::root(slog::Discard, o!()),
417        ))
418    }
419
420    mod cardano_database_v1 {
421        use super::*;
422
423        #[tokio::test]
424        async fn starting_full_immutables_and_ancillary_together_spawn_two_progress_bars() {
425            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
426            let sender = FeedbackSender::new(&[receiver.clone()]);
427
428            send_event!(cardano_db_v1, full_immutables_dl, started => sender, digest:"digest", size:123);
429            send_event!(cardano_db_v1, ancillary_dl, started =>  sender, size:456);
430
431            assert!(receiver.download_progress_reporter.read().await.is_some());
432            assert!(receiver
433                .ancillary_download_progress_reporter
434                .read()
435                .await
436                .is_some());
437        }
438
439        #[tokio::test]
440        async fn start_and_progress_ancillary_download_with_a_size_of_zero_should_not_crash() {
441            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
442            let sender = FeedbackSender::new(&[receiver.clone()]);
443
444            send_event!(cardano_db_v1, ancillary_dl, started => sender, size:0);
445            send_event!(cardano_db_v1, ancillary_dl, progress => sender, bytes:124, size:0);
446
447            assert!(receiver
448                .ancillary_download_progress_reporter
449                .read()
450                .await
451                .is_some());
452        }
453
454        #[tokio::test]
455        async fn start_then_complete_should_remove_immutables_progress_bar() {
456            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
457            let sender = FeedbackSender::new(&[receiver.clone()]);
458
459            send_event!(cardano_db_v1, full_immutables_dl, started => sender, digest:"digest", size:123);
460            send_event!(cardano_db_v1, full_immutables_dl, completed => sender);
461
462            assert!(receiver.cardano_database_multi_pb.read().await.is_none());
463        }
464
465        #[tokio::test]
466        async fn start_then_complete_should_remove_ancillary_progress_bar() {
467            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
468            let sender = FeedbackSender::new(&[receiver.clone()]);
469
470            send_event!(cardano_db_v1, ancillary_dl, started =>  sender, size:456);
471            send_event!(cardano_db_v1, ancillary_dl, completed =>  sender);
472
473            assert!(receiver
474                .ancillary_download_progress_reporter
475                .read()
476                .await
477                .is_none());
478        }
479    }
480
481    mod cardano_database_v2 {
482        use super::*;
483
484        #[tokio::test]
485        async fn starting_should_add_multi_progress_bar() {
486            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
487            let sender = FeedbackSender::new(&[receiver.clone()]);
488
489            send_event!(cardano_db, dl_started => sender, total:99, ancillary:false);
490
491            assert!(receiver.cardano_database_multi_pb.read().await.is_some());
492        }
493
494        #[tokio::test]
495        async fn start_then_complete_should_remove_multi_progress_bar() {
496            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
497            let sender = FeedbackSender::new(&[receiver.clone()]);
498
499            send_event!(cardano_db, dl_started => sender, total:99, ancillary:false);
500            send_event!(cardano_db, dl_completed => sender);
501
502            assert!(receiver.cardano_database_multi_pb.read().await.is_none());
503        }
504
505        #[tokio::test]
506        async fn start_including_ancillary_add_one_to_total_downloads() {
507            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
508            let sender = FeedbackSender::new(&[receiver.clone()]);
509
510            send_event!(cardano_db, dl_started => sender, total:99, ancillary:true);
511
512            assert_eq!(
513                receiver
514                    .cardano_database_multi_pb
515                    .read()
516                    .await
517                    .as_ref()
518                    .map(|pb| pb.total_downloads()),
519                Some(100)
520            );
521        }
522
523        #[tokio::test]
524        async fn starting_twice_should_supersede_first_multi_progress_bar() {
525            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
526            let sender = FeedbackSender::new(&[receiver.clone()]);
527
528            send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
529            send_event!(cardano_db, dl_started => sender, total:99, ancillary:false);
530
531            assert_eq!(
532                receiver
533                    .cardano_database_multi_pb
534                    .read()
535                    .await
536                    .as_ref()
537                    .map(|pb| pb.total_downloads()),
538                Some(99)
539            );
540        }
541
542        #[tokio::test]
543        async fn complete_without_start_should_not_panic() {
544            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
545            let sender = FeedbackSender::new(&[receiver.clone()]);
546
547            send_event!(cardano_db, dl_completed => sender);
548        }
549
550        #[tokio::test]
551        async fn starting_immutable_downloads_should_not_add_progress_bars() {
552            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
553            let sender = FeedbackSender::new(&[receiver.clone()]);
554
555            send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
556
557            let multi_pb_option = receiver.cardano_database_multi_pb.read().await;
558            let multi_pb = multi_pb_option.as_ref().unwrap();
559
560            assert_eq!(multi_pb.number_of_active_downloads().await, 0);
561            send_event!(cardano_db, immutable_dl, started => sender, immutable:1, size:123);
562            assert_eq!(multi_pb.number_of_active_downloads().await, 0);
563        }
564
565        #[tokio::test]
566        async fn completed_immutable_downloads_bump_progress() {
567            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
568            let sender = FeedbackSender::new(&[receiver.clone()]);
569
570            send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
571
572            let multi_pb_option = receiver.cardano_database_multi_pb.read().await;
573            let multi_pb = multi_pb_option.as_ref().unwrap();
574
575            assert_eq!(multi_pb.position(), 0);
576            send_event!(cardano_db, immutable_dl, completed => sender, immutable:24);
577            assert_eq!(multi_pb.position(), 1);
578        }
579
580        #[tokio::test]
581        async fn starting_digests_downloads_should_not_add_progress_bars() {
582            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
583            let sender = FeedbackSender::new(&[receiver.clone()]);
584
585            send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
586
587            let multi_pb_option = receiver.cardano_database_multi_pb.read().await;
588            let multi_pb = multi_pb_option.as_ref().unwrap();
589
590            assert_eq!(multi_pb.number_of_active_downloads().await, 0);
591            send_event!(cardano_db, digests_dl, started => sender, size:789);
592            assert_eq!(multi_pb.number_of_active_downloads().await, 0);
593        }
594
595        #[tokio::test]
596        async fn starting_ancillary_downloads_should_add_a_progress_bar() {
597            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
598            let sender = FeedbackSender::new(&[receiver.clone()]);
599
600            send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
601
602            let multi_pb_option = receiver.cardano_database_multi_pb.read().await;
603            let multi_pb = multi_pb_option.as_ref().unwrap();
604
605            assert_eq!(multi_pb.number_of_active_downloads().await, 0);
606            send_event!(cardano_db, ancillary_dl, started => sender, size:456);
607            assert_eq!(multi_pb.number_of_active_downloads().await, 1);
608        }
609    }
610}