1use async_trait::async_trait;
2use indicatif::{ProgressBar, ProgressDrawTarget};
3use slog::Logger;
4use tokio::sync::RwLock;
5
6use super::{
7 DownloadProgressReporter, DownloadProgressReporterParams, MultiDownloadProgressReporter,
8 ProgressBarKind, ProgressOutputType,
9};
10
11use mithril_client::feedback::{FeedbackReceiver, MithrilEvent, MithrilEventCardanoDatabase};
12
/// Feedback receiver that reports Mithril client events through `indicatif` progress bars.
///
/// Each field below is a slot guarded by an async `RwLock`: the slot is filled when the
/// matching "started" event is handled and emptied again when the matching "completed"
/// event is handled.
pub struct IndicatifFeedbackReceiver {
    /// Reporter of the snapshot download (`MithrilEvent::SnapshotDownload*` events).
    download_progress_reporter: RwLock<Option<DownloadProgressReporter>>,
    /// Reporter of the snapshot ancillary download
    /// (`MithrilEvent::SnapshotAncillaryDownload*` events).
    ancillary_download_progress_reporter: RwLock<Option<DownloadProgressReporter>>,
    /// Progress bar of the certificate chain validation (a spinner when the output is a TTY,
    /// a hidden bar otherwise).
    certificate_validation_pb: RwLock<Option<ProgressBar>>,
    /// Multi progress reporter of the Cardano database download
    /// (`MithrilEvent::CardanoDatabase` events).
    cardano_database_multi_pb: RwLock<Option<MultiDownloadProgressReporter>>,
    /// Kind of output to produce; compared against `ProgressOutputType::Tty` to decide
    /// whether the progress bars are drawn or hidden.
    output_type: ProgressOutputType,
    /// Logger handed over (cloned) to every reporter created by this receiver.
    logger: Logger,
}
23
24impl IndicatifFeedbackReceiver {
25 pub fn new(output_type: ProgressOutputType, logger: Logger) -> Self {
27 Self {
28 download_progress_reporter: RwLock::new(None),
29 ancillary_download_progress_reporter: RwLock::new(None),
30 certificate_validation_pb: RwLock::new(None),
31 cardano_database_multi_pb: RwLock::new(None),
32 output_type,
33 logger,
34 }
35 }
36}
37
38#[async_trait]
39impl FeedbackReceiver for IndicatifFeedbackReceiver {
40 async fn handle_event(&self, event: MithrilEvent) {
41 match event {
42 MithrilEvent::SnapshotDownloadStarted {
43 digest: _,
44 download_id: _,
45 size,
46 } => {
47 let pb = if self.output_type == ProgressOutputType::Tty {
48 ProgressBar::new(size)
49 } else {
50 ProgressBar::with_draw_target(Some(size), ProgressDrawTarget::hidden())
51 };
52 let mut download_progress_reporter = self.download_progress_reporter.write().await;
53 *download_progress_reporter = Some(DownloadProgressReporter::new(
54 pb,
55 DownloadProgressReporterParams {
56 label: "Snapshot".to_string(),
57 output_type: self.output_type,
58 progress_bar_kind: ProgressBarKind::Bytes,
59 include_label_in_tty: false,
60 },
61 self.logger.clone(),
62 ));
63 }
64 MithrilEvent::SnapshotDownloadProgress {
65 download_id: _,
66 downloaded_bytes,
67 size: _,
68 } => {
69 let download_progress_reporter = self.download_progress_reporter.read().await;
70 if let Some(progress_reporter) = download_progress_reporter.as_ref() {
71 progress_reporter.report(downloaded_bytes);
72 }
73 }
74 MithrilEvent::SnapshotDownloadCompleted { download_id: _ } => {
75 let mut download_progress_reporter = self.download_progress_reporter.write().await;
76 if let Some(progress_reporter) = download_progress_reporter.as_ref() {
77 progress_reporter.finish("Cardano DB download completed");
78 }
79 *download_progress_reporter = None;
80 }
81 MithrilEvent::SnapshotAncillaryDownloadStarted { size, .. } => {
82 let pb = if self.output_type == ProgressOutputType::Tty {
83 ProgressBar::new(size)
84 } else {
85 ProgressBar::with_draw_target(Some(size), ProgressDrawTarget::hidden())
86 };
87 let mut ancillary_download_progress_reporter =
88 self.ancillary_download_progress_reporter.write().await;
89 *ancillary_download_progress_reporter = Some(DownloadProgressReporter::new(
90 pb,
91 DownloadProgressReporterParams {
92 label: "Snapshot ancillary".to_string(),
93 output_type: self.output_type,
94 progress_bar_kind: ProgressBarKind::Bytes,
95 include_label_in_tty: false,
96 },
97 self.logger.clone(),
98 ));
99 }
100 MithrilEvent::SnapshotAncillaryDownloadProgress {
101 downloaded_bytes, ..
102 } => {
103 let ancillary_download_progress_reporter =
104 self.ancillary_download_progress_reporter.read().await;
105 if let Some(progress_reporter) = ancillary_download_progress_reporter.as_ref() {
106 progress_reporter.report(downloaded_bytes);
107 }
108 }
109 MithrilEvent::SnapshotAncillaryDownloadCompleted { .. } => {
110 let mut ancillary_download_progress_reporter =
111 self.ancillary_download_progress_reporter.write().await;
112 if let Some(progress_reporter) = ancillary_download_progress_reporter.as_ref() {
113 progress_reporter.finish("Cardano DB ancillary download completed");
114 }
115 *ancillary_download_progress_reporter = None;
116 }
117 MithrilEvent::CardanoDatabase(cardano_database_event) => match cardano_database_event {
118 MithrilEventCardanoDatabase::Started {
119 download_id: _,
120 total_immutable_files,
121 include_ancillary,
122 } => {
123 let multi_pb = MultiDownloadProgressReporter::new(
124 total_immutable_files + if include_ancillary { 1 } else { 0 },
125 self.output_type,
126 self.logger.clone(),
127 );
128 let mut cardano_database_multi_pb =
129 self.cardano_database_multi_pb.write().await;
130 *cardano_database_multi_pb = Some(multi_pb);
131 }
132 MithrilEventCardanoDatabase::Completed { download_id: _ } => {
133 let mut cardano_database_multi_pb =
134 self.cardano_database_multi_pb.write().await;
135
136 if let Some(multi_pb) = cardano_database_multi_pb.as_ref() {
137 multi_pb.finish_all("Cardano DB download completed").await;
138 *cardano_database_multi_pb = None;
139 }
140 }
141 MithrilEventCardanoDatabase::DigestDownloadStarted { .. }
142 | MithrilEventCardanoDatabase::DigestDownloadProgress { .. }
143 | MithrilEventCardanoDatabase::DigestDownloadCompleted { .. }
144 | MithrilEventCardanoDatabase::ImmutableDownloadStarted { .. }
145 | MithrilEventCardanoDatabase::ImmutableDownloadProgress { .. } => {
146 }
148 MithrilEventCardanoDatabase::ImmutableDownloadCompleted {
149 immutable_file_number: _,
150 download_id: _,
151 } => {
152 if let Some(cardano_database_multi_pb) =
153 self.cardano_database_multi_pb.read().await.as_ref()
154 {
155 cardano_database_multi_pb.bump_main_bar_progress();
156 }
157 }
158 MithrilEventCardanoDatabase::AncillaryDownloadStarted {
159 download_id: _,
160 size,
161 } => {
162 if let Some(cardano_database_multi_pb) =
163 self.cardano_database_multi_pb.read().await.as_ref()
164 {
165 cardano_database_multi_pb
166 .add_child_bar("Ancillary", ProgressBarKind::Bytes, size)
167 .await;
168 }
169 }
170 MithrilEventCardanoDatabase::AncillaryDownloadProgress {
171 download_id: _,
172 downloaded_bytes,
173 size: _,
174 } => {
175 if let Some(cardano_database_multi_pb) =
176 self.cardano_database_multi_pb.read().await.as_ref()
177 {
178 cardano_database_multi_pb
179 .progress_child_bar("Ancillary", downloaded_bytes)
180 .await;
181 }
182 }
183 MithrilEventCardanoDatabase::AncillaryDownloadCompleted { download_id: _ } => {
184 if let Some(cardano_database_multi_pb) =
185 self.cardano_database_multi_pb.read().await.as_ref()
186 {
187 cardano_database_multi_pb.finish_child_bar("Ancillary").await;
188 }
189 }
190 },
191 MithrilEvent::CertificateChainValidationStarted {
192 certificate_chain_validation_id: _,
193 } => {
194 let pb = if self.output_type == ProgressOutputType::Tty {
195 ProgressBar::new_spinner()
196 } else {
197 ProgressBar::hidden()
198 };
199 let mut certificate_validation_pb = self.certificate_validation_pb.write().await;
200 *certificate_validation_pb = Some(pb);
201 }
202 MithrilEvent::CertificateValidated {
203 certificate_chain_validation_id: _,
204 certificate_hash,
205 } => {
206 let certificate_validation_pb = self.certificate_validation_pb.read().await;
207 if let Some(progress_bar) = certificate_validation_pb.as_ref() {
208 progress_bar.set_message(format!("Certificate '{certificate_hash}' is valid"));
209 progress_bar.inc(1);
210 }
211 }
212 MithrilEvent::CertificateFetchedFromCache {
213 certificate_chain_validation_id: _,
214 certificate_hash,
215 } => {
216 let certificate_validation_pb = self.certificate_validation_pb.read().await;
217 if let Some(progress_bar) = certificate_validation_pb.as_ref() {
218 progress_bar.set_message(format!("Cached '{certificate_hash}'"));
219 progress_bar.inc(1);
220 }
221 }
222 MithrilEvent::CertificateChainValidated {
223 certificate_chain_validation_id: _,
224 } => {
225 let mut certificate_validation_pb = self.certificate_validation_pb.write().await;
226 if let Some(progress_bar) = certificate_validation_pb.as_ref() {
227 progress_bar.finish_with_message("Certificate chain validated");
228 }
229 *certificate_validation_pb = None;
230 }
231 }
232 }
233}
234
#[cfg(test)]
mod tests {
    use slog::o;
    use std::sync::Arc;

