mithril_aggregator/services/
message.rs

1//! This service is responsible for providing HTTP server with messages as fast as possible.
2
3use std::collections::BTreeSet;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7
8use mithril_common::{
9    StdResult,
10    entities::{Epoch, SignedEntityTypeDiscriminants},
11    messages::{
12        CardanoDatabaseDigestListItemMessage, CardanoDatabaseDigestListMessage,
13        CardanoDatabaseSnapshotListMessage, CardanoDatabaseSnapshotMessage,
14        CardanoStakeDistributionListMessage, CardanoStakeDistributionMessage,
15        CardanoTransactionSnapshotListMessage, CardanoTransactionSnapshotMessage,
16        CertificateListMessage, CertificateMessage, EpochSettingsMessage,
17        MithrilStakeDistributionListMessage, MithrilStakeDistributionMessage, SignerMessagePart,
18        SnapshotListMessage, SnapshotMessage,
19    },
20};
21
22use crate::{
23    ImmutableFileDigestMapper,
24    database::repository::{CertificateRepository, SignedEntityStorer},
25    dependency_injection::EpochServiceWrapper,
26};
27
28/// HTTP Message service trait.
29#[cfg_attr(test, mockall::automock)]
30#[async_trait]
31pub trait MessageService: Sync + Send {
32    /// Return the epoch settings message if it exists.
33    async fn get_epoch_settings_message(
34        &self,
35        allowed_discriminants: BTreeSet<SignedEntityTypeDiscriminants>,
36    ) -> StdResult<EpochSettingsMessage>;
37
38    /// Return the message representation of a certificate if it exists.
39    async fn get_certificate_message(
40        &self,
41        certificate_hash: &str,
42    ) -> StdResult<Option<CertificateMessage>>;
43
44    /// Return the message representation of the latest genesis certificate.
45    async fn get_latest_genesis_certificate_message(&self)
46    -> StdResult<Option<CertificateMessage>>;
47
48    /// Return the message representation of the last N certificates.
49    async fn get_certificate_list_message(&self, limit: usize)
50    -> StdResult<CertificateListMessage>;
51
52    /// Return the information regarding the given snapshot.
53    async fn get_snapshot_message(
54        &self,
55        signed_entity_id: &str,
56    ) -> StdResult<Option<SnapshotMessage>>;
57
58    /// Return the list of the last signed snapshots. The limit of the list is
59    /// passed as argument.
60    async fn get_snapshot_list_message(&self, limit: usize) -> StdResult<SnapshotListMessage>;
61
62    /// Return the information regarding the Cardano database for the given identifier.
63    async fn get_cardano_database_message(
64        &self,
65        signed_entity_id: &str,
66    ) -> StdResult<Option<CardanoDatabaseSnapshotMessage>>;
67
68    /// Return the list of the last Cardano database message.
69    async fn get_cardano_database_list_message(
70        &self,
71        limit: usize,
72    ) -> StdResult<CardanoDatabaseSnapshotListMessage>;
73
74    /// Return the list of the last Cardano database message.
75    async fn get_cardano_database_list_message_by_epoch(
76        &self,
77        limit: usize,
78        epoch: Epoch,
79    ) -> StdResult<CardanoDatabaseSnapshotListMessage>;
80
81    /// Return the list of the Cardano database immutable file names and their digests.
82    async fn get_cardano_database_digest_list_message(
83        &self,
84    ) -> StdResult<CardanoDatabaseDigestListMessage>;
85
86    /// Return the information regarding the Mithril stake distribution for the given identifier.
87    async fn get_mithril_stake_distribution_message(
88        &self,
89        signed_entity_id: &str,
90    ) -> StdResult<Option<MithrilStakeDistributionMessage>>;
91
92    /// Return the list of the last Mithril stake distributions message.
93    async fn get_mithril_stake_distribution_list_message(
94        &self,
95        limit: usize,
96    ) -> StdResult<MithrilStakeDistributionListMessage>;
97
98    /// Return the information regarding the Cardano transactions set for the given identifier.
99    async fn get_cardano_transaction_message(
100        &self,
101        signed_entity_id: &str,
102    ) -> StdResult<Option<CardanoTransactionSnapshotMessage>>;
103
104    /// Return the list of the last Cardano transactions set message.
105    async fn get_cardano_transaction_list_message(
106        &self,
107        limit: usize,
108    ) -> StdResult<CardanoTransactionSnapshotListMessage>;
109
110    /// Return the information regarding the Cardano stake distribution for the given identifier.
111    async fn get_cardano_stake_distribution_message(
112        &self,
113        signed_entity_id: &str,
114    ) -> StdResult<Option<CardanoStakeDistributionMessage>>;
115
116    /// Return the information regarding the Cardano stake distribution for the given epoch.
117    async fn get_cardano_stake_distribution_message_by_epoch(
118        &self,
119        epoch: Epoch,
120    ) -> StdResult<Option<CardanoStakeDistributionMessage>>;
121
122    /// Return the list of the last Cardano stake distributions message.
123    async fn get_cardano_stake_distribution_list_message(
124        &self,
125        limit: usize,
126    ) -> StdResult<CardanoStakeDistributionListMessage>;
127}
128
129/// Implementation of the [MessageService]
130pub struct MithrilMessageService {
131    certificate_repository: Arc<CertificateRepository>,
132    signed_entity_storer: Arc<dyn SignedEntityStorer>,
133    immutable_file_digest_mapper: Arc<dyn ImmutableFileDigestMapper>,
134    epoch_service: EpochServiceWrapper,
135}
136
137impl MithrilMessageService {
138    /// Constructor
139    pub fn new(
140        certificate_repository: Arc<CertificateRepository>,
141        signed_entity_storer: Arc<dyn SignedEntityStorer>,
142        immutable_file_digest_mapper: Arc<dyn ImmutableFileDigestMapper>,
143        epoch_service: EpochServiceWrapper,
144    ) -> Self {
145        Self {
146            certificate_repository,
147            signed_entity_storer,
148            immutable_file_digest_mapper,
149            epoch_service,
150        }
151    }
152}
153
154#[async_trait]
155impl MessageService for MithrilMessageService {
156    async fn get_epoch_settings_message(
157        &self,
158        allowed_discriminants: BTreeSet<SignedEntityTypeDiscriminants>,
159    ) -> StdResult<EpochSettingsMessage> {
160        let epoch_service = self.epoch_service.read().await;
161
162        let epoch = epoch_service.epoch_of_current_data()?;
163        let signer_registration_protocol_parameters =
164            epoch_service.signer_registration_protocol_parameters()?.clone();
165        let current_signers = epoch_service.current_signers()?;
166        let next_signers = epoch_service.next_signers()?;
167
168        let cardano_transactions_discriminant =
169            allowed_discriminants.get(&SignedEntityTypeDiscriminants::CardanoTransactions);
170
171        let cardano_transactions_signing_config = cardano_transactions_discriminant
172            .map(|_| epoch_service.current_cardano_transactions_signing_config())
173            .transpose()?
174            .cloned();
175
176        let epoch_settings_message = EpochSettingsMessage {
177            epoch,
178            signer_registration_protocol_parameters,
179            current_signers: SignerMessagePart::from_signers(current_signers.to_vec()),
180            next_signers: SignerMessagePart::from_signers(next_signers.to_vec()),
181            cardano_transactions_signing_config,
182        };
183
184        Ok(epoch_settings_message)
185    }
186
187    async fn get_certificate_message(
188        &self,
189        certificate_hash: &str,
190    ) -> StdResult<Option<CertificateMessage>> {
191        self.certificate_repository.get_certificate(certificate_hash).await
192    }
193
194    async fn get_latest_genesis_certificate_message(
195        &self,
196    ) -> StdResult<Option<CertificateMessage>> {
197        self.certificate_repository.get_latest_genesis_certificate().await
198    }
199
200    async fn get_certificate_list_message(
201        &self,
202        limit: usize,
203    ) -> StdResult<CertificateListMessage> {
204        self.certificate_repository.get_latest_certificates(limit).await
205    }
206
207    async fn get_snapshot_message(
208        &self,
209        signed_entity_id: &str,
210    ) -> StdResult<Option<SnapshotMessage>> {
211        let signed_entity = self.signed_entity_storer.get_signed_entity(signed_entity_id).await?;
212
213        signed_entity.map(|s| s.try_into()).transpose()
214    }
215
216    async fn get_snapshot_list_message(&self, limit: usize) -> StdResult<SnapshotListMessage> {
217        let signed_entity_type_id = SignedEntityTypeDiscriminants::CardanoImmutableFilesFull;
218        let entities = self
219            .signed_entity_storer
220            .get_last_signed_entities_by_type(&signed_entity_type_id, limit)
221            .await?;
222
223        entities.into_iter().map(|i| i.try_into()).collect()
224    }
225
226    async fn get_cardano_database_message(
227        &self,
228        signed_entity_id: &str,
229    ) -> StdResult<Option<CardanoDatabaseSnapshotMessage>> {
230        let signed_entity = self.signed_entity_storer.get_signed_entity(signed_entity_id).await?;
231
232        signed_entity.map(|v| v.try_into()).transpose()
233    }
234
235    async fn get_cardano_database_list_message(
236        &self,
237        limit: usize,
238    ) -> StdResult<CardanoDatabaseSnapshotListMessage> {
239        let signed_entity_type_id = SignedEntityTypeDiscriminants::CardanoDatabase;
240        let entities = self
241            .signed_entity_storer
242            .get_last_signed_entities_by_type(&signed_entity_type_id, limit)
243            .await?;
244
245        entities.into_iter().map(|i| i.try_into()).collect()
246    }
247
248    async fn get_cardano_database_list_message_by_epoch(
249        &self,
250        limit: usize,
251        epoch: Epoch,
252    ) -> StdResult<CardanoDatabaseSnapshotListMessage> {
253        let signed_entity_type_id = SignedEntityTypeDiscriminants::CardanoDatabase;
254        let entities = self
255            .signed_entity_storer
256            .get_last_signed_entities_by_type_and_epoch(&signed_entity_type_id, epoch, limit)
257            .await?;
258
259        entities.into_iter().map(|i| i.try_into()).collect()
260    }
261
262    async fn get_cardano_database_digest_list_message(
263        &self,
264    ) -> StdResult<CardanoDatabaseDigestListMessage> {
265        Ok(self
266            .immutable_file_digest_mapper
267            .get_immutable_file_digest_map()
268            .await?
269            .into_iter()
270            .map(
271                |(immutable_file_name, digest)| CardanoDatabaseDigestListItemMessage {
272                    immutable_file_name,
273                    digest,
274                },
275            )
276            .collect::<Vec<_>>())
277    }
278
279    async fn get_mithril_stake_distribution_message(
280        &self,
281        signed_entity_id: &str,
282    ) -> StdResult<Option<MithrilStakeDistributionMessage>> {
283        let signed_entity = self.signed_entity_storer.get_signed_entity(signed_entity_id).await?;
284
285        signed_entity.map(|v| v.try_into()).transpose()
286    }
287
288    async fn get_mithril_stake_distribution_list_message(
289        &self,
290        limit: usize,
291    ) -> StdResult<MithrilStakeDistributionListMessage> {
292        let signed_entity_type_id = SignedEntityTypeDiscriminants::MithrilStakeDistribution;
293        let entities = self
294            .signed_entity_storer
295            .get_last_signed_entities_by_type(&signed_entity_type_id, limit)
296            .await?;
297
298        entities.into_iter().map(|i| i.try_into()).collect()
299    }
300
301    async fn get_cardano_transaction_message(
302        &self,
303        signed_entity_id: &str,
304    ) -> StdResult<Option<CardanoTransactionSnapshotMessage>> {
305        let signed_entity = self.signed_entity_storer.get_signed_entity(signed_entity_id).await?;
306
307        signed_entity.map(|v| v.try_into()).transpose()
308    }
309
310    async fn get_cardano_transaction_list_message(
311        &self,
312        limit: usize,
313    ) -> StdResult<CardanoTransactionSnapshotListMessage> {
314        let signed_entity_type_id = SignedEntityTypeDiscriminants::CardanoTransactions;
315        let entities = self
316            .signed_entity_storer
317            .get_last_signed_entities_by_type(&signed_entity_type_id, limit)
318            .await?;
319
320        entities.into_iter().map(|i| i.try_into()).collect()
321    }
322
323    async fn get_cardano_stake_distribution_message(
324        &self,
325        signed_entity_id: &str,
326    ) -> StdResult<Option<CardanoStakeDistributionMessage>> {
327        let signed_entity = self.signed_entity_storer.get_signed_entity(signed_entity_id).await?;
328
329        signed_entity.map(|v| v.try_into()).transpose()
330    }
331
332    async fn get_cardano_stake_distribution_message_by_epoch(
333        &self,
334        epoch: Epoch,
335    ) -> StdResult<Option<CardanoStakeDistributionMessage>> {
336        let signed_entity = self
337            .signed_entity_storer
338            .get_cardano_stake_distribution_signed_entity_by_epoch(epoch)
339            .await?;
340
341        signed_entity.map(|v| v.try_into()).transpose()
342    }
343
344    async fn get_cardano_stake_distribution_list_message(
345        &self,
346        limit: usize,
347    ) -> StdResult<CardanoStakeDistributionListMessage> {
348        let signed_entity_type_id = SignedEntityTypeDiscriminants::CardanoStakeDistribution;
349        let entities = self
350            .signed_entity_storer
351            .get_last_signed_entities_by_type(&signed_entity_type_id, limit)
352            .await?;
353
354        entities.into_iter().map(|i| i.try_into()).collect()
355    }
356}
357
358#[cfg(test)]
359mod tests {
360    use mithril_common::entities::{BlockNumber, CardanoDbBeacon, Certificate, SignedEntityType};
361    use mithril_common::test::double::{Dummy, fake_data};
362    use tokio::sync::RwLock;
363
364    use crate::database::record::SignedEntityRecord;
365    use crate::database::repository::{ImmutableFileDigestRepository, SignedEntityStore};
366    use crate::database::test_helper::main_db_connection;
367    use crate::services::FakeEpochService;
368
369    use super::*;
370
371    struct MessageServiceBuilder {
372        certificates: Vec<Certificate>,
373        signed_entity_records: Vec<SignedEntityRecord>,
374        immutable_file_digest_messages: Vec<CardanoDatabaseDigestListItemMessage>,
375        epoch_service: Option<FakeEpochService>,
376    }
377
378    impl MessageServiceBuilder {
379        fn new() -> Self {
380            Self {
381                certificates: Vec::new(),
382                signed_entity_records: Vec::new(),
383                immutable_file_digest_messages: Vec::new(),
384                epoch_service: None,
385            }
386        }
387
388        fn with_certificates(mut self, certificates: &[Certificate]) -> Self {
389            self.certificates.extend_from_slice(certificates);
390
391            self
392        }
393
394        fn with_signed_entity_records(
395            mut self,
396            signed_entity_record: &[SignedEntityRecord],
397        ) -> Self {
398            self.signed_entity_records.extend_from_slice(signed_entity_record);
399
400            self
401        }
402
403        fn with_immutable_file_digest_messages(
404            mut self,
405            digests: &[CardanoDatabaseDigestListItemMessage],
406        ) -> Self {
407            self.immutable_file_digest_messages.extend_from_slice(digests);
408
409            self
410        }
411
412        fn with_epoch_service(mut self, epoch_service: FakeEpochService) -> Self {
413            self.epoch_service = Some(epoch_service);
414
415            self
416        }
417
418        async fn build(self) -> MithrilMessageService {
419            let connection = Arc::new(main_db_connection().unwrap());
420            let certificate_repository = CertificateRepository::new(connection.clone());
421            let signed_entity_store = SignedEntityStore::new(connection.clone());
422            let immutable_file_digest_mapper =
423                ImmutableFileDigestRepository::new(connection.clone());
424            let epoch_service = self.epoch_service.unwrap_or(FakeEpochService::without_data());
425
426            certificate_repository
427                .create_many_certificates(self.certificates)
428                .await
429                .unwrap();
430            for record in self.signed_entity_records {
431                signed_entity_store.store_signed_entity(&record).await.unwrap();
432            }
433
434            for digest_message in self.immutable_file_digest_messages {
435                immutable_file_digest_mapper
436                    .upsert_immutable_file_digest(
437                        &digest_message.immutable_file_name,
438                        &digest_message.digest,
439                    )
440                    .await
441                    .unwrap();
442            }
443
444            MithrilMessageService::new(
445                Arc::new(certificate_repository),
446                Arc::new(signed_entity_store),
447                Arc::new(immutable_file_digest_mapper),
448                Arc::new(RwLock::new(epoch_service)),
449            )
450        }
451    }
452
453    mod epoch_settings {
454        use mithril_common::{
455            entities::{CardanoTransactionsSigningConfig, ProtocolParameters},
456            test::builder::MithrilFixtureBuilder,
457        };
458
459        use crate::{entities::AggregatorEpochSettings, services::FakeEpochServiceBuilder};
460
461        use super::*;
462
463        #[tokio::test]
464        async fn get_epoch_settings_message() {
465            let fixture = MithrilFixtureBuilder::default().with_signers(3).build();
466            let epoch_service = FakeEpochService::from_fixture(Epoch(4), &fixture);
467            let message_service = MessageServiceBuilder::new()
468                .with_epoch_service(epoch_service)
469                .build()
470                .await;
471
472            let message = message_service
473                .get_epoch_settings_message(SignedEntityTypeDiscriminants::all())
474                .await
475                .unwrap();
476
477            assert_eq!(message.epoch, Epoch(4));
478            assert_eq!(
479                message.signer_registration_protocol_parameters,
480                ProtocolParameters::new(5, 100, 0.65)
481            );
482            assert_eq!(message.current_signers.len(), 3);
483            assert_eq!(message.next_signers.len(), 3);
484            assert_eq!(
485                message.cardano_transactions_signing_config,
486                Some(CardanoTransactionsSigningConfig {
487                    security_parameter: BlockNumber(0),
488                    step: BlockNumber(15)
489                })
490            );
491        }
492
493        #[tokio::test]
494        async fn get_epoch_settings_message_with_cardano_transactions_enabled() {
495            let fixture = MithrilFixtureBuilder::default().with_signers(3).build();
496            let epoch_service = FakeEpochService::from_fixture(Epoch(4), &fixture);
497            let message_service = MessageServiceBuilder::new()
498                .with_epoch_service(epoch_service)
499                .build()
500                .await;
501
502            let message = message_service
503                .get_epoch_settings_message(BTreeSet::from([
504                    SignedEntityTypeDiscriminants::CardanoTransactions,
505                ]))
506                .await
507                .unwrap();
508
509            assert!(message.cardano_transactions_signing_config.is_some());
510        }
511
512        #[tokio::test]
513        async fn get_epoch_settings_message_with_cardano_transactions_not_enabled() {
514            let fixture = MithrilFixtureBuilder::default().with_signers(3).build();
515            let epoch_service = FakeEpochService::from_fixture(Epoch(4), &fixture);
516            let message_service = MessageServiceBuilder::new()
517                .with_epoch_service(epoch_service)
518                .build()
519                .await;
520
521            let message = message_service
522                .get_epoch_settings_message(BTreeSet::new())
523                .await
524                .unwrap();
525
526            assert_eq!(message.cardano_transactions_signing_config, None);
527        }
528
529        #[tokio::test]
530        async fn get_epoch_settings_message_retrieves_protocol_parameters_from_epoch_service() {
531            let current_epoch_settings = AggregatorEpochSettings {
532                protocol_parameters: ProtocolParameters::new(101, 10, 0.5),
533                ..AggregatorEpochSettings::dummy()
534            };
535            let next_epoch_settings = AggregatorEpochSettings {
536                protocol_parameters: ProtocolParameters::new(102, 20, 0.5),
537                ..AggregatorEpochSettings::dummy()
538            };
539            let signer_registration_epoch_settings = AggregatorEpochSettings {
540                protocol_parameters: ProtocolParameters::new(103, 30, 0.5),
541                ..AggregatorEpochSettings::dummy()
542            };
543            let epoch_service = FakeEpochServiceBuilder {
544                current_epoch_settings,
545                next_epoch_settings: next_epoch_settings.clone(),
546                signer_registration_epoch_settings: signer_registration_epoch_settings.clone(),
547                current_signers_with_stake: fake_data::signers_with_stakes(5),
548                next_signers_with_stake: fake_data::signers_with_stakes(3),
549                ..FakeEpochServiceBuilder::dummy(Epoch(1))
550            }
551            .build();
552            let message_service = MessageServiceBuilder::new()
553                .with_epoch_service(epoch_service)
554                .build()
555                .await;
556
557            let message = message_service
558                .get_epoch_settings_message(SignedEntityTypeDiscriminants::all())
559                .await
560                .unwrap();
561
562            assert_eq!(
563                message.signer_registration_protocol_parameters,
564                signer_registration_epoch_settings.protocol_parameters
565            );
566        }
567
568        #[tokio::test]
569        async fn get_epoch_settings_message_retrieves_signing_configuration_from_epoch_service() {
570            let current_epoch_settings = AggregatorEpochSettings {
571                cardano_transactions_signing_config: CardanoTransactionsSigningConfig {
572                    security_parameter: BlockNumber(100),
573                    step: BlockNumber(15),
574                },
575                ..AggregatorEpochSettings::dummy()
576            };
577            let next_epoch_settings = AggregatorEpochSettings {
578                cardano_transactions_signing_config: CardanoTransactionsSigningConfig {
579                    security_parameter: BlockNumber(200),
580                    step: BlockNumber(15),
581                },
582                ..AggregatorEpochSettings::dummy()
583            };
584            let epoch_service = FakeEpochServiceBuilder {
585                current_epoch_settings: current_epoch_settings.clone(),
586                next_epoch_settings: next_epoch_settings.clone(),
587                signer_registration_epoch_settings: AggregatorEpochSettings::dummy(),
588                current_signers_with_stake: fake_data::signers_with_stakes(5),
589                next_signers_with_stake: fake_data::signers_with_stakes(3),
590                ..FakeEpochServiceBuilder::dummy(Epoch(1))
591            }
592            .build();
593            let message_service = MessageServiceBuilder::new()
594                .with_epoch_service(epoch_service)
595                .build()
596                .await;
597
598            let message = message_service
599                .get_epoch_settings_message(SignedEntityTypeDiscriminants::all())
600                .await
601                .unwrap();
602
603            assert_eq!(
604                message.cardano_transactions_signing_config,
605                Some(current_epoch_settings.cardano_transactions_signing_config),
606            );
607        }
608    }
609
610    mod certificate {
611        use super::*;
612
613        #[tokio::test]
614        async fn get_no_certificate() {
615            let service = MessageServiceBuilder::new().build().await;
616
617            let certificate_hash = "whatever";
618            let certificate_message =
619                service.get_certificate_message(certificate_hash).await.unwrap();
620            assert!(certificate_message.is_none());
621        }
622
623        #[tokio::test]
624        async fn get_certificate() {
625            let genesis_certificate = fake_data::genesis_certificate("genesis_hash");
626            let service = MessageServiceBuilder::new()
627                .with_certificates(std::slice::from_ref(&genesis_certificate))
628                .build()
629                .await;
630
631            let certificate_message = service
632                .get_certificate_message(&genesis_certificate.hash)
633                .await
634                .unwrap()
635                .expect("There should be a certificate.");
636            assert_eq!(genesis_certificate.hash, certificate_message.hash);
637        }
638
639        #[tokio::test]
640        async fn get_no_latest_genesis_certificate() {
641            let service = MessageServiceBuilder::new().build().await;
642
643            let certificate_message =
644                service.get_latest_genesis_certificate_message().await.unwrap();
645            assert_eq!(None, certificate_message);
646        }
647
648        #[tokio::test]
649        async fn get_latest_genesis_certificate() {
650            let certificates = [
651                fake_data::genesis_certificate("certificate_1"),
652                fake_data::genesis_certificate("certificate_2"),
653                fake_data::certificate("certificate_3"),
654            ];
655            let last_genesis_hash = certificates[1].hash.clone();
656            let service = MessageServiceBuilder::new()
657                .with_certificates(&certificates)
658                .build()
659                .await;
660
661            let certificate_message = service
662                .get_latest_genesis_certificate_message()
663                .await
664                .unwrap()
665                .expect("There should be a genesis certificate.");
666            assert_eq!(last_genesis_hash, certificate_message.hash);
667        }
668
669        #[tokio::test]
670        async fn get_last_certificates() {
671            let certificates = [
672                fake_data::genesis_certificate("certificate_1"),
673                fake_data::genesis_certificate("certificate_2"),
674            ];
675            let last_certificate_hash = certificates[1].hash.clone();
676            let service = MessageServiceBuilder::new()
677                .with_certificates(&certificates)
678                .build()
679                .await;
680
681            let certificate_messages = service.get_certificate_list_message(5).await.unwrap();
682
683            assert_eq!(2, certificate_messages.len());
684            assert_eq!(last_certificate_hash, certificate_messages[0].hash);
685        }
686    }
687
688    mod snapshot {
689        use super::*;
690
691        #[tokio::test]
692        async fn get_snapshot_not_exist() {
693            let service = MessageServiceBuilder::new().build().await;
694            let snapshot = service.get_snapshot_message("whatever").await.unwrap();
695
696            assert!(snapshot.is_none());
697        }
698
699        #[tokio::test]
700        async fn get_snapshot() {
701            let record = SignedEntityRecord {
702                signed_entity_id: "signed_entity_id".to_string(),
703                signed_entity_type: SignedEntityType::CardanoImmutableFilesFull(fake_data::beacon()),
704                certificate_id: "cert_id".to_string(),
705                artifact: serde_json::to_string(&fake_data::snapshot(1)).unwrap(),
706                created_at: Default::default(),
707            };
708            let message: SnapshotMessage = record.clone().try_into().unwrap();
709
710            let service = MessageServiceBuilder::new()
711                .with_signed_entity_records(std::slice::from_ref(&record))
712                .build()
713                .await;
714
715            let response = service
716                .get_snapshot_message(&record.signed_entity_id)
717                .await
718                .unwrap()
719                .expect("A SnapshotMessage was expected.");
720
721            assert_eq!(message, response);
722        }
723
724        #[tokio::test]
725        async fn get_snapshot_list_message() {
726            let records = vec![
727                SignedEntityRecord {
728                    signed_entity_id: "signed_entity_id-1".to_string(),
729                    signed_entity_type: SignedEntityType::CardanoImmutableFilesFull(
730                        fake_data::beacon(),
731                    ),
732                    certificate_id: "cert_id-1".to_string(),
733                    artifact: serde_json::to_string(&fake_data::snapshot(1)).unwrap(),
734                    created_at: Default::default(),
735                },
736                SignedEntityRecord {
737                    signed_entity_id: "signed_entity_id-2".to_string(),
738                    signed_entity_type: SignedEntityType::CardanoDatabase(fake_data::beacon()),
739                    certificate_id: "cert_id-2".to_string(),
740                    artifact: serde_json::to_string(&fake_data::cardano_database_snapshot(1))
741                        .unwrap(),
742                    created_at: Default::default(),
743                },
744            ];
745            let message: SnapshotListMessage = vec![records[0].clone().try_into().unwrap()];
746
747            let service = MessageServiceBuilder::new()
748                .with_signed_entity_records(&records)
749                .build()
750                .await;
751
752            let response = service.get_snapshot_list_message(0).await.unwrap();
753            assert!(response.is_empty());
754
755            let response = service.get_snapshot_list_message(3).await.unwrap();
756            assert_eq!(message, response);
757        }
758    }
759
760    mod cardano_database {
761        use super::*;
762
763        #[tokio::test]
764        async fn get_cardano_database_when_record_does_not_exist() {
765            let service = MessageServiceBuilder::new().build().await;
766            let snapshot = service.get_cardano_database_message("whatever").await.unwrap();
767
768            assert!(snapshot.is_none());
769        }
770
771        #[tokio::test]
772        async fn get_cardano_database() {
773            let record = SignedEntityRecord {
774                signed_entity_id: "signed_entity_id".to_string(),
775                signed_entity_type: SignedEntityType::CardanoDatabase(fake_data::beacon()),
776                certificate_id: "cert_id".to_string(),
777                artifact: serde_json::to_string(&fake_data::cardano_database_snapshot(1)).unwrap(),
778                created_at: Default::default(),
779            };
780            let message: CardanoDatabaseSnapshotMessage = record.clone().try_into().unwrap();
781
782            let service = MessageServiceBuilder::new()
783                .with_signed_entity_records(std::slice::from_ref(&record))
784                .build()
785                .await;
786
787            let response = service
788                .get_cardano_database_message(&record.signed_entity_id)
789                .await
790                .unwrap()
791                .expect("A CardanoDatabaseSnapshotMessage was expected.");
792
793            assert_eq!(message, response);
794        }
795
796        #[tokio::test]
797        async fn get_cardano_database_list_message() {
798            let records = vec![
799                SignedEntityRecord {
800                    signed_entity_id: "signed_entity_id-1".to_string(),
801                    signed_entity_type: SignedEntityType::CardanoDatabase(fake_data::beacon()),
802                    certificate_id: "cert_id-1".to_string(),
803                    artifact: serde_json::to_string(&fake_data::cardano_database_snapshot(1))
804                        .unwrap(),
805                    created_at: Default::default(),
806                },
807                SignedEntityRecord {
808                    signed_entity_id: "signed_entity_id-2".to_string(),
809                    signed_entity_type: SignedEntityType::CardanoImmutableFilesFull(
810                        fake_data::beacon(),
811                    ),
812                    certificate_id: "cert_id-2".to_string(),
813                    artifact: serde_json::to_string(&fake_data::snapshot(1)).unwrap(),
814                    created_at: Default::default(),
815                },
816            ];
817            let message: CardanoDatabaseSnapshotListMessage =
818                vec![records[0].clone().try_into().unwrap()];
819
820            let service = MessageServiceBuilder::new()
821                .with_signed_entity_records(&records)
822                .build()
823                .await;
824
825            let response = service.get_cardano_database_list_message(0).await.unwrap();
826            assert!(response.is_empty());
827
828            let response = service.get_cardano_database_list_message(3).await.unwrap();
829            assert_eq!(message, response);
830        }
831
832        #[tokio::test]
833        async fn get_cardano_database_list_message_by_epoch() {
834            let records = vec![
835                // Cardano database on epoch 3
836                SignedEntityRecord {
837                    signed_entity_id: "signed_entity_id-1".to_string(),
838                    signed_entity_type: SignedEntityType::CardanoDatabase(CardanoDbBeacon::new(
839                        3, 100,
840                    )),
841                    certificate_id: "cert_id-1".to_string(),
842                    artifact: serde_json::to_string(&fake_data::cardano_database_snapshot(100))
843                        .unwrap(),
844                    created_at: Default::default(),
845                },
846                // Another signed entity type on the same epoch
847                SignedEntityRecord {
848                    signed_entity_id: "signed_entity_id-2".to_string(),
849                    signed_entity_type: SignedEntityType::CardanoImmutableFilesFull(
850                        CardanoDbBeacon::new(3, 100),
851                    ),
852                    certificate_id: "cert_id-2".to_string(),
853                    artifact: serde_json::to_string(&fake_data::snapshot(1)).unwrap(),
854                    created_at: Default::default(),
855                },
856                // Cardano database also on epoch 3
857                SignedEntityRecord {
858                    signed_entity_id: "signed_entity_id-3".to_string(),
859                    signed_entity_type: SignedEntityType::CardanoDatabase(CardanoDbBeacon::new(
860                        3, 102,
861                    )),
862                    certificate_id: "cert_id-3".to_string(),
863                    artifact: serde_json::to_string(&fake_data::cardano_database_snapshot(102))
864                        .unwrap(),
865                    created_at: Default::default(),
866                },
867                // Cardano database also another epoch
868                SignedEntityRecord {
869                    signed_entity_id: "signed_entity_id-4".to_string(),
870                    signed_entity_type: SignedEntityType::CardanoDatabase(CardanoDbBeacon::new(
871                        4, 104,
872                    )),
873                    certificate_id: "cert_id-4".to_string(),
874                    artifact: serde_json::to_string(&fake_data::cardano_database_snapshot(104))
875                        .unwrap(),
876                    created_at: Default::default(),
877                },
878            ];
879            let message: CardanoDatabaseSnapshotListMessage = vec![
880                records[2].clone().try_into().unwrap(),
881                records[0].clone().try_into().unwrap(),
882            ];
883
884            let service = MessageServiceBuilder::new()
885                .with_signed_entity_records(&records)
886                .build()
887                .await;
888
889            let response = service
890                .get_cardano_database_list_message_by_epoch(0, Epoch(3))
891                .await
892                .unwrap();
893            assert!(response.is_empty());
894
895            let response = service
896                .get_cardano_database_list_message_by_epoch(3, Epoch(3))
897                .await
898                .unwrap();
899            assert_eq!(message, response);
900        }
901
902        #[tokio::test]
903        async fn get_cardano_database_digest_list_message() {
904            let messages: CardanoDatabaseDigestListMessage = vec![
905                CardanoDatabaseDigestListItemMessage {
906                    immutable_file_name: "06685.chunk".to_string(),
907                    digest: "0af556ab2620dd9363bf76963a231abe8948a500ea6be31b131d87907ab09b1e"
908                        .to_string(),
909                },
910                CardanoDatabaseDigestListItemMessage {
911                    immutable_file_name: "06685.primary".to_string(),
912                    digest: "32dfd6b722d87f253e78eb8b478fb94f1e13463826e674d6ec7b6bf0892b2e39"
913                        .to_string(),
914                },
915            ];
916
917            let service = MessageServiceBuilder::new()
918                .with_immutable_file_digest_messages(&messages)
919                .build()
920                .await;
921
922            let response = service.get_cardano_database_digest_list_message().await.unwrap();
923
924            assert_eq!(messages, response);
925        }
926    }
927
928    mod mithril_stake_distribution {
929        use super::*;
930
931        #[tokio::test]
932        async fn get_mithril_stake_distribution() {
933            let record = SignedEntityRecord {
934                signed_entity_id: "signed_entity_id".to_string(),
935                signed_entity_type: SignedEntityType::MithrilStakeDistribution(Epoch(18)),
936                certificate_id: "cert_id".to_string(),
937                artifact: serde_json::to_string(&fake_data::mithril_stake_distribution(
938                    Epoch(1),
939                    vec![],
940                ))
941                .unwrap(),
942                created_at: Default::default(),
943            };
944            let message: MithrilStakeDistributionMessage = record.clone().try_into().unwrap();
945
946            let service = MessageServiceBuilder::new()
947                .with_signed_entity_records(std::slice::from_ref(&record))
948                .build()
949                .await;
950
951            let response = service
952                .get_mithril_stake_distribution_message(&record.signed_entity_id)
953                .await
954                .unwrap()
955                .expect("A MithrilStakeDistributionMessage was expected.");
956
957            assert_eq!(message, response);
958        }
959
960        #[tokio::test]
961        async fn get_mithril_stake_distribution_not_exist() {
962            let service = MessageServiceBuilder::new().build().await;
963
964            let response = service
965                .get_mithril_stake_distribution_message("whatever")
966                .await
967                .unwrap();
968
969            assert!(response.is_none());
970        }
971
972        #[tokio::test]
973        async fn get_mithril_stake_distribution_list_message() {
974            let records = vec![
975                SignedEntityRecord {
976                    signed_entity_id: "signed_entity_id-1".to_string(),
977                    signed_entity_type: SignedEntityType::MithrilStakeDistribution(Epoch(18)),
978                    certificate_id: "cert_id-1".to_string(),
979                    artifact: serde_json::to_string(&fake_data::mithril_stake_distribution(
980                        Epoch(1),
981                        vec![],
982                    ))
983                    .unwrap(),
984                    created_at: Default::default(),
985                },
986                SignedEntityRecord {
987                    signed_entity_id: "signed_entity_id-2".to_string(),
988                    signed_entity_type: SignedEntityType::CardanoDatabase(fake_data::beacon()),
989                    certificate_id: "cert_id-2".to_string(),
990                    artifact: serde_json::to_string(&fake_data::cardano_database_snapshot(1))
991                        .unwrap(),
992                    created_at: Default::default(),
993                },
994            ];
995            let message: MithrilStakeDistributionListMessage =
996                vec![records[0].clone().try_into().unwrap()];
997
998            let service = MessageServiceBuilder::new()
999                .with_signed_entity_records(&records)
1000                .build()
1001                .await;
1002
1003            let response = service.get_mithril_stake_distribution_list_message(0).await.unwrap();
1004            assert!(response.is_empty());
1005
1006            let response = service.get_mithril_stake_distribution_list_message(3).await.unwrap();
1007            assert_eq!(message, response);
1008        }
1009    }
1010
1011    mod cardano_transaction {
1012        use super::*;
1013
1014        #[tokio::test]
1015        async fn get_cardano_transaction() {
1016            let record = SignedEntityRecord {
1017                signed_entity_id: "signed_entity_id".to_string(),
1018                signed_entity_type: SignedEntityType::CardanoTransactions(
1019                    Epoch(18),
1020                    BlockNumber(120),
1021                ),
1022                certificate_id: "cert_id".to_string(),
1023                artifact: serde_json::to_string(&fake_data::cardano_transactions_snapshot(
1024                    BlockNumber(1),
1025                ))
1026                .unwrap(),
1027                created_at: Default::default(),
1028            };
1029            let message: CardanoTransactionSnapshotMessage = record.clone().try_into().unwrap();
1030
1031            let service = MessageServiceBuilder::new()
1032                .with_signed_entity_records(std::slice::from_ref(&record))
1033                .build()
1034                .await;
1035
1036            let response = service
1037                .get_cardano_transaction_message(&record.signed_entity_id)
1038                .await
1039                .unwrap()
1040                .expect("A CardanoTransactionMessage was expected.");
1041
1042            assert_eq!(message, response);
1043        }
1044
1045        #[tokio::test]
1046        async fn get_cardano_transaction_not_exist() {
1047            let service = MessageServiceBuilder::new().build().await;
1048
1049            let response = service.get_cardano_transaction_message("whatever").await.unwrap();
1050
1051            assert!(response.is_none());
1052        }
1053
1054        #[tokio::test]
1055        async fn get_cardano_transaction_list_message() {
1056            let records = vec![
1057                SignedEntityRecord {
1058                    signed_entity_id: "signed_entity_id-1".to_string(),
1059                    signed_entity_type: SignedEntityType::CardanoTransactions(
1060                        Epoch(18),
1061                        BlockNumber(120),
1062                    ),
1063                    certificate_id: "cert_id-1".to_string(),
1064                    artifact: serde_json::to_string(&fake_data::cardano_transactions_snapshot(
1065                        BlockNumber(1),
1066                    ))
1067                    .unwrap(),
1068                    created_at: Default::default(),
1069                },
1070                SignedEntityRecord {
1071                    signed_entity_id: "signed_entity_id-2".to_string(),
1072                    signed_entity_type: SignedEntityType::CardanoDatabase(fake_data::beacon()),
1073                    certificate_id: "cert_id-2".to_string(),
1074                    artifact: serde_json::to_string(&fake_data::cardano_database_snapshot(1))
1075                        .unwrap(),
1076                    created_at: Default::default(),
1077                },
1078            ];
1079            let message: CardanoTransactionSnapshotListMessage =
1080                vec![records[0].clone().try_into().unwrap()];
1081
1082            let service = MessageServiceBuilder::new()
1083                .with_signed_entity_records(&records)
1084                .build()
1085                .await;
1086
1087            let response = service.get_cardano_transaction_list_message(0).await.unwrap();
1088            assert!(response.is_empty());
1089
1090            let response = service.get_cardano_transaction_list_message(3).await.unwrap();
1091            assert_eq!(message, response);
1092        }
1093    }
1094
1095    mod cardano_stake_distribution {
1096        use super::*;
1097
1098        #[tokio::test]
1099        async fn get_cardano_stake_distribution() {
1100            let record = SignedEntityRecord {
1101                signed_entity_id: "signed_entity_id".to_string(),
1102                signed_entity_type: SignedEntityType::CardanoStakeDistribution(Epoch(18)),
1103                certificate_id: "cert_id".to_string(),
1104                artifact: serde_json::to_string(&fake_data::cardano_stake_distribution(Epoch(1)))
1105                    .unwrap(),
1106                created_at: Default::default(),
1107            };
1108            let message: CardanoStakeDistributionMessage = record.clone().try_into().unwrap();
1109
1110            let service = MessageServiceBuilder::new()
1111                .with_signed_entity_records(std::slice::from_ref(&record))
1112                .build()
1113                .await;
1114
1115            let response = service
1116                .get_cardano_stake_distribution_message(&record.signed_entity_id)
1117                .await
1118                .unwrap()
1119                .expect("A CardanoStakeDistributionMessage was expected.");
1120
1121            assert_eq!(message, response);
1122        }
1123
1124        #[tokio::test]
1125        async fn get_cardano_stake_distribution_not_exist() {
1126            let service = MessageServiceBuilder::new().build().await;
1127
1128            let response = service
1129                .get_cardano_stake_distribution_message("whatever")
1130                .await
1131                .unwrap();
1132
1133            assert!(response.is_none());
1134        }
1135
1136        #[tokio::test]
1137        async fn get_cardano_stake_distribution_by_epoch() {
1138            let record = SignedEntityRecord {
1139                signed_entity_id: "signed_entity_id".to_string(),
1140                signed_entity_type: SignedEntityType::CardanoStakeDistribution(Epoch(18)),
1141                certificate_id: "cert_id".to_string(),
1142                artifact: serde_json::to_string(&fake_data::cardano_stake_distribution(Epoch(1)))
1143                    .unwrap(),
1144                created_at: Default::default(),
1145            };
1146            let message: CardanoStakeDistributionMessage = record.clone().try_into().unwrap();
1147
1148            let service = MessageServiceBuilder::new()
1149                .with_signed_entity_records(std::slice::from_ref(&record))
1150                .build()
1151                .await;
1152
1153            let response = service
1154                .get_cardano_stake_distribution_message_by_epoch(
1155                    record.signed_entity_type.get_epoch(),
1156                )
1157                .await
1158                .unwrap()
1159                .expect("A CardanoStakeDistributionMessage was expected.");
1160
1161            assert_eq!(message, response);
1162        }
1163
1164        #[tokio::test]
1165        async fn get_cardano_stake_distribution_by_epoch_not_exist() {
1166            let service = MessageServiceBuilder::new().build().await;
1167
1168            let response = service
1169                .get_cardano_stake_distribution_message_by_epoch(Epoch(999))
1170                .await
1171                .unwrap();
1172
1173            assert!(response.is_none());
1174        }
1175
1176        #[tokio::test]
1177        async fn get_cardano_stake_distribution_list_message() {
1178            let records = vec![
1179                SignedEntityRecord {
1180                    signed_entity_id: "signed_entity_id-1".to_string(),
1181                    signed_entity_type: SignedEntityType::CardanoStakeDistribution(Epoch(18)),
1182                    certificate_id: "cert_id-1".to_string(),
1183                    artifact: serde_json::to_string(&fake_data::cardano_stake_distribution(Epoch(
1184                        1,
1185                    )))
1186                    .unwrap(),
1187                    created_at: Default::default(),
1188                },
1189                SignedEntityRecord {
1190                    signed_entity_id: "signed_entity_id-2".to_string(),
1191                    signed_entity_type: SignedEntityType::CardanoDatabase(fake_data::beacon()),
1192                    certificate_id: "cert_id-2".to_string(),
1193                    artifact: serde_json::to_string(&fake_data::cardano_database_snapshot(1))
1194                        .unwrap(),
1195                    created_at: Default::default(),
1196                },
1197            ];
1198            let message: CardanoStakeDistributionListMessage =
1199                vec![records[0].clone().try_into().unwrap()];
1200
1201            let service = MessageServiceBuilder::new()
1202                .with_signed_entity_records(&records)
1203                .build()
1204                .await;
1205
1206            let response = service.get_cardano_stake_distribution_list_message(0).await.unwrap();
1207            assert!(response.is_empty());
1208
1209            let response = service.get_cardano_stake_distribution_list_message(3).await.unwrap();
1210            assert_eq!(message, response);
1211        }
1212    }
1213}