mithril_aggregator/runtime/
state_machine.rs

1use crate::{
2    entities::OpenMessage,
3    runtime::{AggregatorRunnerTrait, RuntimeError},
4    AggregatorConfig,
5};
6
7use anyhow::Context;
8use mithril_common::entities::TimePoint;
9use mithril_common::logging::LoggerExtensions;
10use slog::{info, trace, Logger};
11use std::fmt::Display;
12use std::sync::Arc;
13use tokio::time::sleep;
14
15#[derive(Clone, Debug, PartialEq, Eq)]
16pub struct IdleState {
17    current_time_point: Option<TimePoint>,
18}
19
20#[derive(Clone, Debug, PartialEq, Eq)]
21pub struct ReadyState {
22    current_time_point: TimePoint,
23}
24
25#[derive(Clone, Debug, PartialEq)]
26pub struct SigningState {
27    current_time_point: TimePoint,
28    open_message: OpenMessage,
29}
30
31#[derive(Clone, Debug, PartialEq)]
32pub enum AggregatorState {
33    Idle(IdleState),
34    Ready(ReadyState),
35    Signing(SigningState),
36}
37
38impl Display for AggregatorState {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        match self {
41            AggregatorState::Idle(state) => write!(
42                f,
43                "Idle - {}",
44                match &state.current_time_point {
45                    None => "No TimePoint".to_string(),
46                    Some(time_point) => time_point.to_string(),
47                }
48            ),
49            AggregatorState::Ready(state) => write!(f, "Ready - {}", state.current_time_point),
50            AggregatorState::Signing(state) => write!(f, "Signing - {}", state.current_time_point),
51        }
52    }
53}
54
55/// The AggregatorRuntime responsibility is to create a state machine to handle
56/// all actions required by the process of getting multi-signatures.
57/// See the
58/// [documentation](https://mithril.network/doc/mithril/mithril-network/aggregator#under-the-hood)
59/// for more explanations about the Aggregator state machine.
60pub struct AggregatorRuntime {
61    config: AggregatorConfig,
62    state: AggregatorState,
63    runner: Arc<dyn AggregatorRunnerTrait>,
64    logger: Logger,
65}
66
67impl AggregatorRuntime {
68    /// Create a new instance of the state machine.
69    pub async fn new(
70        aggregator_config: AggregatorConfig,
71        init_state: Option<AggregatorState>,
72        runner: Arc<dyn AggregatorRunnerTrait>,
73        logger: Logger,
74    ) -> Result<Self, RuntimeError> {
75        let logger = logger.new_with_component_name::<Self>();
76        info!(logger, "Initializing runtime");
77
78        let state = if let Some(init_state) = init_state {
79            trace!(logger, "Got initial state from caller");
80            init_state
81        } else {
82            trace!(logger, "Idle state, no current time point");
83            AggregatorState::Idle(IdleState {
84                current_time_point: None,
85            })
86        };
87
88        Ok(Self {
89            config: aggregator_config,
90            state,
91            runner,
92            logger,
93        })
94    }
95
96    /// Return the actual state of the state machine.
97    pub fn get_state(&self) -> String {
98        match self.state {
99            AggregatorState::Idle(_) => "idle".to_string(),
100            AggregatorState::Ready(_) => "ready".to_string(),
101            AggregatorState::Signing(_) => "signing".to_string(),
102        }
103    }
104
105    /// Launches an infinite loop ticking the state machine.
106    pub async fn run(&mut self) -> Result<(), RuntimeError> {
107        info!(self.logger, "Launching State Machine");
108
109        loop {
110            if let Err(e) = self.cycle().await {
111                e.write_to_log(&self.logger);
112                if e.is_critical() {
113                    return Err(e);
114                }
115            }
116
117            info!(
118                self.logger,
119                "… Cycle finished, Sleeping for {} ms",
120                self.config.interval.as_millis()
121            );
122            sleep(self.config.interval).await;
123        }
124    }
125
126    /// Perform one tick of the state machine.
127    pub async fn cycle(&mut self) -> Result<(), RuntimeError> {
128        info!(
129            self.logger,
130            "================================================================================"
131        );
132        info!(self.logger, "new cycle: {}", self.state);
133
134        self.runner
135            .increment_runtime_cycle_total_since_startup_counter();
136
137        match self.state.clone() {
138            AggregatorState::Idle(state) => {
139                let last_time_point = self.runner.get_time_point_from_chain().await.with_context(
140                    || "AggregatorRuntime in the state IDLE can not get current time point from chain",
141                )?;
142
143                info!(self.logger, "→ Trying to transition to READY"; "last_time_point" => ?last_time_point);
144
145                let can_try_transition_from_idle_to_ready = if self.config.is_follower {
146                    self.runner
147                        .is_follower_aggregator_at_same_epoch_as_leader(&last_time_point)
148                        .await?
149                } else {
150                    true
151                };
152                if can_try_transition_from_idle_to_ready {
153                    self.try_transition_from_idle_to_ready(
154                        state.current_time_point,
155                        last_time_point.clone(),
156                    )
157                    .await?;
158                    self.state = AggregatorState::Ready(ReadyState {
159                        current_time_point: last_time_point,
160                    });
161                }
162            }
163            AggregatorState::Ready(state) => {
164                let last_time_point: TimePoint = self
165                    .runner
166                    .get_time_point_from_chain()
167                    .await
168                    .with_context(|| {
169                        "AggregatorRuntime in the state READY can not get current time point from chain"
170                    })?;
171
172                if state.current_time_point.epoch < last_time_point.epoch {
173                    // transition READY > IDLE
174                    info!(self.logger, "→ Epoch has changed, transitioning to IDLE"; "last_time_point" => ?last_time_point);
175                    self.state = AggregatorState::Idle(IdleState {
176                        current_time_point: Some(state.current_time_point),
177                    });
178                } else if let Some(open_message) = self
179                    .runner
180                    .get_current_non_certified_open_message(&last_time_point)
181                    .await
182                    .with_context(|| "AggregatorRuntime can not get the current open message")?
183                {
184                    // transition READY > SIGNING
185                    info!(self.logger, "→ Transitioning to SIGNING");
186                    let new_state = self
187                        .transition_from_ready_to_signing(last_time_point.clone(), open_message.clone())
188                        .await.with_context(|| format!("AggregatorRuntime can not perform a transition from READY state to SIGNING with entity_type: '{:?}'", open_message.signed_entity_type))?;
189                    self.state = AggregatorState::Signing(new_state);
190                } else {
191                    // READY > READY
192                    info!(
193                        self.logger, " ⋅ No open message to certify, waiting…";
194                        "time_point" => ?state.current_time_point
195                    );
196                    self.state = AggregatorState::Ready(ReadyState {
197                        current_time_point: last_time_point,
198                    });
199                }
200            }
201            AggregatorState::Signing(state) => {
202                let last_time_point: TimePoint =
203                    self.runner.get_time_point_from_chain().await.with_context(|| {
204                        "AggregatorRuntime in the state SIGNING can not get current time point from chain"
205                    })?;
206
207                let is_outdated = self
208                    .runner
209                    .is_open_message_outdated(
210                        state.open_message.signed_entity_type.clone(),
211                        &last_time_point,
212                    )
213                    .await?;
214
215                if state.current_time_point.epoch < last_time_point.epoch {
216                    // SIGNING > IDLE
217                    info!(self.logger, "→ Epoch changed, transitioning to IDLE");
218                    let new_state = self.transition_from_signing_to_idle(state).await?;
219                    self.state = AggregatorState::Idle(new_state);
220                } else if is_outdated {
221                    // SIGNING > READY
222                    info!(
223                        self.logger,
224                        "→ Open message changed, transitioning to READY"
225                    );
226                    let new_state = self
227                        .transition_from_signing_to_ready_new_open_message(state)
228                        .await?;
229                    self.state = AggregatorState::Ready(new_state);
230                } else {
231                    // SIGNING > READY
232                    let new_state = self
233                        .transition_from_signing_to_ready_multisignature(state)
234                        .await?;
235                    info!(self.logger, "→ A multi-signature has been created, build an artifact & a certificate and transitioning back to READY");
236                    self.state = AggregatorState::Ready(new_state);
237                }
238            }
239        }
240
241        self.runner
242            .increment_runtime_cycle_success_since_startup_counter();
243
244        Ok(())
245    }
246
247    /// Perform a transition from `IDLE` state to `READY` state when
248    /// the certificate chain is valid.
249    async fn try_transition_from_idle_to_ready(
250        &mut self,
251        maybe_current_time_point: Option<TimePoint>,
252        new_time_point: TimePoint,
253    ) -> Result<(), RuntimeError> {
254        trace!(self.logger, "Trying transition from IDLE to READY state");
255
256        if maybe_current_time_point.is_none()
257            || maybe_current_time_point.as_ref().unwrap().epoch < new_time_point.epoch
258        {
259            self.runner.close_signer_registration_round().await?;
260            self.runner
261                .update_era_checker(new_time_point.epoch)
262                .await
263                .map_err(|e| RuntimeError::critical("transiting IDLE → READY", Some(e)))?;
264            self.runner
265                .update_stake_distribution(&new_time_point)
266                .await?;
267            self.runner.inform_new_epoch(new_time_point.epoch).await?;
268            self.runner.upkeep(new_time_point.epoch).await?;
269            self.runner
270                .open_signer_registration_round(&new_time_point)
271                .await?;
272            self.runner.update_epoch_settings().await?;
273            if self.config.is_follower {
274                self.runner
275                    .synchronize_follower_aggregator_signer_registration()
276                    .await?;
277            }
278            self.runner.precompute_epoch_data().await?;
279        }
280
281        self.runner
282            .is_certificate_chain_valid(&new_time_point)
283            .await
284            .map_err(|e| RuntimeError::KeepState {
285                message: "certificate chain is invalid".to_string(),
286                nested_error: e.into(),
287            })?;
288
289        Ok(())
290    }
291
292    /// Perform a transition from `SIGNING` state to `READY` state when a new
293    /// multi-signature is issued.
294    async fn transition_from_signing_to_ready_multisignature(
295        &self,
296        state: SigningState,
297    ) -> Result<ReadyState, RuntimeError> {
298        trace!(
299            self.logger,
300            "Launching transition from SIGNING to READY state"
301        );
302        let certificate = self
303            .runner
304            .create_certificate(&state.open_message.signed_entity_type)
305            .await?
306            .ok_or_else(|| RuntimeError::KeepState {
307                message: "not enough signature yet to create a certificate, waiting…".to_string(),
308                nested_error: None,
309            })?;
310        self.runner
311            .create_artifact(&state.open_message.signed_entity_type, &certificate)
312            .await
313            .map_err(|e| RuntimeError::ReInit {
314                message: "transiting SIGNING → READY: failed to create artifact. Retrying…"
315                    .to_string(),
316                nested_error: Some(e),
317            })?;
318
319        Ok(ReadyState {
320            current_time_point: state.current_time_point,
321        })
322    }
323
324    /// Perform a transition from `SIGNING` state to `IDLE` state when a new
325    /// epoch is detected.
326    async fn transition_from_signing_to_idle(
327        &self,
328        state: SigningState,
329    ) -> Result<IdleState, RuntimeError> {
330        trace!(
331            self.logger,
332            "Launching transition from SIGNING to IDLE state"
333        );
334
335        Ok(IdleState {
336            current_time_point: Some(state.current_time_point),
337        })
338    }
339
340    /// Perform a transition from `SIGNING` state to `READY` state when a new
341    /// open message is detected.
342    async fn transition_from_signing_to_ready_new_open_message(
343        &self,
344        state: SigningState,
345    ) -> Result<ReadyState, RuntimeError> {
346        trace!(
347            self.logger,
348            "Launching transition from SIGNING to READY state"
349        );
350
351        Ok(ReadyState {
352            current_time_point: state.current_time_point,
353        })
354    }
355
356    /// Perform a transition from `READY` state to `SIGNING` state when a new
357    /// open message is opened.
358    async fn transition_from_ready_to_signing(
359        &mut self,
360        new_time_point: TimePoint,
361        open_message: OpenMessage,
362    ) -> Result<SigningState, RuntimeError> {
363        trace!(
364            self.logger,
365            "Launching transition from READY to SIGNING state"
366        );
367
368        let state = SigningState {
369            current_time_point: new_time_point,
370            open_message,
371        };
372
373        Ok(state)
374    }
375}
376
377#[cfg(test)]
378mod tests {
379    use crate::entities::OpenMessage;
380    use anyhow::anyhow;
381    use mockall::predicate;
382    use std::time::Duration;
383
384    use mithril_common::test_utils::fake_data;
385
386    use crate::test_tools::TestLogger;
387
388    use super::super::runner::MockAggregatorRunner;
389    use super::*;
390
391    async fn init_runtime(
392        init_state: Option<AggregatorState>,
393        runner: MockAggregatorRunner,
394        is_follower: bool,
395    ) -> AggregatorRuntime {
396        AggregatorRuntime::new(
397            AggregatorConfig::new(Duration::from_millis(20), is_follower),
398            init_state,
399            Arc::new(runner),
400            TestLogger::stdout(),
401        )
402        .await
403        .unwrap()
404    }
405
406    mod leader {
407        use super::*;
408
409        #[tokio::test]
410        pub async fn idle_check_certificate_chain_is_not_valid() {
411            let mut runner = MockAggregatorRunner::new();
412            runner
413                .expect_get_time_point_from_chain()
414                .once()
415                .returning(|| Ok(TimePoint::dummy()));
416            runner
417                .expect_update_stake_distribution()
418                .with(predicate::eq(TimePoint::dummy()))
419                .once()
420                .returning(|_| Ok(()));
421            runner
422                .expect_close_signer_registration_round()
423                .once()
424                .returning(|| Ok(()));
425            runner
426                .expect_open_signer_registration_round()
427                .once()
428                .returning(|_| Ok(()));
429            runner
430                .expect_is_certificate_chain_valid()
431                .once()
432                .returning(|_| Err(anyhow!("error")));
433            runner
434                .expect_update_era_checker()
435                .with(predicate::eq(TimePoint::dummy().epoch))
436                .once()
437                .returning(|_| Ok(()));
438            runner
439                .expect_inform_new_epoch()
440                .with(predicate::eq(TimePoint::dummy().epoch))
441                .once()
442                .returning(|_| Ok(()));
443            runner
444                .expect_update_epoch_settings()
445                .once()
446                .returning(|| Ok(()));
447            runner
448                .expect_precompute_epoch_data()
449                .once()
450                .returning(|| Ok(()));
451            runner
452                .expect_upkeep()
453                .with(predicate::eq(TimePoint::dummy().epoch))
454                .once()
455                .returning(|_| Ok(()));
456            runner
457                .expect_increment_runtime_cycle_total_since_startup_counter()
458                .once()
459                .returning(|| ());
460            runner
461                .expect_increment_runtime_cycle_success_since_startup_counter()
462                .never();
463
464            let mut runtime = init_runtime(
465                Some(AggregatorState::Idle(IdleState {
466                    current_time_point: None,
467                })),
468                runner,
469                false,
470            )
471            .await;
472            let err = runtime.cycle().await.unwrap_err();
473            assert!(matches!(err, RuntimeError::KeepState { .. }));
474
475            assert_eq!("idle".to_string(), runtime.get_state());
476        }
477
478        #[tokio::test]
479        pub async fn idle_check_certificate_chain_is_valid() {
480            let mut runner = MockAggregatorRunner::new();
481            runner
482                .expect_get_time_point_from_chain()
483                .once()
484                .returning(|| Ok(TimePoint::dummy()));
485            runner
486                .expect_update_stake_distribution()
487                .with(predicate::eq(TimePoint::dummy()))
488                .once()
489                .returning(|_| Ok(()));
490            runner
491                .expect_close_signer_registration_round()
492                .once()
493                .returning(|| Ok(()));
494            runner
495                .expect_open_signer_registration_round()
496                .once()
497                .returning(|_| Ok(()));
498            runner
499                .expect_is_certificate_chain_valid()
500                .once()
501                .returning(|_| Ok(()));
502            runner
503                .expect_update_era_checker()
504                .with(predicate::eq(TimePoint::dummy().epoch))
505                .once()
506                .returning(|_| Ok(()));
507            runner
508                .expect_inform_new_epoch()
509                .with(predicate::eq(TimePoint::dummy().epoch))
510                .once()
511                .returning(|_| Ok(()));
512            runner
513                .expect_update_epoch_settings()
514                .once()
515                .returning(|| Ok(()));
516            runner
517                .expect_precompute_epoch_data()
518                .once()
519                .returning(|| Ok(()));
520            runner
521                .expect_upkeep()
522                .with(predicate::eq(TimePoint::dummy().epoch))
523                .once()
524                .returning(|_| Ok(()));
525            runner
526                .expect_increment_runtime_cycle_total_since_startup_counter()
527                .once()
528                .returning(|| ());
529            runner
530                .expect_increment_runtime_cycle_success_since_startup_counter()
531                .once()
532                .returning(|| ());
533
534            let mut runtime = init_runtime(
535                Some(AggregatorState::Idle(IdleState {
536                    current_time_point: None,
537                })),
538                runner,
539                false,
540            )
541            .await;
542            runtime.cycle().await.unwrap();
543
544            assert_eq!("ready".to_string(), runtime.get_state());
545        }
546
547        #[tokio::test]
548        pub async fn ready_new_epoch_detected() {
549            let mut runner = MockAggregatorRunner::new();
550            let time_point = TimePoint::dummy();
551            let new_time_point = TimePoint {
552                epoch: time_point.epoch + 1,
553                ..time_point.clone()
554            };
555            runner
556                .expect_get_time_point_from_chain()
557                .once()
558                .returning(move || Ok(new_time_point.clone()));
559            runner
560                .expect_increment_runtime_cycle_total_since_startup_counter()
561                .once()
562                .returning(|| ());
563            runner
564                .expect_increment_runtime_cycle_success_since_startup_counter()
565                .once()
566                .returning(|| ());
567            let mut runtime = init_runtime(
568                Some(AggregatorState::Ready(ReadyState {
569                    current_time_point: time_point,
570                })),
571                runner,
572                false,
573            )
574            .await;
575            runtime.cycle().await.unwrap();
576
577            assert_eq!("idle".to_string(), runtime.get_state());
578        }
579
580        #[tokio::test]
581        pub async fn ready_open_message_not_exist() {
582            let mut runner = MockAggregatorRunner::new();
583            let time_point = TimePoint::dummy();
584            let next_time_point = TimePoint {
585                immutable_file_number: time_point.immutable_file_number + 1,
586                ..time_point.clone()
587            };
588            let expected_time_point = next_time_point.clone();
589            runner
590                .expect_get_time_point_from_chain()
591                .once()
592                .returning(move || Ok(next_time_point.clone()));
593            runner
594                .expect_get_current_non_certified_open_message()
595                .once()
596                .returning(|_| Ok(None));
597            runner
598                .expect_increment_runtime_cycle_total_since_startup_counter()
599                .once()
600                .returning(|| ());
601            runner
602                .expect_increment_runtime_cycle_success_since_startup_counter()
603                .once()
604                .returning(|| ());
605            let mut runtime = init_runtime(
606                Some(AggregatorState::Ready(ReadyState {
607                    current_time_point: time_point.clone(),
608                })),
609                runner,
610                false,
611            )
612            .await;
613            runtime.cycle().await.unwrap();
614
615            assert_eq!("ready".to_string(), runtime.get_state());
616            assert_eq!(
617                AggregatorState::Ready(ReadyState {
618                    current_time_point: expected_time_point,
619                }),
620                runtime.state
621            );
622        }
623
624        #[tokio::test]
625        pub async fn ready_certificate_does_not_exist_for_time_point() {
626            let mut runner = MockAggregatorRunner::new();
627            runner
628                .expect_get_time_point_from_chain()
629                .once()
630                .returning(|| Ok(TimePoint::dummy()));
631            runner
632                .expect_get_current_non_certified_open_message()
633                .once()
634                .returning(|_| {
635                    let open_message = OpenMessage {
636                        is_certified: false,
637                        ..OpenMessage::dummy()
638                    };
639                    Ok(Some(open_message))
640                });
641            runner
642                .expect_increment_runtime_cycle_total_since_startup_counter()
643                .once()
644                .returning(|| ());
645            runner
646                .expect_increment_runtime_cycle_success_since_startup_counter()
647                .once()
648                .returning(|| ());
649
650            let mut runtime = init_runtime(
651                Some(AggregatorState::Ready(ReadyState {
652                    current_time_point: TimePoint::dummy(),
653                })),
654                runner,
655                false,
656            )
657            .await;
658            runtime.cycle().await.unwrap();
659
660            assert_eq!("signing".to_string(), runtime.get_state());
661        }
662
663        #[tokio::test]
664        async fn signing_changing_open_message_to_ready() {
665            let mut runner = MockAggregatorRunner::new();
666            runner
667                .expect_get_time_point_from_chain()
668                .once()
669                .returning(|| Ok(TimePoint::dummy()));
670            runner
671                .expect_is_open_message_outdated()
672                .once()
673                .returning(|_, _| Ok(true));
674            runner
675                .expect_increment_runtime_cycle_total_since_startup_counter()
676                .once()
677                .returning(|| ());
678            runner
679                .expect_increment_runtime_cycle_success_since_startup_counter()
680                .once()
681                .returning(|| ());
682
683            let initial_state = AggregatorState::Signing(SigningState {
684                current_time_point: TimePoint::dummy(),
685                open_message: OpenMessage::dummy(),
686            });
687
688            let mut runtime = init_runtime(Some(initial_state), runner, false).await;
689            runtime.cycle().await.unwrap();
690
691            assert_eq!("ready".to_string(), runtime.get_state());
692        }
693
694        #[tokio::test]
695        async fn signing_certificate_is_not_created() {
696            let mut runner = MockAggregatorRunner::new();
697            runner
698                .expect_get_time_point_from_chain()
699                .once()
700                .returning(|| Ok(TimePoint::dummy()));
701            runner
702                .expect_is_open_message_outdated()
703                .once()
704                .returning(|_, _| Ok(false));
705            runner
706                .expect_create_certificate()
707                .once()
708                .returning(|_| Ok(None));
709            runner
710                .expect_increment_runtime_cycle_total_since_startup_counter()
711                .once()
712                .returning(|| ());
713            runner
714                .expect_increment_runtime_cycle_success_since_startup_counter()
715                .never();
716            let state = SigningState {
717                current_time_point: TimePoint::dummy(),
718                open_message: OpenMessage::dummy(),
719            };
720            let mut runtime =
721                init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
722            let err = runtime
723                .cycle()
724                .await
725                .expect_err("cycle should have returned an error");
726
727            match err {
728                RuntimeError::KeepState { .. } => (),
729                _ => panic!("KeepState error expected, got {err:?}."),
730            };
731
732            assert_eq!("signing".to_string(), runtime.get_state());
733        }
734
735        #[tokio::test]
736        async fn signing_artifact_not_created() {
737            let mut runner = MockAggregatorRunner::new();
738            runner
739                .expect_get_time_point_from_chain()
740                .once()
741                .returning(|| Ok(TimePoint::dummy()));
742            runner
743                .expect_is_open_message_outdated()
744                .once()
745                .returning(|_, _| Ok(false));
746            runner
747                .expect_create_certificate()
748                .return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
749            runner
750                .expect_create_artifact()
751                .once()
752                .returning(|_, _| Err(anyhow!("whatever")));
753            runner
754                .expect_increment_runtime_cycle_total_since_startup_counter()
755                .once()
756                .returning(|| ());
757            runner
758                .expect_increment_runtime_cycle_success_since_startup_counter()
759                .never();
760            let state = SigningState {
761                current_time_point: TimePoint::dummy(),
762                open_message: OpenMessage::dummy(),
763            };
764            let mut runtime =
765                init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
766            let err = runtime
767                .cycle()
768                .await
769                .expect_err("cycle should have returned an error");
770
771            match err {
772                RuntimeError::ReInit { .. } => (),
773                _ => panic!("ReInit error expected, got {err:?}."),
774            };
775
776            assert_eq!("signing".to_string(), runtime.get_state());
777        }
778
779        #[tokio::test]
780        async fn signing_certificate_is_created() {
781            let mut runner = MockAggregatorRunner::new();
782            runner
783                .expect_get_time_point_from_chain()
784                .once()
785                .returning(|| Ok(TimePoint::dummy()));
786            runner
787                .expect_is_open_message_outdated()
788                .once()
789                .returning(|_, _| Ok(false));
790            runner
791                .expect_create_certificate()
792                .return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
793            runner
794                .expect_create_artifact()
795                .once()
796                .returning(|_, _| Ok(()));
797            runner
798                .expect_increment_runtime_cycle_total_since_startup_counter()
799                .once()
800                .returning(|| ());
801            runner
802                .expect_increment_runtime_cycle_success_since_startup_counter()
803                .once()
804                .returning(|| ());
805
806            let state = SigningState {
807                current_time_point: TimePoint::dummy(),
808                open_message: OpenMessage::dummy(),
809            };
810            let mut runtime =
811                init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
812            runtime.cycle().await.unwrap();
813
814            assert_eq!("ready".to_string(), runtime.get_state());
815        }
816
817        #[tokio::test]
818        pub async fn critical_error() {
819            let mut runner = MockAggregatorRunner::new();
820            runner
821                .expect_get_time_point_from_chain()
822                .once()
823                .returning(|| Ok(TimePoint::dummy()));
824            runner
825                .expect_update_era_checker()
826                .with(predicate::eq(TimePoint::dummy().epoch))
827                .once()
828                .returning(|_| Err(anyhow!("ERROR")));
829            runner
830                .expect_close_signer_registration_round()
831                .once()
832                .returning(|| Ok(()));
833            runner
834                .expect_increment_runtime_cycle_total_since_startup_counter()
835                .once()
836                .returning(|| ());
837            runner
838                .expect_increment_runtime_cycle_success_since_startup_counter()
839                .never();
840
841            let mut runtime = init_runtime(
842                Some(AggregatorState::Idle(IdleState {
843                    current_time_point: None,
844                })),
845                runner,
846                false,
847            )
848            .await;
849            runtime.cycle().await.unwrap_err();
850
851            assert_eq!("idle".to_string(), runtime.get_state());
852        }
853    }
854
855    mod follower {
856        use super::*;
857
858        #[tokio::test]
859        pub async fn idle_new_epoch_detected_and_leader_not_transitioned_to_epoch() {
860            let mut runner = MockAggregatorRunner::new();
861            let time_point = TimePoint::dummy();
862            let new_time_point = TimePoint {
863                epoch: time_point.epoch + 1,
864                ..time_point.clone()
865            };
866            runner
867                .expect_get_time_point_from_chain()
868                .once()
869                .returning(move || Ok(new_time_point.clone()));
870            runner
871                .expect_is_follower_aggregator_at_same_epoch_as_leader()
872                .once()
873                .returning(|_| Ok(false));
874            runner
875                .expect_increment_runtime_cycle_total_since_startup_counter()
876                .once()
877                .returning(|| ());
878            runner
879                .expect_increment_runtime_cycle_success_since_startup_counter()
880                .once()
881                .returning(|| ());
882            let mut runtime = init_runtime(
883                Some(AggregatorState::Idle(IdleState {
884                    current_time_point: Some(time_point),
885                })),
886                runner,
887                true,
888            )
889            .await;
890            runtime.cycle().await.unwrap();
891
892            assert_eq!("idle".to_string(), runtime.get_state());
893        }
894
895        #[tokio::test]
896        pub async fn idle_new_epoch_detected_and_leader_has_transitioned_to_epoch() {
897            let mut runner = MockAggregatorRunner::new();
898            let time_point = TimePoint::dummy();
899            let new_time_point = TimePoint {
900                epoch: time_point.epoch + 1,
901                ..time_point.clone()
902            };
903            let new_time_point_clone = new_time_point.clone();
904            runner
905                .expect_get_time_point_from_chain()
906                .once()
907                .returning(move || Ok(new_time_point.clone()));
908            runner
909                .expect_is_follower_aggregator_at_same_epoch_as_leader()
910                .once()
911                .returning(|_| Ok(true));
912            runner
913                .expect_update_stake_distribution()
914                .with(predicate::eq(new_time_point_clone.clone()))
915                .once()
916                .returning(|_| Ok(()));
917            runner
918                .expect_close_signer_registration_round()
919                .once()
920                .returning(|| Ok(()));
921            runner
922                .expect_synchronize_follower_aggregator_signer_registration()
923                .once()
924                .returning(|| Ok(()));
925            runner
926                .expect_open_signer_registration_round()
927                .once()
928                .returning(|_| Ok(()));
929            runner
930                .expect_is_certificate_chain_valid()
931                .once()
932                .returning(|_| Ok(()));
933            runner
934                .expect_update_era_checker()
935                .with(predicate::eq(new_time_point_clone.clone().epoch))
936                .once()
937                .returning(|_| Ok(()));
938            runner
939                .expect_inform_new_epoch()
940                .with(predicate::eq(new_time_point_clone.clone().epoch))
941                .once()
942                .returning(|_| Ok(()));
943            runner
944                .expect_update_epoch_settings()
945                .once()
946                .returning(|| Ok(()));
947            runner
948                .expect_precompute_epoch_data()
949                .once()
950                .returning(|| Ok(()));
951            runner
952                .expect_upkeep()
953                .with(predicate::eq(new_time_point_clone.clone().epoch))
954                .once()
955                .returning(|_| Ok(()));
956            runner
957                .expect_increment_runtime_cycle_total_since_startup_counter()
958                .once()
959                .returning(|| ());
960            runner
961                .expect_increment_runtime_cycle_success_since_startup_counter()
962                .once()
963                .returning(|| ());
964            let mut runtime = init_runtime(
965                Some(AggregatorState::Idle(IdleState {
966                    current_time_point: Some(time_point),
967                })),
968                runner,
969                true,
970            )
971            .await;
972            runtime.cycle().await.unwrap();
973
974            assert_eq!("ready".to_string(), runtime.get_state());
975        }
976    }
977}