1use anyhow::{Context, anyhow};
6use async_trait::async_trait;
7use chrono::Utc;
8use slog::{Logger, info, warn};
9use std::sync::Arc;
10use tokio::task::JoinHandle;
11
12use mithril_common::{
13 StdResult,
14 entities::{
15 BlockNumber, CardanoDatabaseSnapshot, CardanoDbBeacon, CardanoStakeDistribution,
16 CardanoTransactionsSnapshot, Certificate, Epoch, MithrilStakeDistribution,
17 SignedEntityType, SignedEntityTypeDiscriminants, Snapshot,
18 },
19 logging::LoggerExtensions,
20 signable_builder::{Artifact, SignedEntity},
21};
22
23use crate::{
24 MetricsService,
25 artifact_builder::ArtifactBuilder,
26 database::{record::SignedEntityRecord, repository::SignedEntityStorer},
27};
28use mithril_signed_entity_lock::SignedEntityTypeLock;
29
30#[cfg_attr(test, mockall::automock)]
32#[async_trait]
33pub trait SignedEntityService: Send + Sync {
34 async fn create_artifact(
36 &self,
37 signed_entity_type: SignedEntityType,
38 certificate: &Certificate,
39 ) -> StdResult<JoinHandle<StdResult<()>>>;
40
41 async fn get_last_signed_snapshots(
43 &self,
44 total: usize,
45 ) -> StdResult<Vec<SignedEntity<Snapshot>>>;
46
47 async fn get_signed_snapshot_by_id(
49 &self,
50 signed_entity_id: &str,
51 ) -> StdResult<Option<SignedEntity<Snapshot>>>;
52
53 async fn get_last_signed_cardano_database_snapshots(
55 &self,
56 total: usize,
57 ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>>;
58
59 async fn get_signed_cardano_database_snapshot_by_id(
61 &self,
62 signed_entity_id: &str,
63 ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>>;
64
65 async fn get_last_signed_mithril_stake_distributions(
68 &self,
69 total: usize,
70 ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>>;
71
72 async fn get_signed_mithril_stake_distribution_by_id(
74 &self,
75 signed_entity_id: &str,
76 ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>>;
77
78 async fn get_last_cardano_transaction_snapshot(
80 &self,
81 ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>>;
82
83 async fn get_last_signed_cardano_stake_distributions(
86 &self,
87 total: usize,
88 ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>>;
89}
90
91#[derive(Clone)]
93pub struct MithrilSignedEntityService {
94 signed_entity_storer: Arc<dyn SignedEntityStorer>,
95 mithril_stake_distribution_artifact_builder:
96 Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
97 cardano_immutable_files_full_artifact_builder:
98 Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
99 cardano_transactions_artifact_builder:
100 Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
101 signed_entity_type_lock: Arc<SignedEntityTypeLock>,
102 cardano_stake_distribution_artifact_builder:
103 Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
104 cardano_database_artifact_builder:
105 Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
106 metrics_service: Arc<MetricsService>,
107 logger: Logger,
108}
109
110pub struct SignedEntityServiceArtifactsDependencies {
112 mithril_stake_distribution_artifact_builder:
113 Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
114 cardano_immutable_files_full_artifact_builder:
115 Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
116 cardano_transactions_artifact_builder:
117 Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
118 cardano_stake_distribution_artifact_builder:
119 Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
120 cardano_database_artifact_builder:
121 Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
122}
123
124impl SignedEntityServiceArtifactsDependencies {
125 pub fn new(
127 mithril_stake_distribution_artifact_builder: Arc<
128 dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>,
129 >,
130 cardano_immutable_files_full_artifact_builder: Arc<
131 dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>,
132 >,
133 cardano_transactions_artifact_builder: Arc<
134 dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
135 >,
136 cardano_stake_distribution_artifact_builder: Arc<
137 dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>,
138 >,
139 cardano_database_artifact_builder: Arc<
140 dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
141 >,
142 ) -> Self {
143 Self {
144 mithril_stake_distribution_artifact_builder,
145 cardano_immutable_files_full_artifact_builder,
146 cardano_transactions_artifact_builder,
147 cardano_stake_distribution_artifact_builder,
148 cardano_database_artifact_builder,
149 }
150 }
151}
152
153impl MithrilSignedEntityService {
154 pub fn new(
156 signed_entity_storer: Arc<dyn SignedEntityStorer>,
157 dependencies: SignedEntityServiceArtifactsDependencies,
158 signed_entity_type_lock: Arc<SignedEntityTypeLock>,
159 metrics_service: Arc<MetricsService>,
160 logger: Logger,
161 ) -> Self {
162 Self {
163 signed_entity_storer,
164 mithril_stake_distribution_artifact_builder: dependencies
165 .mithril_stake_distribution_artifact_builder,
166 cardano_immutable_files_full_artifact_builder: dependencies
167 .cardano_immutable_files_full_artifact_builder,
168 cardano_transactions_artifact_builder: dependencies
169 .cardano_transactions_artifact_builder,
170 cardano_stake_distribution_artifact_builder: dependencies
171 .cardano_stake_distribution_artifact_builder,
172 cardano_database_artifact_builder: dependencies.cardano_database_artifact_builder,
173 signed_entity_type_lock,
174 metrics_service,
175 logger: logger.new_with_component_name::<Self>(),
176 }
177 }
178
179 async fn create_artifact_task(
180 &self,
181 signed_entity_type: SignedEntityType,
182 certificate: &Certificate,
183 ) -> StdResult<()> {
184 info!(
185 self.logger, ">> create_artifact_task";
186 "signed_entity_type" => ?signed_entity_type, "certificate_hash" => &certificate.hash
187 );
188
189 let mut remaining_retries = 2;
190 let artifact = loop {
191 remaining_retries -= 1;
192
193 match self.compute_artifact(signed_entity_type.clone(), certificate).await {
194 Err(error) if remaining_retries == 0 => break Err(error),
195 Err(_error) => (),
196 Ok(artifact) => break Ok(artifact),
197 };
198 }?;
199
200 let signed_entity = SignedEntityRecord {
201 signed_entity_id: artifact.get_id(),
202 signed_entity_type: signed_entity_type.clone(),
203 certificate_id: certificate.hash.clone(),
204 artifact: serde_json::to_string(&artifact)?,
205 created_at: Utc::now(),
206 };
207
208 self.signed_entity_storer
209 .store_signed_entity(&signed_entity)
210 .await
211 .with_context(|| {
212 format!(
213 "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
214 )
215 })?;
216
217 self.increment_artifact_total_produced_metric_since_startup(signed_entity_type);
218
219 Ok(())
220 }
221
222 async fn compute_artifact(
224 &self,
225 signed_entity_type: SignedEntityType,
226 certificate: &Certificate,
227 ) -> StdResult<Arc<dyn Artifact>> {
228 match signed_entity_type.clone() {
229 SignedEntityType::MithrilStakeDistribution(epoch) => Ok(Arc::new(
230 self.mithril_stake_distribution_artifact_builder
231 .compute_artifact(epoch, certificate)
232 .await
233 .with_context(|| {
234 format!(
235 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
236 )
237 })?,
238 )),
239 SignedEntityType::CardanoImmutableFilesFull(beacon) => Ok(Arc::new(
240 self.cardano_immutable_files_full_artifact_builder
241 .compute_artifact(beacon.clone(), certificate)
242 .await
243 .with_context(|| {
244 format!(
245 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
246 )
247 })?,
248 )),
249 SignedEntityType::CardanoStakeDistribution(epoch) => Ok(Arc::new(
250 self.cardano_stake_distribution_artifact_builder
251 .compute_artifact(epoch, certificate)
252 .await
253 .with_context(|| {
254 format!(
255 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
256 )
257 })?)),
258 SignedEntityType::CardanoTransactions(_epoch, block_number) => Ok(Arc::new(
259 self.cardano_transactions_artifact_builder
260 .compute_artifact(block_number, certificate)
261 .await
262 .with_context(|| {
263 format!(
264 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
265 )
266 })?,
267 )),
268 SignedEntityType::CardanoBlocksTransactions(_epoch, _block_number) =>
269 Err(anyhow!("Cardano blocks transactions is not supported yet")),
270 SignedEntityType::CardanoDatabase(beacon) => Ok(Arc::new(
271 self.cardano_database_artifact_builder
272 .compute_artifact(beacon, certificate)
273 .await
274 .with_context(|| {
275 format!(
276 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
277 )
278 })?
279 )),
280 }
281 }
282
283 async fn get_last_signed_entities(
284 &self,
285 total: usize,
286 discriminants: &SignedEntityTypeDiscriminants,
287 ) -> StdResult<Vec<SignedEntityRecord>> {
288 self.signed_entity_storer
289 .get_last_signed_entities_by_type(discriminants, total)
290 .await
291 .with_context(|| {
292 format!(
293 "Signed Entity Service can not get last signed entities with type: '{discriminants:?}'"
294 )
295 })
296 }
297
298 fn increment_artifact_total_produced_metric_since_startup(
299 &self,
300 signed_entity_type: SignedEntityType,
301 ) {
302 let metrics = self.metrics_service.clone();
303 let metric_counter = match signed_entity_type {
304 SignedEntityType::MithrilStakeDistribution(..) => {
305 metrics.get_artifact_mithril_stake_distribution_total_produced_since_startup()
306 }
307 SignedEntityType::CardanoImmutableFilesFull(..) => {
308 metrics.get_artifact_cardano_immutable_files_full_total_produced_since_startup()
309 }
310 SignedEntityType::CardanoStakeDistribution(..) => {
311 metrics.get_artifact_cardano_stake_distribution_total_produced_since_startup()
312 }
313 SignedEntityType::CardanoTransactions(..) => {
314 metrics.get_artifact_cardano_transaction_total_produced_since_startup()
315 }
316 SignedEntityType::CardanoBlocksTransactions(..) => {
317 metrics.get_artifact_cardano_blocks_transactions_total_produced_since_startup()
318 }
319 SignedEntityType::CardanoDatabase(..) => {
320 metrics.get_artifact_cardano_database_total_produced_since_startup()
321 }
322 };
323
324 metric_counter.increment();
325 }
326}
327
328#[async_trait]
329impl SignedEntityService for MithrilSignedEntityService {
330 async fn create_artifact(
331 &self,
332 signed_entity_type: SignedEntityType,
333 certificate: &Certificate,
334 ) -> StdResult<JoinHandle<StdResult<()>>> {
335 if self.signed_entity_type_lock.is_locked(&signed_entity_type).await {
336 return Err(anyhow!(
337 "Signed entity type '{:?}' is already locked",
338 signed_entity_type
339 ));
340 }
341
342 let service = self.clone();
343 let certificate_cloned = certificate.clone();
344 service.signed_entity_type_lock.lock(&signed_entity_type).await;
345
346 Ok(tokio::task::spawn(async move {
347 let signed_entity_type_clone = signed_entity_type.clone();
348 let service_clone = service.clone();
349 let result = tokio::task::spawn(async move {
350 service_clone
351 .create_artifact_task(signed_entity_type_clone, &certificate_cloned)
352 .await
353 })
354 .await;
355 service
356 .signed_entity_type_lock
357 .release(signed_entity_type.clone())
358 .await;
359
360 result.with_context(|| format!(
361 "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
362 ))?.inspect_err(|e| warn!(service.logger, "Error while creating artifact"; "error" => ?e))
363 }))
364 }
365
366 async fn get_last_signed_snapshots(
367 &self,
368 total: usize,
369 ) -> StdResult<Vec<SignedEntity<Snapshot>>> {
370 let signed_entities = self
371 .get_last_signed_entities(
372 total,
373 &SignedEntityTypeDiscriminants::CardanoImmutableFilesFull,
374 )
375 .await?
376 .into_iter()
377 .map(|record| record.try_into())
378 .collect::<Result<Vec<_>, _>>()?;
379
380 Ok(signed_entities)
381 }
382
383 async fn get_signed_snapshot_by_id(
384 &self,
385 signed_entity_id: &str,
386 ) -> StdResult<Option<SignedEntity<Snapshot>>> {
387 let entity: Option<SignedEntity<Snapshot>> = match self
388 .signed_entity_storer
389 .get_signed_entity(signed_entity_id)
390 .await
391 .with_context(|| {
392 format!(
393 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
394 )
395 })? {
396 Some(entity) => Some(entity.try_into()?),
397 None => None,
398 };
399
400 Ok(entity)
401 }
402
403 async fn get_last_signed_cardano_database_snapshots(
404 &self,
405 total: usize,
406 ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>> {
407 let signed_entities = self
408 .get_last_signed_entities(total, &SignedEntityTypeDiscriminants::CardanoDatabase)
409 .await?
410 .into_iter()
411 .map(|record| record.try_into())
412 .collect::<Result<Vec<_>, _>>()?;
413
414 Ok(signed_entities)
415 }
416
417 async fn get_signed_cardano_database_snapshot_by_id(
418 &self,
419 signed_entity_id: &str,
420 ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>> {
421 let entity: Option<SignedEntity<CardanoDatabaseSnapshot>> = match self
422 .signed_entity_storer
423 .get_signed_entity(signed_entity_id)
424 .await
425 .with_context(|| {
426 format!(
427 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
428 )
429 })? {
430 Some(entity) => Some(entity.try_into()?),
431 None => None,
432 };
433
434 Ok(entity)
435 }
436
437 async fn get_last_signed_mithril_stake_distributions(
438 &self,
439 total: usize,
440 ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>> {
441 let signed_entities = self
442 .get_last_signed_entities(
443 total,
444 &SignedEntityTypeDiscriminants::MithrilStakeDistribution,
445 )
446 .await?
447 .into_iter()
448 .map(|record| record.try_into())
449 .collect::<Result<Vec<_>, _>>()?;
450
451 Ok(signed_entities)
452 }
453
454 async fn get_signed_mithril_stake_distribution_by_id(
455 &self,
456 signed_entity_id: &str,
457 ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>> {
458 let entity: Option<SignedEntity<MithrilStakeDistribution>> = match self
459 .signed_entity_storer
460 .get_signed_entity(signed_entity_id)
461 .await
462 .with_context(|| {
463 format!(
464 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
465 )
466 })? {
467 Some(entity) => Some(entity.try_into()?),
468 None => None,
469 };
470
471 Ok(entity)
472 }
473
474 async fn get_last_cardano_transaction_snapshot(
475 &self,
476 ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>> {
477 let mut signed_entities_records = self
478 .get_last_signed_entities(1, &SignedEntityTypeDiscriminants::CardanoTransactions)
479 .await?;
480
481 match signed_entities_records.pop() {
482 Some(record) => Ok(Some(record.try_into()?)),
483 None => Ok(None),
484 }
485 }
486
487 async fn get_last_signed_cardano_stake_distributions(
488 &self,
489 total: usize,
490 ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>> {
491 let signed_entities = self
492 .get_last_signed_entities(
493 total,
494 &SignedEntityTypeDiscriminants::CardanoStakeDistribution,
495 )
496 .await?
497 .into_iter()
498 .map(|record| record.try_into())
499 .collect::<Result<Vec<_>, _>>()?;
500
501 Ok(signed_entities)
502 }
503}
504
505#[cfg(test)]
506mod tests {
507 use std::{sync::atomic::Ordering, time::Duration};
508
509 use mithril_common::{
510 entities::{CardanoTransactionsSnapshot, Epoch, StakeDistribution},
511 signable_builder,
512 test::double::fake_data,
513 };
514 use mithril_metric::CounterValue;
515 use serde::{Serialize, de::DeserializeOwned};
516 use std::sync::atomic::AtomicBool;
517
518 use crate::artifact_builder::MockArtifactBuilder;
519 use crate::database::repository::MockSignedEntityStorer;
520 use crate::test::TestLogger;
521
522 use super::*;
523
524 fn create_stake_distribution(epoch: Epoch, signers: usize) -> MithrilStakeDistribution {
525 MithrilStakeDistribution::new(
526 epoch,
527 fake_data::signers_with_stakes(signers),
528 &fake_data::protocol_parameters(),
529 )
530 }
531
532 fn create_cardano_stake_distribution(
533 epoch: Epoch,
534 stake_distribution: StakeDistribution,
535 ) -> CardanoStakeDistribution {
536 CardanoStakeDistribution::new(epoch, stake_distribution)
537 }
538
539 fn assert_expected<T>(expected: &T, artifact: &Arc<dyn Artifact>)
540 where
541 T: Serialize + DeserializeOwned,
542 {
543 let current: T = serde_json::from_str(&serde_json::to_string(&artifact).unwrap()).unwrap();
544 assert_eq!(
545 serde_json::to_string(&expected).unwrap(),
546 serde_json::to_string(¤t).unwrap()
547 );
548 }
549
550 struct MockDependencyInjector {
552 mock_signed_entity_storer: MockSignedEntityStorer,
553 mock_mithril_stake_distribution_artifact_builder:
554 MockArtifactBuilder<Epoch, MithrilStakeDistribution>,
555 mock_cardano_immutable_files_full_artifact_builder:
556 MockArtifactBuilder<CardanoDbBeacon, Snapshot>,
557 mock_cardano_transactions_artifact_builder:
558 MockArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
559 mock_cardano_stake_distribution_artifact_builder:
560 MockArtifactBuilder<Epoch, CardanoStakeDistribution>,
561 mock_cardano_database_artifact_builder:
562 MockArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
563 }
564
565 impl MockDependencyInjector {
566 fn new() -> MockDependencyInjector {
567 MockDependencyInjector {
568 mock_signed_entity_storer: MockSignedEntityStorer::new(),
569 mock_mithril_stake_distribution_artifact_builder: MockArtifactBuilder::<
570 Epoch,
571 MithrilStakeDistribution,
572 >::new(),
573 mock_cardano_immutable_files_full_artifact_builder: MockArtifactBuilder::<
574 CardanoDbBeacon,
575 Snapshot,
576 >::new(),
577 mock_cardano_transactions_artifact_builder: MockArtifactBuilder::<
578 BlockNumber,
579 CardanoTransactionsSnapshot,
580 >::new(),
581 mock_cardano_stake_distribution_artifact_builder: MockArtifactBuilder::<
582 Epoch,
583 CardanoStakeDistribution,
584 >::new(),
585 mock_cardano_database_artifact_builder: MockArtifactBuilder::<
586 CardanoDbBeacon,
587 CardanoDatabaseSnapshot,
588 >::new(),
589 }
590 }
591
592 fn build_artifact_builder_service(self) -> MithrilSignedEntityService {
593 let dependencies = SignedEntityServiceArtifactsDependencies::new(
594 Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
595 Arc::new(self.mock_cardano_immutable_files_full_artifact_builder),
596 Arc::new(self.mock_cardano_transactions_artifact_builder),
597 Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
598 Arc::new(self.mock_cardano_database_artifact_builder),
599 );
600 MithrilSignedEntityService::new(
601 Arc::new(self.mock_signed_entity_storer),
602 dependencies,
603 Arc::new(SignedEntityTypeLock::default()),
604 Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
605 TestLogger::stdout(),
606 )
607 }
608
609 fn build_artifact_builder_service_with_time_consuming_process(
610 mut self,
611 atomic_stop: Arc<AtomicBool>,
612 ) -> MithrilSignedEntityService {
613 struct LongArtifactBuilder {
614 atomic_stop: Arc<AtomicBool>,
615 snapshot: Snapshot,
616 }
617
618 let snapshot = fake_data::snapshot(1);
619
620 #[async_trait]
621 impl ArtifactBuilder<CardanoDbBeacon, Snapshot> for LongArtifactBuilder {
622 async fn compute_artifact(
623 &self,
624 _beacon: CardanoDbBeacon,
625 _certificate: &Certificate,
626 ) -> StdResult<Snapshot> {
627 let mut max_iteration = 100;
628 while !self.atomic_stop.load(Ordering::Relaxed) {
629 max_iteration -= 1;
630 if max_iteration <= 0 {
631 return Err(anyhow!("Test should handle the stop"));
632 }
633 tokio::time::sleep(Duration::from_millis(10)).await;
634 }
635 Ok(self.snapshot.clone())
636 }
637 }
638 let cardano_immutable_files_full_long_artifact_builder = LongArtifactBuilder {
639 atomic_stop: atomic_stop.clone(),
640 snapshot: snapshot.clone(),
641 };
642
643 let artifact_clone: Arc<dyn Artifact> = Arc::new(snapshot);
644 let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
645 self.mock_signed_entity_storer
646 .expect_store_signed_entity()
647 .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
648 .return_once(|_| Ok(()));
649
650 let dependencies = SignedEntityServiceArtifactsDependencies::new(
651 Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
652 Arc::new(cardano_immutable_files_full_long_artifact_builder),
653 Arc::new(self.mock_cardano_transactions_artifact_builder),
654 Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
655 Arc::new(self.mock_cardano_database_artifact_builder),
656 );
657 MithrilSignedEntityService::new(
658 Arc::new(self.mock_signed_entity_storer),
659 dependencies,
660 Arc::new(SignedEntityTypeLock::default()),
661 Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
662 TestLogger::stdout(),
663 )
664 }
665
666 fn mock_artifact_processing<
667 T: Artifact + Clone + Serialize + 'static,
668 U: signable_builder::Beacon,
669 >(
670 &mut self,
671 artifact: T,
672 mock_that_provide_artifact: &dyn Fn(
673 &mut MockDependencyInjector,
674 ) -> &mut MockArtifactBuilder<U, T>,
675 ) {
676 {
677 let artifact_cloned = artifact.clone();
678 mock_that_provide_artifact(self)
679 .expect_compute_artifact()
680 .times(1)
681 .return_once(|_, _| Ok(artifact_cloned));
682 }
683 {
684 let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
685 let artifact_json = serde_json::to_string(&artifact_clone).unwrap();
686 self.mock_signed_entity_storer
687 .expect_store_signed_entity()
688 .withf(move |signed_entity| signed_entity.artifact == artifact_json)
689 .return_once(|_| Ok(()));
690 }
691 }
692
693 fn mock_stake_distribution_processing(&mut self, artifact: MithrilStakeDistribution) {
694 self.mock_artifact_processing(artifact, &|mock_injector| {
695 &mut mock_injector.mock_mithril_stake_distribution_artifact_builder
696 });
697 }
698 }
699
700 fn get_artifact_total_produced_metric_since_startup_counter_value(
701 metrics_service: Arc<MetricsService>,
702 signed_entity_type: &SignedEntityType,
703 ) -> CounterValue {
704 match signed_entity_type {
705 SignedEntityType::MithrilStakeDistribution(..) => metrics_service
706 .get_artifact_mithril_stake_distribution_total_produced_since_startup()
707 .get(),
708 SignedEntityType::CardanoImmutableFilesFull(..) => metrics_service
709 .get_artifact_cardano_immutable_files_full_total_produced_since_startup()
710 .get(),
711 SignedEntityType::CardanoStakeDistribution(..) => metrics_service
712 .get_artifact_cardano_stake_distribution_total_produced_since_startup()
713 .get(),
714 SignedEntityType::CardanoTransactions(..) => metrics_service
715 .get_artifact_cardano_transaction_total_produced_since_startup()
716 .get(),
717 SignedEntityType::CardanoBlocksTransactions(..) => metrics_service
718 .get_artifact_cardano_blocks_transactions_total_produced_since_startup()
719 .get(),
720 SignedEntityType::CardanoDatabase(..) => metrics_service
721 .get_artifact_cardano_database_total_produced_since_startup()
722 .get(),
723 }
724 }
725
726 #[tokio::test]
727 async fn build_mithril_stake_distribution_artifact_when_given_mithril_stake_distribution_entity_type()
728 {
729 let mut mock_container = MockDependencyInjector::new();
730
731 let mithril_stake_distribution_expected = create_stake_distribution(Epoch(1), 5);
732
733 mock_container
734 .mock_mithril_stake_distribution_artifact_builder
735 .expect_compute_artifact()
736 .times(1)
737 .returning(|_, _| Ok(create_stake_distribution(Epoch(1), 5)));
738
739 let artifact_builder_service = mock_container.build_artifact_builder_service();
740
741 let certificate = fake_data::certificate("hash".to_string());
742 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(1));
743 let artifact = artifact_builder_service
744 .compute_artifact(signed_entity_type.clone(), &certificate)
745 .await
746 .unwrap();
747
748 assert_expected(&mithril_stake_distribution_expected, &artifact);
749 }
750
751 #[tokio::test]
752 async fn should_store_the_artifact_when_creating_artifact_for_a_mithril_stake_distribution() {
753 generic_test_that_the_artifact_is_stored(
754 SignedEntityType::MithrilStakeDistribution(Epoch(1)),
755 create_stake_distribution(Epoch(1), 5),
756 &|mock_injector| &mut mock_injector.mock_mithril_stake_distribution_artifact_builder,
757 )
758 .await;
759 }
760
761 #[tokio::test]
762 async fn build_cardano_stake_distribution_artifact_when_given_cardano_stake_distribution_entity_type()
763 {
764 let mut mock_container = MockDependencyInjector::new();
765
766 let cardano_stake_distribution_expected = create_cardano_stake_distribution(
767 Epoch(1),
768 StakeDistribution::from([("pool-1".to_string(), 100)]),
769 );
770
771 mock_container
772 .mock_cardano_stake_distribution_artifact_builder
773 .expect_compute_artifact()
774 .times(1)
775 .returning(|_, _| {
776 Ok(create_cardano_stake_distribution(
777 Epoch(1),
778 StakeDistribution::from([("pool-1".to_string(), 100)]),
779 ))
780 });
781
782 let artifact_builder_service = mock_container.build_artifact_builder_service();
783
784 let certificate = fake_data::certificate("hash".to_string());
785 let signed_entity_type = SignedEntityType::CardanoStakeDistribution(Epoch(1));
786 let artifact = artifact_builder_service
787 .compute_artifact(signed_entity_type.clone(), &certificate)
788 .await
789 .unwrap();
790
791 assert_expected(&cardano_stake_distribution_expected, &artifact);
792 }
793
794 #[tokio::test]
795 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_stake_distribution() {
796 generic_test_that_the_artifact_is_stored(
797 SignedEntityType::CardanoStakeDistribution(Epoch(1)),
798 create_cardano_stake_distribution(
799 Epoch(1),
800 StakeDistribution::from([("pool-1".to_string(), 100)]),
801 ),
802 &|mock_injector| &mut mock_injector.mock_cardano_stake_distribution_artifact_builder,
803 )
804 .await;
805 }
806
807 #[tokio::test]
808 async fn build_snapshot_artifact_when_given_cardano_immutable_files_full_entity_type() {
809 let mut mock_container = MockDependencyInjector::new();
810
811 let snapshot_expected = fake_data::snapshot(1);
812
813 mock_container
814 .mock_cardano_immutable_files_full_artifact_builder
815 .expect_compute_artifact()
816 .times(1)
817 .returning(|_, _| Ok(fake_data::snapshot(1)));
818
819 let artifact_builder_service = mock_container.build_artifact_builder_service();
820
821 let certificate = fake_data::certificate("hash".to_string());
822 let signed_entity_type =
823 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
824 let artifact = artifact_builder_service
825 .compute_artifact(signed_entity_type.clone(), &certificate)
826 .await
827 .unwrap();
828
829 assert_expected(&snapshot_expected, &artifact);
830 }
831
832 #[tokio::test]
833 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_immutable_files() {
834 generic_test_that_the_artifact_is_stored(
835 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default()),
836 fake_data::snapshot(1),
837 &|mock_injector| &mut mock_injector.mock_cardano_immutable_files_full_artifact_builder,
838 )
839 .await;
840 }
841
842 #[tokio::test]
843 async fn build_cardano_transactions_snapshot_artifact_when_given_cardano_transactions_type() {
844 let mut mock_container = MockDependencyInjector::new();
845
846 let block_number = BlockNumber(151);
847 let expected = CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number);
848
849 mock_container
850 .mock_cardano_transactions_artifact_builder
851 .expect_compute_artifact()
852 .times(1)
853 .returning(move |_, _| {
854 Ok(CardanoTransactionsSnapshot::new(
855 "merkle_root".to_string(),
856 block_number,
857 ))
858 });
859
860 let artifact_builder_service = mock_container.build_artifact_builder_service();
861
862 let certificate = fake_data::certificate("hash".to_string());
863 let signed_entity_type = SignedEntityType::CardanoTransactions(Epoch(1), block_number);
864 let artifact = artifact_builder_service
865 .compute_artifact(signed_entity_type.clone(), &certificate)
866 .await
867 .unwrap();
868
869 assert_expected(&expected, &artifact);
870 }
871
872 #[tokio::test]
873 async fn should_store_the_artifact_when_creating_artifact_for_cardano_transactions() {
874 let block_number = BlockNumber(149);
875 generic_test_that_the_artifact_is_stored(
876 SignedEntityType::CardanoTransactions(Epoch(1), block_number),
877 CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number),
878 &|mock_injector| &mut mock_injector.mock_cardano_transactions_artifact_builder,
879 )
880 .await;
881 }
882
883 #[tokio::test]
884 async fn build_cardano_database_artifact_when_given_cardano_database_entity_type() {
885 let mut mock_container = MockDependencyInjector::new();
886
887 let cardano_database_expected = fake_data::cardano_database_snapshot(1);
888
889 mock_container
890 .mock_cardano_database_artifact_builder
891 .expect_compute_artifact()
892 .times(1)
893 .returning(|_, _| Ok(fake_data::cardano_database_snapshot(1)));
894
895 let artifact_builder_service = mock_container.build_artifact_builder_service();
896
897 let certificate = fake_data::certificate("hash".to_string());
898 let signed_entity_type = SignedEntityType::CardanoDatabase(CardanoDbBeacon::default());
899 let artifact = artifact_builder_service
900 .compute_artifact(signed_entity_type.clone(), &certificate)
901 .await
902 .unwrap();
903
904 assert_expected(&cardano_database_expected, &artifact);
905 }
906
907 #[tokio::test]
908 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_database() {
909 generic_test_that_the_artifact_is_stored(
910 SignedEntityType::CardanoDatabase(CardanoDbBeacon::default()),
911 fake_data::cardano_database_snapshot(1),
912 &|mock_injector| &mut mock_injector.mock_cardano_database_artifact_builder,
913 )
914 .await;
915 }
916
917 async fn generic_test_that_the_artifact_is_stored<
918 T: Artifact + Clone + Serialize + 'static,
919 U: signable_builder::Beacon,
920 >(
921 signed_entity_type: SignedEntityType,
922 artifact: T,
923 mock_that_provide_artifact: &dyn Fn(
924 &mut MockDependencyInjector,
925 ) -> &mut MockArtifactBuilder<U, T>,
926 ) {
927 let mut mock_container = MockDependencyInjector::new();
928 {
929 let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
930 let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
931 mock_container
932 .mock_signed_entity_storer
933 .expect_store_signed_entity()
934 .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
935 .return_once(|_| Ok(()));
936 }
937 {
938 let artifact_cloned = artifact.clone();
939 mock_that_provide_artifact(&mut mock_container)
940 .expect_compute_artifact()
941 .times(1)
942 .return_once(|_, _| Ok(artifact_cloned));
943 }
944 let artifact_builder_service = mock_container.build_artifact_builder_service();
945
946 let certificate = fake_data::certificate("hash".to_string());
947 let error_message = format!(
948 "Create artifact should not fail for {} signed entity",
949 std::any::type_name::<T>()
950 );
951 let error_message_str = error_message.as_str();
952
953 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
954 artifact_builder_service.metrics_service.clone(),
955 &signed_entity_type,
956 );
957
958 artifact_builder_service
959 .create_artifact_task(signed_entity_type.clone(), &certificate)
960 .await
961 .expect(error_message_str);
962
963 assert_eq!(
964 initial_counter_value + 1,
965 get_artifact_total_produced_metric_since_startup_counter_value(
966 artifact_builder_service.metrics_service.clone(),
967 &signed_entity_type,
968 )
969 )
970 }
971
972 #[tokio::test]
973 async fn create_artifact_for_two_signed_entity_types_in_sequence_not_blocking() {
974 let atomic_stop = Arc::new(AtomicBool::new(false));
975 let signed_entity_type_service = {
976 let mut mock_container = MockDependencyInjector::new();
977
978 let msd = create_stake_distribution(Epoch(1), 5);
979 mock_container.mock_stake_distribution_processing(msd);
980
981 mock_container
982 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone())
983 };
984 let certificate = fake_data::certificate("hash".to_string());
985
986 let signed_entity_type_immutable =
987 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
988 let first_task_that_never_finished = signed_entity_type_service
989 .create_artifact(signed_entity_type_immutable, &certificate)
990 .await
991 .unwrap();
992
993 let signed_entity_type_msd = SignedEntityType::MithrilStakeDistribution(Epoch(1));
994 let second_task_that_finish_first = signed_entity_type_service
995 .create_artifact(signed_entity_type_msd, &certificate)
996 .await
997 .unwrap();
998
999 second_task_that_finish_first.await.unwrap().unwrap();
1000 assert!(!first_task_that_never_finished.is_finished());
1001
1002 atomic_stop.swap(true, Ordering::Relaxed);
1003 }
1004
1005 #[tokio::test]
1006 async fn create_artifact_lock_unlock_signed_entity_type_while_processing() {
1007 let atomic_stop = Arc::new(AtomicBool::new(false));
1008 let signed_entity_type_service = MockDependencyInjector::new()
1009 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1010 let certificate = fake_data::certificate("hash".to_string());
1011
1012 let signed_entity_type_immutable =
1013 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1014 assert!(
1015 !signed_entity_type_service
1016 .signed_entity_type_lock
1017 .is_locked(&signed_entity_type_immutable)
1018 .await
1019 );
1020 let join_handle = signed_entity_type_service
1021 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1022 .await
1023 .unwrap();
1024
1025 let is_locked = signed_entity_type_service
1028 .signed_entity_type_lock
1029 .is_locked(&signed_entity_type_immutable)
1030 .await;
1031 let is_finished = join_handle.is_finished();
1032
1033 atomic_stop.swap(true, Ordering::Relaxed);
1034 join_handle.await.unwrap().unwrap();
1035
1036 assert!(is_locked);
1037 assert!(!is_finished);
1038
1039 assert!(
1040 !signed_entity_type_service
1041 .signed_entity_type_lock
1042 .is_locked(&signed_entity_type_immutable)
1043 .await
1044 );
1045 }
1046
1047 #[tokio::test]
1048 async fn create_artifact_unlock_signed_entity_type_when_error() {
1049 let signed_entity_type_service = {
1050 let mut mock_container = MockDependencyInjector::new();
1051 mock_container
1052 .mock_cardano_immutable_files_full_artifact_builder
1053 .expect_compute_artifact()
1054 .returning(|_, _| Err(anyhow::anyhow!("Error while computing artifact")));
1055
1056 mock_container.build_artifact_builder_service()
1057 };
1058 let certificate = fake_data::certificate("hash".to_string());
1059
1060 let signed_entity_type_immutable =
1061 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1062
1063 let join_handle = signed_entity_type_service
1064 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1065 .await
1066 .unwrap();
1067
1068 let error = join_handle.await.unwrap().unwrap_err();
1069 assert!(
1070 error.to_string().contains("CardanoImmutableFilesFull"),
1071 "Error should contains CardanoImmutableFilesFull but was: {error}"
1072 );
1073
1074 assert!(
1075 !signed_entity_type_service
1076 .signed_entity_type_lock
1077 .is_locked(&signed_entity_type_immutable)
1078 .await
1079 );
1080 }
1081
1082 #[tokio::test]
1083 async fn create_artifact_unlock_signed_entity_type_when_panic() {
1084 let signed_entity_type_service =
1085 MockDependencyInjector::new().build_artifact_builder_service();
1086 let certificate = fake_data::certificate("hash".to_string());
1087
1088 let signed_entity_type_immutable =
1089 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1090
1091 let join_handle = signed_entity_type_service
1092 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1093 .await
1094 .unwrap();
1095
1096 let error = join_handle.await.unwrap().unwrap_err();
1097 assert!(
1098 error.to_string().contains("CardanoImmutableFilesFull"),
1099 "Error should contains CardanoImmutableFilesFull but was: {error}"
1100 );
1101
1102 assert!(
1103 !signed_entity_type_service
1104 .signed_entity_type_lock
1105 .is_locked(&signed_entity_type_immutable)
1106 .await
1107 );
1108 }
1109
1110 #[tokio::test]
1111 async fn create_artifact_for_a_signed_entity_type_already_lock_return_error() {
1112 let atomic_stop = Arc::new(AtomicBool::new(false));
1113 let signed_entity_service = MockDependencyInjector::new()
1114 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1115 let certificate = fake_data::certificate("hash".to_string());
1116 let signed_entity_type_immutable =
1117 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1118
1119 signed_entity_service
1120 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1121 .await
1122 .unwrap();
1123
1124 signed_entity_service
1125 .create_artifact(signed_entity_type_immutable, &certificate)
1126 .await
1127 .expect_err("Should return error when signed entity type is already locked");
1128
1129 atomic_stop.swap(true, Ordering::Relaxed);
1130 }
1131
1132 #[tokio::test]
1133 async fn metrics_counter_value_is_not_incremented_when_compute_artifact_error() {
1134 let signed_entity_service = {
1135 let mut mock_container = MockDependencyInjector::new();
1136 mock_container
1137 .mock_cardano_immutable_files_full_artifact_builder
1138 .expect_compute_artifact()
1139 .returning(|_, _| Err(anyhow!("Error while computing artifact")));
1140
1141 mock_container.build_artifact_builder_service()
1142 };
1143
1144 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1145
1146 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1147 signed_entity_service.metrics_service.clone(),
1148 &signed_entity_type,
1149 );
1150
1151 signed_entity_service
1152 .create_artifact(
1153 signed_entity_type.clone(),
1154 &fake_data::certificate("hash".to_string()),
1155 )
1156 .await
1157 .unwrap();
1158
1159 assert_eq!(
1160 initial_counter_value,
1161 get_artifact_total_produced_metric_since_startup_counter_value(
1162 signed_entity_service.metrics_service.clone(),
1163 &signed_entity_type,
1164 )
1165 );
1166 }
1167
1168 #[tokio::test]
1169 async fn metrics_counter_value_is_not_incremented_when_store_signed_entity_error() {
1170 let signed_entity_service = {
1171 let mut mock_container = MockDependencyInjector::new();
1172 mock_container
1173 .mock_signed_entity_storer
1174 .expect_store_signed_entity()
1175 .returning(|_| Err(anyhow!("Error while storing signed entity")));
1176
1177 mock_container.build_artifact_builder_service()
1178 };
1179
1180 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1181
1182 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1183 signed_entity_service.metrics_service.clone(),
1184 &signed_entity_type,
1185 );
1186
1187 signed_entity_service
1188 .create_artifact(
1189 signed_entity_type.clone(),
1190 &fake_data::certificate("hash".to_string()),
1191 )
1192 .await
1193 .unwrap();
1194
1195 assert_eq!(
1196 initial_counter_value,
1197 get_artifact_total_produced_metric_since_startup_counter_value(
1198 signed_entity_service.metrics_service.clone(),
1199 &signed_entity_type,
1200 )
1201 );
1202 }
1203}