mithril_aggregator/runtime/
runner.rs

1use anyhow::Context;
2use async_trait::async_trait;
3use slog::{debug, warn, Logger};
4use std::sync::Arc;
5use std::time::Duration;
6
7use mithril_common::entities::{Certificate, Epoch, ProtocolMessage, SignedEntityType, TimePoint};
8use mithril_common::logging::LoggerExtensions;
9use mithril_common::StdResult;
10use mithril_persistence::store::StakeStorer;
11
12use crate::entities::OpenMessage;
13use crate::ServeCommandDependenciesContainer;
14
/// Settings dedicated to the AggregatorRuntime.
#[derive(Debug, Clone)]
pub struct AggregatorConfig {
    /// Interval between each snapshot (the unit is carried by the [Duration] value itself)
    pub interval: Duration,

    /// `true` when the aggregator is a follower
    pub is_follower: bool,
}

impl AggregatorConfig {
    /// Build an [AggregatorConfig] from the run interval and the follower flag.
    pub fn new(interval: Duration, is_follower: bool) -> Self {
        AggregatorConfig {
            interval,
            is_follower,
        }
    }
}
34
/// This trait is intended to allow mocking the AggregatorRunner in tests.
/// It exposes all the methods needed by the state machine.
///
/// Every asynchronous method returns a [StdResult] so that failures can be propagated
/// to the caller.
#[async_trait]
pub trait AggregatorRunnerTrait: Sync + Send {
    /// Return the current [TimePoint] from the chain
    async fn get_time_point_from_chain(&self) -> StdResult<TimePoint>;

    /// Retrieves the current open message for a given signed entity type.
    ///
    /// The result is `Ok(None)` when no open message is returned for this signed entity type.
    async fn get_current_open_message_for_signed_entity_type(
        &self,
        signed_entity_type: &SignedEntityType,
    ) -> StdResult<Option<OpenMessage>>;

    /// Retrieves the current non-certified open message.
    ///
    /// In [AggregatorRunner], the signed entity types available for `current_time_point`
    /// are inspected in order: a type without open message gets a new one (which is
    /// returned), and an existing open message is returned when it is neither certified
    /// nor expired. `Ok(None)` is returned otherwise.
    async fn get_current_non_certified_open_message(
        &self,
        current_time_point: &TimePoint,
    ) -> StdResult<Option<OpenMessage>>;

    /// Check if a certificate chain is valid.
    ///
    /// The success value carries no data: a failed verification is reported through `Err`.
    async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()>;

    /// Read the stake distribution from the blockchain and store it.
    async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()>;

    /// Open the signer registration round of an epoch.
    ///
    /// In [AggregatorRunner], the round is opened for the recording epoch derived from
    /// the epoch of `new_time_point`.
    async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()>;

    /// Close the signer registration round of an epoch.
    async fn close_signer_registration_round(&self) -> StdResult<()>;

    /// Check if the follower aggregator is running the same epoch as the leader.
    async fn is_follower_aggregator_at_same_epoch_as_leader(
        &self,
        time_point: &TimePoint,
    ) -> StdResult<bool>;

    /// Synchronize the follower aggregator signer registration.
    async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()>;

    /// Ask the EpochService to update the epoch settings.
    async fn update_epoch_settings(&self) -> StdResult<()>;

    /// Compute the protocol message for the given signed entity type.
    async fn compute_protocol_message(
        &self,
        signed_entity_type: &SignedEntityType,
    ) -> StdResult<ProtocolMessage>;

    /// Mark the open message of the given signed entity type as expired if needed.
    ///
    /// NOTE(review): the returned value is presumably the open message that has been
    /// marked as expired, if any — confirm against the certifier service.
    async fn mark_open_message_if_expired(
        &self,
        signed_entity_type: &SignedEntityType,
    ) -> StdResult<Option<OpenMessage>>;

    /// Tell the certifier to try to create a new certificate.
    ///
    /// `Ok(None)` means that no certificate has been produced by this attempt.
    async fn create_certificate(
        &self,
        signed_entity_type: &SignedEntityType,
    ) -> StdResult<Option<Certificate>>;

    /// Create an artifact and persist it.
    async fn create_artifact(
        &self,
        signed_entity_type: &SignedEntityType,
        certificate: &Certificate,
    ) -> StdResult<()>;

    /// Update the EraChecker with EraReader information.
    async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()>;

    /// Ask services to update themselves for the new epoch
    async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()>;

    /// Perform the upkeep tasks.
    async fn upkeep(&self, epoch: Epoch) -> StdResult<()>;

    /// Precompute what doesn't change for the actual epoch
    async fn precompute_epoch_data(&self) -> StdResult<()>;

    /// Create a new open message for the given signed entity type and protocol message.
    async fn create_open_message(
        &self,
        signed_entity_type: &SignedEntityType,
        protocol_message: &ProtocolMessage,
    ) -> StdResult<OpenMessage>;

    /// Checks if the open message is considered outdated.
    ///
    /// In [AggregatorRunner], an open message is outdated when it is expired or when
    /// `last_time_point` maps to a signed entity type different from
    /// `open_message_signed_entity_type`.
    async fn is_open_message_outdated(
        &self,
        open_message_signed_entity_type: SignedEntityType,
        last_time_point: &TimePoint,
    ) -> StdResult<bool>;

    /// Increment the runtime cycle success metric.
    fn increment_runtime_cycle_success_since_startup_counter(&self);

