1use async_trait::async_trait;
55use mithril_common::entities::ImmutableFileNumber;
56use serde::Serialize;
57use slog::{info, Logger};
58use std::sync::{Arc, RwLock};
59use strum::Display;
60use uuid::Uuid;
61
62#[derive(Debug, Clone, Eq, PartialEq, Display, Serialize)]
64#[strum(serialize_all = "PascalCase")]
65#[serde(untagged)]
66pub enum MithrilEventCardanoDatabase {
67 Started {
69 download_id: String,
71 total_immutable_files: u64,
73 include_ancillary: bool,
75 },
76 Completed {
78 download_id: String,
80 },
81 ImmutableDownloadStarted {
83 immutable_file_number: ImmutableFileNumber,
85 download_id: String,
87 size: u64,
89 },
90 ImmutableDownloadProgress {
92 immutable_file_number: ImmutableFileNumber,
94 download_id: String,
96 downloaded_bytes: u64,
98 size: u64,
100 },
101 ImmutableDownloadCompleted {
103 immutable_file_number: ImmutableFileNumber,
105 download_id: String,
107 },
108 AncillaryDownloadStarted {
110 download_id: String,
112 size: u64,
114 },
115 AncillaryDownloadProgress {
117 download_id: String,
119 downloaded_bytes: u64,
121 size: u64,
123 },
124 AncillaryDownloadCompleted {
126 download_id: String,
128 },
129 DigestDownloadStarted {
131 download_id: String,
133 size: u64,
135 },
136 DigestDownloadProgress {
138 download_id: String,
140 downloaded_bytes: u64,
142 size: u64,
144 },
145 DigestDownloadCompleted {
147 download_id: String,
149 },
150}
151
152#[derive(Debug, Clone, Eq, PartialEq, Display, Serialize)]
154#[strum(serialize_all = "PascalCase")]
155#[serde(untagged)]
156pub enum MithrilEvent {
157 SnapshotDownloadStarted {
159 digest: String,
161 download_id: String,
163 size: u64,
165 },
166 SnapshotDownloadProgress {
168 download_id: String,
170 downloaded_bytes: u64,
172 size: u64,
174 },
175 SnapshotDownloadCompleted {
177 download_id: String,
179 },
180 SnapshotAncillaryDownloadStarted {
182 download_id: String,
184 size: u64,
186 },
187 SnapshotAncillaryDownloadProgress {
189 download_id: String,
191 downloaded_bytes: u64,
193 size: u64,
195 },
196 SnapshotAncillaryDownloadCompleted {
198 download_id: String,
200 },
201
202 CardanoDatabase(MithrilEventCardanoDatabase),
204
205 CertificateChainValidationStarted {
207 certificate_chain_validation_id: String,
209 },
210 CertificateValidated {
212 certificate_chain_validation_id: String,
214 certificate_hash: String,
216 },
217 CertificateFetchedFromCache {
219 certificate_chain_validation_id: String,
221 certificate_hash: String,
223 },
224 CertificateChainValidated {
226 certificate_chain_validation_id: String,
228 },
229}
230
231impl MithrilEvent {
232 pub fn new_snapshot_download_id() -> String {
234 Uuid::new_v4().to_string()
235 }
236
237 pub fn new_cardano_database_download_id() -> String {
239 Uuid::new_v4().to_string()
240 }
241
242 pub fn new_certificate_chain_validation_id() -> String {
244 Uuid::new_v4().to_string()
245 }
246
247 #[cfg(test)]
248 pub(crate) fn event_id(&self) -> &str {
249 match self {
250 MithrilEvent::SnapshotDownloadStarted { download_id, .. } => download_id,
251 MithrilEvent::SnapshotDownloadProgress { download_id, .. } => download_id,
252 MithrilEvent::SnapshotDownloadCompleted { download_id } => download_id,
253 MithrilEvent::SnapshotAncillaryDownloadStarted { download_id, .. } => download_id,
254 MithrilEvent::SnapshotAncillaryDownloadProgress { download_id, .. } => download_id,
255 MithrilEvent::SnapshotAncillaryDownloadCompleted { download_id } => download_id,
256 MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::Started {
257 download_id,
258 ..
259 }) => download_id,
260 MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::Completed {
261 download_id,
262 ..
263 }) => download_id,
264 MithrilEvent::CardanoDatabase(
265 MithrilEventCardanoDatabase::ImmutableDownloadStarted { download_id, .. },
266 ) => download_id,
267 MithrilEvent::CardanoDatabase(
268 MithrilEventCardanoDatabase::ImmutableDownloadProgress { download_id, .. },
269 ) => download_id,
270 MithrilEvent::CardanoDatabase(
271 MithrilEventCardanoDatabase::ImmutableDownloadCompleted { download_id, .. },
272 ) => download_id,
273 MithrilEvent::CardanoDatabase(
274 MithrilEventCardanoDatabase::AncillaryDownloadStarted { download_id, .. },
275 ) => download_id,
276 MithrilEvent::CardanoDatabase(
277 MithrilEventCardanoDatabase::AncillaryDownloadProgress { download_id, .. },
278 ) => download_id,
279 MithrilEvent::CardanoDatabase(
280 MithrilEventCardanoDatabase::AncillaryDownloadCompleted { download_id, .. },
281 ) => download_id,
282 MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::DigestDownloadStarted {
283 download_id,
284 ..
285 }) => download_id,
286 MithrilEvent::CardanoDatabase(
287 MithrilEventCardanoDatabase::DigestDownloadProgress { download_id, .. },
288 ) => download_id,
289 MithrilEvent::CardanoDatabase(
290 MithrilEventCardanoDatabase::DigestDownloadCompleted { download_id, .. },
291 ) => download_id,
292 MithrilEvent::CertificateChainValidationStarted {
293 certificate_chain_validation_id,
294 } => certificate_chain_validation_id,
295 MithrilEvent::CertificateValidated {
296 certificate_chain_validation_id,
297 ..
298 } => certificate_chain_validation_id,
299 MithrilEvent::CertificateFetchedFromCache {
300 certificate_chain_validation_id,
301 ..
302 } => certificate_chain_validation_id,
303 MithrilEvent::CertificateChainValidated {
304 certificate_chain_validation_id,
305 } => certificate_chain_validation_id,
306 }
307 }
308}
309
310#[derive(Clone)]
314pub struct FeedbackSender {
315 receivers: Vec<Arc<dyn FeedbackReceiver>>,
316}
317
318impl FeedbackSender {
319 pub fn new(receivers: &[Arc<dyn FeedbackReceiver>]) -> FeedbackSender {
321 Self {
322 receivers: receivers.to_vec(),
323 }
324 }
325
326 pub async fn send_event(&self, event: MithrilEvent) {
328 for receiver in &self.receivers {
329 receiver.handle_event(event.clone()).await;
330 }
331 }
332}
333
334#[cfg_attr(target_family = "wasm", async_trait(?Send))]
336#[cfg_attr(not(target_family = "wasm"), async_trait)]
337pub trait FeedbackReceiver: Sync + Send {
338 async fn handle_event(&self, event: MithrilEvent);
340}
341
342pub struct SlogFeedbackReceiver {
344 logger: Logger,
345}
346
347impl SlogFeedbackReceiver {
348 pub fn new(logger: Logger) -> SlogFeedbackReceiver {
350 Self { logger }
351 }
352}
353
354#[cfg_attr(target_family = "wasm", async_trait(?Send))]
355#[cfg_attr(not(target_family = "wasm"), async_trait)]
356impl FeedbackReceiver for SlogFeedbackReceiver {
357 async fn handle_event(&self, event: MithrilEvent) {
358 match event {
359 MithrilEvent::SnapshotDownloadStarted {
360 digest,
361 download_id,
362 size,
363 } => {
364 info!(
365 self.logger, "Snapshot download started";
366 "size" => size, "digest" => digest, "download_id" => download_id,
367 );
368 }
369 MithrilEvent::SnapshotDownloadProgress {
370 download_id,
371 downloaded_bytes,
372 size,
373 } => {
374 info!(
375 self.logger, "Snapshot download in progress ...";
376 "downloaded_bytes" => downloaded_bytes, "size" => size, "download_id" => download_id,
377 );
378 }
379 MithrilEvent::SnapshotDownloadCompleted { download_id } => {
380 info!(self.logger, "Snapshot download completed"; "download_id" => download_id);
381 }
382 MithrilEvent::SnapshotAncillaryDownloadStarted { download_id, size } => {
383 info!(
384 self.logger, "Snapshot ancillary download started";
385 "size" => size, "download_id" => download_id,
386 );
387 }
388 MithrilEvent::SnapshotAncillaryDownloadProgress {
389 download_id,
390 downloaded_bytes,
391 size,
392 } => {
393 info!(
394 self.logger, "Snapshot ancillary download in progress ...";
395 "downloaded_bytes" => downloaded_bytes, "size" => size, "download_id" => download_id,
396 );
397 }
398 MithrilEvent::SnapshotAncillaryDownloadCompleted { download_id } => {
399 info!(self.logger, "Snapshot ancillary download completed"; "download_id" => download_id);
400 }
401 MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::Started {
402 download_id,
403 total_immutable_files,
404 include_ancillary,
405 }) => {
406 info!(
407 self.logger, "Cardano database download started"; "download_id" => download_id, "total_immutable_files" => total_immutable_files, "include_ancillary" => include_ancillary,
408 );
409 }
410 MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::Completed {
411 download_id,
412 }) => {
413 info!(
414 self.logger, "Cardano database download completed"; "download_id" => download_id,
415 );
416 }
417 MithrilEvent::CardanoDatabase(
418 MithrilEventCardanoDatabase::ImmutableDownloadStarted {
419 immutable_file_number,
420 download_id,
421 size,
422 },
423 ) => {
424 info!(
425 self.logger, "Immutable download started";
426 "immutable_file_number" => immutable_file_number, "download_id" => download_id, "size" => size
427 );
428 }
429 MithrilEvent::CardanoDatabase(
430 MithrilEventCardanoDatabase::ImmutableDownloadProgress {
431 immutable_file_number,
432 download_id,
433 downloaded_bytes,
434 size,
435 },
436 ) => {
437 info!(
438 self.logger, "Immutable download in progress ...";
439 "immutable_file_number" => immutable_file_number, "downloaded_bytes" => downloaded_bytes, "size" => size, "download_id" => download_id,
440 );
441 }
442 MithrilEvent::CardanoDatabase(
443 MithrilEventCardanoDatabase::ImmutableDownloadCompleted {
444 immutable_file_number,
445 download_id,
446 },
447 ) => {
448 info!(self.logger, "Immutable download completed"; "immutable_file_number" => immutable_file_number, "download_id" => download_id);
449 }
450 MithrilEvent::CardanoDatabase(
451 MithrilEventCardanoDatabase::AncillaryDownloadStarted { download_id, size },
452 ) => {
453 info!(
454 self.logger, "Ancillary download started";
455 "download_id" => download_id,
456 "size" => size,
457 );
458 }
459 MithrilEvent::CardanoDatabase(
460 MithrilEventCardanoDatabase::AncillaryDownloadProgress {
461 download_id,
462 downloaded_bytes,
463 size,
464 },
465 ) => {
466 info!(
467 self.logger, "Ancillary download in progress ...";
468 "downloaded_bytes" => downloaded_bytes, "size" => size, "download_id" => download_id,
469 );
470 }
471 MithrilEvent::CardanoDatabase(
472 MithrilEventCardanoDatabase::AncillaryDownloadCompleted { download_id },
473 ) => {
474 info!(self.logger, "Ancillary download completed"; "download_id" => download_id);
475 }
476 MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::DigestDownloadStarted {
477 download_id,
478 size,
479 }) => {
480 info!(
481 self.logger, "Digest download started";
482 "download_id" => download_id,
483 "size" => size,
484 );
485 }
486 MithrilEvent::CardanoDatabase(
487 MithrilEventCardanoDatabase::DigestDownloadProgress {
488 download_id,
489 downloaded_bytes,
490 size,
491 },
492 ) => {
493 info!(
494 self.logger, "Digest download in progress ...";
495 "downloaded_bytes" => downloaded_bytes, "size" => size, "download_id" => download_id,
496 );
497 }
498 MithrilEvent::CardanoDatabase(
499 MithrilEventCardanoDatabase::DigestDownloadCompleted { download_id },
500 ) => {
501 info!(self.logger, "Digest download completed"; "download_id" => download_id);
502 }
503 MithrilEvent::CertificateChainValidationStarted {
504 certificate_chain_validation_id,
505 } => {
506 info!(
507 self.logger, "Certificate chain validation started";
508 "certificate_chain_validation_id" => certificate_chain_validation_id,
509 );
510 }
511 MithrilEvent::CertificateValidated {
512 certificate_hash,
513 certificate_chain_validation_id,
514 } => {
515 info!(
516 self.logger, "Certificate validated";
517 "certificate_hash" => certificate_hash,
518 "certificate_chain_validation_id" => certificate_chain_validation_id,
519 );
520 }
521 MithrilEvent::CertificateFetchedFromCache {
522 certificate_hash,
523 certificate_chain_validation_id,
524 } => {
525 info!(
526 self.logger, "Cached";
527 "certificate_hash" => certificate_hash,
528 "certificate_chain_validation_id" => certificate_chain_validation_id,
529 );
530 }
531 MithrilEvent::CertificateChainValidated {
532 certificate_chain_validation_id,
533 } => {
534 info!(
535 self.logger, "Certificate chain validated";
536 "certificate_chain_validation_id" => certificate_chain_validation_id,
537 );
538 }
539 };
540 }
541}
542
543pub struct StackFeedbackReceiver {
547 stacked_events: RwLock<Vec<MithrilEvent>>,
548}
549
550impl StackFeedbackReceiver {
551 pub fn new() -> StackFeedbackReceiver {
553 Self {
554 stacked_events: RwLock::new(vec![]),
555 }
556 }
557
558 pub fn stacked_events(&self) -> Vec<MithrilEvent> {
562 let events = self.stacked_events.read().unwrap();
563 events.clone()
564 }
565}
566
567impl Default for StackFeedbackReceiver {
568 fn default() -> Self {
569 Self::new()
570 }
571}
572
573#[cfg_attr(target_family = "wasm", async_trait(?Send))]
574#[cfg_attr(not(target_family = "wasm"), async_trait)]
575impl FeedbackReceiver for StackFeedbackReceiver {
576 async fn handle_event(&self, event: MithrilEvent) {
577 let mut events = self.stacked_events.write().unwrap();
578 events.push(event);
579 }
580}
581
582#[cfg(test)]
583mod tests {
584 use super::*;
585 use crate::feedback::MithrilEvent::{SnapshotDownloadCompleted, SnapshotDownloadStarted};
586 use crate::test_utils::TestLogger;
587 use std::time::Duration;
588 use tokio::task::JoinSet;
589
590 #[tokio::test]
591 async fn send_event_same_thread() {
592 let receiver = Arc::new(StackFeedbackReceiver::new());
593 let sender = FeedbackSender::new(&[receiver.clone()]);
594
595 sender
596 .send_event(SnapshotDownloadStarted {
597 digest: "digest".to_string(),
598 download_id: "download_id".to_string(),
599 size: 10,
600 })
601 .await;
602 sender
603 .send_event(SnapshotDownloadCompleted {
604 download_id: "download_id".to_string(),
605 })
606 .await;
607
608 assert_eq!(
609 receiver.stacked_events(),
610 vec![
611 SnapshotDownloadStarted {
612 digest: "digest".to_string(),
613 download_id: "download_id".to_string(),
614 size: 10
615 },
616 SnapshotDownloadCompleted {
617 download_id: "download_id".to_string()
618 }
619 ]
620 );
621 }
622
623 #[tokio::test]
624 async fn send_event_multiple_thread() {
625 let receiver = Arc::new(StackFeedbackReceiver::new());
626 let sender = FeedbackSender::new(&[
627 receiver.clone(),
628 Arc::new(SlogFeedbackReceiver::new(TestLogger::stdout())),
629 ]);
630 let sender2 = sender.clone();
631 let mut join_set = JoinSet::new();
632
633 join_set.spawn(async move {
634 sender
636 .send_event(SnapshotDownloadStarted {
637 digest: "digest1".to_string(),
638 download_id: "download1".to_string(),
639 size: 1,
640 })
641 .await;
642 tokio::time::sleep(Duration::from_millis(2)).await;
643 sender
645 .send_event(SnapshotDownloadCompleted {
646 download_id: "download3".to_string(),
647 })
648 .await;
649 sender
650 .send_event(SnapshotDownloadStarted {
651 digest: "digest2".to_string(),
652 download_id: "download2".to_string(),
653 size: 2,
654 })
655 .await;
656 });
657
658 join_set.spawn(async move {
659 sender2
661 .send_event(SnapshotDownloadCompleted {
662 download_id: "download1".to_string(),
663 })
664 .await;
665 sender2
666 .send_event(SnapshotDownloadStarted {
667 digest: "digest3".to_string(),
668 download_id: "download3".to_string(),
669 size: 3,
670 })
671 .await;
672 tokio::time::sleep(Duration::from_millis(5)).await;
673 sender2
675 .send_event(SnapshotDownloadCompleted {
676 download_id: "download2".to_string(),
677 })
678 .await;
679 });
680
681 while let Some(res) = join_set.join_next().await {
682 res.unwrap();
683 }
684
685 assert_eq!(
686 receiver.stacked_events(),
687 vec![
688 SnapshotDownloadStarted {
689 digest: "digest1".to_string(),
690 download_id: "download1".to_string(),
691 size: 1
692 },
693 SnapshotDownloadCompleted {
694 download_id: "download1".to_string()
695 },
696 SnapshotDownloadStarted {
697 digest: "digest3".to_string(),
698 download_id: "download3".to_string(),
699 size: 3
700 },
701 SnapshotDownloadCompleted {
702 download_id: "download3".to_string()
703 },
704 SnapshotDownloadStarted {
705 digest: "digest2".to_string(),
706 download_id: "download2".to_string(),
707 size: 2
708 },
709 SnapshotDownloadCompleted {
710 download_id: "download2".to_string()
711 },
712 ]
713 );
714 }
715
716 #[tokio::test]
717 async fn send_event_in_one_thread_and_receive_in_another_thread() {
718 let receiver = Arc::new(StackFeedbackReceiver::new());
719 let receiver2 = receiver.clone();
720 let sender = FeedbackSender::new(&[receiver.clone()]);
721 let mut join_set = JoinSet::new();
722
723 join_set.spawn(async move {
724 sender
726 .send_event(SnapshotDownloadStarted {
727 digest: "digest1".to_string(),
728 download_id: "download1".to_string(),
729 size: 1,
730 })
731 .await;
732 tokio::time::sleep(Duration::from_millis(10)).await;
733
734 sender
736 .send_event(SnapshotDownloadCompleted {
737 download_id: "download1".to_string(),
738 })
739 .await;
740 sender
741 .send_event(SnapshotDownloadStarted {
742 digest: "digest2".to_string(),
743 download_id: "download2".to_string(),
744 size: 2,
745 })
746 .await;
747 tokio::time::sleep(Duration::from_millis(10)).await;
748
749 sender
751 .send_event(SnapshotDownloadCompleted {
752 download_id: "download2".to_string(),
753 })
754 .await;
755 sender
756 .send_event(SnapshotDownloadStarted {
757 digest: "digest3".to_string(),
758 download_id: "download3".to_string(),
759 size: 3,
760 })
761 .await;
762 tokio::time::sleep(Duration::from_millis(10)).await;
763
764 sender
766 .send_event(SnapshotDownloadCompleted {
767 download_id: "download3".to_string(),
768 })
769 .await;
770 });
771
772 join_set.spawn(async move {
773 tokio::time::sleep(Duration::from_millis(3)).await;
775 assert_eq!(
776 receiver2.stacked_events(),
777 vec![SnapshotDownloadStarted {
778 digest: "digest1".to_string(),
779 download_id: "download1".to_string(),
780 size: 1
781 },]
782 );
783
784 tokio::time::sleep(Duration::from_millis(10)).await;
786 assert_eq!(
787 receiver2.stacked_events(),
788 vec![
789 SnapshotDownloadStarted {
790 digest: "digest1".to_string(),
791 download_id: "download1".to_string(),
792 size: 1
793 },
794 SnapshotDownloadCompleted {
795 download_id: "download1".to_string()
796 },
797 SnapshotDownloadStarted {
798 digest: "digest2".to_string(),
799 download_id: "download2".to_string(),
800 size: 2
801 },
802 ]
803 );
804
805 tokio::time::sleep(Duration::from_millis(10)).await;
807 assert_eq!(
808 receiver2.stacked_events(),
809 vec![
810 SnapshotDownloadStarted {
811 digest: "digest1".to_string(),
812 download_id: "download1".to_string(),
813 size: 1
814 },
815 SnapshotDownloadCompleted {
816 download_id: "download1".to_string()
817 },
818 SnapshotDownloadStarted {
819 digest: "digest2".to_string(),
820 download_id: "download2".to_string(),
821 size: 2
822 },
823 SnapshotDownloadCompleted {
824 download_id: "download2".to_string()
825 },
826 SnapshotDownloadStarted {
827 digest: "digest3".to_string(),
828 download_id: "download3".to_string(),
829 size: 3
830 },
831 ]
832 );
833 });
834
835 while let Some(res) = join_set.join_next().await {
836 res.unwrap();
837 }
838
839 assert_eq!(
840 receiver.stacked_events(),
841 vec![
842 SnapshotDownloadStarted {
843 digest: "digest1".to_string(),
844 download_id: "download1".to_string(),
845 size: 1
846 },
847 SnapshotDownloadCompleted {
848 download_id: "download1".to_string()
849 },
850 SnapshotDownloadStarted {
851 digest: "digest2".to_string(),
852 download_id: "download2".to_string(),
853 size: 2
854 },
855 SnapshotDownloadCompleted {
856 download_id: "download2".to_string()
857 },
858 SnapshotDownloadStarted {
859 digest: "digest3".to_string(),
860 download_id: "download3".to_string(),
861 size: 3
862 },
863 SnapshotDownloadCompleted {
864 download_id: "download3".to_string()
865 },
866 ]
867 );
868 }
869}