1use crate::{
2 entities::OpenMessage,
3 runtime::{AggregatorRunnerTrait, RuntimeError},
4 AggregatorConfig,
5};
6
7use anyhow::Context;
8use mithril_common::entities::TimePoint;
9use mithril_common::logging::LoggerExtensions;
10use slog::{info, trace, Logger};
11use std::fmt::Display;
12use std::sync::Arc;
13use tokio::time::sleep;
14
15#[derive(Clone, Debug, PartialEq, Eq)]
16pub struct IdleState {
17 current_time_point: Option<TimePoint>,
18}
19
20#[derive(Clone, Debug, PartialEq, Eq)]
21pub struct ReadyState {
22 current_time_point: TimePoint,
23}
24
25#[derive(Clone, Debug, PartialEq)]
26pub struct SigningState {
27 current_time_point: TimePoint,
28 open_message: OpenMessage,
29}
30
31#[derive(Clone, Debug, PartialEq)]
32pub enum AggregatorState {
33 Idle(IdleState),
34 Ready(ReadyState),
35 Signing(SigningState),
36}
37
38impl Display for AggregatorState {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 match self {
41 AggregatorState::Idle(state) => write!(
42 f,
43 "Idle - {}",
44 match &state.current_time_point {
45 None => "No TimePoint".to_string(),
46 Some(time_point) => time_point.to_string(),
47 }
48 ),
49 AggregatorState::Ready(state) => write!(f, "Ready - {}", state.current_time_point),
50 AggregatorState::Signing(state) => write!(f, "Signing - {}", state.current_time_point),
51 }
52 }
53}
54
55pub struct AggregatorRuntime {
61 config: AggregatorConfig,
62 state: AggregatorState,
63 runner: Arc<dyn AggregatorRunnerTrait>,
64 logger: Logger,
65}
66
67impl AggregatorRuntime {
68 pub async fn new(
70 aggregator_config: AggregatorConfig,
71 init_state: Option<AggregatorState>,
72 runner: Arc<dyn AggregatorRunnerTrait>,
73 logger: Logger,
74 ) -> Result<Self, RuntimeError> {
75 let logger = logger.new_with_component_name::<Self>();
76 info!(logger, "Initializing runtime");
77
78 let state = if let Some(init_state) = init_state {
79 trace!(logger, "Got initial state from caller");
80 init_state
81 } else {
82 trace!(logger, "Idle state, no current time point");
83 AggregatorState::Idle(IdleState {
84 current_time_point: None,
85 })
86 };
87
88 Ok(Self {
89 config: aggregator_config,
90 state,
91 runner,
92 logger,
93 })
94 }
95
96 pub fn get_state(&self) -> String {
98 match self.state {
99 AggregatorState::Idle(_) => "idle".to_string(),
100 AggregatorState::Ready(_) => "ready".to_string(),
101 AggregatorState::Signing(_) => "signing".to_string(),
102 }
103 }
104
105 pub async fn run(&mut self) -> Result<(), RuntimeError> {
107 info!(self.logger, "Launching State Machine");
108
109 loop {
110 if let Err(e) = self.cycle().await {
111 e.write_to_log(&self.logger);
112 if e.is_critical() {
113 return Err(e);
114 }
115 }
116
117 info!(
118 self.logger,
119 "… Cycle finished, Sleeping for {} ms",
120 self.config.interval.as_millis()
121 );
122 sleep(self.config.interval).await;
123 }
124 }
125
126 pub async fn cycle(&mut self) -> Result<(), RuntimeError> {
128 info!(
129 self.logger,
130 "================================================================================"
131 );
132 info!(self.logger, "new cycle: {}", self.state);
133
134 self.runner
135 .increment_runtime_cycle_total_since_startup_counter();
136
137 match self.state.clone() {
138 AggregatorState::Idle(state) => {
139 let last_time_point = self.runner.get_time_point_from_chain().await.with_context(
140 || "AggregatorRuntime in the state IDLE can not get current time point from chain",
141 )?;
142
143 info!(self.logger, "→ Trying to transition to READY"; "last_time_point" => ?last_time_point);
144
145 let can_try_transition_from_idle_to_ready = if self.config.is_follower {
146 self.runner
147 .is_follower_aggregator_at_same_epoch_as_leader(&last_time_point)
148 .await?
149 } else {
150 true
151 };
152 if can_try_transition_from_idle_to_ready {
153 self.try_transition_from_idle_to_ready(
154 state.current_time_point,
155 last_time_point.clone(),
156 )
157 .await?;
158 self.state = AggregatorState::Ready(ReadyState {
159 current_time_point: last_time_point,
160 });
161 }
162 }
163 AggregatorState::Ready(state) => {
164 let last_time_point: TimePoint = self
165 .runner
166 .get_time_point_from_chain()
167 .await
168 .with_context(|| {
169 "AggregatorRuntime in the state READY can not get current time point from chain"
170 })?;
171
172 if state.current_time_point.epoch < last_time_point.epoch {
173 info!(self.logger, "→ Epoch has changed, transitioning to IDLE"; "last_time_point" => ?last_time_point);
175 self.state = AggregatorState::Idle(IdleState {
176 current_time_point: Some(state.current_time_point),
177 });
178 } else if let Some(open_message) = self
179 .runner
180 .get_current_non_certified_open_message(&last_time_point)
181 .await
182 .with_context(|| "AggregatorRuntime can not get the current open message")?
183 {
184 info!(self.logger, "→ Transitioning to SIGNING");
186 let new_state = self
187 .transition_from_ready_to_signing(last_time_point.clone(), open_message.clone())
188 .await.with_context(|| format!("AggregatorRuntime can not perform a transition from READY state to SIGNING with entity_type: '{:?}'", open_message.signed_entity_type))?;
189 self.state = AggregatorState::Signing(new_state);
190 } else {
191 info!(
193 self.logger, " ⋅ No open message to certify, waiting…";
194 "time_point" => ?state.current_time_point
195 );
196 self.state = AggregatorState::Ready(ReadyState {
197 current_time_point: last_time_point,
198 });
199 }
200 }
201 AggregatorState::Signing(state) => {
202 let last_time_point: TimePoint =
203 self.runner.get_time_point_from_chain().await.with_context(|| {
204 "AggregatorRuntime in the state SIGNING can not get current time point from chain"
205 })?;
206
207 let is_outdated = self
208 .runner
209 .is_open_message_outdated(
210 state.open_message.signed_entity_type.clone(),
211 &last_time_point,
212 )
213 .await?;
214
215 if state.current_time_point.epoch < last_time_point.epoch {
216 info!(self.logger, "→ Epoch changed, transitioning to IDLE");
218 let new_state = self.transition_from_signing_to_idle(state).await?;
219 self.state = AggregatorState::Idle(new_state);
220 } else if is_outdated {
221 info!(
223 self.logger,
224 "→ Open message changed, transitioning to READY"
225 );
226 let new_state = self
227 .transition_from_signing_to_ready_new_open_message(state)
228 .await?;
229 self.state = AggregatorState::Ready(new_state);
230 } else {
231 let new_state = self
233 .transition_from_signing_to_ready_multisignature(state)
234 .await?;
235 info!(self.logger, "→ A multi-signature has been created, build an artifact & a certificate and transitioning back to READY");
236 self.state = AggregatorState::Ready(new_state);
237 }
238 }
239 }
240
241 self.runner
242 .increment_runtime_cycle_success_since_startup_counter();
243
244 Ok(())
245 }
246
247 async fn try_transition_from_idle_to_ready(
250 &mut self,
251 maybe_current_time_point: Option<TimePoint>,
252 new_time_point: TimePoint,
253 ) -> Result<(), RuntimeError> {
254 trace!(self.logger, "Trying transition from IDLE to READY state");
255
256 if maybe_current_time_point.is_none()
257 || maybe_current_time_point.as_ref().unwrap().epoch < new_time_point.epoch
258 {
259 self.runner.close_signer_registration_round().await?;
260 self.runner
261 .update_era_checker(new_time_point.epoch)
262 .await
263 .map_err(|e| RuntimeError::critical("transiting IDLE → READY", Some(e)))?;
264 self.runner
265 .update_stake_distribution(&new_time_point)
266 .await?;
267 self.runner.inform_new_epoch(new_time_point.epoch).await?;
268 self.runner.upkeep(new_time_point.epoch).await?;
269 self.runner
270 .open_signer_registration_round(&new_time_point)
271 .await?;
272 self.runner.update_epoch_settings().await?;
273 if self.config.is_follower {
274 self.runner
275 .synchronize_follower_aggregator_signer_registration()
276 .await?;
277 }
278 self.runner.precompute_epoch_data().await?;
279 }
280
281 self.runner
282 .is_certificate_chain_valid(&new_time_point)
283 .await
284 .map_err(|e| RuntimeError::KeepState {
285 message: "certificate chain is invalid".to_string(),
286 nested_error: e.into(),
287 })?;
288
289 Ok(())
290 }
291
292 async fn transition_from_signing_to_ready_multisignature(
295 &self,
296 state: SigningState,
297 ) -> Result<ReadyState, RuntimeError> {
298 trace!(
299 self.logger,
300 "Launching transition from SIGNING to READY state"
301 );
302 let certificate = self
303 .runner
304 .create_certificate(&state.open_message.signed_entity_type)
305 .await?
306 .ok_or_else(|| RuntimeError::KeepState {
307 message: "not enough signature yet to create a certificate, waiting…".to_string(),
308 nested_error: None,
309 })?;
310 self.runner
311 .create_artifact(&state.open_message.signed_entity_type, &certificate)
312 .await
313 .map_err(|e| RuntimeError::ReInit {
314 message: "transiting SIGNING → READY: failed to create artifact. Retrying…"
315 .to_string(),
316 nested_error: Some(e),
317 })?;
318
319 Ok(ReadyState {
320 current_time_point: state.current_time_point,
321 })
322 }
323
324 async fn transition_from_signing_to_idle(
327 &self,
328 state: SigningState,
329 ) -> Result<IdleState, RuntimeError> {
330 trace!(
331 self.logger,
332 "Launching transition from SIGNING to IDLE state"
333 );
334
335 Ok(IdleState {
336 current_time_point: Some(state.current_time_point),
337 })
338 }
339
340 async fn transition_from_signing_to_ready_new_open_message(
343 &self,
344 state: SigningState,
345 ) -> Result<ReadyState, RuntimeError> {
346 trace!(
347 self.logger,
348 "Launching transition from SIGNING to READY state"
349 );
350
351 Ok(ReadyState {
352 current_time_point: state.current_time_point,
353 })
354 }
355
356 async fn transition_from_ready_to_signing(
359 &mut self,
360 new_time_point: TimePoint,
361 open_message: OpenMessage,
362 ) -> Result<SigningState, RuntimeError> {
363 trace!(
364 self.logger,
365 "Launching transition from READY to SIGNING state"
366 );
367
368 let state = SigningState {
369 current_time_point: new_time_point,
370 open_message,
371 };
372
373 Ok(state)
374 }
375}
376
377#[cfg(test)]
378mod tests {
379 use crate::entities::OpenMessage;
380 use anyhow::anyhow;
381 use mockall::predicate;
382 use std::time::Duration;
383
384 use mithril_common::test_utils::fake_data;
385
386 use crate::test_tools::TestLogger;
387
388 use super::super::runner::MockAggregatorRunner;
389 use super::*;
390
391 async fn init_runtime(
392 init_state: Option<AggregatorState>,
393 runner: MockAggregatorRunner,
394 is_follower: bool,
395 ) -> AggregatorRuntime {
396 AggregatorRuntime::new(
397 AggregatorConfig::new(Duration::from_millis(20), is_follower),
398 init_state,
399 Arc::new(runner),
400 TestLogger::stdout(),
401 )
402 .await
403 .unwrap()
404 }
405
406 mod leader {
407 use super::*;
408
409 #[tokio::test]
410 pub async fn idle_check_certificate_chain_is_not_valid() {
411 let mut runner = MockAggregatorRunner::new();
412 runner
413 .expect_get_time_point_from_chain()
414 .once()
415 .returning(|| Ok(TimePoint::dummy()));
416 runner
417 .expect_update_stake_distribution()
418 .with(predicate::eq(TimePoint::dummy()))
419 .once()
420 .returning(|_| Ok(()));
421 runner
422 .expect_close_signer_registration_round()
423 .once()
424 .returning(|| Ok(()));
425 runner
426 .expect_open_signer_registration_round()
427 .once()
428 .returning(|_| Ok(()));
429 runner
430 .expect_is_certificate_chain_valid()
431 .once()
432 .returning(|_| Err(anyhow!("error")));
433 runner
434 .expect_update_era_checker()
435 .with(predicate::eq(TimePoint::dummy().epoch))
436 .once()
437 .returning(|_| Ok(()));
438 runner
439 .expect_inform_new_epoch()
440 .with(predicate::eq(TimePoint::dummy().epoch))
441 .once()
442 .returning(|_| Ok(()));
443 runner
444 .expect_update_epoch_settings()
445 .once()
446 .returning(|| Ok(()));
447 runner
448 .expect_precompute_epoch_data()
449 .once()
450 .returning(|| Ok(()));
451 runner
452 .expect_upkeep()
453 .with(predicate::eq(TimePoint::dummy().epoch))
454 .once()
455 .returning(|_| Ok(()));
456 runner
457 .expect_increment_runtime_cycle_total_since_startup_counter()
458 .once()
459 .returning(|| ());
460 runner
461 .expect_increment_runtime_cycle_success_since_startup_counter()
462 .never();
463
464 let mut runtime = init_runtime(
465 Some(AggregatorState::Idle(IdleState {
466 current_time_point: None,
467 })),
468 runner,
469 false,
470 )
471 .await;
472 let err = runtime.cycle().await.unwrap_err();
473 assert!(matches!(err, RuntimeError::KeepState { .. }));
474
475 assert_eq!("idle".to_string(), runtime.get_state());
476 }
477
478 #[tokio::test]
479 pub async fn idle_check_certificate_chain_is_valid() {
480 let mut runner = MockAggregatorRunner::new();
481 runner
482 .expect_get_time_point_from_chain()
483 .once()
484 .returning(|| Ok(TimePoint::dummy()));
485 runner
486 .expect_update_stake_distribution()
487 .with(predicate::eq(TimePoint::dummy()))
488 .once()
489 .returning(|_| Ok(()));
490 runner
491 .expect_close_signer_registration_round()
492 .once()
493 .returning(|| Ok(()));
494 runner
495 .expect_open_signer_registration_round()
496 .once()
497 .returning(|_| Ok(()));
498 runner
499 .expect_is_certificate_chain_valid()
500 .once()
501 .returning(|_| Ok(()));
502 runner
503 .expect_update_era_checker()
504 .with(predicate::eq(TimePoint::dummy().epoch))
505 .once()
506 .returning(|_| Ok(()));
507 runner
508 .expect_inform_new_epoch()
509 .with(predicate::eq(TimePoint::dummy().epoch))
510 .once()
511 .returning(|_| Ok(()));
512 runner
513 .expect_update_epoch_settings()
514 .once()
515 .returning(|| Ok(()));
516 runner
517 .expect_precompute_epoch_data()
518 .once()
519 .returning(|| Ok(()));
520 runner
521 .expect_upkeep()
522 .with(predicate::eq(TimePoint::dummy().epoch))
523 .once()
524 .returning(|_| Ok(()));
525 runner
526 .expect_increment_runtime_cycle_total_since_startup_counter()
527 .once()
528 .returning(|| ());
529 runner
530 .expect_increment_runtime_cycle_success_since_startup_counter()
531 .once()
532 .returning(|| ());
533
534 let mut runtime = init_runtime(
535 Some(AggregatorState::Idle(IdleState {
536 current_time_point: None,
537 })),
538 runner,
539 false,
540 )
541 .await;
542 runtime.cycle().await.unwrap();
543
544 assert_eq!("ready".to_string(), runtime.get_state());
545 }
546
547 #[tokio::test]
548 pub async fn ready_new_epoch_detected() {
549 let mut runner = MockAggregatorRunner::new();
550 let time_point = TimePoint::dummy();
551 let new_time_point = TimePoint {
552 epoch: time_point.epoch + 1,
553 ..time_point.clone()
554 };
555 runner
556 .expect_get_time_point_from_chain()
557 .once()
558 .returning(move || Ok(new_time_point.clone()));
559 runner
560 .expect_increment_runtime_cycle_total_since_startup_counter()
561 .once()
562 .returning(|| ());
563 runner
564 .expect_increment_runtime_cycle_success_since_startup_counter()
565 .once()
566 .returning(|| ());
567 let mut runtime = init_runtime(
568 Some(AggregatorState::Ready(ReadyState {
569 current_time_point: time_point,
570 })),
571 runner,
572 false,
573 )
574 .await;
575 runtime.cycle().await.unwrap();
576
577 assert_eq!("idle".to_string(), runtime.get_state());
578 }
579
580 #[tokio::test]
581 pub async fn ready_open_message_not_exist() {
582 let mut runner = MockAggregatorRunner::new();
583 let time_point = TimePoint::dummy();
584 let next_time_point = TimePoint {
585 immutable_file_number: time_point.immutable_file_number + 1,
586 ..time_point.clone()
587 };
588 let expected_time_point = next_time_point.clone();
589 runner
590 .expect_get_time_point_from_chain()
591 .once()
592 .returning(move || Ok(next_time_point.clone()));
593 runner
594 .expect_get_current_non_certified_open_message()
595 .once()
596 .returning(|_| Ok(None));
597 runner
598 .expect_increment_runtime_cycle_total_since_startup_counter()
599 .once()
600 .returning(|| ());
601 runner
602 .expect_increment_runtime_cycle_success_since_startup_counter()
603 .once()
604 .returning(|| ());
605 let mut runtime = init_runtime(
606 Some(AggregatorState::Ready(ReadyState {
607 current_time_point: time_point.clone(),
608 })),
609 runner,
610 false,
611 )
612 .await;
613 runtime.cycle().await.unwrap();
614
615 assert_eq!("ready".to_string(), runtime.get_state());
616 assert_eq!(
617 AggregatorState::Ready(ReadyState {
618 current_time_point: expected_time_point,
619 }),
620 runtime.state
621 );
622 }
623
624 #[tokio::test]
625 pub async fn ready_certificate_does_not_exist_for_time_point() {
626 let mut runner = MockAggregatorRunner::new();
627 runner
628 .expect_get_time_point_from_chain()
629 .once()
630 .returning(|| Ok(TimePoint::dummy()));
631 runner
632 .expect_get_current_non_certified_open_message()
633 .once()
634 .returning(|_| {
635 let open_message = OpenMessage {
636 is_certified: false,
637 ..OpenMessage::dummy()
638 };
639 Ok(Some(open_message))
640 });
641 runner
642 .expect_increment_runtime_cycle_total_since_startup_counter()
643 .once()
644 .returning(|| ());
645 runner
646 .expect_increment_runtime_cycle_success_since_startup_counter()
647 .once()
648 .returning(|| ());
649
650 let mut runtime = init_runtime(
651 Some(AggregatorState::Ready(ReadyState {
652 current_time_point: TimePoint::dummy(),
653 })),
654 runner,
655 false,
656 )
657 .await;
658 runtime.cycle().await.unwrap();
659
660 assert_eq!("signing".to_string(), runtime.get_state());
661 }
662
663 #[tokio::test]
664 async fn signing_changing_open_message_to_ready() {
665 let mut runner = MockAggregatorRunner::new();
666 runner
667 .expect_get_time_point_from_chain()
668 .once()
669 .returning(|| Ok(TimePoint::dummy()));
670 runner
671 .expect_is_open_message_outdated()
672 .once()
673 .returning(|_, _| Ok(true));
674 runner
675 .expect_increment_runtime_cycle_total_since_startup_counter()
676 .once()
677 .returning(|| ());
678 runner
679 .expect_increment_runtime_cycle_success_since_startup_counter()
680 .once()
681 .returning(|| ());
682
683 let initial_state = AggregatorState::Signing(SigningState {
684 current_time_point: TimePoint::dummy(),
685 open_message: OpenMessage::dummy(),
686 });
687
688 let mut runtime = init_runtime(Some(initial_state), runner, false).await;
689 runtime.cycle().await.unwrap();
690
691 assert_eq!("ready".to_string(), runtime.get_state());
692 }
693
694 #[tokio::test]
695 async fn signing_certificate_is_not_created() {
696 let mut runner = MockAggregatorRunner::new();
697 runner
698 .expect_get_time_point_from_chain()
699 .once()
700 .returning(|| Ok(TimePoint::dummy()));
701 runner
702 .expect_is_open_message_outdated()
703 .once()
704 .returning(|_, _| Ok(false));
705 runner
706 .expect_create_certificate()
707 .once()
708 .returning(|_| Ok(None));
709 runner
710 .expect_increment_runtime_cycle_total_since_startup_counter()
711 .once()
712 .returning(|| ());
713 runner
714 .expect_increment_runtime_cycle_success_since_startup_counter()
715 .never();
716 let state = SigningState {
717 current_time_point: TimePoint::dummy(),
718 open_message: OpenMessage::dummy(),
719 };
720 let mut runtime =
721 init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
722 let err = runtime
723 .cycle()
724 .await
725 .expect_err("cycle should have returned an error");
726
727 match err {
728 RuntimeError::KeepState { .. } => (),
729 _ => panic!("KeepState error expected, got {err:?}."),
730 };
731
732 assert_eq!("signing".to_string(), runtime.get_state());
733 }
734
735 #[tokio::test]
736 async fn signing_artifact_not_created() {
737 let mut runner = MockAggregatorRunner::new();
738 runner
739 .expect_get_time_point_from_chain()
740 .once()
741 .returning(|| Ok(TimePoint::dummy()));
742 runner
743 .expect_is_open_message_outdated()
744 .once()
745 .returning(|_, _| Ok(false));
746 runner
747 .expect_create_certificate()
748 .return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
749 runner
750 .expect_create_artifact()
751 .once()
752 .returning(|_, _| Err(anyhow!("whatever")));
753 runner
754 .expect_increment_runtime_cycle_total_since_startup_counter()
755 .once()
756 .returning(|| ());
757 runner
758 .expect_increment_runtime_cycle_success_since_startup_counter()
759 .never();
760 let state = SigningState {
761 current_time_point: TimePoint::dummy(),
762 open_message: OpenMessage::dummy(),
763 };
764 let mut runtime =
765 init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
766 let err = runtime
767 .cycle()
768 .await
769 .expect_err("cycle should have returned an error");
770
771 match err {
772 RuntimeError::ReInit { .. } => (),
773 _ => panic!("ReInit error expected, got {err:?}."),
774 };
775
776 assert_eq!("signing".to_string(), runtime.get_state());
777 }
778
779 #[tokio::test]
780 async fn signing_certificate_is_created() {
781 let mut runner = MockAggregatorRunner::new();
782 runner
783 .expect_get_time_point_from_chain()
784 .once()
785 .returning(|| Ok(TimePoint::dummy()));
786 runner
787 .expect_is_open_message_outdated()
788 .once()
789 .returning(|_, _| Ok(false));
790 runner
791 .expect_create_certificate()
792 .return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
793 runner
794 .expect_create_artifact()
795 .once()
796 .returning(|_, _| Ok(()));
797 runner
798 .expect_increment_runtime_cycle_total_since_startup_counter()
799 .once()
800 .returning(|| ());
801 runner
802 .expect_increment_runtime_cycle_success_since_startup_counter()
803 .once()
804 .returning(|| ());
805
806 let state = SigningState {
807 current_time_point: TimePoint::dummy(),
808 open_message: OpenMessage::dummy(),
809 };
810 let mut runtime =
811 init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
812 runtime.cycle().await.unwrap();
813
814 assert_eq!("ready".to_string(), runtime.get_state());
815 }
816
817 #[tokio::test]
818 pub async fn critical_error() {
819 let mut runner = MockAggregatorRunner::new();
820 runner
821 .expect_get_time_point_from_chain()
822 .once()
823 .returning(|| Ok(TimePoint::dummy()));
824 runner
825 .expect_update_era_checker()
826 .with(predicate::eq(TimePoint::dummy().epoch))
827 .once()
828 .returning(|_| Err(anyhow!("ERROR")));
829 runner
830 .expect_close_signer_registration_round()
831 .once()
832 .returning(|| Ok(()));
833 runner
834 .expect_increment_runtime_cycle_total_since_startup_counter()
835 .once()
836 .returning(|| ());
837 runner
838 .expect_increment_runtime_cycle_success_since_startup_counter()
839 .never();
840
841 let mut runtime = init_runtime(
842 Some(AggregatorState::Idle(IdleState {
843 current_time_point: None,
844 })),
845 runner,
846 false,
847 )
848 .await;
849 runtime.cycle().await.unwrap_err();
850
851 assert_eq!("idle".to_string(), runtime.get_state());
852 }
853 }
854
855 mod follower {
856 use super::*;
857
858 #[tokio::test]
859 pub async fn idle_new_epoch_detected_and_leader_not_transitioned_to_epoch() {
860 let mut runner = MockAggregatorRunner::new();
861 let time_point = TimePoint::dummy();
862 let new_time_point = TimePoint {
863 epoch: time_point.epoch + 1,
864 ..time_point.clone()
865 };
866 runner
867 .expect_get_time_point_from_chain()
868 .once()
869 .returning(move || Ok(new_time_point.clone()));
870 runner
871 .expect_is_follower_aggregator_at_same_epoch_as_leader()
872 .once()
873 .returning(|_| Ok(false));
874 runner
875 .expect_increment_runtime_cycle_total_since_startup_counter()
876 .once()
877 .returning(|| ());
878 runner
879 .expect_increment_runtime_cycle_success_since_startup_counter()
880 .once()
881 .returning(|| ());
882 let mut runtime = init_runtime(
883 Some(AggregatorState::Idle(IdleState {
884 current_time_point: Some(time_point),
885 })),
886 runner,
887 true,
888 )
889 .await;
890 runtime.cycle().await.unwrap();
891
892 assert_eq!("idle".to_string(), runtime.get_state());
893 }
894
895 #[tokio::test]
896 pub async fn idle_new_epoch_detected_and_leader_has_transitioned_to_epoch() {
897 let mut runner = MockAggregatorRunner::new();
898 let time_point = TimePoint::dummy();
899 let new_time_point = TimePoint {
900 epoch: time_point.epoch + 1,
901 ..time_point.clone()
902 };
903 let new_time_point_clone = new_time_point.clone();
904 runner
905 .expect_get_time_point_from_chain()
906 .once()
907 .returning(move || Ok(new_time_point.clone()));
908 runner
909 .expect_is_follower_aggregator_at_same_epoch_as_leader()
910 .once()
911 .returning(|_| Ok(true));
912 runner
913 .expect_update_stake_distribution()
914 .with(predicate::eq(new_time_point_clone.clone()))
915 .once()
916 .returning(|_| Ok(()));
917 runner
918 .expect_close_signer_registration_round()
919 .once()
920 .returning(|| Ok(()));
921 runner
922 .expect_synchronize_follower_aggregator_signer_registration()
923 .once()
924 .returning(|| Ok(()));
925 runner
926 .expect_open_signer_registration_round()
927 .once()
928 .returning(|_| Ok(()));
929 runner
930 .expect_is_certificate_chain_valid()
931 .once()
932 .returning(|_| Ok(()));
933 runner
934 .expect_update_era_checker()
935 .with(predicate::eq(new_time_point_clone.clone().epoch))
936 .once()
937 .returning(|_| Ok(()));
938 runner
939 .expect_inform_new_epoch()
940 .with(predicate::eq(new_time_point_clone.clone().epoch))
941 .once()
942 .returning(|_| Ok(()));
943 runner
944 .expect_update_epoch_settings()
945 .once()
946 .returning(|| Ok(()));
947 runner
948 .expect_precompute_epoch_data()
949 .once()
950 .returning(|| Ok(()));
951 runner
952 .expect_upkeep()
953 .with(predicate::eq(new_time_point_clone.clone().epoch))
954 .once()
955 .returning(|_| Ok(()));
956 runner
957 .expect_increment_runtime_cycle_total_since_startup_counter()
958 .once()
959 .returning(|| ());
960 runner
961 .expect_increment_runtime_cycle_success_since_startup_counter()
962 .once()
963 .returning(|| ());
964 let mut runtime = init_runtime(
965 Some(AggregatorState::Idle(IdleState {
966 current_time_point: Some(time_point),
967 })),
968 runner,
969 true,
970 )
971 .await;
972 runtime.cycle().await.unwrap();
973
974 assert_eq!("ready".to_string(), runtime.get_state());
975 }
976 }
977}