1use anyhow::{Context, anyhow};
6use async_trait::async_trait;
7use chrono::Utc;
8use slog::{Logger, info, warn};
9use std::sync::Arc;
10use tokio::task::JoinHandle;
11
12use mithril_common::{
13 StdResult,
14 entities::{
15 BlockNumber, CardanoBlocksTransactionsSnapshot, CardanoDatabaseSnapshot, CardanoDbBeacon,
16 CardanoStakeDistribution, CardanoTransactionsSnapshot, Certificate, Epoch,
17 MithrilStakeDistribution, SignedEntityType, SignedEntityTypeDiscriminants, Snapshot,
18 },
19 logging::LoggerExtensions,
20 signable_builder::{Artifact, SignedEntity},
21};
22
23use crate::{
24 MetricsService,
25 artifact_builder::ArtifactBuilder,
26 database::{record::SignedEntityRecord, repository::SignedEntityStorer},
27};
28use mithril_signed_entity_lock::SignedEntityTypeLock;
29
/// Service in charge of creating the artifacts of signed entities and of retrieving the
/// signed entities that were previously stored.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait SignedEntityService: Send + Sync {
    /// Create the artifact associated with the given signed entity type and certificate.
    ///
    /// On success the returned [`JoinHandle`] tracks the task performing the creation;
    /// awaiting it yields the result of that task.
    async fn create_artifact(
        &self,
        signed_entity_type: SignedEntityType,
        certificate: &Certificate,
    ) -> StdResult<JoinHandle<StdResult<()>>>;

    /// Return the last signed snapshots, `total` being the number of entities requested.
    async fn get_last_signed_snapshots(
        &self,
        total: usize,
    ) -> StdResult<Vec<SignedEntity<Snapshot>>>;

    /// Return the signed snapshot identified by `signed_entity_id`, if any.
    async fn get_signed_snapshot_by_id(
        &self,
        signed_entity_id: &str,
    ) -> StdResult<Option<SignedEntity<Snapshot>>>;

    /// Return the last signed Cardano database snapshots, `total` being the number of
    /// entities requested.
    async fn get_last_signed_cardano_database_snapshots(
        &self,
        total: usize,
    ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>>;

    /// Return the signed Cardano database snapshot identified by `signed_entity_id`, if any.
    async fn get_signed_cardano_database_snapshot_by_id(
        &self,
        signed_entity_id: &str,
    ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>>;

    /// Return the last signed Mithril stake distributions, `total` being the number of
    /// entities requested.
    async fn get_last_signed_mithril_stake_distributions(
        &self,
        total: usize,
    ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>>;

    /// Return the signed Mithril stake distribution identified by `signed_entity_id`, if any.
    async fn get_signed_mithril_stake_distribution_by_id(
        &self,
        signed_entity_id: &str,
    ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>>;

    /// Return the last signed Cardano transactions snapshot, if any.
    async fn get_last_cardano_transaction_snapshot(
        &self,
    ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>>;

    /// Return the last signed Cardano blocks transactions snapshot, if any.
    async fn get_last_cardano_blocks_transactions_snapshot(
        &self,
    ) -> StdResult<Option<SignedEntity<CardanoBlocksTransactionsSnapshot>>>;

    /// Return the last signed Cardano stake distributions, `total` being the number of
    /// entities requested.
    async fn get_last_signed_cardano_stake_distributions(
        &self,
        total: usize,
    ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>>;
}
95
/// Implementation of [`SignedEntityService`] that delegates artifact computation to one
/// [`ArtifactBuilder`] per signed entity type and persistence to a [`SignedEntityStorer`].
///
/// Every field is either an `Arc` or a [`Logger`], and the service is cloned when it is
/// handed to the tasks spawned by `create_artifact`.
#[derive(Clone)]
pub struct MithrilSignedEntityService {
    /// Storage used to persist and read back signed entities.
    signed_entity_storer: Arc<dyn SignedEntityStorer>,
    /// Builder used for `SignedEntityType::MithrilStakeDistribution`.
    mithril_stake_distribution_artifact_builder:
        Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
    /// Builder used for `SignedEntityType::CardanoImmutableFilesFull`.
    cardano_immutable_files_full_artifact_builder:
        Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
    /// Builder used for `SignedEntityType::CardanoTransactions`.
    cardano_transactions_artifact_builder:
        Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
    /// Builder used for `SignedEntityType::CardanoBlocksTransactions`.
    cardano_blocks_transactions_artifact_builder:
        Arc<dyn ArtifactBuilder<BlockNumber, CardanoBlocksTransactionsSnapshot>>,
    /// Lock checked, taken and released per signed entity type around artifact creation.
    signed_entity_type_lock: Arc<SignedEntityTypeLock>,
    /// Builder used for `SignedEntityType::CardanoStakeDistribution`.
    cardano_stake_distribution_artifact_builder:
        Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
    /// Builder used for `SignedEntityType::CardanoDatabase`.
    cardano_database_artifact_builder:
        Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
    /// Provides the "artifact total produced since startup" counters.
    metrics_service: Arc<MetricsService>,
    /// Logger of the service.
    logger: Logger,
}
116
/// Bundle of the artifact builders required to construct a [`MithrilSignedEntityService`],
/// one per kind of artifact.
pub struct SignedEntityServiceArtifactsDependencies {
    /// Builder of [`MithrilStakeDistribution`] artifacts, keyed by [`Epoch`].
    mithril_stake_distribution_artifact_builder:
        Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
    /// Builder of [`Snapshot`] artifacts, keyed by [`CardanoDbBeacon`].
    cardano_immutable_files_full_artifact_builder:
        Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
    /// Builder of [`CardanoTransactionsSnapshot`] artifacts, keyed by [`BlockNumber`].
    cardano_transactions_artifact_builder:
        Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
    /// Builder of [`CardanoBlocksTransactionsSnapshot`] artifacts, keyed by [`BlockNumber`].
    cardano_blocks_transactions_artifact_builder:
        Arc<dyn ArtifactBuilder<BlockNumber, CardanoBlocksTransactionsSnapshot>>,
    /// Builder of [`CardanoStakeDistribution`] artifacts, keyed by [`Epoch`].
    cardano_stake_distribution_artifact_builder:
        Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
    /// Builder of [`CardanoDatabaseSnapshot`] artifacts, keyed by [`CardanoDbBeacon`].
    cardano_database_artifact_builder:
        Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
}
132
impl SignedEntityServiceArtifactsDependencies {
    /// Create a new bundle from the six artifact builders.
    ///
    /// Each argument is stored as-is in the field of the same name.
    pub fn new(
        mithril_stake_distribution_artifact_builder: Arc<
            dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>,
        >,
        cardano_immutable_files_full_artifact_builder: Arc<
            dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>,
        >,
        cardano_transactions_artifact_builder: Arc<
            dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
        >,
        cardano_blocks_transactions_artifact_builder: Arc<
            dyn ArtifactBuilder<BlockNumber, CardanoBlocksTransactionsSnapshot>,
        >,
        cardano_stake_distribution_artifact_builder: Arc<
            dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>,
        >,
        cardano_database_artifact_builder: Arc<
            dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
        >,
    ) -> Self {
        Self {
            mithril_stake_distribution_artifact_builder,
            cardano_immutable_files_full_artifact_builder,
            cardano_transactions_artifact_builder,
            cardano_blocks_transactions_artifact_builder,
            cardano_stake_distribution_artifact_builder,
            cardano_database_artifact_builder,
        }
    }
}
165
166impl MithrilSignedEntityService {
167 pub fn new(
169 signed_entity_storer: Arc<dyn SignedEntityStorer>,
170 dependencies: SignedEntityServiceArtifactsDependencies,
171 signed_entity_type_lock: Arc<SignedEntityTypeLock>,
172 metrics_service: Arc<MetricsService>,
173 logger: Logger,
174 ) -> Self {
175 Self {
176 signed_entity_storer,
177 mithril_stake_distribution_artifact_builder: dependencies
178 .mithril_stake_distribution_artifact_builder,
179 cardano_immutable_files_full_artifact_builder: dependencies
180 .cardano_immutable_files_full_artifact_builder,
181 cardano_transactions_artifact_builder: dependencies
182 .cardano_transactions_artifact_builder,
183 cardano_blocks_transactions_artifact_builder: dependencies
184 .cardano_blocks_transactions_artifact_builder,
185 cardano_stake_distribution_artifact_builder: dependencies
186 .cardano_stake_distribution_artifact_builder,
187 cardano_database_artifact_builder: dependencies.cardano_database_artifact_builder,
188 signed_entity_type_lock,
189 metrics_service,
190 logger: logger.new_with_component_name::<Self>(),
191 }
192 }
193
194 async fn create_artifact_task(
195 &self,
196 signed_entity_type: SignedEntityType,
197 certificate: &Certificate,
198 ) -> StdResult<()> {
199 info!(
200 self.logger, ">> create_artifact_task";
201 "signed_entity_type" => ?signed_entity_type, "certificate_hash" => &certificate.hash
202 );
203
204 let mut remaining_retries = 2;
205 let artifact = loop {
206 remaining_retries -= 1;
207
208 match self.compute_artifact(signed_entity_type.clone(), certificate).await {
209 Err(error) if remaining_retries == 0 => break Err(error),
210 Err(_error) => (),
211 Ok(artifact) => break Ok(artifact),
212 };
213 }?;
214
215 let signed_entity = SignedEntityRecord {
216 signed_entity_id: artifact.get_id(),
217 signed_entity_type: signed_entity_type.clone(),
218 certificate_id: certificate.hash.clone(),
219 artifact: serde_json::to_string(&artifact)?,
220 created_at: Utc::now(),
221 };
222
223 self.signed_entity_storer
224 .store_signed_entity(&signed_entity)
225 .await
226 .with_context(|| {
227 format!(
228 "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
229 )
230 })?;
231
232 self.increment_artifact_total_produced_metric_since_startup(signed_entity_type);
233
234 Ok(())
235 }
236
237 async fn compute_artifact(
239 &self,
240 signed_entity_type: SignedEntityType,
241 certificate: &Certificate,
242 ) -> StdResult<Arc<dyn Artifact>> {
243 match signed_entity_type.clone() {
244 SignedEntityType::MithrilStakeDistribution(epoch) => Ok(Arc::new(
245 self.mithril_stake_distribution_artifact_builder
246 .compute_artifact(epoch, certificate)
247 .await
248 .with_context(|| {
249 format!(
250 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
251 )
252 })?,
253 )),
254 SignedEntityType::CardanoImmutableFilesFull(beacon) => Ok(Arc::new(
255 self.cardano_immutable_files_full_artifact_builder
256 .compute_artifact(beacon.clone(), certificate)
257 .await
258 .with_context(|| {
259 format!(
260 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
261 )
262 })?,
263 )),
264 SignedEntityType::CardanoStakeDistribution(epoch) => Ok(Arc::new(
265 self.cardano_stake_distribution_artifact_builder
266 .compute_artifact(epoch, certificate)
267 .await
268 .with_context(|| {
269 format!(
270 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
271 )
272 })?)),
273 SignedEntityType::CardanoTransactions(_epoch, block_number) => Ok(Arc::new(
274 self.cardano_transactions_artifact_builder
275 .compute_artifact(block_number, certificate)
276 .await
277 .with_context(|| {
278 format!(
279 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
280 )
281 })?,
282 )),
283 SignedEntityType::CardanoBlocksTransactions(_epoch, block_number) => Ok(Arc::new(
284 self.cardano_blocks_transactions_artifact_builder
285 .compute_artifact(block_number, certificate)
286 .await
287 .with_context(|| {
288 format!(
289 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
290 )
291 })?,
292 )),
293 SignedEntityType::CardanoDatabase(beacon) => Ok(Arc::new(
294 self.cardano_database_artifact_builder
295 .compute_artifact(beacon, certificate)
296 .await
297 .with_context(|| {
298 format!(
299 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
300 )
301 })?
302 )),
303 }
304 }
305
306 async fn get_last_signed_entities(
307 &self,
308 total: usize,
309 discriminants: &SignedEntityTypeDiscriminants,
310 ) -> StdResult<Vec<SignedEntityRecord>> {
311 self.signed_entity_storer
312 .get_last_signed_entities_by_type(discriminants, total)
313 .await
314 .with_context(|| {
315 format!(
316 "Signed Entity Service can not get last signed entities with type: '{discriminants:?}'"
317 )
318 })
319 }
320
321 fn increment_artifact_total_produced_metric_since_startup(
322 &self,
323 signed_entity_type: SignedEntityType,
324 ) {
325 let metrics = self.metrics_service.clone();
326 let metric_counter = match signed_entity_type {
327 SignedEntityType::MithrilStakeDistribution(..) => {
328 metrics.get_artifact_mithril_stake_distribution_total_produced_since_startup()
329 }
330 SignedEntityType::CardanoImmutableFilesFull(..) => {
331 metrics.get_artifact_cardano_immutable_files_full_total_produced_since_startup()
332 }
333 SignedEntityType::CardanoStakeDistribution(..) => {
334 metrics.get_artifact_cardano_stake_distribution_total_produced_since_startup()
335 }
336 SignedEntityType::CardanoTransactions(..) => {
337 metrics.get_artifact_cardano_transaction_total_produced_since_startup()
338 }
339 SignedEntityType::CardanoBlocksTransactions(..) => {
340 metrics.get_artifact_cardano_blocks_transactions_total_produced_since_startup()
341 }
342 SignedEntityType::CardanoDatabase(..) => {
343 metrics.get_artifact_cardano_database_total_produced_since_startup()
344 }
345 };
346
347 metric_counter.increment();
348 }
349}
350
351#[async_trait]
352impl SignedEntityService for MithrilSignedEntityService {
353 async fn create_artifact(
354 &self,
355 signed_entity_type: SignedEntityType,
356 certificate: &Certificate,
357 ) -> StdResult<JoinHandle<StdResult<()>>> {
358 if self.signed_entity_type_lock.is_locked(&signed_entity_type).await {
359 return Err(anyhow!(
360 "Signed entity type '{:?}' is already locked",
361 signed_entity_type
362 ));
363 }
364
365 let service = self.clone();
366 let certificate_cloned = certificate.clone();
367 service.signed_entity_type_lock.lock(&signed_entity_type).await;
368
369 Ok(tokio::task::spawn(async move {
370 let signed_entity_type_clone = signed_entity_type.clone();
371 let service_clone = service.clone();
372 let result = tokio::task::spawn(async move {
373 service_clone
374 .create_artifact_task(signed_entity_type_clone, &certificate_cloned)
375 .await
376 })
377 .await;
378 service
379 .signed_entity_type_lock
380 .release(signed_entity_type.clone())
381 .await;
382
383 result.with_context(|| format!(
384 "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
385 ))?.inspect_err(|e| warn!(service.logger, "Error while creating artifact"; "error" => ?e))
386 }))
387 }
388
389 async fn get_last_signed_snapshots(
390 &self,
391 total: usize,
392 ) -> StdResult<Vec<SignedEntity<Snapshot>>> {
393 let signed_entities = self
394 .get_last_signed_entities(
395 total,
396 &SignedEntityTypeDiscriminants::CardanoImmutableFilesFull,
397 )
398 .await?
399 .into_iter()
400 .map(|record| record.try_into())
401 .collect::<Result<Vec<_>, _>>()?;
402
403 Ok(signed_entities)
404 }
405
406 async fn get_signed_snapshot_by_id(
407 &self,
408 signed_entity_id: &str,
409 ) -> StdResult<Option<SignedEntity<Snapshot>>> {
410 let entity: Option<SignedEntity<Snapshot>> = match self
411 .signed_entity_storer
412 .get_signed_entity(signed_entity_id)
413 .await
414 .with_context(|| {
415 format!(
416 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
417 )
418 })? {
419 Some(entity) => Some(entity.try_into()?),
420 None => None,
421 };
422
423 Ok(entity)
424 }
425
426 async fn get_last_signed_cardano_database_snapshots(
427 &self,
428 total: usize,
429 ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>> {
430 let signed_entities = self
431 .get_last_signed_entities(total, &SignedEntityTypeDiscriminants::CardanoDatabase)
432 .await?
433 .into_iter()
434 .map(|record| record.try_into())
435 .collect::<Result<Vec<_>, _>>()?;
436
437 Ok(signed_entities)
438 }
439
440 async fn get_signed_cardano_database_snapshot_by_id(
441 &self,
442 signed_entity_id: &str,
443 ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>> {
444 let entity: Option<SignedEntity<CardanoDatabaseSnapshot>> = match self
445 .signed_entity_storer
446 .get_signed_entity(signed_entity_id)
447 .await
448 .with_context(|| {
449 format!(
450 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
451 )
452 })? {
453 Some(entity) => Some(entity.try_into()?),
454 None => None,
455 };
456
457 Ok(entity)
458 }
459
460 async fn get_last_signed_mithril_stake_distributions(
461 &self,
462 total: usize,
463 ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>> {
464 let signed_entities = self
465 .get_last_signed_entities(
466 total,
467 &SignedEntityTypeDiscriminants::MithrilStakeDistribution,
468 )
469 .await?
470 .into_iter()
471 .map(|record| record.try_into())
472 .collect::<Result<Vec<_>, _>>()?;
473
474 Ok(signed_entities)
475 }
476
477 async fn get_signed_mithril_stake_distribution_by_id(
478 &self,
479 signed_entity_id: &str,
480 ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>> {
481 let entity: Option<SignedEntity<MithrilStakeDistribution>> = match self
482 .signed_entity_storer
483 .get_signed_entity(signed_entity_id)
484 .await
485 .with_context(|| {
486 format!(
487 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
488 )
489 })? {
490 Some(entity) => Some(entity.try_into()?),
491 None => None,
492 };
493
494 Ok(entity)
495 }
496
497 async fn get_last_cardano_transaction_snapshot(
498 &self,
499 ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>> {
500 self.get_last_signed_entities(1, &SignedEntityTypeDiscriminants::CardanoTransactions)
501 .await?
502 .pop()
503 .map(|r| r.try_into())
504 .transpose()
505 }
506
507 async fn get_last_cardano_blocks_transactions_snapshot(
508 &self,
509 ) -> StdResult<Option<SignedEntity<CardanoBlocksTransactionsSnapshot>>> {
510 self.get_last_signed_entities(1, &SignedEntityTypeDiscriminants::CardanoBlocksTransactions)
511 .await?
512 .pop()
513 .map(|r| r.try_into())
514 .transpose()
515 }
516
517 async fn get_last_signed_cardano_stake_distributions(
518 &self,
519 total: usize,
520 ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>> {
521 let signed_entities = self
522 .get_last_signed_entities(
523 total,
524 &SignedEntityTypeDiscriminants::CardanoStakeDistribution,
525 )
526 .await?
527 .into_iter()
528 .map(|record| record.try_into())
529 .collect::<Result<Vec<_>, _>>()?;
530
531 Ok(signed_entities)
532 }
533}
534
535#[cfg(test)]
536mod tests {
537 use std::{sync::atomic::Ordering, time::Duration};
538
539 use mithril_common::{
540 entities::{CardanoTransactionsSnapshot, Epoch, StakeDistribution},
541 signable_builder,
542 test::double::fake_data,
543 };
544 use mithril_metric::CounterValue;
545 use serde::{Serialize, de::DeserializeOwned};
546 use std::sync::atomic::AtomicBool;
547
548 use crate::artifact_builder::MockArtifactBuilder;
549 use crate::database::repository::MockSignedEntityStorer;
550 use crate::test::TestLogger;
551
552 use super::*;
553
554 fn create_stake_distribution(epoch: Epoch, signers: usize) -> MithrilStakeDistribution {
555 MithrilStakeDistribution::new(
556 epoch,
557 fake_data::signers_with_stakes(signers),
558 &fake_data::protocol_parameters(),
559 )
560 }
561
    /// Build a `CardanoStakeDistribution` for `epoch` from the given stake distribution.
    fn create_cardano_stake_distribution(
        epoch: Epoch,
        stake_distribution: StakeDistribution,
    ) -> CardanoStakeDistribution {
        CardanoStakeDistribution::new(epoch, stake_distribution)
    }
568
569 fn assert_expected<T>(expected: &T, artifact: &Arc<dyn Artifact>)
570 where
571 T: Serialize + DeserializeOwned,
572 {
573 let current: T = serde_json::from_str(&serde_json::to_string(&artifact).unwrap()).unwrap();
574 assert_eq!(
575 serde_json::to_string(&expected).unwrap(),
576 serde_json::to_string(¤t).unwrap()
577 );
578 }
579
    /// Test helper holding one mock per dependency of [`MithrilSignedEntityService`], so
    /// that expectations can be configured before the service is built.
    struct MockDependencyInjector {
        /// Mock of the signed entity storer.
        mock_signed_entity_storer: MockSignedEntityStorer,
        /// Mock of the Mithril stake distribution artifact builder.
        mock_mithril_stake_distribution_artifact_builder:
            MockArtifactBuilder<Epoch, MithrilStakeDistribution>,
        /// Mock of the Cardano immutable files full (snapshot) artifact builder.
        mock_cardano_immutable_files_full_artifact_builder:
            MockArtifactBuilder<CardanoDbBeacon, Snapshot>,
        /// Mock of the Cardano transactions artifact builder.
        mock_cardano_transactions_artifact_builder:
            MockArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
        /// Mock of the Cardano blocks transactions artifact builder.
        mock_cardano_blocks_transactions_artifact_builder:
            MockArtifactBuilder<BlockNumber, CardanoBlocksTransactionsSnapshot>,
        /// Mock of the Cardano stake distribution artifact builder.
        mock_cardano_stake_distribution_artifact_builder:
            MockArtifactBuilder<Epoch, CardanoStakeDistribution>,
        /// Mock of the Cardano database artifact builder.
        mock_cardano_database_artifact_builder:
            MockArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
    }
596
597 impl MockDependencyInjector {
598 fn new() -> MockDependencyInjector {
599 MockDependencyInjector {
600 mock_signed_entity_storer: MockSignedEntityStorer::new(),
601 mock_mithril_stake_distribution_artifact_builder: MockArtifactBuilder::<
602 Epoch,
603 MithrilStakeDistribution,
604 >::new(),
605 mock_cardano_immutable_files_full_artifact_builder: MockArtifactBuilder::<
606 CardanoDbBeacon,
607 Snapshot,
608 >::new(),
609 mock_cardano_transactions_artifact_builder: MockArtifactBuilder::<
610 BlockNumber,
611 CardanoTransactionsSnapshot,
612 >::new(),
613 mock_cardano_blocks_transactions_artifact_builder: MockArtifactBuilder::<
614 BlockNumber,
615 CardanoBlocksTransactionsSnapshot,
616 >::new(),
617 mock_cardano_stake_distribution_artifact_builder: MockArtifactBuilder::<
618 Epoch,
619 CardanoStakeDistribution,
620 >::new(),
621 mock_cardano_database_artifact_builder: MockArtifactBuilder::<
622 CardanoDbBeacon,
623 CardanoDatabaseSnapshot,
624 >::new(),
625 }
626 }
627
628 fn build_artifact_builder_service(self) -> MithrilSignedEntityService {
629 let dependencies = SignedEntityServiceArtifactsDependencies::new(
630 Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
631 Arc::new(self.mock_cardano_immutable_files_full_artifact_builder),
632 Arc::new(self.mock_cardano_transactions_artifact_builder),
633 Arc::new(self.mock_cardano_blocks_transactions_artifact_builder),
634 Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
635 Arc::new(self.mock_cardano_database_artifact_builder),
636 );
637 MithrilSignedEntityService::new(
638 Arc::new(self.mock_signed_entity_storer),
639 dependencies,
640 Arc::new(SignedEntityTypeLock::default()),
641 Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
642 TestLogger::stdout(),
643 )
644 }
645
        /// Consume the injector and build a service whose Cardano immutable files full
        /// artifact builder keeps polling until `atomic_stop` is set to `true`.
        ///
        /// The mocked storer is configured to accept the storage of the snapshot that
        /// this builder returns.
        fn build_artifact_builder_service_with_time_consuming_process(
            mut self,
            atomic_stop: Arc<AtomicBool>,
        ) -> MithrilSignedEntityService {
            // Artifact builder that only completes once the shared stop flag is raised.
            struct LongArtifactBuilder {
                atomic_stop: Arc<AtomicBool>,
                snapshot: Snapshot,
            }

            let snapshot = fake_data::snapshot(1);

            #[async_trait]
            impl ArtifactBuilder<CardanoDbBeacon, Snapshot> for LongArtifactBuilder {
                async fn compute_artifact(
                    &self,
                    _beacon: CardanoDbBeacon,
                    _certificate: &Certificate,
                ) -> StdResult<Snapshot> {
                    // Poll the flag every 10ms and give up with an error once the iteration
                    // budget is exhausted, so that a test that never raises the flag fails
                    // instead of hanging.
                    let mut max_iteration = 100;
                    while !self.atomic_stop.load(Ordering::Relaxed) {
                        max_iteration -= 1;
                        if max_iteration <= 0 {
                            return Err(anyhow!("Test should handle the stop"));
                        }
                        tokio::time::sleep(Duration::from_millis(10)).await;
                    }
                    Ok(self.snapshot.clone())
                }
            }
            let cardano_immutable_files_full_long_artifact_builder = LongArtifactBuilder {
                atomic_stop: atomic_stop.clone(),
                snapshot: snapshot.clone(),
            };

            // The storer must receive the JSON serialization of the snapshot artifact.
            let artifact_clone: Arc<dyn Artifact> = Arc::new(snapshot);
            let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
            self.mock_signed_entity_storer
                .expect_store_signed_entity()
                .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
                .return_once(|_| Ok(()));

            // Same wiring as the regular build, except for the long running builder.
            let dependencies = SignedEntityServiceArtifactsDependencies::new(
                Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
                Arc::new(cardano_immutable_files_full_long_artifact_builder),
                Arc::new(self.mock_cardano_transactions_artifact_builder),
                Arc::new(self.mock_cardano_blocks_transactions_artifact_builder),
                Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
                Arc::new(self.mock_cardano_database_artifact_builder),
            );
            MithrilSignedEntityService::new(
                Arc::new(self.mock_signed_entity_storer),
                dependencies,
                Arc::new(SignedEntityTypeLock::default()),
                Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
                TestLogger::stdout(),
            )
        }
703
704 fn mock_artifact_processing<
705 T: Artifact + Clone + Serialize + 'static,
706 U: signable_builder::Beacon,
707 >(
708 &mut self,
709 artifact: T,
710 mock_that_provide_artifact: &dyn Fn(
711 &mut MockDependencyInjector,
712 ) -> &mut MockArtifactBuilder<U, T>,
713 ) {
714 {
715 let artifact_cloned = artifact.clone();
716 mock_that_provide_artifact(self)
717 .expect_compute_artifact()
718 .times(1)
719 .return_once(|_, _| Ok(artifact_cloned));
720 }
721 {
722 let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
723 let artifact_json = serde_json::to_string(&artifact_clone).unwrap();
724 self.mock_signed_entity_storer
725 .expect_store_signed_entity()
726 .withf(move |signed_entity| signed_entity.artifact == artifact_json)
727 .return_once(|_| Ok(()));
728 }
729 }
730
        /// Configure the mocks for a successful processing of the given Mithril stake
        /// distribution, using the Mithril stake distribution artifact builder mock.
        fn mock_stake_distribution_processing(&mut self, artifact: MithrilStakeDistribution) {
            self.mock_artifact_processing(artifact, &|mock_injector| {
                &mut mock_injector.mock_mithril_stake_distribution_artifact_builder
            });
        }
736 }
737
738 fn get_artifact_total_produced_metric_since_startup_counter_value(
739 metrics_service: Arc<MetricsService>,
740 signed_entity_type: &SignedEntityType,
741 ) -> CounterValue {
742 match signed_entity_type {
743 SignedEntityType::MithrilStakeDistribution(..) => metrics_service
744 .get_artifact_mithril_stake_distribution_total_produced_since_startup()
745 .get(),
746 SignedEntityType::CardanoImmutableFilesFull(..) => metrics_service
747 .get_artifact_cardano_immutable_files_full_total_produced_since_startup()
748 .get(),
749 SignedEntityType::CardanoStakeDistribution(..) => metrics_service
750 .get_artifact_cardano_stake_distribution_total_produced_since_startup()
751 .get(),
752 SignedEntityType::CardanoTransactions(..) => metrics_service
753 .get_artifact_cardano_transaction_total_produced_since_startup()
754 .get(),
755 SignedEntityType::CardanoBlocksTransactions(..) => metrics_service
756 .get_artifact_cardano_blocks_transactions_total_produced_since_startup()
757 .get(),
758 SignedEntityType::CardanoDatabase(..) => metrics_service
759 .get_artifact_cardano_database_total_produced_since_startup()
760 .get(),
761 }
762 }
763
764 #[tokio::test]
765 async fn build_mithril_stake_distribution_artifact_when_given_mithril_stake_distribution_entity_type()
766 {
767 let mut mock_container = MockDependencyInjector::new();
768
769 let mithril_stake_distribution_expected = create_stake_distribution(Epoch(1), 5);
770
771 mock_container
772 .mock_mithril_stake_distribution_artifact_builder
773 .expect_compute_artifact()
774 .times(1)
775 .returning(|_, _| Ok(create_stake_distribution(Epoch(1), 5)));
776
777 let artifact_builder_service = mock_container.build_artifact_builder_service();
778
779 let certificate = fake_data::certificate("hash".to_string());
780 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(1));
781 let artifact = artifact_builder_service
782 .compute_artifact(signed_entity_type.clone(), &certificate)
783 .await
784 .unwrap();
785
786 assert_expected(&mithril_stake_distribution_expected, &artifact);
787 }
788
    // Runs the generic "artifact is stored" scenario for a Mithril stake distribution.
    #[tokio::test]
    async fn should_store_the_artifact_when_creating_artifact_for_a_mithril_stake_distribution() {
        generic_test_that_the_artifact_is_stored(
            SignedEntityType::MithrilStakeDistribution(Epoch(1)),
            create_stake_distribution(Epoch(1), 5),
            &|mock_injector| &mut mock_injector.mock_mithril_stake_distribution_artifact_builder,
        )
        .await;
    }
798
799 #[tokio::test]
800 async fn build_cardano_stake_distribution_artifact_when_given_cardano_stake_distribution_entity_type()
801 {
802 let mut mock_container = MockDependencyInjector::new();
803
804 let cardano_stake_distribution_expected = create_cardano_stake_distribution(
805 Epoch(1),
806 StakeDistribution::from([("pool-1".to_string(), 100)]),
807 );
808
809 mock_container
810 .mock_cardano_stake_distribution_artifact_builder
811 .expect_compute_artifact()
812 .times(1)
813 .returning(|_, _| {
814 Ok(create_cardano_stake_distribution(
815 Epoch(1),
816 StakeDistribution::from([("pool-1".to_string(), 100)]),
817 ))
818 });
819
820 let artifact_builder_service = mock_container.build_artifact_builder_service();
821
822 let certificate = fake_data::certificate("hash".to_string());
823 let signed_entity_type = SignedEntityType::CardanoStakeDistribution(Epoch(1));
824 let artifact = artifact_builder_service
825 .compute_artifact(signed_entity_type.clone(), &certificate)
826 .await
827 .unwrap();
828
829 assert_expected(&cardano_stake_distribution_expected, &artifact);
830 }
831
    // Runs the generic "artifact is stored" scenario for a Cardano stake distribution.
    #[tokio::test]
    async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_stake_distribution() {
        generic_test_that_the_artifact_is_stored(
            SignedEntityType::CardanoStakeDistribution(Epoch(1)),
            create_cardano_stake_distribution(
                Epoch(1),
                StakeDistribution::from([("pool-1".to_string(), 100)]),
            ),
            &|mock_injector| &mut mock_injector.mock_cardano_stake_distribution_artifact_builder,
        )
        .await;
    }
844
845 #[tokio::test]
846 async fn build_snapshot_artifact_when_given_cardano_immutable_files_full_entity_type() {
847 let mut mock_container = MockDependencyInjector::new();
848
849 let snapshot_expected = fake_data::snapshot(1);
850
851 mock_container
852 .mock_cardano_immutable_files_full_artifact_builder
853 .expect_compute_artifact()
854 .times(1)
855 .returning(|_, _| Ok(fake_data::snapshot(1)));
856
857 let artifact_builder_service = mock_container.build_artifact_builder_service();
858
859 let certificate = fake_data::certificate("hash".to_string());
860 let signed_entity_type =
861 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
862 let artifact = artifact_builder_service
863 .compute_artifact(signed_entity_type.clone(), &certificate)
864 .await
865 .unwrap();
866
867 assert_expected(&snapshot_expected, &artifact);
868 }
869
    // Runs the generic "artifact is stored" scenario for a Cardano immutable files full
    // snapshot.
    #[tokio::test]
    async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_immutable_files() {
        generic_test_that_the_artifact_is_stored(
            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default()),
            fake_data::snapshot(1),
            &|mock_injector| &mut mock_injector.mock_cardano_immutable_files_full_artifact_builder,
        )
        .await;
    }
879
880 #[tokio::test]
881 async fn build_cardano_transactions_snapshot_artifact_when_given_cardano_transactions_type() {
882 let mut mock_container = MockDependencyInjector::new();
883
884 let block_number = BlockNumber(151);
885 let expected = CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number);
886
887 mock_container
888 .mock_cardano_transactions_artifact_builder
889 .expect_compute_artifact()
890 .times(1)
891 .returning(move |_, _| {
892 Ok(CardanoTransactionsSnapshot::new(
893 "merkle_root".to_string(),
894 block_number,
895 ))
896 });
897
898 let artifact_builder_service = mock_container.build_artifact_builder_service();
899
900 let certificate = fake_data::certificate("hash".to_string());
901 let signed_entity_type = SignedEntityType::CardanoTransactions(Epoch(1), block_number);
902 let artifact = artifact_builder_service
903 .compute_artifact(signed_entity_type.clone(), &certificate)
904 .await
905 .unwrap();
906
907 assert_expected(&expected, &artifact);
908 }
909
    // Runs the generic "artifact is stored" scenario for a Cardano transactions snapshot.
    #[tokio::test]
    async fn should_store_the_artifact_when_creating_artifact_for_cardano_transactions() {
        let block_number = BlockNumber(149);
        generic_test_that_the_artifact_is_stored(
            SignedEntityType::CardanoTransactions(Epoch(1), block_number),
            CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number),
            &|mock_injector| &mut mock_injector.mock_cardano_transactions_artifact_builder,
        )
        .await;
    }
920
921 #[tokio::test]
922 async fn build_cardano_blocks_transactions_snapshot_artifact_when_given_cardano_blocks_transactions_type()
923 {
924 let mut mock_container = MockDependencyInjector::new();
925
926 let block_number = BlockNumber(151);
927 let offset_security_parameter = BlockNumber(15);
928 let expected = CardanoBlocksTransactionsSnapshot::new(
929 "merkle_root".to_string(),
930 block_number,
931 offset_security_parameter,
932 );
933
934 mock_container
935 .mock_cardano_blocks_transactions_artifact_builder
936 .expect_compute_artifact()
937 .times(1)
938 .returning(move |_, _| {
939 Ok(CardanoBlocksTransactionsSnapshot::new(
940 "merkle_root".to_string(),
941 block_number,
942 offset_security_parameter,
943 ))
944 });
945
946 let artifact_builder_service = mock_container.build_artifact_builder_service();
947
948 let certificate = fake_data::certificate("hash".to_string());
949 let signed_entity_type =
950 SignedEntityType::CardanoBlocksTransactions(Epoch(1), block_number);
951 let artifact = artifact_builder_service
952 .compute_artifact(signed_entity_type.clone(), &certificate)
953 .await
954 .unwrap();
955
956 assert_expected(&expected, &artifact);
957 }
958
    // Runs the generic "artifact is stored" scenario for a Cardano blocks transactions
    // snapshot.
    #[tokio::test]
    async fn should_store_the_artifact_when_creating_artifact_for_cardano_blocks_transactions() {
        let block_number = BlockNumber(149);
        let offset_security_parameter = BlockNumber(15);
        generic_test_that_the_artifact_is_stored(
            SignedEntityType::CardanoBlocksTransactions(Epoch(1), block_number),
            CardanoBlocksTransactionsSnapshot::new(
                "merkle_root".to_string(),
                block_number,
                offset_security_parameter,
            ),
            &|mock_injector| &mut mock_injector.mock_cardano_blocks_transactions_artifact_builder,
        )
        .await;
    }
974
975 #[tokio::test]
976 async fn build_cardano_database_artifact_when_given_cardano_database_entity_type() {
977 let mut mock_container = MockDependencyInjector::new();
978
979 let cardano_database_expected = fake_data::cardano_database_snapshot(1);
980
981 mock_container
982 .mock_cardano_database_artifact_builder
983 .expect_compute_artifact()
984 .times(1)
985 .returning(|_, _| Ok(fake_data::cardano_database_snapshot(1)));
986
987 let artifact_builder_service = mock_container.build_artifact_builder_service();
988
989 let certificate = fake_data::certificate("hash".to_string());
990 let signed_entity_type = SignedEntityType::CardanoDatabase(CardanoDbBeacon::default());
991 let artifact = artifact_builder_service
992 .compute_artifact(signed_entity_type.clone(), &certificate)
993 .await
994 .unwrap();
995
996 assert_expected(&cardano_database_expected, &artifact);
997 }
998
999 #[tokio::test]
1000 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_database() {
1001 generic_test_that_the_artifact_is_stored(
1002 SignedEntityType::CardanoDatabase(CardanoDbBeacon::default()),
1003 fake_data::cardano_database_snapshot(1),
1004 &|mock_injector| &mut mock_injector.mock_cardano_database_artifact_builder,
1005 )
1006 .await;
1007 }
1008
1009 async fn generic_test_that_the_artifact_is_stored<
1010 T: Artifact + Clone + Serialize + 'static,
1011 U: signable_builder::Beacon,
1012 >(
1013 signed_entity_type: SignedEntityType,
1014 artifact: T,
1015 mock_that_provide_artifact: &dyn Fn(
1016 &mut MockDependencyInjector,
1017 ) -> &mut MockArtifactBuilder<U, T>,
1018 ) {
1019 let mut mock_container = MockDependencyInjector::new();
1020 {
1021 let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
1022 let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
1023 mock_container
1024 .mock_signed_entity_storer
1025 .expect_store_signed_entity()
1026 .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
1027 .return_once(|_| Ok(()));
1028 }
1029 {
1030 let artifact_cloned = artifact.clone();
1031 mock_that_provide_artifact(&mut mock_container)
1032 .expect_compute_artifact()
1033 .times(1)
1034 .return_once(|_, _| Ok(artifact_cloned));
1035 }
1036 let artifact_builder_service = mock_container.build_artifact_builder_service();
1037
1038 let certificate = fake_data::certificate("hash".to_string());
1039 let error_message = format!(
1040 "Create artifact should not fail for {} signed entity",
1041 std::any::type_name::<T>()
1042 );
1043 let error_message_str = error_message.as_str();
1044
1045 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1046 artifact_builder_service.metrics_service.clone(),
1047 &signed_entity_type,
1048 );
1049
1050 artifact_builder_service
1051 .create_artifact_task(signed_entity_type.clone(), &certificate)
1052 .await
1053 .expect(error_message_str);
1054
1055 assert_eq!(
1056 initial_counter_value + 1,
1057 get_artifact_total_produced_metric_since_startup_counter_value(
1058 artifact_builder_service.metrics_service.clone(),
1059 &signed_entity_type,
1060 )
1061 )
1062 }
1063
1064 #[tokio::test]
1065 async fn create_artifact_for_two_signed_entity_types_in_sequence_not_blocking() {
1066 let atomic_stop = Arc::new(AtomicBool::new(false));
1067 let signed_entity_type_service = {
1068 let mut mock_container = MockDependencyInjector::new();
1069
1070 let msd = create_stake_distribution(Epoch(1), 5);
1071 mock_container.mock_stake_distribution_processing(msd);
1072
1073 mock_container
1074 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone())
1075 };
1076 let certificate = fake_data::certificate("hash".to_string());
1077
1078 let signed_entity_type_immutable =
1079 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1080 let first_task_that_never_finished = signed_entity_type_service
1081 .create_artifact(signed_entity_type_immutable, &certificate)
1082 .await
1083 .unwrap();
1084
1085 let signed_entity_type_msd = SignedEntityType::MithrilStakeDistribution(Epoch(1));
1086 let second_task_that_finish_first = signed_entity_type_service
1087 .create_artifact(signed_entity_type_msd, &certificate)
1088 .await
1089 .unwrap();
1090
1091 second_task_that_finish_first.await.unwrap().unwrap();
1092 assert!(!first_task_that_never_finished.is_finished());
1093
1094 atomic_stop.swap(true, Ordering::Relaxed);
1095 }
1096
1097 #[tokio::test]
1098 async fn create_artifact_lock_unlock_signed_entity_type_while_processing() {
1099 let atomic_stop = Arc::new(AtomicBool::new(false));
1100 let signed_entity_type_service = MockDependencyInjector::new()
1101 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1102 let certificate = fake_data::certificate("hash".to_string());
1103
1104 let signed_entity_type_immutable =
1105 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1106 assert!(
1107 !signed_entity_type_service
1108 .signed_entity_type_lock
1109 .is_locked(&signed_entity_type_immutable)
1110 .await
1111 );
1112 let join_handle = signed_entity_type_service
1113 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1114 .await
1115 .unwrap();
1116
1117 let is_locked = signed_entity_type_service
1120 .signed_entity_type_lock
1121 .is_locked(&signed_entity_type_immutable)
1122 .await;
1123 let is_finished = join_handle.is_finished();
1124
1125 atomic_stop.swap(true, Ordering::Relaxed);
1126 join_handle.await.unwrap().unwrap();
1127
1128 assert!(is_locked);
1129 assert!(!is_finished);
1130
1131 assert!(
1132 !signed_entity_type_service
1133 .signed_entity_type_lock
1134 .is_locked(&signed_entity_type_immutable)
1135 .await
1136 );
1137 }
1138
1139 #[tokio::test]
1140 async fn create_artifact_unlock_signed_entity_type_when_error() {
1141 let signed_entity_type_service = {
1142 let mut mock_container = MockDependencyInjector::new();
1143 mock_container
1144 .mock_cardano_immutable_files_full_artifact_builder
1145 .expect_compute_artifact()
1146 .returning(|_, _| Err(anyhow::anyhow!("Error while computing artifact")));
1147
1148 mock_container.build_artifact_builder_service()
1149 };
1150 let certificate = fake_data::certificate("hash".to_string());
1151
1152 let signed_entity_type_immutable =
1153 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1154
1155 let join_handle = signed_entity_type_service
1156 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1157 .await
1158 .unwrap();
1159
1160 let error = join_handle.await.unwrap().unwrap_err();
1161 assert!(
1162 error.to_string().contains("CardanoImmutableFilesFull"),
1163 "Error should contains CardanoImmutableFilesFull but was: {error}"
1164 );
1165
1166 assert!(
1167 !signed_entity_type_service
1168 .signed_entity_type_lock
1169 .is_locked(&signed_entity_type_immutable)
1170 .await
1171 );
1172 }
1173
1174 #[tokio::test]
1175 async fn create_artifact_unlock_signed_entity_type_when_panic() {
1176 let signed_entity_type_service =
1177 MockDependencyInjector::new().build_artifact_builder_service();
1178 let certificate = fake_data::certificate("hash".to_string());
1179
1180 let signed_entity_type_immutable =
1181 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1182
1183 let join_handle = signed_entity_type_service
1184 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1185 .await
1186 .unwrap();
1187
1188 let error = join_handle.await.unwrap().unwrap_err();
1189 assert!(
1190 error.to_string().contains("CardanoImmutableFilesFull"),
1191 "Error should contains CardanoImmutableFilesFull but was: {error}"
1192 );
1193
1194 assert!(
1195 !signed_entity_type_service
1196 .signed_entity_type_lock
1197 .is_locked(&signed_entity_type_immutable)
1198 .await
1199 );
1200 }
1201
1202 #[tokio::test]
1203 async fn create_artifact_for_a_signed_entity_type_already_lock_return_error() {
1204 let atomic_stop = Arc::new(AtomicBool::new(false));
1205 let signed_entity_service = MockDependencyInjector::new()
1206 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1207 let certificate = fake_data::certificate("hash".to_string());
1208 let signed_entity_type_immutable =
1209 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1210
1211 signed_entity_service
1212 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1213 .await
1214 .unwrap();
1215
1216 signed_entity_service
1217 .create_artifact(signed_entity_type_immutable, &certificate)
1218 .await
1219 .expect_err("Should return error when signed entity type is already locked");
1220
1221 atomic_stop.swap(true, Ordering::Relaxed);
1222 }
1223
1224 #[tokio::test]
1225 async fn metrics_counter_value_is_not_incremented_when_compute_artifact_error() {
1226 let signed_entity_service = {
1227 let mut mock_container = MockDependencyInjector::new();
1228 mock_container
1229 .mock_cardano_immutable_files_full_artifact_builder
1230 .expect_compute_artifact()
1231 .returning(|_, _| Err(anyhow!("Error while computing artifact")));
1232
1233 mock_container.build_artifact_builder_service()
1234 };
1235
1236 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1237
1238 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1239 signed_entity_service.metrics_service.clone(),
1240 &signed_entity_type,
1241 );
1242
1243 signed_entity_service
1244 .create_artifact(
1245 signed_entity_type.clone(),
1246 &fake_data::certificate("hash".to_string()),
1247 )
1248 .await
1249 .unwrap();
1250
1251 assert_eq!(
1252 initial_counter_value,
1253 get_artifact_total_produced_metric_since_startup_counter_value(
1254 signed_entity_service.metrics_service.clone(),
1255 &signed_entity_type,
1256 )
1257 );
1258 }
1259
1260 #[tokio::test]
1261 async fn metrics_counter_value_is_not_incremented_when_store_signed_entity_error() {
1262 let signed_entity_service = {
1263 let mut mock_container = MockDependencyInjector::new();
1264 mock_container
1265 .mock_signed_entity_storer
1266 .expect_store_signed_entity()
1267 .returning(|_| Err(anyhow!("Error while storing signed entity")));
1268
1269 mock_container.build_artifact_builder_service()
1270 };
1271
1272 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1273
1274 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1275 signed_entity_service.metrics_service.clone(),
1276 &signed_entity_type,
1277 );
1278
1279 signed_entity_service
1280 .create_artifact(
1281 signed_entity_type.clone(),
1282 &fake_data::certificate("hash".to_string()),
1283 )
1284 .await
1285 .unwrap();
1286
1287 assert_eq!(
1288 initial_counter_value,
1289 get_artifact_total_produced_metric_since_startup_counter_value(
1290 signed_entity_service.metrics_service.clone(),
1291 &signed_entity_type,
1292 )
1293 );
1294 }
1295}