mithril_aggregator/services/
signed_entity.rs

1//! ## SignedEntityService
2//!
3//! This service is responsible for dealing with [SignedEntity] type.
4//! It creates [Artifact] that can be accessed by clients.
5use anyhow::{Context, anyhow};
6use async_trait::async_trait;
7use chrono::Utc;
8use slog::{Logger, info, warn};
9use std::sync::Arc;
10use tokio::task::JoinHandle;
11
12use mithril_common::{
13    StdResult,
14    entities::{
15        BlockNumber, CardanoDatabaseSnapshot, CardanoDbBeacon, CardanoStakeDistribution,
16        CardanoTransactionsSnapshot, Certificate, Epoch, MithrilStakeDistribution,
17        SignedEntityType, SignedEntityTypeDiscriminants, Snapshot,
18    },
19    logging::LoggerExtensions,
20    signable_builder::{Artifact, SignedEntity},
21};
22
23use crate::{
24    MetricsService,
25    artifact_builder::ArtifactBuilder,
26    database::{record::SignedEntityRecord, repository::SignedEntityStorer},
27};
28use mithril_signed_entity_lock::SignedEntityTypeLock;
29
30/// ArtifactBuilder Service trait
31#[cfg_attr(test, mockall::automock)]
32#[async_trait]
33pub trait SignedEntityService: Send + Sync {
34    /// Create artifact for a signed entity type and a certificate
35    async fn create_artifact(
36        &self,
37        signed_entity_type: SignedEntityType,
38        certificate: &Certificate,
39    ) -> StdResult<JoinHandle<StdResult<()>>>;
40
41    /// Return a list of signed snapshots order by creation date descending.
42    async fn get_last_signed_snapshots(
43        &self,
44        total: usize,
45    ) -> StdResult<Vec<SignedEntity<Snapshot>>>;
46
47    /// Return a signed snapshot
48    async fn get_signed_snapshot_by_id(
49        &self,
50        signed_entity_id: &str,
51    ) -> StdResult<Option<SignedEntity<Snapshot>>>;
52
53    /// Return a list of Cardano Database snapshots order by creation date descending.
54    async fn get_last_signed_cardano_database_snapshots(
55        &self,
56        total: usize,
57    ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>>;
58
59    /// Return a Cardano Database snapshot
60    async fn get_signed_cardano_database_snapshot_by_id(
61        &self,
62        signed_entity_id: &str,
63    ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>>;
64
65    /// Return a list of signed Mithril stake distribution ordered by creation
66    /// date descending.
67    async fn get_last_signed_mithril_stake_distributions(
68        &self,
69        total: usize,
70    ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>>;
71
72    /// Return a signed Mithril stake distribution
73    async fn get_signed_mithril_stake_distribution_by_id(
74        &self,
75        signed_entity_id: &str,
76    ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>>;
77
78    /// Return the last signed Cardano Transaction Snapshot.
79    async fn get_last_cardano_transaction_snapshot(
80        &self,
81    ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>>;
82
83    /// Return a list of signed Cardano stake distribution ordered by creation
84    /// date descending.
85    async fn get_last_signed_cardano_stake_distributions(
86        &self,
87        total: usize,
88    ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>>;
89}
90
91/// Mithril ArtifactBuilder Service
92#[derive(Clone)]
93pub struct MithrilSignedEntityService {
94    signed_entity_storer: Arc<dyn SignedEntityStorer>,
95    mithril_stake_distribution_artifact_builder:
96        Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
97    cardano_immutable_files_full_artifact_builder:
98        Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
99    cardano_transactions_artifact_builder:
100        Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
101    signed_entity_type_lock: Arc<SignedEntityTypeLock>,
102    cardano_stake_distribution_artifact_builder:
103        Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
104    cardano_database_artifact_builder:
105        Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
106    metrics_service: Arc<MetricsService>,
107    logger: Logger,
108}
109
110/// ArtifactsBuilder dependencies required by the [MithrilSignedEntityService].
111pub struct SignedEntityServiceArtifactsDependencies {
112    mithril_stake_distribution_artifact_builder:
113        Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
114    cardano_immutable_files_full_artifact_builder:
115        Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
116    cardano_transactions_artifact_builder:
117        Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
118    cardano_stake_distribution_artifact_builder:
119        Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
120    cardano_database_artifact_builder:
121        Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
122}
123
124impl SignedEntityServiceArtifactsDependencies {
125    /// Create a new instance of [SignedEntityServiceArtifactsDependencies].
126    pub fn new(
127        mithril_stake_distribution_artifact_builder: Arc<
128            dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>,
129        >,
130        cardano_immutable_files_full_artifact_builder: Arc<
131            dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>,
132        >,
133        cardano_transactions_artifact_builder: Arc<
134            dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
135        >,
136        cardano_stake_distribution_artifact_builder: Arc<
137            dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>,
138        >,
139        cardano_database_artifact_builder: Arc<
140            dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
141        >,
142    ) -> Self {
143        Self {
144            mithril_stake_distribution_artifact_builder,
145            cardano_immutable_files_full_artifact_builder,
146            cardano_transactions_artifact_builder,
147            cardano_stake_distribution_artifact_builder,
148            cardano_database_artifact_builder,
149        }
150    }
151}
152
153impl MithrilSignedEntityService {
154    /// MithrilSignedEntityService factory
155    pub fn new(
156        signed_entity_storer: Arc<dyn SignedEntityStorer>,
157        dependencies: SignedEntityServiceArtifactsDependencies,
158        signed_entity_type_lock: Arc<SignedEntityTypeLock>,
159        metrics_service: Arc<MetricsService>,
160        logger: Logger,
161    ) -> Self {
162        Self {
163            signed_entity_storer,
164            mithril_stake_distribution_artifact_builder: dependencies
165                .mithril_stake_distribution_artifact_builder,
166            cardano_immutable_files_full_artifact_builder: dependencies
167                .cardano_immutable_files_full_artifact_builder,
168            cardano_transactions_artifact_builder: dependencies
169                .cardano_transactions_artifact_builder,
170            cardano_stake_distribution_artifact_builder: dependencies
171                .cardano_stake_distribution_artifact_builder,
172            cardano_database_artifact_builder: dependencies.cardano_database_artifact_builder,
173            signed_entity_type_lock,
174            metrics_service,
175            logger: logger.new_with_component_name::<Self>(),
176        }
177    }
178
179    async fn create_artifact_task(
180        &self,
181        signed_entity_type: SignedEntityType,
182        certificate: &Certificate,
183    ) -> StdResult<()> {
184        info!(
185            self.logger, ">> create_artifact_task";
186            "signed_entity_type" => ?signed_entity_type, "certificate_hash" => &certificate.hash
187        );
188
189        let mut remaining_retries = 2;
190        let artifact = loop {
191            remaining_retries -= 1;
192
193            match self.compute_artifact(signed_entity_type.clone(), certificate).await {
194                Err(error) if remaining_retries == 0 => break Err(error),
195                Err(_error) => (),
196                Ok(artifact) => break Ok(artifact),
197            };
198        }?;
199
200        let signed_entity = SignedEntityRecord {
201            signed_entity_id: artifact.get_id(),
202            signed_entity_type: signed_entity_type.clone(),
203            certificate_id: certificate.hash.clone(),
204            artifact: serde_json::to_string(&artifact)?,
205            created_at: Utc::now(),
206        };
207
208        self.signed_entity_storer
209            .store_signed_entity(&signed_entity)
210            .await
211            .with_context(|| {
212                format!(
213                    "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
214                )
215            })?;
216
217        self.increment_artifact_total_produced_metric_since_startup(signed_entity_type);
218
219        Ok(())
220    }
221
222    /// Compute artifact from signed entity type
223    async fn compute_artifact(
224        &self,
225        signed_entity_type: SignedEntityType,
226        certificate: &Certificate,
227    ) -> StdResult<Arc<dyn Artifact>> {
228        match signed_entity_type.clone() {
229            SignedEntityType::MithrilStakeDistribution(epoch) => Ok(Arc::new(
230                self.mithril_stake_distribution_artifact_builder
231                    .compute_artifact(epoch, certificate)
232                    .await
233                    .with_context(|| {
234                        format!(
235                            "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
236                        )
237                    })?,
238            )),
239            SignedEntityType::CardanoImmutableFilesFull(beacon) => Ok(Arc::new(
240                self.cardano_immutable_files_full_artifact_builder
241                    .compute_artifact(beacon.clone(), certificate)
242                    .await
243                    .with_context(|| {
244                        format!(
245                            "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
246                        )
247                    })?,
248            )),
249            SignedEntityType::CardanoStakeDistribution(epoch) => Ok(Arc::new(
250                self.cardano_stake_distribution_artifact_builder
251                .compute_artifact(epoch, certificate)
252                .await
253                .with_context(|| {
254                    format!(
255                        "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
256                    )
257                })?)),
258            SignedEntityType::CardanoTransactions(_epoch, block_number) => Ok(Arc::new(
259                self.cardano_transactions_artifact_builder
260                    .compute_artifact(block_number, certificate)
261                    .await
262                    .with_context(|| {
263                        format!(
264                            "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
265                        )
266                    })?,
267            )),
268            SignedEntityType::CardanoBlocksTransactions(_epoch, _block_number) =>
269                Err(anyhow!("Cardano blocks transactions is not supported yet")),
270            SignedEntityType::CardanoDatabase(beacon) => Ok(Arc::new(
271                self.cardano_database_artifact_builder
272                    .compute_artifact(beacon, certificate)
273                    .await
274                    .with_context(|| {
275                        format!(
276                            "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
277                        )
278                    })?
279            )),
280        }
281    }
282
283    async fn get_last_signed_entities(
284        &self,
285        total: usize,
286        discriminants: &SignedEntityTypeDiscriminants,
287    ) -> StdResult<Vec<SignedEntityRecord>> {
288        self.signed_entity_storer
289            .get_last_signed_entities_by_type(discriminants, total)
290            .await
291            .with_context(|| {
292                format!(
293                    "Signed Entity Service can not get last signed entities with type: '{discriminants:?}'"
294                )
295            })
296    }
297
298    fn increment_artifact_total_produced_metric_since_startup(
299        &self,
300        signed_entity_type: SignedEntityType,
301    ) {
302        let metrics = self.metrics_service.clone();
303        let metric_counter = match signed_entity_type {
304            SignedEntityType::MithrilStakeDistribution(..) => {
305                metrics.get_artifact_mithril_stake_distribution_total_produced_since_startup()
306            }
307            SignedEntityType::CardanoImmutableFilesFull(..) => {
308                metrics.get_artifact_cardano_immutable_files_full_total_produced_since_startup()
309            }
310            SignedEntityType::CardanoStakeDistribution(..) => {
311                metrics.get_artifact_cardano_stake_distribution_total_produced_since_startup()
312            }
313            SignedEntityType::CardanoTransactions(..) => {
314                metrics.get_artifact_cardano_transaction_total_produced_since_startup()
315            }
316            SignedEntityType::CardanoBlocksTransactions(..) => {
317                metrics.get_artifact_cardano_blocks_transactions_total_produced_since_startup()
318            }
319            SignedEntityType::CardanoDatabase(..) => {
320                metrics.get_artifact_cardano_database_total_produced_since_startup()
321            }
322        };
323
324        metric_counter.increment();
325    }
326}
327
328#[async_trait]
329impl SignedEntityService for MithrilSignedEntityService {
330    async fn create_artifact(
331        &self,
332        signed_entity_type: SignedEntityType,
333        certificate: &Certificate,
334    ) -> StdResult<JoinHandle<StdResult<()>>> {
335        if self.signed_entity_type_lock.is_locked(&signed_entity_type).await {
336            return Err(anyhow!(
337                "Signed entity type '{:?}' is already locked",
338                signed_entity_type
339            ));
340        }
341
342        let service = self.clone();
343        let certificate_cloned = certificate.clone();
344        service.signed_entity_type_lock.lock(&signed_entity_type).await;
345
346        Ok(tokio::task::spawn(async move {
347            let signed_entity_type_clone = signed_entity_type.clone();
348            let service_clone = service.clone();
349            let result = tokio::task::spawn(async move {
350                service_clone
351                    .create_artifact_task(signed_entity_type_clone, &certificate_cloned)
352                    .await
353            })
354            .await;
355            service
356                .signed_entity_type_lock
357                .release(signed_entity_type.clone())
358                .await;
359
360            result.with_context(|| format!(
361                "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
362            ))?.inspect_err(|e| warn!(service.logger, "Error while creating artifact"; "error" => ?e))
363        }))
364    }
365
366    async fn get_last_signed_snapshots(
367        &self,
368        total: usize,
369    ) -> StdResult<Vec<SignedEntity<Snapshot>>> {
370        let signed_entities = self
371            .get_last_signed_entities(
372                total,
373                &SignedEntityTypeDiscriminants::CardanoImmutableFilesFull,
374            )
375            .await?
376            .into_iter()
377            .map(|record| record.try_into())
378            .collect::<Result<Vec<_>, _>>()?;
379
380        Ok(signed_entities)
381    }
382
383    async fn get_signed_snapshot_by_id(
384        &self,
385        signed_entity_id: &str,
386    ) -> StdResult<Option<SignedEntity<Snapshot>>> {
387        let entity: Option<SignedEntity<Snapshot>> = match self
388            .signed_entity_storer
389            .get_signed_entity(signed_entity_id)
390            .await
391            .with_context(|| {
392                format!(
393                    "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
394                )
395            })? {
396            Some(entity) => Some(entity.try_into()?),
397            None => None,
398        };
399
400        Ok(entity)
401    }
402
403    async fn get_last_signed_cardano_database_snapshots(
404        &self,
405        total: usize,
406    ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>> {
407        let signed_entities = self
408            .get_last_signed_entities(total, &SignedEntityTypeDiscriminants::CardanoDatabase)
409            .await?
410            .into_iter()
411            .map(|record| record.try_into())
412            .collect::<Result<Vec<_>, _>>()?;
413
414        Ok(signed_entities)
415    }
416
417    async fn get_signed_cardano_database_snapshot_by_id(
418        &self,
419        signed_entity_id: &str,
420    ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>> {
421        let entity: Option<SignedEntity<CardanoDatabaseSnapshot>> = match self
422            .signed_entity_storer
423            .get_signed_entity(signed_entity_id)
424            .await
425            .with_context(|| {
426                format!(
427                    "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
428                )
429            })? {
430            Some(entity) => Some(entity.try_into()?),
431            None => None,
432        };
433
434        Ok(entity)
435    }
436
437    async fn get_last_signed_mithril_stake_distributions(
438        &self,
439        total: usize,
440    ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>> {
441        let signed_entities = self
442            .get_last_signed_entities(
443                total,
444                &SignedEntityTypeDiscriminants::MithrilStakeDistribution,
445            )
446            .await?
447            .into_iter()
448            .map(|record| record.try_into())
449            .collect::<Result<Vec<_>, _>>()?;
450
451        Ok(signed_entities)
452    }
453
454    async fn get_signed_mithril_stake_distribution_by_id(
455        &self,
456        signed_entity_id: &str,
457    ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>> {
458        let entity: Option<SignedEntity<MithrilStakeDistribution>> = match self
459            .signed_entity_storer
460            .get_signed_entity(signed_entity_id)
461            .await
462            .with_context(|| {
463                format!(
464                    "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
465                )
466            })? {
467            Some(entity) => Some(entity.try_into()?),
468            None => None,
469        };
470
471        Ok(entity)
472    }
473
474    async fn get_last_cardano_transaction_snapshot(
475        &self,
476    ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>> {
477        let mut signed_entities_records = self
478            .get_last_signed_entities(1, &SignedEntityTypeDiscriminants::CardanoTransactions)
479            .await?;
480
481        match signed_entities_records.pop() {
482            Some(record) => Ok(Some(record.try_into()?)),
483            None => Ok(None),
484        }
485    }
486
487    async fn get_last_signed_cardano_stake_distributions(
488        &self,
489        total: usize,
490    ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>> {
491        let signed_entities = self
492            .get_last_signed_entities(
493                total,
494                &SignedEntityTypeDiscriminants::CardanoStakeDistribution,
495            )
496            .await?
497            .into_iter()
498            .map(|record| record.try_into())
499            .collect::<Result<Vec<_>, _>>()?;
500
501        Ok(signed_entities)
502    }
503}
504
505#[cfg(test)]
506mod tests {
507    use std::{sync::atomic::Ordering, time::Duration};
508
509    use mithril_common::{
510        entities::{CardanoTransactionsSnapshot, Epoch, StakeDistribution},
511        signable_builder,
512        test::double::fake_data,
513    };
514    use mithril_metric::CounterValue;
515    use serde::{Serialize, de::DeserializeOwned};
516    use std::sync::atomic::AtomicBool;
517
518    use crate::artifact_builder::MockArtifactBuilder;
519    use crate::database::repository::MockSignedEntityStorer;
520    use crate::test::TestLogger;
521
522    use super::*;
523
524    fn create_stake_distribution(epoch: Epoch, signers: usize) -> MithrilStakeDistribution {
525        MithrilStakeDistribution::new(
526            epoch,
527            fake_data::signers_with_stakes(signers),
528            &fake_data::protocol_parameters(),
529        )
530    }
531
532    fn create_cardano_stake_distribution(
533        epoch: Epoch,
534        stake_distribution: StakeDistribution,
535    ) -> CardanoStakeDistribution {
536        CardanoStakeDistribution::new(epoch, stake_distribution)
537    }
538
539    fn assert_expected<T>(expected: &T, artifact: &Arc<dyn Artifact>)
540    where
541        T: Serialize + DeserializeOwned,
542    {
543        let current: T = serde_json::from_str(&serde_json::to_string(&artifact).unwrap()).unwrap();
544        assert_eq!(
545            serde_json::to_string(&expected).unwrap(),
546            serde_json::to_string(&current).unwrap()
547        );
548    }
549
550    /// Struct that create mocks needed in tests and build objects injecting them.
551    struct MockDependencyInjector {
552        mock_signed_entity_storer: MockSignedEntityStorer,
553        mock_mithril_stake_distribution_artifact_builder:
554            MockArtifactBuilder<Epoch, MithrilStakeDistribution>,
555        mock_cardano_immutable_files_full_artifact_builder:
556            MockArtifactBuilder<CardanoDbBeacon, Snapshot>,
557        mock_cardano_transactions_artifact_builder:
558            MockArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
559        mock_cardano_stake_distribution_artifact_builder:
560            MockArtifactBuilder<Epoch, CardanoStakeDistribution>,
561        mock_cardano_database_artifact_builder:
562            MockArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
563    }
564
565    impl MockDependencyInjector {
566        fn new() -> MockDependencyInjector {
567            MockDependencyInjector {
568                mock_signed_entity_storer: MockSignedEntityStorer::new(),
569                mock_mithril_stake_distribution_artifact_builder: MockArtifactBuilder::<
570                    Epoch,
571                    MithrilStakeDistribution,
572                >::new(),
573                mock_cardano_immutable_files_full_artifact_builder: MockArtifactBuilder::<
574                    CardanoDbBeacon,
575                    Snapshot,
576                >::new(),
577                mock_cardano_transactions_artifact_builder: MockArtifactBuilder::<
578                    BlockNumber,
579                    CardanoTransactionsSnapshot,
580                >::new(),
581                mock_cardano_stake_distribution_artifact_builder: MockArtifactBuilder::<
582                    Epoch,
583                    CardanoStakeDistribution,
584                >::new(),
585                mock_cardano_database_artifact_builder: MockArtifactBuilder::<
586                    CardanoDbBeacon,
587                    CardanoDatabaseSnapshot,
588                >::new(),
589            }
590        }
591
592        fn build_artifact_builder_service(self) -> MithrilSignedEntityService {
593            let dependencies = SignedEntityServiceArtifactsDependencies::new(
594                Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
595                Arc::new(self.mock_cardano_immutable_files_full_artifact_builder),
596                Arc::new(self.mock_cardano_transactions_artifact_builder),
597                Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
598                Arc::new(self.mock_cardano_database_artifact_builder),
599            );
600            MithrilSignedEntityService::new(
601                Arc::new(self.mock_signed_entity_storer),
602                dependencies,
603                Arc::new(SignedEntityTypeLock::default()),
604                Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
605                TestLogger::stdout(),
606            )
607        }
608
609        fn build_artifact_builder_service_with_time_consuming_process(
610            mut self,
611            atomic_stop: Arc<AtomicBool>,
612        ) -> MithrilSignedEntityService {
613            struct LongArtifactBuilder {
614                atomic_stop: Arc<AtomicBool>,
615                snapshot: Snapshot,
616            }
617
618            let snapshot = fake_data::snapshot(1);
619
620            #[async_trait]
621            impl ArtifactBuilder<CardanoDbBeacon, Snapshot> for LongArtifactBuilder {
622                async fn compute_artifact(
623                    &self,
624                    _beacon: CardanoDbBeacon,
625                    _certificate: &Certificate,
626                ) -> StdResult<Snapshot> {
627                    let mut max_iteration = 100;
628                    while !self.atomic_stop.load(Ordering::Relaxed) {
629                        max_iteration -= 1;
630                        if max_iteration <= 0 {
631                            return Err(anyhow!("Test should handle the stop"));
632                        }
633                        tokio::time::sleep(Duration::from_millis(10)).await;
634                    }
635                    Ok(self.snapshot.clone())
636                }
637            }
638            let cardano_immutable_files_full_long_artifact_builder = LongArtifactBuilder {
639                atomic_stop: atomic_stop.clone(),
640                snapshot: snapshot.clone(),
641            };
642
643            let artifact_clone: Arc<dyn Artifact> = Arc::new(snapshot);
644            let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
645            self.mock_signed_entity_storer
646                .expect_store_signed_entity()
647                .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
648                .return_once(|_| Ok(()));
649
650            let dependencies = SignedEntityServiceArtifactsDependencies::new(
651                Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
652                Arc::new(cardano_immutable_files_full_long_artifact_builder),
653                Arc::new(self.mock_cardano_transactions_artifact_builder),
654                Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
655                Arc::new(self.mock_cardano_database_artifact_builder),
656            );
657            MithrilSignedEntityService::new(
658                Arc::new(self.mock_signed_entity_storer),
659                dependencies,
660                Arc::new(SignedEntityTypeLock::default()),
661                Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
662                TestLogger::stdout(),
663            )
664        }
665
666        fn mock_artifact_processing<
667            T: Artifact + Clone + Serialize + 'static,
668            U: signable_builder::Beacon,
669        >(
670            &mut self,
671            artifact: T,
672            mock_that_provide_artifact: &dyn Fn(
673                &mut MockDependencyInjector,
674            ) -> &mut MockArtifactBuilder<U, T>,
675        ) {
676            {
677                let artifact_cloned = artifact.clone();
678                mock_that_provide_artifact(self)
679                    .expect_compute_artifact()
680                    .times(1)
681                    .return_once(|_, _| Ok(artifact_cloned));
682            }
683            {
684                let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
685                let artifact_json = serde_json::to_string(&artifact_clone).unwrap();
686                self.mock_signed_entity_storer
687                    .expect_store_signed_entity()
688                    .withf(move |signed_entity| signed_entity.artifact == artifact_json)
689                    .return_once(|_| Ok(()));
690            }
691        }
692
693        fn mock_stake_distribution_processing(&mut self, artifact: MithrilStakeDistribution) {
694            self.mock_artifact_processing(artifact, &|mock_injector| {
695                &mut mock_injector.mock_mithril_stake_distribution_artifact_builder
696            });
697        }
698    }
699
700    fn get_artifact_total_produced_metric_since_startup_counter_value(
701        metrics_service: Arc<MetricsService>,
702        signed_entity_type: &SignedEntityType,
703    ) -> CounterValue {
704        match signed_entity_type {
705            SignedEntityType::MithrilStakeDistribution(..) => metrics_service
706                .get_artifact_mithril_stake_distribution_total_produced_since_startup()
707                .get(),
708            SignedEntityType::CardanoImmutableFilesFull(..) => metrics_service
709                .get_artifact_cardano_immutable_files_full_total_produced_since_startup()
710                .get(),
711            SignedEntityType::CardanoStakeDistribution(..) => metrics_service
712                .get_artifact_cardano_stake_distribution_total_produced_since_startup()
713                .get(),
714            SignedEntityType::CardanoTransactions(..) => metrics_service
715                .get_artifact_cardano_transaction_total_produced_since_startup()
716                .get(),
717            SignedEntityType::CardanoBlocksTransactions(..) => metrics_service
718                .get_artifact_cardano_blocks_transactions_total_produced_since_startup()
719                .get(),
720            SignedEntityType::CardanoDatabase(..) => metrics_service
721                .get_artifact_cardano_database_total_produced_since_startup()
722                .get(),
723        }
724    }
725
726    #[tokio::test]
727    async fn build_mithril_stake_distribution_artifact_when_given_mithril_stake_distribution_entity_type()
728     {
729        let mut mock_container = MockDependencyInjector::new();
730
731        let mithril_stake_distribution_expected = create_stake_distribution(Epoch(1), 5);
732
733        mock_container
734            .mock_mithril_stake_distribution_artifact_builder
735            .expect_compute_artifact()
736            .times(1)
737            .returning(|_, _| Ok(create_stake_distribution(Epoch(1), 5)));
738
739        let artifact_builder_service = mock_container.build_artifact_builder_service();
740
741        let certificate = fake_data::certificate("hash".to_string());
742        let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(1));
743        let artifact = artifact_builder_service
744            .compute_artifact(signed_entity_type.clone(), &certificate)
745            .await
746            .unwrap();
747
748        assert_expected(&mithril_stake_distribution_expected, &artifact);
749    }
750
751    #[tokio::test]
752    async fn should_store_the_artifact_when_creating_artifact_for_a_mithril_stake_distribution() {
753        generic_test_that_the_artifact_is_stored(
754            SignedEntityType::MithrilStakeDistribution(Epoch(1)),
755            create_stake_distribution(Epoch(1), 5),
756            &|mock_injector| &mut mock_injector.mock_mithril_stake_distribution_artifact_builder,
757        )
758        .await;
759    }
760
761    #[tokio::test]
762    async fn build_cardano_stake_distribution_artifact_when_given_cardano_stake_distribution_entity_type()
763     {
764        let mut mock_container = MockDependencyInjector::new();
765
766        let cardano_stake_distribution_expected = create_cardano_stake_distribution(
767            Epoch(1),
768            StakeDistribution::from([("pool-1".to_string(), 100)]),
769        );
770
771        mock_container
772            .mock_cardano_stake_distribution_artifact_builder
773            .expect_compute_artifact()
774            .times(1)
775            .returning(|_, _| {
776                Ok(create_cardano_stake_distribution(
777                    Epoch(1),
778                    StakeDistribution::from([("pool-1".to_string(), 100)]),
779                ))
780            });
781
782        let artifact_builder_service = mock_container.build_artifact_builder_service();
783
784        let certificate = fake_data::certificate("hash".to_string());
785        let signed_entity_type = SignedEntityType::CardanoStakeDistribution(Epoch(1));
786        let artifact = artifact_builder_service
787            .compute_artifact(signed_entity_type.clone(), &certificate)
788            .await
789            .unwrap();
790
791        assert_expected(&cardano_stake_distribution_expected, &artifact);
792    }
793
794    #[tokio::test]
795    async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_stake_distribution() {
796        generic_test_that_the_artifact_is_stored(
797            SignedEntityType::CardanoStakeDistribution(Epoch(1)),
798            create_cardano_stake_distribution(
799                Epoch(1),
800                StakeDistribution::from([("pool-1".to_string(), 100)]),
801            ),
802            &|mock_injector| &mut mock_injector.mock_cardano_stake_distribution_artifact_builder,
803        )
804        .await;
805    }
806
807    #[tokio::test]
808    async fn build_snapshot_artifact_when_given_cardano_immutable_files_full_entity_type() {
809        let mut mock_container = MockDependencyInjector::new();
810
811        let snapshot_expected = fake_data::snapshot(1);
812
813        mock_container
814            .mock_cardano_immutable_files_full_artifact_builder
815            .expect_compute_artifact()
816            .times(1)
817            .returning(|_, _| Ok(fake_data::snapshot(1)));
818
819        let artifact_builder_service = mock_container.build_artifact_builder_service();
820
821        let certificate = fake_data::certificate("hash".to_string());
822        let signed_entity_type =
823            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
824        let artifact = artifact_builder_service
825            .compute_artifact(signed_entity_type.clone(), &certificate)
826            .await
827            .unwrap();
828
829        assert_expected(&snapshot_expected, &artifact);
830    }
831
832    #[tokio::test]
833    async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_immutable_files() {
834        generic_test_that_the_artifact_is_stored(
835            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default()),
836            fake_data::snapshot(1),
837            &|mock_injector| &mut mock_injector.mock_cardano_immutable_files_full_artifact_builder,
838        )
839        .await;
840    }
841
842    #[tokio::test]
843    async fn build_cardano_transactions_snapshot_artifact_when_given_cardano_transactions_type() {
844        let mut mock_container = MockDependencyInjector::new();
845
846        let block_number = BlockNumber(151);
847        let expected = CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number);
848
849        mock_container
850            .mock_cardano_transactions_artifact_builder
851            .expect_compute_artifact()
852            .times(1)
853            .returning(move |_, _| {
854                Ok(CardanoTransactionsSnapshot::new(
855                    "merkle_root".to_string(),
856                    block_number,
857                ))
858            });
859
860        let artifact_builder_service = mock_container.build_artifact_builder_service();
861
862        let certificate = fake_data::certificate("hash".to_string());
863        let signed_entity_type = SignedEntityType::CardanoTransactions(Epoch(1), block_number);
864        let artifact = artifact_builder_service
865            .compute_artifact(signed_entity_type.clone(), &certificate)
866            .await
867            .unwrap();
868
869        assert_expected(&expected, &artifact);
870    }
871
872    #[tokio::test]
873    async fn should_store_the_artifact_when_creating_artifact_for_cardano_transactions() {
874        let block_number = BlockNumber(149);
875        generic_test_that_the_artifact_is_stored(
876            SignedEntityType::CardanoTransactions(Epoch(1), block_number),
877            CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number),
878            &|mock_injector| &mut mock_injector.mock_cardano_transactions_artifact_builder,
879        )
880        .await;
881    }
882
883    #[tokio::test]
884    async fn build_cardano_database_artifact_when_given_cardano_database_entity_type() {
885        let mut mock_container = MockDependencyInjector::new();
886
887        let cardano_database_expected = fake_data::cardano_database_snapshot(1);
888
889        mock_container
890            .mock_cardano_database_artifact_builder
891            .expect_compute_artifact()
892            .times(1)
893            .returning(|_, _| Ok(fake_data::cardano_database_snapshot(1)));
894
895        let artifact_builder_service = mock_container.build_artifact_builder_service();
896
897        let certificate = fake_data::certificate("hash".to_string());
898        let signed_entity_type = SignedEntityType::CardanoDatabase(CardanoDbBeacon::default());
899        let artifact = artifact_builder_service
900            .compute_artifact(signed_entity_type.clone(), &certificate)
901            .await
902            .unwrap();
903
904        assert_expected(&cardano_database_expected, &artifact);
905    }
906
907    #[tokio::test]
908    async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_database() {
909        generic_test_that_the_artifact_is_stored(
910            SignedEntityType::CardanoDatabase(CardanoDbBeacon::default()),
911            fake_data::cardano_database_snapshot(1),
912            &|mock_injector| &mut mock_injector.mock_cardano_database_artifact_builder,
913        )
914        .await;
915    }
916
917    async fn generic_test_that_the_artifact_is_stored<
918        T: Artifact + Clone + Serialize + 'static,
919        U: signable_builder::Beacon,
920    >(
921        signed_entity_type: SignedEntityType,
922        artifact: T,
923        mock_that_provide_artifact: &dyn Fn(
924            &mut MockDependencyInjector,
925        ) -> &mut MockArtifactBuilder<U, T>,
926    ) {
927        let mut mock_container = MockDependencyInjector::new();
928        {
929            let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
930            let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
931            mock_container
932                .mock_signed_entity_storer
933                .expect_store_signed_entity()
934                .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
935                .return_once(|_| Ok(()));
936        }
937        {
938            let artifact_cloned = artifact.clone();
939            mock_that_provide_artifact(&mut mock_container)
940                .expect_compute_artifact()
941                .times(1)
942                .return_once(|_, _| Ok(artifact_cloned));
943        }
944        let artifact_builder_service = mock_container.build_artifact_builder_service();
945
946        let certificate = fake_data::certificate("hash".to_string());
947        let error_message = format!(
948            "Create artifact should not fail for {} signed entity",
949            std::any::type_name::<T>()
950        );
951        let error_message_str = error_message.as_str();
952
953        let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
954            artifact_builder_service.metrics_service.clone(),
955            &signed_entity_type,
956        );
957
958        artifact_builder_service
959            .create_artifact_task(signed_entity_type.clone(), &certificate)
960            .await
961            .expect(error_message_str);
962
963        assert_eq!(
964            initial_counter_value + 1,
965            get_artifact_total_produced_metric_since_startup_counter_value(
966                artifact_builder_service.metrics_service.clone(),
967                &signed_entity_type,
968            )
969        )
970    }
971
972    #[tokio::test]
973    async fn create_artifact_for_two_signed_entity_types_in_sequence_not_blocking() {
974        let atomic_stop = Arc::new(AtomicBool::new(false));
975        let signed_entity_type_service = {
976            let mut mock_container = MockDependencyInjector::new();
977
978            let msd = create_stake_distribution(Epoch(1), 5);
979            mock_container.mock_stake_distribution_processing(msd);
980
981            mock_container
982                .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone())
983        };
984        let certificate = fake_data::certificate("hash".to_string());
985
986        let signed_entity_type_immutable =
987            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
988        let first_task_that_never_finished = signed_entity_type_service
989            .create_artifact(signed_entity_type_immutable, &certificate)
990            .await
991            .unwrap();
992
993        let signed_entity_type_msd = SignedEntityType::MithrilStakeDistribution(Epoch(1));
994        let second_task_that_finish_first = signed_entity_type_service
995            .create_artifact(signed_entity_type_msd, &certificate)
996            .await
997            .unwrap();
998
999        second_task_that_finish_first.await.unwrap().unwrap();
1000        assert!(!first_task_that_never_finished.is_finished());
1001
1002        atomic_stop.swap(true, Ordering::Relaxed);
1003    }
1004
1005    #[tokio::test]
1006    async fn create_artifact_lock_unlock_signed_entity_type_while_processing() {
1007        let atomic_stop = Arc::new(AtomicBool::new(false));
1008        let signed_entity_type_service = MockDependencyInjector::new()
1009            .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1010        let certificate = fake_data::certificate("hash".to_string());
1011
1012        let signed_entity_type_immutable =
1013            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1014        assert!(
1015            !signed_entity_type_service
1016                .signed_entity_type_lock
1017                .is_locked(&signed_entity_type_immutable)
1018                .await
1019        );
1020        let join_handle = signed_entity_type_service
1021            .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1022            .await
1023            .unwrap();
1024
1025        // Results are stored to finalize the task before assertions,
1026        // ensuring 'atomic_stop' is always assigned a new value.
1027        let is_locked = signed_entity_type_service
1028            .signed_entity_type_lock
1029            .is_locked(&signed_entity_type_immutable)
1030            .await;
1031        let is_finished = join_handle.is_finished();
1032
1033        atomic_stop.swap(true, Ordering::Relaxed);
1034        join_handle.await.unwrap().unwrap();
1035
1036        assert!(is_locked);
1037        assert!(!is_finished);
1038
1039        assert!(
1040            !signed_entity_type_service
1041                .signed_entity_type_lock
1042                .is_locked(&signed_entity_type_immutable)
1043                .await
1044        );
1045    }
1046
1047    #[tokio::test]
1048    async fn create_artifact_unlock_signed_entity_type_when_error() {
1049        let signed_entity_type_service = {
1050            let mut mock_container = MockDependencyInjector::new();
1051            mock_container
1052                .mock_cardano_immutable_files_full_artifact_builder
1053                .expect_compute_artifact()
1054                .returning(|_, _| Err(anyhow::anyhow!("Error while computing artifact")));
1055
1056            mock_container.build_artifact_builder_service()
1057        };
1058        let certificate = fake_data::certificate("hash".to_string());
1059
1060        let signed_entity_type_immutable =
1061            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1062
1063        let join_handle = signed_entity_type_service
1064            .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1065            .await
1066            .unwrap();
1067
1068        let error = join_handle.await.unwrap().unwrap_err();
1069        assert!(
1070            error.to_string().contains("CardanoImmutableFilesFull"),
1071            "Error should contains CardanoImmutableFilesFull but was: {error}"
1072        );
1073
1074        assert!(
1075            !signed_entity_type_service
1076                .signed_entity_type_lock
1077                .is_locked(&signed_entity_type_immutable)
1078                .await
1079        );
1080    }
1081
1082    #[tokio::test]
1083    async fn create_artifact_unlock_signed_entity_type_when_panic() {
1084        let signed_entity_type_service =
1085            MockDependencyInjector::new().build_artifact_builder_service();
1086        let certificate = fake_data::certificate("hash".to_string());
1087
1088        let signed_entity_type_immutable =
1089            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1090
1091        let join_handle = signed_entity_type_service
1092            .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1093            .await
1094            .unwrap();
1095
1096        let error = join_handle.await.unwrap().unwrap_err();
1097        assert!(
1098            error.to_string().contains("CardanoImmutableFilesFull"),
1099            "Error should contains CardanoImmutableFilesFull but was: {error}"
1100        );
1101
1102        assert!(
1103            !signed_entity_type_service
1104                .signed_entity_type_lock
1105                .is_locked(&signed_entity_type_immutable)
1106                .await
1107        );
1108    }
1109
1110    #[tokio::test]
1111    async fn create_artifact_for_a_signed_entity_type_already_lock_return_error() {
1112        let atomic_stop = Arc::new(AtomicBool::new(false));
1113        let signed_entity_service = MockDependencyInjector::new()
1114            .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1115        let certificate = fake_data::certificate("hash".to_string());
1116        let signed_entity_type_immutable =
1117            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1118
1119        signed_entity_service
1120            .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1121            .await
1122            .unwrap();
1123
1124        signed_entity_service
1125            .create_artifact(signed_entity_type_immutable, &certificate)
1126            .await
1127            .expect_err("Should return error when signed entity type is already locked");
1128
1129        atomic_stop.swap(true, Ordering::Relaxed);
1130    }
1131
1132    #[tokio::test]
1133    async fn metrics_counter_value_is_not_incremented_when_compute_artifact_error() {
1134        let signed_entity_service = {
1135            let mut mock_container = MockDependencyInjector::new();
1136            mock_container
1137                .mock_cardano_immutable_files_full_artifact_builder
1138                .expect_compute_artifact()
1139                .returning(|_, _| Err(anyhow!("Error while computing artifact")));
1140
1141            mock_container.build_artifact_builder_service()
1142        };
1143
1144        let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1145
1146        let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1147            signed_entity_service.metrics_service.clone(),
1148            &signed_entity_type,
1149        );
1150
1151        signed_entity_service
1152            .create_artifact(
1153                signed_entity_type.clone(),
1154                &fake_data::certificate("hash".to_string()),
1155            )
1156            .await
1157            .unwrap();
1158
1159        assert_eq!(
1160            initial_counter_value,
1161            get_artifact_total_produced_metric_since_startup_counter_value(
1162                signed_entity_service.metrics_service.clone(),
1163                &signed_entity_type,
1164            )
1165        );
1166    }
1167
1168    #[tokio::test]
1169    async fn metrics_counter_value_is_not_incremented_when_store_signed_entity_error() {
1170        let signed_entity_service = {
1171            let mut mock_container = MockDependencyInjector::new();
1172            mock_container
1173                .mock_signed_entity_storer
1174                .expect_store_signed_entity()
1175                .returning(|_| Err(anyhow!("Error while storing signed entity")));
1176
1177            mock_container.build_artifact_builder_service()
1178        };
1179
1180        let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1181
1182        let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1183            signed_entity_service.metrics_service.clone(),
1184            &signed_entity_type,
1185        );
1186
1187        signed_entity_service
1188            .create_artifact(
1189                signed_entity_type.clone(),
1190                &fake_data::certificate("hash".to_string()),
1191            )
1192            .await
1193            .unwrap();
1194
1195        assert_eq!(
1196            initial_counter_value,
1197            get_artifact_total_produced_metric_since_startup_counter_value(
1198                signed_entity_service.metrics_service.clone(),
1199                &signed_entity_type,
1200            )
1201        );
1202    }
1203}