1use async_trait::async_trait;
2use indicatif::{ProgressBar, ProgressDrawTarget};
3use slog::Logger;
4use tokio::sync::RwLock;
5
6use super::{
7 DownloadProgressReporter, DownloadProgressReporterParams, MultiDownloadProgressReporter,
8 ProgressBarKind, ProgressOutputType,
9};
10
11use mithril_client::feedback::{FeedbackReceiver, MithrilEvent, MithrilEventCardanoDatabase};
12
13pub struct IndicatifFeedbackReceiver {
16 download_progress_reporter: RwLock<Option<DownloadProgressReporter>>,
17 ancillary_download_progress_reporter: RwLock<Option<DownloadProgressReporter>>,
18 certificate_validation_pb: RwLock<Option<ProgressBar>>,
19 cardano_database_multi_pb: RwLock<Option<MultiDownloadProgressReporter>>,
20 output_type: ProgressOutputType,
21 logger: Logger,
22}
23
24impl IndicatifFeedbackReceiver {
25 pub fn new(output_type: ProgressOutputType, logger: Logger) -> Self {
27 Self {
28 download_progress_reporter: RwLock::new(None),
29 ancillary_download_progress_reporter: RwLock::new(None),
30 certificate_validation_pb: RwLock::new(None),
31 cardano_database_multi_pb: RwLock::new(None),
32 output_type,
33 logger,
34 }
35 }
36}
37
38#[async_trait]
39impl FeedbackReceiver for IndicatifFeedbackReceiver {
40 async fn handle_event(&self, event: MithrilEvent) {
41 match event {
42 MithrilEvent::SnapshotDownloadStarted {
43 digest: _,
44 download_id: _,
45 size,
46 } => {
47 let pb = if self.output_type == ProgressOutputType::Tty {
48 ProgressBar::new(size)
49 } else {
50 ProgressBar::with_draw_target(Some(size), ProgressDrawTarget::hidden())
51 };
52 let mut download_progress_reporter = self.download_progress_reporter.write().await;
53 *download_progress_reporter = Some(DownloadProgressReporter::new(
54 pb,
55 DownloadProgressReporterParams {
56 label: "Snapshot".to_string(),
57 output_type: self.output_type,
58 progress_bar_kind: ProgressBarKind::Bytes,
59 include_label_in_tty: false,
60 },
61 self.logger.clone(),
62 ));
63 }
64 MithrilEvent::SnapshotDownloadProgress {
65 download_id: _,
66 downloaded_bytes,
67 size: _,
68 } => {
69 let download_progress_reporter = self.download_progress_reporter.read().await;
70 if let Some(progress_reporter) = download_progress_reporter.as_ref() {
71 progress_reporter.report(downloaded_bytes);
72 }
73 }
74 MithrilEvent::SnapshotDownloadCompleted { download_id: _ } => {
75 let mut download_progress_reporter = self.download_progress_reporter.write().await;
76 if let Some(progress_reporter) = download_progress_reporter.as_ref() {
77 progress_reporter.finish("Cardano DB download completed");
78 }
79 *download_progress_reporter = None;
80 }
81 MithrilEvent::SnapshotAncillaryDownloadStarted { size, .. } => {
82 let pb = if self.output_type == ProgressOutputType::Tty {
83 ProgressBar::new(size)
84 } else {
85 ProgressBar::with_draw_target(Some(size), ProgressDrawTarget::hidden())
86 };
87 let mut ancillary_download_progress_reporter =
88 self.ancillary_download_progress_reporter.write().await;
89 *ancillary_download_progress_reporter = Some(DownloadProgressReporter::new(
90 pb,
91 DownloadProgressReporterParams {
92 label: "Snapshot ancillary".to_string(),
93 output_type: self.output_type,
94 progress_bar_kind: ProgressBarKind::Bytes,
95 include_label_in_tty: false,
96 },
97 self.logger.clone(),
98 ));
99 }
100 MithrilEvent::SnapshotAncillaryDownloadProgress {
101 downloaded_bytes, ..
102 } => {
103 let ancillary_download_progress_reporter =
104 self.ancillary_download_progress_reporter.read().await;
105 if let Some(progress_reporter) = ancillary_download_progress_reporter.as_ref() {
106 progress_reporter.report(downloaded_bytes);
107 }
108 }
109 MithrilEvent::SnapshotAncillaryDownloadCompleted { .. } => {
110 let mut ancillary_download_progress_reporter =
111 self.ancillary_download_progress_reporter.write().await;
112 if let Some(progress_reporter) = ancillary_download_progress_reporter.as_ref() {
113 progress_reporter.finish("Cardano DB ancillary download completed");
114 }
115 *ancillary_download_progress_reporter = None;
116 }
117 MithrilEvent::CardanoDatabase(cardano_database_event) => match cardano_database_event {
118 MithrilEventCardanoDatabase::Started {
119 download_id: _,
120 total_immutable_files,
121 include_ancillary,
122 } => {
123 let multi_pb = MultiDownloadProgressReporter::new(
124 total_immutable_files + if include_ancillary { 1 } else { 0 },
125 self.output_type,
126 self.logger.clone(),
127 );
128 let mut cardano_database_multi_pb =
129 self.cardano_database_multi_pb.write().await;
130 *cardano_database_multi_pb = Some(multi_pb);
131 }
132 MithrilEventCardanoDatabase::Completed { download_id: _ } => {
133 let mut cardano_database_multi_pb =
134 self.cardano_database_multi_pb.write().await;
135
136 if let Some(multi_pb) = cardano_database_multi_pb.as_ref() {
137 multi_pb.finish_all("Cardano DB download completed").await;
138 *cardano_database_multi_pb = None;
139 }
140 }
141 MithrilEventCardanoDatabase::DigestDownloadStarted { .. }
142 | MithrilEventCardanoDatabase::DigestDownloadProgress { .. }
143 | MithrilEventCardanoDatabase::DigestDownloadCompleted { .. }
144 | MithrilEventCardanoDatabase::ImmutableDownloadStarted { .. }
145 | MithrilEventCardanoDatabase::ImmutableDownloadProgress { .. } => {
146 }
148 MithrilEventCardanoDatabase::ImmutableDownloadCompleted {
149 immutable_file_number: _,
150 download_id: _,
151 } => {
152 if let Some(cardano_database_multi_pb) =
153 self.cardano_database_multi_pb.read().await.as_ref()
154 {
155 cardano_database_multi_pb.bump_main_bar_progress();
156 }
157 }
158 MithrilEventCardanoDatabase::AncillaryDownloadStarted {
159 download_id: _,
160 size,
161 } => {
162 if let Some(cardano_database_multi_pb) =
163 self.cardano_database_multi_pb.read().await.as_ref()
164 {
165 cardano_database_multi_pb
166 .add_child_bar("Ancillary", ProgressBarKind::Bytes, size)
167 .await;
168 }
169 }
170 MithrilEventCardanoDatabase::AncillaryDownloadProgress {
171 download_id: _,
172 downloaded_bytes,
173 size: _,
174 } => {
175 if let Some(cardano_database_multi_pb) =
176 self.cardano_database_multi_pb.read().await.as_ref()
177 {
178 cardano_database_multi_pb
179 .progress_child_bar("Ancillary", downloaded_bytes)
180 .await;
181 }
182 }
183 MithrilEventCardanoDatabase::AncillaryDownloadCompleted { download_id: _ } => {
184 if let Some(cardano_database_multi_pb) =
185 self.cardano_database_multi_pb.read().await.as_ref()
186 {
187 cardano_database_multi_pb.finish_child_bar("Ancillary").await;
188 }
189 }
190 },
191 MithrilEvent::CertificateChainValidationStarted {
192 certificate_chain_validation_id: _,
193 } => {
194 let pb = if self.output_type == ProgressOutputType::Tty {
195 ProgressBar::new_spinner()
196 } else {
197 ProgressBar::hidden()
198 };
199 let mut certificate_validation_pb = self.certificate_validation_pb.write().await;
200 *certificate_validation_pb = Some(pb);
201 }
202 MithrilEvent::CertificateValidated {
203 certificate_chain_validation_id: _,
204 certificate_hash,
205 } => {
206 let certificate_validation_pb = self.certificate_validation_pb.read().await;
207 if let Some(progress_bar) = certificate_validation_pb.as_ref() {
208 progress_bar.set_message(format!("Certificate '{certificate_hash}' is valid"));
209 progress_bar.inc(1);
210 }
211 }
212 MithrilEvent::CertificateFetchedFromCache {
213 certificate_chain_validation_id: _,
214 certificate_hash,
215 } => {
216 let certificate_validation_pb = self.certificate_validation_pb.read().await;
217 if let Some(progress_bar) = certificate_validation_pb.as_ref() {
218 progress_bar.set_message(format!("Cached '{certificate_hash}'"));
219 progress_bar.inc(1);
220 }
221 }
222 MithrilEvent::CertificateChainValidated {
223 certificate_chain_validation_id: _,
224 } => {
225 let mut certificate_validation_pb = self.certificate_validation_pb.write().await;
226 if let Some(progress_bar) = certificate_validation_pb.as_ref() {
227 progress_bar.finish_with_message("Certificate chain validated");
228 }
229 *certificate_validation_pb = None;
230 }
231 }
232 }
233}
234
235#[cfg(test)]
236mod tests {
237 use slog::o;
238 use std::sync::Arc;
239
240 use mithril_client::feedback::FeedbackSender;
241
242 use super::*;
243
244 const DOWNLOAD_ID: &str = "id";
245
246 macro_rules! send_event {
247 (cardano_db, dl_started => $sender:expr, total:$total_immutable:expr, ancillary:$include_ancillary:expr) => {
248 $sender
249 .send_event(MithrilEvent::CardanoDatabase(
250 MithrilEventCardanoDatabase::Started {
251 download_id: DOWNLOAD_ID.to_string(),
252 total_immutable_files: $total_immutable,
253 include_ancillary: $include_ancillary,
254 },
255 ))
256 .await;
257 };
258 (cardano_db, dl_completed => $sender:expr) => {
259 $sender
260 .send_event(MithrilEvent::CardanoDatabase(
261 MithrilEventCardanoDatabase::Completed {
262 download_id: DOWNLOAD_ID.to_string(),
263 },
264 ))
265 .await;
266 };
267 (cardano_db, immutable_dl, started => $sender:expr, immutable:$immutable_file_number:expr, size:$size:expr) => {
268 $sender
269 .send_event(MithrilEvent::CardanoDatabase(
270 MithrilEventCardanoDatabase::ImmutableDownloadStarted {
271 immutable_file_number: $immutable_file_number,
272 download_id: DOWNLOAD_ID.to_string(),
273 size: $size,
274 },
275 ))
276 .await;
277 };
278 (cardano_db, immutable_dl, progress => $sender:expr, immutable:$immutable_file_number:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
279 $sender
280 .send_event(MithrilEvent::CardanoDatabase(
281 MithrilEventCardanoDatabase::ImmutableDownloadProgress {
282 immutable_file_number: $immutable_file_number,
283 download_id: DOWNLOAD_ID.to_string(),
284 downloaded_bytes: $downloaded_bytes,
285 size: $size,
286 },
287 ))
288 .await;
289 };
290 (cardano_db, immutable_dl, completed => $sender:expr, immutable:$immutable_file_number:expr) => {
291 $sender
292 .send_event(MithrilEvent::CardanoDatabase(
293 MithrilEventCardanoDatabase::ImmutableDownloadCompleted {
294 immutable_file_number: $immutable_file_number,
295 download_id: DOWNLOAD_ID.to_string(),
296 },
297 ))
298 .await;
299 };
300 (cardano_db, ancillary_dl, started => $sender:expr, size:$size:expr) => {
301 $sender
302 .send_event(MithrilEvent::CardanoDatabase(
303 MithrilEventCardanoDatabase::AncillaryDownloadStarted {
304 download_id: DOWNLOAD_ID.to_string(),
305 size: $size,
306 },
307 ))
308 .await;
309 };
310 (cardano_db, ancillary_dl, progress => $sender:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
311 $sender
312 .send_event(MithrilEvent::CardanoDatabase(
313 MithrilEventCardanoDatabase::AncillaryDownloadProgress {
314 download_id: DOWNLOAD_ID.to_string(),
315 downloaded_bytes: $downloaded_bytes,
316 size: $size,
317 },
318 ))
319 .await;
320 };
321 (cardano_db, ancillary_dl, completed => $sender:expr) => {
322 $sender
323 .send_event(MithrilEvent::CardanoDatabase(
324 MithrilEventCardanoDatabase::AncillaryDownloadCompleted {
325 download_id: DOWNLOAD_ID.to_string(),
326 },
327 ))
328 .await;
329 };
330 (cardano_db, digests_dl, started => $sender:expr, size:$size:expr) => {
331 $sender
332 .send_event(MithrilEvent::CardanoDatabase(
333 MithrilEventCardanoDatabase::DigestDownloadStarted {
334 download_id: DOWNLOAD_ID.to_string(),
335 size: $size,
336 },
337 ))
338 .await;
339 };
340 (cardano_db, digests_dl, progress => $sender:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
341 $sender
342 .send_event(MithrilEvent::CardanoDatabase(
343 MithrilEventCardanoDatabase::DigestDownloadProgress {
344 download_id: DOWNLOAD_ID.to_string(),
345 downloaded_bytes: $downloaded_bytes,
346 size: $size,
347 },
348 ))
349 .await;
350 };
351 (cardano_db, digests_dl, completed => $sender:expr) => {
352 $sender
353 .send_event(MithrilEvent::CardanoDatabase(
354 MithrilEventCardanoDatabase::DigestDownloadCompleted {
355 download_id: DOWNLOAD_ID.to_string(),
356 },
357 ))
358 .await;
359 };
360 (cardano_db_v1, full_immutables_dl, started => $sender:expr, digest:$digest:expr, size:$size:expr) => {
361 $sender
362 .send_event(MithrilEvent::SnapshotDownloadStarted {
363 download_id: DOWNLOAD_ID.to_string(),
364 digest: $digest.into(),
365 size: $size,
366 })
367 .await;
368 };
369 (cardano_db_v1, full_immutables_dl, progress => $sender:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
370 $sender
371 .send_event(MithrilEvent::SnapshotDownloadProgress {
372 download_id: DOWNLOAD_ID.to_string(),
373 downloaded_bytes: $downloaded_bytes,
374 size: $size,
375 })
376 .await;
377 };
378 (cardano_db_v1, full_immutables_dl, completed => $sender:expr) => {
379 $sender
380 .send_event(MithrilEvent::SnapshotDownloadCompleted {
381 download_id: DOWNLOAD_ID.to_string(),
382 })
383 .await;
384 };
385 (cardano_db_v1, ancillary_dl, started => $sender:expr, size:$size:expr) => {
386 $sender
387 .send_event(MithrilEvent::SnapshotAncillaryDownloadStarted {
388 download_id: DOWNLOAD_ID.to_string(),
389 size: $size,
390 })
391 .await;
392 };
393 (cardano_db_v1, ancillary_dl, progress => $sender:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
394 $sender
395 .send_event(MithrilEvent::SnapshotAncillaryDownloadProgress {
396 download_id: DOWNLOAD_ID.to_string(),
397 downloaded_bytes: $downloaded_bytes,
398 size: $size,
399 })
400 .await;
401 };
402 (cardano_db_v1, ancillary_dl, completed => $sender:expr) => {
403 $sender
404 .send_event(MithrilEvent::SnapshotAncillaryDownloadCompleted {
405 download_id: DOWNLOAD_ID.to_string(),
406 })
407 .await;
408 };
409 }
410
411 fn build_feedback_receiver(output_type: ProgressOutputType) -> Arc<IndicatifFeedbackReceiver> {
412 Arc::new(IndicatifFeedbackReceiver::new(
413 output_type,
414 slog::Logger::root(slog::Discard, o!()),
415 ))
416 }
417
418 mod cardano_database_v1 {
419 use super::*;
420
421 #[tokio::test]
422 async fn starting_full_immutables_and_ancillary_together_spawn_two_progress_bars() {
423 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
424 let receiver_clone = receiver.clone() as Arc<dyn FeedbackReceiver>;
425 let sender = FeedbackSender::new(&[receiver_clone]);
426
427 send_event!(cardano_db_v1, full_immutables_dl, started => sender, digest:"digest", size:123);
428 send_event!(cardano_db_v1, ancillary_dl, started => sender, size:456);
429
430 assert!(receiver.download_progress_reporter.read().await.is_some());
431 assert!(receiver.ancillary_download_progress_reporter.read().await.is_some());
432 }
433
434 #[tokio::test]
435 async fn start_and_progress_ancillary_download_with_a_size_of_zero_should_not_crash() {
436 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
437 let receiver_clone = receiver.clone() as Arc<dyn FeedbackReceiver>;
438 let sender = FeedbackSender::new(&[receiver_clone]);
439
440 send_event!(cardano_db_v1, ancillary_dl, started => sender, size:0);
441 send_event!(cardano_db_v1, ancillary_dl, progress => sender, bytes:124, size:0);
442
443 assert!(receiver.ancillary_download_progress_reporter.read().await.is_some());
444 }
445
446 #[tokio::test]
447 async fn start_then_complete_should_remove_immutables_progress_bar() {
448 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
449 let receiver_clone = receiver.clone() as Arc<dyn FeedbackReceiver>;
450 let sender = FeedbackSender::new(&[receiver_clone]);
451
452 send_event!(cardano_db_v1, full_immutables_dl, started => sender, digest:"digest", size:123);
453 send_event!(cardano_db_v1, full_immutables_dl, completed => sender);
454
455 assert!(receiver.cardano_database_multi_pb.read().await.is_none());
456 }
457
458 #[tokio::test]
459 async fn start_then_complete_should_remove_ancillary_progress_bar() {
460 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
461 let receiver_clone = receiver.clone() as Arc<dyn FeedbackReceiver>;
462 let sender = FeedbackSender::new(&[receiver_clone]);
463
464 send_event!(cardano_db_v1, ancillary_dl, started => sender, size:456);
465 send_event!(cardano_db_v1, ancillary_dl, completed => sender);
466
467 assert!(receiver.ancillary_download_progress_reporter.read().await.is_none());
468 }
469 }
470
471 mod cardano_database_v2 {
472 use super::*;
473
474 #[tokio::test]
475 async fn starting_should_add_multi_progress_bar() {
476 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
477 let receiver_clone = receiver.clone() as Arc<dyn FeedbackReceiver>;
478 let sender = FeedbackSender::new(&[receiver_clone]);
479
480 send_event!(cardano_db, dl_started => sender, total:99, ancillary:false);
481
482 assert!(receiver.cardano_database_multi_pb.read().await.is_some());
483 }
484
485 #[tokio::test]
486 async fn start_then_complete_should_remove_multi_progress_bar() {
487 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
488 let receiver_clone = receiver.clone() as Arc<dyn FeedbackReceiver>;
489 let sender = FeedbackSender::new(&[receiver_clone]);
490
491 send_event!(cardano_db, dl_started => sender, total:99, ancillary:false);
492 send_event!(cardano_db, dl_completed => sender);
493
494 assert!(receiver.cardano_database_multi_pb.read().await.is_none());
495 }
496
497 #[tokio::test]
498 async fn start_including_ancillary_add_one_to_total_downloads() {
499 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
500 let receiver_clone = receiver.clone() as Arc<dyn FeedbackReceiver>;
501 let sender = FeedbackSender::new(&[receiver_clone]);
502
503 send_event!(cardano_db, dl_started => sender, total:99, ancillary:true);
504
505 assert_eq!(
506 receiver
507 .cardano_database_multi_pb
508 .read()
509 .await
510 .as_ref()
511 .map(|pb| pb.total_downloads()),
512 Some(100)
513 );
514 }
515
516 #[tokio::test]
517 async fn starting_twice_should_supersede_first_multi_progress_bar() {
518 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
519 let receiver_clone = receiver.clone() as Arc<dyn FeedbackReceiver>;
520 let sender = FeedbackSender::new(&[receiver_clone]);
521
522 send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
523 send_event!(cardano_db, dl_started => sender, total:99, ancillary:false);
524
525 assert_eq!(
526 receiver
527 .cardano_database_multi_pb
528 .read()
529 .await
530 .as_ref()
531 .map(|pb| pb.total_downloads()),
532 Some(99)
533 );
534 }
535
536 #[tokio::test]
537 async fn complete_without_start_should_not_panic() {
538 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
539 let receiver_clone = receiver.clone() as Arc<dyn FeedbackReceiver>;
540 let sender = FeedbackSender::new(&[receiver_clone]);
541
542 send_event!(cardano_db, dl_completed => sender);
543 }
544
545 #[tokio::test]
546 async fn starting_immutable_downloads_should_not_add_progress_bars() {
547 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
548 let receiver_clone = receiver.clone() as Arc<dyn FeedbackReceiver>;
549 let sender = FeedbackSender::new(&[receiver_clone]);
550
551 send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
552
553 let multi_pb_option = receiver.cardano_database_multi_pb.read().await;
554 let multi_pb = multi_pb_option.as_ref().unwrap();
555
556 assert_eq!(multi_pb.number_of_active_downloads().await, 0);
557 send_event!(cardano_db, immutable_dl, started => sender, immutable:1, size:123);
558 assert_eq!(multi_pb.number_of_active_downloads().await, 0);
559 }
560
561 #[tokio::test]
562 async fn completed_immutable_downloads_bump_progress() {
563 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
564 let receiver_clone = receiver.clone() as Arc<dyn FeedbackReceiver>;
565 let sender = FeedbackSender::new(&[receiver_clone]);
566
567 send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
568
569 let multi_pb_option = receiver.cardano_database_multi_pb.read().await;
570 let multi_pb = multi_pb_option.as_ref().unwrap();
571
572 assert_eq!(multi_pb.position(), 0);
573 send_event!(cardano_db, immutable_dl, completed => sender, immutable:24);
574 assert_eq!(multi_pb.position(), 1);
575 }
576
577 #[tokio::test]
578 async fn starting_digests_downloads_should_not_add_progress_bars() {
579 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
580 let receiver_clone = receiver.clone() as Arc<dyn FeedbackReceiver>;
581 let sender = FeedbackSender::new(&[receiver_clone]);
582
583 send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
584
585 let multi_pb_option = receiver.cardano_database_multi_pb.read().await;
586 let multi_pb = multi_pb_option.as_ref().unwrap();
587
588 assert_eq!(multi_pb.number_of_active_downloads().await, 0);
589 send_event!(cardano_db, digests_dl, started => sender, size:789);
590 assert_eq!(multi_pb.number_of_active_downloads().await, 0);
591 }
592
593 #[tokio::test]
594 async fn starting_ancillary_downloads_should_add_a_progress_bar() {
595 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
596 let receiver_clone = receiver.clone() as Arc<dyn FeedbackReceiver>;
597 let sender = FeedbackSender::new(&[receiver_clone]);
598
599 send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
600
601 let multi_pb_option = receiver.cardano_database_multi_pb.read().await;
602 let multi_pb = multi_pb_option.as_ref().unwrap();
603
604 assert_eq!(multi_pb.number_of_active_downloads().await, 0);
605 send_event!(cardano_db, ancillary_dl, started => sender, size:456);
606 assert_eq!(multi_pb.number_of_active_downloads().await, 1);
607 }
608 }
609}