mithril_aggregator/runtime/
runner.rs

1use anyhow::Context;
2use async_trait::async_trait;
3use slog::{debug, warn, Logger};
4use std::sync::Arc;
5use std::time::Duration;
6
7use mithril_common::entities::{Certificate, Epoch, ProtocolMessage, SignedEntityType, TimePoint};
8use mithril_common::logging::LoggerExtensions;
9use mithril_common::StdResult;
10use mithril_persistence::store::StakeStorer;
11
12use crate::entities::OpenMessage;
13use crate::DependencyContainer;
14
/// Settings consumed by the AggregatorRuntime.
#[derive(Debug, Clone)]
pub struct AggregatorConfig {
    /// Time elapsed between two consecutive snapshots.
    ///
    /// NOTE(review): the previous comment said "in ms", but the field is a
    /// unit-agnostic [Duration] — confirm what callers build it from.
    pub interval: Duration,

    /// `true` when the aggregator is a follower.
    pub is_follower: bool,
}

impl AggregatorConfig {
    /// Build an [AggregatorConfig] from its two settings.
    pub fn new(interval: Duration, is_follower: bool) -> Self {
        AggregatorConfig {
            interval,
            is_follower,
        }
    }
}
34
/// This trait is intended to allow mocking the AggregatorRunner in tests.
/// It exposes all the methods needed by the state machine.
///
/// Every fallible method returns a [StdResult]; the synchronous metric
/// helpers at the end of the trait return nothing.
#[async_trait]
pub trait AggregatorRunnerTrait: Sync + Send {
    /// Return the current [TimePoint] from the chain.
    async fn get_time_point_from_chain(&self) -> StdResult<TimePoint>;

    /// Retrieves the current open message for a given signed entity type.
    ///
    /// Returns `None` when there is no open message for this type.
    async fn get_current_open_message_for_signed_entity_type(
        &self,
        signed_entity_type: &SignedEntityType,
    ) -> StdResult<Option<OpenMessage>>;

    /// Retrieves the current non-certified open message.
    ///
    /// Returns `None` when no such open message is available for the given time point.
    async fn get_current_non_certified_open_message(
        &self,
        current_time_point: &TimePoint,
    ) -> StdResult<Option<OpenMessage>>;

    /// Check if a certificate chain is valid.
    ///
    /// The outcome is carried by the result: `Ok(())` on success, an error otherwise.
    async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()>;

    /// Read the stake distribution from the blockchain and store it.
    async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()>;

    /// Open the signer registration round of an epoch.
    async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()>;

    /// Close the signer registration round of an epoch.
    async fn close_signer_registration_round(&self) -> StdResult<()>;

    /// Check if the follower aggregator is running the same epoch as the leader.
    async fn is_follower_aggregator_at_same_epoch_as_leader(
        &self,
        time_point: &TimePoint,
    ) -> StdResult<bool>;

    /// Synchronize the follower aggregator signer registration.
    async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()>;

    /// Ask the EpochService to update the epoch settings.
    async fn update_epoch_settings(&self) -> StdResult<()>;

    /// Compute the protocol message for the given signed entity type.
    async fn compute_protocol_message(
        &self,
        signed_entity_type: &SignedEntityType,
    ) -> StdResult<ProtocolMessage>;

    /// Mark the open message of the given signed entity type as expired, when applicable.
    ///
    /// NOTE(review): the returned option presumably holds the open message that
    /// was marked as expired, `None` meaning nothing was marked — confirm
    /// against the certifier service.
    async fn mark_open_message_if_expired(
        &self,
        signed_entity_type: &SignedEntityType,
    ) -> StdResult<Option<OpenMessage>>;

    /// Tell the certifier to try to create a new certificate.
    ///
    /// Returns `None` when no certificate was created.
    async fn create_certificate(
        &self,
        signed_entity_type: &SignedEntityType,
    ) -> StdResult<Option<Certificate>>;

    /// Create an artifact and persist it.
    async fn create_artifact(
        &self,
        signed_entity_type: &SignedEntityType,
        certificate: &Certificate,
    ) -> StdResult<()>;

    /// Update the EraChecker with EraReader information.
    async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()>;

    /// Ask services to update themselves for the new epoch.
    async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()>;

    /// Perform the upkeep tasks.
    async fn upkeep(&self, epoch: Epoch) -> StdResult<()>;

    /// Precompute what doesn't change for the actual epoch.
    async fn precompute_epoch_data(&self) -> StdResult<()>;

    /// Create a new open message for the given signed entity type and protocol message.
    async fn create_open_message(
        &self,
        signed_entity_type: &SignedEntityType,
        protocol_message: &ProtocolMessage,
    ) -> StdResult<OpenMessage>;

    /// Checks if the open message is considered outdated.
    async fn is_open_message_outdated(
        &self,
        open_message_signed_entity_type: SignedEntityType,
        last_time_point: &TimePoint,
    ) -> StdResult<bool>;

    /// Increment the runtime cycle success metric.
    fn increment_runtime_cycle_success_since_startup_counter(&self);

