1use anyhow::Context;
2use chrono::Local;
3use slog::{info, trace, Logger};
4use std::fmt::Display;
5use std::sync::Arc;
6
7use mithril_common::entities::TimePoint;
8use mithril_common::logging::LoggerExtensions;
9
10use crate::entities::OpenMessage;
11use crate::runtime::{AggregatorRunnerTrait, RuntimeError};
12use crate::AggregatorConfig;
13
14#[derive(Clone, Debug, PartialEq, Eq)]
15pub struct IdleState {
16 current_time_point: Option<TimePoint>,
17}
18
19#[derive(Clone, Debug, PartialEq, Eq)]
20pub struct ReadyState {
21 current_time_point: TimePoint,
22}
23
24#[derive(Clone, Debug, PartialEq)]
25pub struct SigningState {
26 current_time_point: TimePoint,
27 open_message: OpenMessage,
28}
29
30#[derive(Clone, Debug, PartialEq)]
31pub enum AggregatorState {
32 Idle(IdleState),
33 Ready(ReadyState),
34 Signing(SigningState),
35}
36
37impl Display for AggregatorState {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 match self {
40 AggregatorState::Idle(state) => write!(
41 f,
42 "Idle - {}",
43 match &state.current_time_point {
44 None => "No TimePoint".to_string(),
45 Some(time_point) => time_point.to_string(),
46 }
47 ),
48 AggregatorState::Ready(state) => write!(f, "Ready - {}", state.current_time_point),
49 AggregatorState::Signing(state) => write!(f, "Signing - {}", state.current_time_point),
50 }
51 }
52}
53
54pub struct AggregatorRuntime {
60 config: AggregatorConfig,
61 state: AggregatorState,
62 runner: Arc<dyn AggregatorRunnerTrait>,
63 logger: Logger,
64}
65
66impl AggregatorRuntime {
67 pub async fn new(
69 aggregator_config: AggregatorConfig,
70 init_state: Option<AggregatorState>,
71 runner: Arc<dyn AggregatorRunnerTrait>,
72 logger: Logger,
73 ) -> Result<Self, RuntimeError> {
74 let logger = logger.new_with_component_name::<Self>();
75 info!(logger, "Initializing runtime");
76
77 let state = if let Some(init_state) = init_state {
78 trace!(logger, "Got initial state from caller");
79 init_state
80 } else {
81 trace!(logger, "Idle state, no current time point");
82 AggregatorState::Idle(IdleState {
83 current_time_point: None,
84 })
85 };
86
87 Ok(Self {
88 config: aggregator_config,
89 state,
90 runner,
91 logger,
92 })
93 }
94
95 pub fn get_state(&self) -> String {
97 match self.state {
98 AggregatorState::Idle(_) => "idle".to_string(),
99 AggregatorState::Ready(_) => "ready".to_string(),
100 AggregatorState::Signing(_) => "signing".to_string(),
101 }
102 }
103
104 pub async fn run(&mut self) -> Result<(), RuntimeError> {
106 info!(self.logger, "Launching State Machine");
107 let mut interval = tokio::time::interval(self.config.interval);
108
109 loop {
110 interval.tick().await;
111 let approximate_next_cycle_time = Local::now() + self.config.interval;
114
115 if let Err(e) = self.cycle().await {
116 e.write_to_log(&self.logger);
117 if e.is_critical() {
118 return Err(e);
119 }
120 }
121
122 info!(
123 self.logger, "… Cycle finished";
124 "approximate_next_cycle_time" => %approximate_next_cycle_time.time().format("%H:%M:%S%.3f"),
125 "run_interval_in_ms" => self.config.interval.as_millis(),
126 );
127 }
128 }
129
130 pub async fn cycle(&mut self) -> Result<(), RuntimeError> {
132 info!(
133 self.logger,
134 "================================================================================"
135 );
136 info!(self.logger, "New cycle: {}", self.state);
137
138 self.runner
139 .increment_runtime_cycle_total_since_startup_counter();
140
141 match self.state.clone() {
142 AggregatorState::Idle(state) => {
143 let last_time_point = self.runner.get_time_point_from_chain().await.with_context(
144 || "AggregatorRuntime in the state IDLE can not get current time point from chain",
145 )?;
146
147 info!(self.logger, "→ Trying to transition to READY"; "last_time_point" => ?last_time_point);
148
149 let can_try_transition_from_idle_to_ready = if self.config.is_follower {
150 self.runner
151 .is_follower_aggregator_at_same_epoch_as_leader(&last_time_point)
152 .await?
153 } else {
154 true
155 };
156 if can_try_transition_from_idle_to_ready {
157 self.try_transition_from_idle_to_ready(
158 state.current_time_point,
159 last_time_point.clone(),
160 )
161 .await?;
162 self.state = AggregatorState::Ready(ReadyState {
163 current_time_point: last_time_point,
164 });
165 }
166 }
167 AggregatorState::Ready(state) => {
168 let last_time_point: TimePoint = self
169 .runner
170 .get_time_point_from_chain()
171 .await
172 .with_context(|| {
173 "AggregatorRuntime in the state READY can not get current time point from chain"
174 })?;
175
176 if state.current_time_point.epoch < last_time_point.epoch {
177 info!(self.logger, "→ Epoch has changed, transitioning to IDLE"; "last_time_point" => ?last_time_point);
179 self.state = AggregatorState::Idle(IdleState {
180 current_time_point: Some(state.current_time_point),
181 });
182 } else if let Some(open_message) = self
183 .runner
184 .get_current_non_certified_open_message(&last_time_point)
185 .await
186 .with_context(|| "AggregatorRuntime can not get the current open message")?
187 {
188 info!(self.logger, "→ Transitioning to SIGNING");
190 let new_state = self
191 .transition_from_ready_to_signing(last_time_point.clone(), open_message.clone())
192 .await.with_context(|| format!("AggregatorRuntime can not perform a transition from READY state to SIGNING with entity_type: '{:?}'", open_message.signed_entity_type))?;
193 self.state = AggregatorState::Signing(new_state);
194 } else {
195 info!(
197 self.logger, " ⋅ No open message to certify, waiting…";
198 "time_point" => ?state.current_time_point
199 );
200 self.state = AggregatorState::Ready(ReadyState {
201 current_time_point: last_time_point,
202 });
203 }
204 }
205 AggregatorState::Signing(state) => {
206 let last_time_point: TimePoint =
207 self.runner.get_time_point_from_chain().await.with_context(|| {
208 "AggregatorRuntime in the state SIGNING can not get current time point from chain"
209 })?;
210
211 let is_outdated = self
212 .runner
213 .is_open_message_outdated(
214 state.open_message.signed_entity_type.clone(),
215 &last_time_point,
216 )
217 .await?;
218
219 if state.current_time_point.epoch < last_time_point.epoch {
220 info!(self.logger, "→ Epoch changed, transitioning to IDLE");
222 let new_state = self.transition_from_signing_to_idle(state).await?;
223 self.state = AggregatorState::Idle(new_state);
224 } else if is_outdated {
225 info!(
227 self.logger,
228 "→ Open message changed, transitioning to READY"
229 );
230 let new_state = self
231 .transition_from_signing_to_ready_new_open_message(state)
232 .await?;
233 self.state = AggregatorState::Ready(new_state);
234 } else {
235 let new_state = self
237 .transition_from_signing_to_ready_multisignature(state)
238 .await?;
239 info!(self.logger, "→ A multi-signature has been created, build an artifact & a certificate and transitioning back to READY");
240 self.state = AggregatorState::Ready(new_state);
241 }
242 }
243 }
244
245 self.runner
246 .increment_runtime_cycle_success_since_startup_counter();
247
248 Ok(())
249 }
250
251 async fn try_transition_from_idle_to_ready(
254 &mut self,
255 maybe_current_time_point: Option<TimePoint>,
256 new_time_point: TimePoint,
257 ) -> Result<(), RuntimeError> {
258 trace!(self.logger, "Trying transition from IDLE to READY state");
259
260 if maybe_current_time_point.is_none()
261 || maybe_current_time_point.as_ref().unwrap().epoch < new_time_point.epoch
262 {
263 self.runner.close_signer_registration_round().await?;
264 self.runner
265 .update_era_checker(new_time_point.epoch)
266 .await
267 .map_err(|e| RuntimeError::critical("transiting IDLE → READY", Some(e)))?;
268 self.runner
269 .update_stake_distribution(&new_time_point)
270 .await?;
271 self.runner.inform_new_epoch(new_time_point.epoch).await?;
272 self.runner.upkeep(new_time_point.epoch).await?;
273 self.runner
274 .open_signer_registration_round(&new_time_point)
275 .await?;
276 self.runner.update_epoch_settings().await?;
277 if self.config.is_follower {
278 self.runner
279 .synchronize_follower_aggregator_signer_registration()
280 .await?;
281 }
282 self.runner.precompute_epoch_data().await?;
283 }
284
285 self.runner
286 .is_certificate_chain_valid(&new_time_point)
287 .await
288 .map_err(|e| RuntimeError::KeepState {
289 message: "certificate chain is invalid".to_string(),
290 nested_error: e.into(),
291 })?;
292
293 Ok(())
294 }
295
296 async fn transition_from_signing_to_ready_multisignature(
299 &self,
300 state: SigningState,
301 ) -> Result<ReadyState, RuntimeError> {
302 trace!(
303 self.logger,
304 "Launching transition from SIGNING to READY state"
305 );
306 let certificate = self
307 .runner
308 .create_certificate(&state.open_message.signed_entity_type)
309 .await?
310 .ok_or_else(|| RuntimeError::KeepState {
311 message: "not enough signature yet to create a certificate, waiting…".to_string(),
312 nested_error: None,
313 })?;
314 self.runner
315 .create_artifact(&state.open_message.signed_entity_type, &certificate)
316 .await
317 .map_err(|e| RuntimeError::ReInit {
318 message: "transiting SIGNING → READY: failed to create artifact. Retrying…"
319 .to_string(),
320 nested_error: Some(e),
321 })?;
322
323 Ok(ReadyState {
324 current_time_point: state.current_time_point,
325 })
326 }
327
328 async fn transition_from_signing_to_idle(
331 &self,
332 state: SigningState,
333 ) -> Result<IdleState, RuntimeError> {
334 trace!(
335 self.logger,
336 "Launching transition from SIGNING to IDLE state"
337 );
338
339 Ok(IdleState {
340 current_time_point: Some(state.current_time_point),
341 })
342 }
343
344 async fn transition_from_signing_to_ready_new_open_message(
347 &self,
348 state: SigningState,
349 ) -> Result<ReadyState, RuntimeError> {
350 trace!(
351 self.logger,
352 "Launching transition from SIGNING to READY state"
353 );
354
355 Ok(ReadyState {
356 current_time_point: state.current_time_point,
357 })
358 }
359
360 async fn transition_from_ready_to_signing(
363 &mut self,
364 new_time_point: TimePoint,
365 open_message: OpenMessage,
366 ) -> Result<SigningState, RuntimeError> {
367 trace!(
368 self.logger,
369 "Launching transition from READY to SIGNING state"
370 );
371
372 let state = SigningState {
373 current_time_point: new_time_point,
374 open_message,
375 };
376
377 Ok(state)
378 }
379}
380
381#[cfg(test)]
382mod tests {
383 use crate::entities::OpenMessage;
384 use anyhow::anyhow;
385 use mockall::predicate;
386 use std::time::Duration;
387
388 use mithril_common::test_utils::fake_data;
389
390 use crate::test_tools::TestLogger;
391
392 use super::super::runner::MockAggregatorRunner;
393 use super::*;
394
395 async fn init_runtime(
396 init_state: Option<AggregatorState>,
397 runner: MockAggregatorRunner,
398 is_follower: bool,
399 ) -> AggregatorRuntime {
400 AggregatorRuntime::new(
401 AggregatorConfig::new(Duration::from_millis(20), is_follower),
402 init_state,
403 Arc::new(runner),
404 TestLogger::stdout(),
405 )
406 .await
407 .unwrap()
408 }
409
410 mod leader {
411 use super::*;
412
413 #[tokio::test]
414 pub async fn idle_check_certificate_chain_is_not_valid() {
415 let mut runner = MockAggregatorRunner::new();
416 runner
417 .expect_get_time_point_from_chain()
418 .once()
419 .returning(|| Ok(TimePoint::dummy()));
420 runner
421 .expect_update_stake_distribution()
422 .with(predicate::eq(TimePoint::dummy()))
423 .once()
424 .returning(|_| Ok(()));
425 runner
426 .expect_close_signer_registration_round()
427 .once()
428 .returning(|| Ok(()));
429 runner
430 .expect_open_signer_registration_round()
431 .once()
432 .returning(|_| Ok(()));
433 runner
434 .expect_is_certificate_chain_valid()
435 .once()
436 .returning(|_| Err(anyhow!("error")));
437 runner
438 .expect_update_era_checker()
439 .with(predicate::eq(TimePoint::dummy().epoch))
440 .once()
441 .returning(|_| Ok(()));
442 runner
443 .expect_inform_new_epoch()
444 .with(predicate::eq(TimePoint::dummy().epoch))
445 .once()
446 .returning(|_| Ok(()));
447 runner
448 .expect_update_epoch_settings()
449 .once()
450 .returning(|| Ok(()));
451 runner
452 .expect_precompute_epoch_data()
453 .once()
454 .returning(|| Ok(()));
455 runner
456 .expect_upkeep()
457 .with(predicate::eq(TimePoint::dummy().epoch))
458 .once()
459 .returning(|_| Ok(()));
460 runner
461 .expect_increment_runtime_cycle_total_since_startup_counter()
462 .once()
463 .returning(|| ());
464 runner
465 .expect_increment_runtime_cycle_success_since_startup_counter()
466 .never();
467
468 let mut runtime = init_runtime(
469 Some(AggregatorState::Idle(IdleState {
470 current_time_point: None,
471 })),
472 runner,
473 false,
474 )
475 .await;
476 let err = runtime.cycle().await.unwrap_err();
477 assert!(matches!(err, RuntimeError::KeepState { .. }));
478
479 assert_eq!("idle".to_string(), runtime.get_state());
480 }
481
482 #[tokio::test]
483 pub async fn idle_check_certificate_chain_is_valid() {
484 let mut runner = MockAggregatorRunner::new();
485 runner
486 .expect_get_time_point_from_chain()
487 .once()
488 .returning(|| Ok(TimePoint::dummy()));
489 runner
490 .expect_update_stake_distribution()
491 .with(predicate::eq(TimePoint::dummy()))
492 .once()
493 .returning(|_| Ok(()));
494 runner
495 .expect_close_signer_registration_round()
496 .once()
497 .returning(|| Ok(()));
498 runner
499 .expect_open_signer_registration_round()
500 .once()
501 .returning(|_| Ok(()));
502 runner
503 .expect_is_certificate_chain_valid()
504 .once()
505 .returning(|_| Ok(()));
506 runner
507 .expect_update_era_checker()
508 .with(predicate::eq(TimePoint::dummy().epoch))
509 .once()
510 .returning(|_| Ok(()));
511 runner
512 .expect_inform_new_epoch()
513 .with(predicate::eq(TimePoint::dummy().epoch))
514 .once()
515 .returning(|_| Ok(()));
516 runner
517 .expect_update_epoch_settings()
518 .once()
519 .returning(|| Ok(()));
520 runner
521 .expect_precompute_epoch_data()
522 .once()
523 .returning(|| Ok(()));
524 runner
525 .expect_upkeep()
526 .with(predicate::eq(TimePoint::dummy().epoch))
527 .once()
528 .returning(|_| Ok(()));
529 runner
530 .expect_increment_runtime_cycle_total_since_startup_counter()
531 .once()
532 .returning(|| ());
533 runner
534 .expect_increment_runtime_cycle_success_since_startup_counter()
535 .once()
536 .returning(|| ());
537
538 let mut runtime = init_runtime(
539 Some(AggregatorState::Idle(IdleState {
540 current_time_point: None,
541 })),
542 runner,
543 false,
544 )
545 .await;
546 runtime.cycle().await.unwrap();
547
548 assert_eq!("ready".to_string(), runtime.get_state());
549 }
550
551 #[tokio::test]
552 pub async fn ready_new_epoch_detected() {
553 let mut runner = MockAggregatorRunner::new();
554 let time_point = TimePoint::dummy();
555 let new_time_point = TimePoint {
556 epoch: time_point.epoch + 1,
557 ..time_point.clone()
558 };
559 runner
560 .expect_get_time_point_from_chain()
561 .once()
562 .returning(move || Ok(new_time_point.clone()));
563 runner
564 .expect_increment_runtime_cycle_total_since_startup_counter()
565 .once()
566 .returning(|| ());
567 runner
568 .expect_increment_runtime_cycle_success_since_startup_counter()
569 .once()
570 .returning(|| ());
571 let mut runtime = init_runtime(
572 Some(AggregatorState::Ready(ReadyState {
573 current_time_point: time_point,
574 })),
575 runner,
576 false,
577 )
578 .await;
579 runtime.cycle().await.unwrap();
580
581 assert_eq!("idle".to_string(), runtime.get_state());
582 }
583
584 #[tokio::test]
585 pub async fn ready_open_message_not_exist() {
586 let mut runner = MockAggregatorRunner::new();
587 let time_point = TimePoint::dummy();
588 let next_time_point = TimePoint {
589 immutable_file_number: time_point.immutable_file_number + 1,
590 ..time_point.clone()
591 };
592 let expected_time_point = next_time_point.clone();
593 runner
594 .expect_get_time_point_from_chain()
595 .once()
596 .returning(move || Ok(next_time_point.clone()));
597 runner
598 .expect_get_current_non_certified_open_message()
599 .once()
600 .returning(|_| Ok(None));
601 runner
602 .expect_increment_runtime_cycle_total_since_startup_counter()
603 .once()
604 .returning(|| ());
605 runner
606 .expect_increment_runtime_cycle_success_since_startup_counter()
607 .once()
608 .returning(|| ());
609 let mut runtime = init_runtime(
610 Some(AggregatorState::Ready(ReadyState {
611 current_time_point: time_point.clone(),
612 })),
613 runner,
614 false,
615 )
616 .await;
617 runtime.cycle().await.unwrap();
618
619 assert_eq!("ready".to_string(), runtime.get_state());
620 assert_eq!(
621 AggregatorState::Ready(ReadyState {
622 current_time_point: expected_time_point,
623 }),
624 runtime.state
625 );
626 }
627
628 #[tokio::test]
629 pub async fn ready_certificate_does_not_exist_for_time_point() {
630 let mut runner = MockAggregatorRunner::new();
631 runner
632 .expect_get_time_point_from_chain()
633 .once()
634 .returning(|| Ok(TimePoint::dummy()));
635 runner
636 .expect_get_current_non_certified_open_message()
637 .once()
638 .returning(|_| {
639 let open_message = OpenMessage {
640 is_certified: false,
641 ..OpenMessage::dummy()
642 };
643 Ok(Some(open_message))
644 });
645 runner
646 .expect_increment_runtime_cycle_total_since_startup_counter()
647 .once()
648 .returning(|| ());
649 runner
650 .expect_increment_runtime_cycle_success_since_startup_counter()
651 .once()
652 .returning(|| ());
653
654 let mut runtime = init_runtime(
655 Some(AggregatorState::Ready(ReadyState {
656 current_time_point: TimePoint::dummy(),
657 })),
658 runner,
659 false,
660 )
661 .await;
662 runtime.cycle().await.unwrap();
663
664 assert_eq!("signing".to_string(), runtime.get_state());
665 }
666
667 #[tokio::test]
668 async fn signing_changing_open_message_to_ready() {
669 let mut runner = MockAggregatorRunner::new();
670 runner
671 .expect_get_time_point_from_chain()
672 .once()
673 .returning(|| Ok(TimePoint::dummy()));
674 runner
675 .expect_is_open_message_outdated()
676 .once()
677 .returning(|_, _| Ok(true));
678 runner
679 .expect_increment_runtime_cycle_total_since_startup_counter()
680 .once()
681 .returning(|| ());
682 runner
683 .expect_increment_runtime_cycle_success_since_startup_counter()
684 .once()
685 .returning(|| ());
686
687 let initial_state = AggregatorState::Signing(SigningState {
688 current_time_point: TimePoint::dummy(),
689 open_message: OpenMessage::dummy(),
690 });
691
692 let mut runtime = init_runtime(Some(initial_state), runner, false).await;
693 runtime.cycle().await.unwrap();
694
695 assert_eq!("ready".to_string(), runtime.get_state());
696 }
697
698 #[tokio::test]
699 async fn signing_certificate_is_not_created() {
700 let mut runner = MockAggregatorRunner::new();
701 runner
702 .expect_get_time_point_from_chain()
703 .once()
704 .returning(|| Ok(TimePoint::dummy()));
705 runner
706 .expect_is_open_message_outdated()
707 .once()
708 .returning(|_, _| Ok(false));
709 runner
710 .expect_create_certificate()
711 .once()
712 .returning(|_| Ok(None));
713 runner
714 .expect_increment_runtime_cycle_total_since_startup_counter()
715 .once()
716 .returning(|| ());
717 runner
718 .expect_increment_runtime_cycle_success_since_startup_counter()
719 .never();
720 let state = SigningState {
721 current_time_point: TimePoint::dummy(),
722 open_message: OpenMessage::dummy(),
723 };
724 let mut runtime =
725 init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
726 let err = runtime
727 .cycle()
728 .await
729 .expect_err("cycle should have returned an error");
730
731 match err {
732 RuntimeError::KeepState { .. } => (),
733 _ => panic!("KeepState error expected, got {err:?}."),
734 };
735
736 assert_eq!("signing".to_string(), runtime.get_state());
737 }
738
739 #[tokio::test]
740 async fn signing_artifact_not_created() {
741 let mut runner = MockAggregatorRunner::new();
742 runner
743 .expect_get_time_point_from_chain()
744 .once()
745 .returning(|| Ok(TimePoint::dummy()));
746 runner
747 .expect_is_open_message_outdated()
748 .once()
749 .returning(|_, _| Ok(false));
750 runner
751 .expect_create_certificate()
752 .return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
753 runner
754 .expect_create_artifact()
755 .once()
756 .returning(|_, _| Err(anyhow!("whatever")));
757 runner
758 .expect_increment_runtime_cycle_total_since_startup_counter()
759 .once()
760 .returning(|| ());
761 runner
762 .expect_increment_runtime_cycle_success_since_startup_counter()
763 .never();
764 let state = SigningState {
765 current_time_point: TimePoint::dummy(),
766 open_message: OpenMessage::dummy(),
767 };
768 let mut runtime =
769 init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
770 let err = runtime
771 .cycle()
772 .await
773 .expect_err("cycle should have returned an error");
774
775 match err {
776 RuntimeError::ReInit { .. } => (),
777 _ => panic!("ReInit error expected, got {err:?}."),
778 };
779
780 assert_eq!("signing".to_string(), runtime.get_state());
781 }
782
783 #[tokio::test]
784 async fn signing_certificate_is_created() {
785 let mut runner = MockAggregatorRunner::new();
786 runner
787 .expect_get_time_point_from_chain()
788 .once()
789 .returning(|| Ok(TimePoint::dummy()));
790 runner
791 .expect_is_open_message_outdated()
792 .once()
793 .returning(|_, _| Ok(false));
794 runner
795 .expect_create_certificate()
796 .return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
797 runner
798 .expect_create_artifact()
799 .once()
800 .returning(|_, _| Ok(()));
801 runner
802 .expect_increment_runtime_cycle_total_since_startup_counter()
803 .once()
804 .returning(|| ());
805 runner
806 .expect_increment_runtime_cycle_success_since_startup_counter()
807 .once()
808 .returning(|| ());
809
810 let state = SigningState {
811 current_time_point: TimePoint::dummy(),
812 open_message: OpenMessage::dummy(),
813 };
814 let mut runtime =
815 init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
816 runtime.cycle().await.unwrap();
817
818 assert_eq!("ready".to_string(), runtime.get_state());
819 }
820
821 #[tokio::test]
822 pub async fn critical_error() {
823 let mut runner = MockAggregatorRunner::new();
824 runner
825 .expect_get_time_point_from_chain()
826 .once()
827 .returning(|| Ok(TimePoint::dummy()));
828 runner
829 .expect_update_era_checker()
830 .with(predicate::eq(TimePoint::dummy().epoch))
831 .once()
832 .returning(|_| Err(anyhow!("ERROR")));
833 runner
834 .expect_close_signer_registration_round()
835 .once()
836 .returning(|| Ok(()));
837 runner
838 .expect_increment_runtime_cycle_total_since_startup_counter()
839 .once()
840 .returning(|| ());
841 runner
842 .expect_increment_runtime_cycle_success_since_startup_counter()
843 .never();
844
845 let mut runtime = init_runtime(
846 Some(AggregatorState::Idle(IdleState {
847 current_time_point: None,
848 })),
849 runner,
850 false,
851 )
852 .await;
853 runtime.cycle().await.unwrap_err();
854
855 assert_eq!("idle".to_string(), runtime.get_state());
856 }
857 }
858
859 mod follower {
860 use super::*;
861
862 #[tokio::test]
863 pub async fn idle_new_epoch_detected_and_leader_not_transitioned_to_epoch() {
864 let mut runner = MockAggregatorRunner::new();
865 let time_point = TimePoint::dummy();
866 let new_time_point = TimePoint {
867 epoch: time_point.epoch + 1,
868 ..time_point.clone()
869 };
870 runner
871 .expect_get_time_point_from_chain()
872 .once()
873 .returning(move || Ok(new_time_point.clone()));
874 runner
875 .expect_is_follower_aggregator_at_same_epoch_as_leader()
876 .once()
877 .returning(|_| Ok(false));
878 runner
879 .expect_increment_runtime_cycle_total_since_startup_counter()
880 .once()
881 .returning(|| ());
882 runner
883 .expect_increment_runtime_cycle_success_since_startup_counter()
884 .once()
885 .returning(|| ());
886 let mut runtime = init_runtime(
887 Some(AggregatorState::Idle(IdleState {
888 current_time_point: Some(time_point),
889 })),
890 runner,
891 true,
892 )
893 .await;
894 runtime.cycle().await.unwrap();
895
896 assert_eq!("idle".to_string(), runtime.get_state());
897 }
898
899 #[tokio::test]
900 pub async fn idle_new_epoch_detected_and_leader_has_transitioned_to_epoch() {
901 let mut runner = MockAggregatorRunner::new();
902 let time_point = TimePoint::dummy();
903 let new_time_point = TimePoint {
904 epoch: time_point.epoch + 1,
905 ..time_point.clone()
906 };
907 let new_time_point_clone = new_time_point.clone();
908 runner
909 .expect_get_time_point_from_chain()
910 .once()
911 .returning(move || Ok(new_time_point.clone()));
912 runner
913 .expect_is_follower_aggregator_at_same_epoch_as_leader()
914 .once()
915 .returning(|_| Ok(true));
916 runner
917 .expect_update_stake_distribution()
918 .with(predicate::eq(new_time_point_clone.clone()))
919 .once()
920 .returning(|_| Ok(()));
921 runner
922 .expect_close_signer_registration_round()
923 .once()
924 .returning(|| Ok(()));
925 runner
926 .expect_synchronize_follower_aggregator_signer_registration()
927 .once()
928 .returning(|| Ok(()));
929 runner
930 .expect_open_signer_registration_round()
931 .once()
932 .returning(|_| Ok(()));
933 runner
934 .expect_is_certificate_chain_valid()
935 .once()
936 .returning(|_| Ok(()));
937 runner
938 .expect_update_era_checker()
939 .with(predicate::eq(new_time_point_clone.clone().epoch))
940 .once()
941 .returning(|_| Ok(()));
942 runner
943 .expect_inform_new_epoch()
944 .with(predicate::eq(new_time_point_clone.clone().epoch))
945 .once()
946 .returning(|_| Ok(()));
947 runner
948 .expect_update_epoch_settings()
949 .once()
950 .returning(|| Ok(()));
951 runner
952 .expect_precompute_epoch_data()
953 .once()
954 .returning(|| Ok(()));
955 runner
956 .expect_upkeep()
957 .with(predicate::eq(new_time_point_clone.clone().epoch))
958 .once()
959 .returning(|_| Ok(()));
960 runner
961 .expect_increment_runtime_cycle_total_since_startup_counter()
962 .once()
963 .returning(|| ());
964 runner
965 .expect_increment_runtime_cycle_success_since_startup_counter()
966 .once()
967 .returning(|| ());
968 let mut runtime = init_runtime(
969 Some(AggregatorState::Idle(IdleState {
970 current_time_point: Some(time_point),
971 })),
972 runner,
973 true,
974 )
975 .await;
976 runtime.cycle().await.unwrap();
977
978 assert_eq!("ready".to_string(), runtime.get_state());
979 }
980 }
981}