1use anyhow::Context;
2use chrono::Local;
3use slog::{Logger, info, trace};
4use std::fmt::Display;
5use std::sync::Arc;
6
7use mithril_common::entities::TimePoint;
8use mithril_common::logging::LoggerExtensions;
9
10use crate::AggregatorConfig;
11use crate::entities::OpenMessage;
12use crate::runtime::{AggregatorRunnerTrait, RuntimeError};
13
14#[derive(Clone, Debug, PartialEq, Eq)]
15pub struct IdleState {
16 current_time_point: Option<TimePoint>,
17}
18
19#[derive(Clone, Debug, PartialEq, Eq)]
20pub struct ReadyState {
21 current_time_point: TimePoint,
22}
23
24#[derive(Clone, Debug, PartialEq)]
25pub struct SigningState {
26 current_time_point: TimePoint,
27 open_message: OpenMessage,
28}
29
30#[derive(Clone, Debug, PartialEq)]
31pub enum AggregatorState {
32 Idle(IdleState),
33 Ready(ReadyState),
34 Signing(SigningState),
35}
36
37impl Display for AggregatorState {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 match self {
40 AggregatorState::Idle(state) => write!(
41 f,
42 "Idle - {}",
43 match &state.current_time_point {
44 None => "No TimePoint".to_string(),
45 Some(time_point) => time_point.to_string(),
46 }
47 ),
48 AggregatorState::Ready(state) => write!(f, "Ready - {}", state.current_time_point),
49 AggregatorState::Signing(state) => write!(f, "Signing - {}", state.current_time_point),
50 }
51 }
52}
53
54pub struct AggregatorRuntime {
60 config: AggregatorConfig,
61 state: AggregatorState,
62 runner: Arc<dyn AggregatorRunnerTrait>,
63 logger: Logger,
64}
65
66impl AggregatorRuntime {
67 pub async fn new(
69 aggregator_config: AggregatorConfig,
70 init_state: Option<AggregatorState>,
71 runner: Arc<dyn AggregatorRunnerTrait>,
72 logger: Logger,
73 ) -> Result<Self, RuntimeError> {
74 let logger = logger.new_with_component_name::<Self>();
75 info!(logger, "Initializing runtime");
76
77 let state = if let Some(init_state) = init_state {
78 trace!(logger, "Got initial state from caller");
79 init_state
80 } else {
81 trace!(logger, "Idle state, no current time point");
82 AggregatorState::Idle(IdleState {
83 current_time_point: None,
84 })
85 };
86
87 Ok(Self {
88 config: aggregator_config,
89 state,
90 runner,
91 logger,
92 })
93 }
94
95 pub fn get_state(&self) -> String {
97 match self.state {
98 AggregatorState::Idle(_) => "idle".to_string(),
99 AggregatorState::Ready(_) => "ready".to_string(),
100 AggregatorState::Signing(_) => "signing".to_string(),
101 }
102 }
103
104 pub async fn run(&mut self) -> Result<(), RuntimeError> {
106 info!(self.logger, "Launching State Machine");
107 let mut interval = tokio::time::interval(self.config.interval);
108
109 loop {
110 interval.tick().await;
111 let approximate_next_cycle_time = Local::now() + self.config.interval;
114
115 if let Err(e) = self.cycle().await {
116 e.write_to_log(&self.logger);
117 if e.is_critical() {
118 return Err(e);
119 }
120 }
121
122 info!(
123 self.logger, "… Cycle finished";
124 "approximate_next_cycle_time" => %approximate_next_cycle_time.time().format("%H:%M:%S%.3f"),
125 "run_interval_in_ms" => self.config.interval.as_millis(),
126 );
127 }
128 }
129
130 pub async fn cycle(&mut self) -> Result<(), RuntimeError> {
132 info!(
133 self.logger,
134 "================================================================================"
135 );
136 info!(self.logger, "New cycle: {}", self.state);
137
138 self.runner.increment_runtime_cycle_total_since_startup_counter();
139
140 match self.state.clone() {
141 AggregatorState::Idle(state) => {
142 let last_time_point = self.runner.get_time_point_from_chain().await.with_context(
143 || "AggregatorRuntime in the state IDLE can not get current time point from chain",
144 )?;
145
146 info!(self.logger, "→ Trying to transition to READY"; "last_time_point" => ?last_time_point);
147
148 let can_try_transition_from_idle_to_ready = if self.config.is_follower {
149 self.runner
150 .is_follower_aggregator_at_same_epoch_as_leader(&last_time_point)
151 .await?
152 } else {
153 true
154 };
155 if can_try_transition_from_idle_to_ready {
156 self.try_transition_from_idle_to_ready(
157 state.current_time_point,
158 last_time_point.clone(),
159 )
160 .await?;
161 self.state = AggregatorState::Ready(ReadyState {
162 current_time_point: last_time_point,
163 });
164 }
165 }
166 AggregatorState::Ready(state) => {
167 let last_time_point: TimePoint = self
168 .runner
169 .get_time_point_from_chain()
170 .await
171 .with_context(|| {
172 "AggregatorRuntime in the state READY can not get current time point from chain"
173 })?;
174
175 if state.current_time_point.epoch < last_time_point.epoch {
176 info!(self.logger, "→ Epoch has changed, transitioning to IDLE"; "last_time_point" => ?last_time_point);
178 self.state = AggregatorState::Idle(IdleState {
179 current_time_point: Some(state.current_time_point),
180 });
181 } else if let Some(open_message) = self
182 .runner
183 .get_current_non_certified_open_message(&last_time_point)
184 .await
185 .with_context(|| "AggregatorRuntime can not get the current open message")?
186 {
187 info!(self.logger, "→ Transitioning to SIGNING");
189 let new_state = self
190 .transition_from_ready_to_signing(last_time_point.clone(), open_message.clone())
191 .await.with_context(|| format!("AggregatorRuntime can not perform a transition from READY state to SIGNING with entity_type: '{:?}'", open_message.signed_entity_type))?;
192 self.state = AggregatorState::Signing(new_state);
193 } else {
194 info!(
196 self.logger, " ⋅ No open message to certify, waiting…";
197 "time_point" => ?state.current_time_point
198 );
199 self.state = AggregatorState::Ready(ReadyState {
200 current_time_point: last_time_point,
201 });
202 }
203 }
204 AggregatorState::Signing(state) => {
205 let last_time_point: TimePoint =
206 self.runner.get_time_point_from_chain().await.with_context(|| {
207 "AggregatorRuntime in the state SIGNING can not get current time point from chain"
208 })?;
209
210 let is_outdated = self
211 .runner
212 .is_open_message_outdated(
213 state.open_message.signed_entity_type.clone(),
214 &last_time_point,
215 )
216 .await?;
217
218 if state.current_time_point.epoch < last_time_point.epoch {
219 info!(self.logger, "→ Epoch changed, transitioning to IDLE");
221 let new_state = self.transition_from_signing_to_idle(state).await?;
222 self.state = AggregatorState::Idle(new_state);
223 } else if is_outdated {
224 info!(
226 self.logger,
227 "→ Open message changed, transitioning to READY"
228 );
229 let new_state =
230 self.transition_from_signing_to_ready_new_open_message(state).await?;
231 self.state = AggregatorState::Ready(new_state);
232 } else {
233 let new_state =
235 self.transition_from_signing_to_ready_multisignature(state).await?;
236 info!(
237 self.logger,
238 "→ A multi-signature has been created, build an artifact & a certificate and transitioning back to READY"
239 );
240 self.state = AggregatorState::Ready(new_state);
241 }
242 }
243 }
244
245 self.runner.increment_runtime_cycle_success_since_startup_counter();
246
247 Ok(())
248 }
249
250 async fn try_transition_from_idle_to_ready(
253 &mut self,
254 maybe_current_time_point: Option<TimePoint>,
255 new_time_point: TimePoint,
256 ) -> Result<(), RuntimeError> {
257 trace!(self.logger, "Trying transition from IDLE to READY state");
258
259 if maybe_current_time_point.is_none()
260 || maybe_current_time_point.as_ref().unwrap().epoch < new_time_point.epoch
261 {
262 self.runner.close_signer_registration_round().await?;
263 self.runner
264 .update_era_checker(new_time_point.epoch)
265 .await
266 .map_err(|e| RuntimeError::critical("transiting IDLE → READY", Some(e)))?;
267 self.runner.update_stake_distribution(&new_time_point).await?;
268 self.runner.inform_new_epoch(new_time_point.epoch).await?;
269 self.runner.upkeep(new_time_point.epoch).await?;
270 self.runner.open_signer_registration_round(&new_time_point).await?;
271 self.runner.update_epoch_settings().await?;
272 if self.config.is_follower {
273 self.runner
274 .synchronize_follower_aggregator_signer_registration()
275 .await?;
276 }
277 self.runner.precompute_epoch_data().await?;
278 }
279
280 let chain_validity_result = self
281 .runner
282 .is_certificate_chain_valid(&new_time_point)
283 .await
284 .map_err(|e| RuntimeError::KeepState {
285 message: "certificate chain is invalid".to_string(),
286 nested_error: e.into(),
287 });
288 if self.config.is_follower {
289 let force_sync = chain_validity_result.is_err();
290 self.runner
291 .synchronize_follower_aggregator_certificate_chain(force_sync)
292 .await?;
293 }
294 chain_validity_result?;
295
296 Ok(())
297 }
298
299 async fn transition_from_signing_to_ready_multisignature(
302 &self,
303 state: SigningState,
304 ) -> Result<ReadyState, RuntimeError> {
305 trace!(
306 self.logger,
307 "Launching transition from SIGNING to READY state"
308 );
309 let certificate = self
310 .runner
311 .create_certificate(&state.open_message.signed_entity_type)
312 .await?
313 .ok_or_else(|| RuntimeError::KeepState {
314 message: "not enough signature yet to create a certificate, waiting…".to_string(),
315 nested_error: None,
316 })?;
317 self.runner
318 .create_artifact(&state.open_message.signed_entity_type, &certificate)
319 .await
320 .map_err(|e| RuntimeError::ReInit {
321 message: "transiting SIGNING → READY: failed to create artifact. Retrying…"
322 .to_string(),
323 nested_error: Some(e),
324 })?;
325
326 Ok(ReadyState {
327 current_time_point: state.current_time_point,
328 })
329 }
330
331 async fn transition_from_signing_to_idle(
334 &self,
335 state: SigningState,
336 ) -> Result<IdleState, RuntimeError> {
337 trace!(
338 self.logger,
339 "Launching transition from SIGNING to IDLE state"
340 );
341
342 Ok(IdleState {
343 current_time_point: Some(state.current_time_point),
344 })
345 }
346
347 async fn transition_from_signing_to_ready_new_open_message(
350 &self,
351 state: SigningState,
352 ) -> Result<ReadyState, RuntimeError> {
353 trace!(
354 self.logger,
355 "Launching transition from SIGNING to READY state"
356 );
357
358 Ok(ReadyState {
359 current_time_point: state.current_time_point,
360 })
361 }
362
363 async fn transition_from_ready_to_signing(
366 &mut self,
367 new_time_point: TimePoint,
368 open_message: OpenMessage,
369 ) -> Result<SigningState, RuntimeError> {
370 trace!(
371 self.logger,
372 "Launching transition from READY to SIGNING state"
373 );
374
375 let state = SigningState {
376 current_time_point: new_time_point,
377 open_message,
378 };
379
380 Ok(state)
381 }
382}
383
384#[cfg(test)]
385mod tests {
386 use anyhow::anyhow;
387 use mockall::predicate;
388 use std::time::Duration;
389
390 use mithril_common::test::double::{Dummy, fake_data};
391
392 use crate::entities::OpenMessage;
393 use crate::test::TestLogger;
394
395 use super::super::runner::MockAggregatorRunner;
396 use super::*;
397
398 async fn init_runtime(
399 init_state: Option<AggregatorState>,
400 runner: MockAggregatorRunner,
401 is_follower: bool,
402 ) -> AggregatorRuntime {
403 AggregatorRuntime::new(
404 AggregatorConfig::new(Duration::from_millis(20), is_follower),
405 init_state,
406 Arc::new(runner),
407 TestLogger::stdout(),
408 )
409 .await
410 .unwrap()
411 }
412
413 mod leader {
414 use super::*;
415
416 #[tokio::test]
417 pub async fn idle_check_certificate_chain_is_not_valid() {
418 let mut runner = MockAggregatorRunner::new();
419 runner
420 .expect_get_time_point_from_chain()
421 .once()
422 .returning(|| Ok(TimePoint::dummy()));
423 runner
424 .expect_update_stake_distribution()
425 .with(predicate::eq(TimePoint::dummy()))
426 .once()
427 .returning(|_| Ok(()));
428 runner
429 .expect_close_signer_registration_round()
430 .once()
431 .returning(|| Ok(()));
432 runner
433 .expect_open_signer_registration_round()
434 .once()
435 .returning(|_| Ok(()));
436 runner
437 .expect_is_certificate_chain_valid()
438 .once()
439 .returning(|_| Err(anyhow!("error")));
440 runner
441 .expect_update_era_checker()
442 .with(predicate::eq(TimePoint::dummy().epoch))
443 .once()
444 .returning(|_| Ok(()));
445 runner
446 .expect_inform_new_epoch()
447 .with(predicate::eq(TimePoint::dummy().epoch))
448 .once()
449 .returning(|_| Ok(()));
450 runner.expect_update_epoch_settings().once().returning(|| Ok(()));
451 runner.expect_precompute_epoch_data().once().returning(|| Ok(()));
452 runner
453 .expect_upkeep()
454 .with(predicate::eq(TimePoint::dummy().epoch))
455 .once()
456 .returning(|_| Ok(()));
457 runner
458 .expect_increment_runtime_cycle_total_since_startup_counter()
459 .once()
460 .returning(|| ());
461 runner
462 .expect_increment_runtime_cycle_success_since_startup_counter()
463 .never();
464
465 let mut runtime = init_runtime(
466 Some(AggregatorState::Idle(IdleState {
467 current_time_point: None,
468 })),
469 runner,
470 false,
471 )
472 .await;
473 let err = runtime.cycle().await.unwrap_err();
474 assert!(matches!(err, RuntimeError::KeepState { .. }));
475
476 assert_eq!("idle".to_string(), runtime.get_state());
477 }
478
479 #[tokio::test]
480 pub async fn idle_check_certificate_chain_is_valid() {
481 let mut runner = MockAggregatorRunner::new();
482 runner
483 .expect_get_time_point_from_chain()
484 .once()
485 .returning(|| Ok(TimePoint::dummy()));
486 runner
487 .expect_update_stake_distribution()
488 .with(predicate::eq(TimePoint::dummy()))
489 .once()
490 .returning(|_| Ok(()));
491 runner
492 .expect_close_signer_registration_round()
493 .once()
494 .returning(|| Ok(()));
495 runner
496 .expect_open_signer_registration_round()
497 .once()
498 .returning(|_| Ok(()));
499 runner
500 .expect_is_certificate_chain_valid()
501 .once()
502 .returning(|_| Ok(()));
503 runner
504 .expect_update_era_checker()
505 .with(predicate::eq(TimePoint::dummy().epoch))
506 .once()
507 .returning(|_| Ok(()));
508 runner
509 .expect_inform_new_epoch()
510 .with(predicate::eq(TimePoint::dummy().epoch))
511 .once()
512 .returning(|_| Ok(()));
513 runner.expect_update_epoch_settings().once().returning(|| Ok(()));
514 runner.expect_precompute_epoch_data().once().returning(|| Ok(()));
515 runner
516 .expect_upkeep()
517 .with(predicate::eq(TimePoint::dummy().epoch))
518 .once()
519 .returning(|_| Ok(()));
520 runner
521 .expect_increment_runtime_cycle_total_since_startup_counter()
522 .once()
523 .returning(|| ());
524 runner
525 .expect_increment_runtime_cycle_success_since_startup_counter()
526 .once()
527 .returning(|| ());
528
529 let mut runtime = init_runtime(
530 Some(AggregatorState::Idle(IdleState {
531 current_time_point: None,
532 })),
533 runner,
534 false,
535 )
536 .await;
537 runtime.cycle().await.unwrap();
538
539 assert_eq!("ready".to_string(), runtime.get_state());
540 }
541
542 #[tokio::test]
543 pub async fn ready_new_epoch_detected() {
544 let mut runner = MockAggregatorRunner::new();
545 let time_point = TimePoint::dummy();
546 let new_time_point = TimePoint {
547 epoch: time_point.epoch + 1,
548 ..time_point.clone()
549 };
550 runner
551 .expect_get_time_point_from_chain()
552 .once()
553 .returning(move || Ok(new_time_point.clone()));
554 runner
555 .expect_increment_runtime_cycle_total_since_startup_counter()
556 .once()
557 .returning(|| ());
558 runner
559 .expect_increment_runtime_cycle_success_since_startup_counter()
560 .once()
561 .returning(|| ());
562 let mut runtime = init_runtime(
563 Some(AggregatorState::Ready(ReadyState {
564 current_time_point: time_point,
565 })),
566 runner,
567 false,
568 )
569 .await;
570 runtime.cycle().await.unwrap();
571
572 assert_eq!("idle".to_string(), runtime.get_state());
573 }
574
575 #[tokio::test]
576 pub async fn ready_open_message_not_exist() {
577 let mut runner = MockAggregatorRunner::new();
578 let time_point = TimePoint::dummy();
579 let next_time_point = TimePoint {
580 immutable_file_number: time_point.immutable_file_number + 1,
581 ..time_point.clone()
582 };
583 let expected_time_point = next_time_point.clone();
584 runner
585 .expect_get_time_point_from_chain()
586 .once()
587 .returning(move || Ok(next_time_point.clone()));
588 runner
589 .expect_get_current_non_certified_open_message()
590 .once()
591 .returning(|_| Ok(None));
592 runner
593 .expect_increment_runtime_cycle_total_since_startup_counter()
594 .once()
595 .returning(|| ());
596 runner
597 .expect_increment_runtime_cycle_success_since_startup_counter()
598 .once()
599 .returning(|| ());
600 let mut runtime = init_runtime(
601 Some(AggregatorState::Ready(ReadyState {
602 current_time_point: time_point.clone(),
603 })),
604 runner,
605 false,
606 )
607 .await;
608 runtime.cycle().await.unwrap();
609
610 assert_eq!("ready".to_string(), runtime.get_state());
611 assert_eq!(
612 AggregatorState::Ready(ReadyState {
613 current_time_point: expected_time_point,
614 }),
615 runtime.state
616 );
617 }
618
619 #[tokio::test]
620 pub async fn ready_certificate_does_not_exist_for_time_point() {
621 let mut runner = MockAggregatorRunner::new();
622 runner
623 .expect_get_time_point_from_chain()
624 .once()
625 .returning(|| Ok(TimePoint::dummy()));
626 runner
627 .expect_get_current_non_certified_open_message()
628 .once()
629 .returning(|_| {
630 let open_message = OpenMessage {
631 is_certified: false,
632 ..OpenMessage::dummy()
633 };
634 Ok(Some(open_message))
635 });
636 runner
637 .expect_increment_runtime_cycle_total_since_startup_counter()
638 .once()
639 .returning(|| ());
640 runner
641 .expect_increment_runtime_cycle_success_since_startup_counter()
642 .once()
643 .returning(|| ());
644
645 let mut runtime = init_runtime(
646 Some(AggregatorState::Ready(ReadyState {
647 current_time_point: TimePoint::dummy(),
648 })),
649 runner,
650 false,
651 )
652 .await;
653 runtime.cycle().await.unwrap();
654
655 assert_eq!("signing".to_string(), runtime.get_state());
656 }
657
658 #[tokio::test]
659 async fn signing_changing_open_message_to_ready() {
660 let mut runner = MockAggregatorRunner::new();
661 runner
662 .expect_get_time_point_from_chain()
663 .once()
664 .returning(|| Ok(TimePoint::dummy()));
665 runner
666 .expect_is_open_message_outdated()
667 .once()
668 .returning(|_, _| Ok(true));
669 runner
670 .expect_increment_runtime_cycle_total_since_startup_counter()
671 .once()
672 .returning(|| ());
673 runner
674 .expect_increment_runtime_cycle_success_since_startup_counter()
675 .once()
676 .returning(|| ());
677
678 let initial_state = AggregatorState::Signing(SigningState {
679 current_time_point: TimePoint::dummy(),
680 open_message: OpenMessage::dummy(),
681 });
682
683 let mut runtime = init_runtime(Some(initial_state), runner, false).await;
684 runtime.cycle().await.unwrap();
685
686 assert_eq!("ready".to_string(), runtime.get_state());
687 }
688
689 #[tokio::test]
690 async fn signing_certificate_is_not_created() {
691 let mut runner = MockAggregatorRunner::new();
692 runner
693 .expect_get_time_point_from_chain()
694 .once()
695 .returning(|| Ok(TimePoint::dummy()));
696 runner
697 .expect_is_open_message_outdated()
698 .once()
699 .returning(|_, _| Ok(false));
700 runner.expect_create_certificate().once().returning(|_| Ok(None));
701 runner
702 .expect_increment_runtime_cycle_total_since_startup_counter()
703 .once()
704 .returning(|| ());
705 runner
706 .expect_increment_runtime_cycle_success_since_startup_counter()
707 .never();
708 let state = SigningState {
709 current_time_point: TimePoint::dummy(),
710 open_message: OpenMessage::dummy(),
711 };
712 let mut runtime =
713 init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
714 let err = runtime
715 .cycle()
716 .await
717 .expect_err("cycle should have returned an error");
718
719 match err {
720 RuntimeError::KeepState { .. } => (),
721 _ => panic!("KeepState error expected, got {err:?}."),
722 };
723
724 assert_eq!("signing".to_string(), runtime.get_state());
725 }
726
727 #[tokio::test]
728 async fn signing_artifact_not_created() {
729 let mut runner = MockAggregatorRunner::new();
730 runner
731 .expect_get_time_point_from_chain()
732 .once()
733 .returning(|| Ok(TimePoint::dummy()));
734 runner
735 .expect_is_open_message_outdated()
736 .once()
737 .returning(|_, _| Ok(false));
738 runner
739 .expect_create_certificate()
740 .return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
741 runner
742 .expect_create_artifact()
743 .once()
744 .returning(|_, _| Err(anyhow!("whatever")));
745 runner
746 .expect_increment_runtime_cycle_total_since_startup_counter()
747 .once()
748 .returning(|| ());
749 runner
750 .expect_increment_runtime_cycle_success_since_startup_counter()
751 .never();
752 let state = SigningState {
753 current_time_point: TimePoint::dummy(),
754 open_message: OpenMessage::dummy(),
755 };
756 let mut runtime =
757 init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
758 let err = runtime
759 .cycle()
760 .await
761 .expect_err("cycle should have returned an error");
762
763 match err {
764 RuntimeError::ReInit { .. } => (),
765 _ => panic!("ReInit error expected, got {err:?}."),
766 };
767
768 assert_eq!("signing".to_string(), runtime.get_state());
769 }
770
771 #[tokio::test]
772 async fn signing_certificate_is_created() {
773 let mut runner = MockAggregatorRunner::new();
774 runner
775 .expect_get_time_point_from_chain()
776 .once()
777 .returning(|| Ok(TimePoint::dummy()));
778 runner
779 .expect_is_open_message_outdated()
780 .once()
781 .returning(|_, _| Ok(false));
782 runner
783 .expect_create_certificate()
784 .return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
785 runner.expect_create_artifact().once().returning(|_, _| Ok(()));
786 runner
787 .expect_increment_runtime_cycle_total_since_startup_counter()
788 .once()
789 .returning(|| ());
790 runner
791 .expect_increment_runtime_cycle_success_since_startup_counter()
792 .once()
793 .returning(|| ());
794
795 let state = SigningState {
796 current_time_point: TimePoint::dummy(),
797 open_message: OpenMessage::dummy(),
798 };
799 let mut runtime =
800 init_runtime(Some(AggregatorState::Signing(state)), runner, false).await;
801 runtime.cycle().await.unwrap();
802
803 assert_eq!("ready".to_string(), runtime.get_state());
804 }
805
806 #[tokio::test]
807 pub async fn critical_error() {
808 let mut runner = MockAggregatorRunner::new();
809 runner
810 .expect_get_time_point_from_chain()
811 .once()
812 .returning(|| Ok(TimePoint::dummy()));
813 runner
814 .expect_update_era_checker()
815 .with(predicate::eq(TimePoint::dummy().epoch))
816 .once()
817 .returning(|_| Err(anyhow!("ERROR")));
818 runner
819 .expect_close_signer_registration_round()
820 .once()
821 .returning(|| Ok(()));
822 runner
823 .expect_increment_runtime_cycle_total_since_startup_counter()
824 .once()
825 .returning(|| ());
826 runner
827 .expect_increment_runtime_cycle_success_since_startup_counter()
828 .never();
829
830 let mut runtime = init_runtime(
831 Some(AggregatorState::Idle(IdleState {
832 current_time_point: None,
833 })),
834 runner,
835 false,
836 )
837 .await;
838 runtime.cycle().await.unwrap_err();
839
840 assert_eq!("idle".to_string(), runtime.get_state());
841 }
842 }
843
844 mod follower {
845 use mockall::predicate::eq;
846
847 use super::*;
848
849 #[tokio::test]
850 pub async fn idle_new_epoch_detected_and_leader_not_transitioned_to_epoch() {
851 let mut runner = MockAggregatorRunner::new();
852 let time_point = TimePoint::dummy();
853 let new_time_point = TimePoint {
854 epoch: time_point.epoch + 1,
855 ..time_point.clone()
856 };
857 runner
858 .expect_get_time_point_from_chain()
859 .once()
860 .returning(move || Ok(new_time_point.clone()));
861 runner
862 .expect_is_follower_aggregator_at_same_epoch_as_leader()
863 .once()
864 .returning(|_| Ok(false));
865 runner
866 .expect_increment_runtime_cycle_total_since_startup_counter()
867 .once()
868 .returning(|| ());
869 runner
870 .expect_increment_runtime_cycle_success_since_startup_counter()
871 .once()
872 .returning(|| ());
873 let mut runtime = init_runtime(
874 Some(AggregatorState::Idle(IdleState {
875 current_time_point: Some(time_point),
876 })),
877 runner,
878 true,
879 )
880 .await;
881 runtime.cycle().await.unwrap();
882
883 assert_eq!("idle".to_string(), runtime.get_state());
884 }
885
886 #[tokio::test]
887 pub async fn idle_new_epoch_detected_and_leader_has_transitioned_to_epoch() {
888 let mut runner = MockAggregatorRunner::new();
889 let time_point = TimePoint::dummy();
890 let new_time_point = TimePoint {
891 epoch: time_point.epoch + 1,
892 ..time_point.clone()
893 };
894 let new_time_point_clone = new_time_point.clone();
895 runner
896 .expect_get_time_point_from_chain()
897 .once()
898 .returning(move || Ok(new_time_point.clone()));
899 runner
900 .expect_is_follower_aggregator_at_same_epoch_as_leader()
901 .once()
902 .returning(|_| Ok(true));
903 runner
904 .expect_update_stake_distribution()
905 .with(predicate::eq(new_time_point_clone.clone()))
906 .once()
907 .returning(|_| Ok(()));
908 runner
909 .expect_close_signer_registration_round()
910 .once()
911 .returning(|| Ok(()));
912 runner
913 .expect_synchronize_follower_aggregator_signer_registration()
914 .once()
915 .returning(|| Ok(()));
916 runner
917 .expect_open_signer_registration_round()
918 .once()
919 .returning(|_| Ok(()));
920 runner
921 .expect_is_certificate_chain_valid()
922 .once()
923 .returning(|_| Ok(()));
924 runner
925 .expect_synchronize_follower_aggregator_certificate_chain()
926 .once()
927 .with(eq(false)) .returning(|_| Ok(()));
929 runner
930 .expect_update_era_checker()
931 .with(predicate::eq(new_time_point_clone.clone().epoch))
932 .once()
933 .returning(|_| Ok(()));
934 runner
935 .expect_inform_new_epoch()
936 .with(predicate::eq(new_time_point_clone.clone().epoch))
937 .once()
938 .returning(|_| Ok(()));
939 runner.expect_update_epoch_settings().once().returning(|| Ok(()));
940 runner.expect_precompute_epoch_data().once().returning(|| Ok(()));
941 runner
942 .expect_upkeep()
943 .with(predicate::eq(new_time_point_clone.clone().epoch))
944 .once()
945 .returning(|_| Ok(()));
946 runner
947 .expect_increment_runtime_cycle_total_since_startup_counter()
948 .once()
949 .returning(|| ());
950 runner
951 .expect_increment_runtime_cycle_success_since_startup_counter()
952 .once()
953 .returning(|| ());
954 let mut runtime = init_runtime(
955 Some(AggregatorState::Idle(IdleState {
956 current_time_point: Some(time_point),
957 })),
958 runner,
959 true,
960 )
961 .await;
962 runtime.cycle().await.unwrap();
963
964 assert_eq!("ready".to_string(), runtime.get_state());
965 }
966 }
967}