mithril_aggregator/services/
signed_entity.rs

1//! ## SignedEntityService
2//!
3//! This service is responsible for dealing with [SignedEntity] type.
4//! It creates [Artifact] that can be accessed by clients.
5use anyhow::{Context, anyhow};
6use async_trait::async_trait;
7use chrono::Utc;
8use slog::{Logger, info, warn};
9use std::sync::Arc;
10use tokio::task::JoinHandle;
11
12use mithril_common::{
13    StdResult,
14    entities::{
15        BlockNumber, CardanoDatabaseSnapshot, CardanoDbBeacon, CardanoStakeDistribution,
16        CardanoTransactionsSnapshot, Certificate, Epoch, MithrilStakeDistribution,
17        SignedEntityType, SignedEntityTypeDiscriminants, Snapshot,
18    },
19    logging::LoggerExtensions,
20    signable_builder::{Artifact, SignedEntity},
21};
22
23use crate::{
24    MetricsService,
25    artifact_builder::ArtifactBuilder,
26    database::{record::SignedEntityRecord, repository::SignedEntityStorer},
27};
28use mithril_signed_entity_lock::SignedEntityTypeLock;
29
/// Service that creates artifacts for signed entity types and retrieves signed entities.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait SignedEntityService: Send + Sync {
    /// Create the artifact for a signed entity type and its certificate.
    ///
    /// On success, the returned [JoinHandle] resolves to the result of the artifact creation.
    async fn create_artifact(
        &self,
        signed_entity_type: SignedEntityType,
        certificate: &Certificate,
    ) -> StdResult<JoinHandle<StdResult<()>>>;

    /// Return a list of signed snapshots ordered by creation date descending.
    ///
    /// `total` is the number of signed entities requested.
    async fn get_last_signed_snapshots(
        &self,
        total: usize,
    ) -> StdResult<Vec<SignedEntity<Snapshot>>>;

    /// Return the signed snapshot with the given id, or `None` if there is none.
    async fn get_signed_snapshot_by_id(
        &self,
        signed_entity_id: &str,
    ) -> StdResult<Option<SignedEntity<Snapshot>>>;

    /// Return a list of signed Cardano Database snapshots ordered by creation date descending.
    ///
    /// `total` is the number of signed entities requested.
    async fn get_last_signed_cardano_database_snapshots(
        &self,
        total: usize,
    ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>>;

    /// Return the signed Cardano Database snapshot with the given id, or `None` if there is none.
    async fn get_signed_cardano_database_snapshot_by_id(
        &self,
        signed_entity_id: &str,
    ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>>;

    /// Return a list of signed Mithril stake distributions ordered by creation
    /// date descending.
    ///
    /// `total` is the number of signed entities requested.
    async fn get_last_signed_mithril_stake_distributions(
        &self,
        total: usize,
    ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>>;

    /// Return the signed Mithril stake distribution with the given id, or `None` if there is none.
    async fn get_signed_mithril_stake_distribution_by_id(
        &self,
        signed_entity_id: &str,
    ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>>;

    /// Return the last signed Cardano Transaction Snapshot, or `None` if there is none.
    async fn get_last_cardano_transaction_snapshot(
        &self,
    ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>>;

    /// Return a list of signed Cardano stake distributions ordered by creation
    /// date descending.
    ///
    /// `total` is the number of signed entities requested.
    async fn get_last_signed_cardano_stake_distributions(
        &self,
        total: usize,
    ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>>;
}
90
/// Mithril ArtifactBuilder Service
///
/// Implementation of [SignedEntityService] that computes artifacts with one
/// [ArtifactBuilder] per signed entity type and persists them through a
/// [SignedEntityStorer].
#[derive(Clone)]
pub struct MithrilSignedEntityService {
    /// Store used to persist and read back the signed entity records.
    signed_entity_storer: Arc<dyn SignedEntityStorer>,
    /// Builder used for `SignedEntityType::MithrilStakeDistribution`.
    mithril_stake_distribution_artifact_builder:
        Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
    /// Builder used for `SignedEntityType::CardanoImmutableFilesFull`.
    cardano_immutable_files_full_artifact_builder:
        Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
    /// Builder used for `SignedEntityType::CardanoTransactions`.
    cardano_transactions_artifact_builder:
        Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
    /// Lock checked, taken and released around the creation of an artifact.
    signed_entity_type_lock: Arc<SignedEntityTypeLock>,
    /// Builder used for `SignedEntityType::CardanoStakeDistribution`.
    cardano_stake_distribution_artifact_builder:
        Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
    /// Builder used for `SignedEntityType::CardanoDatabase`.
    cardano_database_artifact_builder:
        Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
    /// Provides the "artifact total produced since startup" counters.
    metrics_service: Arc<MetricsService>,
    /// Logger of this service.
    logger: Logger,
}
109
/// ArtifactsBuilder dependencies required by the [MithrilSignedEntityService].
///
/// Groups the artifact builders, one per kind of artifact, so they can be handed to
/// [MithrilSignedEntityService::new] as a single argument.
pub struct SignedEntityServiceArtifactsDependencies {
    /// Builder of [MithrilStakeDistribution] artifacts from an [Epoch].
    mithril_stake_distribution_artifact_builder:
        Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
    /// Builder of [Snapshot] artifacts from a [CardanoDbBeacon].
    cardano_immutable_files_full_artifact_builder:
        Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
    /// Builder of [CardanoTransactionsSnapshot] artifacts from a [BlockNumber].
    cardano_transactions_artifact_builder:
        Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
    /// Builder of [CardanoStakeDistribution] artifacts from an [Epoch].
    cardano_stake_distribution_artifact_builder:
        Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
    /// Builder of [CardanoDatabaseSnapshot] artifacts from a [CardanoDbBeacon].
    cardano_database_artifact_builder:
        Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
}
123
impl SignedEntityServiceArtifactsDependencies {
    /// Create a new instance of [SignedEntityServiceArtifactsDependencies].
    ///
    /// Each argument is stored as is in the field of the same name.
    pub fn new(
        mithril_stake_distribution_artifact_builder: Arc<
            dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>,
        >,
        cardano_immutable_files_full_artifact_builder: Arc<
            dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>,
        >,
        cardano_transactions_artifact_builder: Arc<
            dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
        >,
        cardano_stake_distribution_artifact_builder: Arc<
            dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>,
        >,
        cardano_database_artifact_builder: Arc<
            dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
        >,
    ) -> Self {
        Self {
            mithril_stake_distribution_artifact_builder,
            cardano_immutable_files_full_artifact_builder,
            cardano_transactions_artifact_builder,
            cardano_stake_distribution_artifact_builder,
            cardano_database_artifact_builder,
        }
    }
}
152
153impl MithrilSignedEntityService {
154    /// MithrilSignedEntityService factory
155    pub fn new(
156        signed_entity_storer: Arc<dyn SignedEntityStorer>,
157        dependencies: SignedEntityServiceArtifactsDependencies,
158        signed_entity_type_lock: Arc<SignedEntityTypeLock>,
159        metrics_service: Arc<MetricsService>,
160        logger: Logger,
161    ) -> Self {
162        Self {
163            signed_entity_storer,
164            mithril_stake_distribution_artifact_builder: dependencies
165                .mithril_stake_distribution_artifact_builder,
166            cardano_immutable_files_full_artifact_builder: dependencies
167                .cardano_immutable_files_full_artifact_builder,
168            cardano_transactions_artifact_builder: dependencies
169                .cardano_transactions_artifact_builder,
170            cardano_stake_distribution_artifact_builder: dependencies
171                .cardano_stake_distribution_artifact_builder,
172            cardano_database_artifact_builder: dependencies.cardano_database_artifact_builder,
173            signed_entity_type_lock,
174            metrics_service,
175            logger: logger.new_with_component_name::<Self>(),
176        }
177    }
178
179    async fn create_artifact_task(
180        &self,
181        signed_entity_type: SignedEntityType,
182        certificate: &Certificate,
183    ) -> StdResult<()> {
184        info!(
185            self.logger, ">> create_artifact_task";
186            "signed_entity_type" => ?signed_entity_type, "certificate_hash" => &certificate.hash
187        );
188
189        let mut remaining_retries = 2;
190        let artifact = loop {
191            remaining_retries -= 1;
192
193            match self.compute_artifact(signed_entity_type.clone(), certificate).await {
194                Err(error) if remaining_retries == 0 => break Err(error),
195                Err(_error) => (),
196                Ok(artifact) => break Ok(artifact),
197            };
198        }?;
199
200        let signed_entity = SignedEntityRecord {
201            signed_entity_id: artifact.get_id(),
202            signed_entity_type: signed_entity_type.clone(),
203            certificate_id: certificate.hash.clone(),
204            artifact: serde_json::to_string(&artifact)?,
205            created_at: Utc::now(),
206        };
207
208        self.signed_entity_storer
209            .store_signed_entity(&signed_entity)
210            .await
211            .with_context(|| {
212                format!(
213                    "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
214                )
215            })?;
216
217        self.increment_artifact_total_produced_metric_since_startup(signed_entity_type);
218
219        Ok(())
220    }
221
222    /// Compute artifact from signed entity type
223    async fn compute_artifact(
224        &self,
225        signed_entity_type: SignedEntityType,
226        certificate: &Certificate,
227    ) -> StdResult<Arc<dyn Artifact>> {
228        match signed_entity_type.clone() {
229            SignedEntityType::MithrilStakeDistribution(epoch) => Ok(Arc::new(
230                self.mithril_stake_distribution_artifact_builder
231                    .compute_artifact(epoch, certificate)
232                    .await
233                    .with_context(|| {
234                        format!(
235                            "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
236                        )
237                    })?,
238            )),
239            SignedEntityType::CardanoImmutableFilesFull(beacon) => Ok(Arc::new(
240                self.cardano_immutable_files_full_artifact_builder
241                    .compute_artifact(beacon.clone(), certificate)
242                    .await
243                    .with_context(|| {
244                        format!(
245                            "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
246                        )
247                    })?,
248            )),
249            SignedEntityType::CardanoStakeDistribution(epoch) => Ok(Arc::new(
250                self.cardano_stake_distribution_artifact_builder
251                .compute_artifact(epoch, certificate)
252                .await
253                .with_context(|| {
254                    format!(
255                        "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
256                    )
257                })?)),
258            SignedEntityType::CardanoTransactions(_epoch, block_number) => Ok(Arc::new(
259                self.cardano_transactions_artifact_builder
260                    .compute_artifact(block_number, certificate)
261                    .await
262                    .with_context(|| {
263                        format!(
264                            "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
265                        )
266                    })?,
267            )),
268            SignedEntityType::CardanoDatabase(beacon) => Ok(Arc::new(
269                self.cardano_database_artifact_builder
270                    .compute_artifact(beacon, certificate)
271                    .await
272                    .with_context(|| {
273                        format!(
274                            "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
275                        )
276                    })?
277            )),
278        }
279    }
280
281    async fn get_last_signed_entities(
282        &self,
283        total: usize,
284        discriminants: &SignedEntityTypeDiscriminants,
285    ) -> StdResult<Vec<SignedEntityRecord>> {
286        self.signed_entity_storer
287            .get_last_signed_entities_by_type(discriminants, total)
288            .await
289            .with_context(|| {
290                format!(
291                    "Signed Entity Service can not get last signed entities with type: '{discriminants:?}'"
292                )
293            })
294    }
295
296    fn increment_artifact_total_produced_metric_since_startup(
297        &self,
298        signed_entity_type: SignedEntityType,
299    ) {
300        let metrics = self.metrics_service.clone();
301        let metric_counter = match signed_entity_type {
302            SignedEntityType::MithrilStakeDistribution(_) => {
303                metrics.get_artifact_mithril_stake_distribution_total_produced_since_startup()
304            }
305            SignedEntityType::CardanoImmutableFilesFull(_) => {
306                metrics.get_artifact_cardano_immutable_files_full_total_produced_since_startup()
307            }
308            SignedEntityType::CardanoStakeDistribution(_) => {
309                metrics.get_artifact_cardano_stake_distribution_total_produced_since_startup()
310            }
311            SignedEntityType::CardanoTransactions(_, _) => {
312                metrics.get_artifact_cardano_transaction_total_produced_since_startup()
313            }
314            SignedEntityType::CardanoDatabase(_) => {
315                metrics.get_artifact_cardano_database_total_produced_since_startup()
316            }
317        };
318
319        metric_counter.increment();
320    }
321}
322
323#[async_trait]
324impl SignedEntityService for MithrilSignedEntityService {
325    async fn create_artifact(
326        &self,
327        signed_entity_type: SignedEntityType,
328        certificate: &Certificate,
329    ) -> StdResult<JoinHandle<StdResult<()>>> {
330        if self.signed_entity_type_lock.is_locked(&signed_entity_type).await {
331            return Err(anyhow!(
332                "Signed entity type '{:?}' is already locked",
333                signed_entity_type
334            ));
335        }
336
337        let service = self.clone();
338        let certificate_cloned = certificate.clone();
339        service.signed_entity_type_lock.lock(&signed_entity_type).await;
340
341        Ok(tokio::task::spawn(async move {
342            let signed_entity_type_clone = signed_entity_type.clone();
343            let service_clone = service.clone();
344            let result = tokio::task::spawn(async move {
345                service_clone
346                    .create_artifact_task(signed_entity_type_clone, &certificate_cloned)
347                    .await
348            })
349            .await;
350            service
351                .signed_entity_type_lock
352                .release(signed_entity_type.clone())
353                .await;
354
355            result.with_context(|| format!(
356                "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
357            ))?.inspect_err(|e| warn!(service.logger, "Error while creating artifact"; "error" => ?e))
358        }))
359    }
360
361    async fn get_last_signed_snapshots(
362        &self,
363        total: usize,
364    ) -> StdResult<Vec<SignedEntity<Snapshot>>> {
365        let signed_entities = self
366            .get_last_signed_entities(
367                total,
368                &SignedEntityTypeDiscriminants::CardanoImmutableFilesFull,
369            )
370            .await?
371            .into_iter()
372            .map(|record| record.try_into())
373            .collect::<Result<Vec<_>, _>>()?;
374
375        Ok(signed_entities)
376    }
377
378    async fn get_signed_snapshot_by_id(
379        &self,
380        signed_entity_id: &str,
381    ) -> StdResult<Option<SignedEntity<Snapshot>>> {
382        let entity: Option<SignedEntity<Snapshot>> = match self
383            .signed_entity_storer
384            .get_signed_entity(signed_entity_id)
385            .await
386            .with_context(|| {
387                format!(
388                    "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
389                )
390            })? {
391            Some(entity) => Some(entity.try_into()?),
392            None => None,
393        };
394
395        Ok(entity)
396    }
397
398    async fn get_last_signed_cardano_database_snapshots(
399        &self,
400        total: usize,
401    ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>> {
402        let signed_entities = self
403            .get_last_signed_entities(total, &SignedEntityTypeDiscriminants::CardanoDatabase)
404            .await?
405            .into_iter()
406            .map(|record| record.try_into())
407            .collect::<Result<Vec<_>, _>>()?;
408
409        Ok(signed_entities)
410    }
411
412    async fn get_signed_cardano_database_snapshot_by_id(
413        &self,
414        signed_entity_id: &str,
415    ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>> {
416        let entity: Option<SignedEntity<CardanoDatabaseSnapshot>> = match self
417            .signed_entity_storer
418            .get_signed_entity(signed_entity_id)
419            .await
420            .with_context(|| {
421                format!(
422                    "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
423                )
424            })? {
425            Some(entity) => Some(entity.try_into()?),
426            None => None,
427        };
428
429        Ok(entity)
430    }
431
432    async fn get_last_signed_mithril_stake_distributions(
433        &self,
434        total: usize,
435    ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>> {
436        let signed_entities = self
437            .get_last_signed_entities(
438                total,
439                &SignedEntityTypeDiscriminants::MithrilStakeDistribution,
440            )
441            .await?
442            .into_iter()
443            .map(|record| record.try_into())
444            .collect::<Result<Vec<_>, _>>()?;
445
446        Ok(signed_entities)
447    }
448
449    async fn get_signed_mithril_stake_distribution_by_id(
450        &self,
451        signed_entity_id: &str,
452    ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>> {
453        let entity: Option<SignedEntity<MithrilStakeDistribution>> = match self
454            .signed_entity_storer
455            .get_signed_entity(signed_entity_id)
456            .await
457            .with_context(|| {
458                format!(
459                    "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
460                )
461            })? {
462            Some(entity) => Some(entity.try_into()?),
463            None => None,
464        };
465
466        Ok(entity)
467    }
468
469    async fn get_last_cardano_transaction_snapshot(
470        &self,
471    ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>> {
472        let mut signed_entities_records = self
473            .get_last_signed_entities(1, &SignedEntityTypeDiscriminants::CardanoTransactions)
474            .await?;
475
476        match signed_entities_records.pop() {
477            Some(record) => Ok(Some(record.try_into()?)),
478            None => Ok(None),
479        }
480    }
481
482    async fn get_last_signed_cardano_stake_distributions(
483        &self,
484        total: usize,
485    ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>> {
486        let signed_entities = self
487            .get_last_signed_entities(
488                total,
489                &SignedEntityTypeDiscriminants::CardanoStakeDistribution,
490            )
491            .await?
492            .into_iter()
493            .map(|record| record.try_into())
494            .collect::<Result<Vec<_>, _>>()?;
495
496        Ok(signed_entities)
497    }
498}
499
500#[cfg(test)]
501mod tests {
502    use std::{sync::atomic::Ordering, time::Duration};
503
504    use mithril_common::{
505        entities::{CardanoTransactionsSnapshot, Epoch, StakeDistribution},
506        signable_builder,
507        test_utils::fake_data,
508    };
509    use mithril_metric::CounterValue;
510    use serde::{Serialize, de::DeserializeOwned};
511    use std::sync::atomic::AtomicBool;
512
513    use crate::artifact_builder::MockArtifactBuilder;
514    use crate::database::repository::MockSignedEntityStorer;
515    use crate::test_tools::TestLogger;
516
517    use super::*;
518
519    fn create_stake_distribution(epoch: Epoch, signers: usize) -> MithrilStakeDistribution {
520        MithrilStakeDistribution::new(
521            epoch,
522            fake_data::signers_with_stakes(signers),
523            &fake_data::protocol_parameters(),
524        )
525    }
526
    /// Build a [CardanoStakeDistribution] for `epoch` from the given stake distribution.
    fn create_cardano_stake_distribution(
        epoch: Epoch,
        stake_distribution: StakeDistribution,
    ) -> CardanoStakeDistribution {
        CardanoStakeDistribution::new(epoch, stake_distribution)
    }
533
534    fn assert_expected<T>(expected: &T, artifact: &Arc<dyn Artifact>)
535    where
536        T: Serialize + DeserializeOwned,
537    {
538        let current: T = serde_json::from_str(&serde_json::to_string(&artifact).unwrap()).unwrap();
539        assert_eq!(
540            serde_json::to_string(&expected).unwrap(),
541            serde_json::to_string(&current).unwrap()
542        );
543    }
544
    /// Struct that creates the mocks needed in tests and builds objects injecting them.
    ///
    /// Expectations are set on the mock fields before the injector is consumed by one of
    /// its `build_*` methods.
    struct MockDependencyInjector {
        /// Mock of the signed entity store.
        mock_signed_entity_storer: MockSignedEntityStorer,
        /// Mock of the [MithrilStakeDistribution] artifact builder.
        mock_mithril_stake_distribution_artifact_builder:
            MockArtifactBuilder<Epoch, MithrilStakeDistribution>,
        /// Mock of the [Snapshot] artifact builder.
        mock_cardano_immutable_files_full_artifact_builder:
            MockArtifactBuilder<CardanoDbBeacon, Snapshot>,
        /// Mock of the [CardanoTransactionsSnapshot] artifact builder.
        mock_cardano_transactions_artifact_builder:
            MockArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
        /// Mock of the [CardanoStakeDistribution] artifact builder.
        mock_cardano_stake_distribution_artifact_builder:
            MockArtifactBuilder<Epoch, CardanoStakeDistribution>,
        /// Mock of the [CardanoDatabaseSnapshot] artifact builder.
        mock_cardano_database_artifact_builder:
            MockArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
    }
559
560    impl MockDependencyInjector {
561        fn new() -> MockDependencyInjector {
562            MockDependencyInjector {
563                mock_signed_entity_storer: MockSignedEntityStorer::new(),
564                mock_mithril_stake_distribution_artifact_builder: MockArtifactBuilder::<
565                    Epoch,
566                    MithrilStakeDistribution,
567                >::new(),
568                mock_cardano_immutable_files_full_artifact_builder: MockArtifactBuilder::<
569                    CardanoDbBeacon,
570                    Snapshot,
571                >::new(),
572                mock_cardano_transactions_artifact_builder: MockArtifactBuilder::<
573                    BlockNumber,
574                    CardanoTransactionsSnapshot,
575                >::new(),
576                mock_cardano_stake_distribution_artifact_builder: MockArtifactBuilder::<
577                    Epoch,
578                    CardanoStakeDistribution,
579                >::new(),
580                mock_cardano_database_artifact_builder: MockArtifactBuilder::<
581                    CardanoDbBeacon,
582                    CardanoDatabaseSnapshot,
583                >::new(),
584            }
585        }
586
587        fn build_artifact_builder_service(self) -> MithrilSignedEntityService {
588            let dependencies = SignedEntityServiceArtifactsDependencies::new(
589                Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
590                Arc::new(self.mock_cardano_immutable_files_full_artifact_builder),
591                Arc::new(self.mock_cardano_transactions_artifact_builder),
592                Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
593                Arc::new(self.mock_cardano_database_artifact_builder),
594            );
595            MithrilSignedEntityService::new(
596                Arc::new(self.mock_signed_entity_storer),
597                dependencies,
598                Arc::new(SignedEntityTypeLock::default()),
599                Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
600                TestLogger::stdout(),
601            )
602        }
603
604        fn build_artifact_builder_service_with_time_consuming_process(
605            mut self,
606            atomic_stop: Arc<AtomicBool>,
607        ) -> MithrilSignedEntityService {
608            struct LongArtifactBuilder {
609                atomic_stop: Arc<AtomicBool>,
610                snapshot: Snapshot,
611            }
612
613            let snapshot = fake_data::snapshots(1).first().unwrap().to_owned();
614
615            #[async_trait]
616            impl ArtifactBuilder<CardanoDbBeacon, Snapshot> for LongArtifactBuilder {
617                async fn compute_artifact(
618                    &self,
619                    _beacon: CardanoDbBeacon,
620                    _certificate: &Certificate,
621                ) -> StdResult<Snapshot> {
622                    let mut max_iteration = 100;
623                    while !self.atomic_stop.load(Ordering::Relaxed) {
624                        max_iteration -= 1;
625                        if max_iteration <= 0 {
626                            return Err(anyhow!("Test should handle the stop"));
627                        }
628                        tokio::time::sleep(Duration::from_millis(10)).await;
629                    }
630                    Ok(self.snapshot.clone())
631                }
632            }
633            let cardano_immutable_files_full_long_artifact_builder = LongArtifactBuilder {
634                atomic_stop: atomic_stop.clone(),
635                snapshot: snapshot.clone(),
636            };
637
638            let artifact_clone: Arc<dyn Artifact> = Arc::new(snapshot);
639            let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
640            self.mock_signed_entity_storer
641                .expect_store_signed_entity()
642                .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
643                .return_once(|_| Ok(()));
644
645            let dependencies = SignedEntityServiceArtifactsDependencies::new(
646                Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
647                Arc::new(cardano_immutable_files_full_long_artifact_builder),
648                Arc::new(self.mock_cardano_transactions_artifact_builder),
649                Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
650                Arc::new(self.mock_cardano_database_artifact_builder),
651            );
652            MithrilSignedEntityService::new(
653                Arc::new(self.mock_signed_entity_storer),
654                dependencies,
655                Arc::new(SignedEntityTypeLock::default()),
656                Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
657                TestLogger::stdout(),
658            )
659        }
660
661        fn mock_artifact_processing<
662            T: Artifact + Clone + Serialize + 'static,
663            U: signable_builder::Beacon,
664        >(
665            &mut self,
666            artifact: T,
667            mock_that_provide_artifact: &dyn Fn(
668                &mut MockDependencyInjector,
669            ) -> &mut MockArtifactBuilder<U, T>,
670        ) {
671            {
672                let artifact_cloned = artifact.clone();
673                mock_that_provide_artifact(self)
674                    .expect_compute_artifact()
675                    .times(1)
676                    .return_once(|_, _| Ok(artifact_cloned));
677            }
678            {
679                let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
680                let artifact_json = serde_json::to_string(&artifact_clone).unwrap();
681                self.mock_signed_entity_storer
682                    .expect_store_signed_entity()
683                    .withf(move |signed_entity| signed_entity.artifact == artifact_json)
684                    .return_once(|_| Ok(()));
685            }
686        }
687
688        fn mock_stake_distribution_processing(&mut self, artifact: MithrilStakeDistribution) {
689            self.mock_artifact_processing(artifact, &|mock_injector| {
690                &mut mock_injector.mock_mithril_stake_distribution_artifact_builder
691            });
692        }
693    }
694
695    fn get_artifact_total_produced_metric_since_startup_counter_value(
696        metrics_service: Arc<MetricsService>,
697        signed_entity_type: &SignedEntityType,
698    ) -> CounterValue {
699        match signed_entity_type {
700            SignedEntityType::MithrilStakeDistribution(_) => metrics_service
701                .get_artifact_mithril_stake_distribution_total_produced_since_startup()
702                .get(),
703            SignedEntityType::CardanoImmutableFilesFull(_) => metrics_service
704                .get_artifact_cardano_immutable_files_full_total_produced_since_startup()
705                .get(),
706            SignedEntityType::CardanoStakeDistribution(_) => metrics_service
707                .get_artifact_cardano_stake_distribution_total_produced_since_startup()
708                .get(),
709            SignedEntityType::CardanoTransactions(_, _) => metrics_service
710                .get_artifact_cardano_transaction_total_produced_since_startup()
711                .get(),
712            SignedEntityType::CardanoDatabase(_) => metrics_service
713                .get_artifact_cardano_database_total_produced_since_startup()
714                .get(),
715        }
716    }
717
718    #[tokio::test]
719    async fn build_mithril_stake_distribution_artifact_when_given_mithril_stake_distribution_entity_type()
720     {
721        let mut mock_container = MockDependencyInjector::new();
722
723        let mithril_stake_distribution_expected = create_stake_distribution(Epoch(1), 5);
724
725        mock_container
726            .mock_mithril_stake_distribution_artifact_builder
727            .expect_compute_artifact()
728            .times(1)
729            .returning(|_, _| Ok(create_stake_distribution(Epoch(1), 5)));
730
731        let artifact_builder_service = mock_container.build_artifact_builder_service();
732
733        let certificate = fake_data::certificate("hash".to_string());
734        let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(1));
735        let artifact = artifact_builder_service
736            .compute_artifact(signed_entity_type.clone(), &certificate)
737            .await
738            .unwrap();
739
740        assert_expected(&mithril_stake_distribution_expected, &artifact);
741    }
742
743    #[tokio::test]
744    async fn should_store_the_artifact_when_creating_artifact_for_a_mithril_stake_distribution() {
745        generic_test_that_the_artifact_is_stored(
746            SignedEntityType::MithrilStakeDistribution(Epoch(1)),
747            create_stake_distribution(Epoch(1), 5),
748            &|mock_injector| &mut mock_injector.mock_mithril_stake_distribution_artifact_builder,
749        )
750        .await;
751    }
752
753    #[tokio::test]
754    async fn build_cardano_stake_distribution_artifact_when_given_cardano_stake_distribution_entity_type()
755     {
756        let mut mock_container = MockDependencyInjector::new();
757
758        let cardano_stake_distribution_expected = create_cardano_stake_distribution(
759            Epoch(1),
760            StakeDistribution::from([("pool-1".to_string(), 100)]),
761        );
762
763        mock_container
764            .mock_cardano_stake_distribution_artifact_builder
765            .expect_compute_artifact()
766            .times(1)
767            .returning(|_, _| {
768                Ok(create_cardano_stake_distribution(
769                    Epoch(1),
770                    StakeDistribution::from([("pool-1".to_string(), 100)]),
771                ))
772            });
773
774        let artifact_builder_service = mock_container.build_artifact_builder_service();
775
776        let certificate = fake_data::certificate("hash".to_string());
777        let signed_entity_type = SignedEntityType::CardanoStakeDistribution(Epoch(1));
778        let artifact = artifact_builder_service
779            .compute_artifact(signed_entity_type.clone(), &certificate)
780            .await
781            .unwrap();
782
783        assert_expected(&cardano_stake_distribution_expected, &artifact);
784    }
785
786    #[tokio::test]
787    async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_stake_distribution() {
788        generic_test_that_the_artifact_is_stored(
789            SignedEntityType::CardanoStakeDistribution(Epoch(1)),
790            create_cardano_stake_distribution(
791                Epoch(1),
792                StakeDistribution::from([("pool-1".to_string(), 100)]),
793            ),
794            &|mock_injector| &mut mock_injector.mock_cardano_stake_distribution_artifact_builder,
795        )
796        .await;
797    }
798
799    #[tokio::test]
800    async fn build_snapshot_artifact_when_given_cardano_immutable_files_full_entity_type() {
801        let mut mock_container = MockDependencyInjector::new();
802
803        let snapshot_expected = fake_data::snapshots(1).first().unwrap().to_owned();
804
805        mock_container
806            .mock_cardano_immutable_files_full_artifact_builder
807            .expect_compute_artifact()
808            .times(1)
809            .returning(|_, _| Ok(fake_data::snapshots(1).first().unwrap().to_owned()));
810
811        let artifact_builder_service = mock_container.build_artifact_builder_service();
812
813        let certificate = fake_data::certificate("hash".to_string());
814        let signed_entity_type =
815            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
816        let artifact = artifact_builder_service
817            .compute_artifact(signed_entity_type.clone(), &certificate)
818            .await
819            .unwrap();
820
821        assert_expected(&snapshot_expected, &artifact);
822    }
823
824    #[tokio::test]
825    async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_immutable_files() {
826        generic_test_that_the_artifact_is_stored(
827            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default()),
828            fake_data::snapshots(1).first().unwrap().to_owned(),
829            &|mock_injector| &mut mock_injector.mock_cardano_immutable_files_full_artifact_builder,
830        )
831        .await;
832    }
833
834    #[tokio::test]
835    async fn build_cardano_transactions_snapshot_artifact_when_given_cardano_transactions_type() {
836        let mut mock_container = MockDependencyInjector::new();
837
838        let block_number = BlockNumber(151);
839        let expected = CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number);
840
841        mock_container
842            .mock_cardano_transactions_artifact_builder
843            .expect_compute_artifact()
844            .times(1)
845            .returning(move |_, _| {
846                Ok(CardanoTransactionsSnapshot::new(
847                    "merkle_root".to_string(),
848                    block_number,
849                ))
850            });
851
852        let artifact_builder_service = mock_container.build_artifact_builder_service();
853
854        let certificate = fake_data::certificate("hash".to_string());
855        let signed_entity_type = SignedEntityType::CardanoTransactions(Epoch(1), block_number);
856        let artifact = artifact_builder_service
857            .compute_artifact(signed_entity_type.clone(), &certificate)
858            .await
859            .unwrap();
860
861        assert_expected(&expected, &artifact);
862    }
863
864    #[tokio::test]
865    async fn should_store_the_artifact_when_creating_artifact_for_cardano_transactions() {
866        let block_number = BlockNumber(149);
867        generic_test_that_the_artifact_is_stored(
868            SignedEntityType::CardanoTransactions(Epoch(1), block_number),
869            CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number),
870            &|mock_injector| &mut mock_injector.mock_cardano_transactions_artifact_builder,
871        )
872        .await;
873    }
874
875    #[tokio::test]
876    async fn build_cardano_database_artifact_when_given_cardano_database_entity_type() {
877        let mut mock_container = MockDependencyInjector::new();
878
879        let cardano_database_expected =
880            fake_data::cardano_database_snapshots(1).first().unwrap().to_owned();
881
882        mock_container
883            .mock_cardano_database_artifact_builder
884            .expect_compute_artifact()
885            .times(1)
886            .returning(|_, _| {
887                Ok(fake_data::cardano_database_snapshots(1).first().unwrap().to_owned())
888            });
889
890        let artifact_builder_service = mock_container.build_artifact_builder_service();
891
892        let certificate = fake_data::certificate("hash".to_string());
893        let signed_entity_type = SignedEntityType::CardanoDatabase(CardanoDbBeacon::default());
894        let artifact = artifact_builder_service
895            .compute_artifact(signed_entity_type.clone(), &certificate)
896            .await
897            .unwrap();
898
899        assert_expected(&cardano_database_expected, &artifact);
900    }
901
902    #[tokio::test]
903    async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_database() {
904        generic_test_that_the_artifact_is_stored(
905            SignedEntityType::CardanoDatabase(CardanoDbBeacon::default()),
906            fake_data::cardano_database_snapshots(1).first().unwrap().to_owned(),
907            &|mock_injector| &mut mock_injector.mock_cardano_database_artifact_builder,
908        )
909        .await;
910    }
911
912    async fn generic_test_that_the_artifact_is_stored<
913        T: Artifact + Clone + Serialize + 'static,
914        U: signable_builder::Beacon,
915    >(
916        signed_entity_type: SignedEntityType,
917        artifact: T,
918        mock_that_provide_artifact: &dyn Fn(
919            &mut MockDependencyInjector,
920        ) -> &mut MockArtifactBuilder<U, T>,
921    ) {
922        let mut mock_container = MockDependencyInjector::new();
923        {
924            let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
925            let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
926            mock_container
927                .mock_signed_entity_storer
928                .expect_store_signed_entity()
929                .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
930                .return_once(|_| Ok(()));
931        }
932        {
933            let artifact_cloned = artifact.clone();
934            mock_that_provide_artifact(&mut mock_container)
935                .expect_compute_artifact()
936                .times(1)
937                .return_once(|_, _| Ok(artifact_cloned));
938        }
939        let artifact_builder_service = mock_container.build_artifact_builder_service();
940
941        let certificate = fake_data::certificate("hash".to_string());
942        let error_message = format!(
943            "Create artifact should not fail for {} signed entity",
944            std::any::type_name::<T>()
945        );
946        let error_message_str = error_message.as_str();
947
948        let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
949            artifact_builder_service.metrics_service.clone(),
950            &signed_entity_type,
951        );
952
953        artifact_builder_service
954            .create_artifact_task(signed_entity_type.clone(), &certificate)
955            .await
956            .expect(error_message_str);
957
958        assert_eq!(
959            initial_counter_value + 1,
960            get_artifact_total_produced_metric_since_startup_counter_value(
961                artifact_builder_service.metrics_service.clone(),
962                &signed_entity_type,
963            )
964        )
965    }
966
967    #[tokio::test]
968    async fn create_artifact_for_two_signed_entity_types_in_sequence_not_blocking() {
969        let atomic_stop = Arc::new(AtomicBool::new(false));
970        let signed_entity_type_service = {
971            let mut mock_container = MockDependencyInjector::new();
972
973            let msd = create_stake_distribution(Epoch(1), 5);
974            mock_container.mock_stake_distribution_processing(msd);
975
976            mock_container
977                .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone())
978        };
979        let certificate = fake_data::certificate("hash".to_string());
980
981        let signed_entity_type_immutable =
982            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
983        let first_task_that_never_finished = signed_entity_type_service
984            .create_artifact(signed_entity_type_immutable, &certificate)
985            .await
986            .unwrap();
987
988        let signed_entity_type_msd = SignedEntityType::MithrilStakeDistribution(Epoch(1));
989        let second_task_that_finish_first = signed_entity_type_service
990            .create_artifact(signed_entity_type_msd, &certificate)
991            .await
992            .unwrap();
993
994        second_task_that_finish_first.await.unwrap().unwrap();
995        assert!(!first_task_that_never_finished.is_finished());
996
997        atomic_stop.swap(true, Ordering::Relaxed);
998    }
999
1000    #[tokio::test]
1001    async fn create_artifact_lock_unlock_signed_entity_type_while_processing() {
1002        let atomic_stop = Arc::new(AtomicBool::new(false));
1003        let signed_entity_type_service = MockDependencyInjector::new()
1004            .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1005        let certificate = fake_data::certificate("hash".to_string());
1006
1007        let signed_entity_type_immutable =
1008            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1009        assert!(
1010            !signed_entity_type_service
1011                .signed_entity_type_lock
1012                .is_locked(&signed_entity_type_immutable)
1013                .await
1014        );
1015        let join_handle = signed_entity_type_service
1016            .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1017            .await
1018            .unwrap();
1019
1020        // Results are stored to finalize the task before assertions,
1021        // ensuring 'atomic_stop' is always assigned a new value.
1022        let is_locked = signed_entity_type_service
1023            .signed_entity_type_lock
1024            .is_locked(&signed_entity_type_immutable)
1025            .await;
1026        let is_finished = join_handle.is_finished();
1027
1028        atomic_stop.swap(true, Ordering::Relaxed);
1029        join_handle.await.unwrap().unwrap();
1030
1031        assert!(is_locked);
1032        assert!(!is_finished);
1033
1034        assert!(
1035            !signed_entity_type_service
1036                .signed_entity_type_lock
1037                .is_locked(&signed_entity_type_immutable)
1038                .await
1039        );
1040    }
1041
1042    #[tokio::test]
1043    async fn create_artifact_unlock_signed_entity_type_when_error() {
1044        let signed_entity_type_service = {
1045            let mut mock_container = MockDependencyInjector::new();
1046            mock_container
1047                .mock_cardano_immutable_files_full_artifact_builder
1048                .expect_compute_artifact()
1049                .returning(|_, _| Err(anyhow::anyhow!("Error while computing artifact")));
1050
1051            mock_container.build_artifact_builder_service()
1052        };
1053        let certificate = fake_data::certificate("hash".to_string());
1054
1055        let signed_entity_type_immutable =
1056            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1057
1058        let join_handle = signed_entity_type_service
1059            .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1060            .await
1061            .unwrap();
1062
1063        let error = join_handle.await.unwrap().unwrap_err();
1064        assert!(
1065            error.to_string().contains("CardanoImmutableFilesFull"),
1066            "Error should contains CardanoImmutableFilesFull but was: {error}"
1067        );
1068
1069        assert!(
1070            !signed_entity_type_service
1071                .signed_entity_type_lock
1072                .is_locked(&signed_entity_type_immutable)
1073                .await
1074        );
1075    }
1076
1077    #[tokio::test]
1078    async fn create_artifact_unlock_signed_entity_type_when_panic() {
1079        let signed_entity_type_service =
1080            MockDependencyInjector::new().build_artifact_builder_service();
1081        let certificate = fake_data::certificate("hash".to_string());
1082
1083        let signed_entity_type_immutable =
1084            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1085
1086        let join_handle = signed_entity_type_service
1087            .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1088            .await
1089            .unwrap();
1090
1091        let error = join_handle.await.unwrap().unwrap_err();
1092        assert!(
1093            error.to_string().contains("CardanoImmutableFilesFull"),
1094            "Error should contains CardanoImmutableFilesFull but was: {error}"
1095        );
1096
1097        assert!(
1098            !signed_entity_type_service
1099                .signed_entity_type_lock
1100                .is_locked(&signed_entity_type_immutable)
1101                .await
1102        );
1103    }
1104
1105    #[tokio::test]
1106    async fn create_artifact_for_a_signed_entity_type_already_lock_return_error() {
1107        let atomic_stop = Arc::new(AtomicBool::new(false));
1108        let signed_entity_service = MockDependencyInjector::new()
1109            .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1110        let certificate = fake_data::certificate("hash".to_string());
1111        let signed_entity_type_immutable =
1112            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1113
1114        signed_entity_service
1115            .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1116            .await
1117            .unwrap();
1118
1119        signed_entity_service
1120            .create_artifact(signed_entity_type_immutable, &certificate)
1121            .await
1122            .expect_err("Should return error when signed entity type is already locked");
1123
1124        atomic_stop.swap(true, Ordering::Relaxed);
1125    }
1126
1127    #[tokio::test]
1128    async fn metrics_counter_value_is_not_incremented_when_compute_artifact_error() {
1129        let signed_entity_service = {
1130            let mut mock_container = MockDependencyInjector::new();
1131            mock_container
1132                .mock_cardano_immutable_files_full_artifact_builder
1133                .expect_compute_artifact()
1134                .returning(|_, _| Err(anyhow!("Error while computing artifact")));
1135
1136            mock_container.build_artifact_builder_service()
1137        };
1138
1139        let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1140
1141        let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1142            signed_entity_service.metrics_service.clone(),
1143            &signed_entity_type,
1144        );
1145
1146        signed_entity_service
1147            .create_artifact(
1148                signed_entity_type.clone(),
1149                &fake_data::certificate("hash".to_string()),
1150            )
1151            .await
1152            .unwrap();
1153
1154        assert_eq!(
1155            initial_counter_value,
1156            get_artifact_total_produced_metric_since_startup_counter_value(
1157                signed_entity_service.metrics_service.clone(),
1158                &signed_entity_type,
1159            )
1160        );
1161    }
1162
1163    #[tokio::test]
1164    async fn metrics_counter_value_is_not_incremented_when_store_signed_entity_error() {
1165        let signed_entity_service = {
1166            let mut mock_container = MockDependencyInjector::new();
1167            mock_container
1168                .mock_signed_entity_storer
1169                .expect_store_signed_entity()
1170                .returning(|_| Err(anyhow!("Error while storing signed entity")));
1171
1172            mock_container.build_artifact_builder_service()
1173        };
1174
1175        let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1176
1177        let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1178            signed_entity_service.metrics_service.clone(),
1179            &signed_entity_type,
1180        );
1181
1182        signed_entity_service
1183            .create_artifact(
1184                signed_entity_type.clone(),
1185                &fake_data::certificate("hash".to_string()),
1186            )
1187            .await
1188            .unwrap();
1189
1190        assert_eq!(
1191            initial_counter_value,
1192            get_artifact_total_produced_metric_since_startup_counter_value(
1193                signed_entity_service.metrics_service.clone(),
1194                &signed_entity_type,
1195            )
1196        );
1197    }
1198}