    use mithril_client::feedback::FeedbackSender;

    use super::*;

    /// Download id shared by every event sent in these tests.
    const DOWNLOAD_ID: &str = "id";

    /// Sends one Mithril event through the given sender and awaits its handling.
    ///
    /// The leading tokens select the event to build: `cardano_db` arms build
    /// `MithrilEvent::CardanoDatabase` events, `cardano_db_v1` arms build the snapshot events.
    /// Every event uses `DOWNLOAD_ID` as its download id.
    macro_rules! send_event {
        (cardano_db, dl_started => $sender:expr, total:$total_immutable:expr, ancillary:$include_ancillary:expr) => {
            $sender
                .send_event(MithrilEvent::CardanoDatabase(
                    MithrilEventCardanoDatabase::Started {
                        download_id: DOWNLOAD_ID.to_string(),
                        total_immutable_files: $total_immutable,
                        include_ancillary: $include_ancillary,
                    },
                ))
                .await;
        };
        (cardano_db, dl_completed => $sender:expr) => {
            $sender
                .send_event(MithrilEvent::CardanoDatabase(
                    MithrilEventCardanoDatabase::Completed {
                        download_id: DOWNLOAD_ID.to_string(),
                    },
                ))
                .await;
        };
        (cardano_db, immutable_dl, started => $sender:expr, immutable:$immutable_file_number:expr, size:$size:expr) => {
            $sender
                .send_event(MithrilEvent::CardanoDatabase(
                    MithrilEventCardanoDatabase::ImmutableDownloadStarted {
                        immutable_file_number: $immutable_file_number,
                        download_id: DOWNLOAD_ID.to_string(),
                        size: $size,
                    },
                ))
                .await;
        };
        (cardano_db, immutable_dl, progress => $sender:expr, immutable:$immutable_file_number:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
            $sender
                .send_event(MithrilEvent::CardanoDatabase(
                    MithrilEventCardanoDatabase::ImmutableDownloadProgress {
                        immutable_file_number: $immutable_file_number,
                        download_id: DOWNLOAD_ID.to_string(),
                        downloaded_bytes: $downloaded_bytes,
                        size: $size,
                    },
                ))
                .await;
        };
        (cardano_db, immutable_dl, completed => $sender:expr, immutable:$immutable_file_number:expr) => {
            $sender
                .send_event(MithrilEvent::CardanoDatabase(
                    MithrilEventCardanoDatabase::ImmutableDownloadCompleted {
                        immutable_file_number: $immutable_file_number,
                        download_id: DOWNLOAD_ID.to_string(),
                    },
                ))
                .await;
        };
        (cardano_db, ancillary_dl, started => $sender:expr, size:$size:expr) => {
            $sender
                .send_event(MithrilEvent::CardanoDatabase(
                    MithrilEventCardanoDatabase::AncillaryDownloadStarted {
                        download_id: DOWNLOAD_ID.to_string(),
                        size: $size,
                    },
                ))
                .await;
        };
        (cardano_db, ancillary_dl, progress => $sender:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
            $sender
                .send_event(MithrilEvent::CardanoDatabase(
                    MithrilEventCardanoDatabase::AncillaryDownloadProgress {
                        download_id: DOWNLOAD_ID.to_string(),
                        downloaded_bytes: $downloaded_bytes,
                        size: $size,
                    },
                ))
                .await;
        };
        (cardano_db, ancillary_dl, completed => $sender:expr) => {
            $sender
                .send_event(MithrilEvent::CardanoDatabase(
                    MithrilEventCardanoDatabase::AncillaryDownloadCompleted {
                        download_id: DOWNLOAD_ID.to_string(),
                    },
                ))
                .await;
        };
        (cardano_db, digests_dl, started => $sender:expr, size:$size:expr) => {
            $sender
                .send_event(MithrilEvent::CardanoDatabase(
                    MithrilEventCardanoDatabase::DigestDownloadStarted {
                        download_id: DOWNLOAD_ID.to_string(),
                        size: $size,
                    },
                ))
                .await;
        };
        (cardano_db, digests_dl, progress => $sender:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
            $sender
                .send_event(MithrilEvent::CardanoDatabase(
                    MithrilEventCardanoDatabase::DigestDownloadProgress {
                        download_id: DOWNLOAD_ID.to_string(),
                        downloaded_bytes: $downloaded_bytes,
                        size: $size,
                    },
                ))
                .await;
        };
        (cardano_db, digests_dl, completed => $sender:expr) => {
            $sender
                .send_event(MithrilEvent::CardanoDatabase(
                    MithrilEventCardanoDatabase::DigestDownloadCompleted {
                        download_id: DOWNLOAD_ID.to_string(),
                    },
                ))
                .await;
        };
        (cardano_db_v1, full_immutables_dl, started => $sender:expr, digest:$digest:expr, size:$size:expr) => {
            $sender
                .send_event(MithrilEvent::SnapshotDownloadStarted {
                    download_id: DOWNLOAD_ID.to_string(),
                    digest: $digest.into(),
                    size: $size,
                })
                .await;
        };
        (cardano_db_v1, full_immutables_dl, progress => $sender:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
            $sender
                .send_event(MithrilEvent::SnapshotDownloadProgress {
                    download_id: DOWNLOAD_ID.to_string(),
                    downloaded_bytes: $downloaded_bytes,
                    size: $size,
                })
                .await;
        };
        (cardano_db_v1, full_immutables_dl, completed => $sender:expr) => {
            $sender
                .send_event(MithrilEvent::SnapshotDownloadCompleted {
                    download_id: DOWNLOAD_ID.to_string(),
                })
                .await;
        };
        (cardano_db_v1, ancillary_dl, started => $sender:expr, size:$size:expr) => {
            $sender
                .send_event(MithrilEvent::SnapshotAncillaryDownloadStarted {
                    download_id: DOWNLOAD_ID.to_string(),
                    size: $size,
                })
                .await;
        };
        (cardano_db_v1, ancillary_dl, progress => $sender:expr, bytes:$downloaded_bytes:expr, size:$size:expr) => {
            $sender
                .send_event(MithrilEvent::SnapshotAncillaryDownloadProgress {
                    download_id: DOWNLOAD_ID.to_string(),
                    downloaded_bytes: $downloaded_bytes,
                    size: $size,
                })
                .await;
        };
        (cardano_db_v1, ancillary_dl, completed => $sender:expr) => {
            $sender
                .send_event(MithrilEvent::SnapshotAncillaryDownloadCompleted {
                    download_id: DOWNLOAD_ID.to_string(),
                })
                .await;
        };
    }

