mithril_aggregator/services/
signed_entity.rs

1//! ## SignedEntityService
2//!
3//! This service is responsible for dealing with [SignedEntity] type.
4//! It creates [Artifact] that can be accessed by clients.
5use anyhow::{anyhow, Context};
6use async_trait::async_trait;
7use chrono::Utc;
8use slog::{info, warn, Logger};
9use std::sync::Arc;
10use tokio::task::JoinHandle;
11
12use mithril_common::{
13    entities::{
14        BlockNumber, CardanoDatabaseSnapshot, CardanoDbBeacon, CardanoStakeDistribution,
15        CardanoTransactionsSnapshot, Certificate, Epoch, MithrilStakeDistribution,
16        SignedEntityType, SignedEntityTypeDiscriminants, Snapshot,
17    },
18    logging::LoggerExtensions,
19    signable_builder::{Artifact, SignedEntity},
20    StdResult,
21};
22
23use crate::{
24    artifact_builder::ArtifactBuilder,
25    database::{record::SignedEntityRecord, repository::SignedEntityStorer},
26    MetricsService,
27};
28use mithril_signed_entity_lock::SignedEntityTypeLock;
29
30/// ArtifactBuilder Service trait
31#[cfg_attr(test, mockall::automock)]
32#[async_trait]
33pub trait SignedEntityService: Send + Sync {
34    /// Create artifact for a signed entity type and a certificate
35    async fn create_artifact(
36        &self,
37        signed_entity_type: SignedEntityType,
38        certificate: &Certificate,
39    ) -> StdResult<JoinHandle<StdResult<()>>>;
40
41    /// Return a list of signed snapshots order by creation date descending.
42    async fn get_last_signed_snapshots(
43        &self,
44        total: usize,
45    ) -> StdResult<Vec<SignedEntity<Snapshot>>>;
46
47    /// Return a signed snapshot
48    async fn get_signed_snapshot_by_id(
49        &self,
50        signed_entity_id: &str,
51    ) -> StdResult<Option<SignedEntity<Snapshot>>>;
52
53    /// Return a list of Cardano Database snapshots order by creation date descending.
54    async fn get_last_signed_cardano_database_snapshots(
55        &self,
56        total: usize,
57    ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>>;
58
59    /// Return a Cardano Database snapshot
60    async fn get_signed_cardano_database_snapshot_by_id(
61        &self,
62        signed_entity_id: &str,
63    ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>>;
64
65    /// Return a list of signed Mithril stake distribution ordered by creation
66    /// date descending.
67    async fn get_last_signed_mithril_stake_distributions(
68        &self,
69        total: usize,
70    ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>>;
71
72    /// Return a signed Mithril stake distribution
73    async fn get_signed_mithril_stake_distribution_by_id(
74        &self,
75        signed_entity_id: &str,
76    ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>>;
77
78    /// Return the last signed Cardano Transaction Snapshot.
79    async fn get_last_cardano_transaction_snapshot(
80        &self,
81    ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>>;
82
83    /// Return a list of signed Cardano stake distribution ordered by creation
84    /// date descending.
85    async fn get_last_signed_cardano_stake_distributions(
86        &self,
87        total: usize,
88    ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>>;
89}
90
91/// Mithril ArtifactBuilder Service
92#[derive(Clone)]
93pub struct MithrilSignedEntityService {
94    signed_entity_storer: Arc<dyn SignedEntityStorer>,
95    mithril_stake_distribution_artifact_builder:
96        Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
97    cardano_immutable_files_full_artifact_builder:
98        Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
99    cardano_transactions_artifact_builder:
100        Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
101    signed_entity_type_lock: Arc<SignedEntityTypeLock>,
102    cardano_stake_distribution_artifact_builder:
103        Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
104    cardano_database_artifact_builder:
105        Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
106    metrics_service: Arc<MetricsService>,
107    logger: Logger,
108}
109
110/// ArtifactsBuilder dependencies required by the [MithrilSignedEntityService].
111pub struct SignedEntityServiceArtifactsDependencies {
112    mithril_stake_distribution_artifact_builder:
113        Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
114    cardano_immutable_files_full_artifact_builder:
115        Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
116    cardano_transactions_artifact_builder:
117        Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
118    cardano_stake_distribution_artifact_builder:
119        Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
120    cardano_database_artifact_builder:
121        Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
122}
123
124impl SignedEntityServiceArtifactsDependencies {
125    /// Create a new instance of [SignedEntityServiceArtifactsDependencies].
126    pub fn new(
127        mithril_stake_distribution_artifact_builder: Arc<
128            dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>,
129        >,
130        cardano_immutable_files_full_artifact_builder: Arc<
131            dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>,
132        >,
133        cardano_transactions_artifact_builder: Arc<
134            dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
135        >,
136        cardano_stake_distribution_artifact_builder: Arc<
137            dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>,
138        >,
139        cardano_database_artifact_builder: Arc<
140            dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
141        >,
142    ) -> Self {
143        Self {
144            mithril_stake_distribution_artifact_builder,
145            cardano_immutable_files_full_artifact_builder,
146            cardano_transactions_artifact_builder,
147            cardano_stake_distribution_artifact_builder,
148            cardano_database_artifact_builder,
149        }
150    }
151}
152
153impl MithrilSignedEntityService {
154    /// MithrilSignedEntityService factory
155    pub fn new(
156        signed_entity_storer: Arc<dyn SignedEntityStorer>,
157        dependencies: SignedEntityServiceArtifactsDependencies,
158        signed_entity_type_lock: Arc<SignedEntityTypeLock>,
159        metrics_service: Arc<MetricsService>,
160        logger: Logger,
161    ) -> Self {
162        Self {
163            signed_entity_storer,
164            mithril_stake_distribution_artifact_builder: dependencies
165                .mithril_stake_distribution_artifact_builder,
166            cardano_immutable_files_full_artifact_builder: dependencies
167                .cardano_immutable_files_full_artifact_builder,
168            cardano_transactions_artifact_builder: dependencies
169                .cardano_transactions_artifact_builder,
170            cardano_stake_distribution_artifact_builder: dependencies
171                .cardano_stake_distribution_artifact_builder,
172            cardano_database_artifact_builder: dependencies.cardano_database_artifact_builder,
173            signed_entity_type_lock,
174            metrics_service,
175            logger: logger.new_with_component_name::<Self>(),
176        }
177    }
178
179    async fn create_artifact_task(
180        &self,
181        signed_entity_type: SignedEntityType,
182        certificate: &Certificate,
183    ) -> StdResult<()> {
184        info!(
185            self.logger, ">> create_artifact_task";
186            "signed_entity_type" => ?signed_entity_type, "certificate_hash" => &certificate.hash
187        );
188
189        let mut remaining_retries = 2;
190        let artifact = loop {
191            remaining_retries -= 1;
192
193            match self
194                .compute_artifact(signed_entity_type.clone(), certificate)
195                .await
196            {
197                Err(error) if remaining_retries == 0 => break Err(error),
198                Err(_error) => (),
199                Ok(artifact) => break Ok(artifact),
200            };
201        }?;
202
203        let signed_entity = SignedEntityRecord {
204            signed_entity_id: artifact.get_id(),
205            signed_entity_type: signed_entity_type.clone(),
206            certificate_id: certificate.hash.clone(),
207            artifact: serde_json::to_string(&artifact)?,
208            created_at: Utc::now(),
209        };
210
211        self.signed_entity_storer
212            .store_signed_entity(&signed_entity)
213            .await
214            .with_context(|| {
215                format!(
216                    "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
217                )
218            })?;
219
220        self.increment_artifact_total_produced_metric_since_startup(signed_entity_type);
221
222        Ok(())
223    }
224
225    /// Compute artifact from signed entity type
226    async fn compute_artifact(
227        &self,
228        signed_entity_type: SignedEntityType,
229        certificate: &Certificate,
230    ) -> StdResult<Arc<dyn Artifact>> {
231        match signed_entity_type.clone() {
232            SignedEntityType::MithrilStakeDistribution(epoch) => Ok(Arc::new(
233                self.mithril_stake_distribution_artifact_builder
234                    .compute_artifact(epoch, certificate)
235                    .await
236                    .with_context(|| {
237                        format!(
238                            "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
239                        )
240                    })?,
241            )),
242            SignedEntityType::CardanoImmutableFilesFull(beacon) => Ok(Arc::new(
243                self.cardano_immutable_files_full_artifact_builder
244                    .compute_artifact(beacon.clone(), certificate)
245                    .await
246                    .with_context(|| {
247                        format!(
248                            "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
249                        )
250                    })?,
251            )),
252            SignedEntityType::CardanoStakeDistribution(epoch) => Ok(Arc::new(
253                self.cardano_stake_distribution_artifact_builder
254                .compute_artifact(epoch, certificate)
255                .await
256                .with_context(|| {
257                    format!(
258                        "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
259                    )
260                })?)),
261            SignedEntityType::CardanoTransactions(_epoch, block_number) => Ok(Arc::new(
262                self.cardano_transactions_artifact_builder
263                    .compute_artifact(block_number, certificate)
264                    .await
265                    .with_context(|| {
266                        format!(
267                            "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
268                        )
269                    })?,
270            )),
271            SignedEntityType::CardanoDatabase(beacon) => Ok(Arc::new(
272                self.cardano_database_artifact_builder
273                    .compute_artifact(beacon, certificate)
274                    .await
275                    .with_context(|| {
276                        format!(
277                            "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
278                        )
279                    })?
280            )),
281        }
282    }
283
284    async fn get_last_signed_entities(
285        &self,
286        total: usize,
287        discriminants: &SignedEntityTypeDiscriminants,
288    ) -> StdResult<Vec<SignedEntityRecord>> {
289        self.signed_entity_storer
290            .get_last_signed_entities_by_type(discriminants, total)
291            .await
292            .with_context(|| {
293                format!(
294                    "Signed Entity Service can not get last signed entities with type: '{discriminants:?}'"
295                )
296            })
297    }
298
299    fn increment_artifact_total_produced_metric_since_startup(
300        &self,
301        signed_entity_type: SignedEntityType,
302    ) {
303        let metrics = self.metrics_service.clone();
304        let metric_counter = match signed_entity_type {
305            SignedEntityType::MithrilStakeDistribution(_) => {
306                metrics.get_artifact_mithril_stake_distribution_total_produced_since_startup()
307            }
308            SignedEntityType::CardanoImmutableFilesFull(_) => {
309                metrics.get_artifact_cardano_immutable_files_full_total_produced_since_startup()
310            }
311            SignedEntityType::CardanoStakeDistribution(_) => {
312                metrics.get_artifact_cardano_stake_distribution_total_produced_since_startup()
313            }
314            SignedEntityType::CardanoTransactions(_, _) => {
315                metrics.get_artifact_cardano_transaction_total_produced_since_startup()
316            }
317            SignedEntityType::CardanoDatabase(_) => {
318                metrics.get_artifact_cardano_database_total_produced_since_startup()
319            }
320        };
321
322        metric_counter.increment();
323    }
324}
325
326#[async_trait]
327impl SignedEntityService for MithrilSignedEntityService {
328    async fn create_artifact(
329        &self,
330        signed_entity_type: SignedEntityType,
331        certificate: &Certificate,
332    ) -> StdResult<JoinHandle<StdResult<()>>> {
333        if self
334            .signed_entity_type_lock
335            .is_locked(&signed_entity_type)
336            .await
337        {
338            return Err(anyhow!(
339                "Signed entity type '{:?}' is already locked",
340                signed_entity_type
341            ));
342        }
343
344        let service = self.clone();
345        let certificate_cloned = certificate.clone();
346        service
347            .signed_entity_type_lock
348            .lock(&signed_entity_type)
349            .await;
350
351        Ok(tokio::task::spawn(async move {
352            let signed_entity_type_clone = signed_entity_type.clone();
353            let service_clone = service.clone();
354            let result = tokio::task::spawn(async move {
355                service_clone
356                    .create_artifact_task(signed_entity_type_clone, &certificate_cloned)
357                    .await
358            })
359            .await;
360            service
361                .signed_entity_type_lock
362                .release(signed_entity_type.clone())
363                .await;
364
365            result.with_context(|| format!(
366                "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
367            ))?.inspect_err(|e| warn!(service.logger, "Error while creating artifact"; "error" => ?e))
368        }))
369    }
370
371    async fn get_last_signed_snapshots(
372        &self,
373        total: usize,
374    ) -> StdResult<Vec<SignedEntity<Snapshot>>> {
375        let signed_entities = self
376            .get_last_signed_entities(
377                total,
378                &SignedEntityTypeDiscriminants::CardanoImmutableFilesFull,
379            )
380            .await?
381            .into_iter()
382            .map(|record| record.try_into())
383            .collect::<Result<Vec<_>, _>>()?;
384
385        Ok(signed_entities)
386    }
387
388    async fn get_signed_snapshot_by_id(
389        &self,
390        signed_entity_id: &str,
391    ) -> StdResult<Option<SignedEntity<Snapshot>>> {
392        let entity: Option<SignedEntity<Snapshot>> = match self
393            .signed_entity_storer
394            .get_signed_entity(signed_entity_id)
395            .await
396            .with_context(|| {
397                format!(
398                    "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
399                )
400            })? {
401            Some(entity) => Some(entity.try_into()?),
402            None => None,
403        };
404
405        Ok(entity)
406    }
407
408    async fn get_last_signed_cardano_database_snapshots(
409        &self,
410        total: usize,
411    ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>> {
412        let signed_entities = self
413            .get_last_signed_entities(total, &SignedEntityTypeDiscriminants::CardanoDatabase)
414            .await?
415            .into_iter()
416            .map(|record| record.try_into())
417            .collect::<Result<Vec<_>, _>>()?;
418
419        Ok(signed_entities)
420    }
421
422    async fn get_signed_cardano_database_snapshot_by_id(
423        &self,
424        signed_entity_id: &str,
425    ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>> {
426        let entity: Option<SignedEntity<CardanoDatabaseSnapshot>> = match self
427            .signed_entity_storer
428            .get_signed_entity(signed_entity_id)
429            .await
430            .with_context(|| {
431                format!(
432                    "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
433                )
434            })? {
435            Some(entity) => Some(entity.try_into()?),
436            None => None,
437        };
438
439        Ok(entity)
440    }
441
442    async fn get_last_signed_mithril_stake_distributions(
443        &self,
444        total: usize,
445    ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>> {
446        let signed_entities = self
447            .get_last_signed_entities(
448                total,
449                &SignedEntityTypeDiscriminants::MithrilStakeDistribution,
450            )
451            .await?
452            .into_iter()
453            .map(|record| record.try_into())
454            .collect::<Result<Vec<_>, _>>()?;
455
456        Ok(signed_entities)
457    }
458
459    async fn get_signed_mithril_stake_distribution_by_id(
460        &self,
461        signed_entity_id: &str,
462    ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>> {
463        let entity: Option<SignedEntity<MithrilStakeDistribution>> = match self
464            .signed_entity_storer
465            .get_signed_entity(signed_entity_id)
466            .await
467            .with_context(|| {
468                format!(
469                    "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
470                )
471            })? {
472            Some(entity) => Some(entity.try_into()?),
473            None => None,
474        };
475
476        Ok(entity)
477    }
478
479    async fn get_last_cardano_transaction_snapshot(
480        &self,
481    ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>> {
482        let mut signed_entities_records = self
483            .get_last_signed_entities(1, &SignedEntityTypeDiscriminants::CardanoTransactions)
484            .await?;
485
486        match signed_entities_records.pop() {
487            Some(record) => Ok(Some(record.try_into()?)),
488            None => Ok(None),
489        }
490    }
491
492    async fn get_last_signed_cardano_stake_distributions(
493        &self,
494        total: usize,
495    ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>> {
496        let signed_entities = self
497            .get_last_signed_entities(
498                total,
499                &SignedEntityTypeDiscriminants::CardanoStakeDistribution,
500            )
501            .await?
502            .into_iter()
503            .map(|record| record.try_into())
504            .collect::<Result<Vec<_>, _>>()?;
505
506        Ok(signed_entities)
507    }
508}
509
510#[cfg(test)]
511mod tests {
512    use std::{sync::atomic::Ordering, time::Duration};
513
514    use mithril_common::{
515        entities::{CardanoTransactionsSnapshot, Epoch, StakeDistribution},
516        signable_builder,
517        test_utils::fake_data,
518    };
519    use mithril_metric::CounterValue;
520    use serde::{de::DeserializeOwned, Serialize};
521    use std::sync::atomic::AtomicBool;
522
523    use crate::artifact_builder::MockArtifactBuilder;
524    use crate::database::repository::MockSignedEntityStorer;
525    use crate::test_tools::TestLogger;
526
527    use super::*;
528
529    fn create_stake_distribution(epoch: Epoch, signers: usize) -> MithrilStakeDistribution {
530        MithrilStakeDistribution::new(
531            epoch,
532            fake_data::signers_with_stakes(signers),
533            &fake_data::protocol_parameters(),
534        )
535    }
536
537    fn create_cardano_stake_distribution(
538        epoch: Epoch,
539        stake_distribution: StakeDistribution,
540    ) -> CardanoStakeDistribution {
541        CardanoStakeDistribution::new(epoch, stake_distribution)
542    }
543
544    fn assert_expected<T>(expected: &T, artifact: &Arc<dyn Artifact>)
545    where
546        T: Serialize + DeserializeOwned,
547    {
548        let current: T = serde_json::from_str(&serde_json::to_string(&artifact).unwrap()).unwrap();
549        assert_eq!(
550            serde_json::to_string(&expected).unwrap(),
551            serde_json::to_string(&current).unwrap()
552        );
553    }
554
555    /// Struct that create mocks needed in tests and build objects injecting them.
556    struct MockDependencyInjector {
557        mock_signed_entity_storer: MockSignedEntityStorer,
558        mock_mithril_stake_distribution_artifact_builder:
559            MockArtifactBuilder<Epoch, MithrilStakeDistribution>,
560        mock_cardano_immutable_files_full_artifact_builder:
561            MockArtifactBuilder<CardanoDbBeacon, Snapshot>,
562        mock_cardano_transactions_artifact_builder:
563            MockArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
564        mock_cardano_stake_distribution_artifact_builder:
565            MockArtifactBuilder<Epoch, CardanoStakeDistribution>,
566        mock_cardano_database_artifact_builder:
567            MockArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
568    }
569
570    impl MockDependencyInjector {
571        fn new() -> MockDependencyInjector {
572            MockDependencyInjector {
573                mock_signed_entity_storer: MockSignedEntityStorer::new(),
574                mock_mithril_stake_distribution_artifact_builder: MockArtifactBuilder::<
575                    Epoch,
576                    MithrilStakeDistribution,
577                >::new(),
578                mock_cardano_immutable_files_full_artifact_builder: MockArtifactBuilder::<
579                    CardanoDbBeacon,
580                    Snapshot,
581                >::new(),
582                mock_cardano_transactions_artifact_builder: MockArtifactBuilder::<
583                    BlockNumber,
584                    CardanoTransactionsSnapshot,
585                >::new(),
586                mock_cardano_stake_distribution_artifact_builder: MockArtifactBuilder::<
587                    Epoch,
588                    CardanoStakeDistribution,
589                >::new(),
590                mock_cardano_database_artifact_builder: MockArtifactBuilder::<
591                    CardanoDbBeacon,
592                    CardanoDatabaseSnapshot,
593                >::new(),
594            }
595        }
596
597        fn build_artifact_builder_service(self) -> MithrilSignedEntityService {
598            let dependencies = SignedEntityServiceArtifactsDependencies::new(
599                Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
600                Arc::new(self.mock_cardano_immutable_files_full_artifact_builder),
601                Arc::new(self.mock_cardano_transactions_artifact_builder),
602                Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
603                Arc::new(self.mock_cardano_database_artifact_builder),
604            );
605            MithrilSignedEntityService::new(
606                Arc::new(self.mock_signed_entity_storer),
607                dependencies,
608                Arc::new(SignedEntityTypeLock::default()),
609                Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
610                TestLogger::stdout(),
611            )
612        }
613
614        fn build_artifact_builder_service_with_time_consuming_process(
615            mut self,
616            atomic_stop: Arc<AtomicBool>,
617        ) -> MithrilSignedEntityService {
618            struct LongArtifactBuilder {
619                atomic_stop: Arc<AtomicBool>,
620                snapshot: Snapshot,
621            }
622
623            let snapshot = fake_data::snapshots(1).first().unwrap().to_owned();
624
625            #[async_trait]
626            impl ArtifactBuilder<CardanoDbBeacon, Snapshot> for LongArtifactBuilder {
627                async fn compute_artifact(
628                    &self,
629                    _beacon: CardanoDbBeacon,
630                    _certificate: &Certificate,
631                ) -> StdResult<Snapshot> {
632                    let mut max_iteration = 100;
633                    while !self.atomic_stop.load(Ordering::Relaxed) {
634                        max_iteration -= 1;
635                        if max_iteration <= 0 {
636                            return Err(anyhow!("Test should handle the stop"));
637                        }
638                        tokio::time::sleep(Duration::from_millis(10)).await;
639                    }
640                    Ok(self.snapshot.clone())
641                }
642            }
643            let cardano_immutable_files_full_long_artifact_builder = LongArtifactBuilder {
644                atomic_stop: atomic_stop.clone(),
645                snapshot: snapshot.clone(),
646            };
647
648            let artifact_clone: Arc<dyn Artifact> = Arc::new(snapshot);
649            let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
650            self.mock_signed_entity_storer
651                .expect_store_signed_entity()
652                .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
653                .return_once(|_| Ok(()));
654
655            let dependencies = SignedEntityServiceArtifactsDependencies::new(
656                Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
657                Arc::new(cardano_immutable_files_full_long_artifact_builder),
658                Arc::new(self.mock_cardano_transactions_artifact_builder),
659                Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
660                Arc::new(self.mock_cardano_database_artifact_builder),
661            );
662            MithrilSignedEntityService::new(
663                Arc::new(self.mock_signed_entity_storer),
664                dependencies,
665                Arc::new(SignedEntityTypeLock::default()),
666                Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
667                TestLogger::stdout(),
668            )
669        }
670
671        fn mock_artifact_processing<
672            T: Artifact + Clone + Serialize + 'static,
673            U: signable_builder::Beacon,
674        >(
675            &mut self,
676            artifact: T,
677            mock_that_provide_artifact: &dyn Fn(
678                &mut MockDependencyInjector,
679            ) -> &mut MockArtifactBuilder<U, T>,
680        ) {
681            {
682                let artifact_cloned = artifact.clone();
683                mock_that_provide_artifact(self)
684                    .expect_compute_artifact()
685                    .times(1)
686                    .return_once(|_, _| Ok(artifact_cloned));
687            }
688            {
689                let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
690                let artifact_json = serde_json::to_string(&artifact_clone).unwrap();
691                self.mock_signed_entity_storer
692                    .expect_store_signed_entity()
693                    .withf(move |signed_entity| signed_entity.artifact == artifact_json)
694                    .return_once(|_| Ok(()));
695            }
696        }
697
698        fn mock_stake_distribution_processing(&mut self, artifact: MithrilStakeDistribution) {
699            self.mock_artifact_processing(artifact, &|mock_injector| {
700                &mut mock_injector.mock_mithril_stake_distribution_artifact_builder
701            });
702        }
703    }
704
705    fn get_artifact_total_produced_metric_since_startup_counter_value(
706        metrics_service: Arc<MetricsService>,
707        signed_entity_type: &SignedEntityType,
708    ) -> CounterValue {
709        match signed_entity_type {
710            SignedEntityType::MithrilStakeDistribution(_) => metrics_service
711                .get_artifact_mithril_stake_distribution_total_produced_since_startup()
712                .get(),
713            SignedEntityType::CardanoImmutableFilesFull(_) => metrics_service
714                .get_artifact_cardano_immutable_files_full_total_produced_since_startup()
715                .get(),
716            SignedEntityType::CardanoStakeDistribution(_) => metrics_service
717                .get_artifact_cardano_stake_distribution_total_produced_since_startup()
718                .get(),
719            SignedEntityType::CardanoTransactions(_, _) => metrics_service
720                .get_artifact_cardano_transaction_total_produced_since_startup()
721                .get(),
722            SignedEntityType::CardanoDatabase(_) => metrics_service
723                .get_artifact_cardano_database_total_produced_since_startup()
724                .get(),
725        }
726    }
727
728    #[tokio::test]
729    async fn build_mithril_stake_distribution_artifact_when_given_mithril_stake_distribution_entity_type(
730    ) {
731        let mut mock_container = MockDependencyInjector::new();
732
733        let mithril_stake_distribution_expected = create_stake_distribution(Epoch(1), 5);
734
735        mock_container
736            .mock_mithril_stake_distribution_artifact_builder
737            .expect_compute_artifact()
738            .times(1)
739            .returning(|_, _| Ok(create_stake_distribution(Epoch(1), 5)));
740
741        let artifact_builder_service = mock_container.build_artifact_builder_service();
742
743        let certificate = fake_data::certificate("hash".to_string());
744        let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(1));
745        let artifact = artifact_builder_service
746            .compute_artifact(signed_entity_type.clone(), &certificate)
747            .await
748            .unwrap();
749
750        assert_expected(&mithril_stake_distribution_expected, &artifact);
751    }
752
753    #[tokio::test]
754    async fn should_store_the_artifact_when_creating_artifact_for_a_mithril_stake_distribution() {
755        generic_test_that_the_artifact_is_stored(
756            SignedEntityType::MithrilStakeDistribution(Epoch(1)),
757            create_stake_distribution(Epoch(1), 5),
758            &|mock_injector| &mut mock_injector.mock_mithril_stake_distribution_artifact_builder,
759        )
760        .await;
761    }
762
763    #[tokio::test]
764    async fn build_cardano_stake_distribution_artifact_when_given_cardano_stake_distribution_entity_type(
765    ) {
766        let mut mock_container = MockDependencyInjector::new();
767
768        let cardano_stake_distribution_expected = create_cardano_stake_distribution(
769            Epoch(1),
770            StakeDistribution::from([("pool-1".to_string(), 100)]),
771        );
772
773        mock_container
774            .mock_cardano_stake_distribution_artifact_builder
775            .expect_compute_artifact()
776            .times(1)
777            .returning(|_, _| {
778                Ok(create_cardano_stake_distribution(
779                    Epoch(1),
780                    StakeDistribution::from([("pool-1".to_string(), 100)]),
781                ))
782            });
783
784        let artifact_builder_service = mock_container.build_artifact_builder_service();
785
786        let certificate = fake_data::certificate("hash".to_string());
787        let signed_entity_type = SignedEntityType::CardanoStakeDistribution(Epoch(1));
788        let artifact = artifact_builder_service
789            .compute_artifact(signed_entity_type.clone(), &certificate)
790            .await
791            .unwrap();
792
793        assert_expected(&cardano_stake_distribution_expected, &artifact);
794    }
795
796    #[tokio::test]
797    async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_stake_distribution() {
798        generic_test_that_the_artifact_is_stored(
799            SignedEntityType::CardanoStakeDistribution(Epoch(1)),
800            create_cardano_stake_distribution(
801                Epoch(1),
802                StakeDistribution::from([("pool-1".to_string(), 100)]),
803            ),
804            &|mock_injector| &mut mock_injector.mock_cardano_stake_distribution_artifact_builder,
805        )
806        .await;
807    }
808
809    #[tokio::test]
810    async fn build_snapshot_artifact_when_given_cardano_immutable_files_full_entity_type() {
811        let mut mock_container = MockDependencyInjector::new();
812
813        let snapshot_expected = fake_data::snapshots(1).first().unwrap().to_owned();
814
815        mock_container
816            .mock_cardano_immutable_files_full_artifact_builder
817            .expect_compute_artifact()
818            .times(1)
819            .returning(|_, _| Ok(fake_data::snapshots(1).first().unwrap().to_owned()));
820
821        let artifact_builder_service = mock_container.build_artifact_builder_service();
822
823        let certificate = fake_data::certificate("hash".to_string());
824        let signed_entity_type =
825            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
826        let artifact = artifact_builder_service
827            .compute_artifact(signed_entity_type.clone(), &certificate)
828            .await
829            .unwrap();
830
831        assert_expected(&snapshot_expected, &artifact);
832    }
833
834    #[tokio::test]
835    async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_immutable_files() {
836        generic_test_that_the_artifact_is_stored(
837            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default()),
838            fake_data::snapshots(1).first().unwrap().to_owned(),
839            &|mock_injector| &mut mock_injector.mock_cardano_immutable_files_full_artifact_builder,
840        )
841        .await;
842    }
843
844    #[tokio::test]
845    async fn build_cardano_transactions_snapshot_artifact_when_given_cardano_transactions_type() {
846        let mut mock_container = MockDependencyInjector::new();
847
848        let block_number = BlockNumber(151);
849        let expected = CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number);
850
851        mock_container
852            .mock_cardano_transactions_artifact_builder
853            .expect_compute_artifact()
854            .times(1)
855            .returning(move |_, _| {
856                Ok(CardanoTransactionsSnapshot::new(
857                    "merkle_root".to_string(),
858                    block_number,
859                ))
860            });
861
862        let artifact_builder_service = mock_container.build_artifact_builder_service();
863
864        let certificate = fake_data::certificate("hash".to_string());
865        let signed_entity_type = SignedEntityType::CardanoTransactions(Epoch(1), block_number);
866        let artifact = artifact_builder_service
867            .compute_artifact(signed_entity_type.clone(), &certificate)
868            .await
869            .unwrap();
870
871        assert_expected(&expected, &artifact);
872    }
873
874    #[tokio::test]
875    async fn should_store_the_artifact_when_creating_artifact_for_cardano_transactions() {
876        let block_number = BlockNumber(149);
877        generic_test_that_the_artifact_is_stored(
878            SignedEntityType::CardanoTransactions(Epoch(1), block_number),
879            CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number),
880            &|mock_injector| &mut mock_injector.mock_cardano_transactions_artifact_builder,
881        )
882        .await;
883    }
884
885    #[tokio::test]
886    async fn build_cardano_database_artifact_when_given_cardano_database_entity_type() {
887        let mut mock_container = MockDependencyInjector::new();
888
889        let cardano_database_expected = fake_data::cardano_database_snapshots(1)
890            .first()
891            .unwrap()
892            .to_owned();
893
894        mock_container
895            .mock_cardano_database_artifact_builder
896            .expect_compute_artifact()
897            .times(1)
898            .returning(|_, _| {
899                Ok(fake_data::cardano_database_snapshots(1)
900                    .first()
901                    .unwrap()
902                    .to_owned())
903            });
904
905        let artifact_builder_service = mock_container.build_artifact_builder_service();
906
907        let certificate = fake_data::certificate("hash".to_string());
908        let signed_entity_type = SignedEntityType::CardanoDatabase(CardanoDbBeacon::default());
909        let artifact = artifact_builder_service
910            .compute_artifact(signed_entity_type.clone(), &certificate)
911            .await
912            .unwrap();
913
914        assert_expected(&cardano_database_expected, &artifact);
915    }
916
917    #[tokio::test]
918    async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_database() {
919        generic_test_that_the_artifact_is_stored(
920            SignedEntityType::CardanoDatabase(CardanoDbBeacon::default()),
921            fake_data::cardano_database_snapshots(1)
922                .first()
923                .unwrap()
924                .to_owned(),
925            &|mock_injector| &mut mock_injector.mock_cardano_database_artifact_builder,
926        )
927        .await;
928    }
929
930    async fn generic_test_that_the_artifact_is_stored<
931        T: Artifact + Clone + Serialize + 'static,
932        U: signable_builder::Beacon,
933    >(
934        signed_entity_type: SignedEntityType,
935        artifact: T,
936        mock_that_provide_artifact: &dyn Fn(
937            &mut MockDependencyInjector,
938        ) -> &mut MockArtifactBuilder<U, T>,
939    ) {
940        let mut mock_container = MockDependencyInjector::new();
941        {
942            let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
943            let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
944            mock_container
945                .mock_signed_entity_storer
946                .expect_store_signed_entity()
947                .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
948                .return_once(|_| Ok(()));
949        }
950        {
951            let artifact_cloned = artifact.clone();
952            mock_that_provide_artifact(&mut mock_container)
953                .expect_compute_artifact()
954                .times(1)
955                .return_once(|_, _| Ok(artifact_cloned));
956        }
957        let artifact_builder_service = mock_container.build_artifact_builder_service();
958
959        let certificate = fake_data::certificate("hash".to_string());
960        let error_message = format!(
961            "Create artifact should not fail for {} signed entity",
962            std::any::type_name::<T>()
963        );
964        let error_message_str = error_message.as_str();
965
966        let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
967            artifact_builder_service.metrics_service.clone(),
968            &signed_entity_type,
969        );
970
971        artifact_builder_service
972            .create_artifact_task(signed_entity_type.clone(), &certificate)
973            .await
974            .expect(error_message_str);
975
976        assert_eq!(
977            initial_counter_value + 1,
978            get_artifact_total_produced_metric_since_startup_counter_value(
979                artifact_builder_service.metrics_service.clone(),
980                &signed_entity_type,
981            )
982        )
983    }
984
985    #[tokio::test]
986    async fn create_artifact_for_two_signed_entity_types_in_sequence_not_blocking() {
987        let atomic_stop = Arc::new(AtomicBool::new(false));
988        let signed_entity_type_service = {
989            let mut mock_container = MockDependencyInjector::new();
990
991            let msd = create_stake_distribution(Epoch(1), 5);
992            mock_container.mock_stake_distribution_processing(msd);
993
994            mock_container
995                .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone())
996        };
997        let certificate = fake_data::certificate("hash".to_string());
998
999        let signed_entity_type_immutable =
1000            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1001        let first_task_that_never_finished = signed_entity_type_service
1002            .create_artifact(signed_entity_type_immutable, &certificate)
1003            .await
1004            .unwrap();
1005
1006        let signed_entity_type_msd = SignedEntityType::MithrilStakeDistribution(Epoch(1));
1007        let second_task_that_finish_first = signed_entity_type_service
1008            .create_artifact(signed_entity_type_msd, &certificate)
1009            .await
1010            .unwrap();
1011
1012        second_task_that_finish_first.await.unwrap().unwrap();
1013        assert!(!first_task_that_never_finished.is_finished());
1014
1015        atomic_stop.swap(true, Ordering::Relaxed);
1016    }
1017
1018    #[tokio::test]
1019    async fn create_artifact_lock_unlock_signed_entity_type_while_processing() {
1020        let atomic_stop = Arc::new(AtomicBool::new(false));
1021        let signed_entity_type_service = MockDependencyInjector::new()
1022            .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1023        let certificate = fake_data::certificate("hash".to_string());
1024
1025        let signed_entity_type_immutable =
1026            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1027        assert!(
1028            !signed_entity_type_service
1029                .signed_entity_type_lock
1030                .is_locked(&signed_entity_type_immutable)
1031                .await
1032        );
1033        let join_handle = signed_entity_type_service
1034            .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1035            .await
1036            .unwrap();
1037
1038        // Results are stored to finalize the task before assertions,
1039        // ensuring 'atomic_stop' is always assigned a new value.
1040        let is_locked = signed_entity_type_service
1041            .signed_entity_type_lock
1042            .is_locked(&signed_entity_type_immutable)
1043            .await;
1044        let is_finished = join_handle.is_finished();
1045
1046        atomic_stop.swap(true, Ordering::Relaxed);
1047        join_handle.await.unwrap().unwrap();
1048
1049        assert!(is_locked);
1050        assert!(!is_finished);
1051
1052        assert!(
1053            !signed_entity_type_service
1054                .signed_entity_type_lock
1055                .is_locked(&signed_entity_type_immutable)
1056                .await
1057        );
1058    }
1059
1060    #[tokio::test]
1061    async fn create_artifact_unlock_signed_entity_type_when_error() {
1062        let signed_entity_type_service = {
1063            let mut mock_container = MockDependencyInjector::new();
1064            mock_container
1065                .mock_cardano_immutable_files_full_artifact_builder
1066                .expect_compute_artifact()
1067                .returning(|_, _| Err(anyhow::anyhow!("Error while computing artifact")));
1068
1069            mock_container.build_artifact_builder_service()
1070        };
1071        let certificate = fake_data::certificate("hash".to_string());
1072
1073        let signed_entity_type_immutable =
1074            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1075
1076        let join_handle = signed_entity_type_service
1077            .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1078            .await
1079            .unwrap();
1080
1081        let error = join_handle.await.unwrap().unwrap_err();
1082        assert!(
1083            error.to_string().contains("CardanoImmutableFilesFull"),
1084            "Error should contains CardanoImmutableFilesFull but was: {error}"
1085        );
1086
1087        assert!(
1088            !signed_entity_type_service
1089                .signed_entity_type_lock
1090                .is_locked(&signed_entity_type_immutable)
1091                .await
1092        );
1093    }
1094
1095    #[tokio::test]
1096    async fn create_artifact_unlock_signed_entity_type_when_panic() {
1097        let signed_entity_type_service =
1098            MockDependencyInjector::new().build_artifact_builder_service();
1099        let certificate = fake_data::certificate("hash".to_string());
1100
1101        let signed_entity_type_immutable =
1102            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1103
1104        let join_handle = signed_entity_type_service
1105            .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1106            .await
1107            .unwrap();
1108
1109        let error = join_handle.await.unwrap().unwrap_err();
1110        assert!(
1111            error.to_string().contains("CardanoImmutableFilesFull"),
1112            "Error should contains CardanoImmutableFilesFull but was: {error}"
1113        );
1114
1115        assert!(
1116            !signed_entity_type_service
1117                .signed_entity_type_lock
1118                .is_locked(&signed_entity_type_immutable)
1119                .await
1120        );
1121    }
1122
1123    #[tokio::test]
1124    async fn create_artifact_for_a_signed_entity_type_already_lock_return_error() {
1125        let atomic_stop = Arc::new(AtomicBool::new(false));
1126        let signed_entity_service = MockDependencyInjector::new()
1127            .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1128        let certificate = fake_data::certificate("hash".to_string());
1129        let signed_entity_type_immutable =
1130            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1131
1132        signed_entity_service
1133            .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1134            .await
1135            .unwrap();
1136
1137        signed_entity_service
1138            .create_artifact(signed_entity_type_immutable, &certificate)
1139            .await
1140            .expect_err("Should return error when signed entity type is already locked");
1141
1142        atomic_stop.swap(true, Ordering::Relaxed);
1143    }
1144
1145    #[tokio::test]
1146    async fn metrics_counter_value_is_not_incremented_when_compute_artifact_error() {
1147        let signed_entity_service = {
1148            let mut mock_container = MockDependencyInjector::new();
1149            mock_container
1150                .mock_cardano_immutable_files_full_artifact_builder
1151                .expect_compute_artifact()
1152                .returning(|_, _| Err(anyhow!("Error while computing artifact")));
1153
1154            mock_container.build_artifact_builder_service()
1155        };
1156
1157        let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1158
1159        let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1160            signed_entity_service.metrics_service.clone(),
1161            &signed_entity_type,
1162        );
1163
1164        signed_entity_service
1165            .create_artifact(
1166                signed_entity_type.clone(),
1167                &fake_data::certificate("hash".to_string()),
1168            )
1169            .await
1170            .unwrap();
1171
1172        assert_eq!(
1173            initial_counter_value,
1174            get_artifact_total_produced_metric_since_startup_counter_value(
1175                signed_entity_service.metrics_service.clone(),
1176                &signed_entity_type,
1177            )
1178        );
1179    }
1180
1181    #[tokio::test]
1182    async fn metrics_counter_value_is_not_incremented_when_store_signed_entity_error() {
1183        let signed_entity_service = {
1184            let mut mock_container = MockDependencyInjector::new();
1185            mock_container
1186                .mock_signed_entity_storer
1187                .expect_store_signed_entity()
1188                .returning(|_| Err(anyhow!("Error while storing signed entity")));
1189
1190            mock_container.build_artifact_builder_service()
1191        };
1192
1193        let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1194
1195        let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1196            signed_entity_service.metrics_service.clone(),
1197            &signed_entity_type,
1198        );
1199
1200        signed_entity_service
1201            .create_artifact(
1202                signed_entity_type.clone(),
1203                &fake_data::certificate("hash".to_string()),
1204            )
1205            .await
1206            .unwrap();
1207
1208        assert_eq!(
1209            initial_counter_value,
1210            get_artifact_total_produced_metric_since_startup_counter_value(
1211                signed_entity_service.metrics_service.clone(),
1212                &signed_entity_type,
1213            )
1214        );
1215    }
1216}