    /// Increment the runtime cycle total metric.
    fn increment_runtime_cycle_total_since_startup_counter(&self);
}
135
136/// The runner responsibility is to expose a code API for the state machine. It
137/// holds services and configuration.
138pub struct AggregatorRunner {
139    dependencies: Arc<DependencyContainer>,
140    logger: Logger,
141}
142
143impl AggregatorRunner {
144    /// Create a new instance of the Aggregator Runner.
145    pub fn new(dependencies: Arc<DependencyContainer>) -> Self {
146        let logger = dependencies.root_logger.new_with_component_name::<Self>();
147        Self {
148            dependencies,
149            logger,
150        }
151    }
152
153    async fn list_available_signed_entity_types(
154        &self,
155        time_point: &TimePoint,
156    ) -> StdResult<Vec<SignedEntityType>> {
157        let signed_entity_types = self
158            .dependencies
159            .epoch_service
160            .read()
161            .await
162            .signed_entity_config()?
163            .list_allowed_signed_entity_types(time_point)?;
164        let unlocked_signed_entities = self
165            .dependencies
166            .signed_entity_type_lock
167            .filter_unlocked_entries(signed_entity_types)
168            .await;
169
170        Ok(unlocked_signed_entities)
171    }
172}
173
174#[cfg_attr(test, mockall::automock)]
175#[async_trait]
176impl AggregatorRunnerTrait for AggregatorRunner {
177    /// Return the current time point from the chain
178    async fn get_time_point_from_chain(&self) -> StdResult<TimePoint> {
179        debug!(self.logger, ">> get_time_point_from_chain");
180        let time_point = self
181            .dependencies
182            .ticker_service
183            .get_current_time_point()
184            .await?;
185
186        Ok(time_point)
187    }
188
189    async fn get_current_open_message_for_signed_entity_type(
190        &self,
191        signed_entity_type: &SignedEntityType,
192    ) -> StdResult<Option<OpenMessage>> {
193        debug!(self.logger,">> get_current_open_message_for_signed_entity_type"; "signed_entity_type" => ?signed_entity_type);
194        self.mark_open_message_if_expired(signed_entity_type)
195            .await?;
196
197        Ok(self
198            .dependencies
199            .certifier_service
200            .get_open_message(signed_entity_type)
201            .await
202            .with_context(|| format!("CertifierService can not get open message for signed_entity_type: '{signed_entity_type}'"))?)
203    }
204
205    async fn get_current_non_certified_open_message(
206        &self,
207        current_time_point: &TimePoint,
208    ) -> StdResult<Option<OpenMessage>> {
209        debug!(self.logger,">> get_current_non_certified_open_message"; "time_point" => #?current_time_point);
210        let signed_entity_types = self
211            .list_available_signed_entity_types(current_time_point)
212            .await?;
213
214        for signed_entity_type in signed_entity_types {
215            let current_open_message = self.get_current_open_message_for_signed_entity_type(&signed_entity_type)
216                .await
217                .with_context(|| format!("AggregatorRunner can not get current open message for signed entity type: '{}'", &signed_entity_type))?;
218            match current_open_message {
219                None => {
220                    let protocol_message = self.compute_protocol_message(&signed_entity_type).await.with_context(|| format!("AggregatorRunner can not compute protocol message for signed_entity_type: '{signed_entity_type}'"))?;
221                    let open_message_new = self.create_open_message(&signed_entity_type, &protocol_message)
222                        .await
223                        .with_context(|| format!("AggregatorRunner can not create open message for signed_entity_type: '{signed_entity_type}'"))?;
224
225                    return Ok(Some(open_message_new));
226                }
227                Some(open_message) => {
228                    if !open_message.is_certified && !open_message.is_expired {
229                        return Ok(Some(open_message));
230                    }
231                }
232            }
233        }
234
235        Ok(None)
236    }
237
238    async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()> {
239        debug!(self.logger, ">> is_certificate_chain_valid");
240        self.dependencies
241            .certifier_service
242            .verify_certificate_chain(time_point.epoch)
243            .await?;
244
245        Ok(())
246    }
247
248    async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()> {
249        debug!(self.logger,">> update_stake_distribution"; "time_point" => #?new_time_point);
250        self.dependencies
251            .stake_distribution_service
252            .update_stake_distribution()
253            .await
254            .with_context(|| format!("AggregatorRunner could not update stake distribution for time_point: '{new_time_point}'"))
255    }
256
257    async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()> {
258        debug!(self.logger,">> open_signer_registration_round"; "time_point" => #?new_time_point);
259        let registration_epoch = new_time_point.epoch.offset_to_recording_epoch();
260
261        let stakes = self
262            .dependencies
263            .stake_store
264            .get_stakes(registration_epoch)
265            .await?
266            .unwrap_or_default();
267
268        self.dependencies
269            .signer_registration_round_opener
270            .open_registration_round(registration_epoch, stakes)
271            .await
272    }
273
274    async fn close_signer_registration_round(&self) -> StdResult<()> {
275        debug!(self.logger, ">> close_signer_registration_round");
276        self.dependencies
277            .signer_registration_round_opener
278            .close_registration_round()
279            .await
280    }
281
282    async fn is_follower_aggregator_at_same_epoch_as_leader(
283        &self,
284        time_point: &TimePoint,
285    ) -> StdResult<bool> {
286        self.dependencies
287            .signer_synchronizer
288            .can_synchronize_signers(time_point.epoch)
289            .await
290            .map_err(|e| e.into())
291    }
292
293    async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()> {
294        self.dependencies
295            .signer_synchronizer
296            .synchronize_all_signers()
297            .await
298            .map_err(|e| e.into())
299    }
300
301    async fn update_epoch_settings(&self) -> StdResult<()> {
302        debug!(self.logger, ">> update_epoch_settings");
303        self.dependencies
304            .epoch_service
305            .write()
306            .await
307            .update_epoch_settings()
308            .await
309    }
310
311    async fn compute_protocol_message(
312        &self,
313        signed_entity_type: &SignedEntityType,
314    ) -> StdResult<ProtocolMessage> {
315        debug!(self.logger, ">> compute_protocol_message");
316        let protocol_message = self
317            .dependencies
318            .signable_builder_service
319            .compute_protocol_message(signed_entity_type.to_owned())
320            .await
321            .with_context(|| format!("Runner can not compute protocol message for signed entity type: '{signed_entity_type}'"))?;
322
323        Ok(protocol_message)
324    }
325
326    async fn mark_open_message_if_expired(
327        &self,
328        signed_entity_type: &SignedEntityType,
329    ) -> StdResult<Option<OpenMessage>> {
330        debug!(self.logger, ">> mark_open_message_if_expired");
331        let expired_open_message = self
332            .dependencies
333            .certifier_service
334            .mark_open_message_if_expired(signed_entity_type)
335            .await
336            .with_context(|| "CertifierService can not mark expired open message")?;
337
338        debug!(
339            self.logger, "Marked expired open messages";
340            "expired_open_message" => ?expired_open_message
341        );
342
343        Ok(expired_open_message)
344    }
345
346    async fn create_certificate(
347        &self,
348        signed_entity_type: &SignedEntityType,
349    ) -> StdResult<Option<Certificate>> {
350        debug!(self.logger, ">> create_certificate"; "signed_entity_type" => ?signed_entity_type);
351
352        let certificate = self.dependencies
353            .certifier_service
354            .create_certificate(signed_entity_type)
355            .await
356            .with_context(|| {
357                format!(
358                    "CertifierService can not create certificate for signed_entity_type: '{signed_entity_type}'"
359                )
360            })?;
361
362        if certificate.is_some() {
363            self.dependencies
364                .metrics_service
365                .get_certificate_total_produced_since_startup()
366                .increment();
367        }
368
369        Ok(certificate)
370    }
371
372    async fn create_artifact(
373        &self,
374        signed_entity_type: &SignedEntityType,
375        certificate: &Certificate,
376    ) -> StdResult<()> {
377        debug!(
378            self.logger, ">> create_artifact";
379            "signed_entity_type" => ?signed_entity_type,
380            "certificate_hash" => &certificate.hash
381        );
382
383        self.dependencies
384            .signed_entity_service
385            .create_artifact(signed_entity_type.to_owned(), certificate)
386            .await
387            .with_context(|| {
388                format!(
389                    "SignedEntityService can not create artifact for signed_entity_type: '{signed_entity_type}' with certificate hash: '{}'",
390                    certificate.hash
391                )
392            })?;
393
394        Ok(())
395    }
396
397    async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()> {
398        debug!(self.logger, ">> update_era_checker({epoch:?})");
399        let token = self
400            .dependencies
401            .era_reader
402            .read_era_epoch_token(epoch)
403            .await
404            .with_context(|| {
405                format!(
406                    "EraReader can not get era epoch token for current epoch: '{}'",
407                    epoch
408                )
409            })?;
410
411        let current_era = token
412            .get_current_supported_era()
413            .with_context(|| "EraEpochToken can not get current supported era")?;
414        self.dependencies
415            .era_checker
416            .change_era(current_era, token.get_current_epoch());
417        debug!(
418            self.logger,
419            "Current Era is {current_era} (Epoch {}).",
420            token.get_current_epoch()
421        );
422
423        if token.get_next_supported_era().is_err() {
424            let era_name = &token.get_next_era_marker().unwrap().name;
425            warn!(self.logger,"Upcoming Era '{era_name}' is not supported by this version of the software. Please update!");
426        }
427
428        Ok(())
429    }
430
431    async fn precompute_epoch_data(&self) -> StdResult<()> {
432        debug!(self.logger, ">> precompute_epoch_data");
433        self.dependencies
434            .epoch_service
435            .write()
436            .await
437            .precompute_epoch_data()
438            .await?;
439
440        Ok(())
441    }
442
443    async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()> {
444        debug!(self.logger, ">> inform_new_epoch({epoch:?})");
445        self.dependencies
446            .certifier_service
447            .inform_epoch(epoch)
448            .await?;
449
450        self.dependencies
451            .epoch_service
452            .write()
453            .await
454            .inform_epoch(epoch)
455            .await?;
456
457        Ok(())
458    }
459
460    async fn upkeep(&self, epoch: Epoch) -> StdResult<()> {
461        debug!(self.logger, ">> upkeep");
462        self.dependencies.upkeep_service.run(epoch).await
463    }
464
465    async fn create_open_message(
466        &self,
467        signed_entity_type: &SignedEntityType,
468        protocol_message: &ProtocolMessage,
469    ) -> StdResult<OpenMessage> {
470        debug!(self.logger, ">> create_open_message");
471        self.dependencies
472            .certifier_service
473            .create_open_message(signed_entity_type, protocol_message)
474            .await
475    }
476
477    async fn is_open_message_outdated(
478        &self,
479        open_message_signed_entity_type: SignedEntityType,
480        last_time_point: &TimePoint,
481    ) -> StdResult<bool> {
482        let current_open_message = self
483            .get_current_open_message_for_signed_entity_type(
484                &open_message_signed_entity_type,
485            )
486            .await
487            .with_context(|| format!("AggregatorRuntime can not get the current open message for signed entity type: '{}'", &open_message_signed_entity_type))?;
488        let is_expired_open_message = current_open_message
489            .as_ref()
490            .map(|om| om.is_expired)
491            .unwrap_or(false);
492
493        let exists_newer_open_message = {
494            let new_signed_entity_type = self
495                .dependencies
496                .epoch_service
497                .read()
498                .await
499                .signed_entity_config()?
500                .time_point_to_signed_entity(&open_message_signed_entity_type, last_time_point)?;
501            new_signed_entity_type != open_message_signed_entity_type
502        };
503
504        Ok(exists_newer_open_message || is_expired_open_message)
505    }
506
507    fn increment_runtime_cycle_success_since_startup_counter(&self) {
508        self.dependencies
509            .metrics_service
510            .get_runtime_cycle_success_since_startup()
511            .increment();
512    }
513
514    fn increment_runtime_cycle_total_since_startup_counter(&self) {
515        self.dependencies
516            .metrics_service
517            .get_runtime_cycle_total_since_startup()
518            .increment();
519    }
520}
521
522#[cfg(test)]
523pub mod tests {
524    use crate::dependency_injection::DependenciesBuilder;
525    use crate::entities::AggregatorEpochSettings;
526    use crate::services::{FakeEpochService, FakeEpochServiceBuilder, MockUpkeepService};
527    use crate::{
528        entities::OpenMessage,
529        initialize_dependencies,
530        runtime::{AggregatorRunner, AggregatorRunnerTrait},
531        services::{MithrilStakeDistributionService, MockCertifierService},
532        Configuration, DependencyContainer, MithrilSignerRegistrationLeader,
533        SignerRegistrationRound,
534    };
535    use async_trait::async_trait;
536    use chrono::{DateTime, Utc};
537    use mithril_common::entities::{
538        CardanoTransactionsSigningConfig, ChainPoint, Epoch, SignedEntityConfig,
539        SignedEntityTypeDiscriminants,
540    };
541    use mithril_common::temp_dir;
542    use mithril_common::{
543        chain_observer::FakeObserver,
544        digesters::DumbImmutableFileObserver,
545        entities::{ProtocolMessage, SignedEntityType, StakeDistribution, TimePoint},
546        signable_builder::SignableBuilderService,
547        test_utils::{fake_data, MithrilFixtureBuilder},
548        MithrilTickerService, StdResult,
549    };
550    use mithril_persistence::store::StakeStorer;
551    use mithril_signed_entity_lock::SignedEntityTypeLock;
552    use mockall::predicate::eq;
553    use mockall::{mock, Sequence};
554    use std::path::PathBuf;
555    use std::sync::Arc;
556    use tokio::sync::RwLock;
557
    // Generates `MockSignableBuilderServiceImpl`, a mock implementation of
    // `SignableBuilderService`, used by `build_runner` to stub the protocol
    // message computation.
    mock! {
        SignableBuilderServiceImpl { }

        #[async_trait]
        impl SignableBuilderService for SignableBuilderServiceImpl
        {

            // Mocked method: expectations are set through `expect_compute_protocol_message`.
            async fn compute_protocol_message(
                &self,
                signed_entity_type: SignedEntityType,
            ) -> StdResult<ProtocolMessage>;
        }
    }
571
572    async fn build_runner_with_fixture_data(deps: DependencyContainer) -> AggregatorRunner {
573        let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
574        let current_epoch = deps
575            .chain_observer
576            .get_current_epoch()
577            .await
578            .unwrap()
579            .unwrap();
580        deps.init_state_from_fixture(
581            &fixture,
582            &CardanoTransactionsSigningConfig::dummy(),
583            &[
584                current_epoch.offset_to_signer_retrieval_epoch().unwrap(),
585                current_epoch,
586                current_epoch.next(),
587            ],
588        )
589        .await;
590
591        AggregatorRunner::new(Arc::new(deps))
592    }
593
594    async fn build_runner(
595        temp_dir: PathBuf,
596        mock_certifier_service: MockCertifierService,
597    ) -> AggregatorRunner {
598        let mut deps = initialize_dependencies(temp_dir).await;
599        deps.certifier_service = Arc::new(mock_certifier_service);
600
601        let mut mock_signable_builder_service = MockSignableBuilderServiceImpl::new();
602        mock_signable_builder_service
603            .expect_compute_protocol_message()
604            .return_once(|_| Ok(ProtocolMessage::default()));
605        deps.signable_builder_service = Arc::new(mock_signable_builder_service);
606
607        let runner = build_runner_with_fixture_data(deps).await;
608
609        let current_epoch = runner
610            .dependencies
611            .ticker_service
612            .get_current_epoch()
613            .await
614            .unwrap();
615        runner.inform_new_epoch(current_epoch).await.unwrap();
616        runner.precompute_epoch_data().await.unwrap();
617        runner
618    }
619
620    fn init_certifier_service_mock(
621        mock_certifier_service: &mut MockCertifierService,
622        messages: Vec<OpenMessage>,
623    ) {
624        for message in messages {
625            mock_certifier_service
626                .expect_get_open_message()
627                .return_once(|_| Ok(Some(message)))
628                .times(1);
629        }
630        // When all messages are retrieved, the function return None
631        mock_certifier_service
632            .expect_get_open_message()
633            .returning(|_| Ok(None));
634
635        mock_certifier_service
636            .expect_inform_epoch()
637            .return_once(|_| Ok(()));
638        mock_certifier_service
639            .expect_mark_open_message_if_expired()
640            .returning(|_| Ok(None));
641    }
642
643    fn create_open_message(is_certified: IsCertified, is_expired: IsExpired) -> OpenMessage {
644        OpenMessage {
645            signed_entity_type: SignedEntityType::CardanoImmutableFilesFull(fake_data::beacon()),
646            is_certified: is_certified == IsCertified::Yes,
647            is_expired: is_expired == IsExpired::Yes,
648            ..OpenMessage::dummy()
649        }
650    }
651
    /// Readable replacement for a bare `bool` when building test open
    /// messages: tells whether the message must be flagged as certified.
    #[derive(Eq, PartialEq)]
    enum IsCertified {
        /// The open message is flagged as certified.
        Yes,
        /// The open message is not flagged as certified.
        No,
    }
657
    /// Readable replacement for a bare `bool` when building test open
    /// messages: tells whether the message must be flagged as expired.
    #[derive(Eq, PartialEq)]
    enum IsExpired {
        /// The open message is flagged as expired.
        Yes,
        /// The open message is not flagged as expired.
        No,
    }
663
664    #[tokio::test]
665    async fn test_get_time_point_from_chain() {
666        let expected = TimePoint::new(2, 17, ChainPoint::dummy());
667        let mut dependencies = initialize_dependencies!().await;
668        let immutable_file_observer = Arc::new(DumbImmutableFileObserver::default());
669        immutable_file_observer
670            .shall_return(Some(expected.immutable_file_number))
671            .await;
672        let ticker_service = Arc::new(MithrilTickerService::new(
673            Arc::new(FakeObserver::new(Some(expected.clone()))),
674            immutable_file_observer,
675        ));
676        dependencies.ticker_service = ticker_service;
677        let runner = AggregatorRunner::new(Arc::new(dependencies));
678
679        // Retrieves the expected time point
680        let res = runner.get_time_point_from_chain().await;
681        assert_eq!(expected, res.unwrap());
682    }
683
684    #[tokio::test]
685    async fn test_update_stake_distribution() {
686        let mut deps = initialize_dependencies!().await;
687        let chain_observer = Arc::new(FakeObserver::default());
688        deps.chain_observer = chain_observer.clone();
689        deps.stake_distribution_service = Arc::new(MithrilStakeDistributionService::new(
690            deps.stake_store.clone(),
691            chain_observer.clone(),
692        ));
693        let deps = Arc::new(deps);
694        let runner = AggregatorRunner::new(deps.clone());
695        let time_point = runner.get_time_point_from_chain().await.unwrap();
696        let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
697        let expected = fixture.stake_distribution();
698
699        chain_observer
700            .set_signers(fixture.signers_with_stake())
701            .await;
702        runner
703            .update_stake_distribution(&time_point)
704            .await
705            .expect("updating stake distribution should not return an error");
706
707        let saved_stake_distribution = deps
708            .stake_store
709            .get_stakes(time_point.epoch.offset_to_recording_epoch())
710            .await
711            .unwrap()
712            .unwrap_or_else(|| {
713                panic!(
714                    "I should have a stake distribution for the epoch {:?}",
715                    time_point.epoch
716                )
717            });
718
719        assert_eq!(expected, saved_stake_distribution);
720    }
721
722    #[tokio::test]
723    async fn test_open_signer_registration_round() {
724        let mut deps = initialize_dependencies!().await;
725        let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
726            deps.verification_key_store.clone(),
727            deps.signer_recorder.clone(),
728            deps.signer_registration_verifier.clone(),
729            None,
730        ));
731        deps.signer_registration_round_opener = signer_registration_round_opener.clone();
732        let stake_store = deps.stake_store.clone();
733        let deps = Arc::new(deps);
734        let runner = AggregatorRunner::new(deps.clone());
735
736        let time_point = TimePoint::dummy();
737        let recording_epoch = time_point.epoch.offset_to_recording_epoch();
738        let stake_distribution: StakeDistribution =
739            StakeDistribution::from([("a".to_string(), 5), ("b".to_string(), 10)]);
740
741        stake_store
742            .save_stakes(recording_epoch, stake_distribution.clone())
743            .await
744            .expect("Save Stake distribution should not fail");
745
746        runner
747            .open_signer_registration_round(&time_point)
748            .await
749            .expect("opening signer registration should not return an error");
750
751        let saved_current_round = signer_registration_round_opener.get_current_round().await;
752
753        let expected_signer_registration_round =
754            SignerRegistrationRound::dummy(recording_epoch, stake_distribution);
755
756        assert_eq!(
757            Some(expected_signer_registration_round),
758            saved_current_round,
759        );
760    }
761
762    #[tokio::test]
763    async fn test_close_signer_registration_round() {
764        let mut deps = initialize_dependencies!().await;
765        let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
766            deps.verification_key_store.clone(),
767            deps.signer_recorder.clone(),
768            deps.signer_registration_verifier.clone(),
769            None,
770        ));
771        deps.signer_registration_round_opener = signer_registration_round_opener.clone();
772        let deps = Arc::new(deps);
773        let runner = AggregatorRunner::new(deps.clone());
774
775        let time_point = TimePoint::dummy();
776        runner
777            .open_signer_registration_round(&time_point)
778            .await
779            .expect("opening signer registration should not return an error");
780
781        runner
782            .close_signer_registration_round()
783            .await
784            .expect("closing signer registration should not return an error");
785
786        let saved_current_round = signer_registration_round_opener.get_current_round().await;
787        assert!(saved_current_round.is_none());
788    }
789
790    #[tokio::test]
791    async fn test_expire_open_message() {
792        let open_message_expected = OpenMessage {
793            signed_entity_type: SignedEntityType::dummy(),
794            is_certified: false,
795            is_expired: false,
796            expires_at: Some(
797                DateTime::parse_from_rfc3339("2000-01-19T13:43:05.618857482Z")
798                    .unwrap()
799                    .with_timezone(&Utc),
800            ),
801            ..OpenMessage::dummy()
802        };
803        let open_message_clone = open_message_expected.clone();
804
805        let mut mock_certifier_service = MockCertifierService::new();
806        mock_certifier_service
807            .expect_mark_open_message_if_expired()
808            .return_once(|_| Ok(Some(open_message_clone)));
809
810        let mut deps = initialize_dependencies!().await;
811        deps.certifier_service = Arc::new(mock_certifier_service);
812
813        let runner = build_runner_with_fixture_data(deps).await;
814        let open_message_expired = runner
815            .mark_open_message_if_expired(&open_message_expected.signed_entity_type)
816            .await
817            .expect("mark_open_message_if_expired should not fail");
818
819        assert_eq!(Some(open_message_expected), open_message_expired);
820    }
821
822    #[tokio::test]
823    async fn test_update_era_checker() {
824        let deps = initialize_dependencies!().await;
825        let ticker_service = deps.ticker_service.clone();
826        let era_checker = deps.era_checker.clone();
827        let mut time_point = ticker_service.get_current_time_point().await.unwrap();
828
829        assert_eq!(time_point.epoch, era_checker.current_epoch());
830        let runner = AggregatorRunner::new(Arc::new(deps));
831        time_point.epoch += 1;
832
833        runner.update_era_checker(time_point.epoch).await.unwrap();
834        assert_eq!(time_point.epoch, era_checker.current_epoch());
835    }
836
837    #[tokio::test]
838    async fn test_inform_new_epoch() {
839        let mut mock_certifier_service = MockCertifierService::new();
840        mock_certifier_service
841            .expect_inform_epoch()
842            .returning(|_| Ok(()))
843            .times(1);
844        let mut deps = initialize_dependencies!().await;
845        let current_epoch = deps
846            .chain_observer
847            .get_current_epoch()
848            .await
849            .unwrap()
850            .unwrap();
851
852        deps.certifier_service = Arc::new(mock_certifier_service);
853        deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
854            current_epoch,
855            &MithrilFixtureBuilder::default().build(),
856        )));
857
858        let runner = AggregatorRunner::new(Arc::new(deps));
859
860        runner.inform_new_epoch(current_epoch).await.unwrap();
861    }
862
863    #[tokio::test]
864    async fn test_upkeep_calls_run_on_upkeep_service() {
865        let mut upkeep_service = MockUpkeepService::new();
866        upkeep_service
867            .expect_run()
868            .with(eq(Epoch(5)))
869            .returning(|_| Ok(()))
870            .times(1);
871
872        let mut deps = initialize_dependencies!().await;
873        deps.upkeep_service = Arc::new(upkeep_service);
874
875        let runner = AggregatorRunner::new(Arc::new(deps));
876
877        runner.upkeep(Epoch(5)).await.unwrap();
878    }
879
880    #[tokio::test]
881    async fn test_update_epoch_settings() {
882        let mut mock_certifier_service = MockCertifierService::new();
883        mock_certifier_service
884            .expect_inform_epoch()
885            .returning(|_| Ok(()))
886            .times(1);
887
888        let config = Configuration::new_sample(temp_dir!());
889        let mut deps = DependenciesBuilder::new_with_stdout_logger(config.clone())
890            .build_dependency_container()
891            .await
892            .unwrap();
893        deps.certifier_service = Arc::new(mock_certifier_service);
894        let epoch_settings_storer = deps.epoch_settings_storer.clone();
895        let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
896        let insert_epoch = current_epoch.offset_to_epoch_settings_recording_epoch();
897
898        let runner = build_runner_with_fixture_data(deps).await;
899        runner.inform_new_epoch(current_epoch).await.unwrap();
900        runner
901            .update_epoch_settings()
902            .await
903            .expect("update_epoch_settings should not fail");
904
905        let saved_epoch_settings = epoch_settings_storer
906            .get_epoch_settings(insert_epoch)
907            .await
908            .unwrap()
909            .unwrap_or_else(|| panic!("should have epoch settings for epoch {insert_epoch}",));
910
911        assert_eq!(
912            AggregatorEpochSettings {
913                protocol_parameters: config.protocol_parameters.clone(),
914                cardano_transactions_signing_config: config
915                    .cardano_transactions_signing_config
916                    .clone(),
917            },
918            saved_epoch_settings
919        );
920    }
921
922    #[tokio::test]
923    async fn test_precompute_epoch_data() {
924        let mut deps = initialize_dependencies!().await;
925        let current_epoch = deps
926            .chain_observer
927            .get_current_epoch()
928            .await
929            .unwrap()
930            .unwrap();
931
932        deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
933            current_epoch,
934            &MithrilFixtureBuilder::default().build(),
935        )));
936
937        let runner = AggregatorRunner::new(Arc::new(deps));
938
939        runner.precompute_epoch_data().await.unwrap();
940    }
941
942    #[tokio::test]
943    async fn test_get_current_non_certified_open_message_should_create_new_open_message_if_none_exists(
944    ) {
945        let open_message_created = create_open_message(IsCertified::No, IsExpired::No);
946        let open_message_expected = open_message_created.clone();
947
948        let runner = {
949            let mut mock_certifier_service = MockCertifierService::new();
950            init_certifier_service_mock(&mut mock_certifier_service, vec![]);
951
952            mock_certifier_service
953                .expect_create_open_message()
954                .return_once(|_, _| Ok(open_message_created))
955                .times(1);
956            build_runner(temp_dir!(), mock_certifier_service).await
957        };
958
959        let open_message_returned = runner
960            .get_current_non_certified_open_message(&TimePoint::dummy())
961            .await
962            .unwrap();
963        assert_eq!(Some(open_message_expected), open_message_returned);
964    }
965
966    #[tokio::test]
967    async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_not_expired(
968    ) {
969        let not_certified_and_not_expired = create_open_message(IsCertified::No, IsExpired::No);
970
971        let open_message_expected = not_certified_and_not_expired.clone();
972
973        let runner = {
974            let mut mock_certifier_service = MockCertifierService::new();
975            init_certifier_service_mock(
976                &mut mock_certifier_service,
977                vec![not_certified_and_not_expired],
978            );
979
980            mock_certifier_service.expect_create_open_message().never();
981            build_runner(temp_dir!(), mock_certifier_service).await
982        };
983
984        let open_message_returned = runner
985            .get_current_non_certified_open_message(&TimePoint::dummy())
986            .await
987            .unwrap();
988
989        assert_eq!(Some(open_message_expected), open_message_returned);
990    }
991
992    #[tokio::test]
993    async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_open_message_already_certified(
994    ) {
995        let certified_and_not_expired = create_open_message(IsCertified::Yes, IsExpired::No);
996        let not_certified_and_not_expired = create_open_message(IsCertified::No, IsExpired::No);
997
998        let open_message_expected = not_certified_and_not_expired.clone();
999
1000        let runner = {
1001            let mut mock_certifier_service = MockCertifierService::new();
1002            init_certifier_service_mock(
1003                &mut mock_certifier_service,
1004                vec![certified_and_not_expired, not_certified_and_not_expired],
1005            );
1006
1007            mock_certifier_service.expect_create_open_message().never();
1008            build_runner(temp_dir!(), mock_certifier_service).await
1009        };
1010
1011        let open_message_returned = runner
1012            .get_current_non_certified_open_message(&TimePoint::dummy())
1013            .await
1014            .unwrap();
1015
1016        assert_eq!(Some(open_message_expected), open_message_returned);
1017    }
1018
1019    #[tokio::test]
1020    async fn test_get_current_non_certified_open_message_should_create_open_message_if_none_exists_and_open_message_already_certified(
1021    ) {
1022        let certified_and_not_expired = create_open_message(IsCertified::Yes, IsExpired::No);
1023
1024        let open_message_created = create_open_message(IsCertified::No, IsExpired::No);
1025        let open_message_expected = open_message_created.clone();
1026
1027        let runner = {
1028            let mut mock_certifier_service = MockCertifierService::new();
1029            init_certifier_service_mock(
1030                &mut mock_certifier_service,
1031                vec![certified_and_not_expired],
1032            );
1033
1034            mock_certifier_service
1035                .expect_create_open_message()
1036                .return_once(|_, _| Ok(open_message_created))
1037                .times(1);
1038            build_runner(temp_dir!(), mock_certifier_service).await
1039        };
1040
1041        let open_message_returned = runner
1042            .get_current_non_certified_open_message(&TimePoint::dummy())
1043            .await
1044            .unwrap();
1045
1046        assert_eq!(Some(open_message_expected), open_message_returned);
1047    }
1048
1049    #[tokio::test]
1050    async fn test_get_current_non_certified_open_message_should_return_none_if_all_open_message_already_certified(
1051    ) {
1052        let certified_and_not_expired_1 = create_open_message(IsCertified::Yes, IsExpired::No);
1053        let certified_and_not_expired_2 = create_open_message(IsCertified::Yes, IsExpired::No);
1054
1055        let runner = {
1056            let mut mock_certifier_service = MockCertifierService::new();
1057            init_certifier_service_mock(
1058                &mut mock_certifier_service,
1059                vec![certified_and_not_expired_1, certified_and_not_expired_2],
1060            );
1061
1062            mock_certifier_service.expect_create_open_message().never();
1063            build_runner(temp_dir!(), mock_certifier_service).await
1064        };
1065
1066        let open_message_returned = runner
1067            .get_current_non_certified_open_message(&TimePoint::dummy())
1068            .await
1069            .unwrap();
1070
1071        assert!(open_message_returned.is_none());
1072    }
1073
1074    #[tokio::test]
1075    async fn test_get_current_non_certified_open_message_should_return_first_not_certified_and_not_expired_open_message(
1076    ) {
1077        let not_certified_and_expired = create_open_message(IsCertified::No, IsExpired::Yes);
1078        let not_certified_and_not_expired = create_open_message(IsCertified::No, IsExpired::No);
1079
1080        let open_message_expected = not_certified_and_not_expired.clone();
1081
1082        let runner = {
1083            let mut mock_certifier_service = MockCertifierService::new();
1084            init_certifier_service_mock(
1085                &mut mock_certifier_service,
1086                vec![not_certified_and_expired, not_certified_and_not_expired],
1087            );
1088
1089            mock_certifier_service.expect_create_open_message().never();
1090            build_runner(temp_dir!(), mock_certifier_service).await
1091        };
1092
1093        let open_message_returned = runner
1094            .get_current_non_certified_open_message(&TimePoint::dummy())
1095            .await
1096            .unwrap();
1097
1098        assert_eq!(Some(open_message_expected), open_message_returned);
1099    }
1100
1101    #[tokio::test]
1102    async fn test_get_current_non_certified_open_message_called_for_mithril_stake_distribution_and_then_for_immutable_file(
1103    ) {
1104        let mut mock_certifier_service = MockCertifierService::new();
1105
1106        let mut seq = Sequence::new();
1107        mock_certifier_service
1108            .expect_get_open_message()
1109            .with(eq(SignedEntityType::MithrilStakeDistribution(
1110                TimePoint::dummy().epoch,
1111            )))
1112            .times(1)
1113            .in_sequence(&mut seq)
1114            .return_once(|_| Ok(Some(create_open_message(IsCertified::Yes, IsExpired::No))));
1115
1116        mock_certifier_service
1117            .expect_get_open_message()
1118            .with(eq(SignedEntityType::CardanoImmutableFilesFull(
1119                fake_data::beacon(),
1120            )))
1121            .times(1)
1122            .in_sequence(&mut seq)
1123            .return_once(|_| Ok(Some(create_open_message(IsCertified::Yes, IsExpired::No))));
1124
1125        mock_certifier_service.expect_create_open_message().never();
1126
1127        mock_certifier_service
1128            .expect_inform_epoch()
1129            .return_once(|_| Ok(()));
1130        mock_certifier_service
1131            .expect_mark_open_message_if_expired()
1132            .returning(|_| Ok(None));
1133
1134        let runner = build_runner(temp_dir!(), mock_certifier_service).await;
1135
1136        runner
1137            .get_current_non_certified_open_message(&TimePoint::dummy())
1138            .await
1139            .unwrap();
1140    }
1141
1142    #[tokio::test]
1143    async fn list_available_signed_entity_types_list_all_configured_entities_if_none_are_locked() {
1144        let runner = {
1145            let mut dependencies = initialize_dependencies!().await;
1146            let epoch_service = FakeEpochServiceBuilder {
1147                signed_entity_config: SignedEntityConfig {
1148                    allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1149                    ..SignedEntityConfig::dummy()
1150                },
1151                ..FakeEpochServiceBuilder::dummy(Epoch(32))
1152            }
1153            .build();
1154            dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1155            dependencies.signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1156            AggregatorRunner::new(Arc::new(dependencies))
1157        };
1158
1159        let time_point = TimePoint::dummy();
1160        let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1161            .list_available_signed_entity_types(&time_point)
1162            .await
1163            .unwrap()
1164            .into_iter()
1165            .map(Into::into)
1166            .collect();
1167
1168        assert_eq!(
1169            signed_entities,
1170            SignedEntityTypeDiscriminants::all()
1171                .into_iter()
1172                .collect::<Vec<_>>()
1173        );
1174    }
1175
1176    #[tokio::test]
1177    async fn list_available_signed_entity_types_exclude_locked_entities() {
1178        let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1179        let runner = {
1180            let mut dependencies = initialize_dependencies!().await;
1181            dependencies.signed_entity_type_lock = signed_entity_type_lock.clone();
1182            let epoch_service = FakeEpochServiceBuilder {
1183                signed_entity_config: SignedEntityConfig {
1184                    allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1185                    ..SignedEntityConfig::dummy()
1186                },
1187                ..FakeEpochServiceBuilder::dummy(Epoch(32))
1188            }
1189            .build();
1190            dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1191
1192            AggregatorRunner::new(Arc::new(dependencies))
1193        };
1194
1195        signed_entity_type_lock
1196            .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
1197            .await;
1198
1199        let time_point = TimePoint::dummy();
1200        let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1201            .list_available_signed_entity_types(&time_point)
1202            .await
1203            .unwrap()
1204            .into_iter()
1205            .map(Into::into)
1206            .collect();
1207
1208        assert!(!signed_entities.is_empty());
1209        assert!(!signed_entities.contains(&SignedEntityTypeDiscriminants::CardanoTransactions));
1210    }
1211
1212    #[tokio::test]
1213    async fn is_open_message_outdated_return_false_when_message_is_not_expired_and_no_newer_open_message(
1214    ) {
1215        assert!(!is_outdated_returned_when(temp_dir!(), IsExpired::No, false).await);
1216    }
1217
1218    #[tokio::test]
1219    async fn is_open_message_outdated_return_true_when_message_is_expired_and_no_newer_open_message(
1220    ) {
1221        assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, false).await);
1222    }
1223
1224    #[tokio::test]
1225    async fn is_open_message_outdated_return_true_when_message_is_not_expired_and_exists_newer_open_message(
1226    ) {
1227        assert!(is_outdated_returned_when(temp_dir!(), IsExpired::No, true).await);
1228    }
1229
1230    #[tokio::test]
1231    async fn is_open_message_outdated_return_true_when_message_is_expired_and_exists_newer_open_message(
1232    ) {
1233        assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, true).await);
1234    }
1235
1236    async fn is_outdated_returned_when(
1237        tmp_path: PathBuf,
1238        is_expired: IsExpired,
1239        newer_open_message: bool,
1240    ) -> bool {
1241        let current_time_point = TimePoint {
1242            epoch: Epoch(2),
1243            ..TimePoint::dummy()
1244        };
1245
1246        let message_epoch = if newer_open_message {
1247            current_time_point.epoch + 54
1248        } else {
1249            current_time_point.epoch
1250        };
1251        let open_message_to_verify = OpenMessage {
1252            signed_entity_type: SignedEntityType::MithrilStakeDistribution(message_epoch),
1253            is_expired: is_expired == IsExpired::Yes,
1254            ..OpenMessage::dummy()
1255        };
1256
1257        let runner = {
1258            let mut deps = initialize_dependencies(tmp_path).await;
1259            let mut mock_certifier_service = MockCertifierService::new();
1260
1261            let open_message_current = open_message_to_verify.clone();
1262            mock_certifier_service
1263                .expect_get_open_message()
1264                .times(1)
1265                .return_once(|_| Ok(Some(open_message_current)));
1266            mock_certifier_service
1267                .expect_mark_open_message_if_expired()
1268                .returning(|_| Ok(None));
1269
1270            deps.certifier_service = Arc::new(mock_certifier_service);
1271
1272            let epoch_service = FakeEpochServiceBuilder::dummy(current_time_point.epoch).build();
1273            deps.epoch_service = Arc::new(RwLock::new(epoch_service));
1274
1275            build_runner_with_fixture_data(deps).await
1276        };
1277
1278        runner
1279            .is_open_message_outdated(
1280                open_message_to_verify.signed_entity_type,
1281                &current_time_point,
1282            )
1283            .await
1284            .unwrap()
1285    }
1286}