    /// Builds a receiver with the given output type and a logger that discards everything.
    fn build_feedback_receiver(output_type: ProgressOutputType) -> Arc<IndicatifFeedbackReceiver> {
        Arc::new(IndicatifFeedbackReceiver::new(
            output_type,
            slog::Logger::root(slog::Discard, o!()),
        ))
    }

    /// Tests of the snapshot (`MithrilEvent::Snapshot*`) events.
    mod cardano_database_v1 {
        use super::*;

        #[tokio::test]
        async fn starting_full_immutables_and_ancillary_together_spawn_two_progress_bars() {
            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
            let sender = FeedbackSender::new(&[receiver.clone()]);

            send_event!(cardano_db_v1, full_immutables_dl, started => sender, digest:"digest", size:123);
            send_event!(cardano_db_v1, ancillary_dl, started => sender, size:456);

            assert!(receiver.download_progress_reporter.read().await.is_some());
            assert!(receiver.ancillary_download_progress_reporter.read().await.is_some());
        }

        #[tokio::test]
        async fn start_and_progress_ancillary_download_with_a_size_of_zero_should_not_crash() {
            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
            let sender = FeedbackSender::new(&[receiver.clone()]);

            send_event!(cardano_db_v1, ancillary_dl, started => sender, size:0);
            send_event!(cardano_db_v1, ancillary_dl, progress => sender, bytes:124, size:0);

            assert!(receiver.ancillary_download_progress_reporter.read().await.is_some());
        }

