mithril_aggregator/runtime/
runner.rs

1use anyhow::Context;
2use async_trait::async_trait;
3use slog::{debug, warn, Logger};
4use std::sync::Arc;
5use std::time::Duration;
6
7use mithril_common::entities::{Certificate, Epoch, ProtocolMessage, SignedEntityType, TimePoint};
8use mithril_common::logging::LoggerExtensions;
9use mithril_common::StdResult;
10use mithril_persistence::store::StakeStorer;
11
12use crate::entities::OpenMessage;
13use crate::ServeCommandDependenciesContainer;
14
/// Settings consumed by the AggregatorRuntime.
#[derive(Debug, Clone)]
pub struct AggregatorConfig {
    /// Interval between each snapshot (the unit is carried by the [Duration] value).
    pub interval: Duration,

    /// `true` when the aggregator is a follower
    pub is_follower: bool,
}

impl AggregatorConfig {
    /// Build an [AggregatorConfig] from an interval and the follower flag.
    pub fn new(interval: Duration, is_follower: bool) -> Self {
        AggregatorConfig {
            interval,
            is_follower,
        }
    }
}
34
/// This trait is intended to allow mocking the AggregatorRunner in tests.
/// It exposes all the methods needed by the state machine.
#[async_trait]
pub trait AggregatorRunnerTrait: Sync + Send {
    /// Return the current [TimePoint] from the chain.
    async fn get_time_point_from_chain(&self) -> StdResult<TimePoint>;

    /// Retrieve the current open message for a given signed entity type, if any.
    async fn get_current_open_message_for_signed_entity_type(
        &self,
        signed_entity_type: &SignedEntityType,
    ) -> StdResult<Option<OpenMessage>>;

    /// Retrieve the current non-certified open message for the given time point, if any.
    async fn get_current_non_certified_open_message(
        &self,
        current_time_point: &TimePoint,
    ) -> StdResult<Option<OpenMessage>>;

    /// Check if the certificate chain is valid.
    ///
    /// The outcome is carried by the result: `Ok(())` on success, an error otherwise.
    async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()>;

    /// Read the stake distribution from the blockchain and store it.
    async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()>;

    /// Open the signer registration round of an epoch.
    async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()>;

    /// Close the signer registration round of an epoch.
    async fn close_signer_registration_round(&self) -> StdResult<()>;

    /// Check if the follower aggregator is running the same epoch as the leader.
    async fn is_follower_aggregator_at_same_epoch_as_leader(
        &self,
        time_point: &TimePoint,
    ) -> StdResult<bool>;

    /// Synchronize the follower aggregator signer registration.
    async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()>;

    /// Ask the EpochService to update the epoch settings.
    async fn update_epoch_settings(&self) -> StdResult<()>;

    /// Compute the protocol message for the given signed entity type.
    async fn compute_protocol_message(
        &self,
        signed_entity_type: &SignedEntityType,
    ) -> StdResult<ProtocolMessage>;

    /// Mark the open message of the given signed entity type as expired if it has expired.
    ///
    /// The returned option carries the open message reported by the operation, if any.
    async fn mark_open_message_if_expired(
        &self,
        signed_entity_type: &SignedEntityType,
    ) -> StdResult<Option<OpenMessage>>;

    /// Tell the certifier to try to create a new certificate.
    ///
    /// `None` means that no certificate was created.
    async fn create_certificate(
        &self,
        signed_entity_type: &SignedEntityType,
    ) -> StdResult<Option<Certificate>>;

    /// Create an artifact for the given certificate and persist it.
    async fn create_artifact(
        &self,
        signed_entity_type: &SignedEntityType,
        certificate: &Certificate,
    ) -> StdResult<()>;

    /// Update the EraChecker with EraReader information.
    async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()>;

    /// Ask services to update themselves for the new epoch.
    async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()>;

    /// Perform the upkeep tasks.
    async fn upkeep(&self, epoch: Epoch) -> StdResult<()>;

    /// Precompute what doesn't change for the current epoch.
    async fn precompute_epoch_data(&self) -> StdResult<()>;

    /// Create a new open message for the given signed entity type and protocol message.
    async fn create_open_message(
        &self,
        signed_entity_type: &SignedEntityType,
        protocol_message: &ProtocolMessage,
    ) -> StdResult<OpenMessage>;

    /// Check if the open message is considered outdated.
    async fn is_open_message_outdated(
        &self,
        open_message_signed_entity_type: SignedEntityType,
        last_time_point: &TimePoint,
    ) -> StdResult<bool>;

    /// Increment the runtime cycle success metric.
    fn increment_runtime_cycle_success_since_startup_counter(&self);

