1use async_trait::async_trait;
55use mithril_common::entities::ImmutableFileNumber;
56use serde::Serialize;
57use slog::{Logger, info};
58use std::sync::{Arc, RwLock};
59use strum::Display;
60use uuid::Uuid;
61
62#[derive(Debug, Clone, Eq, PartialEq, Display, Serialize)]
64#[strum(serialize_all = "PascalCase")]
65#[serde(untagged)]
66pub enum MithrilEventCardanoDatabase {
67 Started {
69 download_id: String,
71 total_immutable_files: u64,
73 include_ancillary: bool,
75 },
76 Completed {
78 download_id: String,
80 },
81 ImmutableDownloadStarted {
83 immutable_file_number: ImmutableFileNumber,
85 download_id: String,
87 size: u64,
89 },
90 ImmutableDownloadProgress {
92 immutable_file_number: ImmutableFileNumber,
94 download_id: String,
96 downloaded_bytes: u64,
98 size: u64,
100 },
101 ImmutableDownloadCompleted {
103 immutable_file_number: ImmutableFileNumber,
105 download_id: String,
107 },
108 AncillaryDownloadStarted {
110 download_id: String,
112 size: u64,
114 },
115 AncillaryDownloadProgress {
117 download_id: String,
119 downloaded_bytes: u64,
121 size: u64,
123 },
124 AncillaryDownloadCompleted {
126 download_id: String,
128 },
129 DigestDownloadStarted {
131 download_id: String,
133 size: u64,
135 },
136 DigestDownloadProgress {
138 download_id: String,
140 downloaded_bytes: u64,
142 size: u64,
144 },
145 DigestDownloadCompleted {
147 download_id: String,
149 },
150}
151
152#[derive(Debug, Clone, Eq, PartialEq, Display, Serialize)]
154#[strum(serialize_all = "PascalCase")]
155#[serde(untagged)]
156pub enum MithrilEvent {
157 SnapshotDownloadStarted {
159 digest: String,
161 download_id: String,
163 size: u64,
165 },
166 SnapshotDownloadProgress {
168 download_id: String,
170 downloaded_bytes: u64,
172 size: u64,
174 },
175 SnapshotDownloadCompleted {
177 download_id: String,
179 },
180 SnapshotAncillaryDownloadStarted {
182 download_id: String,
184 size: u64,
186 },
187 SnapshotAncillaryDownloadProgress {
189 download_id: String,
191 downloaded_bytes: u64,
193 size: u64,
195 },
196 SnapshotAncillaryDownloadCompleted {
198 download_id: String,
200 },
201
202 CardanoDatabase(MithrilEventCardanoDatabase),
204
205 CertificateChainValidationStarted {
207 certificate_chain_validation_id: String,
209 },
210 CertificateValidated {
212 certificate_chain_validation_id: String,
214 certificate_hash: String,
216 },
217 CertificateFetchedFromCache {
219 certificate_chain_validation_id: String,
221 certificate_hash: String,
223 },
224 CertificateChainValidated {
226 certificate_chain_validation_id: String,
228 },
229}
230
231impl MithrilEvent {
232 pub fn new_snapshot_download_id() -> String {
234 Uuid::new_v4().to_string()
235 }
236
237 pub fn new_cardano_database_download_id() -> String {
239 Uuid::new_v4().to_string()
240 }
241
242 pub fn new_certificate_chain_validation_id() -> String {
244 Uuid::new_v4().to_string()
245 }
246
247 #[cfg(test)]
248 pub(crate) fn event_id(&self) -> &str {
249 match self {
250 MithrilEvent::SnapshotDownloadStarted { download_id, .. } => download_id,
251 MithrilEvent::SnapshotDownloadProgress { download_id, .. } => download_id,
252 MithrilEvent::SnapshotDownloadCompleted { download_id } => download_id,
253 MithrilEvent::SnapshotAncillaryDownloadStarted { download_id, .. } => download_id,
254 MithrilEvent::SnapshotAncillaryDownloadProgress { download_id, .. } => download_id,
255 MithrilEvent::SnapshotAncillaryDownloadCompleted { download_id } => download_id,
256 MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::Started {
257 download_id,
258 ..
259 }) => download_id,
260 MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::Completed {
261 download_id,
262 ..
263 }) => download_id,
264 MithrilEvent::CardanoDatabase(
265 MithrilEventCardanoDatabase::ImmutableDownloadStarted { download_id, .. },
266 ) => download_id,
267 MithrilEvent::CardanoDatabase(
268 MithrilEventCardanoDatabase::ImmutableDownloadProgress { download_id, .. },
269 ) => download_id,
270 MithrilEvent::CardanoDatabase(
271 MithrilEventCardanoDatabase::ImmutableDownloadCompleted { download_id, .. },
272 ) => download_id,
273 MithrilEvent::CardanoDatabase(
274 MithrilEventCardanoDatabase::AncillaryDownloadStarted { download_id, .. },
275 ) => download_id,
276 MithrilEvent::CardanoDatabase(
277 MithrilEventCardanoDatabase::AncillaryDownloadProgress { download_id, .. },
278 ) => download_id,
279 MithrilEvent::CardanoDatabase(
280 MithrilEventCardanoDatabase::AncillaryDownloadCompleted { download_id, .. },
281 ) => download_id,
282 MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::DigestDownloadStarted {
283 download_id,
284 ..
285 }) => download_id,
286 MithrilEvent::CardanoDatabase(
287 MithrilEventCardanoDatabase::DigestDownloadProgress { download_id, .. },
288 ) => download_id,
289 MithrilEvent::CardanoDatabase(
290 MithrilEventCardanoDatabase::DigestDownloadCompleted { download_id, .. },
291 ) => download_id,
292 MithrilEvent::CertificateChainValidationStarted {
293 certificate_chain_validation_id,
294 } => certificate_chain_validation_id,
295 MithrilEvent::CertificateValidated {
296 certificate_chain_validation_id,
297 ..
298 } => certificate_chain_validation_id,
299 MithrilEvent::CertificateFetchedFromCache {
300 certificate_chain_validation_id,
301 ..
302 } => certificate_chain_validation_id,
303 MithrilEvent::CertificateChainValidated {
304 certificate_chain_validation_id,
305 } => certificate_chain_validation_id,
306 }
307 }
308}
309
310#[derive(Clone)]
314pub struct FeedbackSender {
315 receivers: Vec<Arc<dyn FeedbackReceiver>>,
316}
317
318impl FeedbackSender {
319 pub fn new(receivers: &[Arc<dyn FeedbackReceiver>]) -> FeedbackSender {
321 Self {
322 receivers: receivers.to_vec(),
323 }
324 }
325
326 pub async fn send_event(&self, event: MithrilEvent) {
328 for receiver in &self.receivers {
329 receiver.handle_event(event.clone()).await;
330 }
331 }
332}
333
334#[cfg_attr(target_family = "wasm", async_trait(?Send))]
336#[cfg_attr(not(target_family = "wasm"), async_trait)]
337pub trait FeedbackReceiver: Sync + Send {
338 async fn handle_event(&self, event: MithrilEvent);
340}
341
342pub struct SlogFeedbackReceiver {
344 logger: Logger,
345}
346
347impl SlogFeedbackReceiver {
348 pub fn new(logger: Logger) -> SlogFeedbackReceiver {
350 Self { logger }
351 }
352}
353
354#[cfg_attr(target_family = "wasm", async_trait(?Send))]
355#[cfg_attr(not(target_family = "wasm"), async_trait)]
356impl FeedbackReceiver for SlogFeedbackReceiver {
357 async fn handle_event(&self, event: MithrilEvent) {
358 match event {
359 MithrilEvent::SnapshotDownloadStarted {
360 digest,
361 download_id,
362 size,
363 } => {
364 info!(
365 self.logger, "Snapshot download started";
366 "size" => size, "digest" => digest, "download_id" => download_id,
367 );
368 }
369 MithrilEvent::SnapshotDownloadProgress {
370 download_id,
371 downloaded_bytes,
372 size,
373 } => {
374 info!(
375 self.logger, "Snapshot download in progress ...";
376 "downloaded_bytes" => downloaded_bytes, "size" => size, "download_id" => download_id,
377 );
378 }
379 MithrilEvent::SnapshotDownloadCompleted { download_id } => {
380 info!(self.logger, "Snapshot download completed"; "download_id" => download_id);
381 }
382 MithrilEvent::SnapshotAncillaryDownloadStarted { download_id, size } => {
383 info!(
384 self.logger, "Snapshot ancillary download started";
385 "size" => size, "download_id" => download_id,
386 );
387 }
388 MithrilEvent::SnapshotAncillaryDownloadProgress {
389 download_id,
390 downloaded_bytes,
391 size,
392 } => {
393 info!(
394 self.logger, "Snapshot ancillary download in progress ...";
395 "downloaded_bytes" => downloaded_bytes, "size" => size, "download_id" => download_id,
396 );
397 }
398 MithrilEvent::SnapshotAncillaryDownloadCompleted { download_id } => {
399 info!(self.logger, "Snapshot ancillary download completed"; "download_id" => download_id);
400 }
401 MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::Started {
402 download_id,
403 total_immutable_files,
404 include_ancillary,
405 }) => {
406 info!(
407 self.logger, "Cardano database download started"; "download_id" => download_id, "total_immutable_files" => total_immutable_files, "include_ancillary" => include_ancillary,
408 );
409 }
410 MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::Completed {
411 download_id,
412 }) => {
413 info!(
414 self.logger, "Cardano database download completed"; "download_id" => download_id,
415 );
416 }
417 MithrilEvent::CardanoDatabase(
418 MithrilEventCardanoDatabase::ImmutableDownloadStarted {
419 immutable_file_number,
420 download_id,
421 size,
422 },
423 ) => {
424 info!(
425 self.logger, "Immutable download started";
426 "immutable_file_number" => immutable_file_number, "download_id" => download_id, "size" => size
427 );
428 }
429 MithrilEvent::CardanoDatabase(
430 MithrilEventCardanoDatabase::ImmutableDownloadProgress {
431 immutable_file_number,
432 download_id,
433 downloaded_bytes,
434 size,
435 },
436 ) => {
437 info!(
438 self.logger, "Immutable download in progress ...";
439 "immutable_file_number" => immutable_file_number, "downloaded_bytes" => downloaded_bytes, "size" => size, "download_id" => download_id,
440 );
441 }
442 MithrilEvent::CardanoDatabase(
443 MithrilEventCardanoDatabase::ImmutableDownloadCompleted {
444 immutable_file_number,
445 download_id,
446 },
447 ) => {
448 info!(self.logger, "Immutable download completed"; "immutable_file_number" => immutable_file_number, "download_id" => download_id);
449 }
450 MithrilEvent::CardanoDatabase(
451 MithrilEventCardanoDatabase::AncillaryDownloadStarted { download_id, size },
452 ) => {
453 info!(
454 self.logger, "Ancillary download started";
455 "download_id" => download_id,
456 "size" => size,
457 );
458 }
459 MithrilEvent::CardanoDatabase(
460 MithrilEventCardanoDatabase::AncillaryDownloadProgress {
461 download_id,
462 downloaded_bytes,
463 size,
464 },
465 ) => {
466 info!(
467 self.logger, "Ancillary download in progress ...";
468 "downloaded_bytes" => downloaded_bytes, "size" => size, "download_id" => download_id,
469 );
470 }
471 MithrilEvent::CardanoDatabase(
472 MithrilEventCardanoDatabase::AncillaryDownloadCompleted { download_id },
473 ) => {
474 info!(self.logger, "Ancillary download completed"; "download_id" => download_id);
475 }
476 MithrilEvent::CardanoDatabase(MithrilEventCardanoDatabase::DigestDownloadStarted {
477 download_id,
478 size,
479 }) => {
480 info!(
481 self.logger, "Digest download started";
482 "download_id" => download_id,
483 "size" => size,
484 );
485 }
486 MithrilEvent::CardanoDatabase(
487 MithrilEventCardanoDatabase::DigestDownloadProgress {
488 download_id,
489 downloaded_bytes,
490 size,
491 },
492 ) => {
493 info!(
494 self.logger, "Digest download in progress ...";
495 "downloaded_bytes" => downloaded_bytes, "size" => size, "download_id" => download_id,
496 );
497 }
498 MithrilEvent::CardanoDatabase(
499 MithrilEventCardanoDatabase::DigestDownloadCompleted { download_id },
500 ) => {
501 info!(self.logger, "Digest download completed"; "download_id" => download_id);
502 }
503 MithrilEvent::CertificateChainValidationStarted {
504 certificate_chain_validation_id,
505 } => {
506 info!(
507 self.logger, "Certificate chain validation started";
508 "certificate_chain_validation_id" => certificate_chain_validation_id,
509 );
510 }
511 MithrilEvent::CertificateValidated {
512 certificate_hash,
513 certificate_chain_validation_id,
514 } => {
515 info!(
516 self.logger, "Certificate validated";
517 "certificate_hash" => certificate_hash,
518 "certificate_chain_validation_id" => certificate_chain_validation_id,
519 );
520 }
521 MithrilEvent::CertificateFetchedFromCache {
522 certificate_hash,
523 certificate_chain_validation_id,
524 } => {
525 info!(
526 self.logger, "Cached";
527 "certificate_hash" => certificate_hash,
528 "certificate_chain_validation_id" => certificate_chain_validation_id,
529 );
530 }
531 MithrilEvent::CertificateChainValidated {
532 certificate_chain_validation_id,
533 } => {
534 info!(
535 self.logger, "Certificate chain validated";
536 "certificate_chain_validation_id" => certificate_chain_validation_id,
537 );
538 }
539 };
540 }
541}
542
543pub struct StackFeedbackReceiver {
547 stacked_events: RwLock<Vec<MithrilEvent>>,
548}
549
550impl StackFeedbackReceiver {
551 pub fn new() -> StackFeedbackReceiver {
553 Self {
554 stacked_events: RwLock::new(vec![]),
555 }
556 }
557
558 pub fn stacked_events(&self) -> Vec<MithrilEvent> {
562 let events = self.stacked_events.read().unwrap();
563 events.clone()
564 }
565}
566
567impl Default for StackFeedbackReceiver {
568 fn default() -> Self {
569 Self::new()
570 }
571}
572
573#[cfg_attr(target_family = "wasm", async_trait(?Send))]
574#[cfg_attr(not(target_family = "wasm"), async_trait)]
575impl FeedbackReceiver for StackFeedbackReceiver {
576 async fn handle_event(&self, event: MithrilEvent) {
577 let mut events = self.stacked_events.write().unwrap();
578 events.push(event);
579 }
580}
581
582#[cfg(test)]
583mod tests {
584 use super::*;
585 use crate::feedback::MithrilEvent::{SnapshotDownloadCompleted, SnapshotDownloadStarted};
586 use crate::test_utils::TestLogger;
587 use std::time::Duration;
588 use tokio::task::JoinSet;
589
590 #[tokio::test]
591 async fn send_event_same_thread() {
592 let receiver = Arc::new(StackFeedbackReceiver::new());
593 let receiver_clone = receiver.clone() as Arc<dyn FeedbackReceiver>;
594 let sender = FeedbackSender::new(&[receiver_clone]);
595
596 sender
597 .send_event(SnapshotDownloadStarted {
598 digest: "digest".to_string(),
599 download_id: "download_id".to_string(),
600 size: 10,
601 })
602 .await;
603 sender
604 .send_event(SnapshotDownloadCompleted {
605 download_id: "download_id".to_string(),
606 })
607 .await;
608
609 assert_eq!(
610 receiver.stacked_events(),
611 vec![
612 SnapshotDownloadStarted {
613 digest: "digest".to_string(),
614 download_id: "download_id".to_string(),
615 size: 10
616 },
617 SnapshotDownloadCompleted {
618 download_id: "download_id".to_string()
619 }
620 ]
621 );
622 }
623
624 #[tokio::test]
625 async fn send_event_multiple_thread() {
626 let receiver = Arc::new(StackFeedbackReceiver::new());
627 let sender = FeedbackSender::new(&[
628 receiver.clone(),
629 Arc::new(SlogFeedbackReceiver::new(TestLogger::stdout())),
630 ]);
631 let sender2 = sender.clone();
632 let mut join_set = JoinSet::new();
633
634 join_set.spawn(async move {
635 sender
637 .send_event(SnapshotDownloadStarted {
638 digest: "digest1".to_string(),
639 download_id: "download1".to_string(),
640 size: 1,
641 })
642 .await;
643 tokio::time::sleep(Duration::from_millis(2)).await;
644 sender
646 .send_event(SnapshotDownloadCompleted {
647 download_id: "download3".to_string(),
648 })
649 .await;
650 sender
651 .send_event(SnapshotDownloadStarted {
652 digest: "digest2".to_string(),
653 download_id: "download2".to_string(),
654 size: 2,
655 })
656 .await;
657 });
658
659 join_set.spawn(async move {
660 sender2
662 .send_event(SnapshotDownloadCompleted {
663 download_id: "download1".to_string(),
664 })
665 .await;
666 sender2
667 .send_event(SnapshotDownloadStarted {
668 digest: "digest3".to_string(),
669 download_id: "download3".to_string(),
670 size: 3,
671 })
672 .await;
673 tokio::time::sleep(Duration::from_millis(5)).await;
674 sender2
676 .send_event(SnapshotDownloadCompleted {
677 download_id: "download2".to_string(),
678 })
679 .await;
680 });
681
682 while let Some(res) = join_set.join_next().await {
683 res.unwrap();
684 }
685
686 assert_eq!(
687 receiver.stacked_events(),
688 vec![
689 SnapshotDownloadStarted {
690 digest: "digest1".to_string(),
691 download_id: "download1".to_string(),
692 size: 1
693 },
694 SnapshotDownloadCompleted {
695 download_id: "download1".to_string()
696 },
697 SnapshotDownloadStarted {
698 digest: "digest3".to_string(),
699 download_id: "download3".to_string(),
700 size: 3
701 },
702 SnapshotDownloadCompleted {
703 download_id: "download3".to_string()
704 },
705 SnapshotDownloadStarted {
706 digest: "digest2".to_string(),
707 download_id: "download2".to_string(),
708 size: 2
709 },
710 SnapshotDownloadCompleted {
711 download_id: "download2".to_string()
712 },
713 ]
714 );
715 }
716
717 #[tokio::test]
718 async fn send_event_in_one_thread_and_receive_in_another_thread() {
719 let receiver = Arc::new(StackFeedbackReceiver::new());
720 let receiver_clone = receiver.clone() as Arc<dyn FeedbackReceiver>;
721 let receiver2 = receiver.clone();
722 let sender = FeedbackSender::new(&[receiver_clone]);
723 let mut join_set = JoinSet::new();
724
725 join_set.spawn(async move {
726 sender
728 .send_event(SnapshotDownloadStarted {
729 digest: "digest1".to_string(),
730 download_id: "download1".to_string(),
731 size: 1,
732 })
733 .await;
734 tokio::time::sleep(Duration::from_millis(10)).await;
735
736 sender
738 .send_event(SnapshotDownloadCompleted {
739 download_id: "download1".to_string(),
740 })
741 .await;
742 sender
743 .send_event(SnapshotDownloadStarted {
744 digest: "digest2".to_string(),
745 download_id: "download2".to_string(),
746 size: 2,
747 })
748 .await;
749 tokio::time::sleep(Duration::from_millis(10)).await;
750
751 sender
753 .send_event(SnapshotDownloadCompleted {
754 download_id: "download2".to_string(),
755 })
756 .await;
757 sender
758 .send_event(SnapshotDownloadStarted {
759 digest: "digest3".to_string(),
760 download_id: "download3".to_string(),
761 size: 3,
762 })
763 .await;
764 tokio::time::sleep(Duration::from_millis(10)).await;
765
766 sender
768 .send_event(SnapshotDownloadCompleted {
769 download_id: "download3".to_string(),
770 })
771 .await;
772 });
773
774 join_set.spawn(async move {
775 tokio::time::sleep(Duration::from_millis(3)).await;
777 assert_eq!(
778 receiver2.stacked_events(),
779 vec![SnapshotDownloadStarted {
780 digest: "digest1".to_string(),
781 download_id: "download1".to_string(),
782 size: 1
783 },]
784 );
785
786 tokio::time::sleep(Duration::from_millis(10)).await;
788 assert_eq!(
789 receiver2.stacked_events(),
790 vec![
791 SnapshotDownloadStarted {
792 digest: "digest1".to_string(),
793 download_id: "download1".to_string(),
794 size: 1
795 },
796 SnapshotDownloadCompleted {
797 download_id: "download1".to_string()
798 },
799 SnapshotDownloadStarted {
800 digest: "digest2".to_string(),
801 download_id: "download2".to_string(),
802 size: 2
803 },
804 ]
805 );
806
807 tokio::time::sleep(Duration::from_millis(10)).await;
809 assert_eq!(
810 receiver2.stacked_events(),
811 vec![
812 SnapshotDownloadStarted {
813 digest: "digest1".to_string(),
814 download_id: "download1".to_string(),
815 size: 1
816 },
817 SnapshotDownloadCompleted {
818 download_id: "download1".to_string()
819 },
820 SnapshotDownloadStarted {
821 digest: "digest2".to_string(),
822 download_id: "download2".to_string(),
823 size: 2
824 },
825 SnapshotDownloadCompleted {
826 download_id: "download2".to_string()
827 },
828 SnapshotDownloadStarted {
829 digest: "digest3".to_string(),
830 download_id: "download3".to_string(),
831 size: 3
832 },
833 ]
834 );
835 });
836
837 while let Some(res) = join_set.join_next().await {
838 res.unwrap();
839 }
840
841 assert_eq!(
842 receiver.stacked_events(),
843 vec![
844 SnapshotDownloadStarted {
845 digest: "digest1".to_string(),
846 download_id: "download1".to_string(),
847 size: 1
848 },
849 SnapshotDownloadCompleted {
850 download_id: "download1".to_string()
851 },
852 SnapshotDownloadStarted {
853 digest: "digest2".to_string(),
854 download_id: "download2".to_string(),
855 size: 2
856 },
857 SnapshotDownloadCompleted {
858 download_id: "download2".to_string()
859 },
860 SnapshotDownloadStarted {
861 digest: "digest3".to_string(),
862 download_id: "download3".to_string(),
863 size: 3
864 },
865 SnapshotDownloadCompleted {
866 download_id: "download3".to_string()
867 },
868 ]
869 );
870 }
871}