1use async_trait::async_trait;
2use indicatif::{ProgressBar, ProgressDrawTarget};
3use slog::Logger;
4use tokio::sync::RwLock;
5
6use super::{
7 DownloadProgressReporter, DownloadProgressReporterParams, MultiDownloadProgressReporter,
8 ProgressBarKind, ProgressOutputType,
9};
10
11use mithril_client::feedback::{FeedbackReceiver, MithrilEvent, MithrilEventCardanoDatabase};
12
13pub struct IndicatifFeedbackReceiver {
16 download_progress_reporter: RwLock<Option<DownloadProgressReporter>>,
17 ancillary_download_progress_reporter: RwLock<Option<DownloadProgressReporter>>,
18 certificate_validation_pb: RwLock<Option<ProgressBar>>,
19 cardano_database_multi_pb: RwLock<Option<MultiDownloadProgressReporter>>,
20 output_type: ProgressOutputType,
21 logger: Logger,
22}
23
24impl IndicatifFeedbackReceiver {
25 pub fn new(output_type: ProgressOutputType, logger: Logger) -> Self {
27 Self {
28 download_progress_reporter: RwLock::new(None),
29 ancillary_download_progress_reporter: RwLock::new(None),
30 certificate_validation_pb: RwLock::new(None),
31 cardano_database_multi_pb: RwLock::new(None),
32 output_type,
33 logger,
34 }
35 }
36}
37
38#[async_trait]
39impl FeedbackReceiver for IndicatifFeedbackReceiver {
40 async fn handle_event(&self, event: MithrilEvent) {
41 match event {
42 MithrilEvent::SnapshotDownloadStarted {
43 digest: _,
44 download_id: _,
45 size,
46 } => {
47 let pb = if self.output_type == ProgressOutputType::Tty {
48 ProgressBar::new(size)
49 } else {
50 ProgressBar::with_draw_target(Some(size), ProgressDrawTarget::hidden())
51 };
52 let mut download_progress_reporter = self.download_progress_reporter.write().await;
53 *download_progress_reporter = Some(DownloadProgressReporter::new(
54 pb,
55 DownloadProgressReporterParams {
56 label: "Snapshot".to_string(),
57 output_type: self.output_type,
58 progress_bar_kind: ProgressBarKind::Bytes,
59 include_label_in_tty: false,
60 },
61 self.logger.clone(),
62 ));
63 }
64 MithrilEvent::SnapshotDownloadProgress {
65 download_id: _,
66 downloaded_bytes,
67 size: _,
68 } => {
69 let download_progress_reporter = self.download_progress_reporter.read().await;
70 if let Some(progress_reporter) = download_progress_reporter.as_ref() {
71 progress_reporter.report(downloaded_bytes);
72 }
73 }
74 MithrilEvent::SnapshotDownloadCompleted { download_id: _ } => {
75 let mut download_progress_reporter = self.download_progress_reporter.write().await;
76 if let Some(progress_reporter) = download_progress_reporter.as_ref() {
77 progress_reporter.finish("Cardano DB download completed");
78 }
79 *download_progress_reporter = None;
80 }
81 MithrilEvent::SnapshotAncillaryDownloadStarted { size, .. } => {
82 let pb = if self.output_type == ProgressOutputType::Tty {
83 ProgressBar::new(size)
84 } else {
85 ProgressBar::with_draw_target(Some(size), ProgressDrawTarget::hidden())
86 };
87 let mut ancillary_download_progress_reporter =
88 self.ancillary_download_progress_reporter.write().await;
89 *ancillary_download_progress_reporter = Some(DownloadProgressReporter::new(
90 pb,
91 DownloadProgressReporterParams {
92 label: "Snapshot ancillary".to_string(),
93 output_type: self.output_type,
94 progress_bar_kind: ProgressBarKind::Bytes,
95 include_label_in_tty: false,
96 },
97 self.logger.clone(),
98 ));
99 }
100 MithrilEvent::SnapshotAncillaryDownloadProgress {
101 downloaded_bytes, ..
102 } => {
103 let ancillary_download_progress_reporter =
104 self.ancillary_download_progress_reporter.read().await;
105 if let Some(progress_reporter) = ancillary_download_progress_reporter.as_ref() {
106 progress_reporter.report(downloaded_bytes);
107 }
108 }
109 MithrilEvent::SnapshotAncillaryDownloadCompleted { .. } => {
110 let mut ancillary_download_progress_reporter =
111 self.ancillary_download_progress_reporter.write().await;
112 if let Some(progress_reporter) = ancillary_download_progress_reporter.as_ref() {
113 progress_reporter.finish("Cardano DB ancillary download completed");
114 }
115 *ancillary_download_progress_reporter = None;
116 }
117 MithrilEvent::CardanoDatabase(cardano_database_event) => match cardano_database_event {
118 MithrilEventCardanoDatabase::Started {
119 download_id: _,
120 total_immutable_files,
121 include_ancillary,
122 } => {
123 let multi_pb = MultiDownloadProgressReporter::new(
124 total_immutable_files + if include_ancillary { 1 } else { 0 },
125 self.output_type,
126 self.logger.clone(),
127 );
128 let mut cardano_database_multi_pb =
129 self.cardano_database_multi_pb.write().await;
130 *cardano_database_multi_pb = Some(multi_pb);
131 }
132 MithrilEventCardanoDatabase::Completed { download_id: _ } => {
133 let mut cardano_database_multi_pb =
134 self.cardano_database_multi_pb.write().await;
135
136 if let Some(multi_pb) = cardano_database_multi_pb.as_ref() {
137 multi_pb.finish_all("Cardano DB download completed").await;
138 *cardano_database_multi_pb = None;
139 }
140 }
141 MithrilEventCardanoDatabase::DigestDownloadStarted { .. }
142 | MithrilEventCardanoDatabase::DigestDownloadProgress { .. }
143 | MithrilEventCardanoDatabase::DigestDownloadCompleted { .. }
144 | MithrilEventCardanoDatabase::ImmutableDownloadStarted { .. }
145 | MithrilEventCardanoDatabase::ImmutableDownloadProgress { .. } => {
146 }
148 MithrilEventCardanoDatabase::ImmutableDownloadCompleted {
149 immutable_file_number: _,
150 download_id: _,
151 } => {
152 if let Some(cardano_database_multi_pb) =
153 self.cardano_database_multi_pb.read().await.as_ref()
154 {
155 cardano_database_multi_pb.bump_main_bar_progress();
156 }
157 }
158 MithrilEventCardanoDatabase::AncillaryDownloadStarted {
159 download_id: _,
160 size,
161 } => {
162 if let Some(cardano_database_multi_pb) =
163 self.cardano_database_multi_pb.read().await.as_ref()
164 {
165 cardano_database_multi_pb
166 .add_child_bar("Ancillary", ProgressBarKind::Bytes, size)
167 .await;
168 }
169 }
170 MithrilEventCardanoDatabase::AncillaryDownloadProgress {
171 download_id: _,
172 downloaded_bytes,
173 size: _,
174 } => {
175 if let Some(cardano_database_multi_pb) =
176 self.cardano_database_multi_pb.read().await.as_ref()
177 {
178 cardano_database_multi_pb
179 .progress_child_bar("Ancillary", downloaded_bytes)
180 .await;
181 }
182 }
183 MithrilEventCardanoDatabase::AncillaryDownloadCompleted { download_id: _ } => {
184 if let Some(cardano_database_multi_pb) =
185 self.cardano_database_multi_pb.read().await.as_ref()
186 {
187 cardano_database_multi_pb
188 .finish_child_bar("Ancillary")
189 .await;
190 }
191 }
192 },
193 MithrilEvent::CertificateChainValidationStarted {
194 certificate_chain_validation_id: _,
195 } => {
196 let pb = if self.output_type == ProgressOutputType::Tty {
197 ProgressBar::new_spinner()
198 } else {
199 ProgressBar::hidden()
200 };
201 let mut certificate_validation_pb = self.certificate_validation_pb.write().await;
202 *certificate_validation_pb = Some(pb);
203 }
204 MithrilEvent::CertificateValidated {
205 certificate_chain_validation_id: _,
206 certificate_hash,
207 } => {
208 let certificate_validation_pb = self.certificate_validation_pb.read().await;
209 if let Some(progress_bar) = certificate_validation_pb.as_ref() {
210 progress_bar.set_message(format!("Certificate '{certificate_hash}' is valid"));
211 progress_bar.inc(1);
212 }
213 }
214 MithrilEvent::CertificateFetchedFromCache {
215 certificate_chain_validation_id: _,
216 certificate_hash,
217 } => {
218 let certificate_validation_pb = self.certificate_validation_pb.read().await;
219 if let Some(progress_bar) = certificate_validation_pb.as_ref() {
220 progress_bar.set_message(format!("Cached '{certificate_hash}'"));
221 progress_bar.inc(1);
222 }
223 }
224 MithrilEvent::CertificateChainValidated {
225 certificate_chain_validation_id: _,
226 } => {
227 let mut certificate_validation_pb = self.certificate_validation_pb.write().await;
228 if let Some(progress_bar) = certificate_validation_pb.as_ref() {
229 progress_bar.finish_with_message("Certificate chain validated");
230 }
231 *certificate_validation_pb = None;
232 }
233 }
234 }
235}
236
237#[cfg(test)]
238mod tests {
239 use slog::o;
240 use std::sync::Arc;
241
242 use mithril_client::feedback::FeedbackSender;
243
244 use super::*;
245
246 const DOWNLOAD_ID: &str = "id";
247
248 macro_rules! send_event {
249 (cardano_db, dl_started => $sender:expr, total:$total_immutable:expr, ancillary:$include_ancillary:expr) => {
250 $sender
251 .send_event(MithrilEvent::CardanoDatabase(
252 MithrilEventCardanoDatabase::Started {
253 download_id: DOWNLOAD_ID.to_string(),
254 total_immutable_files: $total_immutable,
255 include_ancillary: $include_ancillary,
256 },
257 ))
258 .await;
259 };
260 (cardano_db, dl_completed => $sender:expr) => {
261 $sender
262 .send_event(MithrilEvent::CardanoDatabase(
263 MithrilEventCardanoDatabase::Completed {
264 download_id: DOWNLOAD_ID.to_string(),
265 },
266 ))
267 .await;
268 };
269 (cardano_db, immutable_dl, started => $sender:expr, immutable:$immutable_file_number:expr, size:$size:expr) => {
270 $sender
271 .send_event(MithrilEvent::CardanoDatabase(
272 MithrilEventCardanoDatabase::ImmutableDownloadStarted {
273 immutable_file_number: $immutable_file_number,
274 download_id: DOWNLOAD_ID.to_string(),
275 size: $size,
276 },
277 ))
278 .await;
279 };
280 (cardano_db, immutable_dl, progress => $sender:expr, immutable:$immutable_file_number:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
281 $sender
282 .send_event(MithrilEvent::CardanoDatabase(
283 MithrilEventCardanoDatabase::ImmutableDownloadProgress {
284 immutable_file_number: $immutable_file_number,
285 download_id: DOWNLOAD_ID.to_string(),
286 downloaded_bytes: $downloaded_bytes,
287 size: $size,
288 },
289 ))
290 .await;
291 };
292 (cardano_db, immutable_dl, completed => $sender:expr, immutable:$immutable_file_number:expr) => {
293 $sender
294 .send_event(MithrilEvent::CardanoDatabase(
295 MithrilEventCardanoDatabase::ImmutableDownloadCompleted {
296 immutable_file_number: $immutable_file_number,
297 download_id: DOWNLOAD_ID.to_string(),
298 },
299 ))
300 .await;
301 };
302 (cardano_db, ancillary_dl, started => $sender:expr, size:$size:expr) => {
303 $sender
304 .send_event(MithrilEvent::CardanoDatabase(
305 MithrilEventCardanoDatabase::AncillaryDownloadStarted {
306 download_id: DOWNLOAD_ID.to_string(),
307 size: $size,
308 },
309 ))
310 .await;
311 };
312 (cardano_db, ancillary_dl, progress => $sender:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
313 $sender
314 .send_event(MithrilEvent::CardanoDatabase(
315 MithrilEventCardanoDatabase::AncillaryDownloadProgress {
316 download_id: DOWNLOAD_ID.to_string(),
317 downloaded_bytes: $downloaded_bytes,
318 size: $size,
319 },
320 ))
321 .await;
322 };
323 (cardano_db, ancillary_dl, completed => $sender:expr) => {
324 $sender
325 .send_event(MithrilEvent::CardanoDatabase(
326 MithrilEventCardanoDatabase::AncillaryDownloadCompleted {
327 download_id: DOWNLOAD_ID.to_string(),
328 },
329 ))
330 .await;
331 };
332 (cardano_db, digests_dl, started => $sender:expr, size:$size:expr) => {
333 $sender
334 .send_event(MithrilEvent::CardanoDatabase(
335 MithrilEventCardanoDatabase::DigestDownloadStarted {
336 download_id: DOWNLOAD_ID.to_string(),
337 size: $size,
338 },
339 ))
340 .await;
341 };
342 (cardano_db, digests_dl, progress => $sender:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
343 $sender
344 .send_event(MithrilEvent::CardanoDatabase(
345 MithrilEventCardanoDatabase::DigestDownloadProgress {
346 download_id: DOWNLOAD_ID.to_string(),
347 downloaded_bytes: $downloaded_bytes,
348 size: $size,
349 },
350 ))
351 .await;
352 };
353 (cardano_db, digests_dl, completed => $sender:expr) => {
354 $sender
355 .send_event(MithrilEvent::CardanoDatabase(
356 MithrilEventCardanoDatabase::DigestDownloadCompleted {
357 download_id: DOWNLOAD_ID.to_string(),
358 },
359 ))
360 .await;
361 };
362 (cardano_db_v1, full_immutables_dl, started => $sender:expr, digest:$digest:expr, size:$size:expr) => {
363 $sender
364 .send_event(MithrilEvent::SnapshotDownloadStarted {
365 download_id: DOWNLOAD_ID.to_string(),
366 digest: $digest.into(),
367 size: $size,
368 })
369 .await;
370 };
371 (cardano_db_v1, full_immutables_dl, progress => $sender:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
372 $sender
373 .send_event(MithrilEvent::SnapshotDownloadProgress {
374 download_id: DOWNLOAD_ID.to_string(),
375 downloaded_bytes: $downloaded_bytes,
376 size: $size,
377 })
378 .await;
379 };
380 (cardano_db_v1, full_immutables_dl, completed => $sender:expr) => {
381 $sender
382 .send_event(MithrilEvent::SnapshotDownloadCompleted {
383 download_id: DOWNLOAD_ID.to_string(),
384 })
385 .await;
386 };
387 (cardano_db_v1, ancillary_dl, started => $sender:expr, size:$size:expr) => {
388 $sender
389 .send_event(MithrilEvent::SnapshotAncillaryDownloadStarted {
390 download_id: DOWNLOAD_ID.to_string(),
391 size: $size,
392 })
393 .await;
394 };
395 (cardano_db_v1, ancillary_dl, progress => $sender:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
396 $sender
397 .send_event(MithrilEvent::SnapshotAncillaryDownloadProgress {
398 download_id: DOWNLOAD_ID.to_string(),
399 downloaded_bytes: $downloaded_bytes,
400 size: $size,
401 })
402 .await;
403 };
404 (cardano_db_v1, ancillary_dl, completed => $sender:expr) => {
405 $sender
406 .send_event(MithrilEvent::SnapshotAncillaryDownloadCompleted {
407 download_id: DOWNLOAD_ID.to_string(),
408 })
409 .await;
410 };
411 }
412
413 fn build_feedback_receiver(output_type: ProgressOutputType) -> Arc<IndicatifFeedbackReceiver> {
414 Arc::new(IndicatifFeedbackReceiver::new(
415 output_type,
416 slog::Logger::root(slog::Discard, o!()),
417 ))
418 }
419
420 mod cardano_database_v1 {
421 use super::*;
422
423 #[tokio::test]
424 async fn starting_full_immutables_and_ancillary_together_spawn_two_progress_bars() {
425 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
426 let sender = FeedbackSender::new(&[receiver.clone()]);
427
428 send_event!(cardano_db_v1, full_immutables_dl, started => sender, digest:"digest", size:123);
429 send_event!(cardano_db_v1, ancillary_dl, started => sender, size:456);
430
431 assert!(receiver.download_progress_reporter.read().await.is_some());
432 assert!(receiver
433 .ancillary_download_progress_reporter
434 .read()
435 .await
436 .is_some());
437 }
438
439 #[tokio::test]
440 async fn start_and_progress_ancillary_download_with_a_size_of_zero_should_not_crash() {
441 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
442 let sender = FeedbackSender::new(&[receiver.clone()]);
443
444 send_event!(cardano_db_v1, ancillary_dl, started => sender, size:0);
445 send_event!(cardano_db_v1, ancillary_dl, progress => sender, bytes:124, size:0);
446
447 assert!(receiver
448 .ancillary_download_progress_reporter
449 .read()
450 .await
451 .is_some());
452 }
453
454 #[tokio::test]
455 async fn start_then_complete_should_remove_immutables_progress_bar() {
456 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
457 let sender = FeedbackSender::new(&[receiver.clone()]);
458
459 send_event!(cardano_db_v1, full_immutables_dl, started => sender, digest:"digest", size:123);
460 send_event!(cardano_db_v1, full_immutables_dl, completed => sender);
461
462 assert!(receiver.cardano_database_multi_pb.read().await.is_none());
463 }
464
465 #[tokio::test]
466 async fn start_then_complete_should_remove_ancillary_progress_bar() {
467 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
468 let sender = FeedbackSender::new(&[receiver.clone()]);
469
470 send_event!(cardano_db_v1, ancillary_dl, started => sender, size:456);
471 send_event!(cardano_db_v1, ancillary_dl, completed => sender);
472
473 assert!(receiver
474 .ancillary_download_progress_reporter
475 .read()
476 .await
477 .is_none());
478 }
479 }
480
481 mod cardano_database_v2 {
482 use super::*;
483
484 #[tokio::test]
485 async fn starting_should_add_multi_progress_bar() {
486 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
487 let sender = FeedbackSender::new(&[receiver.clone()]);
488
489 send_event!(cardano_db, dl_started => sender, total:99, ancillary:false);
490
491 assert!(receiver.cardano_database_multi_pb.read().await.is_some());
492 }
493
494 #[tokio::test]
495 async fn start_then_complete_should_remove_multi_progress_bar() {
496 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
497 let sender = FeedbackSender::new(&[receiver.clone()]);
498
499 send_event!(cardano_db, dl_started => sender, total:99, ancillary:false);
500 send_event!(cardano_db, dl_completed => sender);
501
502 assert!(receiver.cardano_database_multi_pb.read().await.is_none());
503 }
504
505 #[tokio::test]
506 async fn start_including_ancillary_add_one_to_total_downloads() {
507 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
508 let sender = FeedbackSender::new(&[receiver.clone()]);
509
510 send_event!(cardano_db, dl_started => sender, total:99, ancillary:true);
511
512 assert_eq!(
513 receiver
514 .cardano_database_multi_pb
515 .read()
516 .await
517 .as_ref()
518 .map(|pb| pb.total_downloads()),
519 Some(100)
520 );
521 }
522
523 #[tokio::test]
524 async fn starting_twice_should_supersede_first_multi_progress_bar() {
525 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
526 let sender = FeedbackSender::new(&[receiver.clone()]);
527
528 send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
529 send_event!(cardano_db, dl_started => sender, total:99, ancillary:false);
530
531 assert_eq!(
532 receiver
533 .cardano_database_multi_pb
534 .read()
535 .await
536 .as_ref()
537 .map(|pb| pb.total_downloads()),
538 Some(99)
539 );
540 }
541
542 #[tokio::test]
543 async fn complete_without_start_should_not_panic() {
544 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
545 let sender = FeedbackSender::new(&[receiver.clone()]);
546
547 send_event!(cardano_db, dl_completed => sender);
548 }
549
550 #[tokio::test]
551 async fn starting_immutable_downloads_should_not_add_progress_bars() {
552 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
553 let sender = FeedbackSender::new(&[receiver.clone()]);
554
555 send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
556
557 let multi_pb_option = receiver.cardano_database_multi_pb.read().await;
558 let multi_pb = multi_pb_option.as_ref().unwrap();
559
560 assert_eq!(multi_pb.number_of_active_downloads().await, 0);
561 send_event!(cardano_db, immutable_dl, started => sender, immutable:1, size:123);
562 assert_eq!(multi_pb.number_of_active_downloads().await, 0);
563 }
564
565 #[tokio::test]
566 async fn completed_immutable_downloads_bump_progress() {
567 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
568 let sender = FeedbackSender::new(&[receiver.clone()]);
569
570 send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
571
572 let multi_pb_option = receiver.cardano_database_multi_pb.read().await;
573 let multi_pb = multi_pb_option.as_ref().unwrap();
574
575 assert_eq!(multi_pb.position(), 0);
576 send_event!(cardano_db, immutable_dl, completed => sender, immutable:24);
577 assert_eq!(multi_pb.position(), 1);
578 }
579
580 #[tokio::test]
581 async fn starting_digests_downloads_should_not_add_progress_bars() {
582 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
583 let sender = FeedbackSender::new(&[receiver.clone()]);
584
585 send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
586
587 let multi_pb_option = receiver.cardano_database_multi_pb.read().await;
588 let multi_pb = multi_pb_option.as_ref().unwrap();
589
590 assert_eq!(multi_pb.number_of_active_downloads().await, 0);
591 send_event!(cardano_db, digests_dl, started => sender, size:789);
592 assert_eq!(multi_pb.number_of_active_downloads().await, 0);
593 }
594
595 #[tokio::test]
596 async fn starting_ancillary_downloads_should_add_a_progress_bar() {
597 let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
598 let sender = FeedbackSender::new(&[receiver.clone()]);
599
600 send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
601
602 let multi_pb_option = receiver.cardano_database_multi_pb.read().await;
603 let multi_pb = multi_pb_option.as_ref().unwrap();
604
605 assert_eq!(multi_pb.number_of_active_downloads().await, 0);
606 send_event!(cardano_db, ancillary_dl, started => sender, size:456);
607 assert_eq!(multi_pb.number_of_active_downloads().await, 1);
608 }
609 }
610}