mithril_aggregator/services/
signed_entity.rs

1//! ## SignedEntityService
2//!
3//! This service is responsible for dealing with [SignedEntity] type.
4//! It creates [Artifact] that can be accessed by clients.
5use anyhow::{anyhow, Context};
6use async_trait::async_trait;
7use chrono::Utc;
8use slog::{info, warn, Logger};
9use std::sync::Arc;
10use tokio::task::JoinHandle;
11
12use mithril_common::{
13    entities::{
14        BlockNumber, CardanoDatabaseSnapshot, CardanoDbBeacon, CardanoStakeDistribution,
15        CardanoTransactionsSnapshot, Certificate, Epoch, MithrilStakeDistribution,
16        SignedEntityType, SignedEntityTypeDiscriminants, Snapshot,
17    },
18    logging::LoggerExtensions,
19    signable_builder::{Artifact, SignedEntity},
20    StdResult,
21};
22
23use crate::{
24    artifact_builder::ArtifactBuilder,
25    database::{record::SignedEntityRecord, repository::SignedEntityStorer},
26    MetricsService,
27};
28use mithril_signed_entity_lock::SignedEntityTypeLock;
29
30/// ArtifactBuilder Service trait
31#[cfg_attr(test, mockall::automock)]
32#[async_trait]
33pub trait SignedEntityService: Send + Sync {
34    /// Create artifact for a signed entity type and a certificate
35    async fn create_artifact(
36        &self,
37        signed_entity_type: SignedEntityType,
38        certificate: &Certificate,
39    ) -> StdResult<JoinHandle<StdResult<()>>>;
40
41    /// Return a list of signed snapshots order by creation date descending.
42    async fn get_last_signed_snapshots(
43        &self,
44        total: usize,
45    ) -> StdResult<Vec<SignedEntity<Snapshot>>>;
46
47    /// Return a signed snapshot
48    async fn get_signed_snapshot_by_id(
49        &self,
50        signed_entity_id: &str,
51    ) -> StdResult<Option<SignedEntity<Snapshot>>>;
52
53    /// Return a list of Cardano Database snapshots order by creation date descending.
54    async fn get_last_signed_cardano_database_snapshots(
55        &self,
56        total: usize,
57    ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>>;
58
59    /// Return a Cardano Database snapshot
60    async fn get_signed_cardano_database_snapshot_by_id(
61        &self,
62        signed_entity_id: &str,
63    ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>>;
64
65    /// Return a list of signed Mithril stake distribution ordered by creation
66    /// date descending.
67    async fn get_last_signed_mithril_stake_distributions(
68        &self,
69        total: usize,
70    ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>>;
71
72    /// Return a signed Mithril stake distribution
73    async fn get_signed_mithril_stake_distribution_by_id(
74        &self,
75        signed_entity_id: &str,
76    ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>>;
77
78    /// Return the last signed Cardano Transaction Snapshot.
79    async fn get_last_cardano_transaction_snapshot(
80        &self,
81    ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>>;
82
83    /// Return a list of signed Cardano stake distribution ordered by creation
84    /// date descending.
85    async fn get_last_signed_cardano_stake_distributions(
86        &self,
87        total: usize,
88    ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>>;
89}
90
91/// Mithril ArtifactBuilder Service
92#[derive(Clone)]
93pub struct MithrilSignedEntityService {
94    signed_entity_storer: Arc<dyn SignedEntityStorer>,
95    mithril_stake_distribution_artifact_builder:
96        Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
97    cardano_immutable_files_full_artifact_builder:
98        Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
99    cardano_transactions_artifact_builder:
100        Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
101    signed_entity_type_lock: Arc<SignedEntityTypeLock>,
102    cardano_stake_distribution_artifact_builder:
103        Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
104    cardano_database_artifact_builder:
105        Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
106    metrics_service: Arc<MetricsService>,
107    logger: Logger,
108}
109
110/// ArtifactsBuilder dependencies required by the [MithrilSignedEntityService].
111pub struct SignedEntityServiceArtifactsDependencies {
112    mithril_stake_distribution_artifact_builder:
113        Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
114    cardano_immutable_files_full_artifact_builder:
115        Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
116    cardano_transactions_artifact_builder:
117        Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
118    cardano_stake_distribution_artifact_builder:
119        Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
120    cardano_database_artifact_builder:
121        Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
122}
123
124impl SignedEntityServiceArtifactsDependencies {
125    /// Create a new instance of [SignedEntityServiceArtifactsDependencies].
126    pub fn new(
127        mithril_stake_distribution_artifact_builder: Arc<
128            dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>,
129        >,
130        cardano_immutable_files_full_artifact_builder: Arc<
131            dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>,
132        >,
133        cardano_transactions_artifact_builder: Arc<
134            dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
135        >,
136        cardano_stake_distribution_artifact_builder: Arc<
137            dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>,
138        >,
139        cardano_database_artifact_builder: Arc<
140            dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
141        >,
142    ) -> Self {
143        Self {
144            mithril_stake_distribution_artifact_builder,
145            cardano_immutable_files_full_artifact_builder,
146            cardano_transactions_artifact_builder,
147            cardano_stake_distribution_artifact_builder,
148            cardano_database_artifact_builder,
149        }
150    }
151}
152
153impl MithrilSignedEntityService {
154    /// MithrilSignedEntityService factory
155    pub fn new(
156        signed_entity_storer: Arc<dyn SignedEntityStorer>,
157        dependencies: SignedEntityServiceArtifactsDependencies,
158        signed_entity_type_lock: Arc<SignedEntityTypeLock>,
159        metrics_service: Arc<MetricsService>,
160        logger: Logger,
161    ) -> Self {
162        Self {
163            signed_entity_storer,
164            mithril_stake_distribution_artifact_builder: dependencies
165                .mithril_stake_distribution_artifact_builder,
166            cardano_immutable_files_full_artifact_builder: dependencies
167                .cardano_immutable_files_full_artifact_builder,
168            cardano_transactions_artifact_builder: dependencies
169                .cardano_transactions_artifact_builder,
170            cardano_stake_distribution_artifact_builder: dependencies
171                .cardano_stake_distribution_artifact_builder,
172            cardano_database_artifact_builder: dependencies.cardano_database_artifact_builder,
173            signed_entity_type_lock,
174            metrics_service,
175            logger: logger.new_with_component_name::<Self>(),
176        }
177    }
178
179    async fn create_artifact_task(
180        &self,
181        signed_entity_type: SignedEntityType,
182        certificate: &Certificate,
183    ) -> StdResult<()> {
184        info!(
185            self.logger, ">> create_artifact_task";
186            "signed_entity_type" => ?signed_entity_type, "certificate_hash" => &certificate.hash
187        );
188
189        let mut remaining_retries = 2;
190        let artifact = loop {
191            remaining_retries -= 1;
192
193            match self
194                .compute_artifact(signed_entity_type.clone(), certificate)
195                .await
196            {
197                Err(error) if remaining_retries == 0 => break Err(error),
198                Err(_error) => (),
199                Ok(artifact) => break Ok(artifact),
200            };
201        }?;
202
203        let signed_entity = SignedEntityRecord {
204            signed_entity_id: artifact.get_id(),
205            signed_entity_type: signed_entity_type.clone(),
206            certificate_id: certificate.hash.clone(),
207            artifact: serde_json::to_string(&artifact)?,
208            created_at: Utc::now(),
209        };
210
211        self.signed_entity_storer
212            .store_signed_entity(&signed_entity)
213            .await
214            .with_context(|| {
215                format!(
216                    "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
217                )
218            })?;
219
220        self.increment_artifact_total_produced_metric_since_startup(signed_entity_type);
221
222        Ok(())
223    }
224
225    /// Compute artifact from signed entity type
226    async fn compute_artifact(
227        &self,
228        signed_entity_type: SignedEntityType,
229        certificate: &Certificate,
230    ) -> StdResult<Arc<dyn Artifact>> {
231        match signed_entity_type.clone() {
232            SignedEntityType::MithrilStakeDistribution(epoch) => Ok(Arc::new(
233                self.mithril_stake_distribution_artifact_builder
234                    .compute_artifact(epoch, certificate)
235                    .await
236                    .with_context(|| {
237                        format!(
238                            "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
239                        )
240                    })?,
241            )),
242            SignedEntityType::CardanoImmutableFilesFull(beacon) => Ok(Arc::new(
243                self.cardano_immutable_files_full_artifact_builder
244                    .compute_artifact(beacon.clone(), certificate)
245                    .await
246                    .with_context(|| {
247                        format!(
248                            "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
249                        )
250                    })?,
251            )),
252            SignedEntityType::CardanoStakeDistribution(epoch) => Ok(Arc::new(
253                self.cardano_stake_distribution_artifact_builder
254                .compute_artifact(epoch, certificate)
255                .await
256                .with_context(|| {
257                    format!(
258                        "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
259                    )
260                })?)),
261            SignedEntityType::CardanoTransactions(_epoch, block_number) => Ok(Arc::new(
262                self.cardano_transactions_artifact_builder
263                    .compute_artifact(block_number, certificate)
264                    .await
265                    .with_context(|| {
266                        format!(
267                            "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
268                        )
269                    })?,
270            )),
271            SignedEntityType::CardanoDatabase(beacon) => Ok(Arc::new(
272                self.cardano_database_artifact_builder
273                    .compute_artifact(beacon, certificate)
274                    .await
275                    .with_context(|| {
276                        format!(
277                            "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
278                        )
279                    })?
280            )),
281        }
282    }
283
284    async fn get_last_signed_entities(
285        &self,
286        total: usize,
287        discriminants: &SignedEntityTypeDiscriminants,
288    ) -> StdResult<Vec<SignedEntityRecord>> {
289        self.signed_entity_storer
290            .get_last_signed_entities_by_type(discriminants, total)
291            .await
292            .with_context(|| {
293                format!(
294                    "Signed Entity Service can not get last signed entities with type: '{:?}'",
295                    discriminants
296                )
297            })
298    }
299
300    fn increment_artifact_total_produced_metric_since_startup(
301        &self,
302        signed_entity_type: SignedEntityType,
303    ) {
304        let metrics = self.metrics_service.clone();
305        let metric_counter = match signed_entity_type {
306            SignedEntityType::MithrilStakeDistribution(_) => {
307                metrics.get_artifact_mithril_stake_distribution_total_produced_since_startup()
308            }
309            SignedEntityType::CardanoImmutableFilesFull(_) => {
310                metrics.get_artifact_cardano_immutable_files_full_total_produced_since_startup()
311            }
312            SignedEntityType::CardanoStakeDistribution(_) => {
313                metrics.get_artifact_cardano_stake_distribution_total_produced_since_startup()
314            }
315            SignedEntityType::CardanoTransactions(_, _) => {
316                metrics.get_artifact_cardano_transaction_total_produced_since_startup()
317            }
318            SignedEntityType::CardanoDatabase(_) => {
319                metrics.get_artifact_cardano_database_total_produced_since_startup()
320            }
321        };
322
323        metric_counter.increment();
324    }
325}
326
327#[async_trait]
328impl SignedEntityService for MithrilSignedEntityService {
329    async fn create_artifact(
330        &self,
331        signed_entity_type: SignedEntityType,
332        certificate: &Certificate,
333    ) -> StdResult<JoinHandle<StdResult<()>>> {
334        if self
335            .signed_entity_type_lock
336            .is_locked(&signed_entity_type)
337            .await
338        {
339            return Err(anyhow!(
340                "Signed entity type '{:?}' is already locked",
341                signed_entity_type
342            ));
343        }
344
345        let service = self.clone();
346        let certificate_cloned = certificate.clone();
347        service
348            .signed_entity_type_lock
349            .lock(&signed_entity_type)
350            .await;
351
352        Ok(tokio::task::spawn(async move {
353            let signed_entity_type_clone = signed_entity_type.clone();
354            let service_clone = service.clone();
355            let result = tokio::task::spawn(async move {
356                service_clone
357                    .create_artifact_task(signed_entity_type_clone, &certificate_cloned)
358                    .await
359            })
360            .await;
361            service
362                .signed_entity_type_lock
363                .release(signed_entity_type.clone())
364                .await;
365
366            result.with_context(|| format!(
367                "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
368            ))?.inspect_err(|e| warn!(service.logger, "Error while creating artifact"; "error" => ?e))
369        }))
370    }
371
372    async fn get_last_signed_snapshots(
373        &self,
374        total: usize,
375    ) -> StdResult<Vec<SignedEntity<Snapshot>>> {
376        let signed_entities = self
377            .get_last_signed_entities(
378                total,
379                &SignedEntityTypeDiscriminants::CardanoImmutableFilesFull,
380            )
381            .await?
382            .into_iter()
383            .map(|record| record.try_into())
384            .collect::<Result<Vec<_>, _>>()?;
385
386        Ok(signed_entities)
387    }
388
389    async fn get_signed_snapshot_by_id(
390        &self,
391        signed_entity_id: &str,
392    ) -> StdResult<Option<SignedEntity<Snapshot>>> {
393        let entity: Option<SignedEntity<Snapshot>> = match self
394            .signed_entity_storer
395            .get_signed_entity(signed_entity_id)
396            .await
397            .with_context(|| {
398                format!(
399                    "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
400                )
401            })? {
402            Some(entity) => Some(entity.try_into()?),
403            None => None,
404        };
405
406        Ok(entity)
407    }
408
409    async fn get_last_signed_cardano_database_snapshots(
410        &self,
411        total: usize,
412    ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>> {
413        let signed_entities = self
414            .get_last_signed_entities(total, &SignedEntityTypeDiscriminants::CardanoDatabase)
415            .await?
416            .into_iter()
417            .map(|record| record.try_into())
418            .collect::<Result<Vec<_>, _>>()?;
419
420        Ok(signed_entities)
421    }
422
423    async fn get_signed_cardano_database_snapshot_by_id(
424        &self,
425        signed_entity_id: &str,
426    ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>> {
427        let entity: Option<SignedEntity<CardanoDatabaseSnapshot>> = match self
428            .signed_entity_storer
429            .get_signed_entity(signed_entity_id)
430            .await
431            .with_context(|| {
432                format!(
433                    "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
434                )
435            })? {
436            Some(entity) => Some(entity.try_into()?),
437            None => None,
438        };
439
440        Ok(entity)
441    }
442
443    async fn get_last_signed_mithril_stake_distributions(
444        &self,
445        total: usize,
446    ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>> {
447        let signed_entities = self
448            .get_last_signed_entities(
449                total,
450                &SignedEntityTypeDiscriminants::MithrilStakeDistribution,
451            )
452            .await?
453            .into_iter()
454            .map(|record| record.try_into())
455            .collect::<Result<Vec<_>, _>>()?;
456
457        Ok(signed_entities)
458    }
459
460    async fn get_signed_mithril_stake_distribution_by_id(
461        &self,
462        signed_entity_id: &str,
463    ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>> {
464        let entity: Option<SignedEntity<MithrilStakeDistribution>> = match self
465            .signed_entity_storer
466            .get_signed_entity(signed_entity_id)
467            .await
468            .with_context(|| {
469                format!(
470                    "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
471                )
472            })? {
473            Some(entity) => Some(entity.try_into()?),
474            None => None,
475        };
476
477        Ok(entity)
478    }
479
480    async fn get_last_cardano_transaction_snapshot(
481        &self,
482    ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>> {
483        let mut signed_entities_records = self
484            .get_last_signed_entities(1, &SignedEntityTypeDiscriminants::CardanoTransactions)
485            .await?;
486
487        match signed_entities_records.pop() {
488            Some(record) => Ok(Some(record.try_into()?)),
489            None => Ok(None),
490        }
491    }
492
493    async fn get_last_signed_cardano_stake_distributions(
494        &self,
495        total: usize,
496    ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>> {
497        let signed_entities = self
498            .get_last_signed_entities(
499                total,
500                &SignedEntityTypeDiscriminants::CardanoStakeDistribution,
501            )
502            .await?
503            .into_iter()
504            .map(|record| record.try_into())
505            .collect::<Result<Vec<_>, _>>()?;
506
507        Ok(signed_entities)
508    }
509}
510
511#[cfg(test)]
512mod tests {
513    use std::{sync::atomic::Ordering, time::Duration};
514
515    use mithril_common::{
516        entities::{CardanoTransactionsSnapshot, Epoch, StakeDistribution},
517        signable_builder,
518        test_utils::fake_data,
519    };
520    use mithril_metric::CounterValue;
521    use serde::{de::DeserializeOwned, Serialize};
522    use std::sync::atomic::AtomicBool;
523
524    use crate::artifact_builder::MockArtifactBuilder;
525    use crate::database::repository::MockSignedEntityStorer;
526    use crate::test_tools::TestLogger;
527
528    use super::*;
529
530    fn create_stake_distribution(epoch: Epoch, signers: usize) -> MithrilStakeDistribution {
531        MithrilStakeDistribution::new(
532            epoch,
533            fake_data::signers_with_stakes(signers),
534            &fake_data::protocol_parameters(),
535        )
536    }
537
538    fn create_cardano_stake_distribution(
539        epoch: Epoch,
540        stake_distribution: StakeDistribution,
541    ) -> CardanoStakeDistribution {
542        CardanoStakeDistribution::new(epoch, stake_distribution)
543    }
544
545    fn assert_expected<T>(expected: &T, artifact: &Arc<dyn Artifact>)
546    where
547        T: Serialize + DeserializeOwned,
548    {
549        let current: T = serde_json::from_str(&serde_json::to_string(&artifact).unwrap()).unwrap();
550        assert_eq!(
551            serde_json::to_string(&expected).unwrap(),
552            serde_json::to_string(&current).unwrap()
553        );
554    }
555
556    /// Struct that create mocks needed in tests and build objects injecting them.
557    struct MockDependencyInjector {
558        mock_signed_entity_storer: MockSignedEntityStorer,
559        mock_mithril_stake_distribution_artifact_builder:
560            MockArtifactBuilder<Epoch, MithrilStakeDistribution>,
561        mock_cardano_immutable_files_full_artifact_builder:
562            MockArtifactBuilder<CardanoDbBeacon, Snapshot>,
563        mock_cardano_transactions_artifact_builder:
564            MockArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
565        mock_cardano_stake_distribution_artifact_builder:
566            MockArtifactBuilder<Epoch, CardanoStakeDistribution>,
567        mock_cardano_database_artifact_builder:
568            MockArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
569    }
570
571    impl MockDependencyInjector {
572        fn new() -> MockDependencyInjector {
573            MockDependencyInjector {
574                mock_signed_entity_storer: MockSignedEntityStorer::new(),
575                mock_mithril_stake_distribution_artifact_builder: MockArtifactBuilder::<
576                    Epoch,
577                    MithrilStakeDistribution,
578                >::new(),
579                mock_cardano_immutable_files_full_artifact_builder: MockArtifactBuilder::<
580                    CardanoDbBeacon,
581                    Snapshot,
582                >::new(),
583                mock_cardano_transactions_artifact_builder: MockArtifactBuilder::<
584                    BlockNumber,
585                    CardanoTransactionsSnapshot,
586                >::new(),
587                mock_cardano_stake_distribution_artifact_builder: MockArtifactBuilder::<
588                    Epoch,
589                    CardanoStakeDistribution,
590                >::new(),
591                mock_cardano_database_artifact_builder: MockArtifactBuilder::<
592                    CardanoDbBeacon,
593                    CardanoDatabaseSnapshot,
594                >::new(),
595            }
596        }
597
598        fn build_artifact_builder_service(self) -> MithrilSignedEntityService {
599            let dependencies = SignedEntityServiceArtifactsDependencies::new(
600                Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
601                Arc::new(self.mock_cardano_immutable_files_full_artifact_builder),
602                Arc::new(self.mock_cardano_transactions_artifact_builder),
603                Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
604                Arc::new(self.mock_cardano_database_artifact_builder),
605            );
606            MithrilSignedEntityService::new(
607                Arc::new(self.mock_signed_entity_storer),
608                dependencies,
609                Arc::new(SignedEntityTypeLock::default()),
610                Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
611                TestLogger::stdout(),
612            )
613        }
614
615        fn build_artifact_builder_service_with_time_consuming_process(
616            mut self,
617            atomic_stop: Arc<AtomicBool>,
618        ) -> MithrilSignedEntityService {
619            struct LongArtifactBuilder {
620                atomic_stop: Arc<AtomicBool>,
621                snapshot: Snapshot,
622            }
623
624            let snapshot = fake_data::snapshots(1).first().unwrap().to_owned();
625
626            #[async_trait]
627            impl ArtifactBuilder<CardanoDbBeacon, Snapshot> for LongArtifactBuilder {
628                async fn compute_artifact(
629                    &self,
630                    _beacon: CardanoDbBeacon,
631                    _certificate: &Certificate,
632                ) -> StdResult<Snapshot> {
633                    let mut max_iteration = 100;
634                    while !self.atomic_stop.load(Ordering::Relaxed) {
635                        max_iteration -= 1;
636                        if max_iteration <= 0 {
637                            return Err(anyhow!("Test should handle the stop"));
638                        }
639                        tokio::time::sleep(Duration::from_millis(10)).await;
640                    }
641                    Ok(self.snapshot.clone())
642                }
643            }
644            let cardano_immutable_files_full_long_artifact_builder = LongArtifactBuilder {
645                atomic_stop: atomic_stop.clone(),
646                snapshot: snapshot.clone(),
647            };
648
649            let artifact_clone: Arc<dyn Artifact> = Arc::new(snapshot);
650            let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
651            self.mock_signed_entity_storer
652                .expect_store_signed_entity()
653                .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
654                .return_once(|_| Ok(()));
655
656            let dependencies = SignedEntityServiceArtifactsDependencies::new(
657                Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
658                Arc::new(cardano_immutable_files_full_long_artifact_builder),
659                Arc::new(self.mock_cardano_transactions_artifact_builder),
660                Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
661                Arc::new(self.mock_cardano_database_artifact_builder),
662            );
663            MithrilSignedEntityService::new(
664                Arc::new(self.mock_signed_entity_storer),
665                dependencies,
666                Arc::new(SignedEntityTypeLock::default()),
667                Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
668                TestLogger::stdout(),
669            )
670        }
671
672        fn mock_artifact_processing<
673            T: Artifact + Clone + Serialize + 'static,
674            U: signable_builder::Beacon,
675        >(
676            &mut self,
677            artifact: T,
678            mock_that_provide_artifact: &dyn Fn(
679                &mut MockDependencyInjector,
680            ) -> &mut MockArtifactBuilder<U, T>,
681        ) {
682            {
683                let artifact_cloned = artifact.clone();
684                mock_that_provide_artifact(self)
685                    .expect_compute_artifact()
686                    .times(1)
687                    .return_once(|_, _| Ok(artifact_cloned));
688            }
689            {
690                let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
691                let artifact_json = serde_json::to_string(&artifact_clone).unwrap();
692                self.mock_signed_entity_storer
693                    .expect_store_signed_entity()
694                    .withf(move |signed_entity| signed_entity.artifact == artifact_json)
695                    .return_once(|_| Ok(()));
696            }
697        }
698
699        fn mock_stake_distribution_processing(&mut self, artifact: MithrilStakeDistribution) {
700            self.mock_artifact_processing(artifact, &|mock_injector| {
701                &mut mock_injector.mock_mithril_stake_distribution_artifact_builder
702            });
703        }
704    }
705
706    fn get_artifact_total_produced_metric_since_startup_counter_value(
707        metrics_service: Arc<MetricsService>,
708        signed_entity_type: &SignedEntityType,
709    ) -> CounterValue {
710        match signed_entity_type {
711            SignedEntityType::MithrilStakeDistribution(_) => metrics_service
712                .get_artifact_mithril_stake_distribution_total_produced_since_startup()
713                .get(),
714            SignedEntityType::CardanoImmutableFilesFull(_) => metrics_service
715                .get_artifact_cardano_immutable_files_full_total_produced_since_startup()
716                .get(),
717            SignedEntityType::CardanoStakeDistribution(_) => metrics_service
718                .get_artifact_cardano_stake_distribution_total_produced_since_startup()
719                .get(),
720            SignedEntityType::CardanoTransactions(_, _) => metrics_service
721                .get_artifact_cardano_transaction_total_produced_since_startup()
722                .get(),
723            SignedEntityType::CardanoDatabase(_) => metrics_service
724                .get_artifact_cardano_database_total_produced_since_startup()
725                .get(),
726        }
727    }
728
729    #[tokio::test]
730    async fn build_mithril_stake_distribution_artifact_when_given_mithril_stake_distribution_entity_type(
731    ) {
732        let mut mock_container = MockDependencyInjector::new();
733
734        let mithril_stake_distribution_expected = create_stake_distribution(Epoch(1), 5);
735
736        mock_container
737            .mock_mithril_stake_distribution_artifact_builder
738            .expect_compute_artifact()
739            .times(1)
740            .returning(|_, _| Ok(create_stake_distribution(Epoch(1), 5)));
741
742        let artifact_builder_service = mock_container.build_artifact_builder_service();
743
744        let certificate = fake_data::certificate("hash".to_string());
745        let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(1));
746        let artifact = artifact_builder_service
747            .compute_artifact(signed_entity_type.clone(), &certificate)
748            .await
749            .unwrap();
750
751        assert_expected(&mithril_stake_distribution_expected, &artifact);
752    }
753
754    #[tokio::test]
755    async fn should_store_the_artifact_when_creating_artifact_for_a_mithril_stake_distribution() {
756        generic_test_that_the_artifact_is_stored(
757            SignedEntityType::MithrilStakeDistribution(Epoch(1)),
758            create_stake_distribution(Epoch(1), 5),
759            &|mock_injector| &mut mock_injector.mock_mithril_stake_distribution_artifact_builder,
760        )
761        .await;
762    }
763
764    #[tokio::test]
765    async fn build_cardano_stake_distribution_artifact_when_given_cardano_stake_distribution_entity_type(
766    ) {
767        let mut mock_container = MockDependencyInjector::new();
768
769        let cardano_stake_distribution_expected = create_cardano_stake_distribution(
770            Epoch(1),
771            StakeDistribution::from([("pool-1".to_string(), 100)]),
772        );
773
774        mock_container
775            .mock_cardano_stake_distribution_artifact_builder
776            .expect_compute_artifact()
777            .times(1)
778            .returning(|_, _| {
779                Ok(create_cardano_stake_distribution(
780                    Epoch(1),
781                    StakeDistribution::from([("pool-1".to_string(), 100)]),
782                ))
783            });
784
785        let artifact_builder_service = mock_container.build_artifact_builder_service();
786
787        let certificate = fake_data::certificate("hash".to_string());
788        let signed_entity_type = SignedEntityType::CardanoStakeDistribution(Epoch(1));
789        let artifact = artifact_builder_service
790            .compute_artifact(signed_entity_type.clone(), &certificate)
791            .await
792            .unwrap();
793
794        assert_expected(&cardano_stake_distribution_expected, &artifact);
795    }
796
797    #[tokio::test]
798    async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_stake_distribution() {
799        generic_test_that_the_artifact_is_stored(
800            SignedEntityType::CardanoStakeDistribution(Epoch(1)),
801            create_cardano_stake_distribution(
802                Epoch(1),
803                StakeDistribution::from([("pool-1".to_string(), 100)]),
804            ),
805            &|mock_injector| &mut mock_injector.mock_cardano_stake_distribution_artifact_builder,
806        )
807        .await;
808    }
809
810    #[tokio::test]
811    async fn build_snapshot_artifact_when_given_cardano_immutable_files_full_entity_type() {
812        let mut mock_container = MockDependencyInjector::new();
813
814        let snapshot_expected = fake_data::snapshots(1).first().unwrap().to_owned();
815
816        mock_container
817            .mock_cardano_immutable_files_full_artifact_builder
818            .expect_compute_artifact()
819            .times(1)
820            .returning(|_, _| Ok(fake_data::snapshots(1).first().unwrap().to_owned()));
821
822        let artifact_builder_service = mock_container.build_artifact_builder_service();
823
824        let certificate = fake_data::certificate("hash".to_string());
825        let signed_entity_type =
826            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
827        let artifact = artifact_builder_service
828            .compute_artifact(signed_entity_type.clone(), &certificate)
829            .await
830            .unwrap();
831
832        assert_expected(&snapshot_expected, &artifact);
833    }
834
835    #[tokio::test]
836    async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_immutable_files() {
837        generic_test_that_the_artifact_is_stored(
838            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default()),
839            fake_data::snapshots(1).first().unwrap().to_owned(),
840            &|mock_injector| &mut mock_injector.mock_cardano_immutable_files_full_artifact_builder,
841        )
842        .await;
843    }
844
845    #[tokio::test]
846    async fn build_cardano_transactions_snapshot_artifact_when_given_cardano_transactions_type() {
847        let mut mock_container = MockDependencyInjector::new();
848
849        let block_number = BlockNumber(151);
850        let expected = CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number);
851
852        mock_container
853            .mock_cardano_transactions_artifact_builder
854            .expect_compute_artifact()
855            .times(1)
856            .returning(move |_, _| {
857                Ok(CardanoTransactionsSnapshot::new(
858                    "merkle_root".to_string(),
859                    block_number,
860                ))
861            });
862
863        let artifact_builder_service = mock_container.build_artifact_builder_service();
864
865        let certificate = fake_data::certificate("hash".to_string());
866        let signed_entity_type = SignedEntityType::CardanoTransactions(Epoch(1), block_number);
867        let artifact = artifact_builder_service
868            .compute_artifact(signed_entity_type.clone(), &certificate)
869            .await
870            .unwrap();
871
872        assert_expected(&expected, &artifact);
873    }
874
875    #[tokio::test]
876    async fn should_store_the_artifact_when_creating_artifact_for_cardano_transactions() {
877        let block_number = BlockNumber(149);
878        generic_test_that_the_artifact_is_stored(
879            SignedEntityType::CardanoTransactions(Epoch(1), block_number),
880            CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number),
881            &|mock_injector| &mut mock_injector.mock_cardano_transactions_artifact_builder,
882        )
883        .await;
884    }
885
886    #[tokio::test]
887    async fn build_cardano_database_artifact_when_given_cardano_database_entity_type() {
888        let mut mock_container = MockDependencyInjector::new();
889
890        let cardano_database_expected = fake_data::cardano_database_snapshots(1)
891            .first()
892            .unwrap()
893            .to_owned();
894
895        mock_container
896            .mock_cardano_database_artifact_builder
897            .expect_compute_artifact()
898            .times(1)
899            .returning(|_, _| {
900                Ok(fake_data::cardano_database_snapshots(1)
901                    .first()
902                    .unwrap()
903                    .to_owned())
904            });
905
906        let artifact_builder_service = mock_container.build_artifact_builder_service();
907
908        let certificate = fake_data::certificate("hash".to_string());
909        let signed_entity_type = SignedEntityType::CardanoDatabase(CardanoDbBeacon::default());
910        let artifact = artifact_builder_service
911            .compute_artifact(signed_entity_type.clone(), &certificate)
912            .await
913            .unwrap();
914
915        assert_expected(&cardano_database_expected, &artifact);
916    }
917
918    #[tokio::test]
919    async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_database() {
920        generic_test_that_the_artifact_is_stored(
921            SignedEntityType::CardanoDatabase(CardanoDbBeacon::default()),
922            fake_data::cardano_database_snapshots(1)
923                .first()
924                .unwrap()
925                .to_owned(),
926            &|mock_injector| &mut mock_injector.mock_cardano_database_artifact_builder,
927        )
928        .await;
929    }
930
931    async fn generic_test_that_the_artifact_is_stored<
932        T: Artifact + Clone + Serialize + 'static,
933        U: signable_builder::Beacon,
934    >(
935        signed_entity_type: SignedEntityType,
936        artifact: T,
937        mock_that_provide_artifact: &dyn Fn(
938            &mut MockDependencyInjector,
939        ) -> &mut MockArtifactBuilder<U, T>,
940    ) {
941        let mut mock_container = MockDependencyInjector::new();
942        {
943            let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
944            let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
945            mock_container
946                .mock_signed_entity_storer
947                .expect_store_signed_entity()
948                .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
949                .return_once(|_| Ok(()));
950        }
951        {
952            let artifact_cloned = artifact.clone();
953            mock_that_provide_artifact(&mut mock_container)
954                .expect_compute_artifact()
955                .times(1)
956                .return_once(|_, _| Ok(artifact_cloned));
957        }
958        let artifact_builder_service = mock_container.build_artifact_builder_service();
959
960        let certificate = fake_data::certificate("hash".to_string());
961        let error_message = format!(
962            "Create artifact should not fail for {} signed entity",
963            std::any::type_name::<T>()
964        );
965        let error_message_str = error_message.as_str();
966
967        let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
968            artifact_builder_service.metrics_service.clone(),
969            &signed_entity_type,
970        );
971
972        artifact_builder_service
973            .create_artifact_task(signed_entity_type.clone(), &certificate)
974            .await
975            .expect(error_message_str);
976
977        assert_eq!(
978            initial_counter_value + 1,
979            get_artifact_total_produced_metric_since_startup_counter_value(
980                artifact_builder_service.metrics_service.clone(),
981                &signed_entity_type,
982            )
983        )
984    }
985
986    #[tokio::test]
987    async fn create_artifact_for_two_signed_entity_types_in_sequence_not_blocking() {
988        let atomic_stop = Arc::new(AtomicBool::new(false));
989        let signed_entity_type_service = {
990            let mut mock_container = MockDependencyInjector::new();
991
992            let msd = create_stake_distribution(Epoch(1), 5);
993            mock_container.mock_stake_distribution_processing(msd);
994
995            mock_container
996                .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone())
997        };
998        let certificate = fake_data::certificate("hash".to_string());
999
1000        let signed_entity_type_immutable =
1001            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1002        let first_task_that_never_finished = signed_entity_type_service
1003            .create_artifact(signed_entity_type_immutable, &certificate)
1004            .await
1005            .unwrap();
1006
1007        let signed_entity_type_msd = SignedEntityType::MithrilStakeDistribution(Epoch(1));
1008        let second_task_that_finish_first = signed_entity_type_service
1009            .create_artifact(signed_entity_type_msd, &certificate)
1010            .await
1011            .unwrap();
1012
1013        second_task_that_finish_first.await.unwrap().unwrap();
1014        assert!(!first_task_that_never_finished.is_finished());
1015
1016        atomic_stop.swap(true, Ordering::Relaxed);
1017    }
1018
1019    #[tokio::test]
1020    async fn create_artifact_lock_unlock_signed_entity_type_while_processing() {
1021        let atomic_stop = Arc::new(AtomicBool::new(false));
1022        let signed_entity_type_service = MockDependencyInjector::new()
1023            .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1024        let certificate = fake_data::certificate("hash".to_string());
1025
1026        let signed_entity_type_immutable =
1027            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1028        assert!(
1029            !signed_entity_type_service
1030                .signed_entity_type_lock
1031                .is_locked(&signed_entity_type_immutable)
1032                .await
1033        );
1034        let join_handle = signed_entity_type_service
1035            .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1036            .await
1037            .unwrap();
1038
1039        // Results are stored to finalize the task before assertions,
1040        // ensuring 'atomic_stop' is always assigned a new value.
1041        let is_locked = signed_entity_type_service
1042            .signed_entity_type_lock
1043            .is_locked(&signed_entity_type_immutable)
1044            .await;
1045        let is_finished = join_handle.is_finished();
1046
1047        atomic_stop.swap(true, Ordering::Relaxed);
1048        join_handle.await.unwrap().unwrap();
1049
1050        assert!(is_locked);
1051        assert!(!is_finished);
1052
1053        assert!(
1054            !signed_entity_type_service
1055                .signed_entity_type_lock
1056                .is_locked(&signed_entity_type_immutable)
1057                .await
1058        );
1059    }
1060
1061    #[tokio::test]
1062    async fn create_artifact_unlock_signed_entity_type_when_error() {
1063        let signed_entity_type_service = {
1064            let mut mock_container = MockDependencyInjector::new();
1065            mock_container
1066                .mock_cardano_immutable_files_full_artifact_builder
1067                .expect_compute_artifact()
1068                .returning(|_, _| Err(anyhow::anyhow!("Error while computing artifact")));
1069
1070            mock_container.build_artifact_builder_service()
1071        };
1072        let certificate = fake_data::certificate("hash".to_string());
1073
1074        let signed_entity_type_immutable =
1075            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1076
1077        let join_handle = signed_entity_type_service
1078            .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1079            .await
1080            .unwrap();
1081
1082        let error = join_handle.await.unwrap().unwrap_err();
1083        assert!(
1084            error.to_string().contains("CardanoImmutableFilesFull"),
1085            "Error should contains CardanoImmutableFilesFull but was: {}",
1086            error
1087        );
1088
1089        assert!(
1090            !signed_entity_type_service
1091                .signed_entity_type_lock
1092                .is_locked(&signed_entity_type_immutable)
1093                .await
1094        );
1095    }
1096
1097    #[tokio::test]
1098    async fn create_artifact_unlock_signed_entity_type_when_panic() {
1099        let signed_entity_type_service =
1100            MockDependencyInjector::new().build_artifact_builder_service();
1101        let certificate = fake_data::certificate("hash".to_string());
1102
1103        let signed_entity_type_immutable =
1104            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1105
1106        let join_handle = signed_entity_type_service
1107            .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1108            .await
1109            .unwrap();
1110
1111        let error = join_handle.await.unwrap().unwrap_err();
1112        assert!(
1113            error.to_string().contains("CardanoImmutableFilesFull"),
1114            "Error should contains CardanoImmutableFilesFull but was: {}",
1115            error
1116        );
1117
1118        assert!(
1119            !signed_entity_type_service
1120                .signed_entity_type_lock
1121                .is_locked(&signed_entity_type_immutable)
1122                .await
1123        );
1124    }
1125
1126    #[tokio::test]
1127    async fn create_artifact_for_a_signed_entity_type_already_lock_return_error() {
1128        let atomic_stop = Arc::new(AtomicBool::new(false));
1129        let signed_entity_service = MockDependencyInjector::new()
1130            .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1131        let certificate = fake_data::certificate("hash".to_string());
1132        let signed_entity_type_immutable =
1133            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1134
1135        signed_entity_service
1136            .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1137            .await
1138            .unwrap();
1139
1140        signed_entity_service
1141            .create_artifact(signed_entity_type_immutable, &certificate)
1142            .await
1143            .expect_err("Should return error when signed entity type is already locked");
1144
1145        atomic_stop.swap(true, Ordering::Relaxed);
1146    }
1147
1148    #[tokio::test]
1149    async fn metrics_counter_value_is_not_incremented_when_compute_artifact_error() {
1150        let signed_entity_service = {
1151            let mut mock_container = MockDependencyInjector::new();
1152            mock_container
1153                .mock_cardano_immutable_files_full_artifact_builder
1154                .expect_compute_artifact()
1155                .returning(|_, _| Err(anyhow!("Error while computing artifact")));
1156
1157            mock_container.build_artifact_builder_service()
1158        };
1159
1160        let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1161
1162        let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1163            signed_entity_service.metrics_service.clone(),
1164            &signed_entity_type,
1165        );
1166
1167        signed_entity_service
1168            .create_artifact(
1169                signed_entity_type.clone(),
1170                &fake_data::certificate("hash".to_string()),
1171            )
1172            .await
1173            .unwrap();
1174
1175        assert_eq!(
1176            initial_counter_value,
1177            get_artifact_total_produced_metric_since_startup_counter_value(
1178                signed_entity_service.metrics_service.clone(),
1179                &signed_entity_type,
1180            )
1181        );
1182    }
1183
1184    #[tokio::test]
1185    async fn metrics_counter_value_is_not_incremented_when_store_signed_entity_error() {
1186        let signed_entity_service = {
1187            let mut mock_container = MockDependencyInjector::new();
1188            mock_container
1189                .mock_signed_entity_storer
1190                .expect_store_signed_entity()
1191                .returning(|_| Err(anyhow!("Error while storing signed entity")));
1192
1193            mock_container.build_artifact_builder_service()
1194        };
1195
1196        let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1197
1198        let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1199            signed_entity_service.metrics_service.clone(),
1200            &signed_entity_type,
1201        );
1202
1203        signed_entity_service
1204            .create_artifact(
1205                signed_entity_type.clone(),
1206                &fake_data::certificate("hash".to_string()),
1207            )
1208            .await
1209            .unwrap();
1210
1211        assert_eq!(
1212            initial_counter_value,
1213            get_artifact_total_produced_metric_since_startup_counter_value(
1214                signed_entity_service.metrics_service.clone(),
1215                &signed_entity_type,
1216            )
1217        );
1218    }
1219}