1use anyhow::Context;
2use chrono::Local;
3use slog::{Logger, info, trace};
4use std::fmt::Display;
5use std::sync::Arc;
6
7use mithril_common::entities::TimePoint;
8use mithril_common::logging::LoggerExtensions;
9
10use crate::AggregatorConfig;
11use crate::entities::OpenMessage;
12use crate::runtime::{AggregatorRunnerTrait, RuntimeError};
13
14#[derive(Clone, Debug, PartialEq, Eq)]
15pub struct IdleState {
16 current_time_point: Option<TimePoint>,
17}
18
19#[derive(Clone, Debug, PartialEq, Eq)]
20pub struct ReadyState {
21 current_time_point: TimePoint,
22}
23
24#[derive(Clone, Debug, PartialEq)]
25pub struct SigningState {
26 current_time_point: TimePoint,
27 open_message: OpenMessage,
28}
29
30#[derive(Clone, Debug, PartialEq)]
31pub enum AggregatorState {
32 Idle(IdleState),
33 Ready(ReadyState),
34 Signing(SigningState),
35}
36
37impl Display for AggregatorState {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 match self {
40 AggregatorState::Idle(state) => write!(
41 f,
42 "Idle - {}",
43 match &state.current_time_point {
44 None => "No TimePoint".to_string(),
45 Some(time_point) => time_point.to_string(),
46 }
47 ),
48 AggregatorState::Ready(state) => write!(f, "Ready - {}", state.current_time_point),
49 AggregatorState::Signing(state) => write!(f, "Signing - {}", state.current_time_point),
50 }
51 }
52}
53
54pub struct AggregatorRuntime {
60 config: AggregatorConfig,
61 state: AggregatorState,
62 runner: Arc<dyn AggregatorRunnerTrait>,
63 logger: Logger,
64}
65
66impl AggregatorRuntime {
67 pub async fn new(
69 aggregator_config: AggregatorConfig,
70 init_state: Option<AggregatorState>,
71 runner: Arc<dyn AggregatorRunnerTrait>,
72 logger: Logger,
73 ) -> Result<Self, RuntimeError> {
74 let logger = logger.new_with_component_name::<Self>();
75 info!(logger, "Initializing runtime");
76
77 let state = if let Some(init_state) = init_state {
78 trace!(logger, "Got initial state from caller");
79 init_state
80 } else {
81 trace!(logger, "Idle state, no current time point");
82 AggregatorState::Idle(IdleState {
83 current_time_point: None,
84 })
85 };
86
87 Ok(Self {
88 config: aggregator_config,
89 state,
90 runner,
91 logger,
92 })
93 }
94
95 pub fn get_state(&self) -> String {
97 match self.state {
98 AggregatorState::Idle(_) => "idle".to_string(),
99 AggregatorState::Ready(_) => "ready".to_string(),
100 AggregatorState::Signing(_) => "signing".to_string(),
101 }
102 }
103
104 pub async fn run(&mut self) -> Result<(), RuntimeError> {
106 info!(self.logger, "Launching State Machine");
107 let mut interval = tokio::time::interval(self.config.interval);
108
109 loop {
110 interval.tick().await;
111 let approximate_next_cycle_time = Local::now() + self.config.interval;
114
115 if let Err(e) = self.cycle().await {
116 e.write_to_log(&self.logger);
117 if e.is_critical() {
118 return Err(e);
119 }
120 }
121
122 info!(
123 self.logger, "… Cycle finished";
124 "approximate_next_cycle_time" => %approximate_next_cycle_time.time().format("%H:%M:%S%.3f"),
125 "run_interval_in_ms" => self.config.interval.as_millis(),
126 );
127 }
128 }
129
130 pub async fn cycle(&mut self) -> Result<(), RuntimeError> {
132 info!(
133 self.logger,
134 "================================================================================"
135 );
136 info!(self.logger, "New cycle: {}", self.state);
137
138 self.runner.increment_runtime_cycle_total_since_startup_counter();
139
140 match self.state.clone() {
141 AggregatorState::Idle(state) => {
142 let last_time_point = self.runner.get_time_point_from_chain().await.with_context(
143 || "AggregatorRuntime in the state IDLE can not get current time point from chain",
144 )?;
145
146 info!(self.logger, "→ Trying to transition to READY"; "last_time_point" => ?last_time_point);
147
148 let can_try_transition_from_idle_to_ready = if self.config.is_follower {
149 self.runner
150 .is_follower_aggregator_at_same_epoch_as_leader(&last_time_point)
151 .await?
152 } else {
153 true
154 };
155 if can_try_transition_from_idle_to_ready {
156 self.try_transition_from_idle_to_ready(
157 state.current_time_point,
158 last_time_point.clone(),
159 )
160 .await?;
161 self.state = AggregatorState::Ready(ReadyState {
162 current_time_point: last_time_point,
163 });
164 }
165 }
166 AggregatorState::Ready(state) => {
167 let last_time_point: TimePoint = self
168 .runner
169 .get_time_point_from_chain()
170 .await
171 .with_context(|| {
172 "AggregatorRuntime in the state READY can not get current time point from chain"
173 })?;
174
175 if state.current_time_point.epoch < last_time_point.epoch {
176 info!(self.logger, "→ Epoch has changed, transitioning to IDLE"; "last_time_point" => ?last_time_point);
178 self.state = AggregatorState::Idle(IdleState {
179 current_time_point: Some(state.current_time_point),
180 });
181 } else if let Some(open_message) = self
182 .runner
183 .get_current_non_certified_open_message(&last_time_point)
184 .await
185 .with_context(|| "AggregatorRuntime can not get the current open message")?
186 {
187 info!(self.logger, "→ Transitioning to SIGNING");
189 let new_state = self
190 .transition_from_ready_to_signing(last_time_point.clone(), open_message.clone())
191 .await.with_context(|| format!("AggregatorRuntime can not perform a transition from READY state to SIGNING with entity_type: '{:?}'", open_message.signed_entity_type))?;
192 self.state = AggregatorState::Signing(new_state);
193 } else {
194 info!(
196 self.logger, " ⋅ No open message to certify, waiting…";
197 "time_point" => ?state.current_time_point
198 );
199 self.state = AggregatorState::Ready(ReadyState {
200 current_time_point: last_time_point,
201 });
202 }
203 }
204 AggregatorState::Signing(state) => {
205 let last_time_point: TimePoint =
206 self.runner.get_time_point_from_chain().await.with_context(|| {
207 "AggregatorRuntime in the state SIGNING can not get current time point from chain"
208 })?;
209
210 let is_outdated = self
211 .runner
212 .is_open_message_outdated(
213 state.open_message.signed_entity_type.clone(),
214 &last_time_point,
215 )
216 .await?;
217
218 if state.current_time_point.epoch < last_time_point.epoch {
219 info!(self.logger, "→ Epoch changed, transitioning to IDLE");
221 let new_state = self.transition_from_signing_to_idle(state).await?;
222 self.state = AggregatorState::Idle(new_state);
223 } else if is_outdated {
224 info!(
226 self.logger,
227 "→ Open message changed, transitioning to READY"
228 );
229 let new_state =
230 self.transition_from_signing_to_ready_new_open_message(state).await?;
231 self.state = AggregatorState::Ready(new_state);
232 } else {
233 let new_state =
235 self.transition_from_signing_to_ready_multisignature(state).await?;
236 info!(
237 self.logger,
238 "→ A multi-signature has been created, build an artifact & a certificate and transitioning back to READY"
239 );
240 self.state = AggregatorState::Ready(new_state);
241 }
242 }
243 }
244
245 self.runner.increment_runtime_cycle_success_since_startup_counter();
246
247 Ok(())
248 }
249
250 async fn try_transition_from_idle_to_ready(
253 &mut self,
254 maybe_current_time_point: Option<TimePoint>,
255 new_time_point: TimePoint,
256 ) -> Result<(), RuntimeError> {
257 trace!(self.logger, "Trying transition from IDLE to READY state");
258
259 if maybe_current_time_point.is_none()
260 || maybe_current_time_point.as_ref().unwrap().epoch < new_time_point.epoch
261 {
262 self.runner.close_signer_registration_round().await?;
263 self.runner
264 .update_era_checker(new_time_point.epoch)
265 .await
266 .map_err(|e| RuntimeError::critical("transiting IDLE → READY", Some(e)))?;
267 self.runner.update_stake_distribution(&new_time_point).await?;
268 self.runner.inform_new_epoch(new_time_point.epoch).await?;
269 self.runner.upkeep(new_time_point.epoch).await?;
270 self.runner.open_signer_registration_round(&new_time_point).await?;
271 if self.config.is_follower {
272 self.runner
273 .synchronize_follower_aggregator_signer_registration()
274 .await?;
275 }
276 self.runner.precompute_epoch_data().await?;
277 }
278
279 let chain_validity_result = self
280 .runner
281 .is_certificate_chain_valid(&new_time_point)
282 .await
283 .map_err(|e| RuntimeError::KeepState {
284 message: "certificate chain is invalid".to_string(),
285 nested_error: e.into(),
286 });
287 if self.config.is_follower {
288 let force_sync = chain_validity_result.is_err();
289 self.runner
290 .synchronize_follower_aggregator_certificate_chain(force_sync)
291 .await?;
292 }
293 chain_validity_result?;
294
295 Ok(())
296 }
297
298 async fn transition_from_signing_to_ready_multisignature(
301 &self,
302 state: SigningState,
303 ) -> Result<ReadyState, RuntimeError> {
304 trace!(
305 self.logger,
306 "Launching transition from SIGNING to READY state"
307 );
308 let certificate = self
309 .runner
310 .create_certificate(&state.open_message.signed_entity_type)
311 .await?
312 .ok_or_else(|| RuntimeError::KeepState {
313 message: "not enough signature yet to create a certificate, waiting…".to_string(),
314 nested_error: None,
315 })?;
316 self.runner
317 .create_artifact(&state.open_message.signed_entity_type, &certificate)
318 .await
319 .map_err(|e| RuntimeError::ReInit {
320 message: "transiting SIGNING → READY: failed to create artifact. Retrying…"
321 .to_string(),
322 nested_error: Some(e),
323 })?;
324
325 Ok(ReadyState {
326 current_time_point: state.current_time_point,
327 })
328 }
329
330 async fn transition_from_signing_to_idle(
333 &self,
334 state: SigningState,
335 ) -> Result<IdleState, RuntimeError> {
336 trace!(
337 self.logger,
338 "Launching transition from SIGNING to IDLE state"
339 );
340
341 Ok(IdleState {
342 current_time_point: Some(state.current_time_point),
343 })
344 }
345
346 async fn transition_from_signing_to_ready_new_open_message(
349 &self,
350 state: SigningState,
351 ) -> Result<ReadyState, RuntimeError> {
352 trace!(
353 self.logger,
354 "Launching transition from SIGNING to READY state"
355 );
356
357 Ok(ReadyState {
358 current_time_point: state.current_time_point,
359 })
360 }
361
362 async fn transition_from_ready_to_signing(
365 &mut self,
366 new_time_point: TimePoint,
367 open_message: OpenMessage,
368 ) -> Result<SigningState, RuntimeError> {
369 trace!(
370 self.logger,
371 "Launching transition from READY to SIGNING state"
372 );
373
374 let state = SigningState {
375 current_time_point: new_time_point,
376 open_message,
377 };
378
379 Ok(state)
380 }
381}
382
383#[cfg(test)]
384mod tests {
385 use anyhow::anyhow;
386 use mockall::predicate;
387 use std::time::Duration;
388
389 use mithril_common::test::double::{Dummy, fake_data};
390
391 use crate::entities::OpenMessage;
392 use crate::test::TestLogger;
393
394 use super::super::runner::MockAggregatorRunner;
395 use super::*;
396
397 async fn init_runtime(
398 init_state: Option<AggregatorState>,
399 runner: MockAggregatorRunner,
400 is_follower: bool,
401 ) -> AggregatorRuntime {
402 AggregatorRuntime::new(
403 AggregatorConfig::new(Duration::from_millis(20), is_follower),
404 init_state,
405 Arc::new(runner),
406 TestLogger::stdout(),
407 )
408 .await
409 .unwrap()
410 }
411
412 mod leader {
413 use super::*;
414
415 #[tokio::test]
416 pub async fn idle_check_certificate_chain_is_not_valid() {
417 let mut runner = MockAggregatorRunner::new();
418 runner
419 .expect_get_time_point_from_chain()
420 .once()
421 .returning(|| Ok(TimePoint::dummy()));
422 runner
423 .expect_update_stake_distribution()
424 .with(predicate::eq(TimePoint::dummy()))
425 .once()
426 .returning(|_| Ok(()));
427 runner
428 .expect_close_signer_registration_round()
429 .once()
430 .returning(|| Ok(()));
431 runner
432 .expect_open_signer_registration_round()
433 .once()
434 .returning(|_| Ok(()));
435 runner
436 .expect_is_certificate_chain_valid()
437 .once()
438 .returning(|_| Err(anyhow!("error")));
439 runner
440 .expect_update_era_checker()
441 .with(predicate::eq(TimePoint::dummy().epoch))
442 .once()
443 .returning(|_| Ok(()));
444 runner
445 .expect_inform_new_epoch()
446 .with(predicate::eq(TimePoint::dummy().epoch))
447 .once()
448 .returning(|_| Ok(()));
449 runner.expect_precompute_epoch_data().once().returning(|| Ok(()));
450 runner
451 .expect_upkeep()
452 .with(predicate::eq(TimePoint::dummy().epoch))
453 .once()
454 .returning(|_| Ok(()));
455 runner
456 .expect_increment_runtime_cycle_total_since_startup_counter()
457 .once()
458 .returning(|| ());
459 runner
460 .expect_increment_runtime_cycle_success_since_startup_counter()
461 .never();
462
463 let mut runtime = init_runtime(
464 Some(AggregatorState::Idle(IdleState {
465 current_time_point: None,
466 })),
467 runner,
468 false,
469 )
470 .await;
471 let err = runtime.cycle().await.unwrap_err();
472 assert!(matches!(err, RuntimeError::KeepState { .. }));
473
474 assert_eq!("idle".to_string(), runtime.get_state());
475 }
476
477 #[tokio::test]
478 pub async fn idle_check_certificate_chain_is_valid() {
479 let mut runner = MockAggregatorRunner::new();
480 runner
481 .expect_get_time_point_from_chain()
482 .once()
483 .returning(|| Ok(TimePoint::dummy()));
484 runner
485 .expect_update_stake_distribution()
486 .with(predicate::eq(TimePoint::dummy()))
487 .once()
488 .returning(|_| Ok(()));
489 runner
490 .expect_close_signer_registration_round()
491 .once()
492 .returning(|| Ok(()));
493 runner
494 .expect_open_signer_registration_round()
495 .once()
496 .returning(|_| Ok(()));
497 runner
498 .expect_is_certificate_chain_valid()
499 .once()
500 .returning(|_| Ok(()));
501 runner
502 .expect_update_era_checker()
503 .with(predicate::eq(TimePoint::dummy().epoch))
504 .once()
505 .returning(|_| Ok(()));
506 runner
507 .expect_inform_new_epoch()
508 .with(predicate::eq(TimePoint::dummy().epoch))
509 .once()
510 .returning(|_| Ok(()));
511 runner.expect_precompute_epoch_data().once().returning(|| Ok(()));
512 runner
513 .expect_upkeep()
514 .with(predicate::eq(TimePoint::dummy().epoch))
515 .once()
516 .returning(|_| Ok(()));
517 runner
518 .expect_increment_runtime_cycle_total_since_startup_counter()
519 .once()
520 .returning(|| ());
521 runner
522 .expect_increment_runtime_cycle_success_since_startup_counter()
523 .once()
524 .returning(|| ());
525
526 let mut runtime = init_runtime(
527 Some(AggregatorState::Idle(IdleState {
528 current_time_point: None,
529 })),
530 runner,
531 false,
532 )
533 .await;
534 runtime.cycle().await.unwrap();
535
536 assert_eq!("ready".to_string(), runtime.get_state());
537 }
538
539 #[tokio::test]
540 pub async fn ready_new_epoch_detected() {
541 let mut runner = MockAggregatorRunner::new();
542 let time_point = TimePoint::dummy();
543 let new_time_point = TimePoint {
544 epoch: time_point.epoch + 1,
545 ..time_point.clone()
546 };
547 runner
548 .expect_get_time_point_from_chain()
549 .once()
550 .returning(move || Ok(new_time_point.clone()));
551 runner
552 .expect_increment_runtime_cycle_total_since_startup_counter()
553 .once()
554 .returning(|| ());
555 runner
556 .expect_increment_runtime_cycle_success_since_startup_counter()
557 .once()
558 .returning(|| ());
559 let mut runtime = init_runtime(
560 Some(AggregatorState::Ready(ReadyState {
561 current_time_point: time_point,
562 })),
563 runner,
564 false,
565 )
566 .await;
567 runtime.cycle().await.unwrap();
568
569 assert_eq!("idle".to_string(), runtime.get_state());
570 }
571
572 #[tokio::test]
573 pub async fn ready_open_message_not_exist() {
574 let mut runner = MockAggregatorRunner::new();
575 let time_point = TimePoint::dummy();
576 let next_time_point = TimePoint {
577 immutable_file_number: time_point.immutable_file_number + 1,
578 ..time_point.clone()
579 };
580 let expected_time_point = next_time_point.clone();
581 runner
582 .expect_get_time_point_from_chain()
583 .once()
584 .returning(move || Ok(next_time_point.clone()));
585 runner
586 .expect_get_current_non_certified_open_message()
587 .once()
588 .returning(|_| Ok(None));
589 runner
590 .expect_increment_runtime_cycle_total_since_startup_counter()
591 .once()
592 .returning(|| ());
593 runner
594 .expect_increment_runtime_cycle_success_since_startup_counter()
595 .once()
596 .returning(|| ());
597 let mut runtime = init_runtime(
598 Some(AggregatorState::Ready(ReadyState {
599 current_time_point: time_point.clone(),
600 })),
601 runner,
602 false,
603 )
604 .await;
605 runtime.cycle().await.unwrap();
606
607 assert_eq!("ready".to_string(), runtime.get_state());
608 assert_eq!(
609 AggregatorState::Ready(ReadyState {
610 current_time_point: expected_time_point,
611 }),
612 runtime.state
613 );
614 }
615
616 #[tokio::test]
617 pub async fn ready_certificate_does_not_exist_for_time_point() {
618 let mut runner = MockAggregatorRunner::new();
619 runner
620 .expect_get_time_point_from_chain()
621 .once()
622 .returning(|| Ok(TimePoint::dummy()));
623 runner
624 .expect_get_current_non_certified_open_message()
625 .once()
626 .returning(|_| {
627 let open_message = OpenMessage {
628 is_certified: false,
629 ..OpenMessage::dummy()
630 };
631 Ok(Some(open_message))
632 });
633 runner
634 .expect_increment_runtime_cycle_total_since_startup_counter()
635 .once()
636 .returning(|| ());
637 runner
638 .expect_increment_runtime_cycle_success_since_startup_counter()
639 .once()
640 .returning(|| ());
641
642 let mut runtime = init_runtime(
643 Some(AggregatorState::Ready(ReadyState {
644 current_time_point: TimePoint::dummy(),
645 })),
646 runner,
647 false,
648 )
649 .await;
650 runtime.cycle().await.unwrap();
651
652 assert_eq!("signing".to_string(), runtime.get_state());
653 }
654
655 #[tokio::test]
656 async fn signing_changing_open_message_to_ready() {
657 let mut runner = MockAggregatorRunner::new();
658 runner
659 .expect_get_time_point_from_chain()
660 .once()
661 .returning(|| Ok(TimePoint::dummy()));
662 runner
663 .expect_is_open_message_outdated()
664 .once()
665 .returning(|_, _| Ok(true));
666 runner
667 .expect_increment_runtime_cycle_total_since_startup_counter()
668 .once()
669 .returning(|| ());
670 runner
671 .expect_increment_runtime_cycle_success_since_startup_counter()
672 .once()
673 .returning(|| ());
674
675 let initial_state = AggregatorState::Signing(SigningState {
676 current_time_point: TimePoint::dummy(),
677 open_message: OpenMessage::dummy(),
678 });
679
680 let mut runtime = init_runtime(Some(initial_state), runner, false).await;
681 runtime.cycle().await.unwrap();
682
683 assert_eq!("ready".to_string(), runtime.get_state());
684 }
685
686 #[tokio::test]
687 async fn signing_certificate_is_not_created() {
688 let mut runner = MockAggregatorRunner::new();
689 runner
690 .expect_get_time_point_from_chain()
691 .once()
692 .returning(|| Ok(TimePoint::dummy()));
693 runner
694 .expect_is_open_message_outdated()
695 .once()
696 .returning(|_, _| Ok(false));
697 runner.expect_create_certificate().once().returning(|_| Ok(None));
698 runner
699 .expect_increment_runtime_cycle_total_since_startup_counter()
700 .once()
701 .returning(|| ());
702 runner
703 .expect_increment_runtime_cycle_success_since_startup_counter()
704 .never();
705 let state = SigningState {
706 current_time_point: TimePoint::dummy(),
707 open_message: OpenMessage::dummy(),
708 };
709 let mut runtime =
710 init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
711 let err = runtime
712 .cycle()
713 .await
714 .expect_err("cycle should have returned an error");
715
716 match err {
717 RuntimeError::KeepState { .. } => (),
718 _ => panic!("KeepState error expected, got {err:?}."),
719 };
720
721 assert_eq!("signing".to_string(), runtime.get_state());
722 }
723
724 #[tokio::test]
725 async fn signing_artifact_not_created() {
726 let mut runner = MockAggregatorRunner::new();
727 runner
728 .expect_get_time_point_from_chain()
729 .once()
730 .returning(|| Ok(TimePoint::dummy()));
731 runner
732 .expect_is_open_message_outdated()
733 .once()
734 .returning(|_, _| Ok(false));
735 runner
736 .expect_create_certificate()
737 .return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
738 runner
739 .expect_create_artifact()
740 .once()
741 .returning(|_, _| Err(anyhow!("whatever")));
742 runner
743 .expect_increment_runtime_cycle_total_since_startup_counter()
744 .once()
745 .returning(|| ());
746 runner
747 .expect_increment_runtime_cycle_success_since_startup_counter()
748 .never();
749 let state = SigningState {
750 current_time_point: TimePoint::dummy(),
751 open_message: OpenMessage::dummy(),
752 };
753 let mut runtime =
754 init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
755 let err = runtime
756 .cycle()
757 .await
758 .expect_err("cycle should have returned an error");
759
760 match err {
761 RuntimeError::ReInit { .. } => (),
762 _ => panic!("ReInit error expected, got {err:?}."),
763 };
764
765 assert_eq!("signing".to_string(), runtime.get_state());
766 }
767
768 #[tokio::test]
769 async fn signing_certificate_is_created() {
770 let mut runner = MockAggregatorRunner::new();
771 runner
772 .expect_get_time_point_from_chain()
773 .once()
774 .returning(|| Ok(TimePoint::dummy()));
775 runner
776 .expect_is_open_message_outdated()
777 .once()
778 .returning(|_, _| Ok(false));
779 runner
780 .expect_create_certificate()
781 .return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
782 runner.expect_create_artifact().once().returning(|_, _| Ok(()));
783 runner
784 .expect_increment_runtime_cycle_total_since_startup_counter()
785 .once()
786 .returning(|| ());
787 runner
788 .expect_increment_runtime_cycle_success_since_startup_counter()
789 .once()
790 .returning(|| ());
791
792 let state = SigningState {
793 current_time_point: TimePoint::dummy(),
794 open_message: OpenMessage::dummy(),
795 };
796 let mut runtime =
797 init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
798 runtime.cycle().await.unwrap();
799
800 assert_eq!("ready".to_string(), runtime.get_state());
801 }
802
803 #[tokio::test]
804 pub async fn critical_error() {
805 let mut runner = MockAggregatorRunner::new();
806 runner
807 .expect_get_time_point_from_chain()
808 .once()
809 .returning(|| Ok(TimePoint::dummy()));
810 runner
811 .expect_update_era_checker()
812 .with(predicate::eq(TimePoint::dummy().epoch))
813 .once()
814 .returning(|_| Err(anyhow!("ERROR")));
815 runner
816 .expect_close_signer_registration_round()
817 .once()
818 .returning(|| Ok(()));
819 runner
820 .expect_increment_runtime_cycle_total_since_startup_counter()
821 .once()
822 .returning(|| ());
823 runner
824 .expect_increment_runtime_cycle_success_since_startup_counter()
825 .never();
826
827 let mut runtime = init_runtime(
828 Some(AggregatorState::Idle(IdleState {
829 current_time_point: None,
830 })),
831 runner,
832 false,
833 )
834 .await;
835 runtime.cycle().await.unwrap_err();
836
837 assert_eq!("idle".to_string(), runtime.get_state());
838 }
839 }
840
841 mod follower {
842 use mockall::predicate::eq;
843
844 use super::*;
845
846 #[tokio::test]
847 pub async fn idle_new_epoch_detected_and_leader_not_transitioned_to_epoch() {
848 let mut runner = MockAggregatorRunner::new();
849 let time_point = TimePoint::dummy();
850 let new_time_point = TimePoint {
851 epoch: time_point.epoch + 1,
852 ..time_point.clone()
853 };
854 runner
855 .expect_get_time_point_from_chain()
856 .once()
857 .returning(move || Ok(new_time_point.clone()));
858 runner
859 .expect_is_follower_aggregator_at_same_epoch_as_leader()
860 .once()
861 .returning(|_| Ok(false));
862 runner
863 .expect_increment_runtime_cycle_total_since_startup_counter()
864 .once()
865 .returning(|| ());
866 runner
867 .expect_increment_runtime_cycle_success_since_startup_counter()
868 .once()
869 .returning(|| ());
870 let mut runtime = init_runtime(
871 Some(AggregatorState::Idle(IdleState {
872 current_time_point: Some(time_point),
873 })),
874 runner,
875 true,
876 )
877 .await;
878 runtime.cycle().await.unwrap();
879
880 assert_eq!("idle".to_string(), runtime.get_state());
881 }
882
883 #[tokio::test]
884 pub async fn idle_new_epoch_detected_and_leader_has_transitioned_to_epoch() {
885 let mut runner = MockAggregatorRunner::new();
886 let time_point = TimePoint::dummy();
887 let new_time_point = TimePoint {
888 epoch: time_point.epoch + 1,
889 ..time_point.clone()
890 };
891 let new_time_point_clone = new_time_point.clone();
892 runner
893 .expect_get_time_point_from_chain()
894 .once()
895 .returning(move || Ok(new_time_point.clone()));
896 runner
897 .expect_is_follower_aggregator_at_same_epoch_as_leader()
898 .once()
899 .returning(|_| Ok(true));
900 runner
901 .expect_update_stake_distribution()
902 .with(predicate::eq(new_time_point_clone.clone()))
903 .once()
904 .returning(|_| Ok(()));
905 runner
906 .expect_close_signer_registration_round()
907 .once()
908 .returning(|| Ok(()));
909 runner
910 .expect_synchronize_follower_aggregator_signer_registration()
911 .once()
912 .returning(|| Ok(()));
913 runner
914 .expect_open_signer_registration_round()
915 .once()
916 .returning(|_| Ok(()));
917 runner
918 .expect_is_certificate_chain_valid()
919 .once()
920 .returning(|_| Ok(()));
921 runner
922 .expect_synchronize_follower_aggregator_certificate_chain()
923 .once()
924 .with(eq(false)) .returning(|_| Ok(()));
926 runner
927 .expect_update_era_checker()
928 .with(predicate::eq(new_time_point_clone.clone().epoch))
929 .once()
930 .returning(|_| Ok(()));
931 runner
932 .expect_inform_new_epoch()
933 .with(predicate::eq(new_time_point_clone.clone().epoch))
934 .once()
935 .returning(|_| Ok(()));
936 runner.expect_precompute_epoch_data().once().returning(|| Ok(()));
937 runner
938 .expect_upkeep()
939 .with(predicate::eq(new_time_point_clone.clone().epoch))
940 .once()
941 .returning(|_| Ok(()));
942 runner
943 .expect_increment_runtime_cycle_total_since_startup_counter()
944 .once()
945 .returning(|| ());
946 runner
947 .expect_increment_runtime_cycle_success_since_startup_counter()
948 .once()
949 .returning(|| ());
950 let mut runtime = init_runtime(
951 Some(AggregatorState::Idle(IdleState {
952 current_time_point: Some(time_point),
953 })),
954 runner,
955 true,
956 )
957 .await;
958 runtime.cycle().await.unwrap();
959
960 assert_eq!("ready".to_string(), runtime.get_state());
961 }
962 }
963}