mithril_aggregator/runtime/
runner.rs

1use anyhow::Context;
2use async_trait::async_trait;
3use slog::{Logger, debug, warn};
4use std::sync::Arc;
5use std::time::Duration;
6
7use mithril_common::StdResult;
8use mithril_common::entities::{Certificate, Epoch, ProtocolMessage, SignedEntityType, TimePoint};
9use mithril_common::logging::LoggerExtensions;
10use mithril_persistence::store::StakeStorer;
11
12use crate::ServeCommandDependenciesContainer;
13use crate::entities::OpenMessage;
14
15/// Configuration structure dedicated to the AggregatorRuntime.
16#[derive(Debug, Clone)]
17pub struct AggregatorConfig {
18    /// Interval between each snapshot, in ms
19    pub interval: Duration,
20
21    /// Whether the aggregator is a follower
22    pub is_follower: bool,
23}
24
25impl AggregatorConfig {
26    /// Create a new instance of AggregatorConfig.
27    pub fn new(interval: Duration, is_follower: bool) -> Self {
28        Self {
29            interval,
30            is_follower,
31        }
32    }
33}
34
35/// This trait is intended to allow mocking the AggregatorRunner in tests.
36/// It exposes all the methods needed by the state machine.
37#[async_trait]
38pub trait AggregatorRunnerTrait: Sync + Send {
39    /// Return the current [TimePoint] from the chain
40    async fn get_time_point_from_chain(&self) -> StdResult<TimePoint>;
41
42    /// Retrieves the current open message for a given signed entity type.
43    async fn get_current_open_message_for_signed_entity_type(
44        &self,
45        signed_entity_type: &SignedEntityType,
46    ) -> StdResult<Option<OpenMessage>>;
47
48    /// Retrieves the current non-certified open message.
49    async fn get_current_non_certified_open_message(
50        &self,
51        current_time_point: &TimePoint,
52    ) -> StdResult<Option<OpenMessage>>;
53
54    /// Check if a certificate chain is valid.
55    async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()>;
56
57    /// Read the stake distribution from the blockchain and store it.
58    async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()>;
59
60    /// Open the signer registration round of an epoch.
61    async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()>;
62
63    /// Close the signer registration round of an epoch.
64    async fn close_signer_registration_round(&self) -> StdResult<()>;
65
66    /// Check if the follower aggregator is running the same epoch as the leader.
67    async fn is_follower_aggregator_at_same_epoch_as_leader(
68        &self,
69        time_point: &TimePoint,
70    ) -> StdResult<bool>;
71
72    /// Synchronize the follower aggregator signer registration.
73    async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()>;
74
75    /// Synchronize the follower aggregator certificate chain
76    async fn synchronize_follower_aggregator_certificate_chain(
77        &self,
78        force_sync: bool,
79    ) -> StdResult<()>;
80
81    /// Ask the EpochService to update the epoch settings.
82    async fn update_epoch_settings(&self) -> StdResult<()>;
83
84    /// Compute the protocol message
85    async fn compute_protocol_message(
86        &self,
87        signed_entity_type: &SignedEntityType,
88    ) -> StdResult<ProtocolMessage>;
89
90    /// Mark expired open message.
91    async fn mark_open_message_if_expired(
92        &self,
93        signed_entity_type: &SignedEntityType,
94    ) -> StdResult<Option<OpenMessage>>;
95
96    /// Tell the certifier to try to create a new certificate.
97    async fn create_certificate(
98        &self,
99        signed_entity_type: &SignedEntityType,
100    ) -> StdResult<Option<Certificate>>;
101
102    /// Create an artifact and persist it.
103    async fn create_artifact(
104        &self,
105        signed_entity_type: &SignedEntityType,
106        certificate: &Certificate,
107    ) -> StdResult<()>;
108
109    /// Update the EraChecker with EraReader information.
110    async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()>;
111
112    /// Ask services to update themselves for the new epoch
113    async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()>;
114
115    /// Perform the upkeep tasks.
116    async fn upkeep(&self, epoch: Epoch) -> StdResult<()>;
117
118    /// Precompute what doesn't change for the actual epoch
119    async fn precompute_epoch_data(&self) -> StdResult<()>;
120
121    /// Create new open message
122    async fn create_open_message(
123        &self,
124        signed_entity_type: &SignedEntityType,
125        protocol_message: &ProtocolMessage,
126    ) -> StdResult<OpenMessage>;
127
128    /// Checks if the open message is considered outdated.
129    async fn is_open_message_outdated(
130        &self,
131        open_message_signed_entity_type: SignedEntityType,
132        last_time_point: &TimePoint,
133    ) -> StdResult<bool>;
134
135    /// Increment the runtime cycle success metric.
136    fn increment_runtime_cycle_success_since_startup_counter(&self);
137
138    /// Increment the runtime cycle total metric.
139    fn increment_runtime_cycle_total_since_startup_counter(&self);
140}
141
142/// The runner responsibility is to expose a code API for the state machine. It
143/// holds services and configuration.
144pub struct AggregatorRunner {
145    dependencies: Arc<ServeCommandDependenciesContainer>,
146    logger: Logger,
147}
148
149impl AggregatorRunner {
150    /// Create a new instance of the Aggregator Runner.
151    pub fn new(dependencies: Arc<ServeCommandDependenciesContainer>) -> Self {
152        let logger = dependencies.root_logger.new_with_component_name::<Self>();
153        Self {
154            dependencies,
155            logger,
156        }
157    }
158
159    async fn list_available_signed_entity_types(
160        &self,
161        time_point: &TimePoint,
162    ) -> StdResult<Vec<SignedEntityType>> {
163        let signed_entity_types = self
164            .dependencies
165            .epoch_service
166            .read()
167            .await
168            .signed_entity_config()?
169            .list_allowed_signed_entity_types(time_point)?;
170        let unlocked_signed_entities = self
171            .dependencies
172            .signed_entity_type_lock
173            .filter_unlocked_entries(signed_entity_types)
174            .await;
175
176        Ok(unlocked_signed_entities)
177    }
178}
179
180#[cfg_attr(test, mockall::automock)]
181#[async_trait]
182impl AggregatorRunnerTrait for AggregatorRunner {
183    /// Return the current time point from the chain
184    async fn get_time_point_from_chain(&self) -> StdResult<TimePoint> {
185        debug!(self.logger, ">> get_time_point_from_chain");
186        let time_point = self.dependencies.ticker_service.get_current_time_point().await?;
187
188        Ok(time_point)
189    }
190
191    async fn get_current_open_message_for_signed_entity_type(
192        &self,
193        signed_entity_type: &SignedEntityType,
194    ) -> StdResult<Option<OpenMessage>> {
195        debug!(self.logger,">> get_current_open_message_for_signed_entity_type"; "signed_entity_type" => ?signed_entity_type);
196        self.mark_open_message_if_expired(signed_entity_type).await?;
197
198        Ok(self
199            .dependencies
200            .certifier_service
201            .get_open_message(signed_entity_type)
202            .await
203            .with_context(|| format!("CertifierService can not get open message for signed_entity_type: '{signed_entity_type}'"))?)
204    }
205
206    async fn get_current_non_certified_open_message(
207        &self,
208        current_time_point: &TimePoint,
209    ) -> StdResult<Option<OpenMessage>> {
210        debug!(self.logger,">> get_current_non_certified_open_message"; "time_point" => #?current_time_point);
211        let signed_entity_types =
212            self.list_available_signed_entity_types(current_time_point).await?;
213
214        for signed_entity_type in signed_entity_types {
215            let current_open_message = self.get_current_open_message_for_signed_entity_type(&signed_entity_type)
216                .await
217                .with_context(|| format!("AggregatorRunner can not get current open message for signed entity type: '{}'", &signed_entity_type))?;
218            match current_open_message {
219                None => {
220                    let protocol_message = self.compute_protocol_message(&signed_entity_type).await.with_context(|| format!("AggregatorRunner can not compute protocol message for signed_entity_type: '{signed_entity_type}'"))?;
221                    let open_message_new = self.create_open_message(&signed_entity_type, &protocol_message)
222                        .await
223                        .with_context(|| format!("AggregatorRunner can not create open message for signed_entity_type: '{signed_entity_type}'"))?;
224
225                    return Ok(Some(open_message_new));
226                }
227                Some(open_message) => {
228                    if !open_message.is_certified && !open_message.is_expired {
229                        return Ok(Some(open_message));
230                    }
231                }
232            }
233        }
234
235        Ok(None)
236    }
237
238    async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()> {
239        debug!(self.logger, ">> is_certificate_chain_valid");
240        self.dependencies
241            .certifier_service
242            .verify_certificate_chain(time_point.epoch)
243            .await?;
244
245        Ok(())
246    }
247
248    async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()> {
249        debug!(self.logger,">> update_stake_distribution"; "time_point" => #?new_time_point);
250        self.dependencies
251            .stake_distribution_service
252            .update_stake_distribution()
253            .await
254            .with_context(|| format!("AggregatorRunner could not update stake distribution for time_point: '{new_time_point}'"))
255    }
256
257    async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()> {
258        debug!(self.logger,">> open_signer_registration_round"; "time_point" => #?new_time_point);
259        let registration_epoch = new_time_point.epoch.offset_to_recording_epoch();
260
261        let stakes = self
262            .dependencies
263            .stake_store
264            .get_stakes(registration_epoch)
265            .await?
266            .unwrap_or_default();
267
268        self.dependencies
269            .signer_registration_round_opener
270            .open_registration_round(registration_epoch, stakes)
271            .await
272    }
273
274    async fn close_signer_registration_round(&self) -> StdResult<()> {
275        debug!(self.logger, ">> close_signer_registration_round");
276        self.dependencies
277            .signer_registration_round_opener
278            .close_registration_round()
279            .await
280    }
281
282    async fn is_follower_aggregator_at_same_epoch_as_leader(
283        &self,
284        time_point: &TimePoint,
285    ) -> StdResult<bool> {
286        self.dependencies
287            .signer_synchronizer
288            .can_synchronize_signers(time_point.epoch)
289            .await
290            .map_err(|e| e.into())
291    }
292
293    async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()> {
294        self.dependencies
295            .signer_synchronizer
296            .synchronize_all_signers()
297            .await
298            .map_err(|e| e.into())
299    }
300
301    async fn update_epoch_settings(&self) -> StdResult<()> {
302        debug!(self.logger, ">> update_epoch_settings");
303        self.dependencies
304            .epoch_service
305            .write()
306            .await
307            .update_epoch_settings()
308            .await
309    }
310
311    async fn compute_protocol_message(
312        &self,
313        signed_entity_type: &SignedEntityType,
314    ) -> StdResult<ProtocolMessage> {
315        debug!(self.logger, ">> compute_protocol_message");
316        let protocol_message = self
317            .dependencies
318            .signable_builder_service
319            .compute_protocol_message(signed_entity_type.to_owned())
320            .await
321            .with_context(|| format!("Runner can not compute protocol message for signed entity type: '{signed_entity_type}'"))?;
322
323        Ok(protocol_message)
324    }
325
326    async fn mark_open_message_if_expired(
327        &self,
328        signed_entity_type: &SignedEntityType,
329    ) -> StdResult<Option<OpenMessage>> {
330        debug!(self.logger, ">> mark_open_message_if_expired");
331        let expired_open_message = self
332            .dependencies
333            .certifier_service
334            .mark_open_message_if_expired(signed_entity_type)
335            .await
336            .with_context(|| "CertifierService can not mark expired open message")?;
337
338        debug!(
339            self.logger, "Marked expired open messages";
340            "expired_open_message" => ?expired_open_message
341        );
342
343        Ok(expired_open_message)
344    }
345
346    async fn create_certificate(
347        &self,
348        signed_entity_type: &SignedEntityType,
349    ) -> StdResult<Option<Certificate>> {
350        debug!(self.logger, ">> create_certificate"; "signed_entity_type" => ?signed_entity_type);
351
352        let certificate = self.dependencies
353            .certifier_service
354            .create_certificate(signed_entity_type)
355            .await
356            .with_context(|| {
357                format!(
358                    "CertifierService can not create certificate for signed_entity_type: '{signed_entity_type}'"
359                )
360            })?;
361
362        if certificate.is_some() {
363            self.dependencies
364                .metrics_service
365                .get_certificate_total_produced_since_startup()
366                .increment();
367        }
368
369        Ok(certificate)
370    }
371
372    async fn create_artifact(
373        &self,
374        signed_entity_type: &SignedEntityType,
375        certificate: &Certificate,
376    ) -> StdResult<()> {
377        debug!(
378            self.logger, ">> create_artifact";
379            "signed_entity_type" => ?signed_entity_type,
380            "certificate_hash" => &certificate.hash
381        );
382
383        self.dependencies
384            .signed_entity_service
385            .create_artifact(signed_entity_type.to_owned(), certificate)
386            .await
387            .with_context(|| {
388                format!(
389                    "SignedEntityService can not create artifact for signed_entity_type: '{signed_entity_type}' with certificate hash: '{}'",
390                    certificate.hash
391                )
392            })?;
393
394        Ok(())
395    }
396
397    async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()> {
398        debug!(self.logger, ">> update_era_checker({epoch:?})");
399        let token = self
400            .dependencies
401            .era_reader
402            .read_era_epoch_token(epoch)
403            .await
404            .with_context(|| {
405                format!("EraReader can not get era epoch token for current epoch: '{epoch}'")
406            })?;
407
408        let current_era = token
409            .get_current_supported_era()
410            .with_context(|| "EraEpochToken can not get current supported era")?;
411        self.dependencies
412            .era_checker
413            .change_era(current_era, token.get_current_epoch());
414        debug!(
415            self.logger,
416            "Current Era is {current_era} (Epoch {}).",
417            token.get_current_epoch()
418        );
419
420        if token.get_next_supported_era().is_err() {
421            let era_name = &token.get_next_era_marker().unwrap().name;
422            warn!(
423                self.logger,
424                "Upcoming Era '{era_name}' is not supported by this version of the software. Please update!"
425            );
426        }
427
428        Ok(())
429    }
430
431    async fn precompute_epoch_data(&self) -> StdResult<()> {
432        debug!(self.logger, ">> precompute_epoch_data");
433        self.dependencies
434            .epoch_service
435            .write()
436            .await
437            .precompute_epoch_data()
438            .await?;
439
440        Ok(())
441    }
442
443    async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()> {
444        debug!(self.logger, ">> inform_new_epoch({epoch:?})");
445        self.dependencies.certifier_service.inform_epoch(epoch).await?;
446
447        self.dependencies
448            .epoch_service
449            .write()
450            .await
451            .inform_epoch(epoch)
452            .await?;
453
454        Ok(())
455    }
456
457    async fn upkeep(&self, epoch: Epoch) -> StdResult<()> {
458        debug!(self.logger, ">> upkeep");
459        self.dependencies.upkeep_service.run(epoch).await
460    }
461
462    async fn create_open_message(
463        &self,
464        signed_entity_type: &SignedEntityType,
465        protocol_message: &ProtocolMessage,
466    ) -> StdResult<OpenMessage> {
467        debug!(self.logger, ">> create_open_message");
468        self.dependencies
469            .certifier_service
470            .create_open_message(signed_entity_type, protocol_message)
471            .await
472    }
473
474    async fn is_open_message_outdated(
475        &self,
476        open_message_signed_entity_type: SignedEntityType,
477        last_time_point: &TimePoint,
478    ) -> StdResult<bool> {
479        let current_open_message = self
480            .get_current_open_message_for_signed_entity_type(
481                &open_message_signed_entity_type,
482            )
483            .await
484            .with_context(|| format!("AggregatorRuntime can not get the current open message for signed entity type: '{}'", &open_message_signed_entity_type))?;
485        let is_expired_open_message =
486            current_open_message.as_ref().map(|om| om.is_expired).unwrap_or(false);
487
488        let exists_newer_open_message = {
489            let new_signed_entity_type = self
490                .dependencies
491                .epoch_service
492                .read()
493                .await
494                .signed_entity_config()?
495                .time_point_to_signed_entity(&open_message_signed_entity_type, last_time_point)?;
496            new_signed_entity_type != open_message_signed_entity_type
497        };
498
499        Ok(exists_newer_open_message || is_expired_open_message)
500    }
501
502    fn increment_runtime_cycle_success_since_startup_counter(&self) {
503        self.dependencies
504            .metrics_service
505            .get_runtime_cycle_success_since_startup()
506            .increment();
507    }
508
509    fn increment_runtime_cycle_total_since_startup_counter(&self) {
510        self.dependencies
511            .metrics_service
512            .get_runtime_cycle_total_since_startup()
513            .increment();
514    }
515
516    async fn synchronize_follower_aggregator_certificate_chain(
517        &self,
518        force_sync: bool,
519    ) -> StdResult<()> {
520        debug!(
521            self.logger,
522            ">> synchronize_follower_aggregator_certificate_chain(force_sync:{force_sync})"
523        );
524        self.dependencies
525            .certificate_chain_synchronizer
526            .synchronize_certificate_chain(force_sync)
527            .await
528    }
529}
530
531#[cfg(test)]
532pub mod tests {
533    use async_trait::async_trait;
534    use chrono::{DateTime, Utc};
535    use mockall::mock;
536    use mockall::predicate::eq;
537    use std::path::PathBuf;
538    use std::sync::Arc;
539    use tokio::sync::RwLock;
540
541    use mithril_cardano_node_chain::test::double::FakeChainObserver;
542    use mithril_cardano_node_internal_database::test::double::DumbImmutableFileObserver;
543    use mithril_common::{
544        StdResult,
545        entities::{
546            CardanoTransactionsSigningConfig, ChainPoint, Epoch, ProtocolMessage,
547            SignedEntityConfig, SignedEntityType, SignedEntityTypeDiscriminants, StakeDistribution,
548            TimePoint,
549        },
550        signable_builder::SignableBuilderService,
551        temp_dir,
552        test::{
553            builder::MithrilFixtureBuilder,
554            double::{Dummy, fake_data},
555        },
556    };
557    use mithril_persistence::store::StakeStorer;
558    use mithril_signed_entity_lock::SignedEntityTypeLock;
559    use mithril_ticker::MithrilTickerService;
560
561    use crate::{
562        MithrilSignerRegistrationLeader, ServeCommandConfiguration,
563        ServeCommandDependenciesContainer, SignerRegistrationRound,
564        dependency_injection::DependenciesBuilder,
565        entities::{AggregatorEpochSettings, OpenMessage},
566        initialize_dependencies,
567        runtime::{AggregatorRunner, AggregatorRunnerTrait},
568        services::{
569            FakeEpochService, FakeEpochServiceBuilder, MithrilStakeDistributionService,
570            MockCertifierService, MockUpkeepService,
571        },
572    };
573
574    mock! {
575        SignableBuilderServiceImpl { }
576
577        #[async_trait]
578        impl SignableBuilderService for SignableBuilderServiceImpl
579        {
580
581            async fn compute_protocol_message(
582                &self,
583                signed_entity_type: SignedEntityType,
584            ) -> StdResult<ProtocolMessage>;
585        }
586    }
587
588    async fn build_runner_with_fixture_data(
589        deps: ServeCommandDependenciesContainer,
590    ) -> AggregatorRunner {
591        let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
592        let current_epoch = deps.chain_observer.get_current_epoch().await.unwrap().unwrap();
593        deps.init_state_from_fixture(
594            &fixture,
595            &CardanoTransactionsSigningConfig::dummy(),
596            &[
597                current_epoch.offset_to_signer_retrieval_epoch().unwrap(),
598                current_epoch,
599                current_epoch.next(),
600            ],
601        )
602        .await;
603
604        AggregatorRunner::new(Arc::new(deps))
605    }
606
607    async fn build_runner(
608        temp_dir: PathBuf,
609        mock_certifier_service: MockCertifierService,
610    ) -> AggregatorRunner {
611        build_runner_with_discriminants(temp_dir, mock_certifier_service, vec![]).await
612    }
613
614    async fn build_runner_with_discriminants(
615        temp_dir: PathBuf,
616        mock_certifier_service: MockCertifierService,
617        allowed_discriminants: Vec<SignedEntityTypeDiscriminants>,
618    ) -> AggregatorRunner {
619        let mut deps = initialize_dependencies(temp_dir).await;
620        deps.certifier_service = Arc::new(mock_certifier_service);
621
622        let mut mock_signable_builder_service = MockSignableBuilderServiceImpl::new();
623        mock_signable_builder_service
624            .expect_compute_protocol_message()
625            .return_once(|_| Ok(ProtocolMessage::default()));
626        deps.signable_builder_service = Arc::new(mock_signable_builder_service);
627
628        // Configure EpochService with allowed_discriminants
629        if !allowed_discriminants.is_empty() {
630            let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
631            let epoch_service = FakeEpochServiceBuilder {
632                signed_entity_config: SignedEntityConfig {
633                    allowed_discriminants: allowed_discriminants.into_iter().collect(),
634                    ..SignedEntityConfig::dummy()
635                },
636                ..FakeEpochServiceBuilder::dummy(current_epoch)
637            }
638            .build();
639            deps.epoch_service = Arc::new(RwLock::new(epoch_service));
640        }
641
642        let runner = build_runner_with_fixture_data(deps).await;
643
644        let current_epoch = runner.dependencies.ticker_service.get_current_epoch().await.unwrap();
645        runner.inform_new_epoch(current_epoch).await.unwrap();
646        runner.precompute_epoch_data().await.unwrap();
647        runner
648    }
649
650    fn init_certifier_service_mock(
651        mock_certifier_service: &mut MockCertifierService,
652        messages: Vec<OpenMessage>,
653    ) {
654        for message in messages {
655            mock_certifier_service
656                .expect_get_open_message()
657                .return_once(|_| Ok(Some(message)))
658                .times(1);
659        }
660        // When all messages are retrieved, the function return None
661        mock_certifier_service
662            .expect_get_open_message()
663            .returning(|_| Ok(None));
664
665        mock_certifier_service.expect_inform_epoch().return_once(|_| Ok(()));
666        mock_certifier_service
667            .expect_mark_open_message_if_expired()
668            .returning(|_| Ok(None));
669    }
670
671    fn create_open_message(
672        is_certified: IsCertified,
673        is_expired: IsExpired,
674        signed_entity_type: SignedEntityType,
675    ) -> OpenMessage {
676        OpenMessage {
677            signed_entity_type,
678            is_certified: is_certified == IsCertified::Yes,
679            is_expired: is_expired == IsExpired::Yes,
680            ..OpenMessage::dummy()
681        }
682    }
683
684    #[derive(Eq, PartialEq)]
685    enum IsCertified {
686        Yes,
687        No,
688    }
689
690    #[derive(Eq, PartialEq)]
691    enum IsExpired {
692        Yes,
693        No,
694    }
695
696    #[tokio::test]
697    async fn test_get_time_point_from_chain() {
698        let expected = TimePoint::new(2, 17, ChainPoint::dummy());
699        let mut dependencies = initialize_dependencies!().await;
700        let immutable_file_observer = Arc::new(DumbImmutableFileObserver::default());
701        immutable_file_observer
702            .shall_return(Some(expected.immutable_file_number))
703            .await;
704        let ticker_service = Arc::new(MithrilTickerService::new(
705            Arc::new(FakeChainObserver::new(Some(expected.clone()))),
706            immutable_file_observer,
707        ));
708        dependencies.ticker_service = ticker_service;
709        let runner = AggregatorRunner::new(Arc::new(dependencies));
710
711        // Retrieves the expected time point
712        let res = runner.get_time_point_from_chain().await;
713        assert_eq!(expected, res.unwrap());
714    }
715
716    #[tokio::test]
717    async fn test_update_stake_distribution() {
718        let chain_observer = Arc::new(FakeChainObserver::default());
719        let deps = {
720            let mut deps = initialize_dependencies!().await;
721            deps.chain_observer = chain_observer.clone();
722            deps.stake_distribution_service = Arc::new(MithrilStakeDistributionService::new(
723                deps.stake_store.clone(),
724                chain_observer.clone(),
725            ));
726            Arc::new(deps)
727        };
728        let runner = AggregatorRunner::new(deps.clone());
729        let time_point = runner.get_time_point_from_chain().await.unwrap();
730        let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
731        let expected = fixture.stake_distribution();
732
733        chain_observer.set_signers(fixture.signers_with_stake()).await;
734        runner
735            .update_stake_distribution(&time_point)
736            .await
737            .expect("updating stake distribution should not return an error");
738
739        let saved_stake_distribution = deps
740            .stake_store
741            .get_stakes(time_point.epoch.offset_to_recording_epoch())
742            .await
743            .unwrap()
744            .unwrap_or_else(|| {
745                panic!(
746                    "I should have a stake distribution for the epoch {:?}",
747                    time_point.epoch
748                )
749            });
750
751        assert_eq!(expected, saved_stake_distribution);
752    }
753
754    #[tokio::test]
755    async fn test_open_signer_registration_round() {
756        let config = ServeCommandConfiguration::new_sample(mithril_common::temp_dir!());
757        let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
758
759        let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
760            builder.get_verification_key_store().await.unwrap(),
761            builder.get_signer_store().await.unwrap(),
762            builder.get_signer_registration_verifier().await.unwrap(),
763        ));
764        let mut deps = builder.build_serve_dependencies_container().await.unwrap();
765        deps.signer_registration_round_opener = signer_registration_round_opener.clone();
766        let stake_store = deps.stake_store.clone();
767        let deps = Arc::new(deps);
768        let runner = AggregatorRunner::new(deps.clone());
769
770        let time_point = TimePoint::dummy();
771        let recording_epoch = time_point.epoch.offset_to_recording_epoch();
772        let stake_distribution: StakeDistribution =
773            StakeDistribution::from([("a".to_string(), 5), ("b".to_string(), 10)]);
774
775        stake_store
776            .save_stakes(recording_epoch, stake_distribution.clone())
777            .await
778            .expect("Save Stake distribution should not fail");
779
780        runner
781            .open_signer_registration_round(&time_point)
782            .await
783            .expect("opening signer registration should not return an error");
784
785        let saved_current_round = signer_registration_round_opener.get_current_round().await;
786
787        let expected_signer_registration_round =
788            SignerRegistrationRound::dummy(recording_epoch, stake_distribution);
789
790        assert_eq!(
791            Some(expected_signer_registration_round),
792            saved_current_round,
793        );
794    }
795
796    #[tokio::test]
797    async fn test_close_signer_registration_round() {
798        let config = ServeCommandConfiguration::new_sample(mithril_common::temp_dir!());
799        let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
800
801        let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
802            builder.get_verification_key_store().await.unwrap(),
803            builder.get_signer_store().await.unwrap(),
804            builder.get_signer_registration_verifier().await.unwrap(),
805        ));
806        let mut deps = builder.build_serve_dependencies_container().await.unwrap();
807        deps.signer_registration_round_opener = signer_registration_round_opener.clone();
808        let deps = Arc::new(deps);
809        let runner = AggregatorRunner::new(deps.clone());
810
811        let time_point = TimePoint::dummy();
812        runner
813            .open_signer_registration_round(&time_point)
814            .await
815            .expect("opening signer registration should not return an error");
816
817        runner
818            .close_signer_registration_round()
819            .await
820            .expect("closing signer registration should not return an error");
821
822        let saved_current_round = signer_registration_round_opener.get_current_round().await;
823        assert!(saved_current_round.is_none());
824    }
825
826    #[tokio::test]
827    async fn test_expire_open_message() {
828        let open_message_expected = OpenMessage {
829            signed_entity_type: SignedEntityType::dummy(),
830            is_certified: false,
831            is_expired: false,
832            expires_at: Some(
833                DateTime::parse_from_rfc3339("2000-01-19T13:43:05.618857482Z")
834                    .unwrap()
835                    .with_timezone(&Utc),
836            ),
837            ..OpenMessage::dummy()
838        };
839        let open_message_clone = open_message_expected.clone();
840
841        let mut mock_certifier_service = MockCertifierService::new();
842        mock_certifier_service
843            .expect_mark_open_message_if_expired()
844            .return_once(|_| Ok(Some(open_message_clone)));
845
846        let mut deps = initialize_dependencies!().await;
847        deps.certifier_service = Arc::new(mock_certifier_service);
848
849        let runner = build_runner_with_fixture_data(deps).await;
850        let open_message_expired = runner
851            .mark_open_message_if_expired(&open_message_expected.signed_entity_type)
852            .await
853            .expect("mark_open_message_if_expired should not fail");
854
855        assert_eq!(Some(open_message_expected), open_message_expired);
856    }
857
858    #[tokio::test]
859    async fn test_update_era_checker() {
860        let deps = initialize_dependencies!().await;
861        let ticker_service = deps.ticker_service.clone();
862        let era_checker = deps.era_checker.clone();
863        let mut time_point = ticker_service.get_current_time_point().await.unwrap();
864
865        assert_eq!(time_point.epoch, era_checker.current_epoch());
866        let runner = AggregatorRunner::new(Arc::new(deps));
867        time_point.epoch += 1;
868
869        runner.update_era_checker(time_point.epoch).await.unwrap();
870        assert_eq!(time_point.epoch, era_checker.current_epoch());
871    }
872
873    #[tokio::test]
874    async fn test_inform_new_epoch() {
875        let mut mock_certifier_service = MockCertifierService::new();
876        mock_certifier_service
877            .expect_inform_epoch()
878            .returning(|_| Ok(()))
879            .times(1);
880        let mut deps = initialize_dependencies!().await;
881        let current_epoch = deps.chain_observer.get_current_epoch().await.unwrap().unwrap();
882
883        deps.certifier_service = Arc::new(mock_certifier_service);
884        deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
885            current_epoch,
886            &MithrilFixtureBuilder::default().build(),
887        )));
888
889        let runner = AggregatorRunner::new(Arc::new(deps));
890
891        runner.inform_new_epoch(current_epoch).await.unwrap();
892    }
893
894    #[tokio::test]
895    async fn test_upkeep_calls_run_on_upkeep_service() {
896        let mut upkeep_service = MockUpkeepService::new();
897        upkeep_service
898            .expect_run()
899            .with(eq(Epoch(5)))
900            .returning(|_| Ok(()))
901            .times(1);
902
903        let mut deps = initialize_dependencies!().await;
904        deps.upkeep_service = Arc::new(upkeep_service);
905
906        let runner = AggregatorRunner::new(Arc::new(deps));
907
908        runner.upkeep(Epoch(5)).await.unwrap();
909    }
910
911    #[tokio::test]
912    async fn test_update_epoch_settings() {
913        let mut mock_certifier_service = MockCertifierService::new();
914        mock_certifier_service
915            .expect_inform_epoch()
916            .returning(|_| Ok(()))
917            .times(1);
918
919        let config = ServeCommandConfiguration::new_sample(temp_dir!());
920        let mut deps = DependenciesBuilder::new_with_stdout_logger(Arc::new(config.clone()))
921            .build_serve_dependencies_container()
922            .await
923            .unwrap();
924        deps.certifier_service = Arc::new(mock_certifier_service);
925        let epoch_settings_storer = deps.epoch_settings_storer.clone();
926        let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
927        let insert_epoch = current_epoch.offset_to_epoch_settings_recording_epoch();
928
929        let runner = build_runner_with_fixture_data(deps).await;
930        runner.inform_new_epoch(current_epoch).await.unwrap();
931        runner
932            .update_epoch_settings()
933            .await
934            .expect("update_epoch_settings should not fail");
935
936        let saved_epoch_settings = epoch_settings_storer
937            .get_epoch_settings(insert_epoch)
938            .await
939            .unwrap()
940            .unwrap_or_else(|| panic!("should have epoch settings for epoch {insert_epoch}",));
941
942        assert_eq!(
943            AggregatorEpochSettings {
944                protocol_parameters: config.protocol_parameters.clone(),
945                cardano_transactions_signing_config: config
946                    .cardano_transactions_signing_config
947                    .clone(),
948            },
949            saved_epoch_settings
950        );
951    }
952
953    #[tokio::test]
954    async fn test_precompute_epoch_data() {
955        let mut deps = initialize_dependencies!().await;
956        let current_epoch = deps.chain_observer.get_current_epoch().await.unwrap().unwrap();
957
958        deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
959            current_epoch,
960            &MithrilFixtureBuilder::default().build(),
961        )));
962
963        let runner = AggregatorRunner::new(Arc::new(deps));
964
965        runner.precompute_epoch_data().await.unwrap();
966    }
967
968    #[tokio::test]
969    async fn test_get_current_non_certified_open_message_should_create_new_open_message_if_none_exists()
970     {
971        let open_message_created = create_open_message(
972            IsCertified::No,
973            IsExpired::No,
974            SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
975        );
976        let open_message_expected = open_message_created.clone();
977
978        let runner = {
979            let mut mock_certifier_service = MockCertifierService::new();
980            init_certifier_service_mock(&mut mock_certifier_service, vec![]);
981
982            mock_certifier_service
983                .expect_create_open_message()
984                .return_once(|_, _| Ok(open_message_created))
985                .times(1);
986            build_runner(temp_dir!(), mock_certifier_service).await
987        };
988
989        let open_message_returned = runner
990            .get_current_non_certified_open_message(&TimePoint::dummy())
991            .await
992            .unwrap();
993        assert_eq!(Some(open_message_expected), open_message_returned);
994    }
995
996    #[tokio::test]
997    async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_not_expired()
998     {
999        let not_certified_and_not_expired = create_open_message(
1000            IsCertified::No,
1001            IsExpired::No,
1002            SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1003        );
1004
1005        let open_message_expected = not_certified_and_not_expired.clone();
1006
1007        let runner = {
1008            let mut mock_certifier_service = MockCertifierService::new();
1009            init_certifier_service_mock(
1010                &mut mock_certifier_service,
1011                vec![not_certified_and_not_expired],
1012            );
1013
1014            mock_certifier_service.expect_create_open_message().never();
1015            build_runner(temp_dir!(), mock_certifier_service).await
1016        };
1017
1018        let open_message_returned = runner
1019            .get_current_non_certified_open_message(&TimePoint::dummy())
1020            .await
1021            .unwrap();
1022
1023        assert_eq!(Some(open_message_expected), open_message_returned);
1024    }
1025
1026    #[tokio::test]
1027    async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_open_message_already_certified()
1028     {
1029        let certified_and_not_expired = create_open_message(
1030            IsCertified::Yes,
1031            IsExpired::No,
1032            SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1033        );
1034        let not_certified_and_not_expired = create_open_message(
1035            IsCertified::No,
1036            IsExpired::No,
1037            SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1038        );
1039
1040        let open_message_expected = not_certified_and_not_expired.clone();
1041
1042        let runner = {
1043            let mut mock_certifier_service = MockCertifierService::new();
1044            init_certifier_service_mock(
1045                &mut mock_certifier_service,
1046                vec![certified_and_not_expired, not_certified_and_not_expired],
1047            );
1048
1049            mock_certifier_service.expect_create_open_message().never();
1050            build_runner_with_discriminants(
1051                temp_dir!(),
1052                mock_certifier_service,
1053                vec![
1054                    SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1055                    SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1056                ],
1057            )
1058            .await
1059        };
1060
1061        let open_message_returned = runner
1062            .get_current_non_certified_open_message(&TimePoint::dummy())
1063            .await
1064            .unwrap();
1065
1066        assert_eq!(Some(open_message_expected), open_message_returned);
1067    }
1068
1069    #[tokio::test]
1070    async fn test_get_current_non_certified_open_message_should_create_open_message_if_none_exists_and_open_message_already_certified()
1071     {
1072        let certified_and_not_expired = create_open_message(
1073            IsCertified::Yes,
1074            IsExpired::No,
1075            SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1076        );
1077        let open_message_created = create_open_message(
1078            IsCertified::No,
1079            IsExpired::No,
1080            SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1081        );
1082        let open_message_expected = open_message_created.clone();
1083
1084        let runner = {
1085            let mut mock_certifier_service = MockCertifierService::new();
1086            init_certifier_service_mock(
1087                &mut mock_certifier_service,
1088                vec![certified_and_not_expired],
1089            );
1090
1091            mock_certifier_service
1092                .expect_create_open_message()
1093                .return_once(|_, _| Ok(open_message_created))
1094                .times(1);
1095            build_runner_with_discriminants(
1096                temp_dir!(),
1097                mock_certifier_service,
1098                vec![
1099                    SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1100                    SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1101                ],
1102            )
1103            .await
1104        };
1105
1106        let open_message_returned = runner
1107            .get_current_non_certified_open_message(&TimePoint::dummy())
1108            .await
1109            .unwrap();
1110
1111        assert_eq!(Some(open_message_expected), open_message_returned);
1112    }
1113
1114    #[tokio::test]
1115    async fn test_get_current_non_certified_open_message_should_return_none_if_all_open_message_already_certified()
1116     {
1117        let certified_and_not_expired_1 = create_open_message(
1118            IsCertified::Yes,
1119            IsExpired::No,
1120            SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1121        );
1122        let certified_and_not_expired_2 = create_open_message(
1123            IsCertified::Yes,
1124            IsExpired::No,
1125            SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1126        );
1127
1128        let runner = {
1129            let mut mock_certifier_service = MockCertifierService::new();
1130            init_certifier_service_mock(
1131                &mut mock_certifier_service,
1132                vec![certified_and_not_expired_1, certified_and_not_expired_2],
1133            );
1134
1135            mock_certifier_service.expect_create_open_message().never();
1136            build_runner_with_discriminants(
1137                temp_dir!(),
1138                mock_certifier_service,
1139                vec![
1140                    SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1141                    SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1142                ],
1143            )
1144            .await
1145        };
1146
1147        let open_message_returned = runner
1148            .get_current_non_certified_open_message(&TimePoint::dummy())
1149            .await
1150            .unwrap();
1151
1152        assert!(open_message_returned.is_none());
1153    }
1154
1155    #[tokio::test]
1156    async fn test_get_current_non_certified_open_message_should_return_first_not_certified_and_not_expired_open_message()
1157     {
1158        let not_certified_and_expired = create_open_message(
1159            IsCertified::No,
1160            IsExpired::Yes,
1161            SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1162        );
1163        let not_certified_and_not_expired = create_open_message(
1164            IsCertified::No,
1165            IsExpired::No,
1166            SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1167        );
1168
1169        let open_message_expected = not_certified_and_not_expired.clone();
1170
1171        let runner = {
1172            let mut mock_certifier_service = MockCertifierService::new();
1173            init_certifier_service_mock(
1174                &mut mock_certifier_service,
1175                vec![not_certified_and_expired, not_certified_and_not_expired],
1176            );
1177
1178            mock_certifier_service.expect_create_open_message().never();
1179            build_runner_with_discriminants(
1180                temp_dir!(),
1181                mock_certifier_service,
1182                vec![
1183                    SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1184                    SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1185                ],
1186            )
1187            .await
1188        };
1189
1190        let open_message_returned = runner
1191            .get_current_non_certified_open_message(&TimePoint::dummy())
1192            .await
1193            .unwrap();
1194
1195        assert_eq!(Some(open_message_expected), open_message_returned);
1196    }
1197
1198    #[tokio::test]
1199    async fn test_get_current_non_certified_open_message_called_for_mithril_stake_distribution() {
1200        let mut mock_certifier_service = MockCertifierService::new();
1201
1202        mock_certifier_service
1203            .expect_get_open_message()
1204            .with(eq(SignedEntityType::MithrilStakeDistribution(
1205                TimePoint::dummy().epoch,
1206            )))
1207            .times(1)
1208            .return_once(|_| {
1209                Ok(Some(create_open_message(
1210                    IsCertified::Yes,
1211                    IsExpired::No,
1212                    SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1213                )))
1214            });
1215
1216        mock_certifier_service.expect_create_open_message().never();
1217
1218        mock_certifier_service.expect_inform_epoch().return_once(|_| Ok(()));
1219        mock_certifier_service
1220            .expect_mark_open_message_if_expired()
1221            .returning(|_| Ok(None));
1222
1223        let runner = build_runner(temp_dir!(), mock_certifier_service).await;
1224
1225        runner
1226            .get_current_non_certified_open_message(&TimePoint::dummy())
1227            .await
1228            .unwrap();
1229    }
1230
1231    #[tokio::test]
1232    async fn list_available_signed_entity_types_list_all_configured_entities_if_none_are_locked() {
1233        let runner = {
1234            let mut dependencies = initialize_dependencies!().await;
1235            let epoch_service = FakeEpochServiceBuilder {
1236                signed_entity_config: SignedEntityConfig {
1237                    allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1238                    ..SignedEntityConfig::dummy()
1239                },
1240                ..FakeEpochServiceBuilder::dummy(Epoch(32))
1241            }
1242            .build();
1243            dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1244            dependencies.signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1245            AggregatorRunner::new(Arc::new(dependencies))
1246        };
1247
1248        let time_point = TimePoint::dummy();
1249        let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1250            .list_available_signed_entity_types(&time_point)
1251            .await
1252            .unwrap()
1253            .into_iter()
1254            .map(Into::into)
1255            .collect();
1256
1257        assert_eq!(
1258            signed_entities,
1259            SignedEntityTypeDiscriminants::all().into_iter().collect::<Vec<_>>()
1260        );
1261    }
1262
1263    #[tokio::test]
1264    async fn list_available_signed_entity_types_exclude_locked_entities() {
1265        let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1266        let runner = {
1267            let mut dependencies = initialize_dependencies!().await;
1268            dependencies.signed_entity_type_lock = signed_entity_type_lock.clone();
1269            let epoch_service = FakeEpochServiceBuilder {
1270                signed_entity_config: SignedEntityConfig {
1271                    allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1272                    ..SignedEntityConfig::dummy()
1273                },
1274                ..FakeEpochServiceBuilder::dummy(Epoch(32))
1275            }
1276            .build();
1277            dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1278
1279            AggregatorRunner::new(Arc::new(dependencies))
1280        };
1281
1282        signed_entity_type_lock
1283            .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
1284            .await;
1285
1286        let time_point = TimePoint::dummy();
1287        let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1288            .list_available_signed_entity_types(&time_point)
1289            .await
1290            .unwrap()
1291            .into_iter()
1292            .map(Into::into)
1293            .collect();
1294
1295        assert!(!signed_entities.is_empty());
1296        assert!(!signed_entities.contains(&SignedEntityTypeDiscriminants::CardanoTransactions));
1297    }
1298
1299    #[tokio::test]
1300    async fn is_open_message_outdated_return_false_when_message_is_not_expired_and_no_newer_open_message()
1301     {
1302        assert!(!is_outdated_returned_when(temp_dir!(), IsExpired::No, false).await);
1303    }
1304
1305    #[tokio::test]
1306    async fn is_open_message_outdated_return_true_when_message_is_expired_and_no_newer_open_message()
1307     {
1308        assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, false).await);
1309    }
1310
1311    #[tokio::test]
1312    async fn is_open_message_outdated_return_true_when_message_is_not_expired_and_exists_newer_open_message()
1313     {
1314        assert!(is_outdated_returned_when(temp_dir!(), IsExpired::No, true).await);
1315    }
1316
1317    #[tokio::test]
1318    async fn is_open_message_outdated_return_true_when_message_is_expired_and_exists_newer_open_message()
1319     {
1320        assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, true).await);
1321    }
1322
1323    async fn is_outdated_returned_when(
1324        tmp_path: PathBuf,
1325        is_expired: IsExpired,
1326        newer_open_message: bool,
1327    ) -> bool {
1328        let current_time_point = TimePoint {
1329            epoch: Epoch(2),
1330            ..TimePoint::dummy()
1331        };
1332
1333        let message_epoch = if newer_open_message {
1334            current_time_point.epoch + 54
1335        } else {
1336            current_time_point.epoch
1337        };
1338        let open_message_to_verify = OpenMessage {
1339            signed_entity_type: SignedEntityType::MithrilStakeDistribution(message_epoch),
1340            is_expired: is_expired == IsExpired::Yes,
1341            ..OpenMessage::dummy()
1342        };
1343
1344        let runner = {
1345            let mut deps = initialize_dependencies(tmp_path).await;
1346            let mut mock_certifier_service = MockCertifierService::new();
1347
1348            let open_message_current = open_message_to_verify.clone();
1349            mock_certifier_service
1350                .expect_get_open_message()
1351                .times(1)
1352                .return_once(|_| Ok(Some(open_message_current)));
1353            mock_certifier_service
1354                .expect_mark_open_message_if_expired()
1355                .returning(|_| Ok(None));
1356
1357            deps.certifier_service = Arc::new(mock_certifier_service);
1358
1359            let epoch_service = FakeEpochServiceBuilder::dummy(current_time_point.epoch).build();
1360            deps.epoch_service = Arc::new(RwLock::new(epoch_service));
1361
1362            build_runner_with_fixture_data(deps).await
1363        };
1364
1365        runner
1366            .is_open_message_outdated(
1367                open_message_to_verify.signed_entity_type,
1368                &current_time_point,
1369            )
1370            .await
1371            .unwrap()
1372    }
1373}