1use anyhow::{Context, anyhow};
6use async_trait::async_trait;
7use chrono::Utc;
8use slog::{Logger, info, warn};
9use std::sync::Arc;
10use tokio::task::JoinHandle;
11
12use mithril_common::{
13 StdResult,
14 entities::{
15 BlockNumber, CardanoDatabaseSnapshot, CardanoDbBeacon, CardanoStakeDistribution,
16 CardanoTransactionsSnapshot, Certificate, Epoch, MithrilStakeDistribution,
17 SignedEntityType, SignedEntityTypeDiscriminants, Snapshot,
18 },
19 logging::LoggerExtensions,
20 signable_builder::{Artifact, SignedEntity},
21};
22
23use crate::{
24 MetricsService,
25 artifact_builder::ArtifactBuilder,
26 database::{record::SignedEntityRecord, repository::SignedEntityStorer},
27};
28use mithril_signed_entity_lock::SignedEntityTypeLock;
29
/// Contract for creating the artifacts of signed entity types and for reading back the signed
/// entities that were stored.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait SignedEntityService: Send + Sync {
    /// Start the creation of the artifact for `signed_entity_type`, associated with `certificate`.
    ///
    /// On success, the returned [`JoinHandle`] resolves to the result of the creation itself.
    async fn create_artifact(
        &self,
        signed_entity_type: SignedEntityType,
        certificate: &Certificate,
    ) -> StdResult<JoinHandle<StdResult<()>>>;

    /// Return the last signed [`Snapshot`]s; `total` is the number of entities requested.
    async fn get_last_signed_snapshots(
        &self,
        total: usize,
    ) -> StdResult<Vec<SignedEntity<Snapshot>>>;

    /// Return the signed [`Snapshot`] identified by `signed_entity_id`, or `None` if there is none.
    async fn get_signed_snapshot_by_id(
        &self,
        signed_entity_id: &str,
    ) -> StdResult<Option<SignedEntity<Snapshot>>>;

    /// Return the last signed [`CardanoDatabaseSnapshot`]s; `total` is the number of entities
    /// requested.
    async fn get_last_signed_cardano_database_snapshots(
        &self,
        total: usize,
    ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>>;

    /// Return the signed [`CardanoDatabaseSnapshot`] identified by `signed_entity_id`, or `None`
    /// if there is none.
    async fn get_signed_cardano_database_snapshot_by_id(
        &self,
        signed_entity_id: &str,
    ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>>;

    /// Return the last signed [`MithrilStakeDistribution`]s; `total` is the number of entities
    /// requested.
    async fn get_last_signed_mithril_stake_distributions(
        &self,
        total: usize,
    ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>>;

    /// Return the signed [`MithrilStakeDistribution`] identified by `signed_entity_id`, or `None`
    /// if there is none.
    async fn get_signed_mithril_stake_distribution_by_id(
        &self,
        signed_entity_id: &str,
    ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>>;

    /// Return the last signed [`CardanoTransactionsSnapshot`], or `None` if there is none.
    async fn get_last_cardano_transaction_snapshot(
        &self,
    ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>>;

    /// Return the last signed [`CardanoStakeDistribution`]s; `total` is the number of entities
    /// requested.
    async fn get_last_signed_cardano_stake_distributions(
        &self,
        total: usize,
    ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>>;
}
90
/// [`SignedEntityService`] implementation backed by a [`SignedEntityStorer`] and one
/// [`ArtifactBuilder`] per signed entity type.
///
/// Every field is reference counted (or a `Logger`), which is what makes the derived `Clone`
/// available for moving a copy of the service into spawned tasks.
#[derive(Clone)]
pub struct MithrilSignedEntityService {
    /// Store used to persist and to read back signed entity records.
    signed_entity_storer: Arc<dyn SignedEntityStorer>,
    /// Builds a `MithrilStakeDistribution` artifact from an `Epoch`.
    mithril_stake_distribution_artifact_builder:
        Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
    /// Builds a `Snapshot` artifact from a `CardanoDbBeacon`.
    cardano_immutable_files_full_artifact_builder:
        Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
    /// Builds a `CardanoTransactionsSnapshot` artifact from a `BlockNumber`.
    cardano_transactions_artifact_builder:
        Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
    /// Lock checked, taken and released per signed entity type around artifact creation.
    signed_entity_type_lock: Arc<SignedEntityTypeLock>,
    /// Builds a `CardanoStakeDistribution` artifact from an `Epoch`.
    cardano_stake_distribution_artifact_builder:
        Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
    /// Builds a `CardanoDatabaseSnapshot` artifact from a `CardanoDbBeacon`.
    cardano_database_artifact_builder:
        Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
    /// Provides the "total produced since startup" counters incremented after each stored artifact.
    metrics_service: Arc<MetricsService>,
    /// Logger used by the service.
    logger: Logger,
}
109
/// Bundle of the artifact builders needed by [`MithrilSignedEntityService`], one per signed
/// entity type, so they can be handed to its constructor as a single argument.
pub struct SignedEntityServiceArtifactsDependencies {
    /// Builds a `MithrilStakeDistribution` artifact from an `Epoch`.
    mithril_stake_distribution_artifact_builder:
        Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
    /// Builds a `Snapshot` artifact from a `CardanoDbBeacon`.
    cardano_immutable_files_full_artifact_builder:
        Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
    /// Builds a `CardanoTransactionsSnapshot` artifact from a `BlockNumber`.
    cardano_transactions_artifact_builder:
        Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
    /// Builds a `CardanoStakeDistribution` artifact from an `Epoch`.
    cardano_stake_distribution_artifact_builder:
        Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
    /// Builds a `CardanoDatabaseSnapshot` artifact from a `CardanoDbBeacon`.
    cardano_database_artifact_builder:
        Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
}
123
impl SignedEntityServiceArtifactsDependencies {
    /// Create the bundle from the five artifact builders.
    ///
    /// Each argument is stored as-is in the field of the same name; note the argument order
    /// (Mithril stake distribution, immutable files full, transactions, Cardano stake
    /// distribution, Cardano database), since two pairs of builders share the same beacon type.
    pub fn new(
        mithril_stake_distribution_artifact_builder: Arc<
            dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>,
        >,
        cardano_immutable_files_full_artifact_builder: Arc<
            dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>,
        >,
        cardano_transactions_artifact_builder: Arc<
            dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
        >,
        cardano_stake_distribution_artifact_builder: Arc<
            dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>,
        >,
        cardano_database_artifact_builder: Arc<
            dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
        >,
    ) -> Self {
        Self {
            mithril_stake_distribution_artifact_builder,
            cardano_immutable_files_full_artifact_builder,
            cardano_transactions_artifact_builder,
            cardano_stake_distribution_artifact_builder,
            cardano_database_artifact_builder,
        }
    }
}
152
153impl MithrilSignedEntityService {
154 pub fn new(
156 signed_entity_storer: Arc<dyn SignedEntityStorer>,
157 dependencies: SignedEntityServiceArtifactsDependencies,
158 signed_entity_type_lock: Arc<SignedEntityTypeLock>,
159 metrics_service: Arc<MetricsService>,
160 logger: Logger,
161 ) -> Self {
162 Self {
163 signed_entity_storer,
164 mithril_stake_distribution_artifact_builder: dependencies
165 .mithril_stake_distribution_artifact_builder,
166 cardano_immutable_files_full_artifact_builder: dependencies
167 .cardano_immutable_files_full_artifact_builder,
168 cardano_transactions_artifact_builder: dependencies
169 .cardano_transactions_artifact_builder,
170 cardano_stake_distribution_artifact_builder: dependencies
171 .cardano_stake_distribution_artifact_builder,
172 cardano_database_artifact_builder: dependencies.cardano_database_artifact_builder,
173 signed_entity_type_lock,
174 metrics_service,
175 logger: logger.new_with_component_name::<Self>(),
176 }
177 }
178
179 async fn create_artifact_task(
180 &self,
181 signed_entity_type: SignedEntityType,
182 certificate: &Certificate,
183 ) -> StdResult<()> {
184 info!(
185 self.logger, ">> create_artifact_task";
186 "signed_entity_type" => ?signed_entity_type, "certificate_hash" => &certificate.hash
187 );
188
189 let mut remaining_retries = 2;
190 let artifact = loop {
191 remaining_retries -= 1;
192
193 match self.compute_artifact(signed_entity_type.clone(), certificate).await {
194 Err(error) if remaining_retries == 0 => break Err(error),
195 Err(_error) => (),
196 Ok(artifact) => break Ok(artifact),
197 };
198 }?;
199
200 let signed_entity = SignedEntityRecord {
201 signed_entity_id: artifact.get_id(),
202 signed_entity_type: signed_entity_type.clone(),
203 certificate_id: certificate.hash.clone(),
204 artifact: serde_json::to_string(&artifact)?,
205 created_at: Utc::now(),
206 };
207
208 self.signed_entity_storer
209 .store_signed_entity(&signed_entity)
210 .await
211 .with_context(|| {
212 format!(
213 "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
214 )
215 })?;
216
217 self.increment_artifact_total_produced_metric_since_startup(signed_entity_type);
218
219 Ok(())
220 }
221
222 async fn compute_artifact(
224 &self,
225 signed_entity_type: SignedEntityType,
226 certificate: &Certificate,
227 ) -> StdResult<Arc<dyn Artifact>> {
228 match signed_entity_type.clone() {
229 SignedEntityType::MithrilStakeDistribution(epoch) => Ok(Arc::new(
230 self.mithril_stake_distribution_artifact_builder
231 .compute_artifact(epoch, certificate)
232 .await
233 .with_context(|| {
234 format!(
235 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
236 )
237 })?,
238 )),
239 SignedEntityType::CardanoImmutableFilesFull(beacon) => Ok(Arc::new(
240 self.cardano_immutable_files_full_artifact_builder
241 .compute_artifact(beacon.clone(), certificate)
242 .await
243 .with_context(|| {
244 format!(
245 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
246 )
247 })?,
248 )),
249 SignedEntityType::CardanoStakeDistribution(epoch) => Ok(Arc::new(
250 self.cardano_stake_distribution_artifact_builder
251 .compute_artifact(epoch, certificate)
252 .await
253 .with_context(|| {
254 format!(
255 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
256 )
257 })?)),
258 SignedEntityType::CardanoTransactions(_epoch, block_number) => Ok(Arc::new(
259 self.cardano_transactions_artifact_builder
260 .compute_artifact(block_number, certificate)
261 .await
262 .with_context(|| {
263 format!(
264 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
265 )
266 })?,
267 )),
268 SignedEntityType::CardanoDatabase(beacon) => Ok(Arc::new(
269 self.cardano_database_artifact_builder
270 .compute_artifact(beacon, certificate)
271 .await
272 .with_context(|| {
273 format!(
274 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
275 )
276 })?
277 )),
278 }
279 }
280
281 async fn get_last_signed_entities(
282 &self,
283 total: usize,
284 discriminants: &SignedEntityTypeDiscriminants,
285 ) -> StdResult<Vec<SignedEntityRecord>> {
286 self.signed_entity_storer
287 .get_last_signed_entities_by_type(discriminants, total)
288 .await
289 .with_context(|| {
290 format!(
291 "Signed Entity Service can not get last signed entities with type: '{discriminants:?}'"
292 )
293 })
294 }
295
296 fn increment_artifact_total_produced_metric_since_startup(
297 &self,
298 signed_entity_type: SignedEntityType,
299 ) {
300 let metrics = self.metrics_service.clone();
301 let metric_counter = match signed_entity_type {
302 SignedEntityType::MithrilStakeDistribution(_) => {
303 metrics.get_artifact_mithril_stake_distribution_total_produced_since_startup()
304 }
305 SignedEntityType::CardanoImmutableFilesFull(_) => {
306 metrics.get_artifact_cardano_immutable_files_full_total_produced_since_startup()
307 }
308 SignedEntityType::CardanoStakeDistribution(_) => {
309 metrics.get_artifact_cardano_stake_distribution_total_produced_since_startup()
310 }
311 SignedEntityType::CardanoTransactions(_, _) => {
312 metrics.get_artifact_cardano_transaction_total_produced_since_startup()
313 }
314 SignedEntityType::CardanoDatabase(_) => {
315 metrics.get_artifact_cardano_database_total_produced_since_startup()
316 }
317 };
318
319 metric_counter.increment();
320 }
321}
322
323#[async_trait]
324impl SignedEntityService for MithrilSignedEntityService {
325 async fn create_artifact(
326 &self,
327 signed_entity_type: SignedEntityType,
328 certificate: &Certificate,
329 ) -> StdResult<JoinHandle<StdResult<()>>> {
330 if self.signed_entity_type_lock.is_locked(&signed_entity_type).await {
331 return Err(anyhow!(
332 "Signed entity type '{:?}' is already locked",
333 signed_entity_type
334 ));
335 }
336
337 let service = self.clone();
338 let certificate_cloned = certificate.clone();
339 service.signed_entity_type_lock.lock(&signed_entity_type).await;
340
341 Ok(tokio::task::spawn(async move {
342 let signed_entity_type_clone = signed_entity_type.clone();
343 let service_clone = service.clone();
344 let result = tokio::task::spawn(async move {
345 service_clone
346 .create_artifact_task(signed_entity_type_clone, &certificate_cloned)
347 .await
348 })
349 .await;
350 service
351 .signed_entity_type_lock
352 .release(signed_entity_type.clone())
353 .await;
354
355 result.with_context(|| format!(
356 "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
357 ))?.inspect_err(|e| warn!(service.logger, "Error while creating artifact"; "error" => ?e))
358 }))
359 }
360
361 async fn get_last_signed_snapshots(
362 &self,
363 total: usize,
364 ) -> StdResult<Vec<SignedEntity<Snapshot>>> {
365 let signed_entities = self
366 .get_last_signed_entities(
367 total,
368 &SignedEntityTypeDiscriminants::CardanoImmutableFilesFull,
369 )
370 .await?
371 .into_iter()
372 .map(|record| record.try_into())
373 .collect::<Result<Vec<_>, _>>()?;
374
375 Ok(signed_entities)
376 }
377
378 async fn get_signed_snapshot_by_id(
379 &self,
380 signed_entity_id: &str,
381 ) -> StdResult<Option<SignedEntity<Snapshot>>> {
382 let entity: Option<SignedEntity<Snapshot>> = match self
383 .signed_entity_storer
384 .get_signed_entity(signed_entity_id)
385 .await
386 .with_context(|| {
387 format!(
388 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
389 )
390 })? {
391 Some(entity) => Some(entity.try_into()?),
392 None => None,
393 };
394
395 Ok(entity)
396 }
397
398 async fn get_last_signed_cardano_database_snapshots(
399 &self,
400 total: usize,
401 ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>> {
402 let signed_entities = self
403 .get_last_signed_entities(total, &SignedEntityTypeDiscriminants::CardanoDatabase)
404 .await?
405 .into_iter()
406 .map(|record| record.try_into())
407 .collect::<Result<Vec<_>, _>>()?;
408
409 Ok(signed_entities)
410 }
411
412 async fn get_signed_cardano_database_snapshot_by_id(
413 &self,
414 signed_entity_id: &str,
415 ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>> {
416 let entity: Option<SignedEntity<CardanoDatabaseSnapshot>> = match self
417 .signed_entity_storer
418 .get_signed_entity(signed_entity_id)
419 .await
420 .with_context(|| {
421 format!(
422 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
423 )
424 })? {
425 Some(entity) => Some(entity.try_into()?),
426 None => None,
427 };
428
429 Ok(entity)
430 }
431
432 async fn get_last_signed_mithril_stake_distributions(
433 &self,
434 total: usize,
435 ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>> {
436 let signed_entities = self
437 .get_last_signed_entities(
438 total,
439 &SignedEntityTypeDiscriminants::MithrilStakeDistribution,
440 )
441 .await?
442 .into_iter()
443 .map(|record| record.try_into())
444 .collect::<Result<Vec<_>, _>>()?;
445
446 Ok(signed_entities)
447 }
448
449 async fn get_signed_mithril_stake_distribution_by_id(
450 &self,
451 signed_entity_id: &str,
452 ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>> {
453 let entity: Option<SignedEntity<MithrilStakeDistribution>> = match self
454 .signed_entity_storer
455 .get_signed_entity(signed_entity_id)
456 .await
457 .with_context(|| {
458 format!(
459 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
460 )
461 })? {
462 Some(entity) => Some(entity.try_into()?),
463 None => None,
464 };
465
466 Ok(entity)
467 }
468
469 async fn get_last_cardano_transaction_snapshot(
470 &self,
471 ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>> {
472 let mut signed_entities_records = self
473 .get_last_signed_entities(1, &SignedEntityTypeDiscriminants::CardanoTransactions)
474 .await?;
475
476 match signed_entities_records.pop() {
477 Some(record) => Ok(Some(record.try_into()?)),
478 None => Ok(None),
479 }
480 }
481
482 async fn get_last_signed_cardano_stake_distributions(
483 &self,
484 total: usize,
485 ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>> {
486 let signed_entities = self
487 .get_last_signed_entities(
488 total,
489 &SignedEntityTypeDiscriminants::CardanoStakeDistribution,
490 )
491 .await?
492 .into_iter()
493 .map(|record| record.try_into())
494 .collect::<Result<Vec<_>, _>>()?;
495
496 Ok(signed_entities)
497 }
498}
499
500#[cfg(test)]
501mod tests {
502 use std::{sync::atomic::Ordering, time::Duration};
503
504 use mithril_common::{
505 entities::{CardanoTransactionsSnapshot, Epoch, StakeDistribution},
506 signable_builder,
507 test_utils::fake_data,
508 };
509 use mithril_metric::CounterValue;
510 use serde::{Serialize, de::DeserializeOwned};
511 use std::sync::atomic::AtomicBool;
512
513 use crate::artifact_builder::MockArtifactBuilder;
514 use crate::database::repository::MockSignedEntityStorer;
515 use crate::test_tools::TestLogger;
516
517 use super::*;
518
519 fn create_stake_distribution(epoch: Epoch, signers: usize) -> MithrilStakeDistribution {
520 MithrilStakeDistribution::new(
521 epoch,
522 fake_data::signers_with_stakes(signers),
523 &fake_data::protocol_parameters(),
524 )
525 }
526
    /// Build a `CardanoStakeDistribution` for `epoch` from the given `stake_distribution`.
    fn create_cardano_stake_distribution(
        epoch: Epoch,
        stake_distribution: StakeDistribution,
    ) -> CardanoStakeDistribution {
        CardanoStakeDistribution::new(epoch, stake_distribution)
    }
533
534 fn assert_expected<T>(expected: &T, artifact: &Arc<dyn Artifact>)
535 where
536 T: Serialize + DeserializeOwned,
537 {
538 let current: T = serde_json::from_str(&serde_json::to_string(&artifact).unwrap()).unwrap();
539 assert_eq!(
540 serde_json::to_string(&expected).unwrap(),
541 serde_json::to_string(¤t).unwrap()
542 );
543 }
544
    /// Test helper holding one mock per dependency of `MithrilSignedEntityService`, so that
    /// expectations can be configured before the mocks are moved into the service.
    struct MockDependencyInjector {
        /// Mock of the signed entity store.
        mock_signed_entity_storer: MockSignedEntityStorer,
        /// Mock builder for `MithrilStakeDistribution` artifacts.
        mock_mithril_stake_distribution_artifact_builder:
            MockArtifactBuilder<Epoch, MithrilStakeDistribution>,
        /// Mock builder for `Snapshot` artifacts.
        mock_cardano_immutable_files_full_artifact_builder:
            MockArtifactBuilder<CardanoDbBeacon, Snapshot>,
        /// Mock builder for `CardanoTransactionsSnapshot` artifacts.
        mock_cardano_transactions_artifact_builder:
            MockArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
        /// Mock builder for `CardanoStakeDistribution` artifacts.
        mock_cardano_stake_distribution_artifact_builder:
            MockArtifactBuilder<Epoch, CardanoStakeDistribution>,
        /// Mock builder for `CardanoDatabaseSnapshot` artifacts.
        mock_cardano_database_artifact_builder:
            MockArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
    }
559
560 impl MockDependencyInjector {
561 fn new() -> MockDependencyInjector {
562 MockDependencyInjector {
563 mock_signed_entity_storer: MockSignedEntityStorer::new(),
564 mock_mithril_stake_distribution_artifact_builder: MockArtifactBuilder::<
565 Epoch,
566 MithrilStakeDistribution,
567 >::new(),
568 mock_cardano_immutable_files_full_artifact_builder: MockArtifactBuilder::<
569 CardanoDbBeacon,
570 Snapshot,
571 >::new(),
572 mock_cardano_transactions_artifact_builder: MockArtifactBuilder::<
573 BlockNumber,
574 CardanoTransactionsSnapshot,
575 >::new(),
576 mock_cardano_stake_distribution_artifact_builder: MockArtifactBuilder::<
577 Epoch,
578 CardanoStakeDistribution,
579 >::new(),
580 mock_cardano_database_artifact_builder: MockArtifactBuilder::<
581 CardanoDbBeacon,
582 CardanoDatabaseSnapshot,
583 >::new(),
584 }
585 }
586
587 fn build_artifact_builder_service(self) -> MithrilSignedEntityService {
588 let dependencies = SignedEntityServiceArtifactsDependencies::new(
589 Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
590 Arc::new(self.mock_cardano_immutable_files_full_artifact_builder),
591 Arc::new(self.mock_cardano_transactions_artifact_builder),
592 Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
593 Arc::new(self.mock_cardano_database_artifact_builder),
594 );
595 MithrilSignedEntityService::new(
596 Arc::new(self.mock_signed_entity_storer),
597 dependencies,
598 Arc::new(SignedEntityTypeLock::default()),
599 Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
600 TestLogger::stdout(),
601 )
602 }
603
        /// Consume the injector and build a service whose `Snapshot` builder is slow: it keeps
        /// waiting until `atomic_stop` is set to `true`.
        ///
        /// The other builders are the injector's mocks. The store mock is additionally told to
        /// accept one `store_signed_entity` call carrying the JSON of the fake snapshot that the
        /// slow builder eventually returns.
        fn build_artifact_builder_service_with_time_consuming_process(
            mut self,
            atomic_stop: Arc<AtomicBool>,
        ) -> MithrilSignedEntityService {
            // Artifact builder that polls `atomic_stop` and only then yields `snapshot`.
            struct LongArtifactBuilder {
                atomic_stop: Arc<AtomicBool>,
                snapshot: Snapshot,
            }

            let snapshot = fake_data::snapshots(1).first().unwrap().to_owned();

            #[async_trait]
            impl ArtifactBuilder<CardanoDbBeacon, Snapshot> for LongArtifactBuilder {
                async fn compute_artifact(
                    &self,
                    _beacon: CardanoDbBeacon,
                    _certificate: &Certificate,
                ) -> StdResult<Snapshot> {
                    // Poll every 10 ms; fail on the 100th unsuccessful check so that a test
                    // forgetting to raise the flag does not hang forever.
                    let mut max_iteration = 100;
                    while !self.atomic_stop.load(Ordering::Relaxed) {
                        max_iteration -= 1;
                        if max_iteration <= 0 {
                            return Err(anyhow!("Test should handle the stop"));
                        }
                        tokio::time::sleep(Duration::from_millis(10)).await;
                    }
                    Ok(self.snapshot.clone())
                }
            }
            let cardano_immutable_files_full_long_artifact_builder = LongArtifactBuilder {
                atomic_stop: atomic_stop.clone(),
                snapshot: snapshot.clone(),
            };

            // Serialize through `Arc<dyn Artifact>`, as the service does, so the expected JSON
            // matches what is handed to the store.
            let artifact_clone: Arc<dyn Artifact> = Arc::new(snapshot);
            let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
            self.mock_signed_entity_storer
                .expect_store_signed_entity()
                .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
                .return_once(|_| Ok(()));

            // Same wiring as the regular builder, except that the slow builder replaces the
            // immutable files full mock.
            let dependencies = SignedEntityServiceArtifactsDependencies::new(
                Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
                Arc::new(cardano_immutable_files_full_long_artifact_builder),
                Arc::new(self.mock_cardano_transactions_artifact_builder),
                Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
                Arc::new(self.mock_cardano_database_artifact_builder),
            );
            MithrilSignedEntityService::new(
                Arc::new(self.mock_signed_entity_storer),
                dependencies,
                Arc::new(SignedEntityTypeLock::default()),
                Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
                TestLogger::stdout(),
            )
        }
660
661 fn mock_artifact_processing<
662 T: Artifact + Clone + Serialize + 'static,
663 U: signable_builder::Beacon,
664 >(
665 &mut self,
666 artifact: T,
667 mock_that_provide_artifact: &dyn Fn(
668 &mut MockDependencyInjector,
669 ) -> &mut MockArtifactBuilder<U, T>,
670 ) {
671 {
672 let artifact_cloned = artifact.clone();
673 mock_that_provide_artifact(self)
674 .expect_compute_artifact()
675 .times(1)
676 .return_once(|_, _| Ok(artifact_cloned));
677 }
678 {
679 let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
680 let artifact_json = serde_json::to_string(&artifact_clone).unwrap();
681 self.mock_signed_entity_storer
682 .expect_store_signed_entity()
683 .withf(move |signed_entity| signed_entity.artifact == artifact_json)
684 .return_once(|_| Ok(()));
685 }
686 }
687
688 fn mock_stake_distribution_processing(&mut self, artifact: MithrilStakeDistribution) {
689 self.mock_artifact_processing(artifact, &|mock_injector| {
690 &mut mock_injector.mock_mithril_stake_distribution_artifact_builder
691 });
692 }
693 }
694
695 fn get_artifact_total_produced_metric_since_startup_counter_value(
696 metrics_service: Arc<MetricsService>,
697 signed_entity_type: &SignedEntityType,
698 ) -> CounterValue {
699 match signed_entity_type {
700 SignedEntityType::MithrilStakeDistribution(_) => metrics_service
701 .get_artifact_mithril_stake_distribution_total_produced_since_startup()
702 .get(),
703 SignedEntityType::CardanoImmutableFilesFull(_) => metrics_service
704 .get_artifact_cardano_immutable_files_full_total_produced_since_startup()
705 .get(),
706 SignedEntityType::CardanoStakeDistribution(_) => metrics_service
707 .get_artifact_cardano_stake_distribution_total_produced_since_startup()
708 .get(),
709 SignedEntityType::CardanoTransactions(_, _) => metrics_service
710 .get_artifact_cardano_transaction_total_produced_since_startup()
711 .get(),
712 SignedEntityType::CardanoDatabase(_) => metrics_service
713 .get_artifact_cardano_database_total_produced_since_startup()
714 .get(),
715 }
716 }
717
718 #[tokio::test]
719 async fn build_mithril_stake_distribution_artifact_when_given_mithril_stake_distribution_entity_type()
720 {
721 let mut mock_container = MockDependencyInjector::new();
722
723 let mithril_stake_distribution_expected = create_stake_distribution(Epoch(1), 5);
724
725 mock_container
726 .mock_mithril_stake_distribution_artifact_builder
727 .expect_compute_artifact()
728 .times(1)
729 .returning(|_, _| Ok(create_stake_distribution(Epoch(1), 5)));
730
731 let artifact_builder_service = mock_container.build_artifact_builder_service();
732
733 let certificate = fake_data::certificate("hash".to_string());
734 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(1));
735 let artifact = artifact_builder_service
736 .compute_artifact(signed_entity_type.clone(), &certificate)
737 .await
738 .unwrap();
739
740 assert_expected(&mithril_stake_distribution_expected, &artifact);
741 }
742
743 #[tokio::test]
744 async fn should_store_the_artifact_when_creating_artifact_for_a_mithril_stake_distribution() {
745 generic_test_that_the_artifact_is_stored(
746 SignedEntityType::MithrilStakeDistribution(Epoch(1)),
747 create_stake_distribution(Epoch(1), 5),
748 &|mock_injector| &mut mock_injector.mock_mithril_stake_distribution_artifact_builder,
749 )
750 .await;
751 }
752
753 #[tokio::test]
754 async fn build_cardano_stake_distribution_artifact_when_given_cardano_stake_distribution_entity_type()
755 {
756 let mut mock_container = MockDependencyInjector::new();
757
758 let cardano_stake_distribution_expected = create_cardano_stake_distribution(
759 Epoch(1),
760 StakeDistribution::from([("pool-1".to_string(), 100)]),
761 );
762
763 mock_container
764 .mock_cardano_stake_distribution_artifact_builder
765 .expect_compute_artifact()
766 .times(1)
767 .returning(|_, _| {
768 Ok(create_cardano_stake_distribution(
769 Epoch(1),
770 StakeDistribution::from([("pool-1".to_string(), 100)]),
771 ))
772 });
773
774 let artifact_builder_service = mock_container.build_artifact_builder_service();
775
776 let certificate = fake_data::certificate("hash".to_string());
777 let signed_entity_type = SignedEntityType::CardanoStakeDistribution(Epoch(1));
778 let artifact = artifact_builder_service
779 .compute_artifact(signed_entity_type.clone(), &certificate)
780 .await
781 .unwrap();
782
783 assert_expected(&cardano_stake_distribution_expected, &artifact);
784 }
785
786 #[tokio::test]
787 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_stake_distribution() {
788 generic_test_that_the_artifact_is_stored(
789 SignedEntityType::CardanoStakeDistribution(Epoch(1)),
790 create_cardano_stake_distribution(
791 Epoch(1),
792 StakeDistribution::from([("pool-1".to_string(), 100)]),
793 ),
794 &|mock_injector| &mut mock_injector.mock_cardano_stake_distribution_artifact_builder,
795 )
796 .await;
797 }
798
799 #[tokio::test]
800 async fn build_snapshot_artifact_when_given_cardano_immutable_files_full_entity_type() {
801 let mut mock_container = MockDependencyInjector::new();
802
803 let snapshot_expected = fake_data::snapshots(1).first().unwrap().to_owned();
804
805 mock_container
806 .mock_cardano_immutable_files_full_artifact_builder
807 .expect_compute_artifact()
808 .times(1)
809 .returning(|_, _| Ok(fake_data::snapshots(1).first().unwrap().to_owned()));
810
811 let artifact_builder_service = mock_container.build_artifact_builder_service();
812
813 let certificate = fake_data::certificate("hash".to_string());
814 let signed_entity_type =
815 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
816 let artifact = artifact_builder_service
817 .compute_artifact(signed_entity_type.clone(), &certificate)
818 .await
819 .unwrap();
820
821 assert_expected(&snapshot_expected, &artifact);
822 }
823
824 #[tokio::test]
825 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_immutable_files() {
826 generic_test_that_the_artifact_is_stored(
827 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default()),
828 fake_data::snapshots(1).first().unwrap().to_owned(),
829 &|mock_injector| &mut mock_injector.mock_cardano_immutable_files_full_artifact_builder,
830 )
831 .await;
832 }
833
834 #[tokio::test]
835 async fn build_cardano_transactions_snapshot_artifact_when_given_cardano_transactions_type() {
836 let mut mock_container = MockDependencyInjector::new();
837
838 let block_number = BlockNumber(151);
839 let expected = CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number);
840
841 mock_container
842 .mock_cardano_transactions_artifact_builder
843 .expect_compute_artifact()
844 .times(1)
845 .returning(move |_, _| {
846 Ok(CardanoTransactionsSnapshot::new(
847 "merkle_root".to_string(),
848 block_number,
849 ))
850 });
851
852 let artifact_builder_service = mock_container.build_artifact_builder_service();
853
854 let certificate = fake_data::certificate("hash".to_string());
855 let signed_entity_type = SignedEntityType::CardanoTransactions(Epoch(1), block_number);
856 let artifact = artifact_builder_service
857 .compute_artifact(signed_entity_type.clone(), &certificate)
858 .await
859 .unwrap();
860
861 assert_expected(&expected, &artifact);
862 }
863
864 #[tokio::test]
865 async fn should_store_the_artifact_when_creating_artifact_for_cardano_transactions() {
866 let block_number = BlockNumber(149);
867 generic_test_that_the_artifact_is_stored(
868 SignedEntityType::CardanoTransactions(Epoch(1), block_number),
869 CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number),
870 &|mock_injector| &mut mock_injector.mock_cardano_transactions_artifact_builder,
871 )
872 .await;
873 }
874
875 #[tokio::test]
876 async fn build_cardano_database_artifact_when_given_cardano_database_entity_type() {
877 let mut mock_container = MockDependencyInjector::new();
878
879 let cardano_database_expected =
880 fake_data::cardano_database_snapshots(1).first().unwrap().to_owned();
881
882 mock_container
883 .mock_cardano_database_artifact_builder
884 .expect_compute_artifact()
885 .times(1)
886 .returning(|_, _| {
887 Ok(fake_data::cardano_database_snapshots(1).first().unwrap().to_owned())
888 });
889
890 let artifact_builder_service = mock_container.build_artifact_builder_service();
891
892 let certificate = fake_data::certificate("hash".to_string());
893 let signed_entity_type = SignedEntityType::CardanoDatabase(CardanoDbBeacon::default());
894 let artifact = artifact_builder_service
895 .compute_artifact(signed_entity_type.clone(), &certificate)
896 .await
897 .unwrap();
898
899 assert_expected(&cardano_database_expected, &artifact);
900 }
901
902 #[tokio::test]
903 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_database() {
904 generic_test_that_the_artifact_is_stored(
905 SignedEntityType::CardanoDatabase(CardanoDbBeacon::default()),
906 fake_data::cardano_database_snapshots(1).first().unwrap().to_owned(),
907 &|mock_injector| &mut mock_injector.mock_cardano_database_artifact_builder,
908 )
909 .await;
910 }
911
912 async fn generic_test_that_the_artifact_is_stored<
913 T: Artifact + Clone + Serialize + 'static,
914 U: signable_builder::Beacon,
915 >(
916 signed_entity_type: SignedEntityType,
917 artifact: T,
918 mock_that_provide_artifact: &dyn Fn(
919 &mut MockDependencyInjector,
920 ) -> &mut MockArtifactBuilder<U, T>,
921 ) {
922 let mut mock_container = MockDependencyInjector::new();
923 {
924 let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
925 let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
926 mock_container
927 .mock_signed_entity_storer
928 .expect_store_signed_entity()
929 .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
930 .return_once(|_| Ok(()));
931 }
932 {
933 let artifact_cloned = artifact.clone();
934 mock_that_provide_artifact(&mut mock_container)
935 .expect_compute_artifact()
936 .times(1)
937 .return_once(|_, _| Ok(artifact_cloned));
938 }
939 let artifact_builder_service = mock_container.build_artifact_builder_service();
940
941 let certificate = fake_data::certificate("hash".to_string());
942 let error_message = format!(
943 "Create artifact should not fail for {} signed entity",
944 std::any::type_name::<T>()
945 );
946 let error_message_str = error_message.as_str();
947
948 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
949 artifact_builder_service.metrics_service.clone(),
950 &signed_entity_type,
951 );
952
953 artifact_builder_service
954 .create_artifact_task(signed_entity_type.clone(), &certificate)
955 .await
956 .expect(error_message_str);
957
958 assert_eq!(
959 initial_counter_value + 1,
960 get_artifact_total_produced_metric_since_startup_counter_value(
961 artifact_builder_service.metrics_service.clone(),
962 &signed_entity_type,
963 )
964 )
965 }
966
967 #[tokio::test]
968 async fn create_artifact_for_two_signed_entity_types_in_sequence_not_blocking() {
969 let atomic_stop = Arc::new(AtomicBool::new(false));
970 let signed_entity_type_service = {
971 let mut mock_container = MockDependencyInjector::new();
972
973 let msd = create_stake_distribution(Epoch(1), 5);
974 mock_container.mock_stake_distribution_processing(msd);
975
976 mock_container
977 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone())
978 };
979 let certificate = fake_data::certificate("hash".to_string());
980
981 let signed_entity_type_immutable =
982 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
983 let first_task_that_never_finished = signed_entity_type_service
984 .create_artifact(signed_entity_type_immutable, &certificate)
985 .await
986 .unwrap();
987
988 let signed_entity_type_msd = SignedEntityType::MithrilStakeDistribution(Epoch(1));
989 let second_task_that_finish_first = signed_entity_type_service
990 .create_artifact(signed_entity_type_msd, &certificate)
991 .await
992 .unwrap();
993
994 second_task_that_finish_first.await.unwrap().unwrap();
995 assert!(!first_task_that_never_finished.is_finished());
996
997 atomic_stop.swap(true, Ordering::Relaxed);
998 }
999
1000 #[tokio::test]
1001 async fn create_artifact_lock_unlock_signed_entity_type_while_processing() {
1002 let atomic_stop = Arc::new(AtomicBool::new(false));
1003 let signed_entity_type_service = MockDependencyInjector::new()
1004 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1005 let certificate = fake_data::certificate("hash".to_string());
1006
1007 let signed_entity_type_immutable =
1008 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1009 assert!(
1010 !signed_entity_type_service
1011 .signed_entity_type_lock
1012 .is_locked(&signed_entity_type_immutable)
1013 .await
1014 );
1015 let join_handle = signed_entity_type_service
1016 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1017 .await
1018 .unwrap();
1019
1020 let is_locked = signed_entity_type_service
1023 .signed_entity_type_lock
1024 .is_locked(&signed_entity_type_immutable)
1025 .await;
1026 let is_finished = join_handle.is_finished();
1027
1028 atomic_stop.swap(true, Ordering::Relaxed);
1029 join_handle.await.unwrap().unwrap();
1030
1031 assert!(is_locked);
1032 assert!(!is_finished);
1033
1034 assert!(
1035 !signed_entity_type_service
1036 .signed_entity_type_lock
1037 .is_locked(&signed_entity_type_immutable)
1038 .await
1039 );
1040 }
1041
1042 #[tokio::test]
1043 async fn create_artifact_unlock_signed_entity_type_when_error() {
1044 let signed_entity_type_service = {
1045 let mut mock_container = MockDependencyInjector::new();
1046 mock_container
1047 .mock_cardano_immutable_files_full_artifact_builder
1048 .expect_compute_artifact()
1049 .returning(|_, _| Err(anyhow::anyhow!("Error while computing artifact")));
1050
1051 mock_container.build_artifact_builder_service()
1052 };
1053 let certificate = fake_data::certificate("hash".to_string());
1054
1055 let signed_entity_type_immutable =
1056 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1057
1058 let join_handle = signed_entity_type_service
1059 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1060 .await
1061 .unwrap();
1062
1063 let error = join_handle.await.unwrap().unwrap_err();
1064 assert!(
1065 error.to_string().contains("CardanoImmutableFilesFull"),
1066 "Error should contains CardanoImmutableFilesFull but was: {error}"
1067 );
1068
1069 assert!(
1070 !signed_entity_type_service
1071 .signed_entity_type_lock
1072 .is_locked(&signed_entity_type_immutable)
1073 .await
1074 );
1075 }
1076
1077 #[tokio::test]
1078 async fn create_artifact_unlock_signed_entity_type_when_panic() {
1079 let signed_entity_type_service =
1080 MockDependencyInjector::new().build_artifact_builder_service();
1081 let certificate = fake_data::certificate("hash".to_string());
1082
1083 let signed_entity_type_immutable =
1084 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1085
1086 let join_handle = signed_entity_type_service
1087 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1088 .await
1089 .unwrap();
1090
1091 let error = join_handle.await.unwrap().unwrap_err();
1092 assert!(
1093 error.to_string().contains("CardanoImmutableFilesFull"),
1094 "Error should contains CardanoImmutableFilesFull but was: {error}"
1095 );
1096
1097 assert!(
1098 !signed_entity_type_service
1099 .signed_entity_type_lock
1100 .is_locked(&signed_entity_type_immutable)
1101 .await
1102 );
1103 }
1104
1105 #[tokio::test]
1106 async fn create_artifact_for_a_signed_entity_type_already_lock_return_error() {
1107 let atomic_stop = Arc::new(AtomicBool::new(false));
1108 let signed_entity_service = MockDependencyInjector::new()
1109 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1110 let certificate = fake_data::certificate("hash".to_string());
1111 let signed_entity_type_immutable =
1112 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1113
1114 signed_entity_service
1115 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1116 .await
1117 .unwrap();
1118
1119 signed_entity_service
1120 .create_artifact(signed_entity_type_immutable, &certificate)
1121 .await
1122 .expect_err("Should return error when signed entity type is already locked");
1123
1124 atomic_stop.swap(true, Ordering::Relaxed);
1125 }
1126
1127 #[tokio::test]
1128 async fn metrics_counter_value_is_not_incremented_when_compute_artifact_error() {
1129 let signed_entity_service = {
1130 let mut mock_container = MockDependencyInjector::new();
1131 mock_container
1132 .mock_cardano_immutable_files_full_artifact_builder
1133 .expect_compute_artifact()
1134 .returning(|_, _| Err(anyhow!("Error while computing artifact")));
1135
1136 mock_container.build_artifact_builder_service()
1137 };
1138
1139 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1140
1141 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1142 signed_entity_service.metrics_service.clone(),
1143 &signed_entity_type,
1144 );
1145
1146 signed_entity_service
1147 .create_artifact(
1148 signed_entity_type.clone(),
1149 &fake_data::certificate("hash".to_string()),
1150 )
1151 .await
1152 .unwrap();
1153
1154 assert_eq!(
1155 initial_counter_value,
1156 get_artifact_total_produced_metric_since_startup_counter_value(
1157 signed_entity_service.metrics_service.clone(),
1158 &signed_entity_type,
1159 )
1160 );
1161 }
1162
1163 #[tokio::test]
1164 async fn metrics_counter_value_is_not_incremented_when_store_signed_entity_error() {
1165 let signed_entity_service = {
1166 let mut mock_container = MockDependencyInjector::new();
1167 mock_container
1168 .mock_signed_entity_storer
1169 .expect_store_signed_entity()
1170 .returning(|_| Err(anyhow!("Error while storing signed entity")));
1171
1172 mock_container.build_artifact_builder_service()
1173 };
1174
1175 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1176
1177 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1178 signed_entity_service.metrics_service.clone(),
1179 &signed_entity_type,
1180 );
1181
1182 signed_entity_service
1183 .create_artifact(
1184 signed_entity_type.clone(),
1185 &fake_data::certificate("hash".to_string()),
1186 )
1187 .await
1188 .unwrap();
1189
1190 assert_eq!(
1191 initial_counter_value,
1192 get_artifact_total_produced_metric_since_startup_counter_value(
1193 signed_entity_service.metrics_service.clone(),
1194 &signed_entity_type,
1195 )
1196 );
1197 }
1198}