1use anyhow::{anyhow, Context};
6use async_trait::async_trait;
7use chrono::Utc;
8use slog::{info, warn, Logger};
9use std::sync::Arc;
10use tokio::task::JoinHandle;
11
12use mithril_common::{
13 entities::{
14 BlockNumber, CardanoDatabaseSnapshot, CardanoDbBeacon, CardanoStakeDistribution,
15 CardanoTransactionsSnapshot, Certificate, Epoch, MithrilStakeDistribution,
16 SignedEntityType, SignedEntityTypeDiscriminants, Snapshot,
17 },
18 logging::LoggerExtensions,
19 signable_builder::{Artifact, SignedEntity},
20 StdResult,
21};
22
23use crate::{
24 artifact_builder::ArtifactBuilder,
25 database::{record::SignedEntityRecord, repository::SignedEntityStorer},
26 MetricsService,
27};
28use mithril_signed_entity_lock::SignedEntityTypeLock;
29
30#[cfg_attr(test, mockall::automock)]
32#[async_trait]
33pub trait SignedEntityService: Send + Sync {
34 async fn create_artifact(
36 &self,
37 signed_entity_type: SignedEntityType,
38 certificate: &Certificate,
39 ) -> StdResult<JoinHandle<StdResult<()>>>;
40
41 async fn get_last_signed_snapshots(
43 &self,
44 total: usize,
45 ) -> StdResult<Vec<SignedEntity<Snapshot>>>;
46
47 async fn get_signed_snapshot_by_id(
49 &self,
50 signed_entity_id: &str,
51 ) -> StdResult<Option<SignedEntity<Snapshot>>>;
52
53 async fn get_last_signed_cardano_database_snapshots(
55 &self,
56 total: usize,
57 ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>>;
58
59 async fn get_signed_cardano_database_snapshot_by_id(
61 &self,
62 signed_entity_id: &str,
63 ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>>;
64
65 async fn get_last_signed_mithril_stake_distributions(
68 &self,
69 total: usize,
70 ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>>;
71
72 async fn get_signed_mithril_stake_distribution_by_id(
74 &self,
75 signed_entity_id: &str,
76 ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>>;
77
78 async fn get_last_cardano_transaction_snapshot(
80 &self,
81 ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>>;
82
83 async fn get_last_signed_cardano_stake_distributions(
86 &self,
87 total: usize,
88 ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>>;
89}
90
91#[derive(Clone)]
93pub struct MithrilSignedEntityService {
94 signed_entity_storer: Arc<dyn SignedEntityStorer>,
95 mithril_stake_distribution_artifact_builder:
96 Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
97 cardano_immutable_files_full_artifact_builder:
98 Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
99 cardano_transactions_artifact_builder:
100 Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
101 signed_entity_type_lock: Arc<SignedEntityTypeLock>,
102 cardano_stake_distribution_artifact_builder:
103 Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
104 cardano_database_artifact_builder:
105 Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
106 metrics_service: Arc<MetricsService>,
107 logger: Logger,
108}
109
110pub struct SignedEntityServiceArtifactsDependencies {
112 mithril_stake_distribution_artifact_builder:
113 Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
114 cardano_immutable_files_full_artifact_builder:
115 Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
116 cardano_transactions_artifact_builder:
117 Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
118 cardano_stake_distribution_artifact_builder:
119 Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
120 cardano_database_artifact_builder:
121 Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
122}
123
124impl SignedEntityServiceArtifactsDependencies {
125 pub fn new(
127 mithril_stake_distribution_artifact_builder: Arc<
128 dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>,
129 >,
130 cardano_immutable_files_full_artifact_builder: Arc<
131 dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>,
132 >,
133 cardano_transactions_artifact_builder: Arc<
134 dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
135 >,
136 cardano_stake_distribution_artifact_builder: Arc<
137 dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>,
138 >,
139 cardano_database_artifact_builder: Arc<
140 dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
141 >,
142 ) -> Self {
143 Self {
144 mithril_stake_distribution_artifact_builder,
145 cardano_immutable_files_full_artifact_builder,
146 cardano_transactions_artifact_builder,
147 cardano_stake_distribution_artifact_builder,
148 cardano_database_artifact_builder,
149 }
150 }
151}
152
153impl MithrilSignedEntityService {
154 pub fn new(
156 signed_entity_storer: Arc<dyn SignedEntityStorer>,
157 dependencies: SignedEntityServiceArtifactsDependencies,
158 signed_entity_type_lock: Arc<SignedEntityTypeLock>,
159 metrics_service: Arc<MetricsService>,
160 logger: Logger,
161 ) -> Self {
162 Self {
163 signed_entity_storer,
164 mithril_stake_distribution_artifact_builder: dependencies
165 .mithril_stake_distribution_artifact_builder,
166 cardano_immutable_files_full_artifact_builder: dependencies
167 .cardano_immutable_files_full_artifact_builder,
168 cardano_transactions_artifact_builder: dependencies
169 .cardano_transactions_artifact_builder,
170 cardano_stake_distribution_artifact_builder: dependencies
171 .cardano_stake_distribution_artifact_builder,
172 cardano_database_artifact_builder: dependencies.cardano_database_artifact_builder,
173 signed_entity_type_lock,
174 metrics_service,
175 logger: logger.new_with_component_name::<Self>(),
176 }
177 }
178
179 async fn create_artifact_task(
180 &self,
181 signed_entity_type: SignedEntityType,
182 certificate: &Certificate,
183 ) -> StdResult<()> {
184 info!(
185 self.logger, ">> create_artifact_task";
186 "signed_entity_type" => ?signed_entity_type, "certificate_hash" => &certificate.hash
187 );
188
189 let mut remaining_retries = 2;
190 let artifact = loop {
191 remaining_retries -= 1;
192
193 match self
194 .compute_artifact(signed_entity_type.clone(), certificate)
195 .await
196 {
197 Err(error) if remaining_retries == 0 => break Err(error),
198 Err(_error) => (),
199 Ok(artifact) => break Ok(artifact),
200 };
201 }?;
202
203 let signed_entity = SignedEntityRecord {
204 signed_entity_id: artifact.get_id(),
205 signed_entity_type: signed_entity_type.clone(),
206 certificate_id: certificate.hash.clone(),
207 artifact: serde_json::to_string(&artifact)?,
208 created_at: Utc::now(),
209 };
210
211 self.signed_entity_storer
212 .store_signed_entity(&signed_entity)
213 .await
214 .with_context(|| {
215 format!(
216 "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
217 )
218 })?;
219
220 self.increment_artifact_total_produced_metric_since_startup(signed_entity_type);
221
222 Ok(())
223 }
224
225 async fn compute_artifact(
227 &self,
228 signed_entity_type: SignedEntityType,
229 certificate: &Certificate,
230 ) -> StdResult<Arc<dyn Artifact>> {
231 match signed_entity_type.clone() {
232 SignedEntityType::MithrilStakeDistribution(epoch) => Ok(Arc::new(
233 self.mithril_stake_distribution_artifact_builder
234 .compute_artifact(epoch, certificate)
235 .await
236 .with_context(|| {
237 format!(
238 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
239 )
240 })?,
241 )),
242 SignedEntityType::CardanoImmutableFilesFull(beacon) => Ok(Arc::new(
243 self.cardano_immutable_files_full_artifact_builder
244 .compute_artifact(beacon.clone(), certificate)
245 .await
246 .with_context(|| {
247 format!(
248 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
249 )
250 })?,
251 )),
252 SignedEntityType::CardanoStakeDistribution(epoch) => Ok(Arc::new(
253 self.cardano_stake_distribution_artifact_builder
254 .compute_artifact(epoch, certificate)
255 .await
256 .with_context(|| {
257 format!(
258 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
259 )
260 })?)),
261 SignedEntityType::CardanoTransactions(_epoch, block_number) => Ok(Arc::new(
262 self.cardano_transactions_artifact_builder
263 .compute_artifact(block_number, certificate)
264 .await
265 .with_context(|| {
266 format!(
267 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
268 )
269 })?,
270 )),
271 SignedEntityType::CardanoDatabase(beacon) => Ok(Arc::new(
272 self.cardano_database_artifact_builder
273 .compute_artifact(beacon, certificate)
274 .await
275 .with_context(|| {
276 format!(
277 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
278 )
279 })?
280 )),
281 }
282 }
283
284 async fn get_last_signed_entities(
285 &self,
286 total: usize,
287 discriminants: &SignedEntityTypeDiscriminants,
288 ) -> StdResult<Vec<SignedEntityRecord>> {
289 self.signed_entity_storer
290 .get_last_signed_entities_by_type(discriminants, total)
291 .await
292 .with_context(|| {
293 format!(
294 "Signed Entity Service can not get last signed entities with type: '{discriminants:?}'"
295 )
296 })
297 }
298
299 fn increment_artifact_total_produced_metric_since_startup(
300 &self,
301 signed_entity_type: SignedEntityType,
302 ) {
303 let metrics = self.metrics_service.clone();
304 let metric_counter = match signed_entity_type {
305 SignedEntityType::MithrilStakeDistribution(_) => {
306 metrics.get_artifact_mithril_stake_distribution_total_produced_since_startup()
307 }
308 SignedEntityType::CardanoImmutableFilesFull(_) => {
309 metrics.get_artifact_cardano_immutable_files_full_total_produced_since_startup()
310 }
311 SignedEntityType::CardanoStakeDistribution(_) => {
312 metrics.get_artifact_cardano_stake_distribution_total_produced_since_startup()
313 }
314 SignedEntityType::CardanoTransactions(_, _) => {
315 metrics.get_artifact_cardano_transaction_total_produced_since_startup()
316 }
317 SignedEntityType::CardanoDatabase(_) => {
318 metrics.get_artifact_cardano_database_total_produced_since_startup()
319 }
320 };
321
322 metric_counter.increment();
323 }
324}
325
326#[async_trait]
327impl SignedEntityService for MithrilSignedEntityService {
328 async fn create_artifact(
329 &self,
330 signed_entity_type: SignedEntityType,
331 certificate: &Certificate,
332 ) -> StdResult<JoinHandle<StdResult<()>>> {
333 if self
334 .signed_entity_type_lock
335 .is_locked(&signed_entity_type)
336 .await
337 {
338 return Err(anyhow!(
339 "Signed entity type '{:?}' is already locked",
340 signed_entity_type
341 ));
342 }
343
344 let service = self.clone();
345 let certificate_cloned = certificate.clone();
346 service
347 .signed_entity_type_lock
348 .lock(&signed_entity_type)
349 .await;
350
351 Ok(tokio::task::spawn(async move {
352 let signed_entity_type_clone = signed_entity_type.clone();
353 let service_clone = service.clone();
354 let result = tokio::task::spawn(async move {
355 service_clone
356 .create_artifact_task(signed_entity_type_clone, &certificate_cloned)
357 .await
358 })
359 .await;
360 service
361 .signed_entity_type_lock
362 .release(signed_entity_type.clone())
363 .await;
364
365 result.with_context(|| format!(
366 "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
367 ))?.inspect_err(|e| warn!(service.logger, "Error while creating artifact"; "error" => ?e))
368 }))
369 }
370
371 async fn get_last_signed_snapshots(
372 &self,
373 total: usize,
374 ) -> StdResult<Vec<SignedEntity<Snapshot>>> {
375 let signed_entities = self
376 .get_last_signed_entities(
377 total,
378 &SignedEntityTypeDiscriminants::CardanoImmutableFilesFull,
379 )
380 .await?
381 .into_iter()
382 .map(|record| record.try_into())
383 .collect::<Result<Vec<_>, _>>()?;
384
385 Ok(signed_entities)
386 }
387
388 async fn get_signed_snapshot_by_id(
389 &self,
390 signed_entity_id: &str,
391 ) -> StdResult<Option<SignedEntity<Snapshot>>> {
392 let entity: Option<SignedEntity<Snapshot>> = match self
393 .signed_entity_storer
394 .get_signed_entity(signed_entity_id)
395 .await
396 .with_context(|| {
397 format!(
398 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
399 )
400 })? {
401 Some(entity) => Some(entity.try_into()?),
402 None => None,
403 };
404
405 Ok(entity)
406 }
407
408 async fn get_last_signed_cardano_database_snapshots(
409 &self,
410 total: usize,
411 ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>> {
412 let signed_entities = self
413 .get_last_signed_entities(total, &SignedEntityTypeDiscriminants::CardanoDatabase)
414 .await?
415 .into_iter()
416 .map(|record| record.try_into())
417 .collect::<Result<Vec<_>, _>>()?;
418
419 Ok(signed_entities)
420 }
421
422 async fn get_signed_cardano_database_snapshot_by_id(
423 &self,
424 signed_entity_id: &str,
425 ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>> {
426 let entity: Option<SignedEntity<CardanoDatabaseSnapshot>> = match self
427 .signed_entity_storer
428 .get_signed_entity(signed_entity_id)
429 .await
430 .with_context(|| {
431 format!(
432 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
433 )
434 })? {
435 Some(entity) => Some(entity.try_into()?),
436 None => None,
437 };
438
439 Ok(entity)
440 }
441
442 async fn get_last_signed_mithril_stake_distributions(
443 &self,
444 total: usize,
445 ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>> {
446 let signed_entities = self
447 .get_last_signed_entities(
448 total,
449 &SignedEntityTypeDiscriminants::MithrilStakeDistribution,
450 )
451 .await?
452 .into_iter()
453 .map(|record| record.try_into())
454 .collect::<Result<Vec<_>, _>>()?;
455
456 Ok(signed_entities)
457 }
458
459 async fn get_signed_mithril_stake_distribution_by_id(
460 &self,
461 signed_entity_id: &str,
462 ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>> {
463 let entity: Option<SignedEntity<MithrilStakeDistribution>> = match self
464 .signed_entity_storer
465 .get_signed_entity(signed_entity_id)
466 .await
467 .with_context(|| {
468 format!(
469 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
470 )
471 })? {
472 Some(entity) => Some(entity.try_into()?),
473 None => None,
474 };
475
476 Ok(entity)
477 }
478
479 async fn get_last_cardano_transaction_snapshot(
480 &self,
481 ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>> {
482 let mut signed_entities_records = self
483 .get_last_signed_entities(1, &SignedEntityTypeDiscriminants::CardanoTransactions)
484 .await?;
485
486 match signed_entities_records.pop() {
487 Some(record) => Ok(Some(record.try_into()?)),
488 None => Ok(None),
489 }
490 }
491
492 async fn get_last_signed_cardano_stake_distributions(
493 &self,
494 total: usize,
495 ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>> {
496 let signed_entities = self
497 .get_last_signed_entities(
498 total,
499 &SignedEntityTypeDiscriminants::CardanoStakeDistribution,
500 )
501 .await?
502 .into_iter()
503 .map(|record| record.try_into())
504 .collect::<Result<Vec<_>, _>>()?;
505
506 Ok(signed_entities)
507 }
508}
509
510#[cfg(test)]
511mod tests {
512 use std::{sync::atomic::Ordering, time::Duration};
513
514 use mithril_common::{
515 entities::{CardanoTransactionsSnapshot, Epoch, StakeDistribution},
516 signable_builder,
517 test_utils::fake_data,
518 };
519 use mithril_metric::CounterValue;
520 use serde::{de::DeserializeOwned, Serialize};
521 use std::sync::atomic::AtomicBool;
522
523 use crate::artifact_builder::MockArtifactBuilder;
524 use crate::database::repository::MockSignedEntityStorer;
525 use crate::test_tools::TestLogger;
526
527 use super::*;
528
529 fn create_stake_distribution(epoch: Epoch, signers: usize) -> MithrilStakeDistribution {
530 MithrilStakeDistribution::new(
531 epoch,
532 fake_data::signers_with_stakes(signers),
533 &fake_data::protocol_parameters(),
534 )
535 }
536
537 fn create_cardano_stake_distribution(
538 epoch: Epoch,
539 stake_distribution: StakeDistribution,
540 ) -> CardanoStakeDistribution {
541 CardanoStakeDistribution::new(epoch, stake_distribution)
542 }
543
544 fn assert_expected<T>(expected: &T, artifact: &Arc<dyn Artifact>)
545 where
546 T: Serialize + DeserializeOwned,
547 {
548 let current: T = serde_json::from_str(&serde_json::to_string(&artifact).unwrap()).unwrap();
549 assert_eq!(
550 serde_json::to_string(&expected).unwrap(),
551 serde_json::to_string(¤t).unwrap()
552 );
553 }
554
555 struct MockDependencyInjector {
557 mock_signed_entity_storer: MockSignedEntityStorer,
558 mock_mithril_stake_distribution_artifact_builder:
559 MockArtifactBuilder<Epoch, MithrilStakeDistribution>,
560 mock_cardano_immutable_files_full_artifact_builder:
561 MockArtifactBuilder<CardanoDbBeacon, Snapshot>,
562 mock_cardano_transactions_artifact_builder:
563 MockArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
564 mock_cardano_stake_distribution_artifact_builder:
565 MockArtifactBuilder<Epoch, CardanoStakeDistribution>,
566 mock_cardano_database_artifact_builder:
567 MockArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
568 }
569
570 impl MockDependencyInjector {
571 fn new() -> MockDependencyInjector {
572 MockDependencyInjector {
573 mock_signed_entity_storer: MockSignedEntityStorer::new(),
574 mock_mithril_stake_distribution_artifact_builder: MockArtifactBuilder::<
575 Epoch,
576 MithrilStakeDistribution,
577 >::new(),
578 mock_cardano_immutable_files_full_artifact_builder: MockArtifactBuilder::<
579 CardanoDbBeacon,
580 Snapshot,
581 >::new(),
582 mock_cardano_transactions_artifact_builder: MockArtifactBuilder::<
583 BlockNumber,
584 CardanoTransactionsSnapshot,
585 >::new(),
586 mock_cardano_stake_distribution_artifact_builder: MockArtifactBuilder::<
587 Epoch,
588 CardanoStakeDistribution,
589 >::new(),
590 mock_cardano_database_artifact_builder: MockArtifactBuilder::<
591 CardanoDbBeacon,
592 CardanoDatabaseSnapshot,
593 >::new(),
594 }
595 }
596
597 fn build_artifact_builder_service(self) -> MithrilSignedEntityService {
598 let dependencies = SignedEntityServiceArtifactsDependencies::new(
599 Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
600 Arc::new(self.mock_cardano_immutable_files_full_artifact_builder),
601 Arc::new(self.mock_cardano_transactions_artifact_builder),
602 Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
603 Arc::new(self.mock_cardano_database_artifact_builder),
604 );
605 MithrilSignedEntityService::new(
606 Arc::new(self.mock_signed_entity_storer),
607 dependencies,
608 Arc::new(SignedEntityTypeLock::default()),
609 Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
610 TestLogger::stdout(),
611 )
612 }
613
614 fn build_artifact_builder_service_with_time_consuming_process(
615 mut self,
616 atomic_stop: Arc<AtomicBool>,
617 ) -> MithrilSignedEntityService {
618 struct LongArtifactBuilder {
619 atomic_stop: Arc<AtomicBool>,
620 snapshot: Snapshot,
621 }
622
623 let snapshot = fake_data::snapshots(1).first().unwrap().to_owned();
624
625 #[async_trait]
626 impl ArtifactBuilder<CardanoDbBeacon, Snapshot> for LongArtifactBuilder {
627 async fn compute_artifact(
628 &self,
629 _beacon: CardanoDbBeacon,
630 _certificate: &Certificate,
631 ) -> StdResult<Snapshot> {
632 let mut max_iteration = 100;
633 while !self.atomic_stop.load(Ordering::Relaxed) {
634 max_iteration -= 1;
635 if max_iteration <= 0 {
636 return Err(anyhow!("Test should handle the stop"));
637 }
638 tokio::time::sleep(Duration::from_millis(10)).await;
639 }
640 Ok(self.snapshot.clone())
641 }
642 }
643 let cardano_immutable_files_full_long_artifact_builder = LongArtifactBuilder {
644 atomic_stop: atomic_stop.clone(),
645 snapshot: snapshot.clone(),
646 };
647
648 let artifact_clone: Arc<dyn Artifact> = Arc::new(snapshot);
649 let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
650 self.mock_signed_entity_storer
651 .expect_store_signed_entity()
652 .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
653 .return_once(|_| Ok(()));
654
655 let dependencies = SignedEntityServiceArtifactsDependencies::new(
656 Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
657 Arc::new(cardano_immutable_files_full_long_artifact_builder),
658 Arc::new(self.mock_cardano_transactions_artifact_builder),
659 Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
660 Arc::new(self.mock_cardano_database_artifact_builder),
661 );
662 MithrilSignedEntityService::new(
663 Arc::new(self.mock_signed_entity_storer),
664 dependencies,
665 Arc::new(SignedEntityTypeLock::default()),
666 Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
667 TestLogger::stdout(),
668 )
669 }
670
671 fn mock_artifact_processing<
672 T: Artifact + Clone + Serialize + 'static,
673 U: signable_builder::Beacon,
674 >(
675 &mut self,
676 artifact: T,
677 mock_that_provide_artifact: &dyn Fn(
678 &mut MockDependencyInjector,
679 ) -> &mut MockArtifactBuilder<U, T>,
680 ) {
681 {
682 let artifact_cloned = artifact.clone();
683 mock_that_provide_artifact(self)
684 .expect_compute_artifact()
685 .times(1)
686 .return_once(|_, _| Ok(artifact_cloned));
687 }
688 {
689 let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
690 let artifact_json = serde_json::to_string(&artifact_clone).unwrap();
691 self.mock_signed_entity_storer
692 .expect_store_signed_entity()
693 .withf(move |signed_entity| signed_entity.artifact == artifact_json)
694 .return_once(|_| Ok(()));
695 }
696 }
697
698 fn mock_stake_distribution_processing(&mut self, artifact: MithrilStakeDistribution) {
699 self.mock_artifact_processing(artifact, &|mock_injector| {
700 &mut mock_injector.mock_mithril_stake_distribution_artifact_builder
701 });
702 }
703 }
704
705 fn get_artifact_total_produced_metric_since_startup_counter_value(
706 metrics_service: Arc<MetricsService>,
707 signed_entity_type: &SignedEntityType,
708 ) -> CounterValue {
709 match signed_entity_type {
710 SignedEntityType::MithrilStakeDistribution(_) => metrics_service
711 .get_artifact_mithril_stake_distribution_total_produced_since_startup()
712 .get(),
713 SignedEntityType::CardanoImmutableFilesFull(_) => metrics_service
714 .get_artifact_cardano_immutable_files_full_total_produced_since_startup()
715 .get(),
716 SignedEntityType::CardanoStakeDistribution(_) => metrics_service
717 .get_artifact_cardano_stake_distribution_total_produced_since_startup()
718 .get(),
719 SignedEntityType::CardanoTransactions(_, _) => metrics_service
720 .get_artifact_cardano_transaction_total_produced_since_startup()
721 .get(),
722 SignedEntityType::CardanoDatabase(_) => metrics_service
723 .get_artifact_cardano_database_total_produced_since_startup()
724 .get(),
725 }
726 }
727
728 #[tokio::test]
729 async fn build_mithril_stake_distribution_artifact_when_given_mithril_stake_distribution_entity_type(
730 ) {
731 let mut mock_container = MockDependencyInjector::new();
732
733 let mithril_stake_distribution_expected = create_stake_distribution(Epoch(1), 5);
734
735 mock_container
736 .mock_mithril_stake_distribution_artifact_builder
737 .expect_compute_artifact()
738 .times(1)
739 .returning(|_, _| Ok(create_stake_distribution(Epoch(1), 5)));
740
741 let artifact_builder_service = mock_container.build_artifact_builder_service();
742
743 let certificate = fake_data::certificate("hash".to_string());
744 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(1));
745 let artifact = artifact_builder_service
746 .compute_artifact(signed_entity_type.clone(), &certificate)
747 .await
748 .unwrap();
749
750 assert_expected(&mithril_stake_distribution_expected, &artifact);
751 }
752
753 #[tokio::test]
754 async fn should_store_the_artifact_when_creating_artifact_for_a_mithril_stake_distribution() {
755 generic_test_that_the_artifact_is_stored(
756 SignedEntityType::MithrilStakeDistribution(Epoch(1)),
757 create_stake_distribution(Epoch(1), 5),
758 &|mock_injector| &mut mock_injector.mock_mithril_stake_distribution_artifact_builder,
759 )
760 .await;
761 }
762
763 #[tokio::test]
764 async fn build_cardano_stake_distribution_artifact_when_given_cardano_stake_distribution_entity_type(
765 ) {
766 let mut mock_container = MockDependencyInjector::new();
767
768 let cardano_stake_distribution_expected = create_cardano_stake_distribution(
769 Epoch(1),
770 StakeDistribution::from([("pool-1".to_string(), 100)]),
771 );
772
773 mock_container
774 .mock_cardano_stake_distribution_artifact_builder
775 .expect_compute_artifact()
776 .times(1)
777 .returning(|_, _| {
778 Ok(create_cardano_stake_distribution(
779 Epoch(1),
780 StakeDistribution::from([("pool-1".to_string(), 100)]),
781 ))
782 });
783
784 let artifact_builder_service = mock_container.build_artifact_builder_service();
785
786 let certificate = fake_data::certificate("hash".to_string());
787 let signed_entity_type = SignedEntityType::CardanoStakeDistribution(Epoch(1));
788 let artifact = artifact_builder_service
789 .compute_artifact(signed_entity_type.clone(), &certificate)
790 .await
791 .unwrap();
792
793 assert_expected(&cardano_stake_distribution_expected, &artifact);
794 }
795
796 #[tokio::test]
797 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_stake_distribution() {
798 generic_test_that_the_artifact_is_stored(
799 SignedEntityType::CardanoStakeDistribution(Epoch(1)),
800 create_cardano_stake_distribution(
801 Epoch(1),
802 StakeDistribution::from([("pool-1".to_string(), 100)]),
803 ),
804 &|mock_injector| &mut mock_injector.mock_cardano_stake_distribution_artifact_builder,
805 )
806 .await;
807 }
808
809 #[tokio::test]
810 async fn build_snapshot_artifact_when_given_cardano_immutable_files_full_entity_type() {
811 let mut mock_container = MockDependencyInjector::new();
812
813 let snapshot_expected = fake_data::snapshots(1).first().unwrap().to_owned();
814
815 mock_container
816 .mock_cardano_immutable_files_full_artifact_builder
817 .expect_compute_artifact()
818 .times(1)
819 .returning(|_, _| Ok(fake_data::snapshots(1).first().unwrap().to_owned()));
820
821 let artifact_builder_service = mock_container.build_artifact_builder_service();
822
823 let certificate = fake_data::certificate("hash".to_string());
824 let signed_entity_type =
825 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
826 let artifact = artifact_builder_service
827 .compute_artifact(signed_entity_type.clone(), &certificate)
828 .await
829 .unwrap();
830
831 assert_expected(&snapshot_expected, &artifact);
832 }
833
834 #[tokio::test]
835 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_immutable_files() {
836 generic_test_that_the_artifact_is_stored(
837 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default()),
838 fake_data::snapshots(1).first().unwrap().to_owned(),
839 &|mock_injector| &mut mock_injector.mock_cardano_immutable_files_full_artifact_builder,
840 )
841 .await;
842 }
843
844 #[tokio::test]
845 async fn build_cardano_transactions_snapshot_artifact_when_given_cardano_transactions_type() {
846 let mut mock_container = MockDependencyInjector::new();
847
848 let block_number = BlockNumber(151);
849 let expected = CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number);
850
851 mock_container
852 .mock_cardano_transactions_artifact_builder
853 .expect_compute_artifact()
854 .times(1)
855 .returning(move |_, _| {
856 Ok(CardanoTransactionsSnapshot::new(
857 "merkle_root".to_string(),
858 block_number,
859 ))
860 });
861
862 let artifact_builder_service = mock_container.build_artifact_builder_service();
863
864 let certificate = fake_data::certificate("hash".to_string());
865 let signed_entity_type = SignedEntityType::CardanoTransactions(Epoch(1), block_number);
866 let artifact = artifact_builder_service
867 .compute_artifact(signed_entity_type.clone(), &certificate)
868 .await
869 .unwrap();
870
871 assert_expected(&expected, &artifact);
872 }
873
874 #[tokio::test]
875 async fn should_store_the_artifact_when_creating_artifact_for_cardano_transactions() {
876 let block_number = BlockNumber(149);
877 generic_test_that_the_artifact_is_stored(
878 SignedEntityType::CardanoTransactions(Epoch(1), block_number),
879 CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number),
880 &|mock_injector| &mut mock_injector.mock_cardano_transactions_artifact_builder,
881 )
882 .await;
883 }
884
885 #[tokio::test]
886 async fn build_cardano_database_artifact_when_given_cardano_database_entity_type() {
887 let mut mock_container = MockDependencyInjector::new();
888
889 let cardano_database_expected = fake_data::cardano_database_snapshots(1)
890 .first()
891 .unwrap()
892 .to_owned();
893
894 mock_container
895 .mock_cardano_database_artifact_builder
896 .expect_compute_artifact()
897 .times(1)
898 .returning(|_, _| {
899 Ok(fake_data::cardano_database_snapshots(1)
900 .first()
901 .unwrap()
902 .to_owned())
903 });
904
905 let artifact_builder_service = mock_container.build_artifact_builder_service();
906
907 let certificate = fake_data::certificate("hash".to_string());
908 let signed_entity_type = SignedEntityType::CardanoDatabase(CardanoDbBeacon::default());
909 let artifact = artifact_builder_service
910 .compute_artifact(signed_entity_type.clone(), &certificate)
911 .await
912 .unwrap();
913
914 assert_expected(&cardano_database_expected, &artifact);
915 }
916
917 #[tokio::test]
918 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_database() {
919 generic_test_that_the_artifact_is_stored(
920 SignedEntityType::CardanoDatabase(CardanoDbBeacon::default()),
921 fake_data::cardano_database_snapshots(1)
922 .first()
923 .unwrap()
924 .to_owned(),
925 &|mock_injector| &mut mock_injector.mock_cardano_database_artifact_builder,
926 )
927 .await;
928 }
929
930 async fn generic_test_that_the_artifact_is_stored<
931 T: Artifact + Clone + Serialize + 'static,
932 U: signable_builder::Beacon,
933 >(
934 signed_entity_type: SignedEntityType,
935 artifact: T,
936 mock_that_provide_artifact: &dyn Fn(
937 &mut MockDependencyInjector,
938 ) -> &mut MockArtifactBuilder<U, T>,
939 ) {
940 let mut mock_container = MockDependencyInjector::new();
941 {
942 let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
943 let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
944 mock_container
945 .mock_signed_entity_storer
946 .expect_store_signed_entity()
947 .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
948 .return_once(|_| Ok(()));
949 }
950 {
951 let artifact_cloned = artifact.clone();
952 mock_that_provide_artifact(&mut mock_container)
953 .expect_compute_artifact()
954 .times(1)
955 .return_once(|_, _| Ok(artifact_cloned));
956 }
957 let artifact_builder_service = mock_container.build_artifact_builder_service();
958
959 let certificate = fake_data::certificate("hash".to_string());
960 let error_message = format!(
961 "Create artifact should not fail for {} signed entity",
962 std::any::type_name::<T>()
963 );
964 let error_message_str = error_message.as_str();
965
966 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
967 artifact_builder_service.metrics_service.clone(),
968 &signed_entity_type,
969 );
970
971 artifact_builder_service
972 .create_artifact_task(signed_entity_type.clone(), &certificate)
973 .await
974 .expect(error_message_str);
975
976 assert_eq!(
977 initial_counter_value + 1,
978 get_artifact_total_produced_metric_since_startup_counter_value(
979 artifact_builder_service.metrics_service.clone(),
980 &signed_entity_type,
981 )
982 )
983 }
984
985 #[tokio::test]
986 async fn create_artifact_for_two_signed_entity_types_in_sequence_not_blocking() {
987 let atomic_stop = Arc::new(AtomicBool::new(false));
988 let signed_entity_type_service = {
989 let mut mock_container = MockDependencyInjector::new();
990
991 let msd = create_stake_distribution(Epoch(1), 5);
992 mock_container.mock_stake_distribution_processing(msd);
993
994 mock_container
995 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone())
996 };
997 let certificate = fake_data::certificate("hash".to_string());
998
999 let signed_entity_type_immutable =
1000 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1001 let first_task_that_never_finished = signed_entity_type_service
1002 .create_artifact(signed_entity_type_immutable, &certificate)
1003 .await
1004 .unwrap();
1005
1006 let signed_entity_type_msd = SignedEntityType::MithrilStakeDistribution(Epoch(1));
1007 let second_task_that_finish_first = signed_entity_type_service
1008 .create_artifact(signed_entity_type_msd, &certificate)
1009 .await
1010 .unwrap();
1011
1012 second_task_that_finish_first.await.unwrap().unwrap();
1013 assert!(!first_task_that_never_finished.is_finished());
1014
1015 atomic_stop.swap(true, Ordering::Relaxed);
1016 }
1017
1018 #[tokio::test]
1019 async fn create_artifact_lock_unlock_signed_entity_type_while_processing() {
1020 let atomic_stop = Arc::new(AtomicBool::new(false));
1021 let signed_entity_type_service = MockDependencyInjector::new()
1022 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1023 let certificate = fake_data::certificate("hash".to_string());
1024
1025 let signed_entity_type_immutable =
1026 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1027 assert!(
1028 !signed_entity_type_service
1029 .signed_entity_type_lock
1030 .is_locked(&signed_entity_type_immutable)
1031 .await
1032 );
1033 let join_handle = signed_entity_type_service
1034 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1035 .await
1036 .unwrap();
1037
1038 let is_locked = signed_entity_type_service
1041 .signed_entity_type_lock
1042 .is_locked(&signed_entity_type_immutable)
1043 .await;
1044 let is_finished = join_handle.is_finished();
1045
1046 atomic_stop.swap(true, Ordering::Relaxed);
1047 join_handle.await.unwrap().unwrap();
1048
1049 assert!(is_locked);
1050 assert!(!is_finished);
1051
1052 assert!(
1053 !signed_entity_type_service
1054 .signed_entity_type_lock
1055 .is_locked(&signed_entity_type_immutable)
1056 .await
1057 );
1058 }
1059
1060 #[tokio::test]
1061 async fn create_artifact_unlock_signed_entity_type_when_error() {
1062 let signed_entity_type_service = {
1063 let mut mock_container = MockDependencyInjector::new();
1064 mock_container
1065 .mock_cardano_immutable_files_full_artifact_builder
1066 .expect_compute_artifact()
1067 .returning(|_, _| Err(anyhow::anyhow!("Error while computing artifact")));
1068
1069 mock_container.build_artifact_builder_service()
1070 };
1071 let certificate = fake_data::certificate("hash".to_string());
1072
1073 let signed_entity_type_immutable =
1074 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1075
1076 let join_handle = signed_entity_type_service
1077 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1078 .await
1079 .unwrap();
1080
1081 let error = join_handle.await.unwrap().unwrap_err();
1082 assert!(
1083 error.to_string().contains("CardanoImmutableFilesFull"),
1084 "Error should contains CardanoImmutableFilesFull but was: {error}"
1085 );
1086
1087 assert!(
1088 !signed_entity_type_service
1089 .signed_entity_type_lock
1090 .is_locked(&signed_entity_type_immutable)
1091 .await
1092 );
1093 }
1094
1095 #[tokio::test]
1096 async fn create_artifact_unlock_signed_entity_type_when_panic() {
1097 let signed_entity_type_service =
1098 MockDependencyInjector::new().build_artifact_builder_service();
1099 let certificate = fake_data::certificate("hash".to_string());
1100
1101 let signed_entity_type_immutable =
1102 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1103
1104 let join_handle = signed_entity_type_service
1105 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1106 .await
1107 .unwrap();
1108
1109 let error = join_handle.await.unwrap().unwrap_err();
1110 assert!(
1111 error.to_string().contains("CardanoImmutableFilesFull"),
1112 "Error should contains CardanoImmutableFilesFull but was: {error}"
1113 );
1114
1115 assert!(
1116 !signed_entity_type_service
1117 .signed_entity_type_lock
1118 .is_locked(&signed_entity_type_immutable)
1119 .await
1120 );
1121 }
1122
1123 #[tokio::test]
1124 async fn create_artifact_for_a_signed_entity_type_already_lock_return_error() {
1125 let atomic_stop = Arc::new(AtomicBool::new(false));
1126 let signed_entity_service = MockDependencyInjector::new()
1127 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1128 let certificate = fake_data::certificate("hash".to_string());
1129 let signed_entity_type_immutable =
1130 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1131
1132 signed_entity_service
1133 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1134 .await
1135 .unwrap();
1136
1137 signed_entity_service
1138 .create_artifact(signed_entity_type_immutable, &certificate)
1139 .await
1140 .expect_err("Should return error when signed entity type is already locked");
1141
1142 atomic_stop.swap(true, Ordering::Relaxed);
1143 }
1144
1145 #[tokio::test]
1146 async fn metrics_counter_value_is_not_incremented_when_compute_artifact_error() {
1147 let signed_entity_service = {
1148 let mut mock_container = MockDependencyInjector::new();
1149 mock_container
1150 .mock_cardano_immutable_files_full_artifact_builder
1151 .expect_compute_artifact()
1152 .returning(|_, _| Err(anyhow!("Error while computing artifact")));
1153
1154 mock_container.build_artifact_builder_service()
1155 };
1156
1157 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1158
1159 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1160 signed_entity_service.metrics_service.clone(),
1161 &signed_entity_type,
1162 );
1163
1164 signed_entity_service
1165 .create_artifact(
1166 signed_entity_type.clone(),
1167 &fake_data::certificate("hash".to_string()),
1168 )
1169 .await
1170 .unwrap();
1171
1172 assert_eq!(
1173 initial_counter_value,
1174 get_artifact_total_produced_metric_since_startup_counter_value(
1175 signed_entity_service.metrics_service.clone(),
1176 &signed_entity_type,
1177 )
1178 );
1179 }
1180
1181 #[tokio::test]
1182 async fn metrics_counter_value_is_not_incremented_when_store_signed_entity_error() {
1183 let signed_entity_service = {
1184 let mut mock_container = MockDependencyInjector::new();
1185 mock_container
1186 .mock_signed_entity_storer
1187 .expect_store_signed_entity()
1188 .returning(|_| Err(anyhow!("Error while storing signed entity")));
1189
1190 mock_container.build_artifact_builder_service()
1191 };
1192
1193 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1194
1195 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1196 signed_entity_service.metrics_service.clone(),
1197 &signed_entity_type,
1198 );
1199
1200 signed_entity_service
1201 .create_artifact(
1202 signed_entity_type.clone(),
1203 &fake_data::certificate("hash".to_string()),
1204 )
1205 .await
1206 .unwrap();
1207
1208 assert_eq!(
1209 initial_counter_value,
1210 get_artifact_total_produced_metric_since_startup_counter_value(
1211 signed_entity_service.metrics_service.clone(),
1212 &signed_entity_type,
1213 )
1214 );
1215 }
1216}