mithril_aggregator/runtime/
state_machine.rs

1use anyhow::Context;
2use chrono::Local;
3use slog::{Logger, info, trace};
4use std::fmt::Display;
5use std::sync::Arc;
6
7use mithril_common::entities::TimePoint;
8use mithril_common::logging::LoggerExtensions;
9
10use crate::AggregatorConfig;
11use crate::entities::OpenMessage;
12use crate::runtime::{AggregatorRunnerTrait, RuntimeError};
13
14#[derive(Clone, Debug, PartialEq, Eq)]
15pub struct IdleState {
16    current_time_point: Option<TimePoint>,
17}
18
19#[derive(Clone, Debug, PartialEq, Eq)]
20pub struct ReadyState {
21    current_time_point: TimePoint,
22}
23
24#[derive(Clone, Debug, PartialEq)]
25pub struct SigningState {
26    current_time_point: TimePoint,
27    open_message: OpenMessage,
28}
29
30#[derive(Clone, Debug, PartialEq)]
31pub enum AggregatorState {
32    Idle(IdleState),
33    Ready(ReadyState),
34    Signing(SigningState),
35}
36
37impl Display for AggregatorState {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        match self {
40            AggregatorState::Idle(state) => write!(
41                f,
42                "Idle - {}",
43                match &state.current_time_point {
44                    None => "No TimePoint".to_string(),
45                    Some(time_point) => time_point.to_string(),
46                }
47            ),
48            AggregatorState::Ready(state) => write!(f, "Ready - {}", state.current_time_point),
49            AggregatorState::Signing(state) => write!(f, "Signing - {}", state.current_time_point),
50        }
51    }
52}
53
54/// The AggregatorRuntime responsibility is to create a state machine to handle
55/// all actions required by the process of getting multi-signatures.
56/// See the
57/// [documentation](https://mithril.network/doc/mithril/mithril-network/aggregator#under-the-hood)
58/// for more explanations about the Aggregator state machine.
59pub struct AggregatorRuntime {
60    config: AggregatorConfig,
61    state: AggregatorState,
62    runner: Arc<dyn AggregatorRunnerTrait>,
63    logger: Logger,
64}
65
66impl AggregatorRuntime {
67    /// Create a new instance of the state machine.
68    pub async fn new(
69        aggregator_config: AggregatorConfig,
70        init_state: Option<AggregatorState>,
71        runner: Arc<dyn AggregatorRunnerTrait>,
72        logger: Logger,
73    ) -> Result<Self, RuntimeError> {
74        let logger = logger.new_with_component_name::<Self>();
75        info!(logger, "Initializing runtime");
76
77        let state = if let Some(init_state) = init_state {
78            trace!(logger, "Got initial state from caller");
79            init_state
80        } else {
81            trace!(logger, "Idle state, no current time point");
82            AggregatorState::Idle(IdleState {
83                current_time_point: None,
84            })
85        };
86
87        Ok(Self {
88            config: aggregator_config,
89            state,
90            runner,
91            logger,
92        })
93    }
94
95    /// Return the actual state of the state machine.
96    pub fn get_state(&self) -> String {
97        match self.state {
98            AggregatorState::Idle(_) => "idle".to_string(),
99            AggregatorState::Ready(_) => "ready".to_string(),
100            AggregatorState::Signing(_) => "signing".to_string(),
101        }
102    }
103
104    /// Launches an infinite loop ticking the state machine.
105    pub async fn run(&mut self) -> Result<(), RuntimeError> {
106        info!(self.logger, "Launching State Machine");
107        let mut interval = tokio::time::interval(self.config.interval);
108
109        loop {
110            interval.tick().await;
111            // Note: the "time" property in logs produced by our formatter (slog_bunyan) uses local
112            // time, so we must use it as well to avoid confusion.
113            let approximate_next_cycle_time = Local::now() + self.config.interval;
114
115            if let Err(e) = self.cycle().await {
116                e.write_to_log(&self.logger);
117                if e.is_critical() {
118                    return Err(e);
119                }
120            }
121
122            info!(
123                self.logger, "… Cycle finished";
124                "approximate_next_cycle_time" => %approximate_next_cycle_time.time().format("%H:%M:%S%.3f"),
125                "run_interval_in_ms" => self.config.interval.as_millis(),
126            );
127        }
128    }
129
130    /// Perform one tick of the state machine.
131    pub async fn cycle(&mut self) -> Result<(), RuntimeError> {
132        info!(
133            self.logger,
134            "================================================================================"
135        );
136        info!(self.logger, "New cycle: {}", self.state);
137
138        self.runner.increment_runtime_cycle_total_since_startup_counter();
139
140        match self.state.clone() {
141            AggregatorState::Idle(state) => {
142                let last_time_point = self.runner.get_time_point_from_chain().await.with_context(
143                    || "AggregatorRuntime in the state IDLE can not get current time point from chain",
144                )?;
145
146                info!(self.logger, "→ Trying to transition to READY"; "last_time_point" => ?last_time_point);
147
148                let can_try_transition_from_idle_to_ready = if self.config.is_follower {
149                    self.runner
150                        .is_follower_aggregator_at_same_epoch_as_leader(&last_time_point)
151                        .await?
152                } else {
153                    true
154                };
155                if can_try_transition_from_idle_to_ready {
156                    self.try_transition_from_idle_to_ready(
157                        state.current_time_point,
158                        last_time_point.clone(),
159                    )
160                    .await?;
161                    self.state = AggregatorState::Ready(ReadyState {
162                        current_time_point: last_time_point,
163                    });
164                }
165            }
166            AggregatorState::Ready(state) => {
167                let last_time_point: TimePoint = self
168                    .runner
169                    .get_time_point_from_chain()
170                    .await
171                    .with_context(|| {
172                        "AggregatorRuntime in the state READY can not get current time point from chain"
173                    })?;
174
175                if state.current_time_point.epoch < last_time_point.epoch {
176                    // transition READY > IDLE
177                    info!(self.logger, "→ Epoch has changed, transitioning to IDLE"; "last_time_point" => ?last_time_point);
178                    self.state = AggregatorState::Idle(IdleState {
179                        current_time_point: Some(state.current_time_point),
180                    });
181                } else if let Some(open_message) = self
182                    .runner
183                    .get_current_non_certified_open_message(&last_time_point)
184                    .await
185                    .with_context(|| "AggregatorRuntime can not get the current open message")?
186                {
187                    // transition READY > SIGNING
188                    info!(self.logger, "→ Transitioning to SIGNING");
189                    let new_state = self
190                        .transition_from_ready_to_signing(last_time_point.clone(), open_message.clone())
191                        .await.with_context(|| format!("AggregatorRuntime can not perform a transition from READY state to SIGNING with entity_type: '{:?}'", open_message.signed_entity_type))?;
192                    self.state = AggregatorState::Signing(new_state);
193                } else {
194                    // READY > READY
195                    info!(
196                        self.logger, " ⋅ No open message to certify, waiting…";
197                        "time_point" => ?state.current_time_point
198                    );
199                    self.state = AggregatorState::Ready(ReadyState {
200                        current_time_point: last_time_point,
201                    });
202                }
203            }
204            AggregatorState::Signing(state) => {
205                let last_time_point: TimePoint =
206                    self.runner.get_time_point_from_chain().await.with_context(|| {
207                        "AggregatorRuntime in the state SIGNING can not get current time point from chain"
208                    })?;
209
210                let is_outdated = self
211                    .runner
212                    .is_open_message_outdated(
213                        state.open_message.signed_entity_type.clone(),
214                        &last_time_point,
215                    )
216                    .await?;
217
218                if state.current_time_point.epoch < last_time_point.epoch {
219                    // SIGNING > IDLE
220                    info!(self.logger, "→ Epoch changed, transitioning to IDLE");
221                    let new_state = self.transition_from_signing_to_idle(state).await?;
222                    self.state = AggregatorState::Idle(new_state);
223                } else if is_outdated {
224                    // SIGNING > READY
225                    info!(
226                        self.logger,
227                        "→ Open message changed, transitioning to READY"
228                    );
229                    let new_state =
230                        self.transition_from_signing_to_ready_new_open_message(state).await?;
231                    self.state = AggregatorState::Ready(new_state);
232                } else {
233                    // SIGNING > READY
234                    let new_state =
235                        self.transition_from_signing_to_ready_multisignature(state).await?;
236                    info!(
237                        self.logger,
238                        "→ A multi-signature has been created, build an artifact & a certificate and transitioning back to READY"
239                    );
240                    self.state = AggregatorState::Ready(new_state);
241                }
242            }
243        }
244
245        self.runner.increment_runtime_cycle_success_since_startup_counter();
246
247        Ok(())
248    }
249
250    /// Perform a transition from `IDLE` state to `READY` state when
251    /// the certificate chain is valid.
252    async fn try_transition_from_idle_to_ready(
253        &mut self,
254        maybe_current_time_point: Option<TimePoint>,
255        new_time_point: TimePoint,
256    ) -> Result<(), RuntimeError> {
257        trace!(self.logger, "Trying transition from IDLE to READY state");
258
259        if maybe_current_time_point.is_none()
260            || maybe_current_time_point.as_ref().unwrap().epoch < new_time_point.epoch
261        {
262            self.runner.close_signer_registration_round().await?;
263            self.runner
264                .update_era_checker(new_time_point.epoch)
265                .await
266                .map_err(|e| RuntimeError::critical("transiting IDLE → READY", Some(e)))?;
267            self.runner.update_stake_distribution(&new_time_point).await?;
268            self.runner.inform_new_epoch(new_time_point.epoch).await?;
269            self.runner.upkeep(new_time_point.epoch).await?;
270            self.runner.open_signer_registration_round(&new_time_point).await?;
271            if self.config.is_follower {
272                self.runner
273                    .synchronize_follower_aggregator_signer_registration()
274                    .await?;
275            }
276            self.runner.precompute_epoch_data().await?;
277        }
278
279        let chain_validity_result = self
280            .runner
281            .is_certificate_chain_valid(&new_time_point)
282            .await
283            .map_err(|e| RuntimeError::KeepState {
284                message: "certificate chain is invalid".to_string(),
285                nested_error: e.into(),
286            });
287        if self.config.is_follower {
288            let force_sync = chain_validity_result.is_err();
289            self.runner
290                .synchronize_follower_aggregator_certificate_chain(force_sync)
291                .await?;
292        }
293        chain_validity_result?;
294
295        Ok(())
296    }
297
298    /// Perform a transition from `SIGNING` state to `READY` state when a new
299    /// multi-signature is issued.
300    async fn transition_from_signing_to_ready_multisignature(
301        &self,
302        state: SigningState,
303    ) -> Result<ReadyState, RuntimeError> {
304        trace!(
305            self.logger,
306            "Launching transition from SIGNING to READY state"
307        );
308        let certificate = self
309            .runner
310            .create_certificate(&state.open_message.signed_entity_type)
311            .await?
312            .ok_or_else(|| RuntimeError::KeepState {
313                message: "not enough signature yet to create a certificate, waiting…".to_string(),
314                nested_error: None,
315            })?;
316        self.runner
317            .create_artifact(&state.open_message.signed_entity_type, &certificate)
318            .await
319            .map_err(|e| RuntimeError::ReInit {
320                message: "transiting SIGNING → READY: failed to create artifact. Retrying…"
321                    .to_string(),
322                nested_error: Some(e),
323            })?;
324
325        Ok(ReadyState {
326            current_time_point: state.current_time_point,
327        })
328    }
329
330    /// Perform a transition from `SIGNING` state to `IDLE` state when a new
331    /// epoch is detected.
332    async fn transition_from_signing_to_idle(
333        &self,
334        state: SigningState,
335    ) -> Result<IdleState, RuntimeError> {
336        trace!(
337            self.logger,
338            "Launching transition from SIGNING to IDLE state"
339        );
340
341        Ok(IdleState {
342            current_time_point: Some(state.current_time_point),
343        })
344    }
345
346    /// Perform a transition from `SIGNING` state to `READY` state when a new
347    /// open message is detected.
348    async fn transition_from_signing_to_ready_new_open_message(
349        &self,
350        state: SigningState,
351    ) -> Result<ReadyState, RuntimeError> {
352        trace!(
353            self.logger,
354            "Launching transition from SIGNING to READY state"
355        );
356
357        Ok(ReadyState {
358            current_time_point: state.current_time_point,
359        })
360    }
361
362    /// Perform a transition from `READY` state to `SIGNING` state when a new
363    /// open message is opened.
364    async fn transition_from_ready_to_signing(
365        &mut self,
366        new_time_point: TimePoint,
367        open_message: OpenMessage,
368    ) -> Result<SigningState, RuntimeError> {
369        trace!(
370            self.logger,
371            "Launching transition from READY to SIGNING state"
372        );
373
374        let state = SigningState {
375            current_time_point: new_time_point,
376            open_message,
377        };
378
379        Ok(state)
380    }
381}
382
383#[cfg(test)]
384mod tests {
385    use anyhow::anyhow;
386    use mockall::predicate;
387    use std::time::Duration;
388
389    use mithril_common::test::double::{Dummy, fake_data};
390
391    use crate::entities::OpenMessage;
392    use crate::test::TestLogger;
393
394    use super::super::runner::MockAggregatorRunner;
395    use super::*;
396
397    async fn init_runtime(
398        init_state: Option<AggregatorState>,
399        runner: MockAggregatorRunner,
400        is_follower: bool,
401    ) -> AggregatorRuntime {
402        AggregatorRuntime::new(
403            AggregatorConfig::new(Duration::from_millis(20), is_follower),
404            init_state,
405            Arc::new(runner),
406            TestLogger::stdout(),
407        )
408        .await
409        .unwrap()
410    }
411
412    mod leader {
413        use super::*;
414
415        #[tokio::test]
416        pub async fn idle_check_certificate_chain_is_not_valid() {
417            let mut runner = MockAggregatorRunner::new();
418            runner
419                .expect_get_time_point_from_chain()
420                .once()
421                .returning(|| Ok(TimePoint::dummy()));
422            runner
423                .expect_update_stake_distribution()
424                .with(predicate::eq(TimePoint::dummy()))
425                .once()
426                .returning(|_| Ok(()));
427            runner
428                .expect_close_signer_registration_round()
429                .once()
430                .returning(|| Ok(()));
431            runner
432                .expect_open_signer_registration_round()
433                .once()
434                .returning(|_| Ok(()));
435            runner
436                .expect_is_certificate_chain_valid()
437                .once()
438                .returning(|_| Err(anyhow!("error")));
439            runner
440                .expect_update_era_checker()
441                .with(predicate::eq(TimePoint::dummy().epoch))
442                .once()
443                .returning(|_| Ok(()));
444            runner
445                .expect_inform_new_epoch()
446                .with(predicate::eq(TimePoint::dummy().epoch))
447                .once()
448                .returning(|_| Ok(()));
449            runner.expect_precompute_epoch_data().once().returning(|| Ok(()));
450            runner
451                .expect_upkeep()
452                .with(predicate::eq(TimePoint::dummy().epoch))
453                .once()
454                .returning(|_| Ok(()));
455            runner
456                .expect_increment_runtime_cycle_total_since_startup_counter()
457                .once()
458                .returning(|| ());
459            runner
460                .expect_increment_runtime_cycle_success_since_startup_counter()
461                .never();
462
463            let mut runtime = init_runtime(
464                Some(AggregatorState::Idle(IdleState {
465                    current_time_point: None,
466                })),
467                runner,
468                false,
469            )
470            .await;
471            let err = runtime.cycle().await.unwrap_err();
472            assert!(matches!(err, RuntimeError::KeepState { .. }));
473
474            assert_eq!("idle".to_string(), runtime.get_state());
475        }
476
477        #[tokio::test]
478        pub async fn idle_check_certificate_chain_is_valid() {
479            let mut runner = MockAggregatorRunner::new();
480            runner
481                .expect_get_time_point_from_chain()
482                .once()
483                .returning(|| Ok(TimePoint::dummy()));
484            runner
485                .expect_update_stake_distribution()
486                .with(predicate::eq(TimePoint::dummy()))
487                .once()
488                .returning(|_| Ok(()));
489            runner
490                .expect_close_signer_registration_round()
491                .once()
492                .returning(|| Ok(()));
493            runner
494                .expect_open_signer_registration_round()
495                .once()
496                .returning(|_| Ok(()));
497            runner
498                .expect_is_certificate_chain_valid()
499                .once()
500                .returning(|_| Ok(()));
501            runner
502                .expect_update_era_checker()
503                .with(predicate::eq(TimePoint::dummy().epoch))
504                .once()
505                .returning(|_| Ok(()));
506            runner
507                .expect_inform_new_epoch()
508                .with(predicate::eq(TimePoint::dummy().epoch))
509                .once()
510                .returning(|_| Ok(()));
511            runner.expect_precompute_epoch_data().once().returning(|| Ok(()));
512            runner
513                .expect_upkeep()
514                .with(predicate::eq(TimePoint::dummy().epoch))
515                .once()
516                .returning(|_| Ok(()));
517            runner
518                .expect_increment_runtime_cycle_total_since_startup_counter()
519                .once()
520                .returning(|| ());
521            runner
522                .expect_increment_runtime_cycle_success_since_startup_counter()
523                .once()
524                .returning(|| ());
525
526            let mut runtime = init_runtime(
527                Some(AggregatorState::Idle(IdleState {
528                    current_time_point: None,
529                })),
530                runner,
531                false,
532            )
533            .await;
534            runtime.cycle().await.unwrap();
535
536            assert_eq!("ready".to_string(), runtime.get_state());
537        }
538
539        #[tokio::test]
540        pub async fn ready_new_epoch_detected() {
541            let mut runner = MockAggregatorRunner::new();
542            let time_point = TimePoint::dummy();
543            let new_time_point = TimePoint {
544                epoch: time_point.epoch + 1,
545                ..time_point.clone()
546            };
547            runner
548                .expect_get_time_point_from_chain()
549                .once()
550                .returning(move || Ok(new_time_point.clone()));
551            runner
552                .expect_increment_runtime_cycle_total_since_startup_counter()
553                .once()
554                .returning(|| ());
555            runner
556                .expect_increment_runtime_cycle_success_since_startup_counter()
557                .once()
558                .returning(|| ());
559            let mut runtime = init_runtime(
560                Some(AggregatorState::Ready(ReadyState {
561                    current_time_point: time_point,
562                })),
563                runner,
564                false,
565            )
566            .await;
567            runtime.cycle().await.unwrap();
568
569            assert_eq!("idle".to_string(), runtime.get_state());
570        }
571
572        #[tokio::test]
573        pub async fn ready_open_message_not_exist() {
574            let mut runner = MockAggregatorRunner::new();
575            let time_point = TimePoint::dummy();
576            let next_time_point = TimePoint {
577                immutable_file_number: time_point.immutable_file_number + 1,
578                ..time_point.clone()
579            };
580            let expected_time_point = next_time_point.clone();
581            runner
582                .expect_get_time_point_from_chain()
583                .once()
584                .returning(move || Ok(next_time_point.clone()));
585            runner
586                .expect_get_current_non_certified_open_message()
587                .once()
588                .returning(|_| Ok(None));
589            runner
590                .expect_increment_runtime_cycle_total_since_startup_counter()
591                .once()
592                .returning(|| ());
593            runner
594                .expect_increment_runtime_cycle_success_since_startup_counter()
595                .once()
596                .returning(|| ());
597            let mut runtime = init_runtime(
598                Some(AggregatorState::Ready(ReadyState {
599                    current_time_point: time_point.clone(),
600                })),
601                runner,
602                false,
603            )
604            .await;
605            runtime.cycle().await.unwrap();
606
607            assert_eq!("ready".to_string(), runtime.get_state());
608            assert_eq!(
609                AggregatorState::Ready(ReadyState {
610                    current_time_point: expected_time_point,
611                }),
612                runtime.state
613            );
614        }
615
616        #[tokio::test]
617        pub async fn ready_certificate_does_not_exist_for_time_point() {
618            let mut runner = MockAggregatorRunner::new();
619            runner
620                .expect_get_time_point_from_chain()
621                .once()
622                .returning(|| Ok(TimePoint::dummy()));
623            runner
624                .expect_get_current_non_certified_open_message()
625                .once()
626                .returning(|_| {
627                    let open_message = OpenMessage {
628                        is_certified: false,
629                        ..OpenMessage::dummy()
630                    };
631                    Ok(Some(open_message))
632                });
633            runner
634                .expect_increment_runtime_cycle_total_since_startup_counter()
635                .once()
636                .returning(|| ());
637            runner
638                .expect_increment_runtime_cycle_success_since_startup_counter()
639                .once()
640                .returning(|| ());
641
642            let mut runtime = init_runtime(
643                Some(AggregatorState::Ready(ReadyState {
644                    current_time_point: TimePoint::dummy(),
645                })),
646                runner,
647                false,
648            )
649            .await;
650            runtime.cycle().await.unwrap();
651
652            assert_eq!("signing".to_string(), runtime.get_state());
653        }
654
655        #[tokio::test]
656        async fn signing_changing_open_message_to_ready() {
657            let mut runner = MockAggregatorRunner::new();
658            runner
659                .expect_get_time_point_from_chain()
660                .once()
661                .returning(|| Ok(TimePoint::dummy()));
662            runner
663                .expect_is_open_message_outdated()
664                .once()
665                .returning(|_, _| Ok(true));
666            runner
667                .expect_increment_runtime_cycle_total_since_startup_counter()
668                .once()
669                .returning(|| ());
670            runner
671                .expect_increment_runtime_cycle_success_since_startup_counter()
672                .once()
673                .returning(|| ());
674
675            let initial_state = AggregatorState::Signing(SigningState {
676                current_time_point: TimePoint::dummy(),
677                open_message: OpenMessage::dummy(),
678            });
679
680            let mut runtime = init_runtime(Some(initial_state), runner, false).await;
681            runtime.cycle().await.unwrap();
682
683            assert_eq!("ready".to_string(), runtime.get_state());
684        }
685
686        #[tokio::test]
687        async fn signing_certificate_is_not_created() {
688            let mut runner = MockAggregatorRunner::new();
689            runner
690                .expect_get_time_point_from_chain()
691                .once()
692                .returning(|| Ok(TimePoint::dummy()));
693            runner
694                .expect_is_open_message_outdated()
695                .once()
696                .returning(|_, _| Ok(false));
697            runner.expect_create_certificate().once().returning(|_| Ok(None));
698            runner
699                .expect_increment_runtime_cycle_total_since_startup_counter()
700                .once()
701                .returning(|| ());
702            runner
703                .expect_increment_runtime_cycle_success_since_startup_counter()
704                .never();
705            let state = SigningState {
706                current_time_point: TimePoint::dummy(),
707                open_message: OpenMessage::dummy(),
708            };
709            let mut runtime =
710                init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
711            let err = runtime
712                .cycle()
713                .await
714                .expect_err("cycle should have returned an error");
715
716            match err {
717                RuntimeError::KeepState { .. } => (),
718                _ => panic!("KeepState error expected, got {err:?}."),
719            };
720
721            assert_eq!("signing".to_string(), runtime.get_state());
722        }
723
724        #[tokio::test]
725        async fn signing_artifact_not_created() {
726            let mut runner = MockAggregatorRunner::new();
727            runner
728                .expect_get_time_point_from_chain()
729                .once()
730                .returning(|| Ok(TimePoint::dummy()));
731            runner
732                .expect_is_open_message_outdated()
733                .once()
734                .returning(|_, _| Ok(false));
735            runner
736                .expect_create_certificate()
737                .return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
738            runner
739                .expect_create_artifact()
740                .once()
741                .returning(|_, _| Err(anyhow!("whatever")));
742            runner
743                .expect_increment_runtime_cycle_total_since_startup_counter()
744                .once()
745                .returning(|| ());
746            runner
747                .expect_increment_runtime_cycle_success_since_startup_counter()
748                .never();
749            let state = SigningState {
750                current_time_point: TimePoint::dummy(),
751                open_message: OpenMessage::dummy(),
752            };
753            let mut runtime =
754                init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
755            let err = runtime
756                .cycle()
757                .await
758                .expect_err("cycle should have returned an error");
759
760            match err {
761                RuntimeError::ReInit { .. } => (),
762                _ => panic!("ReInit error expected, got {err:?}."),
763            };
764
765            assert_eq!("signing".to_string(), runtime.get_state());
766        }
767
768        #[tokio::test]
769        async fn signing_certificate_is_created() {
770            let mut runner = MockAggregatorRunner::new();
771            runner
772                .expect_get_time_point_from_chain()
773                .once()
774                .returning(|| Ok(TimePoint::dummy()));
775            runner
776                .expect_is_open_message_outdated()
777                .once()
778                .returning(|_, _| Ok(false));
779            runner
780                .expect_create_certificate()
781                .return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
782            runner.expect_create_artifact().once().returning(|_, _| Ok(()));
783            runner
784                .expect_increment_runtime_cycle_total_since_startup_counter()
785                .once()
786                .returning(|| ());
787            runner
788                .expect_increment_runtime_cycle_success_since_startup_counter()
789                .once()
790                .returning(|| ());
791
792            let state = SigningState {
793                current_time_point: TimePoint::dummy(),
794                open_message: OpenMessage::dummy(),
795            };
796            let mut runtime =
797                init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
798            runtime.cycle().await.unwrap();
799
800            assert_eq!("ready".to_string(), runtime.get_state());
801        }
802
803        #[tokio::test]
804        pub async fn critical_error() {
805            let mut runner = MockAggregatorRunner::new();
806            runner
807                .expect_get_time_point_from_chain()
808                .once()
809                .returning(|| Ok(TimePoint::dummy()));
810            runner
811                .expect_update_era_checker()
812                .with(predicate::eq(TimePoint::dummy().epoch))
813                .once()
814                .returning(|_| Err(anyhow!("ERROR")));
815            runner
816                .expect_close_signer_registration_round()
817                .once()
818                .returning(|| Ok(()));
819            runner
820                .expect_increment_runtime_cycle_total_since_startup_counter()
821                .once()
822                .returning(|| ());
823            runner
824                .expect_increment_runtime_cycle_success_since_startup_counter()
825                .never();
826
827            let mut runtime = init_runtime(
828                Some(AggregatorState::Idle(IdleState {
829                    current_time_point: None,
830                })),
831                runner,
832                false,
833            )
834            .await;
835            runtime.cycle().await.unwrap_err();
836
837            assert_eq!("idle".to_string(), runtime.get_state());
838        }
839    }
840
841    mod follower {
842        use mockall::predicate::eq;
843
844        use super::*;
845
846        #[tokio::test]
847        pub async fn idle_new_epoch_detected_and_leader_not_transitioned_to_epoch() {
848            let mut runner = MockAggregatorRunner::new();
849            let time_point = TimePoint::dummy();
850            let new_time_point = TimePoint {
851                epoch: time_point.epoch + 1,
852                ..time_point.clone()
853            };
854            runner
855                .expect_get_time_point_from_chain()
856                .once()
857                .returning(move || Ok(new_time_point.clone()));
858            runner
859                .expect_is_follower_aggregator_at_same_epoch_as_leader()
860                .once()
861                .returning(|_| Ok(false));
862            runner
863                .expect_increment_runtime_cycle_total_since_startup_counter()
864                .once()
865                .returning(|| ());
866            runner
867                .expect_increment_runtime_cycle_success_since_startup_counter()
868                .once()
869                .returning(|| ());
870            let mut runtime = init_runtime(
871                Some(AggregatorState::Idle(IdleState {
872                    current_time_point: Some(time_point),
873                })),
874                runner,
875                true,
876            )
877            .await;
878            runtime.cycle().await.unwrap();
879
880            assert_eq!("idle".to_string(), runtime.get_state());
881        }
882
883        #[tokio::test]
884        pub async fn idle_new_epoch_detected_and_leader_has_transitioned_to_epoch() {
885            let mut runner = MockAggregatorRunner::new();
886            let time_point = TimePoint::dummy();
887            let new_time_point = TimePoint {
888                epoch: time_point.epoch + 1,
889                ..time_point.clone()
890            };
891            let new_time_point_clone = new_time_point.clone();
892            runner
893                .expect_get_time_point_from_chain()
894                .once()
895                .returning(move || Ok(new_time_point.clone()));
896            runner
897                .expect_is_follower_aggregator_at_same_epoch_as_leader()
898                .once()
899                .returning(|_| Ok(true));
900            runner
901                .expect_update_stake_distribution()
902                .with(predicate::eq(new_time_point_clone.clone()))
903                .once()
904                .returning(|_| Ok(()));
905            runner
906                .expect_close_signer_registration_round()
907                .once()
908                .returning(|| Ok(()));
909            runner
910                .expect_synchronize_follower_aggregator_signer_registration()
911                .once()
912                .returning(|| Ok(()));
913            runner
914                .expect_open_signer_registration_round()
915                .once()
916                .returning(|_| Ok(()));
917            runner
918                .expect_is_certificate_chain_valid()
919                .once()
920                .returning(|_| Ok(()));
921            runner
922                .expect_synchronize_follower_aggregator_certificate_chain()
923                .once()
924                .with(eq(false)) // Certificate chain valid so force_sync must be false
925                .returning(|_| Ok(()));
926            runner
927                .expect_update_era_checker()
928                .with(predicate::eq(new_time_point_clone.clone().epoch))
929                .once()
930                .returning(|_| Ok(()));
931            runner
932                .expect_inform_new_epoch()
933                .with(predicate::eq(new_time_point_clone.clone().epoch))
934                .once()
935                .returning(|_| Ok(()));
936            runner.expect_precompute_epoch_data().once().returning(|| Ok(()));
937            runner
938                .expect_upkeep()
939                .with(predicate::eq(new_time_point_clone.clone().epoch))
940                .once()
941                .returning(|_| Ok(()));
942            runner
943                .expect_increment_runtime_cycle_total_since_startup_counter()
944                .once()
945                .returning(|| ());
946            runner
947                .expect_increment_runtime_cycle_success_since_startup_counter()
948                .once()
949                .returning(|| ());
950            let mut runtime = init_runtime(
951                Some(AggregatorState::Idle(IdleState {
952                    current_time_point: Some(time_point),
953                })),
954                runner,
955                true,
956            )
957            .await;
958            runtime.cycle().await.unwrap();
959
960            assert_eq!("ready".to_string(), runtime.get_state());
961        }
962    }
963}