1use anyhow::{Context, anyhow};
6use async_trait::async_trait;
7use chrono::Utc;
8use slog::{Logger, info, warn};
9use std::sync::Arc;
10use tokio::task::JoinHandle;
11
12use mithril_common::{
13 StdResult,
14 entities::{
15 BlockNumber, CardanoDatabaseSnapshot, CardanoDbBeacon, CardanoStakeDistribution,
16 CardanoTransactionsSnapshot, Certificate, Epoch, MithrilStakeDistribution,
17 SignedEntityType, SignedEntityTypeDiscriminants, Snapshot,
18 },
19 logging::LoggerExtensions,
20 signable_builder::{Artifact, SignedEntity},
21};
22
23use crate::{
24 MetricsService,
25 artifact_builder::ArtifactBuilder,
26 database::{record::SignedEntityRecord, repository::SignedEntityStorer},
27};
28use mithril_signed_entity_lock::SignedEntityTypeLock;
29
30#[cfg_attr(test, mockall::automock)]
32#[async_trait]
33pub trait SignedEntityService: Send + Sync {
34 async fn create_artifact(
36 &self,
37 signed_entity_type: SignedEntityType,
38 certificate: &Certificate,
39 ) -> StdResult<JoinHandle<StdResult<()>>>;
40
41 async fn get_last_signed_snapshots(
43 &self,
44 total: usize,
45 ) -> StdResult<Vec<SignedEntity<Snapshot>>>;
46
47 async fn get_signed_snapshot_by_id(
49 &self,
50 signed_entity_id: &str,
51 ) -> StdResult<Option<SignedEntity<Snapshot>>>;
52
53 async fn get_last_signed_cardano_database_snapshots(
55 &self,
56 total: usize,
57 ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>>;
58
59 async fn get_signed_cardano_database_snapshot_by_id(
61 &self,
62 signed_entity_id: &str,
63 ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>>;
64
65 async fn get_last_signed_mithril_stake_distributions(
68 &self,
69 total: usize,
70 ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>>;
71
72 async fn get_signed_mithril_stake_distribution_by_id(
74 &self,
75 signed_entity_id: &str,
76 ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>>;
77
78 async fn get_last_cardano_transaction_snapshot(
80 &self,
81 ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>>;
82
83 async fn get_last_signed_cardano_stake_distributions(
86 &self,
87 total: usize,
88 ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>>;
89}
90
91#[derive(Clone)]
93pub struct MithrilSignedEntityService {
94 signed_entity_storer: Arc<dyn SignedEntityStorer>,
95 mithril_stake_distribution_artifact_builder:
96 Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
97 cardano_immutable_files_full_artifact_builder:
98 Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
99 cardano_transactions_artifact_builder:
100 Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
101 signed_entity_type_lock: Arc<SignedEntityTypeLock>,
102 cardano_stake_distribution_artifact_builder:
103 Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
104 cardano_database_artifact_builder:
105 Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
106 metrics_service: Arc<MetricsService>,
107 logger: Logger,
108}
109
110pub struct SignedEntityServiceArtifactsDependencies {
112 mithril_stake_distribution_artifact_builder:
113 Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
114 cardano_immutable_files_full_artifact_builder:
115 Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
116 cardano_transactions_artifact_builder:
117 Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
118 cardano_stake_distribution_artifact_builder:
119 Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
120 cardano_database_artifact_builder:
121 Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
122}
123
124impl SignedEntityServiceArtifactsDependencies {
125 pub fn new(
127 mithril_stake_distribution_artifact_builder: Arc<
128 dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>,
129 >,
130 cardano_immutable_files_full_artifact_builder: Arc<
131 dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>,
132 >,
133 cardano_transactions_artifact_builder: Arc<
134 dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
135 >,
136 cardano_stake_distribution_artifact_builder: Arc<
137 dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>,
138 >,
139 cardano_database_artifact_builder: Arc<
140 dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
141 >,
142 ) -> Self {
143 Self {
144 mithril_stake_distribution_artifact_builder,
145 cardano_immutable_files_full_artifact_builder,
146 cardano_transactions_artifact_builder,
147 cardano_stake_distribution_artifact_builder,
148 cardano_database_artifact_builder,
149 }
150 }
151}
152
153impl MithrilSignedEntityService {
154 pub fn new(
156 signed_entity_storer: Arc<dyn SignedEntityStorer>,
157 dependencies: SignedEntityServiceArtifactsDependencies,
158 signed_entity_type_lock: Arc<SignedEntityTypeLock>,
159 metrics_service: Arc<MetricsService>,
160 logger: Logger,
161 ) -> Self {
162 Self {
163 signed_entity_storer,
164 mithril_stake_distribution_artifact_builder: dependencies
165 .mithril_stake_distribution_artifact_builder,
166 cardano_immutable_files_full_artifact_builder: dependencies
167 .cardano_immutable_files_full_artifact_builder,
168 cardano_transactions_artifact_builder: dependencies
169 .cardano_transactions_artifact_builder,
170 cardano_stake_distribution_artifact_builder: dependencies
171 .cardano_stake_distribution_artifact_builder,
172 cardano_database_artifact_builder: dependencies.cardano_database_artifact_builder,
173 signed_entity_type_lock,
174 metrics_service,
175 logger: logger.new_with_component_name::<Self>(),
176 }
177 }
178
179 async fn create_artifact_task(
180 &self,
181 signed_entity_type: SignedEntityType,
182 certificate: &Certificate,
183 ) -> StdResult<()> {
184 info!(
185 self.logger, ">> create_artifact_task";
186 "signed_entity_type" => ?signed_entity_type, "certificate_hash" => &certificate.hash
187 );
188
189 let mut remaining_retries = 2;
190 let artifact = loop {
191 remaining_retries -= 1;
192
193 match self.compute_artifact(signed_entity_type.clone(), certificate).await {
194 Err(error) if remaining_retries == 0 => break Err(error),
195 Err(_error) => (),
196 Ok(artifact) => break Ok(artifact),
197 };
198 }?;
199
200 let signed_entity = SignedEntityRecord {
201 signed_entity_id: artifact.get_id(),
202 signed_entity_type: signed_entity_type.clone(),
203 certificate_id: certificate.hash.clone(),
204 artifact: serde_json::to_string(&artifact)?,
205 created_at: Utc::now(),
206 };
207
208 self.signed_entity_storer
209 .store_signed_entity(&signed_entity)
210 .await
211 .with_context(|| {
212 format!(
213 "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
214 )
215 })?;
216
217 self.increment_artifact_total_produced_metric_since_startup(signed_entity_type);
218
219 Ok(())
220 }
221
222 async fn compute_artifact(
224 &self,
225 signed_entity_type: SignedEntityType,
226 certificate: &Certificate,
227 ) -> StdResult<Arc<dyn Artifact>> {
228 match signed_entity_type.clone() {
229 SignedEntityType::MithrilStakeDistribution(epoch) => Ok(Arc::new(
230 self.mithril_stake_distribution_artifact_builder
231 .compute_artifact(epoch, certificate)
232 .await
233 .with_context(|| {
234 format!(
235 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
236 )
237 })?,
238 )),
239 SignedEntityType::CardanoImmutableFilesFull(beacon) => Ok(Arc::new(
240 self.cardano_immutable_files_full_artifact_builder
241 .compute_artifact(beacon.clone(), certificate)
242 .await
243 .with_context(|| {
244 format!(
245 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
246 )
247 })?,
248 )),
249 SignedEntityType::CardanoStakeDistribution(epoch) => Ok(Arc::new(
250 self.cardano_stake_distribution_artifact_builder
251 .compute_artifact(epoch, certificate)
252 .await
253 .with_context(|| {
254 format!(
255 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
256 )
257 })?)),
258 SignedEntityType::CardanoTransactions(_epoch, block_number) => Ok(Arc::new(
259 self.cardano_transactions_artifact_builder
260 .compute_artifact(block_number, certificate)
261 .await
262 .with_context(|| {
263 format!(
264 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
265 )
266 })?,
267 )),
268 SignedEntityType::CardanoDatabase(beacon) => Ok(Arc::new(
269 self.cardano_database_artifact_builder
270 .compute_artifact(beacon, certificate)
271 .await
272 .with_context(|| {
273 format!(
274 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
275 )
276 })?
277 )),
278 }
279 }
280
281 async fn get_last_signed_entities(
282 &self,
283 total: usize,
284 discriminants: &SignedEntityTypeDiscriminants,
285 ) -> StdResult<Vec<SignedEntityRecord>> {
286 self.signed_entity_storer
287 .get_last_signed_entities_by_type(discriminants, total)
288 .await
289 .with_context(|| {
290 format!(
291 "Signed Entity Service can not get last signed entities with type: '{discriminants:?}'"
292 )
293 })
294 }
295
296 fn increment_artifact_total_produced_metric_since_startup(
297 &self,
298 signed_entity_type: SignedEntityType,
299 ) {
300 let metrics = self.metrics_service.clone();
301 let metric_counter = match signed_entity_type {
302 SignedEntityType::MithrilStakeDistribution(_) => {
303 metrics.get_artifact_mithril_stake_distribution_total_produced_since_startup()
304 }
305 SignedEntityType::CardanoImmutableFilesFull(_) => {
306 metrics.get_artifact_cardano_immutable_files_full_total_produced_since_startup()
307 }
308 SignedEntityType::CardanoStakeDistribution(_) => {
309 metrics.get_artifact_cardano_stake_distribution_total_produced_since_startup()
310 }
311 SignedEntityType::CardanoTransactions(_, _) => {
312 metrics.get_artifact_cardano_transaction_total_produced_since_startup()
313 }
314 SignedEntityType::CardanoDatabase(_) => {
315 metrics.get_artifact_cardano_database_total_produced_since_startup()
316 }
317 };
318
319 metric_counter.increment();
320 }
321}
322
323#[async_trait]
324impl SignedEntityService for MithrilSignedEntityService {
325 async fn create_artifact(
326 &self,
327 signed_entity_type: SignedEntityType,
328 certificate: &Certificate,
329 ) -> StdResult<JoinHandle<StdResult<()>>> {
330 if self.signed_entity_type_lock.is_locked(&signed_entity_type).await {
331 return Err(anyhow!(
332 "Signed entity type '{:?}' is already locked",
333 signed_entity_type
334 ));
335 }
336
337 let service = self.clone();
338 let certificate_cloned = certificate.clone();
339 service.signed_entity_type_lock.lock(&signed_entity_type).await;
340
341 Ok(tokio::task::spawn(async move {
342 let signed_entity_type_clone = signed_entity_type.clone();
343 let service_clone = service.clone();
344 let result = tokio::task::spawn(async move {
345 service_clone
346 .create_artifact_task(signed_entity_type_clone, &certificate_cloned)
347 .await
348 })
349 .await;
350 service
351 .signed_entity_type_lock
352 .release(signed_entity_type.clone())
353 .await;
354
355 result.with_context(|| format!(
356 "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
357 ))?.inspect_err(|e| warn!(service.logger, "Error while creating artifact"; "error" => ?e))
358 }))
359 }
360
361 async fn get_last_signed_snapshots(
362 &self,
363 total: usize,
364 ) -> StdResult<Vec<SignedEntity<Snapshot>>> {
365 let signed_entities = self
366 .get_last_signed_entities(
367 total,
368 &SignedEntityTypeDiscriminants::CardanoImmutableFilesFull,
369 )
370 .await?
371 .into_iter()
372 .map(|record| record.try_into())
373 .collect::<Result<Vec<_>, _>>()?;
374
375 Ok(signed_entities)
376 }
377
378 async fn get_signed_snapshot_by_id(
379 &self,
380 signed_entity_id: &str,
381 ) -> StdResult<Option<SignedEntity<Snapshot>>> {
382 let entity: Option<SignedEntity<Snapshot>> = match self
383 .signed_entity_storer
384 .get_signed_entity(signed_entity_id)
385 .await
386 .with_context(|| {
387 format!(
388 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
389 )
390 })? {
391 Some(entity) => Some(entity.try_into()?),
392 None => None,
393 };
394
395 Ok(entity)
396 }
397
398 async fn get_last_signed_cardano_database_snapshots(
399 &self,
400 total: usize,
401 ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>> {
402 let signed_entities = self
403 .get_last_signed_entities(total, &SignedEntityTypeDiscriminants::CardanoDatabase)
404 .await?
405 .into_iter()
406 .map(|record| record.try_into())
407 .collect::<Result<Vec<_>, _>>()?;
408
409 Ok(signed_entities)
410 }
411
412 async fn get_signed_cardano_database_snapshot_by_id(
413 &self,
414 signed_entity_id: &str,
415 ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>> {
416 let entity: Option<SignedEntity<CardanoDatabaseSnapshot>> = match self
417 .signed_entity_storer
418 .get_signed_entity(signed_entity_id)
419 .await
420 .with_context(|| {
421 format!(
422 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
423 )
424 })? {
425 Some(entity) => Some(entity.try_into()?),
426 None => None,
427 };
428
429 Ok(entity)
430 }
431
432 async fn get_last_signed_mithril_stake_distributions(
433 &self,
434 total: usize,
435 ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>> {
436 let signed_entities = self
437 .get_last_signed_entities(
438 total,
439 &SignedEntityTypeDiscriminants::MithrilStakeDistribution,
440 )
441 .await?
442 .into_iter()
443 .map(|record| record.try_into())
444 .collect::<Result<Vec<_>, _>>()?;
445
446 Ok(signed_entities)
447 }
448
449 async fn get_signed_mithril_stake_distribution_by_id(
450 &self,
451 signed_entity_id: &str,
452 ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>> {
453 let entity: Option<SignedEntity<MithrilStakeDistribution>> = match self
454 .signed_entity_storer
455 .get_signed_entity(signed_entity_id)
456 .await
457 .with_context(|| {
458 format!(
459 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
460 )
461 })? {
462 Some(entity) => Some(entity.try_into()?),
463 None => None,
464 };
465
466 Ok(entity)
467 }
468
469 async fn get_last_cardano_transaction_snapshot(
470 &self,
471 ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>> {
472 let mut signed_entities_records = self
473 .get_last_signed_entities(1, &SignedEntityTypeDiscriminants::CardanoTransactions)
474 .await?;
475
476 match signed_entities_records.pop() {
477 Some(record) => Ok(Some(record.try_into()?)),
478 None => Ok(None),
479 }
480 }
481
482 async fn get_last_signed_cardano_stake_distributions(
483 &self,
484 total: usize,
485 ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>> {
486 let signed_entities = self
487 .get_last_signed_entities(
488 total,
489 &SignedEntityTypeDiscriminants::CardanoStakeDistribution,
490 )
491 .await?
492 .into_iter()
493 .map(|record| record.try_into())
494 .collect::<Result<Vec<_>, _>>()?;
495
496 Ok(signed_entities)
497 }
498}
499
500#[cfg(test)]
501mod tests {
502 use std::{sync::atomic::Ordering, time::Duration};
503
504 use mithril_common::{
505 entities::{CardanoTransactionsSnapshot, Epoch, StakeDistribution},
506 signable_builder,
507 test::double::fake_data,
508 };
509 use mithril_metric::CounterValue;
510 use serde::{Serialize, de::DeserializeOwned};
511 use std::sync::atomic::AtomicBool;
512
513 use crate::artifact_builder::MockArtifactBuilder;
514 use crate::database::repository::MockSignedEntityStorer;
515 use crate::test::TestLogger;
516
517 use super::*;
518
519 fn create_stake_distribution(epoch: Epoch, signers: usize) -> MithrilStakeDistribution {
520 MithrilStakeDistribution::new(
521 epoch,
522 fake_data::signers_with_stakes(signers),
523 &fake_data::protocol_parameters(),
524 )
525 }
526
527 fn create_cardano_stake_distribution(
528 epoch: Epoch,
529 stake_distribution: StakeDistribution,
530 ) -> CardanoStakeDistribution {
531 CardanoStakeDistribution::new(epoch, stake_distribution)
532 }
533
534 fn assert_expected<T>(expected: &T, artifact: &Arc<dyn Artifact>)
535 where
536 T: Serialize + DeserializeOwned,
537 {
538 let current: T = serde_json::from_str(&serde_json::to_string(&artifact).unwrap()).unwrap();
539 assert_eq!(
540 serde_json::to_string(&expected).unwrap(),
541 serde_json::to_string(¤t).unwrap()
542 );
543 }
544
545 struct MockDependencyInjector {
547 mock_signed_entity_storer: MockSignedEntityStorer,
548 mock_mithril_stake_distribution_artifact_builder:
549 MockArtifactBuilder<Epoch, MithrilStakeDistribution>,
550 mock_cardano_immutable_files_full_artifact_builder:
551 MockArtifactBuilder<CardanoDbBeacon, Snapshot>,
552 mock_cardano_transactions_artifact_builder:
553 MockArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
554 mock_cardano_stake_distribution_artifact_builder:
555 MockArtifactBuilder<Epoch, CardanoStakeDistribution>,
556 mock_cardano_database_artifact_builder:
557 MockArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
558 }
559
560 impl MockDependencyInjector {
561 fn new() -> MockDependencyInjector {
562 MockDependencyInjector {
563 mock_signed_entity_storer: MockSignedEntityStorer::new(),
564 mock_mithril_stake_distribution_artifact_builder: MockArtifactBuilder::<
565 Epoch,
566 MithrilStakeDistribution,
567 >::new(),
568 mock_cardano_immutable_files_full_artifact_builder: MockArtifactBuilder::<
569 CardanoDbBeacon,
570 Snapshot,
571 >::new(),
572 mock_cardano_transactions_artifact_builder: MockArtifactBuilder::<
573 BlockNumber,
574 CardanoTransactionsSnapshot,
575 >::new(),
576 mock_cardano_stake_distribution_artifact_builder: MockArtifactBuilder::<
577 Epoch,
578 CardanoStakeDistribution,
579 >::new(),
580 mock_cardano_database_artifact_builder: MockArtifactBuilder::<
581 CardanoDbBeacon,
582 CardanoDatabaseSnapshot,
583 >::new(),
584 }
585 }
586
587 fn build_artifact_builder_service(self) -> MithrilSignedEntityService {
588 let dependencies = SignedEntityServiceArtifactsDependencies::new(
589 Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
590 Arc::new(self.mock_cardano_immutable_files_full_artifact_builder),
591 Arc::new(self.mock_cardano_transactions_artifact_builder),
592 Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
593 Arc::new(self.mock_cardano_database_artifact_builder),
594 );
595 MithrilSignedEntityService::new(
596 Arc::new(self.mock_signed_entity_storer),
597 dependencies,
598 Arc::new(SignedEntityTypeLock::default()),
599 Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
600 TestLogger::stdout(),
601 )
602 }
603
604 fn build_artifact_builder_service_with_time_consuming_process(
605 mut self,
606 atomic_stop: Arc<AtomicBool>,
607 ) -> MithrilSignedEntityService {
608 struct LongArtifactBuilder {
609 atomic_stop: Arc<AtomicBool>,
610 snapshot: Snapshot,
611 }
612
613 let snapshot = fake_data::snapshot(1);
614
615 #[async_trait]
616 impl ArtifactBuilder<CardanoDbBeacon, Snapshot> for LongArtifactBuilder {
617 async fn compute_artifact(
618 &self,
619 _beacon: CardanoDbBeacon,
620 _certificate: &Certificate,
621 ) -> StdResult<Snapshot> {
622 let mut max_iteration = 100;
623 while !self.atomic_stop.load(Ordering::Relaxed) {
624 max_iteration -= 1;
625 if max_iteration <= 0 {
626 return Err(anyhow!("Test should handle the stop"));
627 }
628 tokio::time::sleep(Duration::from_millis(10)).await;
629 }
630 Ok(self.snapshot.clone())
631 }
632 }
633 let cardano_immutable_files_full_long_artifact_builder = LongArtifactBuilder {
634 atomic_stop: atomic_stop.clone(),
635 snapshot: snapshot.clone(),
636 };
637
638 let artifact_clone: Arc<dyn Artifact> = Arc::new(snapshot);
639 let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
640 self.mock_signed_entity_storer
641 .expect_store_signed_entity()
642 .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
643 .return_once(|_| Ok(()));
644
645 let dependencies = SignedEntityServiceArtifactsDependencies::new(
646 Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
647 Arc::new(cardano_immutable_files_full_long_artifact_builder),
648 Arc::new(self.mock_cardano_transactions_artifact_builder),
649 Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
650 Arc::new(self.mock_cardano_database_artifact_builder),
651 );
652 MithrilSignedEntityService::new(
653 Arc::new(self.mock_signed_entity_storer),
654 dependencies,
655 Arc::new(SignedEntityTypeLock::default()),
656 Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
657 TestLogger::stdout(),
658 )
659 }
660
661 fn mock_artifact_processing<
662 T: Artifact + Clone + Serialize + 'static,
663 U: signable_builder::Beacon,
664 >(
665 &mut self,
666 artifact: T,
667 mock_that_provide_artifact: &dyn Fn(
668 &mut MockDependencyInjector,
669 ) -> &mut MockArtifactBuilder<U, T>,
670 ) {
671 {
672 let artifact_cloned = artifact.clone();
673 mock_that_provide_artifact(self)
674 .expect_compute_artifact()
675 .times(1)
676 .return_once(|_, _| Ok(artifact_cloned));
677 }
678 {
679 let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
680 let artifact_json = serde_json::to_string(&artifact_clone).unwrap();
681 self.mock_signed_entity_storer
682 .expect_store_signed_entity()
683 .withf(move |signed_entity| signed_entity.artifact == artifact_json)
684 .return_once(|_| Ok(()));
685 }
686 }
687
688 fn mock_stake_distribution_processing(&mut self, artifact: MithrilStakeDistribution) {
689 self.mock_artifact_processing(artifact, &|mock_injector| {
690 &mut mock_injector.mock_mithril_stake_distribution_artifact_builder
691 });
692 }
693 }
694
695 fn get_artifact_total_produced_metric_since_startup_counter_value(
696 metrics_service: Arc<MetricsService>,
697 signed_entity_type: &SignedEntityType,
698 ) -> CounterValue {
699 match signed_entity_type {
700 SignedEntityType::MithrilStakeDistribution(_) => metrics_service
701 .get_artifact_mithril_stake_distribution_total_produced_since_startup()
702 .get(),
703 SignedEntityType::CardanoImmutableFilesFull(_) => metrics_service
704 .get_artifact_cardano_immutable_files_full_total_produced_since_startup()
705 .get(),
706 SignedEntityType::CardanoStakeDistribution(_) => metrics_service
707 .get_artifact_cardano_stake_distribution_total_produced_since_startup()
708 .get(),
709 SignedEntityType::CardanoTransactions(_, _) => metrics_service
710 .get_artifact_cardano_transaction_total_produced_since_startup()
711 .get(),
712 SignedEntityType::CardanoDatabase(_) => metrics_service
713 .get_artifact_cardano_database_total_produced_since_startup()
714 .get(),
715 }
716 }
717
718 #[tokio::test]
719 async fn build_mithril_stake_distribution_artifact_when_given_mithril_stake_distribution_entity_type()
720 {
721 let mut mock_container = MockDependencyInjector::new();
722
723 let mithril_stake_distribution_expected = create_stake_distribution(Epoch(1), 5);
724
725 mock_container
726 .mock_mithril_stake_distribution_artifact_builder
727 .expect_compute_artifact()
728 .times(1)
729 .returning(|_, _| Ok(create_stake_distribution(Epoch(1), 5)));
730
731 let artifact_builder_service = mock_container.build_artifact_builder_service();
732
733 let certificate = fake_data::certificate("hash".to_string());
734 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(1));
735 let artifact = artifact_builder_service
736 .compute_artifact(signed_entity_type.clone(), &certificate)
737 .await
738 .unwrap();
739
740 assert_expected(&mithril_stake_distribution_expected, &artifact);
741 }
742
743 #[tokio::test]
744 async fn should_store_the_artifact_when_creating_artifact_for_a_mithril_stake_distribution() {
745 generic_test_that_the_artifact_is_stored(
746 SignedEntityType::MithrilStakeDistribution(Epoch(1)),
747 create_stake_distribution(Epoch(1), 5),
748 &|mock_injector| &mut mock_injector.mock_mithril_stake_distribution_artifact_builder,
749 )
750 .await;
751 }
752
753 #[tokio::test]
754 async fn build_cardano_stake_distribution_artifact_when_given_cardano_stake_distribution_entity_type()
755 {
756 let mut mock_container = MockDependencyInjector::new();
757
758 let cardano_stake_distribution_expected = create_cardano_stake_distribution(
759 Epoch(1),
760 StakeDistribution::from([("pool-1".to_string(), 100)]),
761 );
762
763 mock_container
764 .mock_cardano_stake_distribution_artifact_builder
765 .expect_compute_artifact()
766 .times(1)
767 .returning(|_, _| {
768 Ok(create_cardano_stake_distribution(
769 Epoch(1),
770 StakeDistribution::from([("pool-1".to_string(), 100)]),
771 ))
772 });
773
774 let artifact_builder_service = mock_container.build_artifact_builder_service();
775
776 let certificate = fake_data::certificate("hash".to_string());
777 let signed_entity_type = SignedEntityType::CardanoStakeDistribution(Epoch(1));
778 let artifact = artifact_builder_service
779 .compute_artifact(signed_entity_type.clone(), &certificate)
780 .await
781 .unwrap();
782
783 assert_expected(&cardano_stake_distribution_expected, &artifact);
784 }
785
786 #[tokio::test]
787 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_stake_distribution() {
788 generic_test_that_the_artifact_is_stored(
789 SignedEntityType::CardanoStakeDistribution(Epoch(1)),
790 create_cardano_stake_distribution(
791 Epoch(1),
792 StakeDistribution::from([("pool-1".to_string(), 100)]),
793 ),
794 &|mock_injector| &mut mock_injector.mock_cardano_stake_distribution_artifact_builder,
795 )
796 .await;
797 }
798
799 #[tokio::test]
800 async fn build_snapshot_artifact_when_given_cardano_immutable_files_full_entity_type() {
801 let mut mock_container = MockDependencyInjector::new();
802
803 let snapshot_expected = fake_data::snapshot(1);
804
805 mock_container
806 .mock_cardano_immutable_files_full_artifact_builder
807 .expect_compute_artifact()
808 .times(1)
809 .returning(|_, _| Ok(fake_data::snapshot(1)));
810
811 let artifact_builder_service = mock_container.build_artifact_builder_service();
812
813 let certificate = fake_data::certificate("hash".to_string());
814 let signed_entity_type =
815 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
816 let artifact = artifact_builder_service
817 .compute_artifact(signed_entity_type.clone(), &certificate)
818 .await
819 .unwrap();
820
821 assert_expected(&snapshot_expected, &artifact);
822 }
823
824 #[tokio::test]
825 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_immutable_files() {
826 generic_test_that_the_artifact_is_stored(
827 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default()),
828 fake_data::snapshot(1),
829 &|mock_injector| &mut mock_injector.mock_cardano_immutable_files_full_artifact_builder,
830 )
831 .await;
832 }
833
834 #[tokio::test]
835 async fn build_cardano_transactions_snapshot_artifact_when_given_cardano_transactions_type() {
836 let mut mock_container = MockDependencyInjector::new();
837
838 let block_number = BlockNumber(151);
839 let expected = CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number);
840
841 mock_container
842 .mock_cardano_transactions_artifact_builder
843 .expect_compute_artifact()
844 .times(1)
845 .returning(move |_, _| {
846 Ok(CardanoTransactionsSnapshot::new(
847 "merkle_root".to_string(),
848 block_number,
849 ))
850 });
851
852 let artifact_builder_service = mock_container.build_artifact_builder_service();
853
854 let certificate = fake_data::certificate("hash".to_string());
855 let signed_entity_type = SignedEntityType::CardanoTransactions(Epoch(1), block_number);
856 let artifact = artifact_builder_service
857 .compute_artifact(signed_entity_type.clone(), &certificate)
858 .await
859 .unwrap();
860
861 assert_expected(&expected, &artifact);
862 }
863
864 #[tokio::test]
865 async fn should_store_the_artifact_when_creating_artifact_for_cardano_transactions() {
866 let block_number = BlockNumber(149);
867 generic_test_that_the_artifact_is_stored(
868 SignedEntityType::CardanoTransactions(Epoch(1), block_number),
869 CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number),
870 &|mock_injector| &mut mock_injector.mock_cardano_transactions_artifact_builder,
871 )
872 .await;
873 }
874
875 #[tokio::test]
876 async fn build_cardano_database_artifact_when_given_cardano_database_entity_type() {
877 let mut mock_container = MockDependencyInjector::new();
878
879 let cardano_database_expected = fake_data::cardano_database_snapshot(1);
880
881 mock_container
882 .mock_cardano_database_artifact_builder
883 .expect_compute_artifact()
884 .times(1)
885 .returning(|_, _| Ok(fake_data::cardano_database_snapshot(1)));
886
887 let artifact_builder_service = mock_container.build_artifact_builder_service();
888
889 let certificate = fake_data::certificate("hash".to_string());
890 let signed_entity_type = SignedEntityType::CardanoDatabase(CardanoDbBeacon::default());
891 let artifact = artifact_builder_service
892 .compute_artifact(signed_entity_type.clone(), &certificate)
893 .await
894 .unwrap();
895
896 assert_expected(&cardano_database_expected, &artifact);
897 }
898
899 #[tokio::test]
900 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_database() {
901 generic_test_that_the_artifact_is_stored(
902 SignedEntityType::CardanoDatabase(CardanoDbBeacon::default()),
903 fake_data::cardano_database_snapshot(1),
904 &|mock_injector| &mut mock_injector.mock_cardano_database_artifact_builder,
905 )
906 .await;
907 }
908
909 async fn generic_test_that_the_artifact_is_stored<
910 T: Artifact + Clone + Serialize + 'static,
911 U: signable_builder::Beacon,
912 >(
913 signed_entity_type: SignedEntityType,
914 artifact: T,
915 mock_that_provide_artifact: &dyn Fn(
916 &mut MockDependencyInjector,
917 ) -> &mut MockArtifactBuilder<U, T>,
918 ) {
919 let mut mock_container = MockDependencyInjector::new();
920 {
921 let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
922 let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
923 mock_container
924 .mock_signed_entity_storer
925 .expect_store_signed_entity()
926 .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
927 .return_once(|_| Ok(()));
928 }
929 {
930 let artifact_cloned = artifact.clone();
931 mock_that_provide_artifact(&mut mock_container)
932 .expect_compute_artifact()
933 .times(1)
934 .return_once(|_, _| Ok(artifact_cloned));
935 }
936 let artifact_builder_service = mock_container.build_artifact_builder_service();
937
938 let certificate = fake_data::certificate("hash".to_string());
939 let error_message = format!(
940 "Create artifact should not fail for {} signed entity",
941 std::any::type_name::<T>()
942 );
943 let error_message_str = error_message.as_str();
944
945 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
946 artifact_builder_service.metrics_service.clone(),
947 &signed_entity_type,
948 );
949
950 artifact_builder_service
951 .create_artifact_task(signed_entity_type.clone(), &certificate)
952 .await
953 .expect(error_message_str);
954
955 assert_eq!(
956 initial_counter_value + 1,
957 get_artifact_total_produced_metric_since_startup_counter_value(
958 artifact_builder_service.metrics_service.clone(),
959 &signed_entity_type,
960 )
961 )
962 }
963
964 #[tokio::test]
965 async fn create_artifact_for_two_signed_entity_types_in_sequence_not_blocking() {
966 let atomic_stop = Arc::new(AtomicBool::new(false));
967 let signed_entity_type_service = {
968 let mut mock_container = MockDependencyInjector::new();
969
970 let msd = create_stake_distribution(Epoch(1), 5);
971 mock_container.mock_stake_distribution_processing(msd);
972
973 mock_container
974 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone())
975 };
976 let certificate = fake_data::certificate("hash".to_string());
977
978 let signed_entity_type_immutable =
979 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
980 let first_task_that_never_finished = signed_entity_type_service
981 .create_artifact(signed_entity_type_immutable, &certificate)
982 .await
983 .unwrap();
984
985 let signed_entity_type_msd = SignedEntityType::MithrilStakeDistribution(Epoch(1));
986 let second_task_that_finish_first = signed_entity_type_service
987 .create_artifact(signed_entity_type_msd, &certificate)
988 .await
989 .unwrap();
990
991 second_task_that_finish_first.await.unwrap().unwrap();
992 assert!(!first_task_that_never_finished.is_finished());
993
994 atomic_stop.swap(true, Ordering::Relaxed);
995 }
996
997 #[tokio::test]
998 async fn create_artifact_lock_unlock_signed_entity_type_while_processing() {
999 let atomic_stop = Arc::new(AtomicBool::new(false));
1000 let signed_entity_type_service = MockDependencyInjector::new()
1001 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1002 let certificate = fake_data::certificate("hash".to_string());
1003
1004 let signed_entity_type_immutable =
1005 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1006 assert!(
1007 !signed_entity_type_service
1008 .signed_entity_type_lock
1009 .is_locked(&signed_entity_type_immutable)
1010 .await
1011 );
1012 let join_handle = signed_entity_type_service
1013 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1014 .await
1015 .unwrap();
1016
1017 let is_locked = signed_entity_type_service
1020 .signed_entity_type_lock
1021 .is_locked(&signed_entity_type_immutable)
1022 .await;
1023 let is_finished = join_handle.is_finished();
1024
1025 atomic_stop.swap(true, Ordering::Relaxed);
1026 join_handle.await.unwrap().unwrap();
1027
1028 assert!(is_locked);
1029 assert!(!is_finished);
1030
1031 assert!(
1032 !signed_entity_type_service
1033 .signed_entity_type_lock
1034 .is_locked(&signed_entity_type_immutable)
1035 .await
1036 );
1037 }
1038
1039 #[tokio::test]
1040 async fn create_artifact_unlock_signed_entity_type_when_error() {
1041 let signed_entity_type_service = {
1042 let mut mock_container = MockDependencyInjector::new();
1043 mock_container
1044 .mock_cardano_immutable_files_full_artifact_builder
1045 .expect_compute_artifact()
1046 .returning(|_, _| Err(anyhow::anyhow!("Error while computing artifact")));
1047
1048 mock_container.build_artifact_builder_service()
1049 };
1050 let certificate = fake_data::certificate("hash".to_string());
1051
1052 let signed_entity_type_immutable =
1053 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1054
1055 let join_handle = signed_entity_type_service
1056 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1057 .await
1058 .unwrap();
1059
1060 let error = join_handle.await.unwrap().unwrap_err();
1061 assert!(
1062 error.to_string().contains("CardanoImmutableFilesFull"),
1063 "Error should contains CardanoImmutableFilesFull but was: {error}"
1064 );
1065
1066 assert!(
1067 !signed_entity_type_service
1068 .signed_entity_type_lock
1069 .is_locked(&signed_entity_type_immutable)
1070 .await
1071 );
1072 }
1073
1074 #[tokio::test]
1075 async fn create_artifact_unlock_signed_entity_type_when_panic() {
1076 let signed_entity_type_service =
1077 MockDependencyInjector::new().build_artifact_builder_service();
1078 let certificate = fake_data::certificate("hash".to_string());
1079
1080 let signed_entity_type_immutable =
1081 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1082
1083 let join_handle = signed_entity_type_service
1084 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1085 .await
1086 .unwrap();
1087
1088 let error = join_handle.await.unwrap().unwrap_err();
1089 assert!(
1090 error.to_string().contains("CardanoImmutableFilesFull"),
1091 "Error should contains CardanoImmutableFilesFull but was: {error}"
1092 );
1093
1094 assert!(
1095 !signed_entity_type_service
1096 .signed_entity_type_lock
1097 .is_locked(&signed_entity_type_immutable)
1098 .await
1099 );
1100 }
1101
1102 #[tokio::test]
1103 async fn create_artifact_for_a_signed_entity_type_already_lock_return_error() {
1104 let atomic_stop = Arc::new(AtomicBool::new(false));
1105 let signed_entity_service = MockDependencyInjector::new()
1106 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1107 let certificate = fake_data::certificate("hash".to_string());
1108 let signed_entity_type_immutable =
1109 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1110
1111 signed_entity_service
1112 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1113 .await
1114 .unwrap();
1115
1116 signed_entity_service
1117 .create_artifact(signed_entity_type_immutable, &certificate)
1118 .await
1119 .expect_err("Should return error when signed entity type is already locked");
1120
1121 atomic_stop.swap(true, Ordering::Relaxed);
1122 }
1123
1124 #[tokio::test]
1125 async fn metrics_counter_value_is_not_incremented_when_compute_artifact_error() {
1126 let signed_entity_service = {
1127 let mut mock_container = MockDependencyInjector::new();
1128 mock_container
1129 .mock_cardano_immutable_files_full_artifact_builder
1130 .expect_compute_artifact()
1131 .returning(|_, _| Err(anyhow!("Error while computing artifact")));
1132
1133 mock_container.build_artifact_builder_service()
1134 };
1135
1136 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1137
1138 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1139 signed_entity_service.metrics_service.clone(),
1140 &signed_entity_type,
1141 );
1142
1143 signed_entity_service
1144 .create_artifact(
1145 signed_entity_type.clone(),
1146 &fake_data::certificate("hash".to_string()),
1147 )
1148 .await
1149 .unwrap();
1150
1151 assert_eq!(
1152 initial_counter_value,
1153 get_artifact_total_produced_metric_since_startup_counter_value(
1154 signed_entity_service.metrics_service.clone(),
1155 &signed_entity_type,
1156 )
1157 );
1158 }
1159
1160 #[tokio::test]
1161 async fn metrics_counter_value_is_not_incremented_when_store_signed_entity_error() {
1162 let signed_entity_service = {
1163 let mut mock_container = MockDependencyInjector::new();
1164 mock_container
1165 .mock_signed_entity_storer
1166 .expect_store_signed_entity()
1167 .returning(|_| Err(anyhow!("Error while storing signed entity")));
1168
1169 mock_container.build_artifact_builder_service()
1170 };
1171
1172 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1173
1174 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1175 signed_entity_service.metrics_service.clone(),
1176 &signed_entity_type,
1177 );
1178
1179 signed_entity_service
1180 .create_artifact(
1181 signed_entity_type.clone(),
1182 &fake_data::certificate("hash".to_string()),
1183 )
1184 .await
1185 .unwrap();
1186
1187 assert_eq!(
1188 initial_counter_value,
1189 get_artifact_total_produced_metric_since_startup_counter_value(
1190 signed_entity_service.metrics_service.clone(),
1191 &signed_entity_type,
1192 )
1193 );
1194 }
1195}