mithril_aggregator/runtime/
state_machine.rs

1use anyhow::Context;
2use chrono::Local;
3use slog::{info, trace, Logger};
4use std::fmt::Display;
5use std::sync::Arc;
6
7use mithril_common::entities::TimePoint;
8use mithril_common::logging::LoggerExtensions;
9
10use crate::entities::OpenMessage;
11use crate::runtime::{AggregatorRunnerTrait, RuntimeError};
12use crate::AggregatorConfig;
13
/// Data carried by the state machine while it is in the `IDLE` state.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct IdleState {
    /// Last time point known by the state machine.
    ///
    /// `None` when the runtime is created without an initial state, `Some` when the
    /// state machine comes back to `IDLE` from `READY` or `SIGNING` after an epoch change.
    current_time_point: Option<TimePoint>,
}
18
/// Data carried by the state machine while it is in the `READY` state.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReadyState {
    /// Time point known by the state machine; its epoch is compared on each cycle with the
    /// epoch of the time point read from the chain in order to detect an epoch change.
    current_time_point: TimePoint,
}
23
/// Data carried by the state machine while it is in the `SIGNING` state.
#[derive(Clone, Debug, PartialEq)]
pub struct SigningState {
    /// Time point read from the chain when the state machine entered the `SIGNING` state.
    current_time_point: TimePoint,
    /// Open message handled by this state; its signed entity type is what the runner is
    /// asked to check for obsolescence and to certify.
    open_message: OpenMessage,
}
29
/// The different states of the aggregator state machine, each one wrapping the data
/// attached to that state.
#[derive(Clone, Debug, PartialEq)]
pub enum AggregatorState {
    /// `IDLE`: the state machine has to (re)validate its setup before being `READY`.
    Idle(IdleState),
    /// `READY`: the state machine looks for an open message that is not certified yet.
    Ready(ReadyState),
    /// `SIGNING`: the state machine tries to certify the open message it holds.
    Signing(SigningState),
}
36
37impl Display for AggregatorState {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        match self {
40            AggregatorState::Idle(state) => write!(
41                f,
42                "Idle - {}",
43                match &state.current_time_point {
44                    None => "No TimePoint".to_string(),
45                    Some(time_point) => time_point.to_string(),
46                }
47            ),
48            AggregatorState::Ready(state) => write!(f, "Ready - {}", state.current_time_point),
49            AggregatorState::Signing(state) => write!(f, "Signing - {}", state.current_time_point),
50        }
51    }
52}
53
/// The AggregatorRuntime responsibility is to create a state machine to handle
/// all actions required by the process of getting multi-signatures.
/// See the
/// [documentation](https://mithril.network/doc/mithril/mithril-network/aggregator#under-the-hood)
/// for more explanations about the Aggregator state machine.
pub struct AggregatorRuntime {
    /// Runtime settings: the interval between two cycles and the follower flag.
    config: AggregatorConfig,
    /// Current state of the state machine.
    state: AggregatorState,
    /// Performs the operations requested by the state machine.
    runner: Arc<dyn AggregatorRunnerTrait>,
    /// Logger used by the runtime.
    logger: Logger,
}
65
66impl AggregatorRuntime {
67    /// Create a new instance of the state machine.
68    pub async fn new(
69        aggregator_config: AggregatorConfig,
70        init_state: Option<AggregatorState>,
71        runner: Arc<dyn AggregatorRunnerTrait>,
72        logger: Logger,
73    ) -> Result<Self, RuntimeError> {
74        let logger = logger.new_with_component_name::<Self>();
75        info!(logger, "Initializing runtime");
76
77        let state = if let Some(init_state) = init_state {
78            trace!(logger, "Got initial state from caller");
79            init_state
80        } else {
81            trace!(logger, "Idle state, no current time point");
82            AggregatorState::Idle(IdleState {
83                current_time_point: None,
84            })
85        };
86
87        Ok(Self {
88            config: aggregator_config,
89            state,
90            runner,
91            logger,
92        })
93    }
94
95    /// Return the actual state of the state machine.
96    pub fn get_state(&self) -> String {
97        match self.state {
98            AggregatorState::Idle(_) => "idle".to_string(),
99            AggregatorState::Ready(_) => "ready".to_string(),
100            AggregatorState::Signing(_) => "signing".to_string(),
101        }
102    }
103
104    /// Launches an infinite loop ticking the state machine.
105    pub async fn run(&mut self) -> Result<(), RuntimeError> {
106        info!(self.logger, "Launching State Machine");
107        let mut interval = tokio::time::interval(self.config.interval);
108
109        loop {
110            interval.tick().await;
111            // Note: the "time" property in logs produced by our formatter (slog_bunyan) uses local
112            // time, so we must use it as well to avoid confusion.
113            let approximate_next_cycle_time = Local::now() + self.config.interval;
114
115            if let Err(e) = self.cycle().await {
116                e.write_to_log(&self.logger);
117                if e.is_critical() {
118                    return Err(e);
119                }
120            }
121
122            info!(
123                self.logger, "… Cycle finished";
124                "approximate_next_cycle_time" => %approximate_next_cycle_time.time().format("%H:%M:%S%.3f"),
125                "run_interval_in_ms" => self.config.interval.as_millis(),
126            );
127        }
128    }
129
    /// Perform one tick of the state machine.
    ///
    /// Each tick reads the current time point from the chain through the runner, then,
    /// depending on the current state:
    /// * `IDLE`: tries to move to `READY`; when the runtime is configured as a follower, the
    ///   transition is only attempted if the runner reports that the follower is at the same
    ///   epoch as the leader, otherwise the state stays `IDLE`,
    /// * `READY`: goes back to `IDLE` when the epoch has changed, moves to `SIGNING` when the
    ///   runner returns a non certified open message, or stays `READY` with the time point
    ///   that was just read,
    /// * `SIGNING`: goes back to `IDLE` when the epoch has changed, otherwise goes to `READY`
    ///   either because the open message is outdated or because a certificate and its
    ///   artifact have been created.
    ///
    /// The "total" cycle counter is incremented on every call while the "success" counter is
    /// only incremented when the whole cycle ran without error.
    ///
    /// # Errors
    ///
    /// Any error raised by the runner or by a transition is returned to the caller; in that
    /// case the state is left untouched since it is only assigned once the fallible
    /// operations of the branch have succeeded.
    pub async fn cycle(&mut self) -> Result<(), RuntimeError> {
        info!(
            self.logger,
            "================================================================================"
        );
        info!(self.logger, "New cycle: {}", self.state);

        self.runner
            .increment_runtime_cycle_total_since_startup_counter();

        // Work on a copy of the state so that `self.state` can be reassigned inside the arms.
        match self.state.clone() {
            AggregatorState::Idle(state) => {
                let last_time_point = self.runner.get_time_point_from_chain().await.with_context(
                    || "AggregatorRuntime in the state IDLE can not get current time point from chain",
                )?;

                info!(self.logger, "→ Trying to transition to READY"; "last_time_point" => ?last_time_point);

                // A leader can always try the transition, a follower only once the runner
                // reports it is at the same epoch as the leader.
                let can_try_transition_from_idle_to_ready = if self.config.is_follower {
                    self.runner
                        .is_follower_aggregator_at_same_epoch_as_leader(&last_time_point)
                        .await?
                } else {
                    true
                };
                if can_try_transition_from_idle_to_ready {
                    self.try_transition_from_idle_to_ready(
                        state.current_time_point,
                        last_time_point.clone(),
                    )
                    .await?;
                    self.state = AggregatorState::Ready(ReadyState {
                        current_time_point: last_time_point,
                    });
                }
            }
            AggregatorState::Ready(state) => {
                let last_time_point: TimePoint = self
                    .runner
                    .get_time_point_from_chain()
                    .await
                    .with_context(|| {
                        "AggregatorRuntime in the state READY can not get current time point from chain"
                    })?;

                if state.current_time_point.epoch < last_time_point.epoch {
                    // transition READY > IDLE
                    // The previous time point is kept so that the IDLE state can detect the new epoch.
                    info!(self.logger, "→ Epoch has changed, transitioning to IDLE"; "last_time_point" => ?last_time_point);
                    self.state = AggregatorState::Idle(IdleState {
                        current_time_point: Some(state.current_time_point),
                    });
                } else if let Some(open_message) = self
                    .runner
                    .get_current_non_certified_open_message(&last_time_point)
                    .await
                    .with_context(|| "AggregatorRuntime can not get the current open message")?
                {
                    // transition READY > SIGNING
                    info!(self.logger, "→ Transitioning to SIGNING");
                    let new_state = self
                        .transition_from_ready_to_signing(last_time_point.clone(), open_message.clone())
                        .await.with_context(|| format!("AggregatorRuntime can not perform a transition from READY state to SIGNING with entity_type: '{:?}'", open_message.signed_entity_type))?;
                    self.state = AggregatorState::Signing(new_state);
                } else {
                    // READY > READY
                    // Nothing to sign: only the stored time point is refreshed.
                    info!(
                        self.logger, " ⋅ No open message to certify, waiting…";
                        "time_point" => ?state.current_time_point
                    );
                    self.state = AggregatorState::Ready(ReadyState {
                        current_time_point: last_time_point,
                    });
                }
            }
            AggregatorState::Signing(state) => {
                let last_time_point: TimePoint =
                    self.runner.get_time_point_from_chain().await.with_context(|| {
                        "AggregatorRuntime in the state SIGNING can not get current time point from chain"
                    })?;

                // Evaluated before the epoch check: an error here aborts the cycle even if
                // the epoch has changed.
                let is_outdated = self
                    .runner
                    .is_open_message_outdated(
                        state.open_message.signed_entity_type.clone(),
                        &last_time_point,
                    )
                    .await?;

                if state.current_time_point.epoch < last_time_point.epoch {
                    // SIGNING > IDLE
                    info!(self.logger, "→ Epoch changed, transitioning to IDLE");
                    let new_state = self.transition_from_signing_to_idle(state).await?;
                    self.state = AggregatorState::Idle(new_state);
                } else if is_outdated {
                    // SIGNING > READY
                    info!(
                        self.logger,
                        "→ Open message changed, transitioning to READY"
                    );
                    let new_state = self
                        .transition_from_signing_to_ready_new_open_message(state)
                        .await?;
                    self.state = AggregatorState::Ready(new_state);
                } else {
                    // SIGNING > READY
                    // The transition fails (and the state stays SIGNING) while no certificate
                    // can be created.
                    let new_state = self
                        .transition_from_signing_to_ready_multisignature(state)
                        .await?;
                    info!(self.logger, "→ A multi-signature has been created, build an artifact & a certificate and transitioning back to READY");
                    self.state = AggregatorState::Ready(new_state);
                }
            }
        }

        // Only reached when no error interrupted the cycle.
        self.runner
            .increment_runtime_cycle_success_since_startup_counter();

        Ok(())
    }
250
251    /// Perform a transition from `IDLE` state to `READY` state when
252    /// the certificate chain is valid.
253    async fn try_transition_from_idle_to_ready(
254        &mut self,
255        maybe_current_time_point: Option<TimePoint>,
256        new_time_point: TimePoint,
257    ) -> Result<(), RuntimeError> {
258        trace!(self.logger, "Trying transition from IDLE to READY state");
259
260        if maybe_current_time_point.is_none()
261            || maybe_current_time_point.as_ref().unwrap().epoch < new_time_point.epoch
262        {
263            self.runner.close_signer_registration_round().await?;
264            self.runner
265                .update_era_checker(new_time_point.epoch)
266                .await
267                .map_err(|e| RuntimeError::critical("transiting IDLE → READY", Some(e)))?;
268            self.runner
269                .update_stake_distribution(&new_time_point)
270                .await?;
271            self.runner.inform_new_epoch(new_time_point.epoch).await?;
272            self.runner.upkeep(new_time_point.epoch).await?;
273            self.runner
274                .open_signer_registration_round(&new_time_point)
275                .await?;
276            self.runner.update_epoch_settings().await?;
277            if self.config.is_follower {
278                self.runner
279                    .synchronize_follower_aggregator_signer_registration()
280                    .await?;
281            }
282            self.runner.precompute_epoch_data().await?;
283        }
284
285        self.runner
286            .is_certificate_chain_valid(&new_time_point)
287            .await
288            .map_err(|e| RuntimeError::KeepState {
289                message: "certificate chain is invalid".to_string(),
290                nested_error: e.into(),
291            })?;
292
293        Ok(())
294    }
295
296    /// Perform a transition from `SIGNING` state to `READY` state when a new
297    /// multi-signature is issued.
298    async fn transition_from_signing_to_ready_multisignature(
299        &self,
300        state: SigningState,
301    ) -> Result<ReadyState, RuntimeError> {
302        trace!(
303            self.logger,
304            "Launching transition from SIGNING to READY state"
305        );
306        let certificate = self
307            .runner
308            .create_certificate(&state.open_message.signed_entity_type)
309            .await?
310            .ok_or_else(|| RuntimeError::KeepState {
311                message: "not enough signature yet to create a certificate, waiting…".to_string(),
312                nested_error: None,
313            })?;
314        self.runner
315            .create_artifact(&state.open_message.signed_entity_type, &certificate)
316            .await
317            .map_err(|e| RuntimeError::ReInit {
318                message: "transiting SIGNING → READY: failed to create artifact. Retrying…"
319                    .to_string(),
320                nested_error: Some(e),
321            })?;
322
323        Ok(ReadyState {
324            current_time_point: state.current_time_point,
325        })
326    }
327
328    /// Perform a transition from `SIGNING` state to `IDLE` state when a new
329    /// epoch is detected.
330    async fn transition_from_signing_to_idle(
331        &self,
332        state: SigningState,
333    ) -> Result<IdleState, RuntimeError> {
334        trace!(
335            self.logger,
336            "Launching transition from SIGNING to IDLE state"
337        );
338
339        Ok(IdleState {
340            current_time_point: Some(state.current_time_point),
341        })
342    }
343
344    /// Perform a transition from `SIGNING` state to `READY` state when a new
345    /// open message is detected.
346    async fn transition_from_signing_to_ready_new_open_message(
347        &self,
348        state: SigningState,
349    ) -> Result<ReadyState, RuntimeError> {
350        trace!(
351            self.logger,
352            "Launching transition from SIGNING to READY state"
353        );
354
355        Ok(ReadyState {
356            current_time_point: state.current_time_point,
357        })
358    }
359
360    /// Perform a transition from `READY` state to `SIGNING` state when a new
361    /// open message is opened.
362    async fn transition_from_ready_to_signing(
363        &mut self,
364        new_time_point: TimePoint,
365        open_message: OpenMessage,
366    ) -> Result<SigningState, RuntimeError> {
367        trace!(
368            self.logger,
369            "Launching transition from READY to SIGNING state"
370        );
371
372        let state = SigningState {
373            current_time_point: new_time_point,
374            open_message,
375        };
376
377        Ok(state)
378    }
379}
380
381#[cfg(test)]
382mod tests {
383    use crate::entities::OpenMessage;
384    use anyhow::anyhow;
385    use mockall::predicate;
386    use std::time::Duration;
387
388    use mithril_common::test_utils::fake_data;
389
390    use crate::test_tools::TestLogger;
391
392    use super::super::runner::MockAggregatorRunner;
393    use super::*;
394
395    async fn init_runtime(
396        init_state: Option<AggregatorState>,
397        runner: MockAggregatorRunner,
398        is_follower: bool,
399    ) -> AggregatorRuntime {
400        AggregatorRuntime::new(
401            AggregatorConfig::new(Duration::from_millis(20), is_follower),
402            init_state,
403            Arc::new(runner),
404            TestLogger::stdout(),
405        )
406        .await
407        .unwrap()
408    }
409
410    mod leader {
411        use super::*;
412
413        #[tokio::test]
414        pub async fn idle_check_certificate_chain_is_not_valid() {
415            let mut runner = MockAggregatorRunner::new();
416            runner
417                .expect_get_time_point_from_chain()
418                .once()
419                .returning(|| Ok(TimePoint::dummy()));
420            runner
421                .expect_update_stake_distribution()
422                .with(predicate::eq(TimePoint::dummy()))
423                .once()
424                .returning(|_| Ok(()));
425            runner
426                .expect_close_signer_registration_round()
427                .once()
428                .returning(|| Ok(()));
429            runner
430                .expect_open_signer_registration_round()
431                .once()
432                .returning(|_| Ok(()));
433            runner
434                .expect_is_certificate_chain_valid()
435                .once()
436                .returning(|_| Err(anyhow!("error")));
437            runner
438                .expect_update_era_checker()
439                .with(predicate::eq(TimePoint::dummy().epoch))
440                .once()
441                .returning(|_| Ok(()));
442            runner
443                .expect_inform_new_epoch()
444                .with(predicate::eq(TimePoint::dummy().epoch))
445                .once()
446                .returning(|_| Ok(()));
447            runner
448                .expect_update_epoch_settings()
449                .once()
450                .returning(|| Ok(()));
451            runner
452                .expect_precompute_epoch_data()
453                .once()
454                .returning(|| Ok(()));
455            runner
456                .expect_upkeep()
457                .with(predicate::eq(TimePoint::dummy().epoch))
458                .once()
459                .returning(|_| Ok(()));
460            runner
461                .expect_increment_runtime_cycle_total_since_startup_counter()
462                .once()
463                .returning(|| ());
464            runner
465                .expect_increment_runtime_cycle_success_since_startup_counter()
466                .never();
467
468            let mut runtime = init_runtime(
469                Some(AggregatorState::Idle(IdleState {
470                    current_time_point: None,
471                })),
472                runner,
473                false,
474            )
475            .await;
476            let err = runtime.cycle().await.unwrap_err();
477            assert!(matches!(err, RuntimeError::KeepState { .. }));
478
479            assert_eq!("idle".to_string(), runtime.get_state());
480        }
481
482        #[tokio::test]
483        pub async fn idle_check_certificate_chain_is_valid() {
484            let mut runner = MockAggregatorRunner::new();
485            runner
486                .expect_get_time_point_from_chain()
487                .once()
488                .returning(|| Ok(TimePoint::dummy()));
489            runner
490                .expect_update_stake_distribution()
491                .with(predicate::eq(TimePoint::dummy()))
492                .once()
493                .returning(|_| Ok(()));
494            runner
495                .expect_close_signer_registration_round()
496                .once()
497                .returning(|| Ok(()));
498            runner
499                .expect_open_signer_registration_round()
500                .once()
501                .returning(|_| Ok(()));
502            runner
503                .expect_is_certificate_chain_valid()
504                .once()
505                .returning(|_| Ok(()));
506            runner
507                .expect_update_era_checker()
508                .with(predicate::eq(TimePoint::dummy().epoch))
509                .once()
510                .returning(|_| Ok(()));
511            runner
512                .expect_inform_new_epoch()
513                .with(predicate::eq(TimePoint::dummy().epoch))
514                .once()
515                .returning(|_| Ok(()));
516            runner
517                .expect_update_epoch_settings()
518                .once()
519                .returning(|| Ok(()));
520            runner
521                .expect_precompute_epoch_data()
522                .once()
523                .returning(|| Ok(()));
524            runner
525                .expect_upkeep()
526                .with(predicate::eq(TimePoint::dummy().epoch))
527                .once()
528                .returning(|_| Ok(()));
529            runner
530                .expect_increment_runtime_cycle_total_since_startup_counter()
531                .once()
532                .returning(|| ());
533            runner
534                .expect_increment_runtime_cycle_success_since_startup_counter()
535                .once()
536                .returning(|| ());
537
538            let mut runtime = init_runtime(
539                Some(AggregatorState::Idle(IdleState {
540                    current_time_point: None,
541                })),
542                runner,
543                false,
544            )
545            .await;
546            runtime.cycle().await.unwrap();
547
548            assert_eq!("ready".to_string(), runtime.get_state());
549        }
550
551        #[tokio::test]
552        pub async fn ready_new_epoch_detected() {
553            let mut runner = MockAggregatorRunner::new();
554            let time_point = TimePoint::dummy();
555            let new_time_point = TimePoint {
556                epoch: time_point.epoch + 1,
557                ..time_point.clone()
558            };
559            runner
560                .expect_get_time_point_from_chain()
561                .once()
562                .returning(move || Ok(new_time_point.clone()));
563            runner
564                .expect_increment_runtime_cycle_total_since_startup_counter()
565                .once()
566                .returning(|| ());
567            runner
568                .expect_increment_runtime_cycle_success_since_startup_counter()
569                .once()
570                .returning(|| ());
571            let mut runtime = init_runtime(
572                Some(AggregatorState::Ready(ReadyState {
573                    current_time_point: time_point,
574                })),
575                runner,
576                false,
577            )
578            .await;
579            runtime.cycle().await.unwrap();
580
581            assert_eq!("idle".to_string(), runtime.get_state());
582        }
583
584        #[tokio::test]
585        pub async fn ready_open_message_not_exist() {
586            let mut runner = MockAggregatorRunner::new();
587            let time_point = TimePoint::dummy();
588            let next_time_point = TimePoint {
589                immutable_file_number: time_point.immutable_file_number + 1,
590                ..time_point.clone()
591            };
592            let expected_time_point = next_time_point.clone();
593            runner
594                .expect_get_time_point_from_chain()
595                .once()
596                .returning(move || Ok(next_time_point.clone()));
597            runner
598                .expect_get_current_non_certified_open_message()
599                .once()
600                .returning(|_| Ok(None));
601            runner
602                .expect_increment_runtime_cycle_total_since_startup_counter()
603                .once()
604                .returning(|| ());
605            runner
606                .expect_increment_runtime_cycle_success_since_startup_counter()
607                .once()
608                .returning(|| ());
609            let mut runtime = init_runtime(
610                Some(AggregatorState::Ready(ReadyState {
611                    current_time_point: time_point.clone(),
612                })),
613                runner,
614                false,
615            )
616            .await;
617            runtime.cycle().await.unwrap();
618
619            assert_eq!("ready".to_string(), runtime.get_state());
620            assert_eq!(
621                AggregatorState::Ready(ReadyState {
622                    current_time_point: expected_time_point,
623                }),
624                runtime.state
625            );
626        }
627
628        #[tokio::test]
629        pub async fn ready_certificate_does_not_exist_for_time_point() {
630            let mut runner = MockAggregatorRunner::new();
631            runner
632                .expect_get_time_point_from_chain()
633                .once()
634                .returning(|| Ok(TimePoint::dummy()));
635            runner
636                .expect_get_current_non_certified_open_message()
637                .once()
638                .returning(|_| {
639                    let open_message = OpenMessage {
640                        is_certified: false,
641                        ..OpenMessage::dummy()
642                    };
643                    Ok(Some(open_message))
644                });
645            runner
646                .expect_increment_runtime_cycle_total_since_startup_counter()
647                .once()
648                .returning(|| ());
649            runner
650                .expect_increment_runtime_cycle_success_since_startup_counter()
651                .once()
652                .returning(|| ());
653
654            let mut runtime = init_runtime(
655                Some(AggregatorState::Ready(ReadyState {
656                    current_time_point: TimePoint::dummy(),
657                })),
658                runner,
659                false,
660            )
661            .await;
662            runtime.cycle().await.unwrap();
663
664            assert_eq!("signing".to_string(), runtime.get_state());
665        }
666
667        #[tokio::test]
668        async fn signing_changing_open_message_to_ready() {
669            let mut runner = MockAggregatorRunner::new();
670            runner
671                .expect_get_time_point_from_chain()
672                .once()
673                .returning(|| Ok(TimePoint::dummy()));
674            runner
675                .expect_is_open_message_outdated()
676                .once()
677                .returning(|_, _| Ok(true));
678            runner
679                .expect_increment_runtime_cycle_total_since_startup_counter()
680                .once()
681                .returning(|| ());
682            runner
683                .expect_increment_runtime_cycle_success_since_startup_counter()
684                .once()
685                .returning(|| ());
686
687            let initial_state = AggregatorState::Signing(SigningState {
688                current_time_point: TimePoint::dummy(),
689                open_message: OpenMessage::dummy(),
690            });
691
692            let mut runtime = init_runtime(Some(initial_state), runner, false).await;
693            runtime.cycle().await.unwrap();
694
695            assert_eq!("ready".to_string(), runtime.get_state());
696        }
697
698        #[tokio::test]
699        async fn signing_certificate_is_not_created() {
700            let mut runner = MockAggregatorRunner::new();
701            runner
702                .expect_get_time_point_from_chain()
703                .once()
704                .returning(|| Ok(TimePoint::dummy()));
705            runner
706                .expect_is_open_message_outdated()
707                .once()
708                .returning(|_, _| Ok(false));
709            runner
710                .expect_create_certificate()
711                .once()
712                .returning(|_| Ok(None));
713            runner
714                .expect_increment_runtime_cycle_total_since_startup_counter()
715                .once()
716                .returning(|| ());
717            runner
718                .expect_increment_runtime_cycle_success_since_startup_counter()
719                .never();
720            let state = SigningState {
721                current_time_point: TimePoint::dummy(),
722                open_message: OpenMessage::dummy(),
723            };
724            let mut runtime =
725                init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
726            let err = runtime
727                .cycle()
728                .await
729                .expect_err("cycle should have returned an error");
730
731            match err {
732                RuntimeError::KeepState { .. } => (),
733                _ => panic!("KeepState error expected, got {err:?}."),
734            };
735
736            assert_eq!("signing".to_string(), runtime.get_state());
737        }
738
739        #[tokio::test]
740        async fn signing_artifact_not_created() {
741            let mut runner = MockAggregatorRunner::new();
742            runner
743                .expect_get_time_point_from_chain()
744                .once()
745                .returning(|| Ok(TimePoint::dummy()));
746            runner
747                .expect_is_open_message_outdated()
748                .once()
749                .returning(|_, _| Ok(false));
750            runner
751                .expect_create_certificate()
752                .return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
753            runner
754                .expect_create_artifact()
755                .once()
756                .returning(|_, _| Err(anyhow!("whatever")));
757            runner
758                .expect_increment_runtime_cycle_total_since_startup_counter()
759                .once()
760                .returning(|| ());
761            runner
762                .expect_increment_runtime_cycle_success_since_startup_counter()
763                .never();
764            let state = SigningState {
765                current_time_point: TimePoint::dummy(),
766                open_message: OpenMessage::dummy(),
767            };
768            let mut runtime =
769                init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
770            let err = runtime
771                .cycle()
772                .await
773                .expect_err("cycle should have returned an error");
774
775            match err {
776                RuntimeError::ReInit { .. } => (),
777                _ => panic!("ReInit error expected, got {err:?}."),
778            };
779
780            assert_eq!("signing".to_string(), runtime.get_state());
781        }
782
783        #[tokio::test]
784        async fn signing_certificate_is_created() {
785            let mut runner = MockAggregatorRunner::new();
786            runner
787                .expect_get_time_point_from_chain()
788                .once()
789                .returning(|| Ok(TimePoint::dummy()));
790            runner
791                .expect_is_open_message_outdated()
792                .once()
793                .returning(|_, _| Ok(false));
794            runner
795                .expect_create_certificate()
796                .return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
797            runner
798                .expect_create_artifact()
799                .once()
800                .returning(|_, _| Ok(()));
801            runner
802                .expect_increment_runtime_cycle_total_since_startup_counter()
803                .once()
804                .returning(|| ());
805            runner
806                .expect_increment_runtime_cycle_success_since_startup_counter()
807                .once()
808                .returning(|| ());
809
810            let state = SigningState {
811                current_time_point: TimePoint::dummy(),
812                open_message: OpenMessage::dummy(),
813            };
814            let mut runtime =
815                init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
816            runtime.cycle().await.unwrap();
817
818            assert_eq!("ready".to_string(), runtime.get_state());
819        }
820
821        #[tokio::test]
822        pub async fn critical_error() {
823            let mut runner = MockAggregatorRunner::new();
824            runner
825                .expect_get_time_point_from_chain()
826                .once()
827                .returning(|| Ok(TimePoint::dummy()));
828            runner
829                .expect_update_era_checker()
830                .with(predicate::eq(TimePoint::dummy().epoch))
831                .once()
832                .returning(|_| Err(anyhow!("ERROR")));
833            runner
834                .expect_close_signer_registration_round()
835                .once()
836                .returning(|| Ok(()));
837            runner
838                .expect_increment_runtime_cycle_total_since_startup_counter()
839                .once()
840                .returning(|| ());
841            runner
842                .expect_increment_runtime_cycle_success_since_startup_counter()
843                .never();
844
845            let mut runtime = init_runtime(
846                Some(AggregatorState::Idle(IdleState {
847                    current_time_point: None,
848                })),
849                runner,
850                false,
851            )
852            .await;
853            runtime.cycle().await.unwrap_err();
854
855            assert_eq!("idle".to_string(), runtime.get_state());
856        }
857    }
858
859    mod follower {
860        use super::*;
861
862        #[tokio::test]
863        pub async fn idle_new_epoch_detected_and_leader_not_transitioned_to_epoch() {
864            let mut runner = MockAggregatorRunner::new();
865            let time_point = TimePoint::dummy();
866            let new_time_point = TimePoint {
867                epoch: time_point.epoch + 1,
868                ..time_point.clone()
869            };
870            runner
871                .expect_get_time_point_from_chain()
872                .once()
873                .returning(move || Ok(new_time_point.clone()));
874            runner
875                .expect_is_follower_aggregator_at_same_epoch_as_leader()
876                .once()
877                .returning(|_| Ok(false));
878            runner
879                .expect_increment_runtime_cycle_total_since_startup_counter()
880                .once()
881                .returning(|| ());
882            runner
883                .expect_increment_runtime_cycle_success_since_startup_counter()
884                .once()
885                .returning(|| ());
886            let mut runtime = init_runtime(
887                Some(AggregatorState::Idle(IdleState {
888                    current_time_point: Some(time_point),
889                })),
890                runner,
891                true,
892            )
893            .await;
894            runtime.cycle().await.unwrap();
895
896            assert_eq!("idle".to_string(), runtime.get_state());
897        }
898
899        #[tokio::test]
900        pub async fn idle_new_epoch_detected_and_leader_has_transitioned_to_epoch() {
901            let mut runner = MockAggregatorRunner::new();
902            let time_point = TimePoint::dummy();
903            let new_time_point = TimePoint {
904                epoch: time_point.epoch + 1,
905                ..time_point.clone()
906            };
907            let new_time_point_clone = new_time_point.clone();
908            runner
909                .expect_get_time_point_from_chain()
910                .once()
911                .returning(move || Ok(new_time_point.clone()));
912            runner
913                .expect_is_follower_aggregator_at_same_epoch_as_leader()
914                .once()
915                .returning(|_| Ok(true));
916            runner
917                .expect_update_stake_distribution()
918                .with(predicate::eq(new_time_point_clone.clone()))
919                .once()
920                .returning(|_| Ok(()));
921            runner
922                .expect_close_signer_registration_round()
923                .once()
924                .returning(|| Ok(()));
925            runner
926                .expect_synchronize_follower_aggregator_signer_registration()
927                .once()
928                .returning(|| Ok(()));
929            runner
930                .expect_open_signer_registration_round()
931                .once()
932                .returning(|_| Ok(()));
933            runner
934                .expect_is_certificate_chain_valid()
935                .once()
936                .returning(|_| Ok(()));
937            runner
938                .expect_update_era_checker()
939                .with(predicate::eq(new_time_point_clone.clone().epoch))
940                .once()
941                .returning(|_| Ok(()));
942            runner
943                .expect_inform_new_epoch()
944                .with(predicate::eq(new_time_point_clone.clone().epoch))
945                .once()
946                .returning(|_| Ok(()));
947            runner
948                .expect_update_epoch_settings()
949                .once()
950                .returning(|| Ok(()));
951            runner
952                .expect_precompute_epoch_data()
953                .once()
954                .returning(|| Ok(()));
955            runner
956                .expect_upkeep()
957                .with(predicate::eq(new_time_point_clone.clone().epoch))
958                .once()
959                .returning(|_| Ok(()));
960            runner
961                .expect_increment_runtime_cycle_total_since_startup_counter()
962                .once()
963                .returning(|| ());
964            runner
965                .expect_increment_runtime_cycle_success_since_startup_counter()
966                .once()
967                .returning(|| ());
968            let mut runtime = init_runtime(
969                Some(AggregatorState::Idle(IdleState {
970                    current_time_point: Some(time_point),
971                })),
972                runner,
973                true,
974            )
975            .await;
976            runtime.cycle().await.unwrap();
977
978            assert_eq!("ready".to_string(), runtime.get_state());
979        }
980    }
981}