mithril_aggregator/runtime/
state_machine.rs

1use anyhow::Context;
2use chrono::Local;
3use slog::{Logger, info, trace};
4use std::fmt::Display;
5use std::sync::Arc;
6
7use mithril_common::entities::TimePoint;
8use mithril_common::logging::LoggerExtensions;
9
10use crate::AggregatorConfig;
11use crate::entities::OpenMessage;
12use crate::runtime::{AggregatorRunnerTrait, RuntimeError};
13
14#[derive(Clone, Debug, PartialEq, Eq)]
15pub struct IdleState {
16    current_time_point: Option<TimePoint>,
17}
18
19#[derive(Clone, Debug, PartialEq, Eq)]
20pub struct ReadyState {
21    current_time_point: TimePoint,
22}
23
24#[derive(Clone, Debug, PartialEq)]
25pub struct SigningState {
26    current_time_point: TimePoint,
27    open_message: OpenMessage,
28}
29
30#[derive(Clone, Debug, PartialEq)]
31pub enum AggregatorState {
32    Idle(IdleState),
33    Ready(ReadyState),
34    Signing(SigningState),
35}
36
37impl Display for AggregatorState {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        match self {
40            AggregatorState::Idle(state) => write!(
41                f,
42                "Idle - {}",
43                match &state.current_time_point {
44                    None => "No TimePoint".to_string(),
45                    Some(time_point) => time_point.to_string(),
46                }
47            ),
48            AggregatorState::Ready(state) => write!(f, "Ready - {}", state.current_time_point),
49            AggregatorState::Signing(state) => write!(f, "Signing - {}", state.current_time_point),
50        }
51    }
52}
53
54/// The AggregatorRuntime responsibility is to create a state machine to handle
55/// all actions required by the process of getting multi-signatures.
56/// See the
57/// [documentation](https://mithril.network/doc/mithril/mithril-network/aggregator#under-the-hood)
58/// for more explanations about the Aggregator state machine.
59pub struct AggregatorRuntime {
60    config: AggregatorConfig,
61    state: AggregatorState,
62    runner: Arc<dyn AggregatorRunnerTrait>,
63    logger: Logger,
64}
65
66impl AggregatorRuntime {
67    /// Create a new instance of the state machine.
68    pub async fn new(
69        aggregator_config: AggregatorConfig,
70        init_state: Option<AggregatorState>,
71        runner: Arc<dyn AggregatorRunnerTrait>,
72        logger: Logger,
73    ) -> Result<Self, RuntimeError> {
74        let logger = logger.new_with_component_name::<Self>();
75        info!(logger, "Initializing runtime");
76
77        let state = if let Some(init_state) = init_state {
78            trace!(logger, "Got initial state from caller");
79            init_state
80        } else {
81            trace!(logger, "Idle state, no current time point");
82            AggregatorState::Idle(IdleState {
83                current_time_point: None,
84            })
85        };
86
87        Ok(Self {
88            config: aggregator_config,
89            state,
90            runner,
91            logger,
92        })
93    }
94
95    /// Return the actual state of the state machine.
96    pub fn get_state(&self) -> String {
97        match self.state {
98            AggregatorState::Idle(_) => "idle".to_string(),
99            AggregatorState::Ready(_) => "ready".to_string(),
100            AggregatorState::Signing(_) => "signing".to_string(),
101        }
102    }
103
104    /// Launches an infinite loop ticking the state machine.
105    pub async fn run(&mut self) -> Result<(), RuntimeError> {
106        info!(self.logger, "Launching State Machine");
107        let mut interval = tokio::time::interval(self.config.interval);
108
109        loop {
110            interval.tick().await;
111            // Note: the "time" property in logs produced by our formatter (slog_bunyan) uses local
112            // time, so we must use it as well to avoid confusion.
113            let approximate_next_cycle_time = Local::now() + self.config.interval;
114
115            if let Err(e) = self.cycle().await {
116                e.write_to_log(&self.logger);
117                if e.is_critical() {
118                    return Err(e);
119                }
120            }
121
122            info!(
123                self.logger, "… Cycle finished";
124                "approximate_next_cycle_time" => %approximate_next_cycle_time.time().format("%H:%M:%S%.3f"),
125                "run_interval_in_ms" => self.config.interval.as_millis(),
126            );
127        }
128    }
129
130    /// Perform one tick of the state machine.
131    pub async fn cycle(&mut self) -> Result<(), RuntimeError> {
132        info!(
133            self.logger,
134            "================================================================================"
135        );
136        info!(self.logger, "New cycle: {}", self.state);
137
138        self.runner.increment_runtime_cycle_total_since_startup_counter();
139
140        match self.state.clone() {
141            AggregatorState::Idle(state) => {
142                let last_time_point = self.runner.get_time_point_from_chain().await.with_context(
143                    || "AggregatorRuntime in the state IDLE can not get current time point from chain",
144                )?;
145
146                info!(self.logger, "→ Trying to transition to READY"; "last_time_point" => ?last_time_point);
147
148                let can_try_transition_from_idle_to_ready = if self.config.is_follower {
149                    self.runner
150                        .is_follower_aggregator_at_same_epoch_as_leader(&last_time_point)
151                        .await?
152                } else {
153                    true
154                };
155                if can_try_transition_from_idle_to_ready {
156                    self.try_transition_from_idle_to_ready(
157                        state.current_time_point,
158                        last_time_point.clone(),
159                    )
160                    .await?;
161                    self.state = AggregatorState::Ready(ReadyState {
162                        current_time_point: last_time_point,
163                    });
164                }
165            }
166            AggregatorState::Ready(state) => {
167                let last_time_point: TimePoint = self
168                    .runner
169                    .get_time_point_from_chain()
170                    .await
171                    .with_context(|| {
172                        "AggregatorRuntime in the state READY can not get current time point from chain"
173                    })?;
174
175                if state.current_time_point.epoch < last_time_point.epoch {
176                    // transition READY > IDLE
177                    info!(self.logger, "→ Epoch has changed, transitioning to IDLE"; "last_time_point" => ?last_time_point);
178                    self.state = AggregatorState::Idle(IdleState {
179                        current_time_point: Some(state.current_time_point),
180                    });
181                } else if let Some(open_message) = self
182                    .runner
183                    .get_current_non_certified_open_message(&last_time_point)
184                    .await
185                    .with_context(|| "AggregatorRuntime can not get the current open message")?
186                {
187                    // transition READY > SIGNING
188                    info!(self.logger, "→ Transitioning to SIGNING");
189                    let new_state = self
190                        .transition_from_ready_to_signing(last_time_point.clone(), open_message.clone())
191                        .await.with_context(|| format!("AggregatorRuntime can not perform a transition from READY state to SIGNING with entity_type: '{:?}'", open_message.signed_entity_type))?;
192                    self.state = AggregatorState::Signing(new_state);
193                } else {
194                    // READY > READY
195                    info!(
196                        self.logger, " ⋅ No open message to certify, waiting…";
197                        "time_point" => ?state.current_time_point
198                    );
199                    self.state = AggregatorState::Ready(ReadyState {
200                        current_time_point: last_time_point,
201                    });
202                }
203            }
204            AggregatorState::Signing(state) => {
205                let last_time_point: TimePoint =
206                    self.runner.get_time_point_from_chain().await.with_context(|| {
207                        "AggregatorRuntime in the state SIGNING can not get current time point from chain"
208                    })?;
209
210                let is_outdated = self
211                    .runner
212                    .is_open_message_outdated(
213                        state.open_message.signed_entity_type.clone(),
214                        &last_time_point,
215                    )
216                    .await?;
217
218                if state.current_time_point.epoch < last_time_point.epoch {
219                    // SIGNING > IDLE
220                    info!(self.logger, "→ Epoch changed, transitioning to IDLE");
221                    let new_state = self.transition_from_signing_to_idle(state).await?;
222                    self.state = AggregatorState::Idle(new_state);
223                } else if is_outdated {
224                    // SIGNING > READY
225                    info!(
226                        self.logger,
227                        "→ Open message changed, transitioning to READY"
228                    );
229                    let new_state =
230                        self.transition_from_signing_to_ready_new_open_message(state).await?;
231                    self.state = AggregatorState::Ready(new_state);
232                } else {
233                    // SIGNING > READY
234                    let new_state =
235                        self.transition_from_signing_to_ready_multisignature(state).await?;
236                    info!(
237                        self.logger,
238                        "→ A multi-signature has been created, build an artifact & a certificate and transitioning back to READY"
239                    );
240                    self.state = AggregatorState::Ready(new_state);
241                }
242            }
243        }
244
245        self.runner.increment_runtime_cycle_success_since_startup_counter();
246
247        Ok(())
248    }
249
250    /// Perform a transition from `IDLE` state to `READY` state when
251    /// the certificate chain is valid.
252    async fn try_transition_from_idle_to_ready(
253        &mut self,
254        maybe_current_time_point: Option<TimePoint>,
255        new_time_point: TimePoint,
256    ) -> Result<(), RuntimeError> {
257        trace!(self.logger, "Trying transition from IDLE to READY state");
258
259        if maybe_current_time_point.is_none()
260            || maybe_current_time_point.as_ref().unwrap().epoch < new_time_point.epoch
261        {
262            self.runner.close_signer_registration_round().await?;
263            self.runner
264                .update_era_checker(new_time_point.epoch)
265                .await
266                .map_err(|e| RuntimeError::critical("transiting IDLE → READY", Some(e)))?;
267            self.runner.update_stake_distribution(&new_time_point).await?;
268            self.runner.inform_new_epoch(new_time_point.epoch).await?;
269            self.runner.upkeep(new_time_point.epoch).await?;
270            self.runner.open_signer_registration_round(&new_time_point).await?;
271            self.runner.update_epoch_settings().await?;
272            if self.config.is_follower {
273                self.runner
274                    .synchronize_follower_aggregator_signer_registration()
275                    .await?;
276            }
277            self.runner.precompute_epoch_data().await?;
278        }
279
280        let chain_validity_result = self
281            .runner
282            .is_certificate_chain_valid(&new_time_point)
283            .await
284            .map_err(|e| RuntimeError::KeepState {
285                message: "certificate chain is invalid".to_string(),
286                nested_error: e.into(),
287            });
288        if self.config.is_follower {
289            let force_sync = chain_validity_result.is_err();
290            self.runner
291                .synchronize_follower_aggregator_certificate_chain(force_sync)
292                .await?;
293        }
294        chain_validity_result?;
295
296        Ok(())
297    }
298
299    /// Perform a transition from `SIGNING` state to `READY` state when a new
300    /// multi-signature is issued.
301    async fn transition_from_signing_to_ready_multisignature(
302        &self,
303        state: SigningState,
304    ) -> Result<ReadyState, RuntimeError> {
305        trace!(
306            self.logger,
307            "Launching transition from SIGNING to READY state"
308        );
309        let certificate = self
310            .runner
311            .create_certificate(&state.open_message.signed_entity_type)
312            .await?
313            .ok_or_else(|| RuntimeError::KeepState {
314                message: "not enough signature yet to create a certificate, waiting…".to_string(),
315                nested_error: None,
316            })?;
317        self.runner
318            .create_artifact(&state.open_message.signed_entity_type, &certificate)
319            .await
320            .map_err(|e| RuntimeError::ReInit {
321                message: "transiting SIGNING → READY: failed to create artifact. Retrying…"
322                    .to_string(),
323                nested_error: Some(e),
324            })?;
325
326        Ok(ReadyState {
327            current_time_point: state.current_time_point,
328        })
329    }
330
331    /// Perform a transition from `SIGNING` state to `IDLE` state when a new
332    /// epoch is detected.
333    async fn transition_from_signing_to_idle(
334        &self,
335        state: SigningState,
336    ) -> Result<IdleState, RuntimeError> {
337        trace!(
338            self.logger,
339            "Launching transition from SIGNING to IDLE state"
340        );
341
342        Ok(IdleState {
343            current_time_point: Some(state.current_time_point),
344        })
345    }
346
347    /// Perform a transition from `SIGNING` state to `READY` state when a new
348    /// open message is detected.
349    async fn transition_from_signing_to_ready_new_open_message(
350        &self,
351        state: SigningState,
352    ) -> Result<ReadyState, RuntimeError> {
353        trace!(
354            self.logger,
355            "Launching transition from SIGNING to READY state"
356        );
357
358        Ok(ReadyState {
359            current_time_point: state.current_time_point,
360        })
361    }
362
363    /// Perform a transition from `READY` state to `SIGNING` state when a new
364    /// open message is opened.
365    async fn transition_from_ready_to_signing(
366        &mut self,
367        new_time_point: TimePoint,
368        open_message: OpenMessage,
369    ) -> Result<SigningState, RuntimeError> {
370        trace!(
371            self.logger,
372            "Launching transition from READY to SIGNING state"
373        );
374
375        let state = SigningState {
376            current_time_point: new_time_point,
377            open_message,
378        };
379
380        Ok(state)
381    }
382}
383
384#[cfg(test)]
385mod tests {
386    use anyhow::anyhow;
387    use mockall::predicate;
388    use std::time::Duration;
389
390    use mithril_common::test::double::{Dummy, fake_data};
391
392    use crate::entities::OpenMessage;
393    use crate::test::TestLogger;
394
395    use super::super::runner::MockAggregatorRunner;
396    use super::*;
397
398    async fn init_runtime(
399        init_state: Option<AggregatorState>,
400        runner: MockAggregatorRunner,
401        is_follower: bool,
402    ) -> AggregatorRuntime {
403        AggregatorRuntime::new(
404            AggregatorConfig::new(Duration::from_millis(20), is_follower),
405            init_state,
406            Arc::new(runner),
407            TestLogger::stdout(),
408        )
409        .await
410        .unwrap()
411    }
412
413    mod leader {
414        use super::*;
415
416        #[tokio::test]
417        pub async fn idle_check_certificate_chain_is_not_valid() {
418            let mut runner = MockAggregatorRunner::new();
419            runner
420                .expect_get_time_point_from_chain()
421                .once()
422                .returning(|| Ok(TimePoint::dummy()));
423            runner
424                .expect_update_stake_distribution()
425                .with(predicate::eq(TimePoint::dummy()))
426                .once()
427                .returning(|_| Ok(()));
428            runner
429                .expect_close_signer_registration_round()
430                .once()
431                .returning(|| Ok(()));
432            runner
433                .expect_open_signer_registration_round()
434                .once()
435                .returning(|_| Ok(()));
436            runner
437                .expect_is_certificate_chain_valid()
438                .once()
439                .returning(|_| Err(anyhow!("error")));
440            runner
441                .expect_update_era_checker()
442                .with(predicate::eq(TimePoint::dummy().epoch))
443                .once()
444                .returning(|_| Ok(()));
445            runner
446                .expect_inform_new_epoch()
447                .with(predicate::eq(TimePoint::dummy().epoch))
448                .once()
449                .returning(|_| Ok(()));
450            runner.expect_update_epoch_settings().once().returning(|| Ok(()));
451            runner.expect_precompute_epoch_data().once().returning(|| Ok(()));
452            runner
453                .expect_upkeep()
454                .with(predicate::eq(TimePoint::dummy().epoch))
455                .once()
456                .returning(|_| Ok(()));
457            runner
458                .expect_increment_runtime_cycle_total_since_startup_counter()
459                .once()
460                .returning(|| ());
461            runner
462                .expect_increment_runtime_cycle_success_since_startup_counter()
463                .never();
464
465            let mut runtime = init_runtime(
466                Some(AggregatorState::Idle(IdleState {
467                    current_time_point: None,
468                })),
469                runner,
470                false,
471            )
472            .await;
473            let err = runtime.cycle().await.unwrap_err();
474            assert!(matches!(err, RuntimeError::KeepState { .. }));
475
476            assert_eq!("idle".to_string(), runtime.get_state());
477        }
478
479        #[tokio::test]
480        pub async fn idle_check_certificate_chain_is_valid() {
481            let mut runner = MockAggregatorRunner::new();
482            runner
483                .expect_get_time_point_from_chain()
484                .once()
485                .returning(|| Ok(TimePoint::dummy()));
486            runner
487                .expect_update_stake_distribution()
488                .with(predicate::eq(TimePoint::dummy()))
489                .once()
490                .returning(|_| Ok(()));
491            runner
492                .expect_close_signer_registration_round()
493                .once()
494                .returning(|| Ok(()));
495            runner
496                .expect_open_signer_registration_round()
497                .once()
498                .returning(|_| Ok(()));
499            runner
500                .expect_is_certificate_chain_valid()
501                .once()
502                .returning(|_| Ok(()));
503            runner
504                .expect_update_era_checker()
505                .with(predicate::eq(TimePoint::dummy().epoch))
506                .once()
507                .returning(|_| Ok(()));
508            runner
509                .expect_inform_new_epoch()
510                .with(predicate::eq(TimePoint::dummy().epoch))
511                .once()
512                .returning(|_| Ok(()));
513            runner.expect_update_epoch_settings().once().returning(|| Ok(()));
514            runner.expect_precompute_epoch_data().once().returning(|| Ok(()));
515            runner
516                .expect_upkeep()
517                .with(predicate::eq(TimePoint::dummy().epoch))
518                .once()
519                .returning(|_| Ok(()));
520            runner
521                .expect_increment_runtime_cycle_total_since_startup_counter()
522                .once()
523                .returning(|| ());
524            runner
525                .expect_increment_runtime_cycle_success_since_startup_counter()
526                .once()
527                .returning(|| ());
528
529            let mut runtime = init_runtime(
530                Some(AggregatorState::Idle(IdleState {
531                    current_time_point: None,
532                })),
533                runner,
534                false,
535            )
536            .await;
537            runtime.cycle().await.unwrap();
538
539            assert_eq!("ready".to_string(), runtime.get_state());
540        }
541
542        #[tokio::test]
543        pub async fn ready_new_epoch_detected() {
544            let mut runner = MockAggregatorRunner::new();
545            let time_point = TimePoint::dummy();
546            let new_time_point = TimePoint {
547                epoch: time_point.epoch + 1,
548                ..time_point.clone()
549            };
550            runner
551                .expect_get_time_point_from_chain()
552                .once()
553                .returning(move || Ok(new_time_point.clone()));
554            runner
555                .expect_increment_runtime_cycle_total_since_startup_counter()
556                .once()
557                .returning(|| ());
558            runner
559                .expect_increment_runtime_cycle_success_since_startup_counter()
560                .once()
561                .returning(|| ());
562            let mut runtime = init_runtime(
563                Some(AggregatorState::Ready(ReadyState {
564                    current_time_point: time_point,
565                })),
566                runner,
567                false,
568            )
569            .await;
570            runtime.cycle().await.unwrap();
571
572            assert_eq!("idle".to_string(), runtime.get_state());
573        }
574
575        #[tokio::test]
576        pub async fn ready_open_message_not_exist() {
577            let mut runner = MockAggregatorRunner::new();
578            let time_point = TimePoint::dummy();
579            let next_time_point = TimePoint {
580                immutable_file_number: time_point.immutable_file_number + 1,
581                ..time_point.clone()
582            };
583            let expected_time_point = next_time_point.clone();
584            runner
585                .expect_get_time_point_from_chain()
586                .once()
587                .returning(move || Ok(next_time_point.clone()));
588            runner
589                .expect_get_current_non_certified_open_message()
590                .once()
591                .returning(|_| Ok(None));
592            runner
593                .expect_increment_runtime_cycle_total_since_startup_counter()
594                .once()
595                .returning(|| ());
596            runner
597                .expect_increment_runtime_cycle_success_since_startup_counter()
598                .once()
599                .returning(|| ());
600            let mut runtime = init_runtime(
601                Some(AggregatorState::Ready(ReadyState {
602                    current_time_point: time_point.clone(),
603                })),
604                runner,
605                false,
606            )
607            .await;
608            runtime.cycle().await.unwrap();
609
610            assert_eq!("ready".to_string(), runtime.get_state());
611            assert_eq!(
612                AggregatorState::Ready(ReadyState {
613                    current_time_point: expected_time_point,
614                }),
615                runtime.state
616            );
617        }
618
619        #[tokio::test]
620        pub async fn ready_certificate_does_not_exist_for_time_point() {
621            let mut runner = MockAggregatorRunner::new();
622            runner
623                .expect_get_time_point_from_chain()
624                .once()
625                .returning(|| Ok(TimePoint::dummy()));
626            runner
627                .expect_get_current_non_certified_open_message()
628                .once()
629                .returning(|_| {
630                    let open_message = OpenMessage {
631                        is_certified: false,
632                        ..OpenMessage::dummy()
633                    };
634                    Ok(Some(open_message))
635                });
636            runner
637                .expect_increment_runtime_cycle_total_since_startup_counter()
638                .once()
639                .returning(|| ());
640            runner
641                .expect_increment_runtime_cycle_success_since_startup_counter()
642                .once()
643                .returning(|| ());
644
645            let mut runtime = init_runtime(
646                Some(AggregatorState::Ready(ReadyState {
647                    current_time_point: TimePoint::dummy(),
648                })),
649                runner,
650                false,
651            )
652            .await;
653            runtime.cycle().await.unwrap();
654
655            assert_eq!("signing".to_string(), runtime.get_state());
656        }
657
658        #[tokio::test]
659        async fn signing_changing_open_message_to_ready() {
660            let mut runner = MockAggregatorRunner::new();
661            runner
662                .expect_get_time_point_from_chain()
663                .once()
664                .returning(|| Ok(TimePoint::dummy()));
665            runner
666                .expect_is_open_message_outdated()
667                .once()
668                .returning(|_, _| Ok(true));
669            runner
670                .expect_increment_runtime_cycle_total_since_startup_counter()
671                .once()
672                .returning(|| ());
673            runner
674                .expect_increment_runtime_cycle_success_since_startup_counter()
675                .once()
676                .returning(|| ());
677
678            let initial_state = AggregatorState::Signing(SigningState {
679                current_time_point: TimePoint::dummy(),
680                open_message: OpenMessage::dummy(),
681            });
682
683            let mut runtime = init_runtime(Some(initial_state), runner, false).await;
684            runtime.cycle().await.unwrap();
685
686            assert_eq!("ready".to_string(), runtime.get_state());
687        }
688
689        #[tokio::test]
690        async fn signing_certificate_is_not_created() {
691            let mut runner = MockAggregatorRunner::new();
692            runner
693                .expect_get_time_point_from_chain()
694                .once()
695                .returning(|| Ok(TimePoint::dummy()));
696            runner
697                .expect_is_open_message_outdated()
698                .once()
699                .returning(|_, _| Ok(false));
700            runner.expect_create_certificate().once().returning(|_| Ok(None));
701            runner
702                .expect_increment_runtime_cycle_total_since_startup_counter()
703                .once()
704                .returning(|| ());
705            runner
706                .expect_increment_runtime_cycle_success_since_startup_counter()
707                .never();
708            let state = SigningState {
709                current_time_point: TimePoint::dummy(),
710                open_message: OpenMessage::dummy(),
711            };
712            let mut runtime =
713                init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
714            let err = runtime
715                .cycle()
716                .await
717                .expect_err("cycle should have returned an error");
718
719            match err {
720                RuntimeError::KeepState { .. } => (),
721                _ => panic!("KeepState error expected, got {err:?}."),
722            };
723
724            assert_eq!("signing".to_string(), runtime.get_state());
725        }
726
727        #[tokio::test]
728        async fn signing_artifact_not_created() {
729            let mut runner = MockAggregatorRunner::new();
730            runner
731                .expect_get_time_point_from_chain()
732                .once()
733                .returning(|| Ok(TimePoint::dummy()));
734            runner
735                .expect_is_open_message_outdated()
736                .once()
737                .returning(|_, _| Ok(false));
738            runner
739                .expect_create_certificate()
740                .return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
741            runner
742                .expect_create_artifact()
743                .once()
744                .returning(|_, _| Err(anyhow!("whatever")));
745            runner
746                .expect_increment_runtime_cycle_total_since_startup_counter()
747                .once()
748                .returning(|| ());
749            runner
750                .expect_increment_runtime_cycle_success_since_startup_counter()
751                .never();
752            let state = SigningState {
753                current_time_point: TimePoint::dummy(),
754                open_message: OpenMessage::dummy(),
755            };
756            let mut runtime =
757                init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
758            let err = runtime
759                .cycle()
760                .await
761                .expect_err("cycle should have returned an error");
762
763            match err {
764                RuntimeError::ReInit { .. } => (),
765                _ => panic!("ReInit error expected, got {err:?}."),
766            };
767
768            assert_eq!("signing".to_string(), runtime.get_state());
769        }
770
771        #[tokio::test]
772        async fn signing_certificate_is_created() {
773            let mut runner = MockAggregatorRunner::new();
774            runner
775                .expect_get_time_point_from_chain()
776                .once()
777                .returning(|| Ok(TimePoint::dummy()));
778            runner
779                .expect_is_open_message_outdated()
780                .once()
781                .returning(|_, _| Ok(false));
782            runner
783                .expect_create_certificate()
784                .return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
785            runner.expect_create_artifact().once().returning(|_, _| Ok(()));
786            runner
787                .expect_increment_runtime_cycle_total_since_startup_counter()
788                .once()
789                .returning(|| ());
790            runner
791                .expect_increment_runtime_cycle_success_since_startup_counter()
792                .once()
793                .returning(|| ());
794
795            let state = SigningState {
796                current_time_point: TimePoint::dummy(),
797                open_message: OpenMessage::dummy(),
798            };
799            let mut runtime =
800                init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
801            runtime.cycle().await.unwrap();
802
803            assert_eq!("ready".to_string(), runtime.get_state());
804        }
805
806        #[tokio::test]
807        pub async fn critical_error() {
808            let mut runner = MockAggregatorRunner::new();
809            runner
810                .expect_get_time_point_from_chain()
811                .once()
812                .returning(|| Ok(TimePoint::dummy()));
813            runner
814                .expect_update_era_checker()
815                .with(predicate::eq(TimePoint::dummy().epoch))
816                .once()
817                .returning(|_| Err(anyhow!("ERROR")));
818            runner
819                .expect_close_signer_registration_round()
820                .once()
821                .returning(|| Ok(()));
822            runner
823                .expect_increment_runtime_cycle_total_since_startup_counter()
824                .once()
825                .returning(|| ());
826            runner
827                .expect_increment_runtime_cycle_success_since_startup_counter()
828                .never();
829
830            let mut runtime = init_runtime(
831                Some(AggregatorState::Idle(IdleState {
832                    current_time_point: None,
833                })),
834                runner,
835                false,
836            )
837            .await;
838            runtime.cycle().await.unwrap_err();
839
840            assert_eq!("idle".to_string(), runtime.get_state());
841        }
842    }
843
844    mod follower {
845        use mockall::predicate::eq;
846
847        use super::*;
848
849        #[tokio::test]
850        pub async fn idle_new_epoch_detected_and_leader_not_transitioned_to_epoch() {
851            let mut runner = MockAggregatorRunner::new();
852            let time_point = TimePoint::dummy();
853            let new_time_point = TimePoint {
854                epoch: time_point.epoch + 1,
855                ..time_point.clone()
856            };
857            runner
858                .expect_get_time_point_from_chain()
859                .once()
860                .returning(move || Ok(new_time_point.clone()));
861            runner
862                .expect_is_follower_aggregator_at_same_epoch_as_leader()
863                .once()
864                .returning(|_| Ok(false));
865            runner
866                .expect_increment_runtime_cycle_total_since_startup_counter()
867                .once()
868                .returning(|| ());
869            runner
870                .expect_increment_runtime_cycle_success_since_startup_counter()
871                .once()
872                .returning(|| ());
873            let mut runtime = init_runtime(
874                Some(AggregatorState::Idle(IdleState {
875                    current_time_point: Some(time_point),
876                })),
877                runner,
878                true,
879            )
880            .await;
881            runtime.cycle().await.unwrap();
882
883            assert_eq!("idle".to_string(), runtime.get_state());
884        }
885
886        #[tokio::test]
887        pub async fn idle_new_epoch_detected_and_leader_has_transitioned_to_epoch() {
888            let mut runner = MockAggregatorRunner::new();
889            let time_point = TimePoint::dummy();
890            let new_time_point = TimePoint {
891                epoch: time_point.epoch + 1,
892                ..time_point.clone()
893            };
894            let new_time_point_clone = new_time_point.clone();
895            runner
896                .expect_get_time_point_from_chain()
897                .once()
898                .returning(move || Ok(new_time_point.clone()));
899            runner
900                .expect_is_follower_aggregator_at_same_epoch_as_leader()
901                .once()
902                .returning(|_| Ok(true));
903            runner
904                .expect_update_stake_distribution()
905                .with(predicate::eq(new_time_point_clone.clone()))
906                .once()
907                .returning(|_| Ok(()));
908            runner
909                .expect_close_signer_registration_round()
910                .once()
911                .returning(|| Ok(()));
912            runner
913                .expect_synchronize_follower_aggregator_signer_registration()
914                .once()
915                .returning(|| Ok(()));
916            runner
917                .expect_open_signer_registration_round()
918                .once()
919                .returning(|_| Ok(()));
920            runner
921                .expect_is_certificate_chain_valid()
922                .once()
923                .returning(|_| Ok(()));
924            runner
925                .expect_synchronize_follower_aggregator_certificate_chain()
926                .once()
927                .with(eq(false)) // Certificate chain valid so force_sync must be false
928                .returning(|_| Ok(()));
929            runner
930                .expect_update_era_checker()
931                .with(predicate::eq(new_time_point_clone.clone().epoch))
932                .once()
933                .returning(|_| Ok(()));
934            runner
935                .expect_inform_new_epoch()
936                .with(predicate::eq(new_time_point_clone.clone().epoch))
937                .once()
938                .returning(|_| Ok(()));
939            runner.expect_update_epoch_settings().once().returning(|| Ok(()));
940            runner.expect_precompute_epoch_data().once().returning(|| Ok(()));
941            runner
942                .expect_upkeep()
943                .with(predicate::eq(new_time_point_clone.clone().epoch))
944                .once()
945                .returning(|_| Ok(()));
946            runner
947                .expect_increment_runtime_cycle_total_since_startup_counter()
948                .once()
949                .returning(|| ());
950            runner
951                .expect_increment_runtime_cycle_success_since_startup_counter()
952                .once()
953                .returning(|| ());
954            let mut runtime = init_runtime(
955                Some(AggregatorState::Idle(IdleState {
956                    current_time_point: Some(time_point),
957                })),
958                runner,
959                true,
960            )
961            .await;
962            runtime.cycle().await.unwrap();
963
964            assert_eq!("ready".to_string(), runtime.get_state());
965        }
966    }
967}