1use anyhow::Context;
2use chrono::Local;
3use slog::{Logger, info, trace};
4use std::fmt::Display;
5use std::sync::Arc;
6
7use mithril_common::entities::TimePoint;
8use mithril_common::logging::LoggerExtensions;
9
10use crate::AggregatorConfig;
11use crate::entities::OpenMessage;
12use crate::runtime::{AggregatorRunnerTrait, RuntimeError};
13
14#[derive(Clone, Debug, PartialEq, Eq)]
15pub struct IdleState {
16 current_time_point: Option<TimePoint>,
17}
18
19#[derive(Clone, Debug, PartialEq, Eq)]
20pub struct ReadyState {
21 current_time_point: TimePoint,
22}
23
24#[derive(Clone, Debug, PartialEq)]
25pub struct SigningState {
26 current_time_point: TimePoint,
27 open_message: OpenMessage,
28}
29
30#[derive(Clone, Debug, PartialEq)]
31pub enum AggregatorState {
32 Idle(IdleState),
33 Ready(ReadyState),
34 Signing(SigningState),
35}
36
37impl Display for AggregatorState {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 match self {
40 AggregatorState::Idle(state) => write!(
41 f,
42 "Idle - {}",
43 match &state.current_time_point {
44 None => "No TimePoint".to_string(),
45 Some(time_point) => time_point.to_string(),
46 }
47 ),
48 AggregatorState::Ready(state) => write!(f, "Ready - {}", state.current_time_point),
49 AggregatorState::Signing(state) => write!(f, "Signing - {}", state.current_time_point),
50 }
51 }
52}
53
54pub struct AggregatorRuntime {
60 config: AggregatorConfig,
61 state: AggregatorState,
62 runner: Arc<dyn AggregatorRunnerTrait>,
63 logger: Logger,
64}
65
66impl AggregatorRuntime {
67 pub async fn new(
69 aggregator_config: AggregatorConfig,
70 init_state: Option<AggregatorState>,
71 runner: Arc<dyn AggregatorRunnerTrait>,
72 logger: Logger,
73 ) -> Result<Self, RuntimeError> {
74 let logger = logger.new_with_component_name::<Self>();
75 info!(logger, "Initializing runtime");
76
77 let state = if let Some(init_state) = init_state {
78 trace!(logger, "Got initial state from caller");
79 init_state
80 } else {
81 trace!(logger, "Idle state, no current time point");
82 AggregatorState::Idle(IdleState {
83 current_time_point: None,
84 })
85 };
86
87 Ok(Self {
88 config: aggregator_config,
89 state,
90 runner,
91 logger,
92 })
93 }
94
95 pub fn get_state(&self) -> String {
97 match self.state {
98 AggregatorState::Idle(_) => "idle".to_string(),
99 AggregatorState::Ready(_) => "ready".to_string(),
100 AggregatorState::Signing(_) => "signing".to_string(),
101 }
102 }
103
104 pub async fn run(&mut self) -> Result<(), RuntimeError> {
106 info!(self.logger, "Launching State Machine");
107 let mut interval = tokio::time::interval(self.config.interval);
108
109 loop {
110 interval.tick().await;
111 let approximate_next_cycle_time = Local::now() + self.config.interval;
114
115 if let Err(e) = self.cycle().await {
116 e.write_to_log(&self.logger);
117 if e.is_critical() {
118 return Err(e);
119 }
120 }
121
122 info!(
123 self.logger, "… Cycle finished";
124 "approximate_next_cycle_time" => %approximate_next_cycle_time.time().format("%H:%M:%S%.3f"),
125 "run_interval_in_ms" => self.config.interval.as_millis(),
126 );
127 }
128 }
129
130 pub async fn cycle(&mut self) -> Result<(), RuntimeError> {
132 info!(
133 self.logger,
134 "================================================================================"
135 );
136 info!(self.logger, "New cycle: {}", self.state);
137
138 self.runner.increment_runtime_cycle_total_since_startup_counter();
139
140 match self.state.clone() {
141 AggregatorState::Idle(state) => {
142 let last_time_point = self.runner.get_time_point_from_chain().await.with_context(
143 || "AggregatorRuntime in the state IDLE can not get current time point from chain",
144 )?;
145
146 info!(self.logger, "→ Trying to transition to READY"; "last_time_point" => ?last_time_point);
147
148 let can_try_transition_from_idle_to_ready = if self.config.is_follower {
149 self.runner
150 .is_follower_aggregator_at_same_epoch_as_leader(&last_time_point)
151 .await?
152 } else {
153 true
154 };
155 if can_try_transition_from_idle_to_ready {
156 self.try_transition_from_idle_to_ready(
157 state.current_time_point,
158 last_time_point.clone(),
159 )
160 .await?;
161 self.state = AggregatorState::Ready(ReadyState {
162 current_time_point: last_time_point,
163 });
164 }
165 }
166 AggregatorState::Ready(state) => {
167 let last_time_point: TimePoint = self
168 .runner
169 .get_time_point_from_chain()
170 .await
171 .with_context(|| {
172 "AggregatorRuntime in the state READY can not get current time point from chain"
173 })?;
174
175 if state.current_time_point.epoch < last_time_point.epoch {
176 info!(self.logger, "→ Epoch has changed, transitioning to IDLE"; "last_time_point" => ?last_time_point);
178 self.state = AggregatorState::Idle(IdleState {
179 current_time_point: Some(state.current_time_point),
180 });
181 } else if let Some(open_message) = self
182 .runner
183 .get_current_non_certified_open_message(&last_time_point)
184 .await
185 .with_context(|| "AggregatorRuntime can not get the current open message")?
186 {
187 info!(self.logger, "→ Transitioning to SIGNING");
189 let new_state = self
190 .transition_from_ready_to_signing(last_time_point.clone(), open_message.clone())
191 .await.with_context(|| format!("AggregatorRuntime can not perform a transition from READY state to SIGNING with entity_type: '{:?}'", open_message.signed_entity_type))?;
192 self.state = AggregatorState::Signing(new_state);
193 } else {
194 info!(
196 self.logger, " ⋅ No open message to certify, waiting…";
197 "time_point" => ?state.current_time_point
198 );
199 self.state = AggregatorState::Ready(ReadyState {
200 current_time_point: last_time_point,
201 });
202 }
203 }
204 AggregatorState::Signing(state) => {
205 let last_time_point: TimePoint =
206 self.runner.get_time_point_from_chain().await.with_context(|| {
207 "AggregatorRuntime in the state SIGNING can not get current time point from chain"
208 })?;
209
210 let is_outdated = self
211 .runner
212 .is_open_message_outdated(
213 state.open_message.signed_entity_type.clone(),
214 &last_time_point,
215 )
216 .await?;
217
218 if state.current_time_point.epoch < last_time_point.epoch {
219 info!(self.logger, "→ Epoch changed, transitioning to IDLE");
221 let new_state = self.transition_from_signing_to_idle(state).await?;
222 self.state = AggregatorState::Idle(new_state);
223 } else if is_outdated {
224 info!(
226 self.logger,
227 "→ Open message changed, transitioning to READY"
228 );
229 let new_state =
230 self.transition_from_signing_to_ready_new_open_message(state).await?;
231 self.state = AggregatorState::Ready(new_state);
232 } else {
233 let new_state =
235 self.transition_from_signing_to_ready_multisignature(state).await?;
236 info!(
237 self.logger,
238 "→ A multi-signature has been created, build an artifact & a certificate and transitioning back to READY"
239 );
240 self.state = AggregatorState::Ready(new_state);
241 }
242 }
243 }
244
245 self.runner.increment_runtime_cycle_success_since_startup_counter();
246
247 Ok(())
248 }
249
250 async fn try_transition_from_idle_to_ready(
253 &mut self,
254 maybe_current_time_point: Option<TimePoint>,
255 new_time_point: TimePoint,
256 ) -> Result<(), RuntimeError> {
257 trace!(self.logger, "Trying transition from IDLE to READY state");
258
259 if maybe_current_time_point.is_none()
260 || maybe_current_time_point.as_ref().unwrap().epoch < new_time_point.epoch
261 {
262 self.runner.close_signer_registration_round().await?;
263 self.runner
264 .update_era_checker(new_time_point.epoch)
265 .await
266 .map_err(|e| RuntimeError::critical("transiting IDLE → READY", Some(e)))?;
267 self.runner.update_stake_distribution(&new_time_point).await?;
268 self.runner.inform_new_epoch(new_time_point.epoch).await?;
269 self.runner.upkeep(new_time_point.epoch).await?;
270 self.runner.open_signer_registration_round(&new_time_point).await?;
271 self.runner.update_epoch_settings().await?;
272 if self.config.is_follower {
273 self.runner
274 .synchronize_follower_aggregator_signer_registration()
275 .await?;
276 }
277 self.runner.precompute_epoch_data().await?;
278 }
279
280 self.runner
281 .is_certificate_chain_valid(&new_time_point)
282 .await
283 .map_err(|e| RuntimeError::KeepState {
284 message: "certificate chain is invalid".to_string(),
285 nested_error: e.into(),
286 })?;
287
288 Ok(())
289 }
290
291 async fn transition_from_signing_to_ready_multisignature(
294 &self,
295 state: SigningState,
296 ) -> Result<ReadyState, RuntimeError> {
297 trace!(
298 self.logger,
299 "Launching transition from SIGNING to READY state"
300 );
301 let certificate = self
302 .runner
303 .create_certificate(&state.open_message.signed_entity_type)
304 .await?
305 .ok_or_else(|| RuntimeError::KeepState {
306 message: "not enough signature yet to create a certificate, waiting…".to_string(),
307 nested_error: None,
308 })?;
309 self.runner
310 .create_artifact(&state.open_message.signed_entity_type, &certificate)
311 .await
312 .map_err(|e| RuntimeError::ReInit {
313 message: "transiting SIGNING → READY: failed to create artifact. Retrying…"
314 .to_string(),
315 nested_error: Some(e),
316 })?;
317
318 Ok(ReadyState {
319 current_time_point: state.current_time_point,
320 })
321 }
322
323 async fn transition_from_signing_to_idle(
326 &self,
327 state: SigningState,
328 ) -> Result<IdleState, RuntimeError> {
329 trace!(
330 self.logger,
331 "Launching transition from SIGNING to IDLE state"
332 );
333
334 Ok(IdleState {
335 current_time_point: Some(state.current_time_point),
336 })
337 }
338
339 async fn transition_from_signing_to_ready_new_open_message(
342 &self,
343 state: SigningState,
344 ) -> Result<ReadyState, RuntimeError> {
345 trace!(
346 self.logger,
347 "Launching transition from SIGNING to READY state"
348 );
349
350 Ok(ReadyState {
351 current_time_point: state.current_time_point,
352 })
353 }
354
355 async fn transition_from_ready_to_signing(
358 &mut self,
359 new_time_point: TimePoint,
360 open_message: OpenMessage,
361 ) -> Result<SigningState, RuntimeError> {
362 trace!(
363 self.logger,
364 "Launching transition from READY to SIGNING state"
365 );
366
367 let state = SigningState {
368 current_time_point: new_time_point,
369 open_message,
370 };
371
372 Ok(state)
373 }
374}
375
376#[cfg(test)]
377mod tests {
378 use crate::entities::OpenMessage;
379 use anyhow::anyhow;
380 use mockall::predicate;
381 use std::time::Duration;
382
383 use mithril_common::test_utils::fake_data;
384
385 use crate::test_tools::TestLogger;
386
387 use super::super::runner::MockAggregatorRunner;
388 use super::*;
389
390 async fn init_runtime(
391 init_state: Option<AggregatorState>,
392 runner: MockAggregatorRunner,
393 is_follower: bool,
394 ) -> AggregatorRuntime {
395 AggregatorRuntime::new(
396 AggregatorConfig::new(Duration::from_millis(20), is_follower),
397 init_state,
398 Arc::new(runner),
399 TestLogger::stdout(),
400 )
401 .await
402 .unwrap()
403 }
404
405 mod leader {
406 use super::*;
407
408 #[tokio::test]
409 pub async fn idle_check_certificate_chain_is_not_valid() {
410 let mut runner = MockAggregatorRunner::new();
411 runner
412 .expect_get_time_point_from_chain()
413 .once()
414 .returning(|| Ok(TimePoint::dummy()));
415 runner
416 .expect_update_stake_distribution()
417 .with(predicate::eq(TimePoint::dummy()))
418 .once()
419 .returning(|_| Ok(()));
420 runner
421 .expect_close_signer_registration_round()
422 .once()
423 .returning(|| Ok(()));
424 runner
425 .expect_open_signer_registration_round()
426 .once()
427 .returning(|_| Ok(()));
428 runner
429 .expect_is_certificate_chain_valid()
430 .once()
431 .returning(|_| Err(anyhow!("error")));
432 runner
433 .expect_update_era_checker()
434 .with(predicate::eq(TimePoint::dummy().epoch))
435 .once()
436 .returning(|_| Ok(()));
437 runner
438 .expect_inform_new_epoch()
439 .with(predicate::eq(TimePoint::dummy().epoch))
440 .once()
441 .returning(|_| Ok(()));
442 runner.expect_update_epoch_settings().once().returning(|| Ok(()));
443 runner.expect_precompute_epoch_data().once().returning(|| Ok(()));
444 runner
445 .expect_upkeep()
446 .with(predicate::eq(TimePoint::dummy().epoch))
447 .once()
448 .returning(|_| Ok(()));
449 runner
450 .expect_increment_runtime_cycle_total_since_startup_counter()
451 .once()
452 .returning(|| ());
453 runner
454 .expect_increment_runtime_cycle_success_since_startup_counter()
455 .never();
456
457 let mut runtime = init_runtime(
458 Some(AggregatorState::Idle(IdleState {
459 current_time_point: None,
460 })),
461 runner,
462 false,
463 )
464 .await;
465 let err = runtime.cycle().await.unwrap_err();
466 assert!(matches!(err, RuntimeError::KeepState { .. }));
467
468 assert_eq!("idle".to_string(), runtime.get_state());
469 }
470
471 #[tokio::test]
472 pub async fn idle_check_certificate_chain_is_valid() {
473 let mut runner = MockAggregatorRunner::new();
474 runner
475 .expect_get_time_point_from_chain()
476 .once()
477 .returning(|| Ok(TimePoint::dummy()));
478 runner
479 .expect_update_stake_distribution()
480 .with(predicate::eq(TimePoint::dummy()))
481 .once()
482 .returning(|_| Ok(()));
483 runner
484 .expect_close_signer_registration_round()
485 .once()
486 .returning(|| Ok(()));
487 runner
488 .expect_open_signer_registration_round()
489 .once()
490 .returning(|_| Ok(()));
491 runner
492 .expect_is_certificate_chain_valid()
493 .once()
494 .returning(|_| Ok(()));
495 runner
496 .expect_update_era_checker()
497 .with(predicate::eq(TimePoint::dummy().epoch))
498 .once()
499 .returning(|_| Ok(()));
500 runner
501 .expect_inform_new_epoch()
502 .with(predicate::eq(TimePoint::dummy().epoch))
503 .once()
504 .returning(|_| Ok(()));
505 runner.expect_update_epoch_settings().once().returning(|| Ok(()));
506 runner.expect_precompute_epoch_data().once().returning(|| Ok(()));
507 runner
508 .expect_upkeep()
509 .with(predicate::eq(TimePoint::dummy().epoch))
510 .once()
511 .returning(|_| Ok(()));
512 runner
513 .expect_increment_runtime_cycle_total_since_startup_counter()
514 .once()
515 .returning(|| ());
516 runner
517 .expect_increment_runtime_cycle_success_since_startup_counter()
518 .once()
519 .returning(|| ());
520
521 let mut runtime = init_runtime(
522 Some(AggregatorState::Idle(IdleState {
523 current_time_point: None,
524 })),
525 runner,
526 false,
527 )
528 .await;
529 runtime.cycle().await.unwrap();
530
531 assert_eq!("ready".to_string(), runtime.get_state());
532 }
533
534 #[tokio::test]
535 pub async fn ready_new_epoch_detected() {
536 let mut runner = MockAggregatorRunner::new();
537 let time_point = TimePoint::dummy();
538 let new_time_point = TimePoint {
539 epoch: time_point.epoch + 1,
540 ..time_point.clone()
541 };
542 runner
543 .expect_get_time_point_from_chain()
544 .once()
545 .returning(move || Ok(new_time_point.clone()));
546 runner
547 .expect_increment_runtime_cycle_total_since_startup_counter()
548 .once()
549 .returning(|| ());
550 runner
551 .expect_increment_runtime_cycle_success_since_startup_counter()
552 .once()
553 .returning(|| ());
554 let mut runtime = init_runtime(
555 Some(AggregatorState::Ready(ReadyState {
556 current_time_point: time_point,
557 })),
558 runner,
559 false,
560 )
561 .await;
562 runtime.cycle().await.unwrap();
563
564 assert_eq!("idle".to_string(), runtime.get_state());
565 }
566
567 #[tokio::test]
568 pub async fn ready_open_message_not_exist() {
569 let mut runner = MockAggregatorRunner::new();
570 let time_point = TimePoint::dummy();
571 let next_time_point = TimePoint {
572 immutable_file_number: time_point.immutable_file_number + 1,
573 ..time_point.clone()
574 };
575 let expected_time_point = next_time_point.clone();
576 runner
577 .expect_get_time_point_from_chain()
578 .once()
579 .returning(move || Ok(next_time_point.clone()));
580 runner
581 .expect_get_current_non_certified_open_message()
582 .once()
583 .returning(|_| Ok(None));
584 runner
585 .expect_increment_runtime_cycle_total_since_startup_counter()
586 .once()
587 .returning(|| ());
588 runner
589 .expect_increment_runtime_cycle_success_since_startup_counter()
590 .once()
591 .returning(|| ());
592 let mut runtime = init_runtime(
593 Some(AggregatorState::Ready(ReadyState {
594 current_time_point: time_point.clone(),
595 })),
596 runner,
597 false,
598 )
599 .await;
600 runtime.cycle().await.unwrap();
601
602 assert_eq!("ready".to_string(), runtime.get_state());
603 assert_eq!(
604 AggregatorState::Ready(ReadyState {
605 current_time_point: expected_time_point,
606 }),
607 runtime.state
608 );
609 }
610
611 #[tokio::test]
612 pub async fn ready_certificate_does_not_exist_for_time_point() {
613 let mut runner = MockAggregatorRunner::new();
614 runner
615 .expect_get_time_point_from_chain()
616 .once()
617 .returning(|| Ok(TimePoint::dummy()));
618 runner
619 .expect_get_current_non_certified_open_message()
620 .once()
621 .returning(|_| {
622 let open_message = OpenMessage {
623 is_certified: false,
624 ..OpenMessage::dummy()
625 };
626 Ok(Some(open_message))
627 });
628 runner
629 .expect_increment_runtime_cycle_total_since_startup_counter()
630 .once()
631 .returning(|| ());
632 runner
633 .expect_increment_runtime_cycle_success_since_startup_counter()
634 .once()
635 .returning(|| ());
636
637 let mut runtime = init_runtime(
638 Some(AggregatorState::Ready(ReadyState {
639 current_time_point: TimePoint::dummy(),
640 })),
641 runner,
642 false,
643 )
644 .await;
645 runtime.cycle().await.unwrap();
646
647 assert_eq!("signing".to_string(), runtime.get_state());
648 }
649
650 #[tokio::test]
651 async fn signing_changing_open_message_to_ready() {
652 let mut runner = MockAggregatorRunner::new();
653 runner
654 .expect_get_time_point_from_chain()
655 .once()
656 .returning(|| Ok(TimePoint::dummy()));
657 runner
658 .expect_is_open_message_outdated()
659 .once()
660 .returning(|_, _| Ok(true));
661 runner
662 .expect_increment_runtime_cycle_total_since_startup_counter()
663 .once()
664 .returning(|| ());
665 runner
666 .expect_increment_runtime_cycle_success_since_startup_counter()
667 .once()
668 .returning(|| ());
669
670 let initial_state = AggregatorState::Signing(SigningState {
671 current_time_point: TimePoint::dummy(),
672 open_message: OpenMessage::dummy(),
673 });
674
675 let mut runtime = init_runtime(Some(initial_state), runner, false).await;
676 runtime.cycle().await.unwrap();
677
678 assert_eq!("ready".to_string(), runtime.get_state());
679 }
680
681 #[tokio::test]
682 async fn signing_certificate_is_not_created() {
683 let mut runner = MockAggregatorRunner::new();
684 runner
685 .expect_get_time_point_from_chain()
686 .once()
687 .returning(|| Ok(TimePoint::dummy()));
688 runner
689 .expect_is_open_message_outdated()
690 .once()
691 .returning(|_, _| Ok(false));
692 runner.expect_create_certificate().once().returning(|_| Ok(None));
693 runner
694 .expect_increment_runtime_cycle_total_since_startup_counter()
695 .once()
696 .returning(|| ());
697 runner
698 .expect_increment_runtime_cycle_success_since_startup_counter()
699 .never();
700 let state = SigningState {
701 current_time_point: TimePoint::dummy(),
702 open_message: OpenMessage::dummy(),
703 };
704 let mut runtime =
705 init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
706 let err = runtime
707 .cycle()
708 .await
709 .expect_err("cycle should have returned an error");
710
711 match err {
712 RuntimeError::KeepState { .. } => (),
713 _ => panic!("KeepState error expected, got {err:?}."),
714 };
715
716 assert_eq!("signing".to_string(), runtime.get_state());
717 }
718
719 #[tokio::test]
720 async fn signing_artifact_not_created() {
721 let mut runner = MockAggregatorRunner::new();
722 runner
723 .expect_get_time_point_from_chain()
724 .once()
725 .returning(|| Ok(TimePoint::dummy()));
726 runner
727 .expect_is_open_message_outdated()
728 .once()
729 .returning(|_, _| Ok(false));
730 runner
731 .expect_create_certificate()
732 .return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
733 runner
734 .expect_create_artifact()
735 .once()
736 .returning(|_, _| Err(anyhow!("whatever")));
737 runner
738 .expect_increment_runtime_cycle_total_since_startup_counter()
739 .once()
740 .returning(|| ());
741 runner
742 .expect_increment_runtime_cycle_success_since_startup_counter()
743 .never();
744 let state = SigningState {
745 current_time_point: TimePoint::dummy(),
746 open_message: OpenMessage::dummy(),
747 };
748 let mut runtime =
749 init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
750 let err = runtime
751 .cycle()
752 .await
753 .expect_err("cycle should have returned an error");
754
755 match err {
756 RuntimeError::ReInit { .. } => (),
757 _ => panic!("ReInit error expected, got {err:?}."),
758 };
759
760 assert_eq!("signing".to_string(), runtime.get_state());
761 }
762
763 #[tokio::test]
764 async fn signing_certificate_is_created() {
765 let mut runner = MockAggregatorRunner::new();
766 runner
767 .expect_get_time_point_from_chain()
768 .once()
769 .returning(|| Ok(TimePoint::dummy()));
770 runner
771 .expect_is_open_message_outdated()
772 .once()
773 .returning(|_, _| Ok(false));
774 runner
775 .expect_create_certificate()
776 .return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
777 runner.expect_create_artifact().once().returning(|_, _| Ok(()));
778 runner
779 .expect_increment_runtime_cycle_total_since_startup_counter()
780 .once()
781 .returning(|| ());
782 runner
783 .expect_increment_runtime_cycle_success_since_startup_counter()
784 .once()
785 .returning(|| ());
786
787 let state = SigningState {
788 current_time_point: TimePoint::dummy(),
789 open_message: OpenMessage::dummy(),
790 };
791 let mut runtime =
792 init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
793 runtime.cycle().await.unwrap();
794
795 assert_eq!("ready".to_string(), runtime.get_state());
796 }
797
798 #[tokio::test]
799 pub async fn critical_error() {
800 let mut runner = MockAggregatorRunner::new();
801 runner
802 .expect_get_time_point_from_chain()
803 .once()
804 .returning(|| Ok(TimePoint::dummy()));
805 runner
806 .expect_update_era_checker()
807 .with(predicate::eq(TimePoint::dummy().epoch))
808 .once()
809 .returning(|_| Err(anyhow!("ERROR")));
810 runner
811 .expect_close_signer_registration_round()
812 .once()
813 .returning(|| Ok(()));
814 runner
815 .expect_increment_runtime_cycle_total_since_startup_counter()
816 .once()
817 .returning(|| ());
818 runner
819 .expect_increment_runtime_cycle_success_since_startup_counter()
820 .never();
821
822 let mut runtime = init_runtime(
823 Some(AggregatorState::Idle(IdleState {
824 current_time_point: None,
825 })),
826 runner,
827 false,
828 )
829 .await;
830 runtime.cycle().await.unwrap_err();
831
832 assert_eq!("idle".to_string(), runtime.get_state());
833 }
834 }
835
836 mod follower {
837 use super::*;
838
839 #[tokio::test]
840 pub async fn idle_new_epoch_detected_and_leader_not_transitioned_to_epoch() {
841 let mut runner = MockAggregatorRunner::new();
842 let time_point = TimePoint::dummy();
843 let new_time_point = TimePoint {
844 epoch: time_point.epoch + 1,
845 ..time_point.clone()
846 };
847 runner
848 .expect_get_time_point_from_chain()
849 .once()
850 .returning(move || Ok(new_time_point.clone()));
851 runner
852 .expect_is_follower_aggregator_at_same_epoch_as_leader()
853 .once()
854 .returning(|_| Ok(false));
855 runner
856 .expect_increment_runtime_cycle_total_since_startup_counter()
857 .once()
858 .returning(|| ());
859 runner
860 .expect_increment_runtime_cycle_success_since_startup_counter()
861 .once()
862 .returning(|| ());
863 let mut runtime = init_runtime(
864 Some(AggregatorState::Idle(IdleState {
865 current_time_point: Some(time_point),
866 })),
867 runner,
868 true,
869 )
870 .await;
871 runtime.cycle().await.unwrap();
872
873 assert_eq!("idle".to_string(), runtime.get_state());
874 }
875
876 #[tokio::test]
877 pub async fn idle_new_epoch_detected_and_leader_has_transitioned_to_epoch() {
878 let mut runner = MockAggregatorRunner::new();
879 let time_point = TimePoint::dummy();
880 let new_time_point = TimePoint {
881 epoch: time_point.epoch + 1,
882 ..time_point.clone()
883 };
884 let new_time_point_clone = new_time_point.clone();
885 runner
886 .expect_get_time_point_from_chain()
887 .once()
888 .returning(move || Ok(new_time_point.clone()));
889 runner
890 .expect_is_follower_aggregator_at_same_epoch_as_leader()
891 .once()
892 .returning(|_| Ok(true));
893 runner
894 .expect_update_stake_distribution()
895 .with(predicate::eq(new_time_point_clone.clone()))
896 .once()
897 .returning(|_| Ok(()));
898 runner
899 .expect_close_signer_registration_round()
900 .once()
901 .returning(|| Ok(()));
902 runner
903 .expect_synchronize_follower_aggregator_signer_registration()
904 .once()
905 .returning(|| Ok(()));
906 runner
907 .expect_open_signer_registration_round()
908 .once()
909 .returning(|_| Ok(()));
910 runner
911 .expect_is_certificate_chain_valid()
912 .once()
913 .returning(|_| Ok(()));
914 runner
915 .expect_update_era_checker()
916 .with(predicate::eq(new_time_point_clone.clone().epoch))
917 .once()
918 .returning(|_| Ok(()));
919 runner
920 .expect_inform_new_epoch()
921 .with(predicate::eq(new_time_point_clone.clone().epoch))
922 .once()
923 .returning(|_| Ok(()));
924 runner.expect_update_epoch_settings().once().returning(|| Ok(()));
925 runner.expect_precompute_epoch_data().once().returning(|| Ok(()));
926 runner
927 .expect_upkeep()
928 .with(predicate::eq(new_time_point_clone.clone().epoch))
929 .once()
930 .returning(|_| Ok(()));
931 runner
932 .expect_increment_runtime_cycle_total_since_startup_counter()
933 .once()
934 .returning(|| ());
935 runner
936 .expect_increment_runtime_cycle_success_since_startup_counter()
937 .once()
938 .returning(|| ());
939 let mut runtime = init_runtime(
940 Some(AggregatorState::Idle(IdleState {
941 current_time_point: Some(time_point),
942 })),
943 runner,
944 true,
945 )
946 .await;
947 runtime.cycle().await.unwrap();
948
949 assert_eq!("ready".to_string(), runtime.get_state());
950 }
951 }
952}