    /// Increment the runtime cycle total metric.
    fn increment_runtime_cycle_total_since_startup_counter(&self);
}
135
/// The runner responsibility is to expose a code API for the state machine. It
/// holds services and configuration.
pub struct AggregatorRunner {
    /// Shared container giving access to the services used by the runner.
    dependencies: Arc<ServeCommandDependenciesContainer>,
    /// Logger derived from the container root logger, tagged with this component name.
    logger: Logger,
}
142
143impl AggregatorRunner {
144    /// Create a new instance of the Aggregator Runner.
145    pub fn new(dependencies: Arc<ServeCommandDependenciesContainer>) -> Self {
146        let logger = dependencies.root_logger.new_with_component_name::<Self>();
147        Self {
148            dependencies,
149            logger,
150        }
151    }
152
153    async fn list_available_signed_entity_types(
154        &self,
155        time_point: &TimePoint,
156    ) -> StdResult<Vec<SignedEntityType>> {
157        let signed_entity_types = self
158            .dependencies
159            .epoch_service
160            .read()
161            .await
162            .signed_entity_config()?
163            .list_allowed_signed_entity_types(time_point)?;
164        let unlocked_signed_entities = self
165            .dependencies
166            .signed_entity_type_lock
167            .filter_unlocked_entries(signed_entity_types)
168            .await;
169
170        Ok(unlocked_signed_entities)
171    }
172}
173
174#[cfg_attr(test, mockall::automock)]
175#[async_trait]
176impl AggregatorRunnerTrait for AggregatorRunner {
177    /// Return the current time point from the chain
178    async fn get_time_point_from_chain(&self) -> StdResult<TimePoint> {
179        debug!(self.logger, ">> get_time_point_from_chain");
180        let time_point = self
181            .dependencies
182            .ticker_service
183            .get_current_time_point()
184            .await?;
185
186        Ok(time_point)
187    }
188
189    async fn get_current_open_message_for_signed_entity_type(
190        &self,
191        signed_entity_type: &SignedEntityType,
192    ) -> StdResult<Option<OpenMessage>> {
193        debug!(self.logger,">> get_current_open_message_for_signed_entity_type"; "signed_entity_type" => ?signed_entity_type);
194        self.mark_open_message_if_expired(signed_entity_type)
195            .await?;
196
197        Ok(self
198            .dependencies
199            .certifier_service
200            .get_open_message(signed_entity_type)
201            .await
202            .with_context(|| format!("CertifierService can not get open message for signed_entity_type: '{signed_entity_type}'"))?)
203    }
204
205    async fn get_current_non_certified_open_message(
206        &self,
207        current_time_point: &TimePoint,
208    ) -> StdResult<Option<OpenMessage>> {
209        debug!(self.logger,">> get_current_non_certified_open_message"; "time_point" => #?current_time_point);
210        let signed_entity_types = self
211            .list_available_signed_entity_types(current_time_point)
212            .await?;
213
214        for signed_entity_type in signed_entity_types {
215            let current_open_message = self.get_current_open_message_for_signed_entity_type(&signed_entity_type)
216                .await
217                .with_context(|| format!("AggregatorRunner can not get current open message for signed entity type: '{}'", &signed_entity_type))?;
218            match current_open_message {
219                None => {
220                    let protocol_message = self.compute_protocol_message(&signed_entity_type).await.with_context(|| format!("AggregatorRunner can not compute protocol message for signed_entity_type: '{signed_entity_type}'"))?;
221                    let open_message_new = self.create_open_message(&signed_entity_type, &protocol_message)
222                        .await
223                        .with_context(|| format!("AggregatorRunner can not create open message for signed_entity_type: '{signed_entity_type}'"))?;
224
225                    return Ok(Some(open_message_new));
226                }
227                Some(open_message) => {
228                    if !open_message.is_certified && !open_message.is_expired {
229                        return Ok(Some(open_message));
230                    }
231                }
232            }
233        }
234
235        Ok(None)
236    }
237
238    async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()> {
239        debug!(self.logger, ">> is_certificate_chain_valid");
240        self.dependencies
241            .certifier_service
242            .verify_certificate_chain(time_point.epoch)
243            .await?;
244
245        Ok(())
246    }
247
248    async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()> {
249        debug!(self.logger,">> update_stake_distribution"; "time_point" => #?new_time_point);
250        self.dependencies
251            .stake_distribution_service
252            .update_stake_distribution()
253            .await
254            .with_context(|| format!("AggregatorRunner could not update stake distribution for time_point: '{new_time_point}'"))
255    }
256
257    async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()> {
258        debug!(self.logger,">> open_signer_registration_round"; "time_point" => #?new_time_point);
259        let registration_epoch = new_time_point.epoch.offset_to_recording_epoch();
260
261        let stakes = self
262            .dependencies
263            .stake_store
264            .get_stakes(registration_epoch)
265            .await?
266            .unwrap_or_default();
267
268        self.dependencies
269            .signer_registration_round_opener
270            .open_registration_round(registration_epoch, stakes)
271            .await
272    }
273
274    async fn close_signer_registration_round(&self) -> StdResult<()> {
275        debug!(self.logger, ">> close_signer_registration_round");
276        self.dependencies
277            .signer_registration_round_opener
278            .close_registration_round()
279            .await
280    }
281
282    async fn is_follower_aggregator_at_same_epoch_as_leader(
283        &self,
284        time_point: &TimePoint,
285    ) -> StdResult<bool> {
286        self.dependencies
287            .signer_synchronizer
288            .can_synchronize_signers(time_point.epoch)
289            .await
290            .map_err(|e| e.into())
291    }
292
293    async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()> {
294        self.dependencies
295            .signer_synchronizer
296            .synchronize_all_signers()
297            .await
298            .map_err(|e| e.into())
299    }
300
301    async fn update_epoch_settings(&self) -> StdResult<()> {
302        debug!(self.logger, ">> update_epoch_settings");
303        self.dependencies
304            .epoch_service
305            .write()
306            .await
307            .update_epoch_settings()
308            .await
309    }
310
311    async fn compute_protocol_message(
312        &self,
313        signed_entity_type: &SignedEntityType,
314    ) -> StdResult<ProtocolMessage> {
315        debug!(self.logger, ">> compute_protocol_message");
316        let protocol_message = self
317            .dependencies
318            .signable_builder_service
319            .compute_protocol_message(signed_entity_type.to_owned())
320            .await
321            .with_context(|| format!("Runner can not compute protocol message for signed entity type: '{signed_entity_type}'"))?;
322
323        Ok(protocol_message)
324    }
325
326    async fn mark_open_message_if_expired(
327        &self,
328        signed_entity_type: &SignedEntityType,
329    ) -> StdResult<Option<OpenMessage>> {
330        debug!(self.logger, ">> mark_open_message_if_expired");
331        let expired_open_message = self
332            .dependencies
333            .certifier_service
334            .mark_open_message_if_expired(signed_entity_type)
335            .await
336            .with_context(|| "CertifierService can not mark expired open message")?;
337
338        debug!(
339            self.logger, "Marked expired open messages";
340            "expired_open_message" => ?expired_open_message
341        );
342
343        Ok(expired_open_message)
344    }
345
346    async fn create_certificate(
347        &self,
348        signed_entity_type: &SignedEntityType,
349    ) -> StdResult<Option<Certificate>> {
350        debug!(self.logger, ">> create_certificate"; "signed_entity_type" => ?signed_entity_type);
351
352        let certificate = self.dependencies
353            .certifier_service
354            .create_certificate(signed_entity_type)
355            .await
356            .with_context(|| {
357                format!(
358                    "CertifierService can not create certificate for signed_entity_type: '{signed_entity_type}'"
359                )
360            })?;
361
362        if certificate.is_some() {
363            self.dependencies
364                .metrics_service
365                .get_certificate_total_produced_since_startup()
366                .increment();
367        }
368
369        Ok(certificate)
370    }
371
372    async fn create_artifact(
373        &self,
374        signed_entity_type: &SignedEntityType,
375        certificate: &Certificate,
376    ) -> StdResult<()> {
377        debug!(
378            self.logger, ">> create_artifact";
379            "signed_entity_type" => ?signed_entity_type,
380            "certificate_hash" => &certificate.hash
381        );
382
383        self.dependencies
384            .signed_entity_service
385            .create_artifact(signed_entity_type.to_owned(), certificate)
386            .await
387            .with_context(|| {
388                format!(
389                    "SignedEntityService can not create artifact for signed_entity_type: '{signed_entity_type}' with certificate hash: '{}'",
390                    certificate.hash
391                )
392            })?;
393
394        Ok(())
395    }
396
397    async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()> {
398        debug!(self.logger, ">> update_era_checker({epoch:?})");
399        let token = self
400            .dependencies
401            .era_reader
402            .read_era_epoch_token(epoch)
403            .await
404            .with_context(|| {
405                format!(
406                    "EraReader can not get era epoch token for current epoch: '{}'",
407                    epoch
408                )
409            })?;
410
411        let current_era = token
412            .get_current_supported_era()
413            .with_context(|| "EraEpochToken can not get current supported era")?;
414        self.dependencies
415            .era_checker
416            .change_era(current_era, token.get_current_epoch());
417        debug!(
418            self.logger,
419            "Current Era is {current_era} (Epoch {}).",
420            token.get_current_epoch()
421        );
422
423        if token.get_next_supported_era().is_err() {
424            let era_name = &token.get_next_era_marker().unwrap().name;
425            warn!(self.logger,"Upcoming Era '{era_name}' is not supported by this version of the software. Please update!");
426        }
427
428        Ok(())
429    }
430
431    async fn precompute_epoch_data(&self) -> StdResult<()> {
432        debug!(self.logger, ">> precompute_epoch_data");
433        self.dependencies
434            .epoch_service
435            .write()
436            .await
437            .precompute_epoch_data()
438            .await?;
439
440        Ok(())
441    }
442
443    async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()> {
444        debug!(self.logger, ">> inform_new_epoch({epoch:?})");
445        self.dependencies
446            .certifier_service
447            .inform_epoch(epoch)
448            .await?;
449
450        self.dependencies
451            .epoch_service
452            .write()
453            .await
454            .inform_epoch(epoch)
455            .await?;
456
457        Ok(())
458    }
459
460    async fn upkeep(&self, epoch: Epoch) -> StdResult<()> {
461        debug!(self.logger, ">> upkeep");
462        self.dependencies.upkeep_service.run(epoch).await
463    }
464
465    async fn create_open_message(
466        &self,
467        signed_entity_type: &SignedEntityType,
468        protocol_message: &ProtocolMessage,
469    ) -> StdResult<OpenMessage> {
470        debug!(self.logger, ">> create_open_message");
471        self.dependencies
472            .certifier_service
473            .create_open_message(signed_entity_type, protocol_message)
474            .await
475    }
476
477    async fn is_open_message_outdated(
478        &self,
479        open_message_signed_entity_type: SignedEntityType,
480        last_time_point: &TimePoint,
481    ) -> StdResult<bool> {
482        let current_open_message = self
483            .get_current_open_message_for_signed_entity_type(
484                &open_message_signed_entity_type,
485            )
486            .await
487            .with_context(|| format!("AggregatorRuntime can not get the current open message for signed entity type: '{}'", &open_message_signed_entity_type))?;
488        let is_expired_open_message = current_open_message
489            .as_ref()
490            .map(|om| om.is_expired)
491            .unwrap_or(false);
492
493        let exists_newer_open_message = {
494            let new_signed_entity_type = self
495                .dependencies
496                .epoch_service
497                .read()
498                .await
499                .signed_entity_config()?
500                .time_point_to_signed_entity(&open_message_signed_entity_type, last_time_point)?;
501            new_signed_entity_type != open_message_signed_entity_type
502        };
503
504        Ok(exists_newer_open_message || is_expired_open_message)
505    }
506
507    fn increment_runtime_cycle_success_since_startup_counter(&self) {
508        self.dependencies
509            .metrics_service
510            .get_runtime_cycle_success_since_startup()
511            .increment();
512    }
513
514    fn increment_runtime_cycle_total_since_startup_counter(&self) {
515        self.dependencies
516            .metrics_service
517            .get_runtime_cycle_total_since_startup()
518            .increment();
519    }
520}
521
522#[cfg(test)]
523pub mod tests {
524    use crate::dependency_injection::DependenciesBuilder;
525    use crate::entities::AggregatorEpochSettings;
526    use crate::services::{FakeEpochService, FakeEpochServiceBuilder, MockUpkeepService};
527    use crate::{
528        entities::OpenMessage,
529        initialize_dependencies,
530        runtime::{AggregatorRunner, AggregatorRunnerTrait},
531        services::{MithrilStakeDistributionService, MockCertifierService},
532        MithrilSignerRegistrationLeader, ServeCommandConfiguration,
533        ServeCommandDependenciesContainer, SignerRegistrationRound,
534    };
535    use async_trait::async_trait;
536    use chrono::{DateTime, Utc};
537    use mithril_common::entities::{
538        CardanoTransactionsSigningConfig, ChainPoint, Epoch, SignedEntityConfig,
539        SignedEntityTypeDiscriminants,
540    };
541    use mithril_common::temp_dir;
542    use mithril_common::{
543        chain_observer::FakeObserver,
544        digesters::DumbImmutableFileObserver,
545        entities::{ProtocolMessage, SignedEntityType, StakeDistribution, TimePoint},
546        signable_builder::SignableBuilderService,
547        test_utils::{fake_data, MithrilFixtureBuilder},
548        MithrilTickerService, StdResult,
549    };
550    use mithril_persistence::store::StakeStorer;
551    use mithril_signed_entity_lock::SignedEntityTypeLock;
552    use mockall::predicate::eq;
553    use mockall::{mock, Sequence};
554    use std::path::PathBuf;
555    use std::sync::Arc;
556    use tokio::sync::RwLock;
557
    // Mock of the `SignableBuilderService` trait, generated with mockall.
    // It is used by `build_runner` to stub the protocol message computation.
    mock! {
        SignableBuilderServiceImpl { }

        #[async_trait]
        impl SignableBuilderService for SignableBuilderServiceImpl
        {

            async fn compute_protocol_message(
                &self,
                signed_entity_type: SignedEntityType,
            ) -> StdResult<ProtocolMessage>;
        }
    }
571
572    async fn build_runner_with_fixture_data(
573        deps: ServeCommandDependenciesContainer,
574    ) -> AggregatorRunner {
575        let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
576        let current_epoch = deps
577            .chain_observer
578            .get_current_epoch()
579            .await
580            .unwrap()
581            .unwrap();
582        deps.init_state_from_fixture(
583            &fixture,
584            &CardanoTransactionsSigningConfig::dummy(),
585            &[
586                current_epoch.offset_to_signer_retrieval_epoch().unwrap(),
587                current_epoch,
588                current_epoch.next(),
589            ],
590        )
591        .await;
592
593        AggregatorRunner::new(Arc::new(deps))
594    }
595
596    async fn build_runner(
597        temp_dir: PathBuf,
598        mock_certifier_service: MockCertifierService,
599    ) -> AggregatorRunner {
600        let mut deps = initialize_dependencies(temp_dir).await;
601        deps.certifier_service = Arc::new(mock_certifier_service);
602
603        let mut mock_signable_builder_service = MockSignableBuilderServiceImpl::new();
604        mock_signable_builder_service
605            .expect_compute_protocol_message()
606            .return_once(|_| Ok(ProtocolMessage::default()));
607        deps.signable_builder_service = Arc::new(mock_signable_builder_service);
608
609        let runner = build_runner_with_fixture_data(deps).await;
610
611        let current_epoch = runner
612            .dependencies
613            .ticker_service
614            .get_current_epoch()
615            .await
616            .unwrap();
617        runner.inform_new_epoch(current_epoch).await.unwrap();
618        runner.precompute_epoch_data().await.unwrap();
619        runner
620    }
621
622    fn init_certifier_service_mock(
623        mock_certifier_service: &mut MockCertifierService,
624        messages: Vec<OpenMessage>,
625    ) {
626        for message in messages {
627            mock_certifier_service
628                .expect_get_open_message()
629                .return_once(|_| Ok(Some(message)))
630                .times(1);
631        }
632        // When all messages are retrieved, the function return None
633        mock_certifier_service
634            .expect_get_open_message()
635            .returning(|_| Ok(None));
636
637        mock_certifier_service
638            .expect_inform_epoch()
639            .return_once(|_| Ok(()));
640        mock_certifier_service
641            .expect_mark_open_message_if_expired()
642            .returning(|_| Ok(None));
643    }
644
645    fn create_open_message(is_certified: IsCertified, is_expired: IsExpired) -> OpenMessage {
646        OpenMessage {
647            signed_entity_type: SignedEntityType::CardanoImmutableFilesFull(fake_data::beacon()),
648            is_certified: is_certified == IsCertified::Yes,
649            is_expired: is_expired == IsExpired::Yes,
650            ..OpenMessage::dummy()
651        }
652    }
653
    /// Test-only flag telling `create_open_message` whether the open message is certified.
    #[derive(Eq, PartialEq)]
    enum IsCertified {
        Yes,
        No,
    }
659
    /// Test-only flag telling `create_open_message` whether the open message is expired.
    #[derive(Eq, PartialEq)]
    enum IsExpired {
        Yes,
        No,
    }
665
666    #[tokio::test]
667    async fn test_get_time_point_from_chain() {
668        let expected = TimePoint::new(2, 17, ChainPoint::dummy());
669        let mut dependencies = initialize_dependencies!().await;
670        let immutable_file_observer = Arc::new(DumbImmutableFileObserver::default());
671        immutable_file_observer
672            .shall_return(Some(expected.immutable_file_number))
673            .await;
674        let ticker_service = Arc::new(MithrilTickerService::new(
675            Arc::new(FakeObserver::new(Some(expected.clone()))),
676            immutable_file_observer,
677        ));
678        dependencies.ticker_service = ticker_service;
679        let runner = AggregatorRunner::new(Arc::new(dependencies));
680
681        // Retrieves the expected time point
682        let res = runner.get_time_point_from_chain().await;
683        assert_eq!(expected, res.unwrap());
684    }
685
686    #[tokio::test]
687    async fn test_update_stake_distribution() {
688        let chain_observer = Arc::new(FakeObserver::default());
689        let deps = {
690            let mut deps = initialize_dependencies!().await;
691            deps.chain_observer = chain_observer.clone();
692            deps.stake_distribution_service = Arc::new(MithrilStakeDistributionService::new(
693                deps.stake_store.clone(),
694                chain_observer.clone(),
695            ));
696            Arc::new(deps)
697        };
698        let runner = AggregatorRunner::new(deps.clone());
699        let time_point = runner.get_time_point_from_chain().await.unwrap();
700        let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
701        let expected = fixture.stake_distribution();
702
703        chain_observer
704            .set_signers(fixture.signers_with_stake())
705            .await;
706        runner
707            .update_stake_distribution(&time_point)
708            .await
709            .expect("updating stake distribution should not return an error");
710
711        let saved_stake_distribution = deps
712            .stake_store
713            .get_stakes(time_point.epoch.offset_to_recording_epoch())
714            .await
715            .unwrap()
716            .unwrap_or_else(|| {
717                panic!(
718                    "I should have a stake distribution for the epoch {:?}",
719                    time_point.epoch
720                )
721            });
722
723        assert_eq!(expected, saved_stake_distribution);
724    }
725
726    #[tokio::test]
727    async fn test_open_signer_registration_round() {
728        let config = ServeCommandConfiguration::new_sample(mithril_common::temp_dir!());
729        let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
730
731        let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
732            builder.get_verification_key_store().await.unwrap(),
733            builder.get_signer_store().await.unwrap(),
734            builder.get_signer_registration_verifier().await.unwrap(),
735        ));
736        let mut deps = builder.build_serve_dependencies_container().await.unwrap();
737        deps.signer_registration_round_opener = signer_registration_round_opener.clone();
738        let stake_store = deps.stake_store.clone();
739        let deps = Arc::new(deps);
740        let runner = AggregatorRunner::new(deps.clone());
741
742        let time_point = TimePoint::dummy();
743        let recording_epoch = time_point.epoch.offset_to_recording_epoch();
744        let stake_distribution: StakeDistribution =
745            StakeDistribution::from([("a".to_string(), 5), ("b".to_string(), 10)]);
746
747        stake_store
748            .save_stakes(recording_epoch, stake_distribution.clone())
749            .await
750            .expect("Save Stake distribution should not fail");
751
752        runner
753            .open_signer_registration_round(&time_point)
754            .await
755            .expect("opening signer registration should not return an error");
756
757        let saved_current_round = signer_registration_round_opener.get_current_round().await;
758
759        let expected_signer_registration_round =
760            SignerRegistrationRound::dummy(recording_epoch, stake_distribution);
761
762        assert_eq!(
763            Some(expected_signer_registration_round),
764            saved_current_round,
765        );
766    }
767
768    #[tokio::test]
769    async fn test_close_signer_registration_round() {
770        let config = ServeCommandConfiguration::new_sample(mithril_common::temp_dir!());
771        let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
772
773        let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
774            builder.get_verification_key_store().await.unwrap(),
775            builder.get_signer_store().await.unwrap(),
776            builder.get_signer_registration_verifier().await.unwrap(),
777        ));
778        let mut deps = builder.build_serve_dependencies_container().await.unwrap();
779        deps.signer_registration_round_opener = signer_registration_round_opener.clone();
780        let deps = Arc::new(deps);
781        let runner = AggregatorRunner::new(deps.clone());
782
783        let time_point = TimePoint::dummy();
784        runner
785            .open_signer_registration_round(&time_point)
786            .await
787            .expect("opening signer registration should not return an error");
788
789        runner
790            .close_signer_registration_round()
791            .await
792            .expect("closing signer registration should not return an error");
793
794        let saved_current_round = signer_registration_round_opener.get_current_round().await;
795        assert!(saved_current_round.is_none());
796    }
797
798    #[tokio::test]
799    async fn test_expire_open_message() {
800        let open_message_expected = OpenMessage {
801            signed_entity_type: SignedEntityType::dummy(),
802            is_certified: false,
803            is_expired: false,
804            expires_at: Some(
805                DateTime::parse_from_rfc3339("2000-01-19T13:43:05.618857482Z")
806                    .unwrap()
807                    .with_timezone(&Utc),
808            ),
809            ..OpenMessage::dummy()
810        };
811        let open_message_clone = open_message_expected.clone();
812
813        let mut mock_certifier_service = MockCertifierService::new();
814        mock_certifier_service
815            .expect_mark_open_message_if_expired()
816            .return_once(|_| Ok(Some(open_message_clone)));
817
818        let mut deps = initialize_dependencies!().await;
819        deps.certifier_service = Arc::new(mock_certifier_service);
820
821        let runner = build_runner_with_fixture_data(deps).await;
822        let open_message_expired = runner
823            .mark_open_message_if_expired(&open_message_expected.signed_entity_type)
824            .await
825            .expect("mark_open_message_if_expired should not fail");
826
827        assert_eq!(Some(open_message_expected), open_message_expired);
828    }
829
830    #[tokio::test]
831    async fn test_update_era_checker() {
832        let deps = initialize_dependencies!().await;
833        let ticker_service = deps.ticker_service.clone();
834        let era_checker = deps.era_checker.clone();
835        let mut time_point = ticker_service.get_current_time_point().await.unwrap();
836
837        assert_eq!(time_point.epoch, era_checker.current_epoch());
838        let runner = AggregatorRunner::new(Arc::new(deps));
839        time_point.epoch += 1;
840
841        runner.update_era_checker(time_point.epoch).await.unwrap();
842        assert_eq!(time_point.epoch, era_checker.current_epoch());
843    }
844
845    #[tokio::test]
846    async fn test_inform_new_epoch() {
847        let mut mock_certifier_service = MockCertifierService::new();
848        mock_certifier_service
849            .expect_inform_epoch()
850            .returning(|_| Ok(()))
851            .times(1);
852        let mut deps = initialize_dependencies!().await;
853        let current_epoch = deps
854            .chain_observer
855            .get_current_epoch()
856            .await
857            .unwrap()
858            .unwrap();
859
860        deps.certifier_service = Arc::new(mock_certifier_service);
861        deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
862            current_epoch,
863            &MithrilFixtureBuilder::default().build(),
864        )));
865
866        let runner = AggregatorRunner::new(Arc::new(deps));
867
868        runner.inform_new_epoch(current_epoch).await.unwrap();
869    }
870
871    #[tokio::test]
872    async fn test_upkeep_calls_run_on_upkeep_service() {
873        let mut upkeep_service = MockUpkeepService::new();
874        upkeep_service
875            .expect_run()
876            .with(eq(Epoch(5)))
877            .returning(|_| Ok(()))
878            .times(1);
879
880        let mut deps = initialize_dependencies!().await;
881        deps.upkeep_service = Arc::new(upkeep_service);
882
883        let runner = AggregatorRunner::new(Arc::new(deps));
884
885        runner.upkeep(Epoch(5)).await.unwrap();
886    }
887
888    #[tokio::test]
889    async fn test_update_epoch_settings() {
890        let mut mock_certifier_service = MockCertifierService::new();
891        mock_certifier_service
892            .expect_inform_epoch()
893            .returning(|_| Ok(()))
894            .times(1);
895
896        let config = ServeCommandConfiguration::new_sample(temp_dir!());
897        let mut deps = DependenciesBuilder::new_with_stdout_logger(Arc::new(config.clone()))
898            .build_serve_dependencies_container()
899            .await
900            .unwrap();
901        deps.certifier_service = Arc::new(mock_certifier_service);
902        let epoch_settings_storer = deps.epoch_settings_storer.clone();
903        let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
904        let insert_epoch = current_epoch.offset_to_epoch_settings_recording_epoch();
905
906        let runner = build_runner_with_fixture_data(deps).await;
907        runner.inform_new_epoch(current_epoch).await.unwrap();
908        runner
909            .update_epoch_settings()
910            .await
911            .expect("update_epoch_settings should not fail");
912
913        let saved_epoch_settings = epoch_settings_storer
914            .get_epoch_settings(insert_epoch)
915            .await
916            .unwrap()
917            .unwrap_or_else(|| panic!("should have epoch settings for epoch {insert_epoch}",));
918
919        assert_eq!(
920            AggregatorEpochSettings {
921                protocol_parameters: config.protocol_parameters.clone(),
922                cardano_transactions_signing_config: config
923                    .cardano_transactions_signing_config
924                    .clone(),
925            },
926            saved_epoch_settings
927        );
928    }
929
930    #[tokio::test]
931    async fn test_precompute_epoch_data() {
932        let mut deps = initialize_dependencies!().await;
933        let current_epoch = deps
934            .chain_observer
935            .get_current_epoch()
936            .await
937            .unwrap()
938            .unwrap();
939
940        deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
941            current_epoch,
942            &MithrilFixtureBuilder::default().build(),
943        )));
944
945        let runner = AggregatorRunner::new(Arc::new(deps));
946
947        runner.precompute_epoch_data().await.unwrap();
948    }
949
950    #[tokio::test]
951    async fn test_get_current_non_certified_open_message_should_create_new_open_message_if_none_exists(
952    ) {
953        let open_message_created = create_open_message(IsCertified::No, IsExpired::No);
954        let open_message_expected = open_message_created.clone();
955
956        let runner = {
957            let mut mock_certifier_service = MockCertifierService::new();
958            init_certifier_service_mock(&mut mock_certifier_service, vec![]);
959
960            mock_certifier_service
961                .expect_create_open_message()
962                .return_once(|_, _| Ok(open_message_created))
963                .times(1);
964            build_runner(temp_dir!(), mock_certifier_service).await
965        };
966
967        let open_message_returned = runner
968            .get_current_non_certified_open_message(&TimePoint::dummy())
969            .await
970            .unwrap();
971        assert_eq!(Some(open_message_expected), open_message_returned);
972    }
973
974    #[tokio::test]
975    async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_not_expired(
976    ) {
977        let not_certified_and_not_expired = create_open_message(IsCertified::No, IsExpired::No);
978
979        let open_message_expected = not_certified_and_not_expired.clone();
980
981        let runner = {
982            let mut mock_certifier_service = MockCertifierService::new();
983            init_certifier_service_mock(
984                &mut mock_certifier_service,
985                vec![not_certified_and_not_expired],
986            );
987
988            mock_certifier_service.expect_create_open_message().never();
989            build_runner(temp_dir!(), mock_certifier_service).await
990        };
991
992        let open_message_returned = runner
993            .get_current_non_certified_open_message(&TimePoint::dummy())
994            .await
995            .unwrap();
996
997        assert_eq!(Some(open_message_expected), open_message_returned);
998    }
999
1000    #[tokio::test]
1001    async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_open_message_already_certified(
1002    ) {
1003        let certified_and_not_expired = create_open_message(IsCertified::Yes, IsExpired::No);
1004        let not_certified_and_not_expired = create_open_message(IsCertified::No, IsExpired::No);
1005
1006        let open_message_expected = not_certified_and_not_expired.clone();
1007
1008        let runner = {
1009            let mut mock_certifier_service = MockCertifierService::new();
1010            init_certifier_service_mock(
1011                &mut mock_certifier_service,
1012                vec![certified_and_not_expired, not_certified_and_not_expired],
1013            );
1014
1015            mock_certifier_service.expect_create_open_message().never();
1016            build_runner(temp_dir!(), mock_certifier_service).await
1017        };
1018
1019        let open_message_returned = runner
1020            .get_current_non_certified_open_message(&TimePoint::dummy())
1021            .await
1022            .unwrap();
1023
1024        assert_eq!(Some(open_message_expected), open_message_returned);
1025    }
1026
1027    #[tokio::test]
1028    async fn test_get_current_non_certified_open_message_should_create_open_message_if_none_exists_and_open_message_already_certified(
1029    ) {
1030        let certified_and_not_expired = create_open_message(IsCertified::Yes, IsExpired::No);
1031
1032        let open_message_created = create_open_message(IsCertified::No, IsExpired::No);
1033        let open_message_expected = open_message_created.clone();
1034
1035        let runner = {
1036            let mut mock_certifier_service = MockCertifierService::new();
1037            init_certifier_service_mock(
1038                &mut mock_certifier_service,
1039                vec![certified_and_not_expired],
1040            );
1041
1042            mock_certifier_service
1043                .expect_create_open_message()
1044                .return_once(|_, _| Ok(open_message_created))
1045                .times(1);
1046            build_runner(temp_dir!(), mock_certifier_service).await
1047        };
1048
1049        let open_message_returned = runner
1050            .get_current_non_certified_open_message(&TimePoint::dummy())
1051            .await
1052            .unwrap();
1053
1054        assert_eq!(Some(open_message_expected), open_message_returned);
1055    }
1056
1057    #[tokio::test]
1058    async fn test_get_current_non_certified_open_message_should_return_none_if_all_open_message_already_certified(
1059    ) {
1060        let certified_and_not_expired_1 = create_open_message(IsCertified::Yes, IsExpired::No);
1061        let certified_and_not_expired_2 = create_open_message(IsCertified::Yes, IsExpired::No);
1062
1063        let runner = {
1064            let mut mock_certifier_service = MockCertifierService::new();
1065            init_certifier_service_mock(
1066                &mut mock_certifier_service,
1067                vec![certified_and_not_expired_1, certified_and_not_expired_2],
1068            );
1069
1070            mock_certifier_service.expect_create_open_message().never();
1071            build_runner(temp_dir!(), mock_certifier_service).await
1072        };
1073
1074        let open_message_returned = runner
1075            .get_current_non_certified_open_message(&TimePoint::dummy())
1076            .await
1077            .unwrap();
1078
1079        assert!(open_message_returned.is_none());
1080    }
1081
1082    #[tokio::test]
1083    async fn test_get_current_non_certified_open_message_should_return_first_not_certified_and_not_expired_open_message(
1084    ) {
1085        let not_certified_and_expired = create_open_message(IsCertified::No, IsExpired::Yes);
1086        let not_certified_and_not_expired = create_open_message(IsCertified::No, IsExpired::No);
1087
1088        let open_message_expected = not_certified_and_not_expired.clone();
1089
1090        let runner = {
1091            let mut mock_certifier_service = MockCertifierService::new();
1092            init_certifier_service_mock(
1093                &mut mock_certifier_service,
1094                vec![not_certified_and_expired, not_certified_and_not_expired],
1095            );
1096
1097            mock_certifier_service.expect_create_open_message().never();
1098            build_runner(temp_dir!(), mock_certifier_service).await
1099        };
1100
1101        let open_message_returned = runner
1102            .get_current_non_certified_open_message(&TimePoint::dummy())
1103            .await
1104            .unwrap();
1105
1106        assert_eq!(Some(open_message_expected), open_message_returned);
1107    }
1108
1109    #[tokio::test]
1110    async fn test_get_current_non_certified_open_message_called_for_mithril_stake_distribution_and_then_for_immutable_file(
1111    ) {
1112        let mut mock_certifier_service = MockCertifierService::new();
1113
1114        let mut seq = Sequence::new();
1115        mock_certifier_service
1116            .expect_get_open_message()
1117            .with(eq(SignedEntityType::MithrilStakeDistribution(
1118                TimePoint::dummy().epoch,
1119            )))
1120            .times(1)
1121            .in_sequence(&mut seq)
1122            .return_once(|_| Ok(Some(create_open_message(IsCertified::Yes, IsExpired::No))));
1123
1124        mock_certifier_service
1125            .expect_get_open_message()
1126            .with(eq(SignedEntityType::CardanoImmutableFilesFull(
1127                fake_data::beacon(),
1128            )))
1129            .times(1)
1130            .in_sequence(&mut seq)
1131            .return_once(|_| Ok(Some(create_open_message(IsCertified::Yes, IsExpired::No))));
1132
1133        mock_certifier_service.expect_create_open_message().never();
1134
1135        mock_certifier_service
1136            .expect_inform_epoch()
1137            .return_once(|_| Ok(()));
1138        mock_certifier_service
1139            .expect_mark_open_message_if_expired()
1140            .returning(|_| Ok(None));
1141
1142        let runner = build_runner(temp_dir!(), mock_certifier_service).await;
1143
1144        runner
1145            .get_current_non_certified_open_message(&TimePoint::dummy())
1146            .await
1147            .unwrap();
1148    }
1149
1150    #[tokio::test]
1151    async fn list_available_signed_entity_types_list_all_configured_entities_if_none_are_locked() {
1152        let runner = {
1153            let mut dependencies = initialize_dependencies!().await;
1154            let epoch_service = FakeEpochServiceBuilder {
1155                signed_entity_config: SignedEntityConfig {
1156                    allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1157                    ..SignedEntityConfig::dummy()
1158                },
1159                ..FakeEpochServiceBuilder::dummy(Epoch(32))
1160            }
1161            .build();
1162            dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1163            dependencies.signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1164            AggregatorRunner::new(Arc::new(dependencies))
1165        };
1166
1167        let time_point = TimePoint::dummy();
1168        let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1169            .list_available_signed_entity_types(&time_point)
1170            .await
1171            .unwrap()
1172            .into_iter()
1173            .map(Into::into)
1174            .collect();
1175
1176        assert_eq!(
1177            signed_entities,
1178            SignedEntityTypeDiscriminants::all()
1179                .into_iter()
1180                .collect::<Vec<_>>()
1181        );
1182    }
1183
1184    #[tokio::test]
1185    async fn list_available_signed_entity_types_exclude_locked_entities() {
1186        let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1187        let runner = {
1188            let mut dependencies = initialize_dependencies!().await;
1189            dependencies.signed_entity_type_lock = signed_entity_type_lock.clone();
1190            let epoch_service = FakeEpochServiceBuilder {
1191                signed_entity_config: SignedEntityConfig {
1192                    allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1193                    ..SignedEntityConfig::dummy()
1194                },
1195                ..FakeEpochServiceBuilder::dummy(Epoch(32))
1196            }
1197            .build();
1198            dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1199
1200            AggregatorRunner::new(Arc::new(dependencies))
1201        };
1202
1203        signed_entity_type_lock
1204            .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
1205            .await;
1206
1207        let time_point = TimePoint::dummy();
1208        let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1209            .list_available_signed_entity_types(&time_point)
1210            .await
1211            .unwrap()
1212            .into_iter()
1213            .map(Into::into)
1214            .collect();
1215
1216        assert!(!signed_entities.is_empty());
1217        assert!(!signed_entities.contains(&SignedEntityTypeDiscriminants::CardanoTransactions));
1218    }
1219
1220    #[tokio::test]
1221    async fn is_open_message_outdated_return_false_when_message_is_not_expired_and_no_newer_open_message(
1222    ) {
1223        assert!(!is_outdated_returned_when(temp_dir!(), IsExpired::No, false).await);
1224    }
1225
1226    #[tokio::test]
1227    async fn is_open_message_outdated_return_true_when_message_is_expired_and_no_newer_open_message(
1228    ) {
1229        assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, false).await);
1230    }
1231
1232    #[tokio::test]
1233    async fn is_open_message_outdated_return_true_when_message_is_not_expired_and_exists_newer_open_message(
1234    ) {
1235        assert!(is_outdated_returned_when(temp_dir!(), IsExpired::No, true).await);
1236    }
1237
1238    #[tokio::test]
1239    async fn is_open_message_outdated_return_true_when_message_is_expired_and_exists_newer_open_message(
1240    ) {
1241        assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, true).await);
1242    }
1243
1244    async fn is_outdated_returned_when(
1245        tmp_path: PathBuf,
1246        is_expired: IsExpired,
1247        newer_open_message: bool,
1248    ) -> bool {
1249        let current_time_point = TimePoint {
1250            epoch: Epoch(2),
1251            ..TimePoint::dummy()
1252        };
1253
1254        let message_epoch = if newer_open_message {
1255            current_time_point.epoch + 54
1256        } else {
1257            current_time_point.epoch
1258        };
1259        let open_message_to_verify = OpenMessage {
1260            signed_entity_type: SignedEntityType::MithrilStakeDistribution(message_epoch),
1261            is_expired: is_expired == IsExpired::Yes,
1262            ..OpenMessage::dummy()
1263        };
1264
1265        let runner = {
1266            let mut deps = initialize_dependencies(tmp_path).await;
1267            let mut mock_certifier_service = MockCertifierService::new();
1268
1269            let open_message_current = open_message_to_verify.clone();
1270            mock_certifier_service
1271                .expect_get_open_message()
1272                .times(1)
1273                .return_once(|_| Ok(Some(open_message_current)));
1274            mock_certifier_service
1275                .expect_mark_open_message_if_expired()
1276                .returning(|_| Ok(None));
1277
1278            deps.certifier_service = Arc::new(mock_certifier_service);
1279
1280            let epoch_service = FakeEpochServiceBuilder::dummy(current_time_point.epoch).build();
1281            deps.epoch_service = Arc::new(RwLock::new(epoch_service));
1282
1283            build_runner_with_fixture_data(deps).await
1284        };
1285
1286        runner
1287            .is_open_message_outdated(
1288                open_message_to_verify.signed_entity_type,
1289                &current_time_point,
1290            )
1291            .await
1292            .unwrap()
1293    }
1294}