mithril_aggregator/runtime/
state_machine.rs

1use anyhow::Context;
2use chrono::Local;
3use slog::{Logger, info, trace};
4use std::fmt::Display;
5use std::sync::Arc;
6
7use mithril_common::entities::TimePoint;
8use mithril_common::logging::LoggerExtensions;
9
10use crate::AggregatorConfig;
11use crate::entities::OpenMessage;
12use crate::runtime::{AggregatorRunnerTrait, RuntimeError};
13
14#[derive(Clone, Debug, PartialEq, Eq)]
15pub struct IdleState {
16    current_time_point: Option<TimePoint>,
17}
18
19#[derive(Clone, Debug, PartialEq, Eq)]
20pub struct ReadyState {
21    current_time_point: TimePoint,
22}
23
24#[derive(Clone, Debug, PartialEq)]
25pub struct SigningState {
26    current_time_point: TimePoint,
27    open_message: OpenMessage,
28}
29
30#[derive(Clone, Debug, PartialEq)]
31pub enum AggregatorState {
32    Idle(IdleState),
33    Ready(ReadyState),
34    Signing(SigningState),
35}
36
37impl Display for AggregatorState {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        match self {
40            AggregatorState::Idle(state) => write!(
41                f,
42                "Idle - {}",
43                match &state.current_time_point {
44                    None => "No TimePoint".to_string(),
45                    Some(time_point) => time_point.to_string(),
46                }
47            ),
48            AggregatorState::Ready(state) => write!(f, "Ready - {}", state.current_time_point),
49            AggregatorState::Signing(state) => write!(f, "Signing - {}", state.current_time_point),
50        }
51    }
52}
53
54/// The AggregatorRuntime responsibility is to create a state machine to handle
55/// all actions required by the process of getting multi-signatures.
56/// See the
57/// [documentation](https://mithril.network/doc/mithril/mithril-network/aggregator#under-the-hood)
58/// for more explanations about the Aggregator state machine.
59pub struct AggregatorRuntime {
60    config: AggregatorConfig,
61    state: AggregatorState,
62    runner: Arc<dyn AggregatorRunnerTrait>,
63    logger: Logger,
64}
65
66impl AggregatorRuntime {
67    /// Create a new instance of the state machine.
68    pub async fn new(
69        aggregator_config: AggregatorConfig,
70        init_state: Option<AggregatorState>,
71        runner: Arc<dyn AggregatorRunnerTrait>,
72        logger: Logger,
73    ) -> Result<Self, RuntimeError> {
74        let logger = logger.new_with_component_name::<Self>();
75        info!(logger, "Initializing runtime");
76
77        let state = if let Some(init_state) = init_state {
78            trace!(logger, "Got initial state from caller");
79            init_state
80        } else {
81            trace!(logger, "Idle state, no current time point");
82            AggregatorState::Idle(IdleState {
83                current_time_point: None,
84            })
85        };
86
87        Ok(Self {
88            config: aggregator_config,
89            state,
90            runner,
91            logger,
92        })
93    }
94
95    /// Return the actual state of the state machine.
96    pub fn get_state(&self) -> String {
97        match self.state {
98            AggregatorState::Idle(_) => "idle".to_string(),
99            AggregatorState::Ready(_) => "ready".to_string(),
100            AggregatorState::Signing(_) => "signing".to_string(),
101        }
102    }
103
104    /// Launches an infinite loop ticking the state machine.
105    pub async fn run(&mut self) -> Result<(), RuntimeError> {
106        info!(self.logger, "Launching State Machine");
107        let mut interval = tokio::time::interval(self.config.interval);
108
109        loop {
110            interval.tick().await;
111            // Note: the "time" property in logs produced by our formatter (slog_bunyan) uses local
112            // time, so we must use it as well to avoid confusion.
113            let approximate_next_cycle_time = Local::now() + self.config.interval;
114
115            if let Err(e) = self.cycle().await {
116                e.write_to_log(&self.logger);
117                if e.is_critical() {
118                    return Err(e);
119                }
120            }
121
122            info!(
123                self.logger, "… Cycle finished";
124                "approximate_next_cycle_time" => %approximate_next_cycle_time.time().format("%H:%M:%S%.3f"),
125                "run_interval_in_ms" => self.config.interval.as_millis(),
126            );
127        }
128    }
129
130    /// Perform one tick of the state machine.
131    pub async fn cycle(&mut self) -> Result<(), RuntimeError> {
132        info!(
133            self.logger,
134            "================================================================================"
135        );
136        info!(self.logger, "New cycle: {}", self.state);
137
138        self.runner.increment_runtime_cycle_total_since_startup_counter();
139
140        match self.state.clone() {
141            AggregatorState::Idle(state) => {
142                let last_time_point = self.runner.get_time_point_from_chain().await.with_context(
143                    || "AggregatorRuntime in the state IDLE can not get current time point from chain",
144                )?;
145
146                info!(self.logger, "→ Trying to transition to READY"; "last_time_point" => ?last_time_point);
147
148                let can_try_transition_from_idle_to_ready = if self.config.is_follower {
149                    self.runner
150                        .is_follower_aggregator_at_same_epoch_as_leader(&last_time_point)
151                        .await?
152                } else {
153                    true
154                };
155                if can_try_transition_from_idle_to_ready {
156                    self.try_transition_from_idle_to_ready(
157                        state.current_time_point,
158                        last_time_point.clone(),
159                    )
160                    .await?;
161                    self.state = AggregatorState::Ready(ReadyState {
162                        current_time_point: last_time_point,
163                    });
164                }
165            }
166            AggregatorState::Ready(state) => {
167                let last_time_point: TimePoint = self
168                    .runner
169                    .get_time_point_from_chain()
170                    .await
171                    .with_context(|| {
172                        "AggregatorRuntime in the state READY can not get current time point from chain"
173                    })?;
174
175                if state.current_time_point.epoch < last_time_point.epoch {
176                    // transition READY > IDLE
177                    info!(self.logger, "→ Epoch has changed, transitioning to IDLE"; "last_time_point" => ?last_time_point);
178                    self.state = AggregatorState::Idle(IdleState {
179                        current_time_point: Some(state.current_time_point),
180                    });
181                } else if let Some(open_message) = self
182                    .runner
183                    .get_current_non_certified_open_message(&last_time_point)
184                    .await
185                    .with_context(|| "AggregatorRuntime can not get the current open message")?
186                {
187                    // transition READY > SIGNING
188                    info!(self.logger, "→ Transitioning to SIGNING");
189                    let new_state = self
190                        .transition_from_ready_to_signing(last_time_point.clone(), open_message.clone())
191                        .await.with_context(|| format!("AggregatorRuntime can not perform a transition from READY state to SIGNING with entity_type: '{:?}'", open_message.signed_entity_type))?;
192                    self.state = AggregatorState::Signing(new_state);
193                } else {
194                    // READY > READY
195                    info!(
196                        self.logger, " ⋅ No open message to certify, waiting…";
197                        "time_point" => ?state.current_time_point
198                    );
199                    self.state = AggregatorState::Ready(ReadyState {
200                        current_time_point: last_time_point,
201                    });
202                }
203            }
204            AggregatorState::Signing(state) => {
205                let last_time_point: TimePoint =
206                    self.runner.get_time_point_from_chain().await.with_context(|| {
207                        "AggregatorRuntime in the state SIGNING can not get current time point from chain"
208                    })?;
209
210                let is_outdated = self
211                    .runner
212                    .is_open_message_outdated(
213                        state.open_message.signed_entity_type.clone(),
214                        &last_time_point,
215                    )
216                    .await?;
217
218                if state.current_time_point.epoch < last_time_point.epoch {
219                    // SIGNING > IDLE
220                    info!(self.logger, "→ Epoch changed, transitioning to IDLE");
221                    let new_state = self.transition_from_signing_to_idle(state).await?;
222                    self.state = AggregatorState::Idle(new_state);
223                } else if is_outdated {
224                    // SIGNING > READY
225                    info!(
226                        self.logger,
227                        "→ Open message changed, transitioning to READY"
228                    );
229                    let new_state =
230                        self.transition_from_signing_to_ready_new_open_message(state).await?;
231                    self.state = AggregatorState::Ready(new_state);
232                } else {
233                    // SIGNING > READY
234                    let new_state =
235                        self.transition_from_signing_to_ready_multisignature(state).await?;
236                    info!(
237                        self.logger,
238                        "→ A multi-signature has been created, build an artifact & a certificate and transitioning back to READY"
239                    );
240                    self.state = AggregatorState::Ready(new_state);
241                }
242            }
243        }
244
245        self.runner.increment_runtime_cycle_success_since_startup_counter();
246
247        Ok(())
248    }
249
250    /// Perform a transition from `IDLE` state to `READY` state when
251    /// the certificate chain is valid.
252    async fn try_transition_from_idle_to_ready(
253        &mut self,
254        maybe_current_time_point: Option<TimePoint>,
255        new_time_point: TimePoint,
256    ) -> Result<(), RuntimeError> {
257        trace!(self.logger, "Trying transition from IDLE to READY state");
258
259        if maybe_current_time_point.is_none()
260            || maybe_current_time_point.as_ref().unwrap().epoch < new_time_point.epoch
261        {
262            self.runner.close_signer_registration_round().await?;
263            self.runner
264                .update_era_checker(new_time_point.epoch)
265                .await
266                .map_err(|e| RuntimeError::critical("transiting IDLE → READY", Some(e)))?;
267            self.runner.update_stake_distribution(&new_time_point).await?;
268            self.runner.inform_new_epoch(new_time_point.epoch).await?;
269            self.runner.upkeep(new_time_point.epoch).await?;
270            self.runner.open_signer_registration_round(&new_time_point).await?;
271            self.runner.update_epoch_settings().await?;
272            if self.config.is_follower {
273                self.runner
274                    .synchronize_follower_aggregator_signer_registration()
275                    .await?;
276            }
277            self.runner.precompute_epoch_data().await?;
278        }
279
280        self.runner
281            .is_certificate_chain_valid(&new_time_point)
282            .await
283            .map_err(|e| RuntimeError::KeepState {
284                message: "certificate chain is invalid".to_string(),
285                nested_error: e.into(),
286            })?;
287
288        Ok(())
289    }
290
291    /// Perform a transition from `SIGNING` state to `READY` state when a new
292    /// multi-signature is issued.
293    async fn transition_from_signing_to_ready_multisignature(
294        &self,
295        state: SigningState,
296    ) -> Result<ReadyState, RuntimeError> {
297        trace!(
298            self.logger,
299            "Launching transition from SIGNING to READY state"
300        );
301        let certificate = self
302            .runner
303            .create_certificate(&state.open_message.signed_entity_type)
304            .await?
305            .ok_or_else(|| RuntimeError::KeepState {
306                message: "not enough signature yet to create a certificate, waiting…".to_string(),
307                nested_error: None,
308            })?;
309        self.runner
310            .create_artifact(&state.open_message.signed_entity_type, &certificate)
311            .await
312            .map_err(|e| RuntimeError::ReInit {
313                message: "transiting SIGNING → READY: failed to create artifact. Retrying…"
314                    .to_string(),
315                nested_error: Some(e),
316            })?;
317
318        Ok(ReadyState {
319            current_time_point: state.current_time_point,
320        })
321    }
322
323    /// Perform a transition from `SIGNING` state to `IDLE` state when a new
324    /// epoch is detected.
325    async fn transition_from_signing_to_idle(
326        &self,
327        state: SigningState,
328    ) -> Result<IdleState, RuntimeError> {
329        trace!(
330            self.logger,
331            "Launching transition from SIGNING to IDLE state"
332        );
333
334        Ok(IdleState {
335            current_time_point: Some(state.current_time_point),
336        })
337    }
338
339    /// Perform a transition from `SIGNING` state to `READY` state when a new
340    /// open message is detected.
341    async fn transition_from_signing_to_ready_new_open_message(
342        &self,
343        state: SigningState,
344    ) -> Result<ReadyState, RuntimeError> {
345        trace!(
346            self.logger,
347            "Launching transition from SIGNING to READY state"
348        );
349
350        Ok(ReadyState {
351            current_time_point: state.current_time_point,
352        })
353    }
354
355    /// Perform a transition from `READY` state to `SIGNING` state when a new
356    /// open message is opened.
357    async fn transition_from_ready_to_signing(
358        &mut self,
359        new_time_point: TimePoint,
360        open_message: OpenMessage,
361    ) -> Result<SigningState, RuntimeError> {
362        trace!(
363            self.logger,
364            "Launching transition from READY to SIGNING state"
365        );
366
367        let state = SigningState {
368            current_time_point: new_time_point,
369            open_message,
370        };
371
372        Ok(state)
373    }
374}
375
376#[cfg(test)]
377mod tests {
378    use crate::entities::OpenMessage;
379    use anyhow::anyhow;
380    use mockall::predicate;
381    use std::time::Duration;
382
383    use mithril_common::test_utils::fake_data;
384
385    use crate::test_tools::TestLogger;
386
387    use super::super::runner::MockAggregatorRunner;
388    use super::*;
389
390    async fn init_runtime(
391        init_state: Option<AggregatorState>,
392        runner: MockAggregatorRunner,
393        is_follower: bool,
394    ) -> AggregatorRuntime {
395        AggregatorRuntime::new(
396            AggregatorConfig::new(Duration::from_millis(20), is_follower),
397            init_state,
398            Arc::new(runner),
399            TestLogger::stdout(),
400        )
401        .await
402        .unwrap()
403    }
404
405    mod leader {
406        use super::*;
407
408        #[tokio::test]
409        pub async fn idle_check_certificate_chain_is_not_valid() {
410            let mut runner = MockAggregatorRunner::new();
411            runner
412                .expect_get_time_point_from_chain()
413                .once()
414                .returning(|| Ok(TimePoint::dummy()));
415            runner
416                .expect_update_stake_distribution()
417                .with(predicate::eq(TimePoint::dummy()))
418                .once()
419                .returning(|_| Ok(()));
420            runner
421                .expect_close_signer_registration_round()
422                .once()
423                .returning(|| Ok(()));
424            runner
425                .expect_open_signer_registration_round()
426                .once()
427                .returning(|_| Ok(()));
428            runner
429                .expect_is_certificate_chain_valid()
430                .once()
431                .returning(|_| Err(anyhow!("error")));
432            runner
433                .expect_update_era_checker()
434                .with(predicate::eq(TimePoint::dummy().epoch))
435                .once()
436                .returning(|_| Ok(()));
437            runner
438                .expect_inform_new_epoch()
439                .with(predicate::eq(TimePoint::dummy().epoch))
440                .once()
441                .returning(|_| Ok(()));
442            runner.expect_update_epoch_settings().once().returning(|| Ok(()));
443            runner.expect_precompute_epoch_data().once().returning(|| Ok(()));
444            runner
445                .expect_upkeep()
446                .with(predicate::eq(TimePoint::dummy().epoch))
447                .once()
448                .returning(|_| Ok(()));
449            runner
450                .expect_increment_runtime_cycle_total_since_startup_counter()
451                .once()
452                .returning(|| ());
453            runner
454                .expect_increment_runtime_cycle_success_since_startup_counter()
455                .never();
456
457            let mut runtime = init_runtime(
458                Some(AggregatorState::Idle(IdleState {
459                    current_time_point: None,
460                })),
461                runner,
462                false,
463            )
464            .await;
465            let err = runtime.cycle().await.unwrap_err();
466            assert!(matches!(err, RuntimeError::KeepState { .. }));
467
468            assert_eq!("idle".to_string(), runtime.get_state());
469        }
470
471        #[tokio::test]
472        pub async fn idle_check_certificate_chain_is_valid() {
473            let mut runner = MockAggregatorRunner::new();
474            runner
475                .expect_get_time_point_from_chain()
476                .once()
477                .returning(|| Ok(TimePoint::dummy()));
478            runner
479                .expect_update_stake_distribution()
480                .with(predicate::eq(TimePoint::dummy()))
481                .once()
482                .returning(|_| Ok(()));
483            runner
484                .expect_close_signer_registration_round()
485                .once()
486                .returning(|| Ok(()));
487            runner
488                .expect_open_signer_registration_round()
489                .once()
490                .returning(|_| Ok(()));
491            runner
492                .expect_is_certificate_chain_valid()
493                .once()
494                .returning(|_| Ok(()));
495            runner
496                .expect_update_era_checker()
497                .with(predicate::eq(TimePoint::dummy().epoch))
498                .once()
499                .returning(|_| Ok(()));
500            runner
501                .expect_inform_new_epoch()
502                .with(predicate::eq(TimePoint::dummy().epoch))
503                .once()
504                .returning(|_| Ok(()));
505            runner.expect_update_epoch_settings().once().returning(|| Ok(()));
506            runner.expect_precompute_epoch_data().once().returning(|| Ok(()));
507            runner
508                .expect_upkeep()
509                .with(predicate::eq(TimePoint::dummy().epoch))
510                .once()
511                .returning(|_| Ok(()));
512            runner
513                .expect_increment_runtime_cycle_total_since_startup_counter()
514                .once()
515                .returning(|| ());
516            runner
517                .expect_increment_runtime_cycle_success_since_startup_counter()
518                .once()
519                .returning(|| ());
520
521            let mut runtime = init_runtime(
522                Some(AggregatorState::Idle(IdleState {
523                    current_time_point: None,
524                })),
525                runner,
526                false,
527            )
528            .await;
529            runtime.cycle().await.unwrap();
530
531            assert_eq!("ready".to_string(), runtime.get_state());
532        }
533
534        #[tokio::test]
535        pub async fn ready_new_epoch_detected() {
536            let mut runner = MockAggregatorRunner::new();
537            let time_point = TimePoint::dummy();
538            let new_time_point = TimePoint {
539                epoch: time_point.epoch + 1,
540                ..time_point.clone()
541            };
542            runner
543                .expect_get_time_point_from_chain()
544                .once()
545                .returning(move || Ok(new_time_point.clone()));
546            runner
547                .expect_increment_runtime_cycle_total_since_startup_counter()
548                .once()
549                .returning(|| ());
550            runner
551                .expect_increment_runtime_cycle_success_since_startup_counter()
552                .once()
553                .returning(|| ());
554            let mut runtime = init_runtime(
555                Some(AggregatorState::Ready(ReadyState {
556                    current_time_point: time_point,
557                })),
558                runner,
559                false,
560            )
561            .await;
562            runtime.cycle().await.unwrap();
563
564            assert_eq!("idle".to_string(), runtime.get_state());
565        }
566
567        #[tokio::test]
568        pub async fn ready_open_message_not_exist() {
569            let mut runner = MockAggregatorRunner::new();
570            let time_point = TimePoint::dummy();
571            let next_time_point = TimePoint {
572                immutable_file_number: time_point.immutable_file_number + 1,
573                ..time_point.clone()
574            };
575            let expected_time_point = next_time_point.clone();
576            runner
577                .expect_get_time_point_from_chain()
578                .once()
579                .returning(move || Ok(next_time_point.clone()));
580            runner
581                .expect_get_current_non_certified_open_message()
582                .once()
583                .returning(|_| Ok(None));
584            runner
585                .expect_increment_runtime_cycle_total_since_startup_counter()
586                .once()
587                .returning(|| ());
588            runner
589                .expect_increment_runtime_cycle_success_since_startup_counter()
590                .once()
591                .returning(|| ());
592            let mut runtime = init_runtime(
593                Some(AggregatorState::Ready(ReadyState {
594                    current_time_point: time_point.clone(),
595                })),
596                runner,
597                false,
598            )
599            .await;
600            runtime.cycle().await.unwrap();
601
602            assert_eq!("ready".to_string(), runtime.get_state());
603            assert_eq!(
604                AggregatorState::Ready(ReadyState {
605                    current_time_point: expected_time_point,
606                }),
607                runtime.state
608            );
609        }
610
611        #[tokio::test]
612        pub async fn ready_certificate_does_not_exist_for_time_point() {
613            let mut runner = MockAggregatorRunner::new();
614            runner
615                .expect_get_time_point_from_chain()
616                .once()
617                .returning(|| Ok(TimePoint::dummy()));
618            runner
619                .expect_get_current_non_certified_open_message()
620                .once()
621                .returning(|_| {
622                    let open_message = OpenMessage {
623                        is_certified: false,
624                        ..OpenMessage::dummy()
625                    };
626                    Ok(Some(open_message))
627                });
628            runner
629                .expect_increment_runtime_cycle_total_since_startup_counter()
630                .once()
631                .returning(|| ());
632            runner
633                .expect_increment_runtime_cycle_success_since_startup_counter()
634                .once()
635                .returning(|| ());
636
637            let mut runtime = init_runtime(
638                Some(AggregatorState::Ready(ReadyState {
639                    current_time_point: TimePoint::dummy(),
640                })),
641                runner,
642                false,
643            )
644            .await;
645            runtime.cycle().await.unwrap();
646
647            assert_eq!("signing".to_string(), runtime.get_state());
648        }
649
650        #[tokio::test]
651        async fn signing_changing_open_message_to_ready() {
652            let mut runner = MockAggregatorRunner::new();
653            runner
654                .expect_get_time_point_from_chain()
655                .once()
656                .returning(|| Ok(TimePoint::dummy()));
657            runner
658                .expect_is_open_message_outdated()
659                .once()
660                .returning(|_, _| Ok(true));
661            runner
662                .expect_increment_runtime_cycle_total_since_startup_counter()
663                .once()
664                .returning(|| ());
665            runner
666                .expect_increment_runtime_cycle_success_since_startup_counter()
667                .once()
668                .returning(|| ());
669
670            let initial_state = AggregatorState::Signing(SigningState {
671                current_time_point: TimePoint::dummy(),
672                open_message: OpenMessage::dummy(),
673            });
674
675            let mut runtime = init_runtime(Some(initial_state), runner, false).await;
676            runtime.cycle().await.unwrap();
677
678            assert_eq!("ready".to_string(), runtime.get_state());
679        }
680
681        #[tokio::test]
682        async fn signing_certificate_is_not_created() {
683            let mut runner = MockAggregatorRunner::new();
684            runner
685                .expect_get_time_point_from_chain()
686                .once()
687                .returning(|| Ok(TimePoint::dummy()));
688            runner
689                .expect_is_open_message_outdated()
690                .once()
691                .returning(|_, _| Ok(false));
692            runner.expect_create_certificate().once().returning(|_| Ok(None));
693            runner
694                .expect_increment_runtime_cycle_total_since_startup_counter()
695                .once()
696                .returning(|| ());
697            runner
698                .expect_increment_runtime_cycle_success_since_startup_counter()
699                .never();
700            let state = SigningState {
701                current_time_point: TimePoint::dummy(),
702                open_message: OpenMessage::dummy(),
703            };
704            let mut runtime =
705                init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
706            let err = runtime
707                .cycle()
708                .await
709                .expect_err("cycle should have returned an error");
710
711            match err {
712                RuntimeError::KeepState { .. } => (),
713                _ => panic!("KeepState error expected, got {err:?}."),
714            };
715
716            assert_eq!("signing".to_string(), runtime.get_state());
717        }
718
719        #[tokio::test]
720        async fn signing_artifact_not_created() {
721            let mut runner = MockAggregatorRunner::new();
722            runner
723                .expect_get_time_point_from_chain()
724                .once()
725                .returning(|| Ok(TimePoint::dummy()));
726            runner
727                .expect_is_open_message_outdated()
728                .once()
729                .returning(|_, _| Ok(false));
730            runner
731                .expect_create_certificate()
732                .return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
733            runner
734                .expect_create_artifact()
735                .once()
736                .returning(|_, _| Err(anyhow!("whatever")));
737            runner
738                .expect_increment_runtime_cycle_total_since_startup_counter()
739                .once()
740                .returning(|| ());
741            runner
742                .expect_increment_runtime_cycle_success_since_startup_counter()
743                .never();
744            let state = SigningState {
745                current_time_point: TimePoint::dummy(),
746                open_message: OpenMessage::dummy(),
747            };
748            let mut runtime =
749                init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
750            let err = runtime
751                .cycle()
752                .await
753                .expect_err("cycle should have returned an error");
754
755            match err {
756                RuntimeError::ReInit { .. } => (),
757                _ => panic!("ReInit error expected, got {err:?}."),
758            };
759
760            assert_eq!("signing".to_string(), runtime.get_state());
761        }
762
763        #[tokio::test]
764        async fn signing_certificate_is_created() {
765            let mut runner = MockAggregatorRunner::new();
766            runner
767                .expect_get_time_point_from_chain()
768                .once()
769                .returning(|| Ok(TimePoint::dummy()));
770            runner
771                .expect_is_open_message_outdated()
772                .once()
773                .returning(|_, _| Ok(false));
774            runner
775                .expect_create_certificate()
776                .return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
777            runner.expect_create_artifact().once().returning(|_, _| Ok(()));
778            runner
779                .expect_increment_runtime_cycle_total_since_startup_counter()
780                .once()
781                .returning(|| ());
782            runner
783                .expect_increment_runtime_cycle_success_since_startup_counter()
784                .once()
785                .returning(|| ());
786
787            let state = SigningState {
788                current_time_point: TimePoint::dummy(),
789                open_message: OpenMessage::dummy(),
790            };
791            let mut runtime =
792                init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
793            runtime.cycle().await.unwrap();
794
795            assert_eq!("ready".to_string(), runtime.get_state());
796        }
797
798        #[tokio::test]
799        pub async fn critical_error() {
800            let mut runner = MockAggregatorRunner::new();
801            runner
802                .expect_get_time_point_from_chain()
803                .once()
804                .returning(|| Ok(TimePoint::dummy()));
805            runner
806                .expect_update_era_checker()
807                .with(predicate::eq(TimePoint::dummy().epoch))
808                .once()
809                .returning(|_| Err(anyhow!("ERROR")));
810            runner
811                .expect_close_signer_registration_round()
812                .once()
813                .returning(|| Ok(()));
814            runner
815                .expect_increment_runtime_cycle_total_since_startup_counter()
816                .once()
817                .returning(|| ());
818            runner
819                .expect_increment_runtime_cycle_success_since_startup_counter()
820                .never();
821
822            let mut runtime = init_runtime(
823                Some(AggregatorState::Idle(IdleState {
824                    current_time_point: None,
825                })),
826                runner,
827                false,
828            )
829            .await;
830            runtime.cycle().await.unwrap_err();
831
832            assert_eq!("idle".to_string(), runtime.get_state());
833        }
834    }
835
836    mod follower {
837        use super::*;
838
839        #[tokio::test]
840        pub async fn idle_new_epoch_detected_and_leader_not_transitioned_to_epoch() {
841            let mut runner = MockAggregatorRunner::new();
842            let time_point = TimePoint::dummy();
843            let new_time_point = TimePoint {
844                epoch: time_point.epoch + 1,
845                ..time_point.clone()
846            };
847            runner
848                .expect_get_time_point_from_chain()
849                .once()
850                .returning(move || Ok(new_time_point.clone()));
851            runner
852                .expect_is_follower_aggregator_at_same_epoch_as_leader()
853                .once()
854                .returning(|_| Ok(false));
855            runner
856                .expect_increment_runtime_cycle_total_since_startup_counter()
857                .once()
858                .returning(|| ());
859            runner
860                .expect_increment_runtime_cycle_success_since_startup_counter()
861                .once()
862                .returning(|| ());
863            let mut runtime = init_runtime(
864                Some(AggregatorState::Idle(IdleState {
865                    current_time_point: Some(time_point),
866                })),
867                runner,
868                true,
869            )
870            .await;
871            runtime.cycle().await.unwrap();
872
873            assert_eq!("idle".to_string(), runtime.get_state());
874        }
875
876        #[tokio::test]
877        pub async fn idle_new_epoch_detected_and_leader_has_transitioned_to_epoch() {
878            let mut runner = MockAggregatorRunner::new();
879            let time_point = TimePoint::dummy();
880            let new_time_point = TimePoint {
881                epoch: time_point.epoch + 1,
882                ..time_point.clone()
883            };
884            let new_time_point_clone = new_time_point.clone();
885            runner
886                .expect_get_time_point_from_chain()
887                .once()
888                .returning(move || Ok(new_time_point.clone()));
889            runner
890                .expect_is_follower_aggregator_at_same_epoch_as_leader()
891                .once()
892                .returning(|_| Ok(true));
893            runner
894                .expect_update_stake_distribution()
895                .with(predicate::eq(new_time_point_clone.clone()))
896                .once()
897                .returning(|_| Ok(()));
898            runner
899                .expect_close_signer_registration_round()
900                .once()
901                .returning(|| Ok(()));
902            runner
903                .expect_synchronize_follower_aggregator_signer_registration()
904                .once()
905                .returning(|| Ok(()));
906            runner
907                .expect_open_signer_registration_round()
908                .once()
909                .returning(|_| Ok(()));
910            runner
911                .expect_is_certificate_chain_valid()
912                .once()
913                .returning(|_| Ok(()));
914            runner
915                .expect_update_era_checker()
916                .with(predicate::eq(new_time_point_clone.clone().epoch))
917                .once()
918                .returning(|_| Ok(()));
919            runner
920                .expect_inform_new_epoch()
921                .with(predicate::eq(new_time_point_clone.clone().epoch))
922                .once()
923                .returning(|_| Ok(()));
924            runner.expect_update_epoch_settings().once().returning(|| Ok(()));
925            runner.expect_precompute_epoch_data().once().returning(|| Ok(()));
926            runner
927                .expect_upkeep()
928                .with(predicate::eq(new_time_point_clone.clone().epoch))
929                .once()
930                .returning(|_| Ok(()));
931            runner
932                .expect_increment_runtime_cycle_total_since_startup_counter()
933                .once()
934                .returning(|| ());
935            runner
936                .expect_increment_runtime_cycle_success_since_startup_counter()
937                .once()
938                .returning(|| ());
939            let mut runtime = init_runtime(
940                Some(AggregatorState::Idle(IdleState {
941                    current_time_point: Some(time_point),
942                })),
943                runner,
944                true,
945            )
946            .await;
947            runtime.cycle().await.unwrap();
948
949            assert_eq!("ready".to_string(), runtime.get_state());
950        }
951    }
952}