        #[tokio::test]
        async fn start_then_complete_should_remove_immutables_progress_bar() {
            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
            let sender = FeedbackSender::new(&[receiver.clone()]);

            send_event!(cardano_db_v1, full_immutables_dl, started => sender, digest:"digest", size:123);
            send_event!(cardano_db_v1, full_immutables_dl, completed => sender);

            // The snapshot events drive `download_progress_reporter`; checking any other slot
            // would pass even if the completed event did not remove the progress bar.
            assert!(receiver.download_progress_reporter.read().await.is_none());
        }

        #[tokio::test]
        async fn start_then_complete_should_remove_ancillary_progress_bar() {
            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
            let sender = FeedbackSender::new(&[receiver.clone()]);

            send_event!(cardano_db_v1, ancillary_dl, started => sender, size:456);
            send_event!(cardano_db_v1, ancillary_dl, completed => sender);

            assert!(receiver.ancillary_download_progress_reporter.read().await.is_none());
        }
    }

    /// Tests of the `MithrilEvent::CardanoDatabase` events.
    mod cardano_database_v2 {
        use super::*;

        #[tokio::test]
        async fn starting_should_add_multi_progress_bar() {
            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
            let sender = FeedbackSender::new(&[receiver.clone()]);

            send_event!(cardano_db, dl_started => sender, total:99, ancillary:false);

            assert!(receiver.cardano_database_multi_pb.read().await.is_some());
        }

