1use anyhow::{anyhow, Context};
6use async_trait::async_trait;
7use chrono::Utc;
8use slog::{info, warn, Logger};
9use std::sync::Arc;
10use tokio::task::JoinHandle;
11
12use mithril_common::{
13 entities::{
14 BlockNumber, CardanoDatabaseSnapshot, CardanoDbBeacon, CardanoStakeDistribution,
15 CardanoTransactionsSnapshot, Certificate, Epoch, MithrilStakeDistribution,
16 SignedEntityType, SignedEntityTypeDiscriminants, Snapshot,
17 },
18 logging::LoggerExtensions,
19 signable_builder::{Artifact, SignedEntity},
20 StdResult,
21};
22
23use crate::{
24 artifact_builder::ArtifactBuilder,
25 database::{record::SignedEntityRecord, repository::SignedEntityStorer},
26 MetricsService,
27};
28use mithril_signed_entity_lock::SignedEntityTypeLock;
29
30#[cfg_attr(test, mockall::automock)]
32#[async_trait]
33pub trait SignedEntityService: Send + Sync {
34 async fn create_artifact(
36 &self,
37 signed_entity_type: SignedEntityType,
38 certificate: &Certificate,
39 ) -> StdResult<JoinHandle<StdResult<()>>>;
40
41 async fn get_last_signed_snapshots(
43 &self,
44 total: usize,
45 ) -> StdResult<Vec<SignedEntity<Snapshot>>>;
46
47 async fn get_signed_snapshot_by_id(
49 &self,
50 signed_entity_id: &str,
51 ) -> StdResult<Option<SignedEntity<Snapshot>>>;
52
53 async fn get_last_signed_cardano_database_snapshots(
55 &self,
56 total: usize,
57 ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>>;
58
59 async fn get_signed_cardano_database_snapshot_by_id(
61 &self,
62 signed_entity_id: &str,
63 ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>>;
64
65 async fn get_last_signed_mithril_stake_distributions(
68 &self,
69 total: usize,
70 ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>>;
71
72 async fn get_signed_mithril_stake_distribution_by_id(
74 &self,
75 signed_entity_id: &str,
76 ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>>;
77
78 async fn get_last_cardano_transaction_snapshot(
80 &self,
81 ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>>;
82
83 async fn get_last_signed_cardano_stake_distributions(
86 &self,
87 total: usize,
88 ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>>;
89}
90
91#[derive(Clone)]
93pub struct MithrilSignedEntityService {
94 signed_entity_storer: Arc<dyn SignedEntityStorer>,
95 mithril_stake_distribution_artifact_builder:
96 Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
97 cardano_immutable_files_full_artifact_builder:
98 Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
99 cardano_transactions_artifact_builder:
100 Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
101 signed_entity_type_lock: Arc<SignedEntityTypeLock>,
102 cardano_stake_distribution_artifact_builder:
103 Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
104 cardano_database_artifact_builder:
105 Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
106 metrics_service: Arc<MetricsService>,
107 logger: Logger,
108}
109
110pub struct SignedEntityServiceArtifactsDependencies {
112 mithril_stake_distribution_artifact_builder:
113 Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
114 cardano_immutable_files_full_artifact_builder:
115 Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
116 cardano_transactions_artifact_builder:
117 Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
118 cardano_stake_distribution_artifact_builder:
119 Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
120 cardano_database_artifact_builder:
121 Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
122}
123
124impl SignedEntityServiceArtifactsDependencies {
125 pub fn new(
127 mithril_stake_distribution_artifact_builder: Arc<
128 dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>,
129 >,
130 cardano_immutable_files_full_artifact_builder: Arc<
131 dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>,
132 >,
133 cardano_transactions_artifact_builder: Arc<
134 dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
135 >,
136 cardano_stake_distribution_artifact_builder: Arc<
137 dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>,
138 >,
139 cardano_database_artifact_builder: Arc<
140 dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
141 >,
142 ) -> Self {
143 Self {
144 mithril_stake_distribution_artifact_builder,
145 cardano_immutable_files_full_artifact_builder,
146 cardano_transactions_artifact_builder,
147 cardano_stake_distribution_artifact_builder,
148 cardano_database_artifact_builder,
149 }
150 }
151}
152
153impl MithrilSignedEntityService {
154 pub fn new(
156 signed_entity_storer: Arc<dyn SignedEntityStorer>,
157 dependencies: SignedEntityServiceArtifactsDependencies,
158 signed_entity_type_lock: Arc<SignedEntityTypeLock>,
159 metrics_service: Arc<MetricsService>,
160 logger: Logger,
161 ) -> Self {
162 Self {
163 signed_entity_storer,
164 mithril_stake_distribution_artifact_builder: dependencies
165 .mithril_stake_distribution_artifact_builder,
166 cardano_immutable_files_full_artifact_builder: dependencies
167 .cardano_immutable_files_full_artifact_builder,
168 cardano_transactions_artifact_builder: dependencies
169 .cardano_transactions_artifact_builder,
170 cardano_stake_distribution_artifact_builder: dependencies
171 .cardano_stake_distribution_artifact_builder,
172 cardano_database_artifact_builder: dependencies.cardano_database_artifact_builder,
173 signed_entity_type_lock,
174 metrics_service,
175 logger: logger.new_with_component_name::<Self>(),
176 }
177 }
178
179 async fn create_artifact_task(
180 &self,
181 signed_entity_type: SignedEntityType,
182 certificate: &Certificate,
183 ) -> StdResult<()> {
184 info!(
185 self.logger, ">> create_artifact_task";
186 "signed_entity_type" => ?signed_entity_type, "certificate_hash" => &certificate.hash
187 );
188
189 let mut remaining_retries = 2;
190 let artifact = loop {
191 remaining_retries -= 1;
192
193 match self
194 .compute_artifact(signed_entity_type.clone(), certificate)
195 .await
196 {
197 Err(error) if remaining_retries == 0 => break Err(error),
198 Err(_error) => (),
199 Ok(artifact) => break Ok(artifact),
200 };
201 }?;
202
203 let signed_entity = SignedEntityRecord {
204 signed_entity_id: artifact.get_id(),
205 signed_entity_type: signed_entity_type.clone(),
206 certificate_id: certificate.hash.clone(),
207 artifact: serde_json::to_string(&artifact)?,
208 created_at: Utc::now(),
209 };
210
211 self.signed_entity_storer
212 .store_signed_entity(&signed_entity)
213 .await
214 .with_context(|| {
215 format!(
216 "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
217 )
218 })?;
219
220 self.increment_artifact_total_produced_metric_since_startup(signed_entity_type);
221
222 Ok(())
223 }
224
225 async fn compute_artifact(
227 &self,
228 signed_entity_type: SignedEntityType,
229 certificate: &Certificate,
230 ) -> StdResult<Arc<dyn Artifact>> {
231 match signed_entity_type.clone() {
232 SignedEntityType::MithrilStakeDistribution(epoch) => Ok(Arc::new(
233 self.mithril_stake_distribution_artifact_builder
234 .compute_artifact(epoch, certificate)
235 .await
236 .with_context(|| {
237 format!(
238 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
239 )
240 })?,
241 )),
242 SignedEntityType::CardanoImmutableFilesFull(beacon) => Ok(Arc::new(
243 self.cardano_immutable_files_full_artifact_builder
244 .compute_artifact(beacon.clone(), certificate)
245 .await
246 .with_context(|| {
247 format!(
248 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
249 )
250 })?,
251 )),
252 SignedEntityType::CardanoStakeDistribution(epoch) => Ok(Arc::new(
253 self.cardano_stake_distribution_artifact_builder
254 .compute_artifact(epoch, certificate)
255 .await
256 .with_context(|| {
257 format!(
258 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
259 )
260 })?)),
261 SignedEntityType::CardanoTransactions(_epoch, block_number) => Ok(Arc::new(
262 self.cardano_transactions_artifact_builder
263 .compute_artifact(block_number, certificate)
264 .await
265 .with_context(|| {
266 format!(
267 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
268 )
269 })?,
270 )),
271 SignedEntityType::CardanoDatabase(beacon) => Ok(Arc::new(
272 self.cardano_database_artifact_builder
273 .compute_artifact(beacon, certificate)
274 .await
275 .with_context(|| {
276 format!(
277 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
278 )
279 })?
280 )),
281 }
282 }
283
284 async fn get_last_signed_entities(
285 &self,
286 total: usize,
287 discriminants: &SignedEntityTypeDiscriminants,
288 ) -> StdResult<Vec<SignedEntityRecord>> {
289 self.signed_entity_storer
290 .get_last_signed_entities_by_type(discriminants, total)
291 .await
292 .with_context(|| {
293 format!(
294 "Signed Entity Service can not get last signed entities with type: '{:?}'",
295 discriminants
296 )
297 })
298 }
299
300 fn increment_artifact_total_produced_metric_since_startup(
301 &self,
302 signed_entity_type: SignedEntityType,
303 ) {
304 let metrics = self.metrics_service.clone();
305 let metric_counter = match signed_entity_type {
306 SignedEntityType::MithrilStakeDistribution(_) => {
307 metrics.get_artifact_mithril_stake_distribution_total_produced_since_startup()
308 }
309 SignedEntityType::CardanoImmutableFilesFull(_) => {
310 metrics.get_artifact_cardano_immutable_files_full_total_produced_since_startup()
311 }
312 SignedEntityType::CardanoStakeDistribution(_) => {
313 metrics.get_artifact_cardano_stake_distribution_total_produced_since_startup()
314 }
315 SignedEntityType::CardanoTransactions(_, _) => {
316 metrics.get_artifact_cardano_transaction_total_produced_since_startup()
317 }
318 SignedEntityType::CardanoDatabase(_) => {
319 metrics.get_artifact_cardano_database_total_produced_since_startup()
320 }
321 };
322
323 metric_counter.increment();
324 }
325}
326
327#[async_trait]
328impl SignedEntityService for MithrilSignedEntityService {
329 async fn create_artifact(
330 &self,
331 signed_entity_type: SignedEntityType,
332 certificate: &Certificate,
333 ) -> StdResult<JoinHandle<StdResult<()>>> {
334 if self
335 .signed_entity_type_lock
336 .is_locked(&signed_entity_type)
337 .await
338 {
339 return Err(anyhow!(
340 "Signed entity type '{:?}' is already locked",
341 signed_entity_type
342 ));
343 }
344
345 let service = self.clone();
346 let certificate_cloned = certificate.clone();
347 service
348 .signed_entity_type_lock
349 .lock(&signed_entity_type)
350 .await;
351
352 Ok(tokio::task::spawn(async move {
353 let signed_entity_type_clone = signed_entity_type.clone();
354 let service_clone = service.clone();
355 let result = tokio::task::spawn(async move {
356 service_clone
357 .create_artifact_task(signed_entity_type_clone, &certificate_cloned)
358 .await
359 })
360 .await;
361 service
362 .signed_entity_type_lock
363 .release(signed_entity_type.clone())
364 .await;
365
366 result.with_context(|| format!(
367 "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
368 ))?.inspect_err(|e| warn!(service.logger, "Error while creating artifact"; "error" => ?e))
369 }))
370 }
371
372 async fn get_last_signed_snapshots(
373 &self,
374 total: usize,
375 ) -> StdResult<Vec<SignedEntity<Snapshot>>> {
376 let signed_entities = self
377 .get_last_signed_entities(
378 total,
379 &SignedEntityTypeDiscriminants::CardanoImmutableFilesFull,
380 )
381 .await?
382 .into_iter()
383 .map(|record| record.try_into())
384 .collect::<Result<Vec<_>, _>>()?;
385
386 Ok(signed_entities)
387 }
388
389 async fn get_signed_snapshot_by_id(
390 &self,
391 signed_entity_id: &str,
392 ) -> StdResult<Option<SignedEntity<Snapshot>>> {
393 let entity: Option<SignedEntity<Snapshot>> = match self
394 .signed_entity_storer
395 .get_signed_entity(signed_entity_id)
396 .await
397 .with_context(|| {
398 format!(
399 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
400 )
401 })? {
402 Some(entity) => Some(entity.try_into()?),
403 None => None,
404 };
405
406 Ok(entity)
407 }
408
409 async fn get_last_signed_cardano_database_snapshots(
410 &self,
411 total: usize,
412 ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>> {
413 let signed_entities = self
414 .get_last_signed_entities(total, &SignedEntityTypeDiscriminants::CardanoDatabase)
415 .await?
416 .into_iter()
417 .map(|record| record.try_into())
418 .collect::<Result<Vec<_>, _>>()?;
419
420 Ok(signed_entities)
421 }
422
423 async fn get_signed_cardano_database_snapshot_by_id(
424 &self,
425 signed_entity_id: &str,
426 ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>> {
427 let entity: Option<SignedEntity<CardanoDatabaseSnapshot>> = match self
428 .signed_entity_storer
429 .get_signed_entity(signed_entity_id)
430 .await
431 .with_context(|| {
432 format!(
433 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
434 )
435 })? {
436 Some(entity) => Some(entity.try_into()?),
437 None => None,
438 };
439
440 Ok(entity)
441 }
442
443 async fn get_last_signed_mithril_stake_distributions(
444 &self,
445 total: usize,
446 ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>> {
447 let signed_entities = self
448 .get_last_signed_entities(
449 total,
450 &SignedEntityTypeDiscriminants::MithrilStakeDistribution,
451 )
452 .await?
453 .into_iter()
454 .map(|record| record.try_into())
455 .collect::<Result<Vec<_>, _>>()?;
456
457 Ok(signed_entities)
458 }
459
460 async fn get_signed_mithril_stake_distribution_by_id(
461 &self,
462 signed_entity_id: &str,
463 ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>> {
464 let entity: Option<SignedEntity<MithrilStakeDistribution>> = match self
465 .signed_entity_storer
466 .get_signed_entity(signed_entity_id)
467 .await
468 .with_context(|| {
469 format!(
470 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
471 )
472 })? {
473 Some(entity) => Some(entity.try_into()?),
474 None => None,
475 };
476
477 Ok(entity)
478 }
479
480 async fn get_last_cardano_transaction_snapshot(
481 &self,
482 ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>> {
483 let mut signed_entities_records = self
484 .get_last_signed_entities(1, &SignedEntityTypeDiscriminants::CardanoTransactions)
485 .await?;
486
487 match signed_entities_records.pop() {
488 Some(record) => Ok(Some(record.try_into()?)),
489 None => Ok(None),
490 }
491 }
492
493 async fn get_last_signed_cardano_stake_distributions(
494 &self,
495 total: usize,
496 ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>> {
497 let signed_entities = self
498 .get_last_signed_entities(
499 total,
500 &SignedEntityTypeDiscriminants::CardanoStakeDistribution,
501 )
502 .await?
503 .into_iter()
504 .map(|record| record.try_into())
505 .collect::<Result<Vec<_>, _>>()?;
506
507 Ok(signed_entities)
508 }
509}
510
511#[cfg(test)]
512mod tests {
513 use std::{sync::atomic::Ordering, time::Duration};
514
515 use mithril_common::{
516 entities::{CardanoTransactionsSnapshot, Epoch, StakeDistribution},
517 signable_builder,
518 test_utils::fake_data,
519 };
520 use mithril_metric::CounterValue;
521 use serde::{de::DeserializeOwned, Serialize};
522 use std::sync::atomic::AtomicBool;
523
524 use crate::artifact_builder::MockArtifactBuilder;
525 use crate::database::repository::MockSignedEntityStorer;
526 use crate::test_tools::TestLogger;
527
528 use super::*;
529
530 fn create_stake_distribution(epoch: Epoch, signers: usize) -> MithrilStakeDistribution {
531 MithrilStakeDistribution::new(
532 epoch,
533 fake_data::signers_with_stakes(signers),
534 &fake_data::protocol_parameters(),
535 )
536 }
537
538 fn create_cardano_stake_distribution(
539 epoch: Epoch,
540 stake_distribution: StakeDistribution,
541 ) -> CardanoStakeDistribution {
542 CardanoStakeDistribution::new(epoch, stake_distribution)
543 }
544
545 fn assert_expected<T>(expected: &T, artifact: &Arc<dyn Artifact>)
546 where
547 T: Serialize + DeserializeOwned,
548 {
549 let current: T = serde_json::from_str(&serde_json::to_string(&artifact).unwrap()).unwrap();
550 assert_eq!(
551 serde_json::to_string(&expected).unwrap(),
552 serde_json::to_string(¤t).unwrap()
553 );
554 }
555
556 struct MockDependencyInjector {
558 mock_signed_entity_storer: MockSignedEntityStorer,
559 mock_mithril_stake_distribution_artifact_builder:
560 MockArtifactBuilder<Epoch, MithrilStakeDistribution>,
561 mock_cardano_immutable_files_full_artifact_builder:
562 MockArtifactBuilder<CardanoDbBeacon, Snapshot>,
563 mock_cardano_transactions_artifact_builder:
564 MockArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
565 mock_cardano_stake_distribution_artifact_builder:
566 MockArtifactBuilder<Epoch, CardanoStakeDistribution>,
567 mock_cardano_database_artifact_builder:
568 MockArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
569 }
570
571 impl MockDependencyInjector {
572 fn new() -> MockDependencyInjector {
573 MockDependencyInjector {
574 mock_signed_entity_storer: MockSignedEntityStorer::new(),
575 mock_mithril_stake_distribution_artifact_builder: MockArtifactBuilder::<
576 Epoch,
577 MithrilStakeDistribution,
578 >::new(),
579 mock_cardano_immutable_files_full_artifact_builder: MockArtifactBuilder::<
580 CardanoDbBeacon,
581 Snapshot,
582 >::new(),
583 mock_cardano_transactions_artifact_builder: MockArtifactBuilder::<
584 BlockNumber,
585 CardanoTransactionsSnapshot,
586 >::new(),
587 mock_cardano_stake_distribution_artifact_builder: MockArtifactBuilder::<
588 Epoch,
589 CardanoStakeDistribution,
590 >::new(),
591 mock_cardano_database_artifact_builder: MockArtifactBuilder::<
592 CardanoDbBeacon,
593 CardanoDatabaseSnapshot,
594 >::new(),
595 }
596 }
597
598 fn build_artifact_builder_service(self) -> MithrilSignedEntityService {
599 let dependencies = SignedEntityServiceArtifactsDependencies::new(
600 Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
601 Arc::new(self.mock_cardano_immutable_files_full_artifact_builder),
602 Arc::new(self.mock_cardano_transactions_artifact_builder),
603 Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
604 Arc::new(self.mock_cardano_database_artifact_builder),
605 );
606 MithrilSignedEntityService::new(
607 Arc::new(self.mock_signed_entity_storer),
608 dependencies,
609 Arc::new(SignedEntityTypeLock::default()),
610 Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
611 TestLogger::stdout(),
612 )
613 }
614
615 fn build_artifact_builder_service_with_time_consuming_process(
616 mut self,
617 atomic_stop: Arc<AtomicBool>,
618 ) -> MithrilSignedEntityService {
619 struct LongArtifactBuilder {
620 atomic_stop: Arc<AtomicBool>,
621 snapshot: Snapshot,
622 }
623
624 let snapshot = fake_data::snapshots(1).first().unwrap().to_owned();
625
626 #[async_trait]
627 impl ArtifactBuilder<CardanoDbBeacon, Snapshot> for LongArtifactBuilder {
628 async fn compute_artifact(
629 &self,
630 _beacon: CardanoDbBeacon,
631 _certificate: &Certificate,
632 ) -> StdResult<Snapshot> {
633 let mut max_iteration = 100;
634 while !self.atomic_stop.load(Ordering::Relaxed) {
635 max_iteration -= 1;
636 if max_iteration <= 0 {
637 return Err(anyhow!("Test should handle the stop"));
638 }
639 tokio::time::sleep(Duration::from_millis(10)).await;
640 }
641 Ok(self.snapshot.clone())
642 }
643 }
644 let cardano_immutable_files_full_long_artifact_builder = LongArtifactBuilder {
645 atomic_stop: atomic_stop.clone(),
646 snapshot: snapshot.clone(),
647 };
648
649 let artifact_clone: Arc<dyn Artifact> = Arc::new(snapshot);
650 let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
651 self.mock_signed_entity_storer
652 .expect_store_signed_entity()
653 .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
654 .return_once(|_| Ok(()));
655
656 let dependencies = SignedEntityServiceArtifactsDependencies::new(
657 Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
658 Arc::new(cardano_immutable_files_full_long_artifact_builder),
659 Arc::new(self.mock_cardano_transactions_artifact_builder),
660 Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
661 Arc::new(self.mock_cardano_database_artifact_builder),
662 );
663 MithrilSignedEntityService::new(
664 Arc::new(self.mock_signed_entity_storer),
665 dependencies,
666 Arc::new(SignedEntityTypeLock::default()),
667 Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
668 TestLogger::stdout(),
669 )
670 }
671
672 fn mock_artifact_processing<
673 T: Artifact + Clone + Serialize + 'static,
674 U: signable_builder::Beacon,
675 >(
676 &mut self,
677 artifact: T,
678 mock_that_provide_artifact: &dyn Fn(
679 &mut MockDependencyInjector,
680 ) -> &mut MockArtifactBuilder<U, T>,
681 ) {
682 {
683 let artifact_cloned = artifact.clone();
684 mock_that_provide_artifact(self)
685 .expect_compute_artifact()
686 .times(1)
687 .return_once(|_, _| Ok(artifact_cloned));
688 }
689 {
690 let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
691 let artifact_json = serde_json::to_string(&artifact_clone).unwrap();
692 self.mock_signed_entity_storer
693 .expect_store_signed_entity()
694 .withf(move |signed_entity| signed_entity.artifact == artifact_json)
695 .return_once(|_| Ok(()));
696 }
697 }
698
699 fn mock_stake_distribution_processing(&mut self, artifact: MithrilStakeDistribution) {
700 self.mock_artifact_processing(artifact, &|mock_injector| {
701 &mut mock_injector.mock_mithril_stake_distribution_artifact_builder
702 });
703 }
704 }
705
706 fn get_artifact_total_produced_metric_since_startup_counter_value(
707 metrics_service: Arc<MetricsService>,
708 signed_entity_type: &SignedEntityType,
709 ) -> CounterValue {
710 match signed_entity_type {
711 SignedEntityType::MithrilStakeDistribution(_) => metrics_service
712 .get_artifact_mithril_stake_distribution_total_produced_since_startup()
713 .get(),
714 SignedEntityType::CardanoImmutableFilesFull(_) => metrics_service
715 .get_artifact_cardano_immutable_files_full_total_produced_since_startup()
716 .get(),
717 SignedEntityType::CardanoStakeDistribution(_) => metrics_service
718 .get_artifact_cardano_stake_distribution_total_produced_since_startup()
719 .get(),
720 SignedEntityType::CardanoTransactions(_, _) => metrics_service
721 .get_artifact_cardano_transaction_total_produced_since_startup()
722 .get(),
723 SignedEntityType::CardanoDatabase(_) => metrics_service
724 .get_artifact_cardano_database_total_produced_since_startup()
725 .get(),
726 }
727 }
728
729 #[tokio::test]
730 async fn build_mithril_stake_distribution_artifact_when_given_mithril_stake_distribution_entity_type(
731 ) {
732 let mut mock_container = MockDependencyInjector::new();
733
734 let mithril_stake_distribution_expected = create_stake_distribution(Epoch(1), 5);
735
736 mock_container
737 .mock_mithril_stake_distribution_artifact_builder
738 .expect_compute_artifact()
739 .times(1)
740 .returning(|_, _| Ok(create_stake_distribution(Epoch(1), 5)));
741
742 let artifact_builder_service = mock_container.build_artifact_builder_service();
743
744 let certificate = fake_data::certificate("hash".to_string());
745 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(1));
746 let artifact = artifact_builder_service
747 .compute_artifact(signed_entity_type.clone(), &certificate)
748 .await
749 .unwrap();
750
751 assert_expected(&mithril_stake_distribution_expected, &artifact);
752 }
753
754 #[tokio::test]
755 async fn should_store_the_artifact_when_creating_artifact_for_a_mithril_stake_distribution() {
756 generic_test_that_the_artifact_is_stored(
757 SignedEntityType::MithrilStakeDistribution(Epoch(1)),
758 create_stake_distribution(Epoch(1), 5),
759 &|mock_injector| &mut mock_injector.mock_mithril_stake_distribution_artifact_builder,
760 )
761 .await;
762 }
763
764 #[tokio::test]
765 async fn build_cardano_stake_distribution_artifact_when_given_cardano_stake_distribution_entity_type(
766 ) {
767 let mut mock_container = MockDependencyInjector::new();
768
769 let cardano_stake_distribution_expected = create_cardano_stake_distribution(
770 Epoch(1),
771 StakeDistribution::from([("pool-1".to_string(), 100)]),
772 );
773
774 mock_container
775 .mock_cardano_stake_distribution_artifact_builder
776 .expect_compute_artifact()
777 .times(1)
778 .returning(|_, _| {
779 Ok(create_cardano_stake_distribution(
780 Epoch(1),
781 StakeDistribution::from([("pool-1".to_string(), 100)]),
782 ))
783 });
784
785 let artifact_builder_service = mock_container.build_artifact_builder_service();
786
787 let certificate = fake_data::certificate("hash".to_string());
788 let signed_entity_type = SignedEntityType::CardanoStakeDistribution(Epoch(1));
789 let artifact = artifact_builder_service
790 .compute_artifact(signed_entity_type.clone(), &certificate)
791 .await
792 .unwrap();
793
794 assert_expected(&cardano_stake_distribution_expected, &artifact);
795 }
796
797 #[tokio::test]
798 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_stake_distribution() {
799 generic_test_that_the_artifact_is_stored(
800 SignedEntityType::CardanoStakeDistribution(Epoch(1)),
801 create_cardano_stake_distribution(
802 Epoch(1),
803 StakeDistribution::from([("pool-1".to_string(), 100)]),
804 ),
805 &|mock_injector| &mut mock_injector.mock_cardano_stake_distribution_artifact_builder,
806 )
807 .await;
808 }
809
810 #[tokio::test]
811 async fn build_snapshot_artifact_when_given_cardano_immutable_files_full_entity_type() {
812 let mut mock_container = MockDependencyInjector::new();
813
814 let snapshot_expected = fake_data::snapshots(1).first().unwrap().to_owned();
815
816 mock_container
817 .mock_cardano_immutable_files_full_artifact_builder
818 .expect_compute_artifact()
819 .times(1)
820 .returning(|_, _| Ok(fake_data::snapshots(1).first().unwrap().to_owned()));
821
822 let artifact_builder_service = mock_container.build_artifact_builder_service();
823
824 let certificate = fake_data::certificate("hash".to_string());
825 let signed_entity_type =
826 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
827 let artifact = artifact_builder_service
828 .compute_artifact(signed_entity_type.clone(), &certificate)
829 .await
830 .unwrap();
831
832 assert_expected(&snapshot_expected, &artifact);
833 }
834
835 #[tokio::test]
836 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_immutable_files() {
837 generic_test_that_the_artifact_is_stored(
838 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default()),
839 fake_data::snapshots(1).first().unwrap().to_owned(),
840 &|mock_injector| &mut mock_injector.mock_cardano_immutable_files_full_artifact_builder,
841 )
842 .await;
843 }
844
845 #[tokio::test]
846 async fn build_cardano_transactions_snapshot_artifact_when_given_cardano_transactions_type() {
847 let mut mock_container = MockDependencyInjector::new();
848
849 let block_number = BlockNumber(151);
850 let expected = CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number);
851
852 mock_container
853 .mock_cardano_transactions_artifact_builder
854 .expect_compute_artifact()
855 .times(1)
856 .returning(move |_, _| {
857 Ok(CardanoTransactionsSnapshot::new(
858 "merkle_root".to_string(),
859 block_number,
860 ))
861 });
862
863 let artifact_builder_service = mock_container.build_artifact_builder_service();
864
865 let certificate = fake_data::certificate("hash".to_string());
866 let signed_entity_type = SignedEntityType::CardanoTransactions(Epoch(1), block_number);
867 let artifact = artifact_builder_service
868 .compute_artifact(signed_entity_type.clone(), &certificate)
869 .await
870 .unwrap();
871
872 assert_expected(&expected, &artifact);
873 }
874
875 #[tokio::test]
876 async fn should_store_the_artifact_when_creating_artifact_for_cardano_transactions() {
877 let block_number = BlockNumber(149);
878 generic_test_that_the_artifact_is_stored(
879 SignedEntityType::CardanoTransactions(Epoch(1), block_number),
880 CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number),
881 &|mock_injector| &mut mock_injector.mock_cardano_transactions_artifact_builder,
882 )
883 .await;
884 }
885
886 #[tokio::test]
887 async fn build_cardano_database_artifact_when_given_cardano_database_entity_type() {
888 let mut mock_container = MockDependencyInjector::new();
889
890 let cardano_database_expected = fake_data::cardano_database_snapshots(1)
891 .first()
892 .unwrap()
893 .to_owned();
894
895 mock_container
896 .mock_cardano_database_artifact_builder
897 .expect_compute_artifact()
898 .times(1)
899 .returning(|_, _| {
900 Ok(fake_data::cardano_database_snapshots(1)
901 .first()
902 .unwrap()
903 .to_owned())
904 });
905
906 let artifact_builder_service = mock_container.build_artifact_builder_service();
907
908 let certificate = fake_data::certificate("hash".to_string());
909 let signed_entity_type = SignedEntityType::CardanoDatabase(CardanoDbBeacon::default());
910 let artifact = artifact_builder_service
911 .compute_artifact(signed_entity_type.clone(), &certificate)
912 .await
913 .unwrap();
914
915 assert_expected(&cardano_database_expected, &artifact);
916 }
917
918 #[tokio::test]
919 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_database() {
920 generic_test_that_the_artifact_is_stored(
921 SignedEntityType::CardanoDatabase(CardanoDbBeacon::default()),
922 fake_data::cardano_database_snapshots(1)
923 .first()
924 .unwrap()
925 .to_owned(),
926 &|mock_injector| &mut mock_injector.mock_cardano_database_artifact_builder,
927 )
928 .await;
929 }
930
931 async fn generic_test_that_the_artifact_is_stored<
932 T: Artifact + Clone + Serialize + 'static,
933 U: signable_builder::Beacon,
934 >(
935 signed_entity_type: SignedEntityType,
936 artifact: T,
937 mock_that_provide_artifact: &dyn Fn(
938 &mut MockDependencyInjector,
939 ) -> &mut MockArtifactBuilder<U, T>,
940 ) {
941 let mut mock_container = MockDependencyInjector::new();
942 {
943 let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
944 let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
945 mock_container
946 .mock_signed_entity_storer
947 .expect_store_signed_entity()
948 .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
949 .return_once(|_| Ok(()));
950 }
951 {
952 let artifact_cloned = artifact.clone();
953 mock_that_provide_artifact(&mut mock_container)
954 .expect_compute_artifact()
955 .times(1)
956 .return_once(|_, _| Ok(artifact_cloned));
957 }
958 let artifact_builder_service = mock_container.build_artifact_builder_service();
959
960 let certificate = fake_data::certificate("hash".to_string());
961 let error_message = format!(
962 "Create artifact should not fail for {} signed entity",
963 std::any::type_name::<T>()
964 );
965 let error_message_str = error_message.as_str();
966
967 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
968 artifact_builder_service.metrics_service.clone(),
969 &signed_entity_type,
970 );
971
972 artifact_builder_service
973 .create_artifact_task(signed_entity_type.clone(), &certificate)
974 .await
975 .expect(error_message_str);
976
977 assert_eq!(
978 initial_counter_value + 1,
979 get_artifact_total_produced_metric_since_startup_counter_value(
980 artifact_builder_service.metrics_service.clone(),
981 &signed_entity_type,
982 )
983 )
984 }
985
986 #[tokio::test]
987 async fn create_artifact_for_two_signed_entity_types_in_sequence_not_blocking() {
988 let atomic_stop = Arc::new(AtomicBool::new(false));
989 let signed_entity_type_service = {
990 let mut mock_container = MockDependencyInjector::new();
991
992 let msd = create_stake_distribution(Epoch(1), 5);
993 mock_container.mock_stake_distribution_processing(msd);
994
995 mock_container
996 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone())
997 };
998 let certificate = fake_data::certificate("hash".to_string());
999
1000 let signed_entity_type_immutable =
1001 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1002 let first_task_that_never_finished = signed_entity_type_service
1003 .create_artifact(signed_entity_type_immutable, &certificate)
1004 .await
1005 .unwrap();
1006
1007 let signed_entity_type_msd = SignedEntityType::MithrilStakeDistribution(Epoch(1));
1008 let second_task_that_finish_first = signed_entity_type_service
1009 .create_artifact(signed_entity_type_msd, &certificate)
1010 .await
1011 .unwrap();
1012
1013 second_task_that_finish_first.await.unwrap().unwrap();
1014 assert!(!first_task_that_never_finished.is_finished());
1015
1016 atomic_stop.swap(true, Ordering::Relaxed);
1017 }
1018
1019 #[tokio::test]
1020 async fn create_artifact_lock_unlock_signed_entity_type_while_processing() {
1021 let atomic_stop = Arc::new(AtomicBool::new(false));
1022 let signed_entity_type_service = MockDependencyInjector::new()
1023 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1024 let certificate = fake_data::certificate("hash".to_string());
1025
1026 let signed_entity_type_immutable =
1027 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1028 assert!(
1029 !signed_entity_type_service
1030 .signed_entity_type_lock
1031 .is_locked(&signed_entity_type_immutable)
1032 .await
1033 );
1034 let join_handle = signed_entity_type_service
1035 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1036 .await
1037 .unwrap();
1038
1039 let is_locked = signed_entity_type_service
1042 .signed_entity_type_lock
1043 .is_locked(&signed_entity_type_immutable)
1044 .await;
1045 let is_finished = join_handle.is_finished();
1046
1047 atomic_stop.swap(true, Ordering::Relaxed);
1048 join_handle.await.unwrap().unwrap();
1049
1050 assert!(is_locked);
1051 assert!(!is_finished);
1052
1053 assert!(
1054 !signed_entity_type_service
1055 .signed_entity_type_lock
1056 .is_locked(&signed_entity_type_immutable)
1057 .await
1058 );
1059 }
1060
1061 #[tokio::test]
1062 async fn create_artifact_unlock_signed_entity_type_when_error() {
1063 let signed_entity_type_service = {
1064 let mut mock_container = MockDependencyInjector::new();
1065 mock_container
1066 .mock_cardano_immutable_files_full_artifact_builder
1067 .expect_compute_artifact()
1068 .returning(|_, _| Err(anyhow::anyhow!("Error while computing artifact")));
1069
1070 mock_container.build_artifact_builder_service()
1071 };
1072 let certificate = fake_data::certificate("hash".to_string());
1073
1074 let signed_entity_type_immutable =
1075 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1076
1077 let join_handle = signed_entity_type_service
1078 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1079 .await
1080 .unwrap();
1081
1082 let error = join_handle.await.unwrap().unwrap_err();
1083 assert!(
1084 error.to_string().contains("CardanoImmutableFilesFull"),
1085 "Error should contains CardanoImmutableFilesFull but was: {}",
1086 error
1087 );
1088
1089 assert!(
1090 !signed_entity_type_service
1091 .signed_entity_type_lock
1092 .is_locked(&signed_entity_type_immutable)
1093 .await
1094 );
1095 }
1096
1097 #[tokio::test]
1098 async fn create_artifact_unlock_signed_entity_type_when_panic() {
1099 let signed_entity_type_service =
1100 MockDependencyInjector::new().build_artifact_builder_service();
1101 let certificate = fake_data::certificate("hash".to_string());
1102
1103 let signed_entity_type_immutable =
1104 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1105
1106 let join_handle = signed_entity_type_service
1107 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1108 .await
1109 .unwrap();
1110
1111 let error = join_handle.await.unwrap().unwrap_err();
1112 assert!(
1113 error.to_string().contains("CardanoImmutableFilesFull"),
1114 "Error should contains CardanoImmutableFilesFull but was: {}",
1115 error
1116 );
1117
1118 assert!(
1119 !signed_entity_type_service
1120 .signed_entity_type_lock
1121 .is_locked(&signed_entity_type_immutable)
1122 .await
1123 );
1124 }
1125
1126 #[tokio::test]
1127 async fn create_artifact_for_a_signed_entity_type_already_lock_return_error() {
1128 let atomic_stop = Arc::new(AtomicBool::new(false));
1129 let signed_entity_service = MockDependencyInjector::new()
1130 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1131 let certificate = fake_data::certificate("hash".to_string());
1132 let signed_entity_type_immutable =
1133 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1134
1135 signed_entity_service
1136 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1137 .await
1138 .unwrap();
1139
1140 signed_entity_service
1141 .create_artifact(signed_entity_type_immutable, &certificate)
1142 .await
1143 .expect_err("Should return error when signed entity type is already locked");
1144
1145 atomic_stop.swap(true, Ordering::Relaxed);
1146 }
1147
1148 #[tokio::test]
1149 async fn metrics_counter_value_is_not_incremented_when_compute_artifact_error() {
1150 let signed_entity_service = {
1151 let mut mock_container = MockDependencyInjector::new();
1152 mock_container
1153 .mock_cardano_immutable_files_full_artifact_builder
1154 .expect_compute_artifact()
1155 .returning(|_, _| Err(anyhow!("Error while computing artifact")));
1156
1157 mock_container.build_artifact_builder_service()
1158 };
1159
1160 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1161
1162 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1163 signed_entity_service.metrics_service.clone(),
1164 &signed_entity_type,
1165 );
1166
1167 signed_entity_service
1168 .create_artifact(
1169 signed_entity_type.clone(),
1170 &fake_data::certificate("hash".to_string()),
1171 )
1172 .await
1173 .unwrap();
1174
1175 assert_eq!(
1176 initial_counter_value,
1177 get_artifact_total_produced_metric_since_startup_counter_value(
1178 signed_entity_service.metrics_service.clone(),
1179 &signed_entity_type,
1180 )
1181 );
1182 }
1183
1184 #[tokio::test]
1185 async fn metrics_counter_value_is_not_incremented_when_store_signed_entity_error() {
1186 let signed_entity_service = {
1187 let mut mock_container = MockDependencyInjector::new();
1188 mock_container
1189 .mock_signed_entity_storer
1190 .expect_store_signed_entity()
1191 .returning(|_| Err(anyhow!("Error while storing signed entity")));
1192
1193 mock_container.build_artifact_builder_service()
1194 };
1195
1196 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1197
1198 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1199 signed_entity_service.metrics_service.clone(),
1200 &signed_entity_type,
1201 );
1202
1203 signed_entity_service
1204 .create_artifact(
1205 signed_entity_type.clone(),
1206 &fake_data::certificate("hash".to_string()),
1207 )
1208 .await
1209 .unwrap();
1210
1211 assert_eq!(
1212 initial_counter_value,
1213 get_artifact_total_produced_metric_since_startup_counter_value(
1214 signed_entity_service.metrics_service.clone(),
1215 &signed_entity_type,
1216 )
1217 );
1218 }
1219}