    /// Increment the runtime cycle total metric.
    fn increment_runtime_cycle_total_since_startup_counter(&self);
}
135
/// The runner responsibility is to expose a code API for the state machine. It
/// holds services and configuration.
pub struct AggregatorRunner {
    /// Shared container giving access to the services called by the runner.
    dependencies: Arc<ServeCommandDependenciesContainer>,
    /// Logger derived from the container root logger with this component name.
    logger: Logger,
}
142
143impl AggregatorRunner {
144    /// Create a new instance of the Aggregator Runner.
145    pub fn new(dependencies: Arc<ServeCommandDependenciesContainer>) -> Self {
146        let logger = dependencies.root_logger.new_with_component_name::<Self>();
147        Self {
148            dependencies,
149            logger,
150        }
151    }
152
153    async fn list_available_signed_entity_types(
154        &self,
155        time_point: &TimePoint,
156    ) -> StdResult<Vec<SignedEntityType>> {
157        let signed_entity_types = self
158            .dependencies
159            .epoch_service
160            .read()
161            .await
162            .signed_entity_config()?
163            .list_allowed_signed_entity_types(time_point)?;
164        let unlocked_signed_entities = self
165            .dependencies
166            .signed_entity_type_lock
167            .filter_unlocked_entries(signed_entity_types)
168            .await;
169
170        Ok(unlocked_signed_entities)
171    }
172}
173
174#[cfg_attr(test, mockall::automock)]
175#[async_trait]
176impl AggregatorRunnerTrait for AggregatorRunner {
177    /// Return the current time point from the chain
178    async fn get_time_point_from_chain(&self) -> StdResult<TimePoint> {
179        debug!(self.logger, ">> get_time_point_from_chain");
180        let time_point = self
181            .dependencies
182            .ticker_service
183            .get_current_time_point()
184            .await?;
185
186        Ok(time_point)
187    }
188
189    async fn get_current_open_message_for_signed_entity_type(
190        &self,
191        signed_entity_type: &SignedEntityType,
192    ) -> StdResult<Option<OpenMessage>> {
193        debug!(self.logger,">> get_current_open_message_for_signed_entity_type"; "signed_entity_type" => ?signed_entity_type);
194        self.mark_open_message_if_expired(signed_entity_type)
195            .await?;
196
197        Ok(self
198            .dependencies
199            .certifier_service
200            .get_open_message(signed_entity_type)
201            .await
202            .with_context(|| format!("CertifierService can not get open message for signed_entity_type: '{signed_entity_type}'"))?)
203    }
204
205    async fn get_current_non_certified_open_message(
206        &self,
207        current_time_point: &TimePoint,
208    ) -> StdResult<Option<OpenMessage>> {
209        debug!(self.logger,">> get_current_non_certified_open_message"; "time_point" => #?current_time_point);
210        let signed_entity_types = self
211            .list_available_signed_entity_types(current_time_point)
212            .await?;
213
214        for signed_entity_type in signed_entity_types {
215            let current_open_message = self.get_current_open_message_for_signed_entity_type(&signed_entity_type)
216                .await
217                .with_context(|| format!("AggregatorRunner can not get current open message for signed entity type: '{}'", &signed_entity_type))?;
218            match current_open_message {
219                None => {
220                    let protocol_message = self.compute_protocol_message(&signed_entity_type).await.with_context(|| format!("AggregatorRunner can not compute protocol message for signed_entity_type: '{signed_entity_type}'"))?;
221                    let open_message_new = self.create_open_message(&signed_entity_type, &protocol_message)
222                        .await
223                        .with_context(|| format!("AggregatorRunner can not create open message for signed_entity_type: '{signed_entity_type}'"))?;
224
225                    return Ok(Some(open_message_new));
226                }
227                Some(open_message) => {
228                    if !open_message.is_certified && !open_message.is_expired {
229                        return Ok(Some(open_message));
230                    }
231                }
232            }
233        }
234
235        Ok(None)
236    }
237
238    async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()> {
239        debug!(self.logger, ">> is_certificate_chain_valid");
240        self.dependencies
241            .certifier_service
242            .verify_certificate_chain(time_point.epoch)
243            .await?;
244
245        Ok(())
246    }
247
248    async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()> {
249        debug!(self.logger,">> update_stake_distribution"; "time_point" => #?new_time_point);
250        self.dependencies
251            .stake_distribution_service
252            .update_stake_distribution()
253            .await
254            .with_context(|| format!("AggregatorRunner could not update stake distribution for time_point: '{new_time_point}'"))
255    }
256
257    async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()> {
258        debug!(self.logger,">> open_signer_registration_round"; "time_point" => #?new_time_point);
259        let registration_epoch = new_time_point.epoch.offset_to_recording_epoch();
260
261        let stakes = self
262            .dependencies
263            .stake_store
264            .get_stakes(registration_epoch)
265            .await?
266            .unwrap_or_default();
267
268        self.dependencies
269            .signer_registration_round_opener
270            .open_registration_round(registration_epoch, stakes)
271            .await
272    }
273
274    async fn close_signer_registration_round(&self) -> StdResult<()> {
275        debug!(self.logger, ">> close_signer_registration_round");
276        self.dependencies
277            .signer_registration_round_opener
278            .close_registration_round()
279            .await
280    }
281
282    async fn is_follower_aggregator_at_same_epoch_as_leader(
283        &self,
284        time_point: &TimePoint,
285    ) -> StdResult<bool> {
286        self.dependencies
287            .signer_synchronizer
288            .can_synchronize_signers(time_point.epoch)
289            .await
290            .map_err(|e| e.into())
291    }
292
293    async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()> {
294        self.dependencies
295            .signer_synchronizer
296            .synchronize_all_signers()
297            .await
298            .map_err(|e| e.into())
299    }
300
301    async fn update_epoch_settings(&self) -> StdResult<()> {
302        debug!(self.logger, ">> update_epoch_settings");
303        self.dependencies
304            .epoch_service
305            .write()
306            .await
307            .update_epoch_settings()
308            .await
309    }
310
311    async fn compute_protocol_message(
312        &self,
313        signed_entity_type: &SignedEntityType,
314    ) -> StdResult<ProtocolMessage> {
315        debug!(self.logger, ">> compute_protocol_message");
316        let protocol_message = self
317            .dependencies
318            .signable_builder_service
319            .compute_protocol_message(signed_entity_type.to_owned())
320            .await
321            .with_context(|| format!("Runner can not compute protocol message for signed entity type: '{signed_entity_type}'"))?;
322
323        Ok(protocol_message)
324    }
325
326    async fn mark_open_message_if_expired(
327        &self,
328        signed_entity_type: &SignedEntityType,
329    ) -> StdResult<Option<OpenMessage>> {
330        debug!(self.logger, ">> mark_open_message_if_expired");
331        let expired_open_message = self
332            .dependencies
333            .certifier_service
334            .mark_open_message_if_expired(signed_entity_type)
335            .await
336            .with_context(|| "CertifierService can not mark expired open message")?;
337
338        debug!(
339            self.logger, "Marked expired open messages";
340            "expired_open_message" => ?expired_open_message
341        );
342
343        Ok(expired_open_message)
344    }
345
346    async fn create_certificate(
347        &self,
348        signed_entity_type: &SignedEntityType,
349    ) -> StdResult<Option<Certificate>> {
350        debug!(self.logger, ">> create_certificate"; "signed_entity_type" => ?signed_entity_type);
351
352        let certificate = self.dependencies
353            .certifier_service
354            .create_certificate(signed_entity_type)
355            .await
356            .with_context(|| {
357                format!(
358                    "CertifierService can not create certificate for signed_entity_type: '{signed_entity_type}'"
359                )
360            })?;
361
362        if certificate.is_some() {
363            self.dependencies
364                .metrics_service
365                .get_certificate_total_produced_since_startup()
366                .increment();
367        }
368
369        Ok(certificate)
370    }
371
372    async fn create_artifact(
373        &self,
374        signed_entity_type: &SignedEntityType,
375        certificate: &Certificate,
376    ) -> StdResult<()> {
377        debug!(
378            self.logger, ">> create_artifact";
379            "signed_entity_type" => ?signed_entity_type,
380            "certificate_hash" => &certificate.hash
381        );
382
383        self.dependencies
384            .signed_entity_service
385            .create_artifact(signed_entity_type.to_owned(), certificate)
386            .await
387            .with_context(|| {
388                format!(
389                    "SignedEntityService can not create artifact for signed_entity_type: '{signed_entity_type}' with certificate hash: '{}'",
390                    certificate.hash
391                )
392            })?;
393
394        Ok(())
395    }
396
397    async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()> {
398        debug!(self.logger, ">> update_era_checker({epoch:?})");
399        let token = self
400            .dependencies
401            .era_reader
402            .read_era_epoch_token(epoch)
403            .await
404            .with_context(|| {
405                format!("EraReader can not get era epoch token for current epoch: '{epoch}'")
406            })?;
407
408        let current_era = token
409            .get_current_supported_era()
410            .with_context(|| "EraEpochToken can not get current supported era")?;
411        self.dependencies
412            .era_checker
413            .change_era(current_era, token.get_current_epoch());
414        debug!(
415            self.logger,
416            "Current Era is {current_era} (Epoch {}).",
417            token.get_current_epoch()
418        );
419
420        if token.get_next_supported_era().is_err() {
421            let era_name = &token.get_next_era_marker().unwrap().name;
422            warn!(self.logger,"Upcoming Era '{era_name}' is not supported by this version of the software. Please update!");
423        }
424
425        Ok(())
426    }
427
428    async fn precompute_epoch_data(&self) -> StdResult<()> {
429        debug!(self.logger, ">> precompute_epoch_data");
430        self.dependencies
431            .epoch_service
432            .write()
433            .await
434            .precompute_epoch_data()
435            .await?;
436
437        Ok(())
438    }
439
440    async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()> {
441        debug!(self.logger, ">> inform_new_epoch({epoch:?})");
442        self.dependencies
443            .certifier_service
444            .inform_epoch(epoch)
445            .await?;
446
447        self.dependencies
448            .epoch_service
449            .write()
450            .await
451            .inform_epoch(epoch)
452            .await?;
453
454        Ok(())
455    }
456
457    async fn upkeep(&self, epoch: Epoch) -> StdResult<()> {
458        debug!(self.logger, ">> upkeep");
459        self.dependencies.upkeep_service.run(epoch).await
460    }
461
462    async fn create_open_message(
463        &self,
464        signed_entity_type: &SignedEntityType,
465        protocol_message: &ProtocolMessage,
466    ) -> StdResult<OpenMessage> {
467        debug!(self.logger, ">> create_open_message");
468        self.dependencies
469            .certifier_service
470            .create_open_message(signed_entity_type, protocol_message)
471            .await
472    }
473
474    async fn is_open_message_outdated(
475        &self,
476        open_message_signed_entity_type: SignedEntityType,
477        last_time_point: &TimePoint,
478    ) -> StdResult<bool> {
479        let current_open_message = self
480            .get_current_open_message_for_signed_entity_type(
481                &open_message_signed_entity_type,
482            )
483            .await
484            .with_context(|| format!("AggregatorRuntime can not get the current open message for signed entity type: '{}'", &open_message_signed_entity_type))?;
485        let is_expired_open_message = current_open_message
486            .as_ref()
487            .map(|om| om.is_expired)
488            .unwrap_or(false);
489
490        let exists_newer_open_message = {
491            let new_signed_entity_type = self
492                .dependencies
493                .epoch_service
494                .read()
495                .await
496                .signed_entity_config()?
497                .time_point_to_signed_entity(&open_message_signed_entity_type, last_time_point)?;
498            new_signed_entity_type != open_message_signed_entity_type
499        };
500
501        Ok(exists_newer_open_message || is_expired_open_message)
502    }
503
504    fn increment_runtime_cycle_success_since_startup_counter(&self) {
505        self.dependencies
506            .metrics_service
507            .get_runtime_cycle_success_since_startup()
508            .increment();
509    }
510
511    fn increment_runtime_cycle_total_since_startup_counter(&self) {
512        self.dependencies
513            .metrics_service
514            .get_runtime_cycle_total_since_startup()
515            .increment();
516    }
517}
518
519#[cfg(test)]
520pub mod tests {
521    use async_trait::async_trait;
522    use chrono::{DateTime, Utc};
523    use mockall::predicate::eq;
524    use mockall::{mock, Sequence};
525    use std::path::PathBuf;
526    use std::sync::Arc;
527    use tokio::sync::RwLock;
528
529    use mithril_cardano_node_chain::test::double::FakeChainObserver;
530    use mithril_cardano_node_internal_database::test::double::DumbImmutableFileObserver;
531    use mithril_common::{
532        entities::{
533            CardanoTransactionsSigningConfig, ChainPoint, Epoch, ProtocolMessage,
534            SignedEntityConfig, SignedEntityType, SignedEntityTypeDiscriminants, StakeDistribution,
535            TimePoint,
536        },
537        signable_builder::SignableBuilderService,
538        temp_dir,
539        test_utils::{fake_data, MithrilFixtureBuilder},
540        StdResult,
541    };
542    use mithril_persistence::store::StakeStorer;
543    use mithril_signed_entity_lock::SignedEntityTypeLock;
544    use mithril_ticker::MithrilTickerService;
545
546    use crate::{
547        dependency_injection::DependenciesBuilder,
548        entities::{AggregatorEpochSettings, OpenMessage},
549        initialize_dependencies,
550        runtime::{AggregatorRunner, AggregatorRunnerTrait},
551        services::{
552            FakeEpochService, FakeEpochServiceBuilder, MithrilStakeDistributionService,
553            MockCertifierService, MockUpkeepService,
554        },
555        MithrilSignerRegistrationLeader, ServeCommandConfiguration,
556        ServeCommandDependenciesContainer, SignerRegistrationRound,
557    };
558
    // Generates `MockSignableBuilderServiceImpl`, a mockall double implementing
    // `SignableBuilderService`, used below to stub the protocol message computation.
    mock! {
        SignableBuilderServiceImpl { }

        #[async_trait]
        impl SignableBuilderService for SignableBuilderServiceImpl
        {

            async fn compute_protocol_message(
                &self,
                signed_entity_type: SignedEntityType,
            ) -> StdResult<ProtocolMessage>;
        }
    }
572
573    async fn build_runner_with_fixture_data(
574        deps: ServeCommandDependenciesContainer,
575    ) -> AggregatorRunner {
576        let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
577        let current_epoch = deps
578            .chain_observer
579            .get_current_epoch()
580            .await
581            .unwrap()
582            .unwrap();
583        deps.init_state_from_fixture(
584            &fixture,
585            &CardanoTransactionsSigningConfig::dummy(),
586            &[
587                current_epoch.offset_to_signer_retrieval_epoch().unwrap(),
588                current_epoch,
589                current_epoch.next(),
590            ],
591        )
592        .await;
593
594        AggregatorRunner::new(Arc::new(deps))
595    }
596
597    async fn build_runner(
598        temp_dir: PathBuf,
599        mock_certifier_service: MockCertifierService,
600    ) -> AggregatorRunner {
601        let mut deps = initialize_dependencies(temp_dir).await;
602        deps.certifier_service = Arc::new(mock_certifier_service);
603
604        let mut mock_signable_builder_service = MockSignableBuilderServiceImpl::new();
605        mock_signable_builder_service
606            .expect_compute_protocol_message()
607            .return_once(|_| Ok(ProtocolMessage::default()));
608        deps.signable_builder_service = Arc::new(mock_signable_builder_service);
609
610        let runner = build_runner_with_fixture_data(deps).await;
611
612        let current_epoch = runner
613            .dependencies
614            .ticker_service
615            .get_current_epoch()
616            .await
617            .unwrap();
618        runner.inform_new_epoch(current_epoch).await.unwrap();
619        runner.precompute_epoch_data().await.unwrap();
620        runner
621    }
622
623    fn init_certifier_service_mock(
624        mock_certifier_service: &mut MockCertifierService,
625        messages: Vec<OpenMessage>,
626    ) {
627        for message in messages {
628            mock_certifier_service
629                .expect_get_open_message()
630                .return_once(|_| Ok(Some(message)))
631                .times(1);
632        }
633        // When all messages are retrieved, the function return None
634        mock_certifier_service
635            .expect_get_open_message()
636            .returning(|_| Ok(None));
637
638        mock_certifier_service
639            .expect_inform_epoch()
640            .return_once(|_| Ok(()));
641        mock_certifier_service
642            .expect_mark_open_message_if_expired()
643            .returning(|_| Ok(None));
644    }
645
646    fn create_open_message(is_certified: IsCertified, is_expired: IsExpired) -> OpenMessage {
647        OpenMessage {
648            signed_entity_type: SignedEntityType::CardanoImmutableFilesFull(fake_data::beacon()),
649            is_certified: is_certified == IsCertified::Yes,
650            is_expired: is_expired == IsExpired::Yes,
651            ..OpenMessage::dummy()
652        }
653    }
654
    /// Test flag telling whether the open message built by `create_open_message`
    /// is certified.
    #[derive(Eq, PartialEq)]
    enum IsCertified {
        Yes,
        No,
    }
660
    /// Test flag telling whether the open message built by `create_open_message`
    /// is expired.
    #[derive(Eq, PartialEq)]
    enum IsExpired {
        Yes,
        No,
    }
666
667    #[tokio::test]
668    async fn test_get_time_point_from_chain() {
669        let expected = TimePoint::new(2, 17, ChainPoint::dummy());
670        let mut dependencies = initialize_dependencies!().await;
671        let immutable_file_observer = Arc::new(DumbImmutableFileObserver::default());
672        immutable_file_observer
673            .shall_return(Some(expected.immutable_file_number))
674            .await;
675        let ticker_service = Arc::new(MithrilTickerService::new(
676            Arc::new(FakeChainObserver::new(Some(expected.clone()))),
677            immutable_file_observer,
678        ));
679        dependencies.ticker_service = ticker_service;
680        let runner = AggregatorRunner::new(Arc::new(dependencies));
681
682        // Retrieves the expected time point
683        let res = runner.get_time_point_from_chain().await;
684        assert_eq!(expected, res.unwrap());
685    }
686
687    #[tokio::test]
688    async fn test_update_stake_distribution() {
689        let chain_observer = Arc::new(FakeChainObserver::default());
690        let deps = {
691            let mut deps = initialize_dependencies!().await;
692            deps.chain_observer = chain_observer.clone();
693            deps.stake_distribution_service = Arc::new(MithrilStakeDistributionService::new(
694                deps.stake_store.clone(),
695                chain_observer.clone(),
696            ));
697            Arc::new(deps)
698        };
699        let runner = AggregatorRunner::new(deps.clone());
700        let time_point = runner.get_time_point_from_chain().await.unwrap();
701        let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
702        let expected = fixture.stake_distribution();
703
704        chain_observer
705            .set_signers(fixture.signers_with_stake())
706            .await;
707        runner
708            .update_stake_distribution(&time_point)
709            .await
710            .expect("updating stake distribution should not return an error");
711
712        let saved_stake_distribution = deps
713            .stake_store
714            .get_stakes(time_point.epoch.offset_to_recording_epoch())
715            .await
716            .unwrap()
717            .unwrap_or_else(|| {
718                panic!(
719                    "I should have a stake distribution for the epoch {:?}",
720                    time_point.epoch
721                )
722            });
723
724        assert_eq!(expected, saved_stake_distribution);
725    }
726
727    #[tokio::test]
728    async fn test_open_signer_registration_round() {
729        let config = ServeCommandConfiguration::new_sample(mithril_common::temp_dir!());
730        let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
731
732        let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
733            builder.get_verification_key_store().await.unwrap(),
734            builder.get_signer_store().await.unwrap(),
735            builder.get_signer_registration_verifier().await.unwrap(),
736        ));
737        let mut deps = builder.build_serve_dependencies_container().await.unwrap();
738        deps.signer_registration_round_opener = signer_registration_round_opener.clone();
739        let stake_store = deps.stake_store.clone();
740        let deps = Arc::new(deps);
741        let runner = AggregatorRunner::new(deps.clone());
742
743        let time_point = TimePoint::dummy();
744        let recording_epoch = time_point.epoch.offset_to_recording_epoch();
745        let stake_distribution: StakeDistribution =
746            StakeDistribution::from([("a".to_string(), 5), ("b".to_string(), 10)]);
747
748        stake_store
749            .save_stakes(recording_epoch, stake_distribution.clone())
750            .await
751            .expect("Save Stake distribution should not fail");
752
753        runner
754            .open_signer_registration_round(&time_point)
755            .await
756            .expect("opening signer registration should not return an error");
757
758        let saved_current_round = signer_registration_round_opener.get_current_round().await;
759
760        let expected_signer_registration_round =
761            SignerRegistrationRound::dummy(recording_epoch, stake_distribution);
762
763        assert_eq!(
764            Some(expected_signer_registration_round),
765            saved_current_round,
766        );
767    }
768
769    #[tokio::test]
770    async fn test_close_signer_registration_round() {
771        let config = ServeCommandConfiguration::new_sample(mithril_common::temp_dir!());
772        let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
773
774        let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
775            builder.get_verification_key_store().await.unwrap(),
776            builder.get_signer_store().await.unwrap(),
777            builder.get_signer_registration_verifier().await.unwrap(),
778        ));
779        let mut deps = builder.build_serve_dependencies_container().await.unwrap();
780        deps.signer_registration_round_opener = signer_registration_round_opener.clone();
781        let deps = Arc::new(deps);
782        let runner = AggregatorRunner::new(deps.clone());
783
784        let time_point = TimePoint::dummy();
785        runner
786            .open_signer_registration_round(&time_point)
787            .await
788            .expect("opening signer registration should not return an error");
789
790        runner
791            .close_signer_registration_round()
792            .await
793            .expect("closing signer registration should not return an error");
794
795        let saved_current_round = signer_registration_round_opener.get_current_round().await;
796        assert!(saved_current_round.is_none());
797    }
798
799    #[tokio::test]
800    async fn test_expire_open_message() {
801        let open_message_expected = OpenMessage {
802            signed_entity_type: SignedEntityType::dummy(),
803            is_certified: false,
804            is_expired: false,
805            expires_at: Some(
806                DateTime::parse_from_rfc3339("2000-01-19T13:43:05.618857482Z")
807                    .unwrap()
808                    .with_timezone(&Utc),
809            ),
810            ..OpenMessage::dummy()
811        };
812        let open_message_clone = open_message_expected.clone();
813
814        let mut mock_certifier_service = MockCertifierService::new();
815        mock_certifier_service
816            .expect_mark_open_message_if_expired()
817            .return_once(|_| Ok(Some(open_message_clone)));
818
819        let mut deps = initialize_dependencies!().await;
820        deps.certifier_service = Arc::new(mock_certifier_service);
821
822        let runner = build_runner_with_fixture_data(deps).await;
823        let open_message_expired = runner
824            .mark_open_message_if_expired(&open_message_expected.signed_entity_type)
825            .await
826            .expect("mark_open_message_if_expired should not fail");
827
828        assert_eq!(Some(open_message_expected), open_message_expired);
829    }
830
831    #[tokio::test]
832    async fn test_update_era_checker() {
833        let deps = initialize_dependencies!().await;
834        let ticker_service = deps.ticker_service.clone();
835        let era_checker = deps.era_checker.clone();
836        let mut time_point = ticker_service.get_current_time_point().await.unwrap();
837
838        assert_eq!(time_point.epoch, era_checker.current_epoch());
839        let runner = AggregatorRunner::new(Arc::new(deps));
840        time_point.epoch += 1;
841
842        runner.update_era_checker(time_point.epoch).await.unwrap();
843        assert_eq!(time_point.epoch, era_checker.current_epoch());
844    }
845
846    #[tokio::test]
847    async fn test_inform_new_epoch() {
848        let mut mock_certifier_service = MockCertifierService::new();
849        mock_certifier_service
850            .expect_inform_epoch()
851            .returning(|_| Ok(()))
852            .times(1);
853        let mut deps = initialize_dependencies!().await;
854        let current_epoch = deps
855            .chain_observer
856            .get_current_epoch()
857            .await
858            .unwrap()
859            .unwrap();
860
861        deps.certifier_service = Arc::new(mock_certifier_service);
862        deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
863            current_epoch,
864            &MithrilFixtureBuilder::default().build(),
865        )));
866
867        let runner = AggregatorRunner::new(Arc::new(deps));
868
869        runner.inform_new_epoch(current_epoch).await.unwrap();
870    }
871
872    #[tokio::test]
873    async fn test_upkeep_calls_run_on_upkeep_service() {
874        let mut upkeep_service = MockUpkeepService::new();
875        upkeep_service
876            .expect_run()
877            .with(eq(Epoch(5)))
878            .returning(|_| Ok(()))
879            .times(1);
880
881        let mut deps = initialize_dependencies!().await;
882        deps.upkeep_service = Arc::new(upkeep_service);
883
884        let runner = AggregatorRunner::new(Arc::new(deps));
885
886        runner.upkeep(Epoch(5)).await.unwrap();
887    }
888
889    #[tokio::test]
890    async fn test_update_epoch_settings() {
891        let mut mock_certifier_service = MockCertifierService::new();
892        mock_certifier_service
893            .expect_inform_epoch()
894            .returning(|_| Ok(()))
895            .times(1);
896
897        let config = ServeCommandConfiguration::new_sample(temp_dir!());
898        let mut deps = DependenciesBuilder::new_with_stdout_logger(Arc::new(config.clone()))
899            .build_serve_dependencies_container()
900            .await
901            .unwrap();
902        deps.certifier_service = Arc::new(mock_certifier_service);
903        let epoch_settings_storer = deps.epoch_settings_storer.clone();
904        let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
905        let insert_epoch = current_epoch.offset_to_epoch_settings_recording_epoch();
906
907        let runner = build_runner_with_fixture_data(deps).await;
908        runner.inform_new_epoch(current_epoch).await.unwrap();
909        runner
910            .update_epoch_settings()
911            .await
912            .expect("update_epoch_settings should not fail");
913
914        let saved_epoch_settings = epoch_settings_storer
915            .get_epoch_settings(insert_epoch)
916            .await
917            .unwrap()
918            .unwrap_or_else(|| panic!("should have epoch settings for epoch {insert_epoch}",));
919
920        assert_eq!(
921            AggregatorEpochSettings {
922                protocol_parameters: config.protocol_parameters.clone(),
923                cardano_transactions_signing_config: config
924                    .cardano_transactions_signing_config
925                    .clone(),
926            },
927            saved_epoch_settings
928        );
929    }
930
931    #[tokio::test]
932    async fn test_precompute_epoch_data() {
933        let mut deps = initialize_dependencies!().await;
934        let current_epoch = deps
935            .chain_observer
936            .get_current_epoch()
937            .await
938            .unwrap()
939            .unwrap();
940
941        deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
942            current_epoch,
943            &MithrilFixtureBuilder::default().build(),
944        )));
945
946        let runner = AggregatorRunner::new(Arc::new(deps));
947
948        runner.precompute_epoch_data().await.unwrap();
949    }
950
951    #[tokio::test]
952    async fn test_get_current_non_certified_open_message_should_create_new_open_message_if_none_exists(
953    ) {
954        let open_message_created = create_open_message(IsCertified::No, IsExpired::No);
955        let open_message_expected = open_message_created.clone();
956
957        let runner = {
958            let mut mock_certifier_service = MockCertifierService::new();
959            init_certifier_service_mock(&mut mock_certifier_service, vec![]);
960
961            mock_certifier_service
962                .expect_create_open_message()
963                .return_once(|_, _| Ok(open_message_created))
964                .times(1);
965            build_runner(temp_dir!(), mock_certifier_service).await
966        };
967
968        let open_message_returned = runner
969            .get_current_non_certified_open_message(&TimePoint::dummy())
970            .await
971            .unwrap();
972        assert_eq!(Some(open_message_expected), open_message_returned);
973    }
974
975    #[tokio::test]
976    async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_not_expired(
977    ) {
978        let not_certified_and_not_expired = create_open_message(IsCertified::No, IsExpired::No);
979
980        let open_message_expected = not_certified_and_not_expired.clone();
981
982        let runner = {
983            let mut mock_certifier_service = MockCertifierService::new();
984            init_certifier_service_mock(
985                &mut mock_certifier_service,
986                vec![not_certified_and_not_expired],
987            );
988
989            mock_certifier_service.expect_create_open_message().never();
990            build_runner(temp_dir!(), mock_certifier_service).await
991        };
992
993        let open_message_returned = runner
994            .get_current_non_certified_open_message(&TimePoint::dummy())
995            .await
996            .unwrap();
997
998        assert_eq!(Some(open_message_expected), open_message_returned);
999    }
1000
1001    #[tokio::test]
1002    async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_open_message_already_certified(
1003    ) {
1004        let certified_and_not_expired = create_open_message(IsCertified::Yes, IsExpired::No);
1005        let not_certified_and_not_expired = create_open_message(IsCertified::No, IsExpired::No);
1006
1007        let open_message_expected = not_certified_and_not_expired.clone();
1008
1009        let runner = {
1010            let mut mock_certifier_service = MockCertifierService::new();
1011            init_certifier_service_mock(
1012                &mut mock_certifier_service,
1013                vec![certified_and_not_expired, not_certified_and_not_expired],
1014            );
1015
1016            mock_certifier_service.expect_create_open_message().never();
1017            build_runner(temp_dir!(), mock_certifier_service).await
1018        };
1019
1020        let open_message_returned = runner
1021            .get_current_non_certified_open_message(&TimePoint::dummy())
1022            .await
1023            .unwrap();
1024
1025        assert_eq!(Some(open_message_expected), open_message_returned);
1026    }
1027
1028    #[tokio::test]
1029    async fn test_get_current_non_certified_open_message_should_create_open_message_if_none_exists_and_open_message_already_certified(
1030    ) {
1031        let certified_and_not_expired = create_open_message(IsCertified::Yes, IsExpired::No);
1032
1033        let open_message_created = create_open_message(IsCertified::No, IsExpired::No);
1034        let open_message_expected = open_message_created.clone();
1035
1036        let runner = {
1037            let mut mock_certifier_service = MockCertifierService::new();
1038            init_certifier_service_mock(
1039                &mut mock_certifier_service,
1040                vec![certified_and_not_expired],
1041            );
1042
1043            mock_certifier_service
1044                .expect_create_open_message()
1045                .return_once(|_, _| Ok(open_message_created))
1046                .times(1);
1047            build_runner(temp_dir!(), mock_certifier_service).await
1048        };
1049
1050        let open_message_returned = runner
1051            .get_current_non_certified_open_message(&TimePoint::dummy())
1052            .await
1053            .unwrap();
1054
1055        assert_eq!(Some(open_message_expected), open_message_returned);
1056    }
1057
1058    #[tokio::test]
1059    async fn test_get_current_non_certified_open_message_should_return_none_if_all_open_message_already_certified(
1060    ) {
1061        let certified_and_not_expired_1 = create_open_message(IsCertified::Yes, IsExpired::No);
1062        let certified_and_not_expired_2 = create_open_message(IsCertified::Yes, IsExpired::No);
1063
1064        let runner = {
1065            let mut mock_certifier_service = MockCertifierService::new();
1066            init_certifier_service_mock(
1067                &mut mock_certifier_service,
1068                vec![certified_and_not_expired_1, certified_and_not_expired_2],
1069            );
1070
1071            mock_certifier_service.expect_create_open_message().never();
1072            build_runner(temp_dir!(), mock_certifier_service).await
1073        };
1074
1075        let open_message_returned = runner
1076            .get_current_non_certified_open_message(&TimePoint::dummy())
1077            .await
1078            .unwrap();
1079
1080        assert!(open_message_returned.is_none());
1081    }
1082
1083    #[tokio::test]
1084    async fn test_get_current_non_certified_open_message_should_return_first_not_certified_and_not_expired_open_message(
1085    ) {
1086        let not_certified_and_expired = create_open_message(IsCertified::No, IsExpired::Yes);
1087        let not_certified_and_not_expired = create_open_message(IsCertified::No, IsExpired::No);
1088
1089        let open_message_expected = not_certified_and_not_expired.clone();
1090
1091        let runner = {
1092            let mut mock_certifier_service = MockCertifierService::new();
1093            init_certifier_service_mock(
1094                &mut mock_certifier_service,
1095                vec![not_certified_and_expired, not_certified_and_not_expired],
1096            );
1097
1098            mock_certifier_service.expect_create_open_message().never();
1099            build_runner(temp_dir!(), mock_certifier_service).await
1100        };
1101
1102        let open_message_returned = runner
1103            .get_current_non_certified_open_message(&TimePoint::dummy())
1104            .await
1105            .unwrap();
1106
1107        assert_eq!(Some(open_message_expected), open_message_returned);
1108    }
1109
1110    #[tokio::test]
1111    async fn test_get_current_non_certified_open_message_called_for_mithril_stake_distribution_and_then_for_immutable_file(
1112    ) {
1113        let mut mock_certifier_service = MockCertifierService::new();
1114
1115        let mut seq = Sequence::new();
1116        mock_certifier_service
1117            .expect_get_open_message()
1118            .with(eq(SignedEntityType::MithrilStakeDistribution(
1119                TimePoint::dummy().epoch,
1120            )))
1121            .times(1)
1122            .in_sequence(&mut seq)
1123            .return_once(|_| Ok(Some(create_open_message(IsCertified::Yes, IsExpired::No))));
1124
1125        mock_certifier_service
1126            .expect_get_open_message()
1127            .with(eq(SignedEntityType::CardanoImmutableFilesFull(
1128                fake_data::beacon(),
1129            )))
1130            .times(1)
1131            .in_sequence(&mut seq)
1132            .return_once(|_| Ok(Some(create_open_message(IsCertified::Yes, IsExpired::No))));
1133
1134        mock_certifier_service.expect_create_open_message().never();
1135
1136        mock_certifier_service
1137            .expect_inform_epoch()
1138            .return_once(|_| Ok(()));
1139        mock_certifier_service
1140            .expect_mark_open_message_if_expired()
1141            .returning(|_| Ok(None));
1142
1143        let runner = build_runner(temp_dir!(), mock_certifier_service).await;
1144
1145        runner
1146            .get_current_non_certified_open_message(&TimePoint::dummy())
1147            .await
1148            .unwrap();
1149    }
1150
1151    #[tokio::test]
1152    async fn list_available_signed_entity_types_list_all_configured_entities_if_none_are_locked() {
1153        let runner = {
1154            let mut dependencies = initialize_dependencies!().await;
1155            let epoch_service = FakeEpochServiceBuilder {
1156                signed_entity_config: SignedEntityConfig {
1157                    allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1158                    ..SignedEntityConfig::dummy()
1159                },
1160                ..FakeEpochServiceBuilder::dummy(Epoch(32))
1161            }
1162            .build();
1163            dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1164            dependencies.signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1165            AggregatorRunner::new(Arc::new(dependencies))
1166        };
1167
1168        let time_point = TimePoint::dummy();
1169        let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1170            .list_available_signed_entity_types(&time_point)
1171            .await
1172            .unwrap()
1173            .into_iter()
1174            .map(Into::into)
1175            .collect();
1176
1177        assert_eq!(
1178            signed_entities,
1179            SignedEntityTypeDiscriminants::all()
1180                .into_iter()
1181                .collect::<Vec<_>>()
1182        );
1183    }
1184
1185    #[tokio::test]
1186    async fn list_available_signed_entity_types_exclude_locked_entities() {
1187        let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1188        let runner = {
1189            let mut dependencies = initialize_dependencies!().await;
1190            dependencies.signed_entity_type_lock = signed_entity_type_lock.clone();
1191            let epoch_service = FakeEpochServiceBuilder {
1192                signed_entity_config: SignedEntityConfig {
1193                    allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1194                    ..SignedEntityConfig::dummy()
1195                },
1196                ..FakeEpochServiceBuilder::dummy(Epoch(32))
1197            }
1198            .build();
1199            dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1200
1201            AggregatorRunner::new(Arc::new(dependencies))
1202        };
1203
1204        signed_entity_type_lock
1205            .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
1206            .await;
1207
1208        let time_point = TimePoint::dummy();
1209        let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1210            .list_available_signed_entity_types(&time_point)
1211            .await
1212            .unwrap()
1213            .into_iter()
1214            .map(Into::into)
1215            .collect();
1216
1217        assert!(!signed_entities.is_empty());
1218        assert!(!signed_entities.contains(&SignedEntityTypeDiscriminants::CardanoTransactions));
1219    }
1220
1221    #[tokio::test]
1222    async fn is_open_message_outdated_return_false_when_message_is_not_expired_and_no_newer_open_message(
1223    ) {
1224        assert!(!is_outdated_returned_when(temp_dir!(), IsExpired::No, false).await);
1225    }
1226
1227    #[tokio::test]
1228    async fn is_open_message_outdated_return_true_when_message_is_expired_and_no_newer_open_message(
1229    ) {
1230        assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, false).await);
1231    }
1232
1233    #[tokio::test]
1234    async fn is_open_message_outdated_return_true_when_message_is_not_expired_and_exists_newer_open_message(
1235    ) {
1236        assert!(is_outdated_returned_when(temp_dir!(), IsExpired::No, true).await);
1237    }
1238
1239    #[tokio::test]
1240    async fn is_open_message_outdated_return_true_when_message_is_expired_and_exists_newer_open_message(
1241    ) {
1242        assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, true).await);
1243    }
1244
1245    async fn is_outdated_returned_when(
1246        tmp_path: PathBuf,
1247        is_expired: IsExpired,
1248        newer_open_message: bool,
1249    ) -> bool {
1250        let current_time_point = TimePoint {
1251            epoch: Epoch(2),
1252            ..TimePoint::dummy()
1253        };
1254
1255        let message_epoch = if newer_open_message {
1256            current_time_point.epoch + 54
1257        } else {
1258            current_time_point.epoch
1259        };
1260        let open_message_to_verify = OpenMessage {
1261            signed_entity_type: SignedEntityType::MithrilStakeDistribution(message_epoch),
1262            is_expired: is_expired == IsExpired::Yes,
1263            ..OpenMessage::dummy()
1264        };
1265
1266        let runner = {
1267            let mut deps = initialize_dependencies(tmp_path).await;
1268            let mut mock_certifier_service = MockCertifierService::new();
1269
1270            let open_message_current = open_message_to_verify.clone();
1271            mock_certifier_service
1272                .expect_get_open_message()
1273                .times(1)
1274                .return_once(|_| Ok(Some(open_message_current)));
1275            mock_certifier_service
1276                .expect_mark_open_message_if_expired()
1277                .returning(|_| Ok(None));
1278
1279            deps.certifier_service = Arc::new(mock_certifier_service);
1280
1281            let epoch_service = FakeEpochServiceBuilder::dummy(current_time_point.epoch).build();
1282            deps.epoch_service = Arc::new(RwLock::new(epoch_service));
1283
1284            build_runner_with_fixture_data(deps).await
1285        };
1286
1287        runner
1288            .is_open_message_outdated(
1289                open_message_to_verify.signed_entity_type,
1290                &current_time_point,
1291            )
1292            .await
1293            .unwrap()
1294    }
1295}