        #[tokio::test]
        async fn start_then_complete_should_remove_multi_progress_bar() {
            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
            let sender = FeedbackSender::new(&[receiver.clone()]);

            send_event!(cardano_db, dl_started => sender, total:99, ancillary:false);
            send_event!(cardano_db, dl_completed => sender);

            assert!(receiver.cardano_database_multi_pb.read().await.is_none());
        }

        #[tokio::test]
        async fn start_including_ancillary_add_one_to_total_downloads() {
            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
            let sender = FeedbackSender::new(&[receiver.clone()]);

            send_event!(cardano_db, dl_started => sender, total:99, ancillary:true);

            assert_eq!(
                receiver
                    .cardano_database_multi_pb
                    .read()
                    .await
                    .as_ref()
                    .map(|pb| pb.total_downloads()),
                Some(100)
            );
        }

        #[tokio::test]
        async fn starting_twice_should_supersede_first_multi_progress_bar() {
            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
            let sender = FeedbackSender::new(&[receiver.clone()]);

            send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);
            send_event!(cardano_db, dl_started => sender, total:99, ancillary:false);

            assert_eq!(
                receiver
                    .cardano_database_multi_pb
                    .read()
                    .await
                    .as_ref()
                    .map(|pb| pb.total_downloads()),
                Some(99)
            );
        }

