1use anyhow::{Context, anyhow};
6use async_trait::async_trait;
7use chrono::Utc;
8use slog::{Logger, info, warn};
9use std::sync::Arc;
10use tokio::task::JoinHandle;
11
12use mithril_common::{
13 StdResult,
14 entities::{
15 BlockNumber, CardanoBlocksTransactionsSnapshot, CardanoDatabaseSnapshot, CardanoDbBeacon,
16 CardanoStakeDistribution, CardanoTransactionsSnapshot, Certificate, Epoch,
17 MithrilStakeDistribution, SignedEntityType, SignedEntityTypeDiscriminants, Snapshot,
18 },
19 logging::LoggerExtensions,
20 signable_builder::{Artifact, SignedEntity},
21};
22
23use crate::{
24 MetricsService,
25 artifact_builder::ArtifactBuilder,
26 database::{record::SignedEntityRecord, repository::SignedEntityStorer},
27};
28use mithril_signed_entity_lock::SignedEntityTypeLock;
29
30#[cfg_attr(test, mockall::automock)]
32#[async_trait]
33pub trait SignedEntityService: Send + Sync {
34 async fn create_artifact(
36 &self,
37 signed_entity_type: SignedEntityType,
38 certificate: &Certificate,
39 ) -> StdResult<JoinHandle<StdResult<()>>>;
40
41 async fn get_last_signed_snapshots(
43 &self,
44 total: usize,
45 ) -> StdResult<Vec<SignedEntity<Snapshot>>>;
46
47 async fn get_signed_snapshot_by_id(
49 &self,
50 signed_entity_id: &str,
51 ) -> StdResult<Option<SignedEntity<Snapshot>>>;
52
53 async fn get_last_signed_cardano_database_snapshots(
55 &self,
56 total: usize,
57 ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>>;
58
59 async fn get_signed_cardano_database_snapshot_by_id(
61 &self,
62 signed_entity_id: &str,
63 ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>>;
64
65 async fn get_last_signed_mithril_stake_distributions(
68 &self,
69 total: usize,
70 ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>>;
71
72 async fn get_signed_mithril_stake_distribution_by_id(
74 &self,
75 signed_entity_id: &str,
76 ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>>;
77
78 async fn get_last_cardano_transaction_snapshot(
80 &self,
81 ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>>;
82
83 async fn get_last_cardano_blocks_transactions_snapshot(
85 &self,
86 ) -> StdResult<Option<SignedEntity<CardanoBlocksTransactionsSnapshot>>>;
87
88 async fn get_last_signed_cardano_stake_distributions(
91 &self,
92 total: usize,
93 ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>>;
94}
95
96#[derive(Clone)]
98pub struct MithrilSignedEntityService {
99 signed_entity_storer: Arc<dyn SignedEntityStorer>,
100 mithril_stake_distribution_artifact_builder:
101 Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
102 cardano_immutable_files_full_artifact_builder:
103 Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
104 cardano_transactions_artifact_builder:
105 Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
106 cardano_blocks_transactions_artifact_builder:
107 Arc<dyn ArtifactBuilder<BlockNumber, CardanoBlocksTransactionsSnapshot>>,
108 signed_entity_type_lock: Arc<SignedEntityTypeLock>,
109 cardano_stake_distribution_artifact_builder:
110 Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
111 cardano_database_artifact_builder:
112 Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
113 metrics_service: Arc<MetricsService>,
114 logger: Logger,
115}
116
117pub struct SignedEntityServiceArtifactsDependencies {
119 mithril_stake_distribution_artifact_builder:
120 Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
121 cardano_immutable_files_full_artifact_builder:
122 Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
123 cardano_transactions_artifact_builder:
124 Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
125 cardano_blocks_transactions_artifact_builder:
126 Arc<dyn ArtifactBuilder<BlockNumber, CardanoBlocksTransactionsSnapshot>>,
127 cardano_stake_distribution_artifact_builder:
128 Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
129 cardano_database_artifact_builder:
130 Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
131}
132
133impl SignedEntityServiceArtifactsDependencies {
134 pub fn new(
136 mithril_stake_distribution_artifact_builder: Arc<
137 dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>,
138 >,
139 cardano_immutable_files_full_artifact_builder: Arc<
140 dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>,
141 >,
142 cardano_transactions_artifact_builder: Arc<
143 dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
144 >,
145 cardano_blocks_transactions_artifact_builder: Arc<
146 dyn ArtifactBuilder<BlockNumber, CardanoBlocksTransactionsSnapshot>,
147 >,
148 cardano_stake_distribution_artifact_builder: Arc<
149 dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>,
150 >,
151 cardano_database_artifact_builder: Arc<
152 dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
153 >,
154 ) -> Self {
155 Self {
156 mithril_stake_distribution_artifact_builder,
157 cardano_immutable_files_full_artifact_builder,
158 cardano_transactions_artifact_builder,
159 cardano_blocks_transactions_artifact_builder,
160 cardano_stake_distribution_artifact_builder,
161 cardano_database_artifact_builder,
162 }
163 }
164}
165
166impl MithrilSignedEntityService {
167 pub fn new(
169 signed_entity_storer: Arc<dyn SignedEntityStorer>,
170 dependencies: SignedEntityServiceArtifactsDependencies,
171 signed_entity_type_lock: Arc<SignedEntityTypeLock>,
172 metrics_service: Arc<MetricsService>,
173 logger: Logger,
174 ) -> Self {
175 Self {
176 signed_entity_storer,
177 mithril_stake_distribution_artifact_builder: dependencies
178 .mithril_stake_distribution_artifact_builder,
179 cardano_immutable_files_full_artifact_builder: dependencies
180 .cardano_immutable_files_full_artifact_builder,
181 cardano_transactions_artifact_builder: dependencies
182 .cardano_transactions_artifact_builder,
183 cardano_blocks_transactions_artifact_builder: dependencies
184 .cardano_blocks_transactions_artifact_builder,
185 cardano_stake_distribution_artifact_builder: dependencies
186 .cardano_stake_distribution_artifact_builder,
187 cardano_database_artifact_builder: dependencies.cardano_database_artifact_builder,
188 signed_entity_type_lock,
189 metrics_service,
190 logger: logger.new_with_component_name::<Self>(),
191 }
192 }
193
194 async fn create_artifact_task(
195 &self,
196 signed_entity_type: SignedEntityType,
197 certificate: &Certificate,
198 ) -> StdResult<()> {
199 info!(
200 self.logger, ">> create_artifact_task";
201 "signed_entity_type" => ?signed_entity_type, "certificate_hash" => &certificate.hash
202 );
203
204 let mut remaining_retries = 2;
205 let artifact = loop {
206 remaining_retries -= 1;
207
208 match self.compute_artifact(signed_entity_type.clone(), certificate).await {
209 Err(error) if remaining_retries == 0 => break Err(error),
210 Err(_error) => (),
211 Ok(artifact) => break Ok(artifact),
212 };
213 }?;
214
215 let signed_entity = SignedEntityRecord {
216 signed_entity_id: artifact.get_id(),
217 signed_entity_type: signed_entity_type.clone(),
218 certificate_id: certificate.hash.clone(),
219 artifact: serde_json::to_string(&artifact)?,
220 created_at: Utc::now(),
221 };
222
223 self.signed_entity_storer
224 .store_signed_entity(&signed_entity)
225 .await
226 .with_context(|| {
227 format!(
228 "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
229 )
230 })?;
231
232 self.increment_artifact_total_produced_metric_since_startup(signed_entity_type);
233
234 Ok(())
235 }
236
237 async fn compute_artifact(
239 &self,
240 signed_entity_type: SignedEntityType,
241 certificate: &Certificate,
242 ) -> StdResult<Arc<dyn Artifact>> {
243 match signed_entity_type.clone() {
244 SignedEntityType::MithrilStakeDistribution(epoch) => Ok(Arc::new(
245 self.mithril_stake_distribution_artifact_builder
246 .compute_artifact(epoch, certificate)
247 .await
248 .with_context(|| {
249 format!(
250 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
251 )
252 })?,
253 )),
254 SignedEntityType::CardanoImmutableFilesFull(beacon) => Ok(Arc::new(
255 self.cardano_immutable_files_full_artifact_builder
256 .compute_artifact(beacon.clone(), certificate)
257 .await
258 .with_context(|| {
259 format!(
260 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
261 )
262 })?,
263 )),
264 SignedEntityType::CardanoStakeDistribution(epoch) => Ok(Arc::new(
265 self.cardano_stake_distribution_artifact_builder
266 .compute_artifact(epoch, certificate)
267 .await
268 .with_context(|| {
269 format!(
270 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
271 )
272 })?,
273 )),
274 SignedEntityType::CardanoTransactions(_epoch, block_number) => Ok(Arc::new(
275 self.cardano_transactions_artifact_builder
276 .compute_artifact(block_number, certificate)
277 .await
278 .with_context(|| {
279 format!(
280 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
281 )
282 })?,
283 )),
284 SignedEntityType::CardanoBlocksTransactions(_epoch, block_number) => Ok(Arc::new(
285 self.cardano_blocks_transactions_artifact_builder
286 .compute_artifact(block_number, certificate)
287 .await
288 .with_context(|| {
289 format!(
290 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
291 )
292 })?,
293 )),
294 SignedEntityType::CardanoDatabase(beacon) => Ok(Arc::new(
295 self.cardano_database_artifact_builder
296 .compute_artifact(beacon, certificate)
297 .await
298 .with_context(|| {
299 format!(
300 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
301 )
302 })?,
303 )),
304 }
305 }
306
307 async fn get_last_signed_entities(
308 &self,
309 total: usize,
310 discriminants: &SignedEntityTypeDiscriminants,
311 ) -> StdResult<Vec<SignedEntityRecord>> {
312 self.signed_entity_storer
313 .get_last_signed_entities_by_type(discriminants, total)
314 .await
315 .with_context(|| {
316 format!(
317 "Signed Entity Service can not get last signed entities with type: '{discriminants:?}'"
318 )
319 })
320 }
321
322 fn increment_artifact_total_produced_metric_since_startup(
323 &self,
324 signed_entity_type: SignedEntityType,
325 ) {
326 let metrics = self.metrics_service.clone();
327 let metric_counter = match signed_entity_type {
328 SignedEntityType::MithrilStakeDistribution(..) => {
329 metrics.get_artifact_mithril_stake_distribution_total_produced_since_startup()
330 }
331 SignedEntityType::CardanoImmutableFilesFull(..) => {
332 metrics.get_artifact_cardano_immutable_files_full_total_produced_since_startup()
333 }
334 SignedEntityType::CardanoStakeDistribution(..) => {
335 metrics.get_artifact_cardano_stake_distribution_total_produced_since_startup()
336 }
337 SignedEntityType::CardanoTransactions(..) => {
338 metrics.get_artifact_cardano_transaction_total_produced_since_startup()
339 }
340 SignedEntityType::CardanoBlocksTransactions(..) => {
341 metrics.get_artifact_cardano_blocks_transactions_total_produced_since_startup()
342 }
343 SignedEntityType::CardanoDatabase(..) => {
344 metrics.get_artifact_cardano_database_total_produced_since_startup()
345 }
346 };
347
348 metric_counter.increment();
349 }
350}
351
352#[async_trait]
353impl SignedEntityService for MithrilSignedEntityService {
354 async fn create_artifact(
355 &self,
356 signed_entity_type: SignedEntityType,
357 certificate: &Certificate,
358 ) -> StdResult<JoinHandle<StdResult<()>>> {
359 if self.signed_entity_type_lock.is_locked(&signed_entity_type).await {
360 return Err(anyhow!(
361 "Signed entity type '{:?}' is already locked",
362 signed_entity_type
363 ));
364 }
365
366 let service = self.clone();
367 let certificate_cloned = certificate.clone();
368 service.signed_entity_type_lock.lock(&signed_entity_type).await;
369
370 Ok(tokio::task::spawn(async move {
371 let signed_entity_type_clone = signed_entity_type.clone();
372 let service_clone = service.clone();
373 let result = tokio::task::spawn(async move {
374 service_clone
375 .create_artifact_task(signed_entity_type_clone, &certificate_cloned)
376 .await
377 })
378 .await;
379 service
380 .signed_entity_type_lock
381 .release(signed_entity_type.clone())
382 .await;
383
384 result.with_context(|| format!(
385 "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
386 ))?.inspect_err(|e| warn!(service.logger, "Error while creating artifact"; "error" => ?e))
387 }))
388 }
389
390 async fn get_last_signed_snapshots(
391 &self,
392 total: usize,
393 ) -> StdResult<Vec<SignedEntity<Snapshot>>> {
394 let signed_entities = self
395 .get_last_signed_entities(
396 total,
397 &SignedEntityTypeDiscriminants::CardanoImmutableFilesFull,
398 )
399 .await?
400 .into_iter()
401 .map(|record| record.try_into())
402 .collect::<Result<Vec<_>, _>>()?;
403
404 Ok(signed_entities)
405 }
406
407 async fn get_signed_snapshot_by_id(
408 &self,
409 signed_entity_id: &str,
410 ) -> StdResult<Option<SignedEntity<Snapshot>>> {
411 let entity: Option<SignedEntity<Snapshot>> = match self
412 .signed_entity_storer
413 .get_signed_entity(signed_entity_id)
414 .await
415 .with_context(|| {
416 format!(
417 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
418 )
419 })? {
420 Some(entity) => Some(entity.try_into()?),
421 None => None,
422 };
423
424 Ok(entity)
425 }
426
427 async fn get_last_signed_cardano_database_snapshots(
428 &self,
429 total: usize,
430 ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>> {
431 let signed_entities = self
432 .get_last_signed_entities(total, &SignedEntityTypeDiscriminants::CardanoDatabase)
433 .await?
434 .into_iter()
435 .map(|record| record.try_into())
436 .collect::<Result<Vec<_>, _>>()?;
437
438 Ok(signed_entities)
439 }
440
441 async fn get_signed_cardano_database_snapshot_by_id(
442 &self,
443 signed_entity_id: &str,
444 ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>> {
445 let entity: Option<SignedEntity<CardanoDatabaseSnapshot>> = match self
446 .signed_entity_storer
447 .get_signed_entity(signed_entity_id)
448 .await
449 .with_context(|| {
450 format!(
451 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
452 )
453 })? {
454 Some(entity) => Some(entity.try_into()?),
455 None => None,
456 };
457
458 Ok(entity)
459 }
460
461 async fn get_last_signed_mithril_stake_distributions(
462 &self,
463 total: usize,
464 ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>> {
465 let signed_entities = self
466 .get_last_signed_entities(
467 total,
468 &SignedEntityTypeDiscriminants::MithrilStakeDistribution,
469 )
470 .await?
471 .into_iter()
472 .map(|record| record.try_into())
473 .collect::<Result<Vec<_>, _>>()?;
474
475 Ok(signed_entities)
476 }
477
478 async fn get_signed_mithril_stake_distribution_by_id(
479 &self,
480 signed_entity_id: &str,
481 ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>> {
482 let entity: Option<SignedEntity<MithrilStakeDistribution>> = match self
483 .signed_entity_storer
484 .get_signed_entity(signed_entity_id)
485 .await
486 .with_context(|| {
487 format!(
488 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
489 )
490 })? {
491 Some(entity) => Some(entity.try_into()?),
492 None => None,
493 };
494
495 Ok(entity)
496 }
497
498 async fn get_last_cardano_transaction_snapshot(
499 &self,
500 ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>> {
501 self.get_last_signed_entities(1, &SignedEntityTypeDiscriminants::CardanoTransactions)
502 .await?
503 .pop()
504 .map(|r| r.try_into())
505 .transpose()
506 }
507
508 async fn get_last_cardano_blocks_transactions_snapshot(
509 &self,
510 ) -> StdResult<Option<SignedEntity<CardanoBlocksTransactionsSnapshot>>> {
511 self.get_last_signed_entities(1, &SignedEntityTypeDiscriminants::CardanoBlocksTransactions)
512 .await?
513 .pop()
514 .map(|r| r.try_into())
515 .transpose()
516 }
517
518 async fn get_last_signed_cardano_stake_distributions(
519 &self,
520 total: usize,
521 ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>> {
522 let signed_entities = self
523 .get_last_signed_entities(
524 total,
525 &SignedEntityTypeDiscriminants::CardanoStakeDistribution,
526 )
527 .await?
528 .into_iter()
529 .map(|record| record.try_into())
530 .collect::<Result<Vec<_>, _>>()?;
531
532 Ok(signed_entities)
533 }
534}
535
536#[cfg(test)]
537mod tests {
538 use std::{sync::atomic::Ordering, time::Duration};
539
540 use mithril_common::{
541 entities::{CardanoTransactionsSnapshot, Epoch, StakeDistribution},
542 signable_builder,
543 test::double::fake_data,
544 };
545 use mithril_metric::CounterValue;
546 use serde::{Serialize, de::DeserializeOwned};
547 use std::sync::atomic::AtomicBool;
548
549 use crate::artifact_builder::MockArtifactBuilder;
550 use crate::database::repository::MockSignedEntityStorer;
551 use crate::test::TestLogger;
552
553 use super::*;
554
555 fn create_stake_distribution(epoch: Epoch, signers: usize) -> MithrilStakeDistribution {
556 MithrilStakeDistribution::new(
557 epoch,
558 fake_data::signers_with_stakes(signers),
559 &fake_data::protocol_parameters(),
560 )
561 }
562
563 fn create_cardano_stake_distribution(
564 epoch: Epoch,
565 stake_distribution: StakeDistribution,
566 ) -> CardanoStakeDistribution {
567 CardanoStakeDistribution::new(epoch, stake_distribution)
568 }
569
570 fn assert_expected<T>(expected: &T, artifact: &Arc<dyn Artifact>)
571 where
572 T: Serialize + DeserializeOwned,
573 {
574 let current: T = serde_json::from_str(&serde_json::to_string(&artifact).unwrap()).unwrap();
575 assert_eq!(
576 serde_json::to_string(&expected).unwrap(),
577 serde_json::to_string(¤t).unwrap()
578 );
579 }
580
581 struct MockDependencyInjector {
583 mock_signed_entity_storer: MockSignedEntityStorer,
584 mock_mithril_stake_distribution_artifact_builder:
585 MockArtifactBuilder<Epoch, MithrilStakeDistribution>,
586 mock_cardano_immutable_files_full_artifact_builder:
587 MockArtifactBuilder<CardanoDbBeacon, Snapshot>,
588 mock_cardano_transactions_artifact_builder:
589 MockArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
590 mock_cardano_blocks_transactions_artifact_builder:
591 MockArtifactBuilder<BlockNumber, CardanoBlocksTransactionsSnapshot>,
592 mock_cardano_stake_distribution_artifact_builder:
593 MockArtifactBuilder<Epoch, CardanoStakeDistribution>,
594 mock_cardano_database_artifact_builder:
595 MockArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
596 }
597
598 impl MockDependencyInjector {
599 fn new() -> MockDependencyInjector {
600 MockDependencyInjector {
601 mock_signed_entity_storer: MockSignedEntityStorer::new(),
602 mock_mithril_stake_distribution_artifact_builder: MockArtifactBuilder::<
603 Epoch,
604 MithrilStakeDistribution,
605 >::new(),
606 mock_cardano_immutable_files_full_artifact_builder: MockArtifactBuilder::<
607 CardanoDbBeacon,
608 Snapshot,
609 >::new(),
610 mock_cardano_transactions_artifact_builder: MockArtifactBuilder::<
611 BlockNumber,
612 CardanoTransactionsSnapshot,
613 >::new(),
614 mock_cardano_blocks_transactions_artifact_builder: MockArtifactBuilder::<
615 BlockNumber,
616 CardanoBlocksTransactionsSnapshot,
617 >::new(),
618 mock_cardano_stake_distribution_artifact_builder: MockArtifactBuilder::<
619 Epoch,
620 CardanoStakeDistribution,
621 >::new(),
622 mock_cardano_database_artifact_builder: MockArtifactBuilder::<
623 CardanoDbBeacon,
624 CardanoDatabaseSnapshot,
625 >::new(),
626 }
627 }
628
629 fn build_artifact_builder_service(self) -> MithrilSignedEntityService {
630 let dependencies = SignedEntityServiceArtifactsDependencies::new(
631 Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
632 Arc::new(self.mock_cardano_immutable_files_full_artifact_builder),
633 Arc::new(self.mock_cardano_transactions_artifact_builder),
634 Arc::new(self.mock_cardano_blocks_transactions_artifact_builder),
635 Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
636 Arc::new(self.mock_cardano_database_artifact_builder),
637 );
638 MithrilSignedEntityService::new(
639 Arc::new(self.mock_signed_entity_storer),
640 dependencies,
641 Arc::new(SignedEntityTypeLock::default()),
642 Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
643 TestLogger::stdout(),
644 )
645 }
646
647 fn build_artifact_builder_service_with_time_consuming_process(
648 mut self,
649 atomic_stop: Arc<AtomicBool>,
650 ) -> MithrilSignedEntityService {
651 struct LongArtifactBuilder {
652 atomic_stop: Arc<AtomicBool>,
653 snapshot: Snapshot,
654 }
655
656 let snapshot = fake_data::snapshot(1);
657
658 #[async_trait]
659 impl ArtifactBuilder<CardanoDbBeacon, Snapshot> for LongArtifactBuilder {
660 async fn compute_artifact(
661 &self,
662 _beacon: CardanoDbBeacon,
663 _certificate: &Certificate,
664 ) -> StdResult<Snapshot> {
665 let mut max_iteration = 100;
666 while !self.atomic_stop.load(Ordering::Relaxed) {
667 max_iteration -= 1;
668 if max_iteration <= 0 {
669 return Err(anyhow!("Test should handle the stop"));
670 }
671 tokio::time::sleep(Duration::from_millis(10)).await;
672 }
673 Ok(self.snapshot.clone())
674 }
675 }
676 let cardano_immutable_files_full_long_artifact_builder = LongArtifactBuilder {
677 atomic_stop: atomic_stop.clone(),
678 snapshot: snapshot.clone(),
679 };
680
681 let artifact_clone: Arc<dyn Artifact> = Arc::new(snapshot);
682 let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
683 self.mock_signed_entity_storer
684 .expect_store_signed_entity()
685 .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
686 .return_once(|_| Ok(()));
687
688 let dependencies = SignedEntityServiceArtifactsDependencies::new(
689 Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
690 Arc::new(cardano_immutable_files_full_long_artifact_builder),
691 Arc::new(self.mock_cardano_transactions_artifact_builder),
692 Arc::new(self.mock_cardano_blocks_transactions_artifact_builder),
693 Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
694 Arc::new(self.mock_cardano_database_artifact_builder),
695 );
696 MithrilSignedEntityService::new(
697 Arc::new(self.mock_signed_entity_storer),
698 dependencies,
699 Arc::new(SignedEntityTypeLock::default()),
700 Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
701 TestLogger::stdout(),
702 )
703 }
704
705 fn mock_artifact_processing<
706 T: Artifact + Clone + Serialize + 'static,
707 U: signable_builder::Beacon,
708 >(
709 &mut self,
710 artifact: T,
711 mock_that_provide_artifact: &dyn Fn(
712 &mut MockDependencyInjector,
713 ) -> &mut MockArtifactBuilder<U, T>,
714 ) {
715 {
716 let artifact_cloned = artifact.clone();
717 mock_that_provide_artifact(self)
718 .expect_compute_artifact()
719 .times(1)
720 .return_once(|_, _| Ok(artifact_cloned));
721 }
722 {
723 let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
724 let artifact_json = serde_json::to_string(&artifact_clone).unwrap();
725 self.mock_signed_entity_storer
726 .expect_store_signed_entity()
727 .withf(move |signed_entity| signed_entity.artifact == artifact_json)
728 .return_once(|_| Ok(()));
729 }
730 }
731
732 fn mock_stake_distribution_processing(&mut self, artifact: MithrilStakeDistribution) {
733 self.mock_artifact_processing(artifact, &|mock_injector| {
734 &mut mock_injector.mock_mithril_stake_distribution_artifact_builder
735 });
736 }
737 }
738
739 fn get_artifact_total_produced_metric_since_startup_counter_value(
740 metrics_service: Arc<MetricsService>,
741 signed_entity_type: &SignedEntityType,
742 ) -> CounterValue {
743 match signed_entity_type {
744 SignedEntityType::MithrilStakeDistribution(..) => metrics_service
745 .get_artifact_mithril_stake_distribution_total_produced_since_startup()
746 .get(),
747 SignedEntityType::CardanoImmutableFilesFull(..) => metrics_service
748 .get_artifact_cardano_immutable_files_full_total_produced_since_startup()
749 .get(),
750 SignedEntityType::CardanoStakeDistribution(..) => metrics_service
751 .get_artifact_cardano_stake_distribution_total_produced_since_startup()
752 .get(),
753 SignedEntityType::CardanoTransactions(..) => metrics_service
754 .get_artifact_cardano_transaction_total_produced_since_startup()
755 .get(),
756 SignedEntityType::CardanoBlocksTransactions(..) => metrics_service
757 .get_artifact_cardano_blocks_transactions_total_produced_since_startup()
758 .get(),
759 SignedEntityType::CardanoDatabase(..) => metrics_service
760 .get_artifact_cardano_database_total_produced_since_startup()
761 .get(),
762 }
763 }
764
765 #[tokio::test]
766 async fn build_mithril_stake_distribution_artifact_when_given_mithril_stake_distribution_entity_type()
767 {
768 let mut mock_container = MockDependencyInjector::new();
769
770 let mithril_stake_distribution_expected = create_stake_distribution(Epoch(1), 5);
771
772 mock_container
773 .mock_mithril_stake_distribution_artifact_builder
774 .expect_compute_artifact()
775 .times(1)
776 .returning(|_, _| Ok(create_stake_distribution(Epoch(1), 5)));
777
778 let artifact_builder_service = mock_container.build_artifact_builder_service();
779
780 let certificate = fake_data::certificate("hash".to_string());
781 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(1));
782 let artifact = artifact_builder_service
783 .compute_artifact(signed_entity_type.clone(), &certificate)
784 .await
785 .unwrap();
786
787 assert_expected(&mithril_stake_distribution_expected, &artifact);
788 }
789
790 #[tokio::test]
791 async fn should_store_the_artifact_when_creating_artifact_for_a_mithril_stake_distribution() {
792 generic_test_that_the_artifact_is_stored(
793 SignedEntityType::MithrilStakeDistribution(Epoch(1)),
794 create_stake_distribution(Epoch(1), 5),
795 &|mock_injector| &mut mock_injector.mock_mithril_stake_distribution_artifact_builder,
796 )
797 .await;
798 }
799
800 #[tokio::test]
801 async fn build_cardano_stake_distribution_artifact_when_given_cardano_stake_distribution_entity_type()
802 {
803 let mut mock_container = MockDependencyInjector::new();
804
805 let cardano_stake_distribution_expected = create_cardano_stake_distribution(
806 Epoch(1),
807 StakeDistribution::from([("pool-1".to_string(), 100)]),
808 );
809
810 mock_container
811 .mock_cardano_stake_distribution_artifact_builder
812 .expect_compute_artifact()
813 .times(1)
814 .returning(|_, _| {
815 Ok(create_cardano_stake_distribution(
816 Epoch(1),
817 StakeDistribution::from([("pool-1".to_string(), 100)]),
818 ))
819 });
820
821 let artifact_builder_service = mock_container.build_artifact_builder_service();
822
823 let certificate = fake_data::certificate("hash".to_string());
824 let signed_entity_type = SignedEntityType::CardanoStakeDistribution(Epoch(1));
825 let artifact = artifact_builder_service
826 .compute_artifact(signed_entity_type.clone(), &certificate)
827 .await
828 .unwrap();
829
830 assert_expected(&cardano_stake_distribution_expected, &artifact);
831 }
832
833 #[tokio::test]
834 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_stake_distribution() {
835 generic_test_that_the_artifact_is_stored(
836 SignedEntityType::CardanoStakeDistribution(Epoch(1)),
837 create_cardano_stake_distribution(
838 Epoch(1),
839 StakeDistribution::from([("pool-1".to_string(), 100)]),
840 ),
841 &|mock_injector| &mut mock_injector.mock_cardano_stake_distribution_artifact_builder,
842 )
843 .await;
844 }
845
846 #[tokio::test]
847 async fn build_snapshot_artifact_when_given_cardano_immutable_files_full_entity_type() {
848 let mut mock_container = MockDependencyInjector::new();
849
850 let snapshot_expected = fake_data::snapshot(1);
851
852 mock_container
853 .mock_cardano_immutable_files_full_artifact_builder
854 .expect_compute_artifact()
855 .times(1)
856 .returning(|_, _| Ok(fake_data::snapshot(1)));
857
858 let artifact_builder_service = mock_container.build_artifact_builder_service();
859
860 let certificate = fake_data::certificate("hash".to_string());
861 let signed_entity_type =
862 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
863 let artifact = artifact_builder_service
864 .compute_artifact(signed_entity_type.clone(), &certificate)
865 .await
866 .unwrap();
867
868 assert_expected(&snapshot_expected, &artifact);
869 }
870
871 #[tokio::test]
872 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_immutable_files() {
873 generic_test_that_the_artifact_is_stored(
874 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default()),
875 fake_data::snapshot(1),
876 &|mock_injector| &mut mock_injector.mock_cardano_immutable_files_full_artifact_builder,
877 )
878 .await;
879 }
880
881 #[tokio::test]
882 async fn build_cardano_transactions_snapshot_artifact_when_given_cardano_transactions_type() {
883 let mut mock_container = MockDependencyInjector::new();
884
885 let block_number = BlockNumber(151);
886 let expected = CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number);
887
888 mock_container
889 .mock_cardano_transactions_artifact_builder
890 .expect_compute_artifact()
891 .times(1)
892 .returning(move |_, _| {
893 Ok(CardanoTransactionsSnapshot::new(
894 "merkle_root".to_string(),
895 block_number,
896 ))
897 });
898
899 let artifact_builder_service = mock_container.build_artifact_builder_service();
900
901 let certificate = fake_data::certificate("hash".to_string());
902 let signed_entity_type = SignedEntityType::CardanoTransactions(Epoch(1), block_number);
903 let artifact = artifact_builder_service
904 .compute_artifact(signed_entity_type.clone(), &certificate)
905 .await
906 .unwrap();
907
908 assert_expected(&expected, &artifact);
909 }
910
911 #[tokio::test]
912 async fn should_store_the_artifact_when_creating_artifact_for_cardano_transactions() {
913 let block_number = BlockNumber(149);
914 generic_test_that_the_artifact_is_stored(
915 SignedEntityType::CardanoTransactions(Epoch(1), block_number),
916 CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number),
917 &|mock_injector| &mut mock_injector.mock_cardano_transactions_artifact_builder,
918 )
919 .await;
920 }
921
922 #[tokio::test]
923 async fn build_cardano_blocks_transactions_snapshot_artifact_when_given_cardano_blocks_transactions_type()
924 {
925 let mut mock_container = MockDependencyInjector::new();
926
927 let block_number = BlockNumber(151);
928 let offset_security_parameter = BlockNumber(15);
929 let expected = CardanoBlocksTransactionsSnapshot::new(
930 "merkle_root".to_string(),
931 block_number,
932 offset_security_parameter,
933 );
934
935 mock_container
936 .mock_cardano_blocks_transactions_artifact_builder
937 .expect_compute_artifact()
938 .times(1)
939 .returning(move |_, _| {
940 Ok(CardanoBlocksTransactionsSnapshot::new(
941 "merkle_root".to_string(),
942 block_number,
943 offset_security_parameter,
944 ))
945 });
946
947 let artifact_builder_service = mock_container.build_artifact_builder_service();
948
949 let certificate = fake_data::certificate("hash".to_string());
950 let signed_entity_type =
951 SignedEntityType::CardanoBlocksTransactions(Epoch(1), block_number);
952 let artifact = artifact_builder_service
953 .compute_artifact(signed_entity_type.clone(), &certificate)
954 .await
955 .unwrap();
956
957 assert_expected(&expected, &artifact);
958 }
959
960 #[tokio::test]
961 async fn should_store_the_artifact_when_creating_artifact_for_cardano_blocks_transactions() {
962 let block_number = BlockNumber(149);
963 let offset_security_parameter = BlockNumber(15);
964 generic_test_that_the_artifact_is_stored(
965 SignedEntityType::CardanoBlocksTransactions(Epoch(1), block_number),
966 CardanoBlocksTransactionsSnapshot::new(
967 "merkle_root".to_string(),
968 block_number,
969 offset_security_parameter,
970 ),
971 &|mock_injector| &mut mock_injector.mock_cardano_blocks_transactions_artifact_builder,
972 )
973 .await;
974 }
975
976 #[tokio::test]
977 async fn build_cardano_database_artifact_when_given_cardano_database_entity_type() {
978 let mut mock_container = MockDependencyInjector::new();
979
980 let cardano_database_expected = fake_data::cardano_database_snapshot(1);
981
982 mock_container
983 .mock_cardano_database_artifact_builder
984 .expect_compute_artifact()
985 .times(1)
986 .returning(|_, _| Ok(fake_data::cardano_database_snapshot(1)));
987
988 let artifact_builder_service = mock_container.build_artifact_builder_service();
989
990 let certificate = fake_data::certificate("hash".to_string());
991 let signed_entity_type = SignedEntityType::CardanoDatabase(CardanoDbBeacon::default());
992 let artifact = artifact_builder_service
993 .compute_artifact(signed_entity_type.clone(), &certificate)
994 .await
995 .unwrap();
996
997 assert_expected(&cardano_database_expected, &artifact);
998 }
999
1000 #[tokio::test]
1001 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_database() {
1002 generic_test_that_the_artifact_is_stored(
1003 SignedEntityType::CardanoDatabase(CardanoDbBeacon::default()),
1004 fake_data::cardano_database_snapshot(1),
1005 &|mock_injector| &mut mock_injector.mock_cardano_database_artifact_builder,
1006 )
1007 .await;
1008 }
1009
1010 async fn generic_test_that_the_artifact_is_stored<
1011 T: Artifact + Clone + Serialize + 'static,
1012 U: signable_builder::Beacon,
1013 >(
1014 signed_entity_type: SignedEntityType,
1015 artifact: T,
1016 mock_that_provide_artifact: &dyn Fn(
1017 &mut MockDependencyInjector,
1018 ) -> &mut MockArtifactBuilder<U, T>,
1019 ) {
1020 let mut mock_container = MockDependencyInjector::new();
1021 {
1022 let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
1023 let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
1024 mock_container
1025 .mock_signed_entity_storer
1026 .expect_store_signed_entity()
1027 .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
1028 .return_once(|_| Ok(()));
1029 }
1030 {
1031 let artifact_cloned = artifact.clone();
1032 mock_that_provide_artifact(&mut mock_container)
1033 .expect_compute_artifact()
1034 .times(1)
1035 .return_once(|_, _| Ok(artifact_cloned));
1036 }
1037 let artifact_builder_service = mock_container.build_artifact_builder_service();
1038
1039 let certificate = fake_data::certificate("hash".to_string());
1040 let error_message = format!(
1041 "Create artifact should not fail for {} signed entity",
1042 std::any::type_name::<T>()
1043 );
1044 let error_message_str = error_message.as_str();
1045
1046 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1047 artifact_builder_service.metrics_service.clone(),
1048 &signed_entity_type,
1049 );
1050
1051 artifact_builder_service
1052 .create_artifact_task(signed_entity_type.clone(), &certificate)
1053 .await
1054 .expect(error_message_str);
1055
1056 assert_eq!(
1057 initial_counter_value + 1,
1058 get_artifact_total_produced_metric_since_startup_counter_value(
1059 artifact_builder_service.metrics_service.clone(),
1060 &signed_entity_type,
1061 )
1062 )
1063 }
1064
1065 #[tokio::test]
1066 async fn create_artifact_for_two_signed_entity_types_in_sequence_not_blocking() {
1067 let atomic_stop = Arc::new(AtomicBool::new(false));
1068 let signed_entity_type_service = {
1069 let mut mock_container = MockDependencyInjector::new();
1070
1071 let msd = create_stake_distribution(Epoch(1), 5);
1072 mock_container.mock_stake_distribution_processing(msd);
1073
1074 mock_container
1075 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone())
1076 };
1077 let certificate = fake_data::certificate("hash".to_string());
1078
1079 let signed_entity_type_immutable =
1080 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1081 let first_task_that_never_finished = signed_entity_type_service
1082 .create_artifact(signed_entity_type_immutable, &certificate)
1083 .await
1084 .unwrap();
1085
1086 let signed_entity_type_msd = SignedEntityType::MithrilStakeDistribution(Epoch(1));
1087 let second_task_that_finish_first = signed_entity_type_service
1088 .create_artifact(signed_entity_type_msd, &certificate)
1089 .await
1090 .unwrap();
1091
1092 second_task_that_finish_first.await.unwrap().unwrap();
1093 assert!(!first_task_that_never_finished.is_finished());
1094
1095 atomic_stop.swap(true, Ordering::Relaxed);
1096 }
1097
1098 #[tokio::test]
1099 async fn create_artifact_lock_unlock_signed_entity_type_while_processing() {
1100 let atomic_stop = Arc::new(AtomicBool::new(false));
1101 let signed_entity_type_service = MockDependencyInjector::new()
1102 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1103 let certificate = fake_data::certificate("hash".to_string());
1104
1105 let signed_entity_type_immutable =
1106 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1107 assert!(
1108 !signed_entity_type_service
1109 .signed_entity_type_lock
1110 .is_locked(&signed_entity_type_immutable)
1111 .await
1112 );
1113 let join_handle = signed_entity_type_service
1114 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1115 .await
1116 .unwrap();
1117
1118 let is_locked = signed_entity_type_service
1121 .signed_entity_type_lock
1122 .is_locked(&signed_entity_type_immutable)
1123 .await;
1124 let is_finished = join_handle.is_finished();
1125
1126 atomic_stop.swap(true, Ordering::Relaxed);
1127 join_handle.await.unwrap().unwrap();
1128
1129 assert!(is_locked);
1130 assert!(!is_finished);
1131
1132 assert!(
1133 !signed_entity_type_service
1134 .signed_entity_type_lock
1135 .is_locked(&signed_entity_type_immutable)
1136 .await
1137 );
1138 }
1139
1140 #[tokio::test]
1141 async fn create_artifact_unlock_signed_entity_type_when_error() {
1142 let signed_entity_type_service = {
1143 let mut mock_container = MockDependencyInjector::new();
1144 mock_container
1145 .mock_cardano_immutable_files_full_artifact_builder
1146 .expect_compute_artifact()
1147 .returning(|_, _| Err(anyhow::anyhow!("Error while computing artifact")));
1148
1149 mock_container.build_artifact_builder_service()
1150 };
1151 let certificate = fake_data::certificate("hash".to_string());
1152
1153 let signed_entity_type_immutable =
1154 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1155
1156 let join_handle = signed_entity_type_service
1157 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1158 .await
1159 .unwrap();
1160
1161 let error = join_handle.await.unwrap().unwrap_err();
1162 assert!(
1163 error.to_string().contains("CardanoImmutableFilesFull"),
1164 "Error should contains CardanoImmutableFilesFull but was: {error}"
1165 );
1166
1167 assert!(
1168 !signed_entity_type_service
1169 .signed_entity_type_lock
1170 .is_locked(&signed_entity_type_immutable)
1171 .await
1172 );
1173 }
1174
1175 #[tokio::test]
1176 async fn create_artifact_unlock_signed_entity_type_when_panic() {
1177 let signed_entity_type_service =
1178 MockDependencyInjector::new().build_artifact_builder_service();
1179 let certificate = fake_data::certificate("hash".to_string());
1180
1181 let signed_entity_type_immutable =
1182 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1183
1184 let join_handle = signed_entity_type_service
1185 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1186 .await
1187 .unwrap();
1188
1189 let error = join_handle.await.unwrap().unwrap_err();
1190 assert!(
1191 error.to_string().contains("CardanoImmutableFilesFull"),
1192 "Error should contains CardanoImmutableFilesFull but was: {error}"
1193 );
1194
1195 assert!(
1196 !signed_entity_type_service
1197 .signed_entity_type_lock
1198 .is_locked(&signed_entity_type_immutable)
1199 .await
1200 );
1201 }
1202
1203 #[tokio::test]
1204 async fn create_artifact_for_a_signed_entity_type_already_lock_return_error() {
1205 let atomic_stop = Arc::new(AtomicBool::new(false));
1206 let signed_entity_service = MockDependencyInjector::new()
1207 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1208 let certificate = fake_data::certificate("hash".to_string());
1209 let signed_entity_type_immutable =
1210 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1211
1212 signed_entity_service
1213 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1214 .await
1215 .unwrap();
1216
1217 signed_entity_service
1218 .create_artifact(signed_entity_type_immutable, &certificate)
1219 .await
1220 .expect_err("Should return error when signed entity type is already locked");
1221
1222 atomic_stop.swap(true, Ordering::Relaxed);
1223 }
1224
1225 #[tokio::test]
1226 async fn metrics_counter_value_is_not_incremented_when_compute_artifact_error() {
1227 let signed_entity_service = {
1228 let mut mock_container = MockDependencyInjector::new();
1229 mock_container
1230 .mock_cardano_immutable_files_full_artifact_builder
1231 .expect_compute_artifact()
1232 .returning(|_, _| Err(anyhow!("Error while computing artifact")));
1233
1234 mock_container.build_artifact_builder_service()
1235 };
1236
1237 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1238
1239 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1240 signed_entity_service.metrics_service.clone(),
1241 &signed_entity_type,
1242 );
1243
1244 signed_entity_service
1245 .create_artifact(
1246 signed_entity_type.clone(),
1247 &fake_data::certificate("hash".to_string()),
1248 )
1249 .await
1250 .unwrap();
1251
1252 assert_eq!(
1253 initial_counter_value,
1254 get_artifact_total_produced_metric_since_startup_counter_value(
1255 signed_entity_service.metrics_service.clone(),
1256 &signed_entity_type,
1257 )
1258 );
1259 }
1260
1261 #[tokio::test]
1262 async fn metrics_counter_value_is_not_incremented_when_store_signed_entity_error() {
1263 let signed_entity_service = {
1264 let mut mock_container = MockDependencyInjector::new();
1265 mock_container
1266 .mock_signed_entity_storer
1267 .expect_store_signed_entity()
1268 .returning(|_| Err(anyhow!("Error while storing signed entity")));
1269
1270 mock_container.build_artifact_builder_service()
1271 };
1272
1273 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1274
1275 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1276 signed_entity_service.metrics_service.clone(),
1277 &signed_entity_type,
1278 );
1279
1280 signed_entity_service
1281 .create_artifact(
1282 signed_entity_type.clone(),
1283 &fake_data::certificate("hash".to_string()),
1284 )
1285 .await
1286 .unwrap();
1287
1288 assert_eq!(
1289 initial_counter_value,
1290 get_artifact_total_produced_metric_since_startup_counter_value(
1291 signed_entity_service.metrics_service.clone(),
1292 &signed_entity_type,
1293 )
1294 );
1295 }
1296}