mithril_aggregator/runtime/
runner.rs

1use anyhow::Context;
2use async_trait::async_trait;
3use slog::{Logger, debug, warn};
4use std::sync::Arc;
5use std::time::Duration;
6
7use mithril_common::StdResult;
8use mithril_common::entities::{Certificate, Epoch, ProtocolMessage, SignedEntityType, TimePoint};
9use mithril_common::logging::LoggerExtensions;
10use mithril_persistence::store::StakeStorer;
11
12use crate::ServeCommandDependenciesContainer;
13use crate::entities::OpenMessage;
14
15/// Configuration structure dedicated to the AggregatorRuntime.
16#[derive(Debug, Clone)]
17pub struct AggregatorConfig {
18    /// Interval between each snapshot, in ms
19    pub interval: Duration,
20
21    /// Whether the aggregator is a follower
22    pub is_follower: bool,
23}
24
25impl AggregatorConfig {
26    /// Create a new instance of AggregatorConfig.
27    pub fn new(interval: Duration, is_follower: bool) -> Self {
28        Self {
29            interval,
30            is_follower,
31        }
32    }
33}
34
35/// This trait is intended to allow mocking the AggregatorRunner in tests.
36/// It exposes all the methods needed by the state machine.
37#[async_trait]
38pub trait AggregatorRunnerTrait: Sync + Send {
39    /// Return the current [TimePoint] from the chain
40    async fn get_time_point_from_chain(&self) -> StdResult<TimePoint>;
41
42    /// Retrieves the current open message for a given signed entity type.
43    async fn get_current_open_message_for_signed_entity_type(
44        &self,
45        signed_entity_type: &SignedEntityType,
46    ) -> StdResult<Option<OpenMessage>>;
47
48    /// Retrieves the current non-certified open message.
49    async fn get_current_non_certified_open_message(
50        &self,
51        current_time_point: &TimePoint,
52    ) -> StdResult<Option<OpenMessage>>;
53
54    /// Check if a certificate chain is valid.
55    async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()>;
56
57    /// Read the stake distribution from the blockchain and store it.
58    async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()>;
59
60    /// Open the signer registration round of an epoch.
61    async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()>;
62
63    /// Close the signer registration round of an epoch.
64    async fn close_signer_registration_round(&self) -> StdResult<()>;
65
66    /// Check if the follower aggregator is running the same epoch as the leader.
67    async fn is_follower_aggregator_at_same_epoch_as_leader(
68        &self,
69        time_point: &TimePoint,
70    ) -> StdResult<bool>;
71
72    /// Synchronize the follower aggregator signer registration.
73    async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()>;
74
75    /// Synchronize the follower aggregator certificate chain
76    async fn synchronize_follower_aggregator_certificate_chain(
77        &self,
78        force_sync: bool,
79    ) -> StdResult<()>;
80
81    /// Compute the protocol message
82    async fn compute_protocol_message(
83        &self,
84        signed_entity_type: &SignedEntityType,
85    ) -> StdResult<ProtocolMessage>;
86
87    /// Mark expired open message.
88    async fn mark_open_message_if_expired(
89        &self,
90        signed_entity_type: &SignedEntityType,
91    ) -> StdResult<Option<OpenMessage>>;
92
93    /// Tell the certifier to try to create a new certificate.
94    async fn create_certificate(
95        &self,
96        signed_entity_type: &SignedEntityType,
97    ) -> StdResult<Option<Certificate>>;
98
99    /// Create an artifact and persist it.
100    async fn create_artifact(
101        &self,
102        signed_entity_type: &SignedEntityType,
103        certificate: &Certificate,
104    ) -> StdResult<()>;
105
106    /// Update the EraChecker with EraReader information.
107    async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()>;
108
109    /// Ask services to update themselves for the new epoch
110    async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()>;
111
112    /// Perform the upkeep tasks.
113    async fn upkeep(&self, epoch: Epoch) -> StdResult<()>;
114
115    /// Precompute what doesn't change for the actual epoch
116    async fn precompute_epoch_data(&self) -> StdResult<()>;
117
118    /// Create new open message
119    async fn create_open_message(
120        &self,
121        signed_entity_type: &SignedEntityType,
122        protocol_message: &ProtocolMessage,
123    ) -> StdResult<OpenMessage>;
124
125    /// Checks if the open message is considered outdated.
126    async fn is_open_message_outdated(
127        &self,
128        open_message_signed_entity_type: SignedEntityType,
129        last_time_point: &TimePoint,
130    ) -> StdResult<bool>;
131
132    /// Increment the runtime cycle success metric.
133    fn increment_runtime_cycle_success_since_startup_counter(&self);
134
135    /// Increment the runtime cycle total metric.
136    fn increment_runtime_cycle_total_since_startup_counter(&self);
137}
138
139/// The runner responsibility is to expose a code API for the state machine. It
140/// holds services and configuration.
141pub struct AggregatorRunner {
142    dependencies: Arc<ServeCommandDependenciesContainer>,
143    logger: Logger,
144}
145
146impl AggregatorRunner {
147    /// Create a new instance of the Aggregator Runner.
148    pub fn new(dependencies: Arc<ServeCommandDependenciesContainer>) -> Self {
149        let logger = dependencies.root_logger.new_with_component_name::<Self>();
150        Self {
151            dependencies,
152            logger,
153        }
154    }
155
156    async fn list_available_signed_entity_types(
157        &self,
158        time_point: &TimePoint,
159    ) -> StdResult<Vec<SignedEntityType>> {
160        let signed_entity_types = self
161            .dependencies
162            .epoch_service
163            .read()
164            .await
165            .signed_entity_config()?
166            .list_allowed_signed_entity_types(time_point)?;
167        let unlocked_signed_entities = self
168            .dependencies
169            .signed_entity_type_lock
170            .filter_unlocked_entries(signed_entity_types)
171            .await;
172
173        Ok(unlocked_signed_entities)
174    }
175}
176
177#[cfg_attr(test, mockall::automock)]
178#[async_trait]
179impl AggregatorRunnerTrait for AggregatorRunner {
180    /// Return the current time point from the chain
181    async fn get_time_point_from_chain(&self) -> StdResult<TimePoint> {
182        debug!(self.logger, ">> get_time_point_from_chain");
183        let time_point = self.dependencies.ticker_service.get_current_time_point().await?;
184
185        Ok(time_point)
186    }
187
188    async fn get_current_open_message_for_signed_entity_type(
189        &self,
190        signed_entity_type: &SignedEntityType,
191    ) -> StdResult<Option<OpenMessage>> {
192        debug!(self.logger,">> get_current_open_message_for_signed_entity_type"; "signed_entity_type" => ?signed_entity_type);
193        self.mark_open_message_if_expired(signed_entity_type).await?;
194
195        Ok(self
196            .dependencies
197            .certifier_service
198            .get_open_message(signed_entity_type)
199            .await
200            .with_context(|| format!("CertifierService can not get open message for signed_entity_type: '{signed_entity_type}'"))?)
201    }
202
203    async fn get_current_non_certified_open_message(
204        &self,
205        current_time_point: &TimePoint,
206    ) -> StdResult<Option<OpenMessage>> {
207        debug!(self.logger,">> get_current_non_certified_open_message"; "time_point" => #?current_time_point);
208        let signed_entity_types =
209            self.list_available_signed_entity_types(current_time_point).await?;
210
211        for signed_entity_type in signed_entity_types {
212            let current_open_message = self.get_current_open_message_for_signed_entity_type(&signed_entity_type)
213                .await
214                .with_context(|| format!("AggregatorRunner can not get current open message for signed entity type: '{}'", &signed_entity_type))?;
215            match current_open_message {
216                None => {
217                    let protocol_message = self.compute_protocol_message(&signed_entity_type).await.with_context(|| format!("AggregatorRunner can not compute protocol message for signed_entity_type: '{signed_entity_type}'"))?;
218                    let open_message_new = self.create_open_message(&signed_entity_type, &protocol_message)
219                        .await
220                        .with_context(|| format!("AggregatorRunner can not create open message for signed_entity_type: '{signed_entity_type}'"))?;
221
222                    return Ok(Some(open_message_new));
223                }
224                Some(open_message) => {
225                    if !open_message.is_certified && !open_message.is_expired {
226                        return Ok(Some(open_message));
227                    }
228                }
229            }
230        }
231
232        Ok(None)
233    }
234
235    async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()> {
236        debug!(self.logger, ">> is_certificate_chain_valid");
237        self.dependencies
238            .certifier_service
239            .verify_certificate_chain(time_point.epoch)
240            .await?;
241
242        Ok(())
243    }
244
245    async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()> {
246        debug!(self.logger,">> update_stake_distribution"; "time_point" => #?new_time_point);
247        self.dependencies
248            .stake_distribution_service
249            .update_stake_distribution()
250            .await
251            .with_context(|| format!("AggregatorRunner could not update stake distribution for time_point: '{new_time_point}'"))
252    }
253
254    async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()> {
255        debug!(self.logger,">> open_signer_registration_round"; "time_point" => #?new_time_point);
256        let registration_epoch = new_time_point.epoch.offset_to_recording_epoch();
257
258        let stakes = self
259            .dependencies
260            .stake_store
261            .get_stakes(registration_epoch)
262            .await?
263            .unwrap_or_default();
264
265        self.dependencies
266            .signer_registration_round_opener
267            .open_registration_round(registration_epoch, stakes)
268            .await
269    }
270
271    async fn close_signer_registration_round(&self) -> StdResult<()> {
272        debug!(self.logger, ">> close_signer_registration_round");
273        self.dependencies
274            .signer_registration_round_opener
275            .close_registration_round()
276            .await
277    }
278
279    async fn is_follower_aggregator_at_same_epoch_as_leader(
280        &self,
281        time_point: &TimePoint,
282    ) -> StdResult<bool> {
283        self.dependencies
284            .signer_synchronizer
285            .can_synchronize_signers(time_point.epoch)
286            .await
287            .map_err(|e| e.into())
288    }
289
290    async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()> {
291        self.dependencies
292            .signer_synchronizer
293            .synchronize_all_signers()
294            .await
295            .map_err(|e| e.into())
296    }
297
298    async fn compute_protocol_message(
299        &self,
300        signed_entity_type: &SignedEntityType,
301    ) -> StdResult<ProtocolMessage> {
302        debug!(self.logger, ">> compute_protocol_message");
303        let protocol_message = self
304            .dependencies
305            .signable_builder_service
306            .compute_protocol_message(signed_entity_type.to_owned())
307            .await
308            .with_context(|| format!("Runner can not compute protocol message for signed entity type: '{signed_entity_type}'"))?;
309
310        Ok(protocol_message)
311    }
312
313    async fn mark_open_message_if_expired(
314        &self,
315        signed_entity_type: &SignedEntityType,
316    ) -> StdResult<Option<OpenMessage>> {
317        debug!(self.logger, ">> mark_open_message_if_expired");
318        let expired_open_message = self
319            .dependencies
320            .certifier_service
321            .mark_open_message_if_expired(signed_entity_type)
322            .await
323            .with_context(|| "CertifierService can not mark expired open message")?;
324
325        debug!(
326            self.logger, "Marked expired open messages";
327            "expired_open_message" => ?expired_open_message
328        );
329
330        Ok(expired_open_message)
331    }
332
333    async fn create_certificate(
334        &self,
335        signed_entity_type: &SignedEntityType,
336    ) -> StdResult<Option<Certificate>> {
337        debug!(self.logger, ">> create_certificate"; "signed_entity_type" => ?signed_entity_type);
338
339        let certificate = self.dependencies
340            .certifier_service
341            .create_certificate(signed_entity_type)
342            .await
343            .with_context(|| {
344                format!(
345                    "CertifierService can not create certificate for signed_entity_type: '{signed_entity_type}'"
346                )
347            })?;
348
349        if certificate.is_some() {
350            self.dependencies
351                .metrics_service
352                .get_certificate_total_produced_since_startup()
353                .increment();
354        }
355
356        Ok(certificate)
357    }
358
359    async fn create_artifact(
360        &self,
361        signed_entity_type: &SignedEntityType,
362        certificate: &Certificate,
363    ) -> StdResult<()> {
364        debug!(
365            self.logger, ">> create_artifact";
366            "signed_entity_type" => ?signed_entity_type,
367            "certificate_hash" => &certificate.hash
368        );
369
370        self.dependencies
371            .signed_entity_service
372            .create_artifact(signed_entity_type.to_owned(), certificate)
373            .await
374            .with_context(|| {
375                format!(
376                    "SignedEntityService can not create artifact for signed_entity_type: '{signed_entity_type}' with certificate hash: '{}'",
377                    certificate.hash
378                )
379            })?;
380
381        Ok(())
382    }
383
384    async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()> {
385        debug!(self.logger, ">> update_era_checker({epoch:?})");
386        let token = self
387            .dependencies
388            .era_reader
389            .read_era_epoch_token(epoch)
390            .await
391            .with_context(|| {
392                format!("EraReader can not get era epoch token for current epoch: '{epoch}'")
393            })?;
394
395        let current_era = token
396            .get_current_supported_era()
397            .with_context(|| "EraEpochToken can not get current supported era")?;
398        self.dependencies
399            .era_checker
400            .change_era(current_era, token.get_current_epoch());
401        debug!(
402            self.logger,
403            "Current Era is {current_era} (Epoch {}).",
404            token.get_current_epoch()
405        );
406
407        if token.get_next_supported_era().is_err() {
408            let era_name = &token.get_next_era_marker().unwrap().name;
409            warn!(
410                self.logger,
411                "Upcoming Era '{era_name}' is not supported by this version of the software. Please update!"
412            );
413        }
414
415        Ok(())
416    }
417
418    async fn precompute_epoch_data(&self) -> StdResult<()> {
419        debug!(self.logger, ">> precompute_epoch_data");
420        self.dependencies
421            .epoch_service
422            .write()
423            .await
424            .precompute_epoch_data()
425            .await?;
426
427        Ok(())
428    }
429
430    async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()> {
431        debug!(self.logger, ">> inform_new_epoch({epoch:?})");
432        self.dependencies.certifier_service.inform_epoch(epoch).await?;
433
434        self.dependencies
435            .epoch_service
436            .write()
437            .await
438            .inform_epoch(epoch)
439            .await?;
440
441        Ok(())
442    }
443
444    async fn upkeep(&self, epoch: Epoch) -> StdResult<()> {
445        debug!(self.logger, ">> upkeep");
446        self.dependencies.upkeep_service.run(epoch).await
447    }
448
449    async fn create_open_message(
450        &self,
451        signed_entity_type: &SignedEntityType,
452        protocol_message: &ProtocolMessage,
453    ) -> StdResult<OpenMessage> {
454        debug!(self.logger, ">> create_open_message");
455        self.dependencies
456            .certifier_service
457            .create_open_message(signed_entity_type, protocol_message)
458            .await
459    }
460
461    async fn is_open_message_outdated(
462        &self,
463        open_message_signed_entity_type: SignedEntityType,
464        last_time_point: &TimePoint,
465    ) -> StdResult<bool> {
466        let current_open_message = self
467            .get_current_open_message_for_signed_entity_type(
468                &open_message_signed_entity_type,
469            )
470            .await
471            .with_context(|| format!("AggregatorRuntime can not get the current open message for signed entity type: '{}'", &open_message_signed_entity_type))?;
472        let is_expired_open_message =
473            current_open_message.as_ref().map(|om| om.is_expired).unwrap_or(false);
474
475        let exists_newer_open_message = {
476            let new_signed_entity_type = self
477                .dependencies
478                .epoch_service
479                .read()
480                .await
481                .signed_entity_config()?
482                .time_point_to_signed_entity(&open_message_signed_entity_type, last_time_point)?;
483            new_signed_entity_type != open_message_signed_entity_type
484        };
485
486        Ok(exists_newer_open_message || is_expired_open_message)
487    }
488
489    fn increment_runtime_cycle_success_since_startup_counter(&self) {
490        self.dependencies
491            .metrics_service
492            .get_runtime_cycle_success_since_startup()
493            .increment();
494    }
495
496    fn increment_runtime_cycle_total_since_startup_counter(&self) {
497        self.dependencies
498            .metrics_service
499            .get_runtime_cycle_total_since_startup()
500            .increment();
501    }
502
503    async fn synchronize_follower_aggregator_certificate_chain(
504        &self,
505        force_sync: bool,
506    ) -> StdResult<()> {
507        debug!(
508            self.logger,
509            ">> synchronize_follower_aggregator_certificate_chain(force_sync:{force_sync})"
510        );
511        self.dependencies
512            .certificate_chain_synchronizer
513            .synchronize_certificate_chain(force_sync)
514            .await
515    }
516}
517
518#[cfg(test)]
519pub mod tests {
520    use async_trait::async_trait;
521    use chrono::{DateTime, Utc};
522    use mockall::mock;
523    use mockall::predicate::eq;
524    use std::path::PathBuf;
525    use std::sync::Arc;
526    use tokio::sync::RwLock;
527
528    use mithril_cardano_node_chain::test::double::FakeChainObserver;
529    use mithril_cardano_node_internal_database::test::double::DumbImmutableFileObserver;
530    use mithril_common::{
531        StdResult,
532        entities::{
533            ChainPoint, Epoch, ProtocolMessage, SignedEntityConfig, SignedEntityType,
534            SignedEntityTypeDiscriminants, StakeDistribution, TimePoint,
535        },
536        signable_builder::SignableBuilderService,
537        temp_dir,
538        test::{
539            builder::MithrilFixtureBuilder,
540            double::{Dummy, fake_data},
541        },
542    };
543    use mithril_persistence::store::StakeStorer;
544    use mithril_signed_entity_lock::SignedEntityTypeLock;
545    use mithril_ticker::MithrilTickerService;
546
547    use crate::{
548        MithrilSignerRegistrationLeader, ServeCommandConfiguration,
549        ServeCommandDependenciesContainer, SignerRegistrationRound,
550        dependency_injection::DependenciesBuilder,
551        entities::OpenMessage,
552        initialize_dependencies,
553        runtime::{AggregatorRunner, AggregatorRunnerTrait},
554        services::{
555            FakeEpochService, FakeEpochServiceBuilder, MockCertifierService, MockUpkeepService,
556        },
557    };
558
559    mock! {
560        SignableBuilderServiceImpl { }
561
562        #[async_trait]
563        impl SignableBuilderService for SignableBuilderServiceImpl
564        {
565
566            async fn compute_protocol_message(
567                &self,
568                signed_entity_type: SignedEntityType,
569            ) -> StdResult<ProtocolMessage>;
570        }
571    }
572
573    async fn build_runner_with_fixture_data(
574        deps: ServeCommandDependenciesContainer,
575    ) -> AggregatorRunner {
576        let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
577        let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
578        deps.init_state_from_fixture(&fixture, current_epoch).await;
579
580        AggregatorRunner::new(Arc::new(deps))
581    }
582
583    async fn build_runner(
584        temp_dir: PathBuf,
585        mock_certifier_service: MockCertifierService,
586    ) -> AggregatorRunner {
587        build_runner_with_discriminants(temp_dir, mock_certifier_service, vec![]).await
588    }
589
590    async fn build_runner_with_discriminants(
591        temp_dir: PathBuf,
592        mock_certifier_service: MockCertifierService,
593        allowed_discriminants: Vec<SignedEntityTypeDiscriminants>,
594    ) -> AggregatorRunner {
595        let mut deps = initialize_dependencies(temp_dir).await;
596        deps.certifier_service = Arc::new(mock_certifier_service);
597
598        let mut mock_signable_builder_service = MockSignableBuilderServiceImpl::new();
599        mock_signable_builder_service
600            .expect_compute_protocol_message()
601            .return_once(|_| Ok(ProtocolMessage::default()));
602        deps.signable_builder_service = Arc::new(mock_signable_builder_service);
603
604        // Configure EpochService with allowed_discriminants
605        if !allowed_discriminants.is_empty() {
606            let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
607            let epoch_service = FakeEpochServiceBuilder {
608                signed_entity_config: SignedEntityConfig {
609                    allowed_discriminants: allowed_discriminants.into_iter().collect(),
610                    ..SignedEntityConfig::dummy()
611                },
612                ..FakeEpochServiceBuilder::dummy(current_epoch)
613            }
614            .build();
615            deps.epoch_service = Arc::new(RwLock::new(epoch_service));
616        }
617
618        let runner = build_runner_with_fixture_data(deps).await;
619
620        let current_epoch = runner.dependencies.ticker_service.get_current_epoch().await.unwrap();
621        runner.inform_new_epoch(current_epoch).await.unwrap();
622        runner.precompute_epoch_data().await.unwrap();
623        runner
624    }
625
626    fn init_certifier_service_mock(
627        mock_certifier_service: &mut MockCertifierService,
628        messages: Vec<OpenMessage>,
629    ) {
630        for message in messages {
631            mock_certifier_service
632                .expect_get_open_message()
633                .return_once(|_| Ok(Some(message)))
634                .times(1);
635        }
636        // When all messages are retrieved, the function return None
637        mock_certifier_service
638            .expect_get_open_message()
639            .returning(|_| Ok(None));
640
641        mock_certifier_service.expect_inform_epoch().return_once(|_| Ok(()));
642        mock_certifier_service
643            .expect_mark_open_message_if_expired()
644            .returning(|_| Ok(None));
645    }
646
647    fn create_open_message(
648        is_certified: IsCertified,
649        is_expired: IsExpired,
650        signed_entity_type: SignedEntityType,
651    ) -> OpenMessage {
652        OpenMessage {
653            signed_entity_type,
654            is_certified: is_certified == IsCertified::Yes,
655            is_expired: is_expired == IsExpired::Yes,
656            ..OpenMessage::dummy()
657        }
658    }
659
660    #[derive(Eq, PartialEq)]
661    enum IsCertified {
662        Yes,
663        No,
664    }
665
666    #[derive(Eq, PartialEq)]
667    enum IsExpired {
668        Yes,
669        No,
670    }
671
672    #[tokio::test]
673    async fn test_get_time_point_from_chain() {
674        let expected = TimePoint::new(2, 17, ChainPoint::dummy());
675        let mut dependencies = initialize_dependencies!().await;
676        let immutable_file_observer = Arc::new(DumbImmutableFileObserver::default());
677        immutable_file_observer
678            .shall_return(Some(expected.immutable_file_number))
679            .await;
680        let ticker_service = Arc::new(MithrilTickerService::new(
681            Arc::new(FakeChainObserver::new(Some(expected.clone()))),
682            immutable_file_observer,
683        ));
684        dependencies.ticker_service = ticker_service;
685        let runner = AggregatorRunner::new(Arc::new(dependencies));
686
687        // Retrieves the expected time point
688        let res = runner.get_time_point_from_chain().await;
689        assert_eq!(expected, res.unwrap());
690    }
691
692    #[tokio::test]
693    async fn test_update_stake_distribution() {
694        let chain_observer = Arc::new(FakeChainObserver::default());
695        let deps = {
696            let config = ServeCommandConfiguration::new_sample(temp_dir!());
697            let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
698            builder.chain_observer = Some(chain_observer.clone());
699
700            Arc::new(builder.build_serve_dependencies_container().await.unwrap())
701        };
702        let runner = AggregatorRunner::new(deps.clone());
703        let time_point = runner.get_time_point_from_chain().await.unwrap();
704        let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
705        let expected = fixture.stake_distribution();
706
707        chain_observer.set_signers(fixture.signers_with_stake()).await;
708        runner
709            .update_stake_distribution(&time_point)
710            .await
711            .expect("updating stake distribution should not return an error");
712
713        let saved_stake_distribution = deps
714            .stake_store
715            .get_stakes(time_point.epoch.offset_to_recording_epoch())
716            .await
717            .unwrap()
718            .unwrap_or_else(|| {
719                panic!(
720                    "I should have a stake distribution for the epoch {:?}",
721                    time_point.epoch
722                )
723            });
724
725        assert_eq!(expected, saved_stake_distribution);
726    }
727
728    #[tokio::test]
729    async fn test_open_signer_registration_round() {
730        let config = ServeCommandConfiguration::new_sample(mithril_common::temp_dir!());
731        let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
732
733        let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
734            builder.get_verification_key_store().await.unwrap(),
735            builder.get_signer_store().await.unwrap(),
736            builder.get_signer_registration_verifier().await.unwrap(),
737        ));
738        let mut deps = builder.build_serve_dependencies_container().await.unwrap();
739        deps.signer_registration_round_opener = signer_registration_round_opener.clone();
740        let stake_store = deps.stake_store.clone();
741        let deps = Arc::new(deps);
742        let runner = AggregatorRunner::new(deps.clone());
743
744        let time_point = TimePoint::dummy();
745        let recording_epoch = time_point.epoch.offset_to_recording_epoch();
746        let stake_distribution: StakeDistribution =
747            StakeDistribution::from([("a".to_string(), 5), ("b".to_string(), 10)]);
748
749        stake_store
750            .save_stakes(recording_epoch, stake_distribution.clone())
751            .await
752            .expect("Save Stake distribution should not fail");
753
754        runner
755            .open_signer_registration_round(&time_point)
756            .await
757            .expect("opening signer registration should not return an error");
758
759        let saved_current_round = signer_registration_round_opener.get_current_round().await;
760
761        let expected_signer_registration_round =
762            SignerRegistrationRound::dummy(recording_epoch, stake_distribution);
763
764        assert_eq!(
765            Some(expected_signer_registration_round),
766            saved_current_round,
767        );
768    }
769
770    #[tokio::test]
771    async fn test_close_signer_registration_round() {
772        let config = ServeCommandConfiguration::new_sample(mithril_common::temp_dir!());
773        let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
774
775        let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
776            builder.get_verification_key_store().await.unwrap(),
777            builder.get_signer_store().await.unwrap(),
778            builder.get_signer_registration_verifier().await.unwrap(),
779        ));
780        let mut deps = builder.build_serve_dependencies_container().await.unwrap();
781        deps.signer_registration_round_opener = signer_registration_round_opener.clone();
782        let deps = Arc::new(deps);
783        let runner = AggregatorRunner::new(deps.clone());
784
785        let time_point = TimePoint::dummy();
786        runner
787            .open_signer_registration_round(&time_point)
788            .await
789            .expect("opening signer registration should not return an error");
790
791        runner
792            .close_signer_registration_round()
793            .await
794            .expect("closing signer registration should not return an error");
795
796        let saved_current_round = signer_registration_round_opener.get_current_round().await;
797        assert!(saved_current_round.is_none());
798    }
799
800    #[tokio::test]
801    async fn test_expire_open_message() {
802        let open_message_expected = OpenMessage {
803            signed_entity_type: SignedEntityType::dummy(),
804            is_certified: false,
805            is_expired: false,
806            expires_at: Some(
807                DateTime::parse_from_rfc3339("2000-01-19T13:43:05.618857482Z")
808                    .unwrap()
809                    .with_timezone(&Utc),
810            ),
811            ..OpenMessage::dummy()
812        };
813        let open_message_clone = open_message_expected.clone();
814
815        let mut mock_certifier_service = MockCertifierService::new();
816        mock_certifier_service
817            .expect_mark_open_message_if_expired()
818            .return_once(|_| Ok(Some(open_message_clone)));
819
820        let mut deps = initialize_dependencies!().await;
821        deps.certifier_service = Arc::new(mock_certifier_service);
822
823        let runner = build_runner_with_fixture_data(deps).await;
824        let open_message_expired = runner
825            .mark_open_message_if_expired(&open_message_expected.signed_entity_type)
826            .await
827            .expect("mark_open_message_if_expired should not fail");
828
829        assert_eq!(Some(open_message_expected), open_message_expired);
830    }
831
832    #[tokio::test]
833    async fn test_update_era_checker() {
834        let deps = initialize_dependencies!().await;
835        let ticker_service = deps.ticker_service.clone();
836        let era_checker = deps.era_checker.clone();
837        let mut time_point = ticker_service.get_current_time_point().await.unwrap();
838
839        assert_eq!(time_point.epoch, era_checker.current_epoch());
840        let runner = AggregatorRunner::new(Arc::new(deps));
841        time_point.epoch += 1;
842
843        runner.update_era_checker(time_point.epoch).await.unwrap();
844        assert_eq!(time_point.epoch, era_checker.current_epoch());
845    }
846
847    #[tokio::test]
848    async fn test_inform_new_epoch() {
849        let mut mock_certifier_service = MockCertifierService::new();
850        mock_certifier_service
851            .expect_inform_epoch()
852            .returning(|_| Ok(()))
853            .times(1);
854        let mut deps = initialize_dependencies!().await;
855        let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
856
857        deps.certifier_service = Arc::new(mock_certifier_service);
858        deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
859            current_epoch,
860            &MithrilFixtureBuilder::default().build(),
861        )));
862
863        let runner = AggregatorRunner::new(Arc::new(deps));
864
865        runner.inform_new_epoch(current_epoch).await.unwrap();
866    }
867
868    #[tokio::test]
869    async fn test_upkeep_calls_run_on_upkeep_service() {
870        let mut upkeep_service = MockUpkeepService::new();
871        upkeep_service
872            .expect_run()
873            .with(eq(Epoch(5)))
874            .returning(|_| Ok(()))
875            .times(1);
876
877        let mut deps = initialize_dependencies!().await;
878        deps.upkeep_service = Arc::new(upkeep_service);
879
880        let runner = AggregatorRunner::new(Arc::new(deps));
881
882        runner.upkeep(Epoch(5)).await.unwrap();
883    }
884
885    #[tokio::test]
886    async fn test_precompute_epoch_data() {
887        let mut deps = initialize_dependencies!().await;
888        let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
889
890        deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
891            current_epoch,
892            &MithrilFixtureBuilder::default().build(),
893        )));
894
895        let runner = AggregatorRunner::new(Arc::new(deps));
896
897        runner.precompute_epoch_data().await.unwrap();
898    }
899
900    #[tokio::test]
901    async fn test_get_current_non_certified_open_message_should_create_new_open_message_if_none_exists()
902     {
903        let open_message_created = create_open_message(
904            IsCertified::No,
905            IsExpired::No,
906            SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
907        );
908        let open_message_expected = open_message_created.clone();
909
910        let runner = {
911            let mut mock_certifier_service = MockCertifierService::new();
912            init_certifier_service_mock(&mut mock_certifier_service, vec![]);
913
914            mock_certifier_service
915                .expect_create_open_message()
916                .return_once(|_, _| Ok(open_message_created))
917                .times(1);
918            build_runner(temp_dir!(), mock_certifier_service).await
919        };
920
921        let open_message_returned = runner
922            .get_current_non_certified_open_message(&TimePoint::dummy())
923            .await
924            .unwrap();
925        assert_eq!(Some(open_message_expected), open_message_returned);
926    }
927
928    #[tokio::test]
929    async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_not_expired()
930     {
931        let not_certified_and_not_expired = create_open_message(
932            IsCertified::No,
933            IsExpired::No,
934            SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
935        );
936
937        let open_message_expected = not_certified_and_not_expired.clone();
938
939        let runner = {
940            let mut mock_certifier_service = MockCertifierService::new();
941            init_certifier_service_mock(
942                &mut mock_certifier_service,
943                vec![not_certified_and_not_expired],
944            );
945
946            mock_certifier_service.expect_create_open_message().never();
947            build_runner(temp_dir!(), mock_certifier_service).await
948        };
949
950        let open_message_returned = runner
951            .get_current_non_certified_open_message(&TimePoint::dummy())
952            .await
953            .unwrap();
954
955        assert_eq!(Some(open_message_expected), open_message_returned);
956    }
957
958    #[tokio::test]
959    async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_open_message_already_certified()
960     {
961        let certified_and_not_expired = create_open_message(
962            IsCertified::Yes,
963            IsExpired::No,
964            SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
965        );
966        let not_certified_and_not_expired = create_open_message(
967            IsCertified::No,
968            IsExpired::No,
969            SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
970        );
971
972        let open_message_expected = not_certified_and_not_expired.clone();
973
974        let runner = {
975            let mut mock_certifier_service = MockCertifierService::new();
976            init_certifier_service_mock(
977                &mut mock_certifier_service,
978                vec![certified_and_not_expired, not_certified_and_not_expired],
979            );
980
981            mock_certifier_service.expect_create_open_message().never();
982            build_runner_with_discriminants(
983                temp_dir!(),
984                mock_certifier_service,
985                vec![
986                    SignedEntityTypeDiscriminants::MithrilStakeDistribution,
987                    SignedEntityTypeDiscriminants::CardanoStakeDistribution,
988                ],
989            )
990            .await
991        };
992
993        let open_message_returned = runner
994            .get_current_non_certified_open_message(&TimePoint::dummy())
995            .await
996            .unwrap();
997
998        assert_eq!(Some(open_message_expected), open_message_returned);
999    }
1000
1001    #[tokio::test]
1002    async fn test_get_current_non_certified_open_message_should_create_open_message_if_none_exists_and_open_message_already_certified()
1003     {
1004        let certified_and_not_expired = create_open_message(
1005            IsCertified::Yes,
1006            IsExpired::No,
1007            SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1008        );
1009        let open_message_created = create_open_message(
1010            IsCertified::No,
1011            IsExpired::No,
1012            SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1013        );
1014        let open_message_expected = open_message_created.clone();
1015
1016        let runner = {
1017            let mut mock_certifier_service = MockCertifierService::new();
1018            init_certifier_service_mock(
1019                &mut mock_certifier_service,
1020                vec![certified_and_not_expired],
1021            );
1022
1023            mock_certifier_service
1024                .expect_create_open_message()
1025                .return_once(|_, _| Ok(open_message_created))
1026                .times(1);
1027            build_runner_with_discriminants(
1028                temp_dir!(),
1029                mock_certifier_service,
1030                vec![
1031                    SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1032                    SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1033                ],
1034            )
1035            .await
1036        };
1037
1038        let open_message_returned = runner
1039            .get_current_non_certified_open_message(&TimePoint::dummy())
1040            .await
1041            .unwrap();
1042
1043        assert_eq!(Some(open_message_expected), open_message_returned);
1044    }
1045
1046    #[tokio::test]
1047    async fn test_get_current_non_certified_open_message_should_return_none_if_all_open_message_already_certified()
1048     {
1049        let certified_and_not_expired_1 = create_open_message(
1050            IsCertified::Yes,
1051            IsExpired::No,
1052            SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1053        );
1054        let certified_and_not_expired_2 = create_open_message(
1055            IsCertified::Yes,
1056            IsExpired::No,
1057            SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1058        );
1059
1060        let runner = {
1061            let mut mock_certifier_service = MockCertifierService::new();
1062            init_certifier_service_mock(
1063                &mut mock_certifier_service,
1064                vec![certified_and_not_expired_1, certified_and_not_expired_2],
1065            );
1066
1067            mock_certifier_service.expect_create_open_message().never();
1068            build_runner_with_discriminants(
1069                temp_dir!(),
1070                mock_certifier_service,
1071                vec![
1072                    SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1073                    SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1074                ],
1075            )
1076            .await
1077        };
1078
1079        let open_message_returned = runner
1080            .get_current_non_certified_open_message(&TimePoint::dummy())
1081            .await
1082            .unwrap();
1083
1084        assert!(open_message_returned.is_none());
1085    }
1086
1087    #[tokio::test]
1088    async fn test_get_current_non_certified_open_message_should_return_first_not_certified_and_not_expired_open_message()
1089     {
1090        let not_certified_and_expired = create_open_message(
1091            IsCertified::No,
1092            IsExpired::Yes,
1093            SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1094        );
1095        let not_certified_and_not_expired = create_open_message(
1096            IsCertified::No,
1097            IsExpired::No,
1098            SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1099        );
1100
1101        let open_message_expected = not_certified_and_not_expired.clone();
1102
1103        let runner = {
1104            let mut mock_certifier_service = MockCertifierService::new();
1105            init_certifier_service_mock(
1106                &mut mock_certifier_service,
1107                vec![not_certified_and_expired, not_certified_and_not_expired],
1108            );
1109
1110            mock_certifier_service.expect_create_open_message().never();
1111            build_runner_with_discriminants(
1112                temp_dir!(),
1113                mock_certifier_service,
1114                vec![
1115                    SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1116                    SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1117                ],
1118            )
1119            .await
1120        };
1121
1122        let open_message_returned = runner
1123            .get_current_non_certified_open_message(&TimePoint::dummy())
1124            .await
1125            .unwrap();
1126
1127        assert_eq!(Some(open_message_expected), open_message_returned);
1128    }
1129
1130    #[tokio::test]
1131    async fn test_get_current_non_certified_open_message_called_for_mithril_stake_distribution() {
1132        let mut mock_certifier_service = MockCertifierService::new();
1133
1134        mock_certifier_service
1135            .expect_get_open_message()
1136            .with(eq(SignedEntityType::MithrilStakeDistribution(
1137                TimePoint::dummy().epoch,
1138            )))
1139            .times(1)
1140            .return_once(|_| {
1141                Ok(Some(create_open_message(
1142                    IsCertified::Yes,
1143                    IsExpired::No,
1144                    SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1145                )))
1146            });
1147
1148        mock_certifier_service.expect_create_open_message().never();
1149
1150        mock_certifier_service.expect_inform_epoch().return_once(|_| Ok(()));
1151        mock_certifier_service
1152            .expect_mark_open_message_if_expired()
1153            .returning(|_| Ok(None));
1154
1155        let runner = build_runner(temp_dir!(), mock_certifier_service).await;
1156
1157        runner
1158            .get_current_non_certified_open_message(&TimePoint::dummy())
1159            .await
1160            .unwrap();
1161    }
1162
1163    #[tokio::test]
1164    async fn list_available_signed_entity_types_list_all_configured_entities_if_none_are_locked() {
1165        let runner = {
1166            let mut dependencies = initialize_dependencies!().await;
1167            let epoch_service = FakeEpochServiceBuilder {
1168                signed_entity_config: SignedEntityConfig {
1169                    allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1170                    ..SignedEntityConfig::dummy()
1171                },
1172                ..FakeEpochServiceBuilder::dummy(Epoch(32))
1173            }
1174            .build();
1175            dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1176            dependencies.signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1177            AggregatorRunner::new(Arc::new(dependencies))
1178        };
1179
1180        let time_point = TimePoint::dummy();
1181        let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1182            .list_available_signed_entity_types(&time_point)
1183            .await
1184            .unwrap()
1185            .into_iter()
1186            .map(Into::into)
1187            .collect();
1188
1189        assert_eq!(
1190            signed_entities,
1191            SignedEntityTypeDiscriminants::all().into_iter().collect::<Vec<_>>()
1192        );
1193    }
1194
1195    #[tokio::test]
1196    async fn list_available_signed_entity_types_exclude_locked_entities() {
1197        let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1198        let runner = {
1199            let mut dependencies = initialize_dependencies!().await;
1200            dependencies.signed_entity_type_lock = signed_entity_type_lock.clone();
1201            let epoch_service = FakeEpochServiceBuilder {
1202                signed_entity_config: SignedEntityConfig {
1203                    allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1204                    ..SignedEntityConfig::dummy()
1205                },
1206                ..FakeEpochServiceBuilder::dummy(Epoch(32))
1207            }
1208            .build();
1209            dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1210
1211            AggregatorRunner::new(Arc::new(dependencies))
1212        };
1213
1214        signed_entity_type_lock
1215            .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
1216            .await;
1217
1218        let time_point = TimePoint::dummy();
1219        let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1220            .list_available_signed_entity_types(&time_point)
1221            .await
1222            .unwrap()
1223            .into_iter()
1224            .map(Into::into)
1225            .collect();
1226
1227        assert!(!signed_entities.is_empty());
1228        assert!(!signed_entities.contains(&SignedEntityTypeDiscriminants::CardanoTransactions));
1229    }
1230
1231    #[tokio::test]
1232    async fn is_open_message_outdated_return_false_when_message_is_not_expired_and_no_newer_open_message()
1233     {
1234        assert!(!is_outdated_returned_when(temp_dir!(), IsExpired::No, false).await);
1235    }
1236
1237    #[tokio::test]
1238    async fn is_open_message_outdated_return_true_when_message_is_expired_and_no_newer_open_message()
1239     {
1240        assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, false).await);
1241    }
1242
1243    #[tokio::test]
1244    async fn is_open_message_outdated_return_true_when_message_is_not_expired_and_exists_newer_open_message()
1245     {
1246        assert!(is_outdated_returned_when(temp_dir!(), IsExpired::No, true).await);
1247    }
1248
1249    #[tokio::test]
1250    async fn is_open_message_outdated_return_true_when_message_is_expired_and_exists_newer_open_message()
1251     {
1252        assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, true).await);
1253    }
1254
1255    async fn is_outdated_returned_when(
1256        tmp_path: PathBuf,
1257        is_expired: IsExpired,
1258        newer_open_message: bool,
1259    ) -> bool {
1260        let current_time_point = TimePoint {
1261            epoch: Epoch(2),
1262            ..TimePoint::dummy()
1263        };
1264
1265        let message_epoch = if newer_open_message {
1266            current_time_point.epoch + 54
1267        } else {
1268            current_time_point.epoch
1269        };
1270        let open_message_to_verify = OpenMessage {
1271            signed_entity_type: SignedEntityType::MithrilStakeDistribution(message_epoch),
1272            is_expired: is_expired == IsExpired::Yes,
1273            ..OpenMessage::dummy()
1274        };
1275
1276        let runner = {
1277            let mut deps = initialize_dependencies(tmp_path).await;
1278            let mut mock_certifier_service = MockCertifierService::new();
1279
1280            let open_message_current = open_message_to_verify.clone();
1281            mock_certifier_service
1282                .expect_get_open_message()
1283                .times(1)
1284                .return_once(|_| Ok(Some(open_message_current)));
1285            mock_certifier_service
1286                .expect_mark_open_message_if_expired()
1287                .returning(|_| Ok(None));
1288
1289            deps.certifier_service = Arc::new(mock_certifier_service);
1290
1291            let epoch_service = FakeEpochServiceBuilder::dummy(current_time_point.epoch).build();
1292            deps.epoch_service = Arc::new(RwLock::new(epoch_service));
1293
1294            build_runner_with_fixture_data(deps).await
1295        };
1296
1297        runner
1298            .is_open_message_outdated(
1299                open_message_to_verify.signed_entity_type,
1300                &current_time_point,
1301            )
1302            .await
1303            .unwrap()
1304    }
1305}