        #[tokio::test]
        async fn complete_without_start_should_not_panic() {
            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
            let sender = FeedbackSender::new(&[receiver.clone()]);

            send_event!(cardano_db, dl_completed => sender);
        }

        #[tokio::test]
        async fn starting_immutable_downloads_should_not_add_progress_bars() {
            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
            let sender = FeedbackSender::new(&[receiver.clone()]);

            send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);

            let multi_pb_option = receiver.cardano_database_multi_pb.read().await;
            let multi_pb = multi_pb_option.as_ref().unwrap();

            assert_eq!(multi_pb.number_of_active_downloads().await, 0);
            send_event!(cardano_db, immutable_dl, started => sender, immutable:1, size:123);
            assert_eq!(multi_pb.number_of_active_downloads().await, 0);
        }

        #[tokio::test]
        async fn completed_immutable_downloads_bump_progress() {
            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
            let sender = FeedbackSender::new(&[receiver.clone()]);

            send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);

            let multi_pb_option = receiver.cardano_database_multi_pb.read().await;
            let multi_pb = multi_pb_option.as_ref().unwrap();

            assert_eq!(multi_pb.position(), 0);
            send_event!(cardano_db, immutable_dl, completed => sender, immutable:24);
            assert_eq!(multi_pb.position(), 1);
        }

        #[tokio::test]
        async fn starting_digests_downloads_should_not_add_progress_bars() {
            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
            let sender = FeedbackSender::new(&[receiver.clone()]);

            send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);

            let multi_pb_option = receiver.cardano_database_multi_pb.read().await;
            let multi_pb = multi_pb_option.as_ref().unwrap();

            assert_eq!(multi_pb.number_of_active_downloads().await, 0);
            send_event!(cardano_db, digests_dl, started => sender, size:789);
            assert_eq!(multi_pb.number_of_active_downloads().await, 0);
        }

        #[tokio::test]
        async fn starting_ancillary_downloads_should_add_a_progress_bar() {
            let receiver = build_feedback_receiver(ProgressOutputType::Hidden);
            let sender = FeedbackSender::new(&[receiver.clone()]);

            send_event!(cardano_db, dl_started => sender, total:50, ancillary:false);

            let multi_pb_option = receiver.cardano_database_multi_pb.read().await;
            let multi_pb = multi_pb_option.as_ref().unwrap();

            assert_eq!(multi_pb.number_of_active_downloads().await, 0);
            send_event!(cardano_db, ancillary_dl, started => sender, size:456);
            assert_eq!(multi_pb.number_of_active_downloads().await, 1);
        }
    }
}