1use anyhow::{Context, anyhow};
6use async_trait::async_trait;
7use chrono::Utc;
8use slog::{Logger, info, warn};
9use std::sync::Arc;
10use tokio::task::JoinHandle;
11
12use mithril_common::{
13 StdResult,
14 entities::{
15 BlockNumber, CardanoBlocksTransactionsSnapshot, CardanoDatabaseSnapshot, CardanoDbBeacon,
16 CardanoStakeDistribution, CardanoTransactionsSnapshot, Certificate, Epoch,
17 MithrilStakeDistribution, SignedEntityType, SignedEntityTypeDiscriminants, Snapshot,
18 },
19 logging::LoggerExtensions,
20 signable_builder::{Artifact, SignedEntity},
21};
22
23use crate::{
24 MetricsService,
25 artifact_builder::ArtifactBuilder,
26 database::{record::SignedEntityRecord, repository::SignedEntityStorer},
27};
28use mithril_signed_entity_lock::SignedEntityTypeLock;
29
30#[cfg_attr(test, mockall::automock)]
32#[async_trait]
33pub trait SignedEntityService: Send + Sync {
34 async fn create_artifact(
36 &self,
37 signed_entity_type: SignedEntityType,
38 certificate: &Certificate,
39 ) -> StdResult<JoinHandle<StdResult<()>>>;
40
41 async fn get_last_signed_snapshots(
43 &self,
44 total: usize,
45 ) -> StdResult<Vec<SignedEntity<Snapshot>>>;
46
47 async fn get_signed_snapshot_by_id(
49 &self,
50 signed_entity_id: &str,
51 ) -> StdResult<Option<SignedEntity<Snapshot>>>;
52
53 async fn get_last_signed_cardano_database_snapshots(
55 &self,
56 total: usize,
57 ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>>;
58
59 async fn get_signed_cardano_database_snapshot_by_id(
61 &self,
62 signed_entity_id: &str,
63 ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>>;
64
65 async fn get_last_signed_mithril_stake_distributions(
68 &self,
69 total: usize,
70 ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>>;
71
72 async fn get_signed_mithril_stake_distribution_by_id(
74 &self,
75 signed_entity_id: &str,
76 ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>>;
77
78 async fn get_last_cardano_transaction_snapshot(
80 &self,
81 ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>>;
82
83 async fn get_last_signed_cardano_stake_distributions(
86 &self,
87 total: usize,
88 ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>>;
89}
90
91#[derive(Clone)]
93pub struct MithrilSignedEntityService {
94 signed_entity_storer: Arc<dyn SignedEntityStorer>,
95 mithril_stake_distribution_artifact_builder:
96 Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
97 cardano_immutable_files_full_artifact_builder:
98 Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
99 cardano_transactions_artifact_builder:
100 Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
101 cardano_blocks_transactions_artifact_builder:
102 Arc<dyn ArtifactBuilder<BlockNumber, CardanoBlocksTransactionsSnapshot>>,
103 signed_entity_type_lock: Arc<SignedEntityTypeLock>,
104 cardano_stake_distribution_artifact_builder:
105 Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
106 cardano_database_artifact_builder:
107 Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
108 metrics_service: Arc<MetricsService>,
109 logger: Logger,
110}
111
112pub struct SignedEntityServiceArtifactsDependencies {
114 mithril_stake_distribution_artifact_builder:
115 Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
116 cardano_immutable_files_full_artifact_builder:
117 Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
118 cardano_transactions_artifact_builder:
119 Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
120 cardano_blocks_transactions_artifact_builder:
121 Arc<dyn ArtifactBuilder<BlockNumber, CardanoBlocksTransactionsSnapshot>>,
122 cardano_stake_distribution_artifact_builder:
123 Arc<dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>>,
124 cardano_database_artifact_builder:
125 Arc<dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>>,
126}
127
128impl SignedEntityServiceArtifactsDependencies {
129 pub fn new(
131 mithril_stake_distribution_artifact_builder: Arc<
132 dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>,
133 >,
134 cardano_immutable_files_full_artifact_builder: Arc<
135 dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>,
136 >,
137 cardano_transactions_artifact_builder: Arc<
138 dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
139 >,
140 cardano_blocks_transactions_artifact_builder: Arc<
141 dyn ArtifactBuilder<BlockNumber, CardanoBlocksTransactionsSnapshot>,
142 >,
143 cardano_stake_distribution_artifact_builder: Arc<
144 dyn ArtifactBuilder<Epoch, CardanoStakeDistribution>,
145 >,
146 cardano_database_artifact_builder: Arc<
147 dyn ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
148 >,
149 ) -> Self {
150 Self {
151 mithril_stake_distribution_artifact_builder,
152 cardano_immutable_files_full_artifact_builder,
153 cardano_transactions_artifact_builder,
154 cardano_blocks_transactions_artifact_builder,
155 cardano_stake_distribution_artifact_builder,
156 cardano_database_artifact_builder,
157 }
158 }
159}
160
161impl MithrilSignedEntityService {
162 pub fn new(
164 signed_entity_storer: Arc<dyn SignedEntityStorer>,
165 dependencies: SignedEntityServiceArtifactsDependencies,
166 signed_entity_type_lock: Arc<SignedEntityTypeLock>,
167 metrics_service: Arc<MetricsService>,
168 logger: Logger,
169 ) -> Self {
170 Self {
171 signed_entity_storer,
172 mithril_stake_distribution_artifact_builder: dependencies
173 .mithril_stake_distribution_artifact_builder,
174 cardano_immutable_files_full_artifact_builder: dependencies
175 .cardano_immutable_files_full_artifact_builder,
176 cardano_transactions_artifact_builder: dependencies
177 .cardano_transactions_artifact_builder,
178 cardano_blocks_transactions_artifact_builder: dependencies
179 .cardano_blocks_transactions_artifact_builder,
180 cardano_stake_distribution_artifact_builder: dependencies
181 .cardano_stake_distribution_artifact_builder,
182 cardano_database_artifact_builder: dependencies.cardano_database_artifact_builder,
183 signed_entity_type_lock,
184 metrics_service,
185 logger: logger.new_with_component_name::<Self>(),
186 }
187 }
188
189 async fn create_artifact_task(
190 &self,
191 signed_entity_type: SignedEntityType,
192 certificate: &Certificate,
193 ) -> StdResult<()> {
194 info!(
195 self.logger, ">> create_artifact_task";
196 "signed_entity_type" => ?signed_entity_type, "certificate_hash" => &certificate.hash
197 );
198
199 let mut remaining_retries = 2;
200 let artifact = loop {
201 remaining_retries -= 1;
202
203 match self.compute_artifact(signed_entity_type.clone(), certificate).await {
204 Err(error) if remaining_retries == 0 => break Err(error),
205 Err(_error) => (),
206 Ok(artifact) => break Ok(artifact),
207 };
208 }?;
209
210 let signed_entity = SignedEntityRecord {
211 signed_entity_id: artifact.get_id(),
212 signed_entity_type: signed_entity_type.clone(),
213 certificate_id: certificate.hash.clone(),
214 artifact: serde_json::to_string(&artifact)?,
215 created_at: Utc::now(),
216 };
217
218 self.signed_entity_storer
219 .store_signed_entity(&signed_entity)
220 .await
221 .with_context(|| {
222 format!(
223 "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
224 )
225 })?;
226
227 self.increment_artifact_total_produced_metric_since_startup(signed_entity_type);
228
229 Ok(())
230 }
231
232 async fn compute_artifact(
234 &self,
235 signed_entity_type: SignedEntityType,
236 certificate: &Certificate,
237 ) -> StdResult<Arc<dyn Artifact>> {
238 match signed_entity_type.clone() {
239 SignedEntityType::MithrilStakeDistribution(epoch) => Ok(Arc::new(
240 self.mithril_stake_distribution_artifact_builder
241 .compute_artifact(epoch, certificate)
242 .await
243 .with_context(|| {
244 format!(
245 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
246 )
247 })?,
248 )),
249 SignedEntityType::CardanoImmutableFilesFull(beacon) => Ok(Arc::new(
250 self.cardano_immutable_files_full_artifact_builder
251 .compute_artifact(beacon.clone(), certificate)
252 .await
253 .with_context(|| {
254 format!(
255 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
256 )
257 })?,
258 )),
259 SignedEntityType::CardanoStakeDistribution(epoch) => Ok(Arc::new(
260 self.cardano_stake_distribution_artifact_builder
261 .compute_artifact(epoch, certificate)
262 .await
263 .with_context(|| {
264 format!(
265 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
266 )
267 })?)),
268 SignedEntityType::CardanoTransactions(_epoch, block_number) => Ok(Arc::new(
269 self.cardano_transactions_artifact_builder
270 .compute_artifact(block_number, certificate)
271 .await
272 .with_context(|| {
273 format!(
274 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
275 )
276 })?,
277 )),
278 SignedEntityType::CardanoBlocksTransactions(_epoch, block_number) => Ok(Arc::new(
279 self.cardano_blocks_transactions_artifact_builder
280 .compute_artifact(block_number, certificate)
281 .await
282 .with_context(|| {
283 format!(
284 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
285 )
286 })?,
287 )),
288 SignedEntityType::CardanoDatabase(beacon) => Ok(Arc::new(
289 self.cardano_database_artifact_builder
290 .compute_artifact(beacon, certificate)
291 .await
292 .with_context(|| {
293 format!(
294 "Signed Entity Service can not compute artifact for entity type: '{signed_entity_type}'"
295 )
296 })?
297 )),
298 }
299 }
300
301 async fn get_last_signed_entities(
302 &self,
303 total: usize,
304 discriminants: &SignedEntityTypeDiscriminants,
305 ) -> StdResult<Vec<SignedEntityRecord>> {
306 self.signed_entity_storer
307 .get_last_signed_entities_by_type(discriminants, total)
308 .await
309 .with_context(|| {
310 format!(
311 "Signed Entity Service can not get last signed entities with type: '{discriminants:?}'"
312 )
313 })
314 }
315
316 fn increment_artifact_total_produced_metric_since_startup(
317 &self,
318 signed_entity_type: SignedEntityType,
319 ) {
320 let metrics = self.metrics_service.clone();
321 let metric_counter = match signed_entity_type {
322 SignedEntityType::MithrilStakeDistribution(..) => {
323 metrics.get_artifact_mithril_stake_distribution_total_produced_since_startup()
324 }
325 SignedEntityType::CardanoImmutableFilesFull(..) => {
326 metrics.get_artifact_cardano_immutable_files_full_total_produced_since_startup()
327 }
328 SignedEntityType::CardanoStakeDistribution(..) => {
329 metrics.get_artifact_cardano_stake_distribution_total_produced_since_startup()
330 }
331 SignedEntityType::CardanoTransactions(..) => {
332 metrics.get_artifact_cardano_transaction_total_produced_since_startup()
333 }
334 SignedEntityType::CardanoBlocksTransactions(..) => {
335 metrics.get_artifact_cardano_blocks_transactions_total_produced_since_startup()
336 }
337 SignedEntityType::CardanoDatabase(..) => {
338 metrics.get_artifact_cardano_database_total_produced_since_startup()
339 }
340 };
341
342 metric_counter.increment();
343 }
344}
345
346#[async_trait]
347impl SignedEntityService for MithrilSignedEntityService {
348 async fn create_artifact(
349 &self,
350 signed_entity_type: SignedEntityType,
351 certificate: &Certificate,
352 ) -> StdResult<JoinHandle<StdResult<()>>> {
353 if self.signed_entity_type_lock.is_locked(&signed_entity_type).await {
354 return Err(anyhow!(
355 "Signed entity type '{:?}' is already locked",
356 signed_entity_type
357 ));
358 }
359
360 let service = self.clone();
361 let certificate_cloned = certificate.clone();
362 service.signed_entity_type_lock.lock(&signed_entity_type).await;
363
364 Ok(tokio::task::spawn(async move {
365 let signed_entity_type_clone = signed_entity_type.clone();
366 let service_clone = service.clone();
367 let result = tokio::task::spawn(async move {
368 service_clone
369 .create_artifact_task(signed_entity_type_clone, &certificate_cloned)
370 .await
371 })
372 .await;
373 service
374 .signed_entity_type_lock
375 .release(signed_entity_type.clone())
376 .await;
377
378 result.with_context(|| format!(
379 "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
380 ))?.inspect_err(|e| warn!(service.logger, "Error while creating artifact"; "error" => ?e))
381 }))
382 }
383
384 async fn get_last_signed_snapshots(
385 &self,
386 total: usize,
387 ) -> StdResult<Vec<SignedEntity<Snapshot>>> {
388 let signed_entities = self
389 .get_last_signed_entities(
390 total,
391 &SignedEntityTypeDiscriminants::CardanoImmutableFilesFull,
392 )
393 .await?
394 .into_iter()
395 .map(|record| record.try_into())
396 .collect::<Result<Vec<_>, _>>()?;
397
398 Ok(signed_entities)
399 }
400
401 async fn get_signed_snapshot_by_id(
402 &self,
403 signed_entity_id: &str,
404 ) -> StdResult<Option<SignedEntity<Snapshot>>> {
405 let entity: Option<SignedEntity<Snapshot>> = match self
406 .signed_entity_storer
407 .get_signed_entity(signed_entity_id)
408 .await
409 .with_context(|| {
410 format!(
411 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
412 )
413 })? {
414 Some(entity) => Some(entity.try_into()?),
415 None => None,
416 };
417
418 Ok(entity)
419 }
420
421 async fn get_last_signed_cardano_database_snapshots(
422 &self,
423 total: usize,
424 ) -> StdResult<Vec<SignedEntity<CardanoDatabaseSnapshot>>> {
425 let signed_entities = self
426 .get_last_signed_entities(total, &SignedEntityTypeDiscriminants::CardanoDatabase)
427 .await?
428 .into_iter()
429 .map(|record| record.try_into())
430 .collect::<Result<Vec<_>, _>>()?;
431
432 Ok(signed_entities)
433 }
434
435 async fn get_signed_cardano_database_snapshot_by_id(
436 &self,
437 signed_entity_id: &str,
438 ) -> StdResult<Option<SignedEntity<CardanoDatabaseSnapshot>>> {
439 let entity: Option<SignedEntity<CardanoDatabaseSnapshot>> = match self
440 .signed_entity_storer
441 .get_signed_entity(signed_entity_id)
442 .await
443 .with_context(|| {
444 format!(
445 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
446 )
447 })? {
448 Some(entity) => Some(entity.try_into()?),
449 None => None,
450 };
451
452 Ok(entity)
453 }
454
455 async fn get_last_signed_mithril_stake_distributions(
456 &self,
457 total: usize,
458 ) -> StdResult<Vec<SignedEntity<MithrilStakeDistribution>>> {
459 let signed_entities = self
460 .get_last_signed_entities(
461 total,
462 &SignedEntityTypeDiscriminants::MithrilStakeDistribution,
463 )
464 .await?
465 .into_iter()
466 .map(|record| record.try_into())
467 .collect::<Result<Vec<_>, _>>()?;
468
469 Ok(signed_entities)
470 }
471
472 async fn get_signed_mithril_stake_distribution_by_id(
473 &self,
474 signed_entity_id: &str,
475 ) -> StdResult<Option<SignedEntity<MithrilStakeDistribution>>> {
476 let entity: Option<SignedEntity<MithrilStakeDistribution>> = match self
477 .signed_entity_storer
478 .get_signed_entity(signed_entity_id)
479 .await
480 .with_context(|| {
481 format!(
482 "Signed Entity Service can not get signed entity with id: '{signed_entity_id}'"
483 )
484 })? {
485 Some(entity) => Some(entity.try_into()?),
486 None => None,
487 };
488
489 Ok(entity)
490 }
491
492 async fn get_last_cardano_transaction_snapshot(
493 &self,
494 ) -> StdResult<Option<SignedEntity<CardanoTransactionsSnapshot>>> {
495 let mut signed_entities_records = self
496 .get_last_signed_entities(1, &SignedEntityTypeDiscriminants::CardanoTransactions)
497 .await?;
498
499 match signed_entities_records.pop() {
500 Some(record) => Ok(Some(record.try_into()?)),
501 None => Ok(None),
502 }
503 }
504
505 async fn get_last_signed_cardano_stake_distributions(
506 &self,
507 total: usize,
508 ) -> StdResult<Vec<SignedEntity<CardanoStakeDistribution>>> {
509 let signed_entities = self
510 .get_last_signed_entities(
511 total,
512 &SignedEntityTypeDiscriminants::CardanoStakeDistribution,
513 )
514 .await?
515 .into_iter()
516 .map(|record| record.try_into())
517 .collect::<Result<Vec<_>, _>>()?;
518
519 Ok(signed_entities)
520 }
521}
522
523#[cfg(test)]
524mod tests {
525 use std::{sync::atomic::Ordering, time::Duration};
526
527 use mithril_common::{
528 entities::{CardanoTransactionsSnapshot, Epoch, StakeDistribution},
529 signable_builder,
530 test::double::fake_data,
531 };
532 use mithril_metric::CounterValue;
533 use serde::{Serialize, de::DeserializeOwned};
534 use std::sync::atomic::AtomicBool;
535
536 use crate::artifact_builder::MockArtifactBuilder;
537 use crate::database::repository::MockSignedEntityStorer;
538 use crate::test::TestLogger;
539
540 use super::*;
541
542 fn create_stake_distribution(epoch: Epoch, signers: usize) -> MithrilStakeDistribution {
543 MithrilStakeDistribution::new(
544 epoch,
545 fake_data::signers_with_stakes(signers),
546 &fake_data::protocol_parameters(),
547 )
548 }
549
550 fn create_cardano_stake_distribution(
551 epoch: Epoch,
552 stake_distribution: StakeDistribution,
553 ) -> CardanoStakeDistribution {
554 CardanoStakeDistribution::new(epoch, stake_distribution)
555 }
556
557 fn assert_expected<T>(expected: &T, artifact: &Arc<dyn Artifact>)
558 where
559 T: Serialize + DeserializeOwned,
560 {
561 let current: T = serde_json::from_str(&serde_json::to_string(&artifact).unwrap()).unwrap();
562 assert_eq!(
563 serde_json::to_string(&expected).unwrap(),
564 serde_json::to_string(¤t).unwrap()
565 );
566 }
567
568 struct MockDependencyInjector {
570 mock_signed_entity_storer: MockSignedEntityStorer,
571 mock_mithril_stake_distribution_artifact_builder:
572 MockArtifactBuilder<Epoch, MithrilStakeDistribution>,
573 mock_cardano_immutable_files_full_artifact_builder:
574 MockArtifactBuilder<CardanoDbBeacon, Snapshot>,
575 mock_cardano_transactions_artifact_builder:
576 MockArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
577 mock_cardano_blocks_transactions_artifact_builder:
578 MockArtifactBuilder<BlockNumber, CardanoBlocksTransactionsSnapshot>,
579 mock_cardano_stake_distribution_artifact_builder:
580 MockArtifactBuilder<Epoch, CardanoStakeDistribution>,
581 mock_cardano_database_artifact_builder:
582 MockArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot>,
583 }
584
585 impl MockDependencyInjector {
586 fn new() -> MockDependencyInjector {
587 MockDependencyInjector {
588 mock_signed_entity_storer: MockSignedEntityStorer::new(),
589 mock_mithril_stake_distribution_artifact_builder: MockArtifactBuilder::<
590 Epoch,
591 MithrilStakeDistribution,
592 >::new(),
593 mock_cardano_immutable_files_full_artifact_builder: MockArtifactBuilder::<
594 CardanoDbBeacon,
595 Snapshot,
596 >::new(),
597 mock_cardano_transactions_artifact_builder: MockArtifactBuilder::<
598 BlockNumber,
599 CardanoTransactionsSnapshot,
600 >::new(),
601 mock_cardano_blocks_transactions_artifact_builder: MockArtifactBuilder::<
602 BlockNumber,
603 CardanoBlocksTransactionsSnapshot,
604 >::new(),
605 mock_cardano_stake_distribution_artifact_builder: MockArtifactBuilder::<
606 Epoch,
607 CardanoStakeDistribution,
608 >::new(),
609 mock_cardano_database_artifact_builder: MockArtifactBuilder::<
610 CardanoDbBeacon,
611 CardanoDatabaseSnapshot,
612 >::new(),
613 }
614 }
615
616 fn build_artifact_builder_service(self) -> MithrilSignedEntityService {
617 let dependencies = SignedEntityServiceArtifactsDependencies::new(
618 Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
619 Arc::new(self.mock_cardano_immutable_files_full_artifact_builder),
620 Arc::new(self.mock_cardano_transactions_artifact_builder),
621 Arc::new(self.mock_cardano_blocks_transactions_artifact_builder),
622 Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
623 Arc::new(self.mock_cardano_database_artifact_builder),
624 );
625 MithrilSignedEntityService::new(
626 Arc::new(self.mock_signed_entity_storer),
627 dependencies,
628 Arc::new(SignedEntityTypeLock::default()),
629 Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
630 TestLogger::stdout(),
631 )
632 }
633
634 fn build_artifact_builder_service_with_time_consuming_process(
635 mut self,
636 atomic_stop: Arc<AtomicBool>,
637 ) -> MithrilSignedEntityService {
638 struct LongArtifactBuilder {
639 atomic_stop: Arc<AtomicBool>,
640 snapshot: Snapshot,
641 }
642
643 let snapshot = fake_data::snapshot(1);
644
645 #[async_trait]
646 impl ArtifactBuilder<CardanoDbBeacon, Snapshot> for LongArtifactBuilder {
647 async fn compute_artifact(
648 &self,
649 _beacon: CardanoDbBeacon,
650 _certificate: &Certificate,
651 ) -> StdResult<Snapshot> {
652 let mut max_iteration = 100;
653 while !self.atomic_stop.load(Ordering::Relaxed) {
654 max_iteration -= 1;
655 if max_iteration <= 0 {
656 return Err(anyhow!("Test should handle the stop"));
657 }
658 tokio::time::sleep(Duration::from_millis(10)).await;
659 }
660 Ok(self.snapshot.clone())
661 }
662 }
663 let cardano_immutable_files_full_long_artifact_builder = LongArtifactBuilder {
664 atomic_stop: atomic_stop.clone(),
665 snapshot: snapshot.clone(),
666 };
667
668 let artifact_clone: Arc<dyn Artifact> = Arc::new(snapshot);
669 let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
670 self.mock_signed_entity_storer
671 .expect_store_signed_entity()
672 .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
673 .return_once(|_| Ok(()));
674
675 let dependencies = SignedEntityServiceArtifactsDependencies::new(
676 Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
677 Arc::new(cardano_immutable_files_full_long_artifact_builder),
678 Arc::new(self.mock_cardano_transactions_artifact_builder),
679 Arc::new(self.mock_cardano_blocks_transactions_artifact_builder),
680 Arc::new(self.mock_cardano_stake_distribution_artifact_builder),
681 Arc::new(self.mock_cardano_database_artifact_builder),
682 );
683 MithrilSignedEntityService::new(
684 Arc::new(self.mock_signed_entity_storer),
685 dependencies,
686 Arc::new(SignedEntityTypeLock::default()),
687 Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
688 TestLogger::stdout(),
689 )
690 }
691
692 fn mock_artifact_processing<
693 T: Artifact + Clone + Serialize + 'static,
694 U: signable_builder::Beacon,
695 >(
696 &mut self,
697 artifact: T,
698 mock_that_provide_artifact: &dyn Fn(
699 &mut MockDependencyInjector,
700 ) -> &mut MockArtifactBuilder<U, T>,
701 ) {
702 {
703 let artifact_cloned = artifact.clone();
704 mock_that_provide_artifact(self)
705 .expect_compute_artifact()
706 .times(1)
707 .return_once(|_, _| Ok(artifact_cloned));
708 }
709 {
710 let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
711 let artifact_json = serde_json::to_string(&artifact_clone).unwrap();
712 self.mock_signed_entity_storer
713 .expect_store_signed_entity()
714 .withf(move |signed_entity| signed_entity.artifact == artifact_json)
715 .return_once(|_| Ok(()));
716 }
717 }
718
719 fn mock_stake_distribution_processing(&mut self, artifact: MithrilStakeDistribution) {
720 self.mock_artifact_processing(artifact, &|mock_injector| {
721 &mut mock_injector.mock_mithril_stake_distribution_artifact_builder
722 });
723 }
724 }
725
726 fn get_artifact_total_produced_metric_since_startup_counter_value(
727 metrics_service: Arc<MetricsService>,
728 signed_entity_type: &SignedEntityType,
729 ) -> CounterValue {
730 match signed_entity_type {
731 SignedEntityType::MithrilStakeDistribution(..) => metrics_service
732 .get_artifact_mithril_stake_distribution_total_produced_since_startup()
733 .get(),
734 SignedEntityType::CardanoImmutableFilesFull(..) => metrics_service
735 .get_artifact_cardano_immutable_files_full_total_produced_since_startup()
736 .get(),
737 SignedEntityType::CardanoStakeDistribution(..) => metrics_service
738 .get_artifact_cardano_stake_distribution_total_produced_since_startup()
739 .get(),
740 SignedEntityType::CardanoTransactions(..) => metrics_service
741 .get_artifact_cardano_transaction_total_produced_since_startup()
742 .get(),
743 SignedEntityType::CardanoBlocksTransactions(..) => metrics_service
744 .get_artifact_cardano_blocks_transactions_total_produced_since_startup()
745 .get(),
746 SignedEntityType::CardanoDatabase(..) => metrics_service
747 .get_artifact_cardano_database_total_produced_since_startup()
748 .get(),
749 }
750 }
751
752 #[tokio::test]
753 async fn build_mithril_stake_distribution_artifact_when_given_mithril_stake_distribution_entity_type()
754 {
755 let mut mock_container = MockDependencyInjector::new();
756
757 let mithril_stake_distribution_expected = create_stake_distribution(Epoch(1), 5);
758
759 mock_container
760 .mock_mithril_stake_distribution_artifact_builder
761 .expect_compute_artifact()
762 .times(1)
763 .returning(|_, _| Ok(create_stake_distribution(Epoch(1), 5)));
764
765 let artifact_builder_service = mock_container.build_artifact_builder_service();
766
767 let certificate = fake_data::certificate("hash".to_string());
768 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(1));
769 let artifact = artifact_builder_service
770 .compute_artifact(signed_entity_type.clone(), &certificate)
771 .await
772 .unwrap();
773
774 assert_expected(&mithril_stake_distribution_expected, &artifact);
775 }
776
777 #[tokio::test]
778 async fn should_store_the_artifact_when_creating_artifact_for_a_mithril_stake_distribution() {
779 generic_test_that_the_artifact_is_stored(
780 SignedEntityType::MithrilStakeDistribution(Epoch(1)),
781 create_stake_distribution(Epoch(1), 5),
782 &|mock_injector| &mut mock_injector.mock_mithril_stake_distribution_artifact_builder,
783 )
784 .await;
785 }
786
787 #[tokio::test]
788 async fn build_cardano_stake_distribution_artifact_when_given_cardano_stake_distribution_entity_type()
789 {
790 let mut mock_container = MockDependencyInjector::new();
791
792 let cardano_stake_distribution_expected = create_cardano_stake_distribution(
793 Epoch(1),
794 StakeDistribution::from([("pool-1".to_string(), 100)]),
795 );
796
797 mock_container
798 .mock_cardano_stake_distribution_artifact_builder
799 .expect_compute_artifact()
800 .times(1)
801 .returning(|_, _| {
802 Ok(create_cardano_stake_distribution(
803 Epoch(1),
804 StakeDistribution::from([("pool-1".to_string(), 100)]),
805 ))
806 });
807
808 let artifact_builder_service = mock_container.build_artifact_builder_service();
809
810 let certificate = fake_data::certificate("hash".to_string());
811 let signed_entity_type = SignedEntityType::CardanoStakeDistribution(Epoch(1));
812 let artifact = artifact_builder_service
813 .compute_artifact(signed_entity_type.clone(), &certificate)
814 .await
815 .unwrap();
816
817 assert_expected(&cardano_stake_distribution_expected, &artifact);
818 }
819
820 #[tokio::test]
821 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_stake_distribution() {
822 generic_test_that_the_artifact_is_stored(
823 SignedEntityType::CardanoStakeDistribution(Epoch(1)),
824 create_cardano_stake_distribution(
825 Epoch(1),
826 StakeDistribution::from([("pool-1".to_string(), 100)]),
827 ),
828 &|mock_injector| &mut mock_injector.mock_cardano_stake_distribution_artifact_builder,
829 )
830 .await;
831 }
832
833 #[tokio::test]
834 async fn build_snapshot_artifact_when_given_cardano_immutable_files_full_entity_type() {
835 let mut mock_container = MockDependencyInjector::new();
836
837 let snapshot_expected = fake_data::snapshot(1);
838
839 mock_container
840 .mock_cardano_immutable_files_full_artifact_builder
841 .expect_compute_artifact()
842 .times(1)
843 .returning(|_, _| Ok(fake_data::snapshot(1)));
844
845 let artifact_builder_service = mock_container.build_artifact_builder_service();
846
847 let certificate = fake_data::certificate("hash".to_string());
848 let signed_entity_type =
849 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
850 let artifact = artifact_builder_service
851 .compute_artifact(signed_entity_type.clone(), &certificate)
852 .await
853 .unwrap();
854
855 assert_expected(&snapshot_expected, &artifact);
856 }
857
858 #[tokio::test]
859 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_immutable_files() {
860 generic_test_that_the_artifact_is_stored(
861 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default()),
862 fake_data::snapshot(1),
863 &|mock_injector| &mut mock_injector.mock_cardano_immutable_files_full_artifact_builder,
864 )
865 .await;
866 }
867
868 #[tokio::test]
869 async fn build_cardano_transactions_snapshot_artifact_when_given_cardano_transactions_type() {
870 let mut mock_container = MockDependencyInjector::new();
871
872 let block_number = BlockNumber(151);
873 let expected = CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number);
874
875 mock_container
876 .mock_cardano_transactions_artifact_builder
877 .expect_compute_artifact()
878 .times(1)
879 .returning(move |_, _| {
880 Ok(CardanoTransactionsSnapshot::new(
881 "merkle_root".to_string(),
882 block_number,
883 ))
884 });
885
886 let artifact_builder_service = mock_container.build_artifact_builder_service();
887
888 let certificate = fake_data::certificate("hash".to_string());
889 let signed_entity_type = SignedEntityType::CardanoTransactions(Epoch(1), block_number);
890 let artifact = artifact_builder_service
891 .compute_artifact(signed_entity_type.clone(), &certificate)
892 .await
893 .unwrap();
894
895 assert_expected(&expected, &artifact);
896 }
897
898 #[tokio::test]
899 async fn should_store_the_artifact_when_creating_artifact_for_cardano_transactions() {
900 let block_number = BlockNumber(149);
901 generic_test_that_the_artifact_is_stored(
902 SignedEntityType::CardanoTransactions(Epoch(1), block_number),
903 CardanoTransactionsSnapshot::new("merkle_root".to_string(), block_number),
904 &|mock_injector| &mut mock_injector.mock_cardano_transactions_artifact_builder,
905 )
906 .await;
907 }
908
909 #[tokio::test]
910 async fn build_cardano_blocks_transactions_snapshot_artifact_when_given_cardano_blocks_transactions_type()
911 {
912 let mut mock_container = MockDependencyInjector::new();
913
914 let block_number = BlockNumber(151);
915 let offset_security_parameter = BlockNumber(15);
916 let expected = CardanoBlocksTransactionsSnapshot::new(
917 "merkle_root".to_string(),
918 block_number,
919 offset_security_parameter,
920 );
921
922 mock_container
923 .mock_cardano_blocks_transactions_artifact_builder
924 .expect_compute_artifact()
925 .times(1)
926 .returning(move |_, _| {
927 Ok(CardanoBlocksTransactionsSnapshot::new(
928 "merkle_root".to_string(),
929 block_number,
930 offset_security_parameter,
931 ))
932 });
933
934 let artifact_builder_service = mock_container.build_artifact_builder_service();
935
936 let certificate = fake_data::certificate("hash".to_string());
937 let signed_entity_type =
938 SignedEntityType::CardanoBlocksTransactions(Epoch(1), block_number);
939 let artifact = artifact_builder_service
940 .compute_artifact(signed_entity_type.clone(), &certificate)
941 .await
942 .unwrap();
943
944 assert_expected(&expected, &artifact);
945 }
946
947 #[tokio::test]
948 async fn should_store_the_artifact_when_creating_artifact_for_cardano_blocks_transactions() {
949 let block_number = BlockNumber(149);
950 let offset_security_parameter = BlockNumber(15);
951 generic_test_that_the_artifact_is_stored(
952 SignedEntityType::CardanoBlocksTransactions(Epoch(1), block_number),
953 CardanoBlocksTransactionsSnapshot::new(
954 "merkle_root".to_string(),
955 block_number,
956 offset_security_parameter,
957 ),
958 &|mock_injector| &mut mock_injector.mock_cardano_blocks_transactions_artifact_builder,
959 )
960 .await;
961 }
962
963 #[tokio::test]
964 async fn build_cardano_database_artifact_when_given_cardano_database_entity_type() {
965 let mut mock_container = MockDependencyInjector::new();
966
967 let cardano_database_expected = fake_data::cardano_database_snapshot(1);
968
969 mock_container
970 .mock_cardano_database_artifact_builder
971 .expect_compute_artifact()
972 .times(1)
973 .returning(|_, _| Ok(fake_data::cardano_database_snapshot(1)));
974
975 let artifact_builder_service = mock_container.build_artifact_builder_service();
976
977 let certificate = fake_data::certificate("hash".to_string());
978 let signed_entity_type = SignedEntityType::CardanoDatabase(CardanoDbBeacon::default());
979 let artifact = artifact_builder_service
980 .compute_artifact(signed_entity_type.clone(), &certificate)
981 .await
982 .unwrap();
983
984 assert_expected(&cardano_database_expected, &artifact);
985 }
986
987 #[tokio::test]
988 async fn should_store_the_artifact_when_creating_artifact_for_a_cardano_database() {
989 generic_test_that_the_artifact_is_stored(
990 SignedEntityType::CardanoDatabase(CardanoDbBeacon::default()),
991 fake_data::cardano_database_snapshot(1),
992 &|mock_injector| &mut mock_injector.mock_cardano_database_artifact_builder,
993 )
994 .await;
995 }
996
997 async fn generic_test_that_the_artifact_is_stored<
998 T: Artifact + Clone + Serialize + 'static,
999 U: signable_builder::Beacon,
1000 >(
1001 signed_entity_type: SignedEntityType,
1002 artifact: T,
1003 mock_that_provide_artifact: &dyn Fn(
1004 &mut MockDependencyInjector,
1005 ) -> &mut MockArtifactBuilder<U, T>,
1006 ) {
1007 let mut mock_container = MockDependencyInjector::new();
1008 {
1009 let artifact_clone: Arc<dyn Artifact> = Arc::new(artifact.clone());
1010 let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
1011 mock_container
1012 .mock_signed_entity_storer
1013 .expect_store_signed_entity()
1014 .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
1015 .return_once(|_| Ok(()));
1016 }
1017 {
1018 let artifact_cloned = artifact.clone();
1019 mock_that_provide_artifact(&mut mock_container)
1020 .expect_compute_artifact()
1021 .times(1)
1022 .return_once(|_, _| Ok(artifact_cloned));
1023 }
1024 let artifact_builder_service = mock_container.build_artifact_builder_service();
1025
1026 let certificate = fake_data::certificate("hash".to_string());
1027 let error_message = format!(
1028 "Create artifact should not fail for {} signed entity",
1029 std::any::type_name::<T>()
1030 );
1031 let error_message_str = error_message.as_str();
1032
1033 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1034 artifact_builder_service.metrics_service.clone(),
1035 &signed_entity_type,
1036 );
1037
1038 artifact_builder_service
1039 .create_artifact_task(signed_entity_type.clone(), &certificate)
1040 .await
1041 .expect(error_message_str);
1042
1043 assert_eq!(
1044 initial_counter_value + 1,
1045 get_artifact_total_produced_metric_since_startup_counter_value(
1046 artifact_builder_service.metrics_service.clone(),
1047 &signed_entity_type,
1048 )
1049 )
1050 }
1051
1052 #[tokio::test]
1053 async fn create_artifact_for_two_signed_entity_types_in_sequence_not_blocking() {
1054 let atomic_stop = Arc::new(AtomicBool::new(false));
1055 let signed_entity_type_service = {
1056 let mut mock_container = MockDependencyInjector::new();
1057
1058 let msd = create_stake_distribution(Epoch(1), 5);
1059 mock_container.mock_stake_distribution_processing(msd);
1060
1061 mock_container
1062 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone())
1063 };
1064 let certificate = fake_data::certificate("hash".to_string());
1065
1066 let signed_entity_type_immutable =
1067 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1068 let first_task_that_never_finished = signed_entity_type_service
1069 .create_artifact(signed_entity_type_immutable, &certificate)
1070 .await
1071 .unwrap();
1072
1073 let signed_entity_type_msd = SignedEntityType::MithrilStakeDistribution(Epoch(1));
1074 let second_task_that_finish_first = signed_entity_type_service
1075 .create_artifact(signed_entity_type_msd, &certificate)
1076 .await
1077 .unwrap();
1078
1079 second_task_that_finish_first.await.unwrap().unwrap();
1080 assert!(!first_task_that_never_finished.is_finished());
1081
1082 atomic_stop.swap(true, Ordering::Relaxed);
1083 }
1084
1085 #[tokio::test]
1086 async fn create_artifact_lock_unlock_signed_entity_type_while_processing() {
1087 let atomic_stop = Arc::new(AtomicBool::new(false));
1088 let signed_entity_type_service = MockDependencyInjector::new()
1089 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1090 let certificate = fake_data::certificate("hash".to_string());
1091
1092 let signed_entity_type_immutable =
1093 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1094 assert!(
1095 !signed_entity_type_service
1096 .signed_entity_type_lock
1097 .is_locked(&signed_entity_type_immutable)
1098 .await
1099 );
1100 let join_handle = signed_entity_type_service
1101 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1102 .await
1103 .unwrap();
1104
1105 let is_locked = signed_entity_type_service
1108 .signed_entity_type_lock
1109 .is_locked(&signed_entity_type_immutable)
1110 .await;
1111 let is_finished = join_handle.is_finished();
1112
1113 atomic_stop.swap(true, Ordering::Relaxed);
1114 join_handle.await.unwrap().unwrap();
1115
1116 assert!(is_locked);
1117 assert!(!is_finished);
1118
1119 assert!(
1120 !signed_entity_type_service
1121 .signed_entity_type_lock
1122 .is_locked(&signed_entity_type_immutable)
1123 .await
1124 );
1125 }
1126
1127 #[tokio::test]
1128 async fn create_artifact_unlock_signed_entity_type_when_error() {
1129 let signed_entity_type_service = {
1130 let mut mock_container = MockDependencyInjector::new();
1131 mock_container
1132 .mock_cardano_immutable_files_full_artifact_builder
1133 .expect_compute_artifact()
1134 .returning(|_, _| Err(anyhow::anyhow!("Error while computing artifact")));
1135
1136 mock_container.build_artifact_builder_service()
1137 };
1138 let certificate = fake_data::certificate("hash".to_string());
1139
1140 let signed_entity_type_immutable =
1141 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1142
1143 let join_handle = signed_entity_type_service
1144 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1145 .await
1146 .unwrap();
1147
1148 let error = join_handle.await.unwrap().unwrap_err();
1149 assert!(
1150 error.to_string().contains("CardanoImmutableFilesFull"),
1151 "Error should contains CardanoImmutableFilesFull but was: {error}"
1152 );
1153
1154 assert!(
1155 !signed_entity_type_service
1156 .signed_entity_type_lock
1157 .is_locked(&signed_entity_type_immutable)
1158 .await
1159 );
1160 }
1161
1162 #[tokio::test]
1163 async fn create_artifact_unlock_signed_entity_type_when_panic() {
1164 let signed_entity_type_service =
1165 MockDependencyInjector::new().build_artifact_builder_service();
1166 let certificate = fake_data::certificate("hash".to_string());
1167
1168 let signed_entity_type_immutable =
1169 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1170
1171 let join_handle = signed_entity_type_service
1172 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1173 .await
1174 .unwrap();
1175
1176 let error = join_handle.await.unwrap().unwrap_err();
1177 assert!(
1178 error.to_string().contains("CardanoImmutableFilesFull"),
1179 "Error should contains CardanoImmutableFilesFull but was: {error}"
1180 );
1181
1182 assert!(
1183 !signed_entity_type_service
1184 .signed_entity_type_lock
1185 .is_locked(&signed_entity_type_immutable)
1186 .await
1187 );
1188 }
1189
1190 #[tokio::test]
1191 async fn create_artifact_for_a_signed_entity_type_already_lock_return_error() {
1192 let atomic_stop = Arc::new(AtomicBool::new(false));
1193 let signed_entity_service = MockDependencyInjector::new()
1194 .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
1195 let certificate = fake_data::certificate("hash".to_string());
1196 let signed_entity_type_immutable =
1197 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
1198
1199 signed_entity_service
1200 .create_artifact(signed_entity_type_immutable.clone(), &certificate)
1201 .await
1202 .unwrap();
1203
1204 signed_entity_service
1205 .create_artifact(signed_entity_type_immutable, &certificate)
1206 .await
1207 .expect_err("Should return error when signed entity type is already locked");
1208
1209 atomic_stop.swap(true, Ordering::Relaxed);
1210 }
1211
1212 #[tokio::test]
1213 async fn metrics_counter_value_is_not_incremented_when_compute_artifact_error() {
1214 let signed_entity_service = {
1215 let mut mock_container = MockDependencyInjector::new();
1216 mock_container
1217 .mock_cardano_immutable_files_full_artifact_builder
1218 .expect_compute_artifact()
1219 .returning(|_, _| Err(anyhow!("Error while computing artifact")));
1220
1221 mock_container.build_artifact_builder_service()
1222 };
1223
1224 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1225
1226 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1227 signed_entity_service.metrics_service.clone(),
1228 &signed_entity_type,
1229 );
1230
1231 signed_entity_service
1232 .create_artifact(
1233 signed_entity_type.clone(),
1234 &fake_data::certificate("hash".to_string()),
1235 )
1236 .await
1237 .unwrap();
1238
1239 assert_eq!(
1240 initial_counter_value,
1241 get_artifact_total_produced_metric_since_startup_counter_value(
1242 signed_entity_service.metrics_service.clone(),
1243 &signed_entity_type,
1244 )
1245 );
1246 }
1247
1248 #[tokio::test]
1249 async fn metrics_counter_value_is_not_incremented_when_store_signed_entity_error() {
1250 let signed_entity_service = {
1251 let mut mock_container = MockDependencyInjector::new();
1252 mock_container
1253 .mock_signed_entity_storer
1254 .expect_store_signed_entity()
1255 .returning(|_| Err(anyhow!("Error while storing signed entity")));
1256
1257 mock_container.build_artifact_builder_service()
1258 };
1259
1260 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(7));
1261
1262 let initial_counter_value = get_artifact_total_produced_metric_since_startup_counter_value(
1263 signed_entity_service.metrics_service.clone(),
1264 &signed_entity_type,
1265 );
1266
1267 signed_entity_service
1268 .create_artifact(
1269 signed_entity_type.clone(),
1270 &fake_data::certificate("hash".to_string()),
1271 )
1272 .await
1273 .unwrap();
1274
1275 assert_eq!(
1276 initial_counter_value,
1277 get_artifact_total_produced_metric_since_startup_counter_value(
1278 signed_entity_service.metrics_service.clone(),
1279 &signed_entity_type,
1280 )
1281 );
1282 }
1283}