mithril_aggregator/runtime/
runner.rs

1use anyhow::Context;
2use async_trait::async_trait;
3use slog::{Logger, debug, warn};
4use std::sync::Arc;
5use std::time::Duration;
6
7use mithril_common::StdResult;
8use mithril_common::entities::{Certificate, Epoch, ProtocolMessage, SignedEntityType, TimePoint};
9use mithril_common::logging::LoggerExtensions;
10use mithril_persistence::store::StakeStorer;
11
12use crate::ServeCommandDependenciesContainer;
13use crate::entities::OpenMessage;
14
/// Settings consumed by the AggregatorRuntime.
#[derive(Debug, Clone)]
pub struct AggregatorConfig {
    /// Interval between each snapshot (the unit is carried by the [Duration] itself)
    pub interval: Duration,

    /// `true` when the aggregator is a follower
    pub is_follower: bool,
}

impl AggregatorConfig {
    /// Build an [AggregatorConfig] from its interval and its follower flag.
    pub fn new(interval: Duration, is_follower: bool) -> Self {
        AggregatorConfig {
            interval,
            is_follower,
        }
    }
}
34
35/// This trait is intended to allow mocking the AggregatorRunner in tests.
36/// It exposes all the methods needed by the state machine.
37#[async_trait]
38pub trait AggregatorRunnerTrait: Sync + Send {
39    /// Return the current [TimePoint] from the chain
40    async fn get_time_point_from_chain(&self) -> StdResult<TimePoint>;
41
42    /// Retrieves the current open message for a given signed entity type.
43    async fn get_current_open_message_for_signed_entity_type(
44        &self,
45        signed_entity_type: &SignedEntityType,
46    ) -> StdResult<Option<OpenMessage>>;
47
48    /// Retrieves the current non-certified open message.
49    async fn get_current_non_certified_open_message(
50        &self,
51        current_time_point: &TimePoint,
52    ) -> StdResult<Option<OpenMessage>>;
53
54    /// Check if a certificate chain is valid.
55    async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()>;
56
57    /// Read the stake distribution from the blockchain and store it.
58    async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()>;
59
60    /// Open the signer registration round of an epoch.
61    async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()>;
62
63    /// Close the signer registration round of an epoch.
64    async fn close_signer_registration_round(&self) -> StdResult<()>;
65
66    /// Check if the follower aggregator is running the same epoch as the leader.
67    async fn is_follower_aggregator_at_same_epoch_as_leader(
68        &self,
69        time_point: &TimePoint,
70    ) -> StdResult<bool>;
71
72    /// Synchronize the follower aggregator signer registration.
73    async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()>;
74
75    /// Ask the EpochService to update the epoch settings.
76    async fn update_epoch_settings(&self) -> StdResult<()>;
77
78    /// Compute the protocol message
79    async fn compute_protocol_message(
80        &self,
81        signed_entity_type: &SignedEntityType,
82    ) -> StdResult<ProtocolMessage>;
83
84    /// Mark expired open message.
85    async fn mark_open_message_if_expired(
86        &self,
87        signed_entity_type: &SignedEntityType,
88    ) -> StdResult<Option<OpenMessage>>;
89
90    /// Tell the certifier to try to create a new certificate.
91    async fn create_certificate(
92        &self,
93        signed_entity_type: &SignedEntityType,
94    ) -> StdResult<Option<Certificate>>;
95
96    /// Create an artifact and persist it.
97    async fn create_artifact(
98        &self,
99        signed_entity_type: &SignedEntityType,
100        certificate: &Certificate,
101    ) -> StdResult<()>;
102
103    /// Update the EraChecker with EraReader information.
104    async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()>;
105
106    /// Ask services to update themselves for the new epoch
107    async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()>;
108
109    /// Perform the upkeep tasks.
110    async fn upkeep(&self, epoch: Epoch) -> StdResult<()>;
111
112    /// Precompute what doesn't change for the actual epoch
113    async fn precompute_epoch_data(&self) -> StdResult<()>;
114
115    /// Create new open message
116    async fn create_open_message(
117        &self,
118        signed_entity_type: &SignedEntityType,
119        protocol_message: &ProtocolMessage,
120    ) -> StdResult<OpenMessage>;
121
122    /// Checks if the open message is considered outdated.
123    async fn is_open_message_outdated(
124        &self,
125        open_message_signed_entity_type: SignedEntityType,
126        last_time_point: &TimePoint,
127    ) -> StdResult<bool>;
128
129    /// Increment the runtime cycle success metric.
130    fn increment_runtime_cycle_success_since_startup_counter(&self);
131
132    /// Increment the runtime cycle total metric.
133    fn increment_runtime_cycle_total_since_startup_counter(&self);
134}
135
136/// The runner responsibility is to expose a code API for the state machine. It
137/// holds services and configuration.
138pub struct AggregatorRunner {
139    dependencies: Arc<ServeCommandDependenciesContainer>,
140    logger: Logger,
141}
142
143impl AggregatorRunner {
144    /// Create a new instance of the Aggregator Runner.
145    pub fn new(dependencies: Arc<ServeCommandDependenciesContainer>) -> Self {
146        let logger = dependencies.root_logger.new_with_component_name::<Self>();
147        Self {
148            dependencies,
149            logger,
150        }
151    }
152
153    async fn list_available_signed_entity_types(
154        &self,
155        time_point: &TimePoint,
156    ) -> StdResult<Vec<SignedEntityType>> {
157        let signed_entity_types = self
158            .dependencies
159            .epoch_service
160            .read()
161            .await
162            .signed_entity_config()?
163            .list_allowed_signed_entity_types(time_point)?;
164        let unlocked_signed_entities = self
165            .dependencies
166            .signed_entity_type_lock
167            .filter_unlocked_entries(signed_entity_types)
168            .await;
169
170        Ok(unlocked_signed_entities)
171    }
172}
173
174#[cfg_attr(test, mockall::automock)]
175#[async_trait]
176impl AggregatorRunnerTrait for AggregatorRunner {
177    /// Return the current time point from the chain
178    async fn get_time_point_from_chain(&self) -> StdResult<TimePoint> {
179        debug!(self.logger, ">> get_time_point_from_chain");
180        let time_point = self.dependencies.ticker_service.get_current_time_point().await?;
181
182        Ok(time_point)
183    }
184
185    async fn get_current_open_message_for_signed_entity_type(
186        &self,
187        signed_entity_type: &SignedEntityType,
188    ) -> StdResult<Option<OpenMessage>> {
189        debug!(self.logger,">> get_current_open_message_for_signed_entity_type"; "signed_entity_type" => ?signed_entity_type);
190        self.mark_open_message_if_expired(signed_entity_type).await?;
191
192        Ok(self
193            .dependencies
194            .certifier_service
195            .get_open_message(signed_entity_type)
196            .await
197            .with_context(|| format!("CertifierService can not get open message for signed_entity_type: '{signed_entity_type}'"))?)
198    }
199
200    async fn get_current_non_certified_open_message(
201        &self,
202        current_time_point: &TimePoint,
203    ) -> StdResult<Option<OpenMessage>> {
204        debug!(self.logger,">> get_current_non_certified_open_message"; "time_point" => #?current_time_point);
205        let signed_entity_types =
206            self.list_available_signed_entity_types(current_time_point).await?;
207
208        for signed_entity_type in signed_entity_types {
209            let current_open_message = self.get_current_open_message_for_signed_entity_type(&signed_entity_type)
210                .await
211                .with_context(|| format!("AggregatorRunner can not get current open message for signed entity type: '{}'", &signed_entity_type))?;
212            match current_open_message {
213                None => {
214                    let protocol_message = self.compute_protocol_message(&signed_entity_type).await.with_context(|| format!("AggregatorRunner can not compute protocol message for signed_entity_type: '{signed_entity_type}'"))?;
215                    let open_message_new = self.create_open_message(&signed_entity_type, &protocol_message)
216                        .await
217                        .with_context(|| format!("AggregatorRunner can not create open message for signed_entity_type: '{signed_entity_type}'"))?;
218
219                    return Ok(Some(open_message_new));
220                }
221                Some(open_message) => {
222                    if !open_message.is_certified && !open_message.is_expired {
223                        return Ok(Some(open_message));
224                    }
225                }
226            }
227        }
228
229        Ok(None)
230    }
231
232    async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()> {
233        debug!(self.logger, ">> is_certificate_chain_valid");
234        self.dependencies
235            .certifier_service
236            .verify_certificate_chain(time_point.epoch)
237            .await?;
238
239        Ok(())
240    }
241
242    async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()> {
243        debug!(self.logger,">> update_stake_distribution"; "time_point" => #?new_time_point);
244        self.dependencies
245            .stake_distribution_service
246            .update_stake_distribution()
247            .await
248            .with_context(|| format!("AggregatorRunner could not update stake distribution for time_point: '{new_time_point}'"))
249    }
250
251    async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()> {
252        debug!(self.logger,">> open_signer_registration_round"; "time_point" => #?new_time_point);
253        let registration_epoch = new_time_point.epoch.offset_to_recording_epoch();
254
255        let stakes = self
256            .dependencies
257            .stake_store
258            .get_stakes(registration_epoch)
259            .await?
260            .unwrap_or_default();
261
262        self.dependencies
263            .signer_registration_round_opener
264            .open_registration_round(registration_epoch, stakes)
265            .await
266    }
267
268    async fn close_signer_registration_round(&self) -> StdResult<()> {
269        debug!(self.logger, ">> close_signer_registration_round");
270        self.dependencies
271            .signer_registration_round_opener
272            .close_registration_round()
273            .await
274    }
275
276    async fn is_follower_aggregator_at_same_epoch_as_leader(
277        &self,
278        time_point: &TimePoint,
279    ) -> StdResult<bool> {
280        self.dependencies
281            .signer_synchronizer
282            .can_synchronize_signers(time_point.epoch)
283            .await
284            .map_err(|e| e.into())
285    }
286
287    async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()> {
288        self.dependencies
289            .signer_synchronizer
290            .synchronize_all_signers()
291            .await
292            .map_err(|e| e.into())
293    }
294
295    async fn update_epoch_settings(&self) -> StdResult<()> {
296        debug!(self.logger, ">> update_epoch_settings");
297        self.dependencies
298            .epoch_service
299            .write()
300            .await
301            .update_epoch_settings()
302            .await
303    }
304
305    async fn compute_protocol_message(
306        &self,
307        signed_entity_type: &SignedEntityType,
308    ) -> StdResult<ProtocolMessage> {
309        debug!(self.logger, ">> compute_protocol_message");
310        let protocol_message = self
311            .dependencies
312            .signable_builder_service
313            .compute_protocol_message(signed_entity_type.to_owned())
314            .await
315            .with_context(|| format!("Runner can not compute protocol message for signed entity type: '{signed_entity_type}'"))?;
316
317        Ok(protocol_message)
318    }
319
320    async fn mark_open_message_if_expired(
321        &self,
322        signed_entity_type: &SignedEntityType,
323    ) -> StdResult<Option<OpenMessage>> {
324        debug!(self.logger, ">> mark_open_message_if_expired");
325        let expired_open_message = self
326            .dependencies
327            .certifier_service
328            .mark_open_message_if_expired(signed_entity_type)
329            .await
330            .with_context(|| "CertifierService can not mark expired open message")?;
331
332        debug!(
333            self.logger, "Marked expired open messages";
334            "expired_open_message" => ?expired_open_message
335        );
336
337        Ok(expired_open_message)
338    }
339
340    async fn create_certificate(
341        &self,
342        signed_entity_type: &SignedEntityType,
343    ) -> StdResult<Option<Certificate>> {
344        debug!(self.logger, ">> create_certificate"; "signed_entity_type" => ?signed_entity_type);
345
346        let certificate = self.dependencies
347            .certifier_service
348            .create_certificate(signed_entity_type)
349            .await
350            .with_context(|| {
351                format!(
352                    "CertifierService can not create certificate for signed_entity_type: '{signed_entity_type}'"
353                )
354            })?;
355
356        if certificate.is_some() {
357            self.dependencies
358                .metrics_service
359                .get_certificate_total_produced_since_startup()
360                .increment();
361        }
362
363        Ok(certificate)
364    }
365
366    async fn create_artifact(
367        &self,
368        signed_entity_type: &SignedEntityType,
369        certificate: &Certificate,
370    ) -> StdResult<()> {
371        debug!(
372            self.logger, ">> create_artifact";
373            "signed_entity_type" => ?signed_entity_type,
374            "certificate_hash" => &certificate.hash
375        );
376
377        self.dependencies
378            .signed_entity_service
379            .create_artifact(signed_entity_type.to_owned(), certificate)
380            .await
381            .with_context(|| {
382                format!(
383                    "SignedEntityService can not create artifact for signed_entity_type: '{signed_entity_type}' with certificate hash: '{}'",
384                    certificate.hash
385                )
386            })?;
387
388        Ok(())
389    }
390
391    async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()> {
392        debug!(self.logger, ">> update_era_checker({epoch:?})");
393        let token = self
394            .dependencies
395            .era_reader
396            .read_era_epoch_token(epoch)
397            .await
398            .with_context(|| {
399                format!("EraReader can not get era epoch token for current epoch: '{epoch}'")
400            })?;
401
402        let current_era = token
403            .get_current_supported_era()
404            .with_context(|| "EraEpochToken can not get current supported era")?;
405        self.dependencies
406            .era_checker
407            .change_era(current_era, token.get_current_epoch());
408        debug!(
409            self.logger,
410            "Current Era is {current_era} (Epoch {}).",
411            token.get_current_epoch()
412        );
413
414        if token.get_next_supported_era().is_err() {
415            let era_name = &token.get_next_era_marker().unwrap().name;
416            warn!(
417                self.logger,
418                "Upcoming Era '{era_name}' is not supported by this version of the software. Please update!"
419            );
420        }
421
422        Ok(())
423    }
424
425    async fn precompute_epoch_data(&self) -> StdResult<()> {
426        debug!(self.logger, ">> precompute_epoch_data");
427        self.dependencies
428            .epoch_service
429            .write()
430            .await
431            .precompute_epoch_data()
432            .await?;
433
434        Ok(())
435    }
436
437    async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()> {
438        debug!(self.logger, ">> inform_new_epoch({epoch:?})");
439        self.dependencies.certifier_service.inform_epoch(epoch).await?;
440
441        self.dependencies
442            .epoch_service
443            .write()
444            .await
445            .inform_epoch(epoch)
446            .await?;
447
448        Ok(())
449    }
450
451    async fn upkeep(&self, epoch: Epoch) -> StdResult<()> {
452        debug!(self.logger, ">> upkeep");
453        self.dependencies.upkeep_service.run(epoch).await
454    }
455
456    async fn create_open_message(
457        &self,
458        signed_entity_type: &SignedEntityType,
459        protocol_message: &ProtocolMessage,
460    ) -> StdResult<OpenMessage> {
461        debug!(self.logger, ">> create_open_message");
462        self.dependencies
463            .certifier_service
464            .create_open_message(signed_entity_type, protocol_message)
465            .await
466    }
467
468    async fn is_open_message_outdated(
469        &self,
470        open_message_signed_entity_type: SignedEntityType,
471        last_time_point: &TimePoint,
472    ) -> StdResult<bool> {
473        let current_open_message = self
474            .get_current_open_message_for_signed_entity_type(
475                &open_message_signed_entity_type,
476            )
477            .await
478            .with_context(|| format!("AggregatorRuntime can not get the current open message for signed entity type: '{}'", &open_message_signed_entity_type))?;
479        let is_expired_open_message =
480            current_open_message.as_ref().map(|om| om.is_expired).unwrap_or(false);
481
482        let exists_newer_open_message = {
483            let new_signed_entity_type = self
484                .dependencies
485                .epoch_service
486                .read()
487                .await
488                .signed_entity_config()?
489                .time_point_to_signed_entity(&open_message_signed_entity_type, last_time_point)?;
490            new_signed_entity_type != open_message_signed_entity_type
491        };
492
493        Ok(exists_newer_open_message || is_expired_open_message)
494    }
495
496    fn increment_runtime_cycle_success_since_startup_counter(&self) {
497        self.dependencies
498            .metrics_service
499            .get_runtime_cycle_success_since_startup()
500            .increment();
501    }
502
503    fn increment_runtime_cycle_total_since_startup_counter(&self) {
504        self.dependencies
505            .metrics_service
506            .get_runtime_cycle_total_since_startup()
507            .increment();
508    }
509}
510
511#[cfg(test)]
512pub mod tests {
513    use async_trait::async_trait;
514    use chrono::{DateTime, Utc};
515    use mockall::mock;
516    use mockall::predicate::eq;
517    use std::path::PathBuf;
518    use std::sync::Arc;
519    use tokio::sync::RwLock;
520
521    use mithril_cardano_node_chain::test::double::FakeChainObserver;
522    use mithril_cardano_node_internal_database::test::double::DumbImmutableFileObserver;
523    use mithril_common::{
524        StdResult,
525        entities::{
526            CardanoTransactionsSigningConfig, ChainPoint, Epoch, ProtocolMessage,
527            SignedEntityConfig, SignedEntityType, SignedEntityTypeDiscriminants, StakeDistribution,
528            TimePoint,
529        },
530        signable_builder::SignableBuilderService,
531        temp_dir,
532        test_utils::{MithrilFixtureBuilder, fake_data},
533    };
534    use mithril_persistence::store::StakeStorer;
535    use mithril_signed_entity_lock::SignedEntityTypeLock;
536    use mithril_ticker::MithrilTickerService;
537
538    use crate::{
539        MithrilSignerRegistrationLeader, ServeCommandConfiguration,
540        ServeCommandDependenciesContainer, SignerRegistrationRound,
541        dependency_injection::DependenciesBuilder,
542        entities::{AggregatorEpochSettings, OpenMessage},
543        initialize_dependencies,
544        runtime::{AggregatorRunner, AggregatorRunnerTrait},
545        services::{
546            FakeEpochService, FakeEpochServiceBuilder, MithrilStakeDistributionService,
547            MockCertifierService, MockUpkeepService,
548        },
549    };
550
551    mock! {
552        SignableBuilderServiceImpl { }
553
554        #[async_trait]
555        impl SignableBuilderService for SignableBuilderServiceImpl
556        {
557
558            async fn compute_protocol_message(
559                &self,
560                signed_entity_type: SignedEntityType,
561            ) -> StdResult<ProtocolMessage>;
562        }
563    }
564
565    async fn build_runner_with_fixture_data(
566        deps: ServeCommandDependenciesContainer,
567    ) -> AggregatorRunner {
568        let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
569        let current_epoch = deps.chain_observer.get_current_epoch().await.unwrap().unwrap();
570        deps.init_state_from_fixture(
571            &fixture,
572            &CardanoTransactionsSigningConfig::dummy(),
573            &[
574                current_epoch.offset_to_signer_retrieval_epoch().unwrap(),
575                current_epoch,
576                current_epoch.next(),
577            ],
578        )
579        .await;
580
581        AggregatorRunner::new(Arc::new(deps))
582    }
583
584    async fn build_runner(
585        temp_dir: PathBuf,
586        mock_certifier_service: MockCertifierService,
587    ) -> AggregatorRunner {
588        build_runner_with_discriminants(temp_dir, mock_certifier_service, vec![]).await
589    }
590
591    async fn build_runner_with_discriminants(
592        temp_dir: PathBuf,
593        mock_certifier_service: MockCertifierService,
594        allowed_discriminants: Vec<SignedEntityTypeDiscriminants>,
595    ) -> AggregatorRunner {
596        let mut deps = initialize_dependencies(temp_dir).await;
597        deps.certifier_service = Arc::new(mock_certifier_service);
598
599        let mut mock_signable_builder_service = MockSignableBuilderServiceImpl::new();
600        mock_signable_builder_service
601            .expect_compute_protocol_message()
602            .return_once(|_| Ok(ProtocolMessage::default()));
603        deps.signable_builder_service = Arc::new(mock_signable_builder_service);
604
605        // Configure EpochService with allowed_discriminants
606        if !allowed_discriminants.is_empty() {
607            let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
608            let epoch_service = FakeEpochServiceBuilder {
609                signed_entity_config: SignedEntityConfig {
610                    allowed_discriminants: allowed_discriminants.into_iter().collect(),
611                    ..SignedEntityConfig::dummy()
612                },
613                ..FakeEpochServiceBuilder::dummy(current_epoch)
614            }
615            .build();
616            deps.epoch_service = Arc::new(RwLock::new(epoch_service));
617        }
618
619        let runner = build_runner_with_fixture_data(deps).await;
620
621        let current_epoch = runner.dependencies.ticker_service.get_current_epoch().await.unwrap();
622        runner.inform_new_epoch(current_epoch).await.unwrap();
623        runner.precompute_epoch_data().await.unwrap();
624        runner
625    }
626
627    fn init_certifier_service_mock(
628        mock_certifier_service: &mut MockCertifierService,
629        messages: Vec<OpenMessage>,
630    ) {
631        for message in messages {
632            mock_certifier_service
633                .expect_get_open_message()
634                .return_once(|_| Ok(Some(message)))
635                .times(1);
636        }
637        // When all messages are retrieved, the function return None
638        mock_certifier_service
639            .expect_get_open_message()
640            .returning(|_| Ok(None));
641
642        mock_certifier_service.expect_inform_epoch().return_once(|_| Ok(()));
643        mock_certifier_service
644            .expect_mark_open_message_if_expired()
645            .returning(|_| Ok(None));
646    }
647
648    fn create_open_message(
649        is_certified: IsCertified,
650        is_expired: IsExpired,
651        signed_entity_type: SignedEntityType,
652    ) -> OpenMessage {
653        OpenMessage {
654            signed_entity_type,
655            is_certified: is_certified == IsCertified::Yes,
656            is_expired: is_expired == IsExpired::Yes,
657            ..OpenMessage::dummy()
658        }
659    }
660
661    #[derive(Eq, PartialEq)]
662    enum IsCertified {
663        Yes,
664        No,
665    }
666
667    #[derive(Eq, PartialEq)]
668    enum IsExpired {
669        Yes,
670        No,
671    }
672
673    #[tokio::test]
674    async fn test_get_time_point_from_chain() {
675        let expected = TimePoint::new(2, 17, ChainPoint::dummy());
676        let mut dependencies = initialize_dependencies!().await;
677        let immutable_file_observer = Arc::new(DumbImmutableFileObserver::default());
678        immutable_file_observer
679            .shall_return(Some(expected.immutable_file_number))
680            .await;
681        let ticker_service = Arc::new(MithrilTickerService::new(
682            Arc::new(FakeChainObserver::new(Some(expected.clone()))),
683            immutable_file_observer,
684        ));
685        dependencies.ticker_service = ticker_service;
686        let runner = AggregatorRunner::new(Arc::new(dependencies));
687
688        // Retrieves the expected time point
689        let res = runner.get_time_point_from_chain().await;
690        assert_eq!(expected, res.unwrap());
691    }
692
693    #[tokio::test]
694    async fn test_update_stake_distribution() {
695        let chain_observer = Arc::new(FakeChainObserver::default());
696        let deps = {
697            let mut deps = initialize_dependencies!().await;
698            deps.chain_observer = chain_observer.clone();
699            deps.stake_distribution_service = Arc::new(MithrilStakeDistributionService::new(
700                deps.stake_store.clone(),
701                chain_observer.clone(),
702            ));
703            Arc::new(deps)
704        };
705        let runner = AggregatorRunner::new(deps.clone());
706        let time_point = runner.get_time_point_from_chain().await.unwrap();
707        let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
708        let expected = fixture.stake_distribution();
709
710        chain_observer.set_signers(fixture.signers_with_stake()).await;
711        runner
712            .update_stake_distribution(&time_point)
713            .await
714            .expect("updating stake distribution should not return an error");
715
716        let saved_stake_distribution = deps
717            .stake_store
718            .get_stakes(time_point.epoch.offset_to_recording_epoch())
719            .await
720            .unwrap()
721            .unwrap_or_else(|| {
722                panic!(
723                    "I should have a stake distribution for the epoch {:?}",
724                    time_point.epoch
725                )
726            });
727
728        assert_eq!(expected, saved_stake_distribution);
729    }
730
731    #[tokio::test]
732    async fn test_open_signer_registration_round() {
733        let config = ServeCommandConfiguration::new_sample(mithril_common::temp_dir!());
734        let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
735
736        let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
737            builder.get_verification_key_store().await.unwrap(),
738            builder.get_signer_store().await.unwrap(),
739            builder.get_signer_registration_verifier().await.unwrap(),
740        ));
741        let mut deps = builder.build_serve_dependencies_container().await.unwrap();
742        deps.signer_registration_round_opener = signer_registration_round_opener.clone();
743        let stake_store = deps.stake_store.clone();
744        let deps = Arc::new(deps);
745        let runner = AggregatorRunner::new(deps.clone());
746
747        let time_point = TimePoint::dummy();
748        let recording_epoch = time_point.epoch.offset_to_recording_epoch();
749        let stake_distribution: StakeDistribution =
750            StakeDistribution::from([("a".to_string(), 5), ("b".to_string(), 10)]);
751
752        stake_store
753            .save_stakes(recording_epoch, stake_distribution.clone())
754            .await
755            .expect("Save Stake distribution should not fail");
756
757        runner
758            .open_signer_registration_round(&time_point)
759            .await
760            .expect("opening signer registration should not return an error");
761
762        let saved_current_round = signer_registration_round_opener.get_current_round().await;
763
764        let expected_signer_registration_round =
765            SignerRegistrationRound::dummy(recording_epoch, stake_distribution);
766
767        assert_eq!(
768            Some(expected_signer_registration_round),
769            saved_current_round,
770        );
771    }
772
773    #[tokio::test]
774    async fn test_close_signer_registration_round() {
775        let config = ServeCommandConfiguration::new_sample(mithril_common::temp_dir!());
776        let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
777
778        let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
779            builder.get_verification_key_store().await.unwrap(),
780            builder.get_signer_store().await.unwrap(),
781            builder.get_signer_registration_verifier().await.unwrap(),
782        ));
783        let mut deps = builder.build_serve_dependencies_container().await.unwrap();
784        deps.signer_registration_round_opener = signer_registration_round_opener.clone();
785        let deps = Arc::new(deps);
786        let runner = AggregatorRunner::new(deps.clone());
787
788        let time_point = TimePoint::dummy();
789        runner
790            .open_signer_registration_round(&time_point)
791            .await
792            .expect("opening signer registration should not return an error");
793
794        runner
795            .close_signer_registration_round()
796            .await
797            .expect("closing signer registration should not return an error");
798
799        let saved_current_round = signer_registration_round_opener.get_current_round().await;
800        assert!(saved_current_round.is_none());
801    }
802
803    #[tokio::test]
804    async fn test_expire_open_message() {
805        let open_message_expected = OpenMessage {
806            signed_entity_type: SignedEntityType::dummy(),
807            is_certified: false,
808            is_expired: false,
809            expires_at: Some(
810                DateTime::parse_from_rfc3339("2000-01-19T13:43:05.618857482Z")
811                    .unwrap()
812                    .with_timezone(&Utc),
813            ),
814            ..OpenMessage::dummy()
815        };
816        let open_message_clone = open_message_expected.clone();
817
818        let mut mock_certifier_service = MockCertifierService::new();
819        mock_certifier_service
820            .expect_mark_open_message_if_expired()
821            .return_once(|_| Ok(Some(open_message_clone)));
822
823        let mut deps = initialize_dependencies!().await;
824        deps.certifier_service = Arc::new(mock_certifier_service);
825
826        let runner = build_runner_with_fixture_data(deps).await;
827        let open_message_expired = runner
828            .mark_open_message_if_expired(&open_message_expected.signed_entity_type)
829            .await
830            .expect("mark_open_message_if_expired should not fail");
831
832        assert_eq!(Some(open_message_expected), open_message_expired);
833    }
834
835    #[tokio::test]
836    async fn test_update_era_checker() {
837        let deps = initialize_dependencies!().await;
838        let ticker_service = deps.ticker_service.clone();
839        let era_checker = deps.era_checker.clone();
840        let mut time_point = ticker_service.get_current_time_point().await.unwrap();
841
842        assert_eq!(time_point.epoch, era_checker.current_epoch());
843        let runner = AggregatorRunner::new(Arc::new(deps));
844        time_point.epoch += 1;
845
846        runner.update_era_checker(time_point.epoch).await.unwrap();
847        assert_eq!(time_point.epoch, era_checker.current_epoch());
848    }
849
850    #[tokio::test]
851    async fn test_inform_new_epoch() {
852        let mut mock_certifier_service = MockCertifierService::new();
853        mock_certifier_service
854            .expect_inform_epoch()
855            .returning(|_| Ok(()))
856            .times(1);
857        let mut deps = initialize_dependencies!().await;
858        let current_epoch = deps.chain_observer.get_current_epoch().await.unwrap().unwrap();
859
860        deps.certifier_service = Arc::new(mock_certifier_service);
861        deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
862            current_epoch,
863            &MithrilFixtureBuilder::default().build(),
864        )));
865
866        let runner = AggregatorRunner::new(Arc::new(deps));
867
868        runner.inform_new_epoch(current_epoch).await.unwrap();
869    }
870
871    #[tokio::test]
872    async fn test_upkeep_calls_run_on_upkeep_service() {
873        let mut upkeep_service = MockUpkeepService::new();
874        upkeep_service
875            .expect_run()
876            .with(eq(Epoch(5)))
877            .returning(|_| Ok(()))
878            .times(1);
879
880        let mut deps = initialize_dependencies!().await;
881        deps.upkeep_service = Arc::new(upkeep_service);
882
883        let runner = AggregatorRunner::new(Arc::new(deps));
884
885        runner.upkeep(Epoch(5)).await.unwrap();
886    }
887
888    #[tokio::test]
889    async fn test_update_epoch_settings() {
890        let mut mock_certifier_service = MockCertifierService::new();
891        mock_certifier_service
892            .expect_inform_epoch()
893            .returning(|_| Ok(()))
894            .times(1);
895
896        let config = ServeCommandConfiguration::new_sample(temp_dir!());
897        let mut deps = DependenciesBuilder::new_with_stdout_logger(Arc::new(config.clone()))
898            .build_serve_dependencies_container()
899            .await
900            .unwrap();
901        deps.certifier_service = Arc::new(mock_certifier_service);
902        let epoch_settings_storer = deps.epoch_settings_storer.clone();
903        let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
904        let insert_epoch = current_epoch.offset_to_epoch_settings_recording_epoch();
905
906        let runner = build_runner_with_fixture_data(deps).await;
907        runner.inform_new_epoch(current_epoch).await.unwrap();
908        runner
909            .update_epoch_settings()
910            .await
911            .expect("update_epoch_settings should not fail");
912
913        let saved_epoch_settings = epoch_settings_storer
914            .get_epoch_settings(insert_epoch)
915            .await
916            .unwrap()
917            .unwrap_or_else(|| panic!("should have epoch settings for epoch {insert_epoch}",));
918
919        assert_eq!(
920            AggregatorEpochSettings {
921                protocol_parameters: config.protocol_parameters.clone(),
922                cardano_transactions_signing_config: config
923                    .cardano_transactions_signing_config
924                    .clone(),
925            },
926            saved_epoch_settings
927        );
928    }
929
930    #[tokio::test]
931    async fn test_precompute_epoch_data() {
932        let mut deps = initialize_dependencies!().await;
933        let current_epoch = deps.chain_observer.get_current_epoch().await.unwrap().unwrap();
934
935        deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
936            current_epoch,
937            &MithrilFixtureBuilder::default().build(),
938        )));
939
940        let runner = AggregatorRunner::new(Arc::new(deps));
941
942        runner.precompute_epoch_data().await.unwrap();
943    }
944
945    #[tokio::test]
946    async fn test_get_current_non_certified_open_message_should_create_new_open_message_if_none_exists()
947     {
948        let open_message_created = create_open_message(
949            IsCertified::No,
950            IsExpired::No,
951            SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
952        );
953        let open_message_expected = open_message_created.clone();
954
955        let runner = {
956            let mut mock_certifier_service = MockCertifierService::new();
957            init_certifier_service_mock(&mut mock_certifier_service, vec![]);
958
959            mock_certifier_service
960                .expect_create_open_message()
961                .return_once(|_, _| Ok(open_message_created))
962                .times(1);
963            build_runner(temp_dir!(), mock_certifier_service).await
964        };
965
966        let open_message_returned = runner
967            .get_current_non_certified_open_message(&TimePoint::dummy())
968            .await
969            .unwrap();
970        assert_eq!(Some(open_message_expected), open_message_returned);
971    }
972
973    #[tokio::test]
974    async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_not_expired()
975     {
976        let not_certified_and_not_expired = create_open_message(
977            IsCertified::No,
978            IsExpired::No,
979            SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
980        );
981
982        let open_message_expected = not_certified_and_not_expired.clone();
983
984        let runner = {
985            let mut mock_certifier_service = MockCertifierService::new();
986            init_certifier_service_mock(
987                &mut mock_certifier_service,
988                vec![not_certified_and_not_expired],
989            );
990
991            mock_certifier_service.expect_create_open_message().never();
992            build_runner(temp_dir!(), mock_certifier_service).await
993        };
994
995        let open_message_returned = runner
996            .get_current_non_certified_open_message(&TimePoint::dummy())
997            .await
998            .unwrap();
999
1000        assert_eq!(Some(open_message_expected), open_message_returned);
1001    }
1002
1003    #[tokio::test]
1004    async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_open_message_already_certified()
1005     {
1006        let certified_and_not_expired = create_open_message(
1007            IsCertified::Yes,
1008            IsExpired::No,
1009            SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1010        );
1011        let not_certified_and_not_expired = create_open_message(
1012            IsCertified::No,
1013            IsExpired::No,
1014            SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1015        );
1016
1017        let open_message_expected = not_certified_and_not_expired.clone();
1018
1019        let runner = {
1020            let mut mock_certifier_service = MockCertifierService::new();
1021            init_certifier_service_mock(
1022                &mut mock_certifier_service,
1023                vec![certified_and_not_expired, not_certified_and_not_expired],
1024            );
1025
1026            mock_certifier_service.expect_create_open_message().never();
1027            build_runner_with_discriminants(
1028                temp_dir!(),
1029                mock_certifier_service,
1030                vec![
1031                    SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1032                    SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1033                ],
1034            )
1035            .await
1036        };
1037
1038        let open_message_returned = runner
1039            .get_current_non_certified_open_message(&TimePoint::dummy())
1040            .await
1041            .unwrap();
1042
1043        assert_eq!(Some(open_message_expected), open_message_returned);
1044    }
1045
1046    #[tokio::test]
1047    async fn test_get_current_non_certified_open_message_should_create_open_message_if_none_exists_and_open_message_already_certified()
1048     {
1049        let certified_and_not_expired = create_open_message(
1050            IsCertified::Yes,
1051            IsExpired::No,
1052            SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1053        );
1054        let open_message_created = create_open_message(
1055            IsCertified::No,
1056            IsExpired::No,
1057            SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1058        );
1059        let open_message_expected = open_message_created.clone();
1060
1061        let runner = {
1062            let mut mock_certifier_service = MockCertifierService::new();
1063            init_certifier_service_mock(
1064                &mut mock_certifier_service,
1065                vec![certified_and_not_expired],
1066            );
1067
1068            mock_certifier_service
1069                .expect_create_open_message()
1070                .return_once(|_, _| Ok(open_message_created))
1071                .times(1);
1072            build_runner_with_discriminants(
1073                temp_dir!(),
1074                mock_certifier_service,
1075                vec![
1076                    SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1077                    SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1078                ],
1079            )
1080            .await
1081        };
1082
1083        let open_message_returned = runner
1084            .get_current_non_certified_open_message(&TimePoint::dummy())
1085            .await
1086            .unwrap();
1087
1088        assert_eq!(Some(open_message_expected), open_message_returned);
1089    }
1090
1091    #[tokio::test]
1092    async fn test_get_current_non_certified_open_message_should_return_none_if_all_open_message_already_certified()
1093     {
1094        let certified_and_not_expired_1 = create_open_message(
1095            IsCertified::Yes,
1096            IsExpired::No,
1097            SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1098        );
1099        let certified_and_not_expired_2 = create_open_message(
1100            IsCertified::Yes,
1101            IsExpired::No,
1102            SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1103        );
1104
1105        let runner = {
1106            let mut mock_certifier_service = MockCertifierService::new();
1107            init_certifier_service_mock(
1108                &mut mock_certifier_service,
1109                vec![certified_and_not_expired_1, certified_and_not_expired_2],
1110            );
1111
1112            mock_certifier_service.expect_create_open_message().never();
1113            build_runner_with_discriminants(
1114                temp_dir!(),
1115                mock_certifier_service,
1116                vec![
1117                    SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1118                    SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1119                ],
1120            )
1121            .await
1122        };
1123
1124        let open_message_returned = runner
1125            .get_current_non_certified_open_message(&TimePoint::dummy())
1126            .await
1127            .unwrap();
1128
1129        assert!(open_message_returned.is_none());
1130    }
1131
1132    #[tokio::test]
1133    async fn test_get_current_non_certified_open_message_should_return_first_not_certified_and_not_expired_open_message()
1134     {
1135        let not_certified_and_expired = create_open_message(
1136            IsCertified::No,
1137            IsExpired::Yes,
1138            SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1139        );
1140        let not_certified_and_not_expired = create_open_message(
1141            IsCertified::No,
1142            IsExpired::No,
1143            SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1144        );
1145
1146        let open_message_expected = not_certified_and_not_expired.clone();
1147
1148        let runner = {
1149            let mut mock_certifier_service = MockCertifierService::new();
1150            init_certifier_service_mock(
1151                &mut mock_certifier_service,
1152                vec![not_certified_and_expired, not_certified_and_not_expired],
1153            );
1154
1155            mock_certifier_service.expect_create_open_message().never();
1156            build_runner_with_discriminants(
1157                temp_dir!(),
1158                mock_certifier_service,
1159                vec![
1160                    SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1161                    SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1162                ],
1163            )
1164            .await
1165        };
1166
1167        let open_message_returned = runner
1168            .get_current_non_certified_open_message(&TimePoint::dummy())
1169            .await
1170            .unwrap();
1171
1172        assert_eq!(Some(open_message_expected), open_message_returned);
1173    }
1174
1175    #[tokio::test]
1176    async fn test_get_current_non_certified_open_message_called_for_mithril_stake_distribution() {
1177        let mut mock_certifier_service = MockCertifierService::new();
1178
1179        mock_certifier_service
1180            .expect_get_open_message()
1181            .with(eq(SignedEntityType::MithrilStakeDistribution(
1182                TimePoint::dummy().epoch,
1183            )))
1184            .times(1)
1185            .return_once(|_| {
1186                Ok(Some(create_open_message(
1187                    IsCertified::Yes,
1188                    IsExpired::No,
1189                    SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1190                )))
1191            });
1192
1193        mock_certifier_service.expect_create_open_message().never();
1194
1195        mock_certifier_service.expect_inform_epoch().return_once(|_| Ok(()));
1196        mock_certifier_service
1197            .expect_mark_open_message_if_expired()
1198            .returning(|_| Ok(None));
1199
1200        let runner = build_runner(temp_dir!(), mock_certifier_service).await;
1201
1202        runner
1203            .get_current_non_certified_open_message(&TimePoint::dummy())
1204            .await
1205            .unwrap();
1206    }
1207
1208    #[tokio::test]
1209    async fn list_available_signed_entity_types_list_all_configured_entities_if_none_are_locked() {
1210        let runner = {
1211            let mut dependencies = initialize_dependencies!().await;
1212            let epoch_service = FakeEpochServiceBuilder {
1213                signed_entity_config: SignedEntityConfig {
1214                    allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1215                    ..SignedEntityConfig::dummy()
1216                },
1217                ..FakeEpochServiceBuilder::dummy(Epoch(32))
1218            }
1219            .build();
1220            dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1221            dependencies.signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1222            AggregatorRunner::new(Arc::new(dependencies))
1223        };
1224
1225        let time_point = TimePoint::dummy();
1226        let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1227            .list_available_signed_entity_types(&time_point)
1228            .await
1229            .unwrap()
1230            .into_iter()
1231            .map(Into::into)
1232            .collect();
1233
1234        assert_eq!(
1235            signed_entities,
1236            SignedEntityTypeDiscriminants::all().into_iter().collect::<Vec<_>>()
1237        );
1238    }
1239
1240    #[tokio::test]
1241    async fn list_available_signed_entity_types_exclude_locked_entities() {
1242        let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1243        let runner = {
1244            let mut dependencies = initialize_dependencies!().await;
1245            dependencies.signed_entity_type_lock = signed_entity_type_lock.clone();
1246            let epoch_service = FakeEpochServiceBuilder {
1247                signed_entity_config: SignedEntityConfig {
1248                    allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1249                    ..SignedEntityConfig::dummy()
1250                },
1251                ..FakeEpochServiceBuilder::dummy(Epoch(32))
1252            }
1253            .build();
1254            dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1255
1256            AggregatorRunner::new(Arc::new(dependencies))
1257        };
1258
1259        signed_entity_type_lock
1260            .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
1261            .await;
1262
1263        let time_point = TimePoint::dummy();
1264        let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1265            .list_available_signed_entity_types(&time_point)
1266            .await
1267            .unwrap()
1268            .into_iter()
1269            .map(Into::into)
1270            .collect();
1271
1272        assert!(!signed_entities.is_empty());
1273        assert!(!signed_entities.contains(&SignedEntityTypeDiscriminants::CardanoTransactions));
1274    }
1275
1276    #[tokio::test]
1277    async fn is_open_message_outdated_return_false_when_message_is_not_expired_and_no_newer_open_message()
1278     {
1279        assert!(!is_outdated_returned_when(temp_dir!(), IsExpired::No, false).await);
1280    }
1281
1282    #[tokio::test]
1283    async fn is_open_message_outdated_return_true_when_message_is_expired_and_no_newer_open_message()
1284     {
1285        assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, false).await);
1286    }
1287
1288    #[tokio::test]
1289    async fn is_open_message_outdated_return_true_when_message_is_not_expired_and_exists_newer_open_message()
1290     {
1291        assert!(is_outdated_returned_when(temp_dir!(), IsExpired::No, true).await);
1292    }
1293
1294    #[tokio::test]
1295    async fn is_open_message_outdated_return_true_when_message_is_expired_and_exists_newer_open_message()
1296     {
1297        assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, true).await);
1298    }
1299
1300    async fn is_outdated_returned_when(
1301        tmp_path: PathBuf,
1302        is_expired: IsExpired,
1303        newer_open_message: bool,
1304    ) -> bool {
1305        let current_time_point = TimePoint {
1306            epoch: Epoch(2),
1307            ..TimePoint::dummy()
1308        };
1309
1310        let message_epoch = if newer_open_message {
1311            current_time_point.epoch + 54
1312        } else {
1313            current_time_point.epoch
1314        };
1315        let open_message_to_verify = OpenMessage {
1316            signed_entity_type: SignedEntityType::MithrilStakeDistribution(message_epoch),
1317            is_expired: is_expired == IsExpired::Yes,
1318            ..OpenMessage::dummy()
1319        };
1320
1321        let runner = {
1322            let mut deps = initialize_dependencies(tmp_path).await;
1323            let mut mock_certifier_service = MockCertifierService::new();
1324
1325            let open_message_current = open_message_to_verify.clone();
1326            mock_certifier_service
1327                .expect_get_open_message()
1328                .times(1)
1329                .return_once(|_| Ok(Some(open_message_current)));
1330            mock_certifier_service
1331                .expect_mark_open_message_if_expired()
1332                .returning(|_| Ok(None));
1333
1334            deps.certifier_service = Arc::new(mock_certifier_service);
1335
1336            let epoch_service = FakeEpochServiceBuilder::dummy(current_time_point.epoch).build();
1337            deps.epoch_service = Arc::new(RwLock::new(epoch_service));
1338
1339            build_runner_with_fixture_data(deps).await
1340        };
1341
1342        runner
1343            .is_open_message_outdated(
1344                open_message_to_verify.signed_entity_type,
1345                &current_time_point,
1346            )
1347            .await
1348            .unwrap()
1349    }
1350}