1use anyhow::Context;
2use async_trait::async_trait;
3use slog::{debug, warn, Logger};
4use std::sync::Arc;
5use std::time::Duration;
6
7use mithril_common::entities::{Certificate, Epoch, ProtocolMessage, SignedEntityType, TimePoint};
8use mithril_common::logging::LoggerExtensions;
9use mithril_common::StdResult;
10use mithril_persistence::store::StakeStorer;
11
12use crate::entities::OpenMessage;
13use crate::DependencyContainer;
14
/// Settings handed to the aggregator runtime.
#[derive(Clone, Debug)]
pub struct AggregatorConfig {
    /// Duration supplied at construction time.
    /// NOTE(review): presumably the pause between two runtime cycles — confirm against the runtime.
    pub interval: Duration,

    /// Flag supplied at construction time.
    /// NOTE(review): presumably `true` when this aggregator follows a leader aggregator — confirm.
    pub is_follower: bool,
}

impl AggregatorConfig {
    /// Builds a configuration; both arguments are stored unchanged.
    pub fn new(interval: Duration, is_follower: bool) -> Self {
        AggregatorConfig {
            interval,
            is_follower,
        }
    }
}
34
35#[async_trait]
38pub trait AggregatorRunnerTrait: Sync + Send {
39 async fn get_time_point_from_chain(&self) -> StdResult<TimePoint>;
41
42 async fn get_current_open_message_for_signed_entity_type(
44 &self,
45 signed_entity_type: &SignedEntityType,
46 ) -> StdResult<Option<OpenMessage>>;
47
48 async fn get_current_non_certified_open_message(
50 &self,
51 current_time_point: &TimePoint,
52 ) -> StdResult<Option<OpenMessage>>;
53
54 async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()>;
56
57 async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()>;
59
60 async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()>;
62
63 async fn close_signer_registration_round(&self) -> StdResult<()>;
65
66 async fn is_follower_aggregator_at_same_epoch_as_leader(
68 &self,
69 time_point: &TimePoint,
70 ) -> StdResult<bool>;
71
72 async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()>;
74
75 async fn update_epoch_settings(&self) -> StdResult<()>;
77
78 async fn compute_protocol_message(
80 &self,
81 signed_entity_type: &SignedEntityType,
82 ) -> StdResult<ProtocolMessage>;
83
84 async fn mark_open_message_if_expired(
86 &self,
87 signed_entity_type: &SignedEntityType,
88 ) -> StdResult<Option<OpenMessage>>;
89
90 async fn create_certificate(
92 &self,
93 signed_entity_type: &SignedEntityType,
94 ) -> StdResult<Option<Certificate>>;
95
96 async fn create_artifact(
98 &self,
99 signed_entity_type: &SignedEntityType,
100 certificate: &Certificate,
101 ) -> StdResult<()>;
102
103 async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()>;
105
106 async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()>;
108
109 async fn upkeep(&self, epoch: Epoch) -> StdResult<()>;
111
112 async fn precompute_epoch_data(&self) -> StdResult<()>;
114
115 async fn create_open_message(
117 &self,
118 signed_entity_type: &SignedEntityType,
119 protocol_message: &ProtocolMessage,
120 ) -> StdResult<OpenMessage>;
121
122 async fn is_open_message_outdated(
124 &self,
125 open_message_signed_entity_type: SignedEntityType,
126 last_time_point: &TimePoint,
127 ) -> StdResult<bool>;
128
129 fn increment_runtime_cycle_success_since_startup_counter(&self);
131
132 fn increment_runtime_cycle_total_since_startup_counter(&self);
134}
135
136pub struct AggregatorRunner {
139 dependencies: Arc<DependencyContainer>,
140 logger: Logger,
141}
142
143impl AggregatorRunner {
144 pub fn new(dependencies: Arc<DependencyContainer>) -> Self {
146 let logger = dependencies.root_logger.new_with_component_name::<Self>();
147 Self {
148 dependencies,
149 logger,
150 }
151 }
152
153 async fn list_available_signed_entity_types(
154 &self,
155 time_point: &TimePoint,
156 ) -> StdResult<Vec<SignedEntityType>> {
157 let signed_entity_types = self
158 .dependencies
159 .epoch_service
160 .read()
161 .await
162 .signed_entity_config()?
163 .list_allowed_signed_entity_types(time_point)?;
164 let unlocked_signed_entities = self
165 .dependencies
166 .signed_entity_type_lock
167 .filter_unlocked_entries(signed_entity_types)
168 .await;
169
170 Ok(unlocked_signed_entities)
171 }
172}
173
174#[cfg_attr(test, mockall::automock)]
175#[async_trait]
176impl AggregatorRunnerTrait for AggregatorRunner {
177 async fn get_time_point_from_chain(&self) -> StdResult<TimePoint> {
179 debug!(self.logger, ">> get_time_point_from_chain");
180 let time_point = self
181 .dependencies
182 .ticker_service
183 .get_current_time_point()
184 .await?;
185
186 Ok(time_point)
187 }
188
189 async fn get_current_open_message_for_signed_entity_type(
190 &self,
191 signed_entity_type: &SignedEntityType,
192 ) -> StdResult<Option<OpenMessage>> {
193 debug!(self.logger,">> get_current_open_message_for_signed_entity_type"; "signed_entity_type" => ?signed_entity_type);
194 self.mark_open_message_if_expired(signed_entity_type)
195 .await?;
196
197 Ok(self
198 .dependencies
199 .certifier_service
200 .get_open_message(signed_entity_type)
201 .await
202 .with_context(|| format!("CertifierService can not get open message for signed_entity_type: '{signed_entity_type}'"))?)
203 }
204
205 async fn get_current_non_certified_open_message(
206 &self,
207 current_time_point: &TimePoint,
208 ) -> StdResult<Option<OpenMessage>> {
209 debug!(self.logger,">> get_current_non_certified_open_message"; "time_point" => #?current_time_point);
210 let signed_entity_types = self
211 .list_available_signed_entity_types(current_time_point)
212 .await?;
213
214 for signed_entity_type in signed_entity_types {
215 let current_open_message = self.get_current_open_message_for_signed_entity_type(&signed_entity_type)
216 .await
217 .with_context(|| format!("AggregatorRunner can not get current open message for signed entity type: '{}'", &signed_entity_type))?;
218 match current_open_message {
219 None => {
220 let protocol_message = self.compute_protocol_message(&signed_entity_type).await.with_context(|| format!("AggregatorRunner can not compute protocol message for signed_entity_type: '{signed_entity_type}'"))?;
221 let open_message_new = self.create_open_message(&signed_entity_type, &protocol_message)
222 .await
223 .with_context(|| format!("AggregatorRunner can not create open message for signed_entity_type: '{signed_entity_type}'"))?;
224
225 return Ok(Some(open_message_new));
226 }
227 Some(open_message) => {
228 if !open_message.is_certified && !open_message.is_expired {
229 return Ok(Some(open_message));
230 }
231 }
232 }
233 }
234
235 Ok(None)
236 }
237
238 async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()> {
239 debug!(self.logger, ">> is_certificate_chain_valid");
240 self.dependencies
241 .certifier_service
242 .verify_certificate_chain(time_point.epoch)
243 .await?;
244
245 Ok(())
246 }
247
248 async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()> {
249 debug!(self.logger,">> update_stake_distribution"; "time_point" => #?new_time_point);
250 self.dependencies
251 .stake_distribution_service
252 .update_stake_distribution()
253 .await
254 .with_context(|| format!("AggregatorRunner could not update stake distribution for time_point: '{new_time_point}'"))
255 }
256
257 async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()> {
258 debug!(self.logger,">> open_signer_registration_round"; "time_point" => #?new_time_point);
259 let registration_epoch = new_time_point.epoch.offset_to_recording_epoch();
260
261 let stakes = self
262 .dependencies
263 .stake_store
264 .get_stakes(registration_epoch)
265 .await?
266 .unwrap_or_default();
267
268 self.dependencies
269 .signer_registration_round_opener
270 .open_registration_round(registration_epoch, stakes)
271 .await
272 }
273
274 async fn close_signer_registration_round(&self) -> StdResult<()> {
275 debug!(self.logger, ">> close_signer_registration_round");
276 self.dependencies
277 .signer_registration_round_opener
278 .close_registration_round()
279 .await
280 }
281
282 async fn is_follower_aggregator_at_same_epoch_as_leader(
283 &self,
284 time_point: &TimePoint,
285 ) -> StdResult<bool> {
286 self.dependencies
287 .signer_synchronizer
288 .can_synchronize_signers(time_point.epoch)
289 .await
290 .map_err(|e| e.into())
291 }
292
293 async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()> {
294 self.dependencies
295 .signer_synchronizer
296 .synchronize_all_signers()
297 .await
298 .map_err(|e| e.into())
299 }
300
301 async fn update_epoch_settings(&self) -> StdResult<()> {
302 debug!(self.logger, ">> update_epoch_settings");
303 self.dependencies
304 .epoch_service
305 .write()
306 .await
307 .update_epoch_settings()
308 .await
309 }
310
311 async fn compute_protocol_message(
312 &self,
313 signed_entity_type: &SignedEntityType,
314 ) -> StdResult<ProtocolMessage> {
315 debug!(self.logger, ">> compute_protocol_message");
316 let protocol_message = self
317 .dependencies
318 .signable_builder_service
319 .compute_protocol_message(signed_entity_type.to_owned())
320 .await
321 .with_context(|| format!("Runner can not compute protocol message for signed entity type: '{signed_entity_type}'"))?;
322
323 Ok(protocol_message)
324 }
325
326 async fn mark_open_message_if_expired(
327 &self,
328 signed_entity_type: &SignedEntityType,
329 ) -> StdResult<Option<OpenMessage>> {
330 debug!(self.logger, ">> mark_open_message_if_expired");
331 let expired_open_message = self
332 .dependencies
333 .certifier_service
334 .mark_open_message_if_expired(signed_entity_type)
335 .await
336 .with_context(|| "CertifierService can not mark expired open message")?;
337
338 debug!(
339 self.logger, "Marked expired open messages";
340 "expired_open_message" => ?expired_open_message
341 );
342
343 Ok(expired_open_message)
344 }
345
346 async fn create_certificate(
347 &self,
348 signed_entity_type: &SignedEntityType,
349 ) -> StdResult<Option<Certificate>> {
350 debug!(self.logger, ">> create_certificate"; "signed_entity_type" => ?signed_entity_type);
351
352 let certificate = self.dependencies
353 .certifier_service
354 .create_certificate(signed_entity_type)
355 .await
356 .with_context(|| {
357 format!(
358 "CertifierService can not create certificate for signed_entity_type: '{signed_entity_type}'"
359 )
360 })?;
361
362 if certificate.is_some() {
363 self.dependencies
364 .metrics_service
365 .get_certificate_total_produced_since_startup()
366 .increment();
367 }
368
369 Ok(certificate)
370 }
371
372 async fn create_artifact(
373 &self,
374 signed_entity_type: &SignedEntityType,
375 certificate: &Certificate,
376 ) -> StdResult<()> {
377 debug!(
378 self.logger, ">> create_artifact";
379 "signed_entity_type" => ?signed_entity_type,
380 "certificate_hash" => &certificate.hash
381 );
382
383 self.dependencies
384 .signed_entity_service
385 .create_artifact(signed_entity_type.to_owned(), certificate)
386 .await
387 .with_context(|| {
388 format!(
389 "SignedEntityService can not create artifact for signed_entity_type: '{signed_entity_type}' with certificate hash: '{}'",
390 certificate.hash
391 )
392 })?;
393
394 Ok(())
395 }
396
397 async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()> {
398 debug!(self.logger, ">> update_era_checker({epoch:?})");
399 let token = self
400 .dependencies
401 .era_reader
402 .read_era_epoch_token(epoch)
403 .await
404 .with_context(|| {
405 format!(
406 "EraReader can not get era epoch token for current epoch: '{}'",
407 epoch
408 )
409 })?;
410
411 let current_era = token
412 .get_current_supported_era()
413 .with_context(|| "EraEpochToken can not get current supported era")?;
414 self.dependencies
415 .era_checker
416 .change_era(current_era, token.get_current_epoch());
417 debug!(
418 self.logger,
419 "Current Era is {current_era} (Epoch {}).",
420 token.get_current_epoch()
421 );
422
423 if token.get_next_supported_era().is_err() {
424 let era_name = &token.get_next_era_marker().unwrap().name;
425 warn!(self.logger,"Upcoming Era '{era_name}' is not supported by this version of the software. Please update!");
426 }
427
428 Ok(())
429 }
430
431 async fn precompute_epoch_data(&self) -> StdResult<()> {
432 debug!(self.logger, ">> precompute_epoch_data");
433 self.dependencies
434 .epoch_service
435 .write()
436 .await
437 .precompute_epoch_data()
438 .await?;
439
440 Ok(())
441 }
442
443 async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()> {
444 debug!(self.logger, ">> inform_new_epoch({epoch:?})");
445 self.dependencies
446 .certifier_service
447 .inform_epoch(epoch)
448 .await?;
449
450 self.dependencies
451 .epoch_service
452 .write()
453 .await
454 .inform_epoch(epoch)
455 .await?;
456
457 Ok(())
458 }
459
460 async fn upkeep(&self, epoch: Epoch) -> StdResult<()> {
461 debug!(self.logger, ">> upkeep");
462 self.dependencies.upkeep_service.run(epoch).await
463 }
464
465 async fn create_open_message(
466 &self,
467 signed_entity_type: &SignedEntityType,
468 protocol_message: &ProtocolMessage,
469 ) -> StdResult<OpenMessage> {
470 debug!(self.logger, ">> create_open_message");
471 self.dependencies
472 .certifier_service
473 .create_open_message(signed_entity_type, protocol_message)
474 .await
475 }
476
477 async fn is_open_message_outdated(
478 &self,
479 open_message_signed_entity_type: SignedEntityType,
480 last_time_point: &TimePoint,
481 ) -> StdResult<bool> {
482 let current_open_message = self
483 .get_current_open_message_for_signed_entity_type(
484 &open_message_signed_entity_type,
485 )
486 .await
487 .with_context(|| format!("AggregatorRuntime can not get the current open message for signed entity type: '{}'", &open_message_signed_entity_type))?;
488 let is_expired_open_message = current_open_message
489 .as_ref()
490 .map(|om| om.is_expired)
491 .unwrap_or(false);
492
493 let exists_newer_open_message = {
494 let new_signed_entity_type = self
495 .dependencies
496 .epoch_service
497 .read()
498 .await
499 .signed_entity_config()?
500 .time_point_to_signed_entity(&open_message_signed_entity_type, last_time_point)?;
501 new_signed_entity_type != open_message_signed_entity_type
502 };
503
504 Ok(exists_newer_open_message || is_expired_open_message)
505 }
506
507 fn increment_runtime_cycle_success_since_startup_counter(&self) {
508 self.dependencies
509 .metrics_service
510 .get_runtime_cycle_success_since_startup()
511 .increment();
512 }
513
514 fn increment_runtime_cycle_total_since_startup_counter(&self) {
515 self.dependencies
516 .metrics_service
517 .get_runtime_cycle_total_since_startup()
518 .increment();
519 }
520}
521
522#[cfg(test)]
523pub mod tests {
524 use crate::dependency_injection::DependenciesBuilder;
525 use crate::entities::AggregatorEpochSettings;
526 use crate::services::{FakeEpochService, FakeEpochServiceBuilder, MockUpkeepService};
527 use crate::{
528 entities::OpenMessage,
529 initialize_dependencies,
530 runtime::{AggregatorRunner, AggregatorRunnerTrait},
531 services::{MithrilStakeDistributionService, MockCertifierService},
532 Configuration, DependencyContainer, MithrilSignerRegistrationLeader,
533 SignerRegistrationRound,
534 };
535 use async_trait::async_trait;
536 use chrono::{DateTime, Utc};
537 use mithril_common::entities::{
538 CardanoTransactionsSigningConfig, ChainPoint, Epoch, SignedEntityConfig,
539 SignedEntityTypeDiscriminants,
540 };
541 use mithril_common::temp_dir;
542 use mithril_common::{
543 chain_observer::FakeObserver,
544 digesters::DumbImmutableFileObserver,
545 entities::{ProtocolMessage, SignedEntityType, StakeDistribution, TimePoint},
546 signable_builder::SignableBuilderService,
547 test_utils::{fake_data, MithrilFixtureBuilder},
548 MithrilTickerService, StdResult,
549 };
550 use mithril_persistence::store::StakeStorer;
551 use mithril_signed_entity_lock::SignedEntityTypeLock;
552 use mockall::predicate::eq;
553 use mockall::{mock, Sequence};
554 use std::path::PathBuf;
555 use std::sync::Arc;
556 use tokio::sync::RwLock;
557
558 mock! {
559 SignableBuilderServiceImpl { }
560
561 #[async_trait]
562 impl SignableBuilderService for SignableBuilderServiceImpl
563 {
564
565 async fn compute_protocol_message(
566 &self,
567 signed_entity_type: SignedEntityType,
568 ) -> StdResult<ProtocolMessage>;
569 }
570 }
571
572 async fn build_runner_with_fixture_data(deps: DependencyContainer) -> AggregatorRunner {
573 let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
574 let current_epoch = deps
575 .chain_observer
576 .get_current_epoch()
577 .await
578 .unwrap()
579 .unwrap();
580 deps.init_state_from_fixture(
581 &fixture,
582 &CardanoTransactionsSigningConfig::dummy(),
583 &[
584 current_epoch.offset_to_signer_retrieval_epoch().unwrap(),
585 current_epoch,
586 current_epoch.next(),
587 ],
588 )
589 .await;
590
591 AggregatorRunner::new(Arc::new(deps))
592 }
593
594 async fn build_runner(
595 temp_dir: PathBuf,
596 mock_certifier_service: MockCertifierService,
597 ) -> AggregatorRunner {
598 let mut deps = initialize_dependencies(temp_dir).await;
599 deps.certifier_service = Arc::new(mock_certifier_service);
600
601 let mut mock_signable_builder_service = MockSignableBuilderServiceImpl::new();
602 mock_signable_builder_service
603 .expect_compute_protocol_message()
604 .return_once(|_| Ok(ProtocolMessage::default()));
605 deps.signable_builder_service = Arc::new(mock_signable_builder_service);
606
607 let runner = build_runner_with_fixture_data(deps).await;
608
609 let current_epoch = runner
610 .dependencies
611 .ticker_service
612 .get_current_epoch()
613 .await
614 .unwrap();
615 runner.inform_new_epoch(current_epoch).await.unwrap();
616 runner.precompute_epoch_data().await.unwrap();
617 runner
618 }
619
620 fn init_certifier_service_mock(
621 mock_certifier_service: &mut MockCertifierService,
622 messages: Vec<OpenMessage>,
623 ) {
624 for message in messages {
625 mock_certifier_service
626 .expect_get_open_message()
627 .return_once(|_| Ok(Some(message)))
628 .times(1);
629 }
630 mock_certifier_service
632 .expect_get_open_message()
633 .returning(|_| Ok(None));
634
635 mock_certifier_service
636 .expect_inform_epoch()
637 .return_once(|_| Ok(()));
638 mock_certifier_service
639 .expect_mark_open_message_if_expired()
640 .returning(|_| Ok(None));
641 }
642
643 fn create_open_message(is_certified: IsCertified, is_expired: IsExpired) -> OpenMessage {
644 OpenMessage {
645 signed_entity_type: SignedEntityType::CardanoImmutableFilesFull(fake_data::beacon()),
646 is_certified: is_certified == IsCertified::Yes,
647 is_expired: is_expired == IsExpired::Yes,
648 ..OpenMessage::dummy()
649 }
650 }
651
652 #[derive(Eq, PartialEq)]
653 enum IsCertified {
654 Yes,
655 No,
656 }
657
658 #[derive(Eq, PartialEq)]
659 enum IsExpired {
660 Yes,
661 No,
662 }
663
664 #[tokio::test]
665 async fn test_get_time_point_from_chain() {
666 let expected = TimePoint::new(2, 17, ChainPoint::dummy());
667 let mut dependencies = initialize_dependencies!().await;
668 let immutable_file_observer = Arc::new(DumbImmutableFileObserver::default());
669 immutable_file_observer
670 .shall_return(Some(expected.immutable_file_number))
671 .await;
672 let ticker_service = Arc::new(MithrilTickerService::new(
673 Arc::new(FakeObserver::new(Some(expected.clone()))),
674 immutable_file_observer,
675 ));
676 dependencies.ticker_service = ticker_service;
677 let runner = AggregatorRunner::new(Arc::new(dependencies));
678
679 let res = runner.get_time_point_from_chain().await;
681 assert_eq!(expected, res.unwrap());
682 }
683
684 #[tokio::test]
685 async fn test_update_stake_distribution() {
686 let mut deps = initialize_dependencies!().await;
687 let chain_observer = Arc::new(FakeObserver::default());
688 deps.chain_observer = chain_observer.clone();
689 deps.stake_distribution_service = Arc::new(MithrilStakeDistributionService::new(
690 deps.stake_store.clone(),
691 chain_observer.clone(),
692 ));
693 let deps = Arc::new(deps);
694 let runner = AggregatorRunner::new(deps.clone());
695 let time_point = runner.get_time_point_from_chain().await.unwrap();
696 let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
697 let expected = fixture.stake_distribution();
698
699 chain_observer
700 .set_signers(fixture.signers_with_stake())
701 .await;
702 runner
703 .update_stake_distribution(&time_point)
704 .await
705 .expect("updating stake distribution should not return an error");
706
707 let saved_stake_distribution = deps
708 .stake_store
709 .get_stakes(time_point.epoch.offset_to_recording_epoch())
710 .await
711 .unwrap()
712 .unwrap_or_else(|| {
713 panic!(
714 "I should have a stake distribution for the epoch {:?}",
715 time_point.epoch
716 )
717 });
718
719 assert_eq!(expected, saved_stake_distribution);
720 }
721
722 #[tokio::test]
723 async fn test_open_signer_registration_round() {
724 let mut deps = initialize_dependencies!().await;
725 let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
726 deps.verification_key_store.clone(),
727 deps.signer_recorder.clone(),
728 deps.signer_registration_verifier.clone(),
729 None,
730 ));
731 deps.signer_registration_round_opener = signer_registration_round_opener.clone();
732 let stake_store = deps.stake_store.clone();
733 let deps = Arc::new(deps);
734 let runner = AggregatorRunner::new(deps.clone());
735
736 let time_point = TimePoint::dummy();
737 let recording_epoch = time_point.epoch.offset_to_recording_epoch();
738 let stake_distribution: StakeDistribution =
739 StakeDistribution::from([("a".to_string(), 5), ("b".to_string(), 10)]);
740
741 stake_store
742 .save_stakes(recording_epoch, stake_distribution.clone())
743 .await
744 .expect("Save Stake distribution should not fail");
745
746 runner
747 .open_signer_registration_round(&time_point)
748 .await
749 .expect("opening signer registration should not return an error");
750
751 let saved_current_round = signer_registration_round_opener.get_current_round().await;
752
753 let expected_signer_registration_round =
754 SignerRegistrationRound::dummy(recording_epoch, stake_distribution);
755
756 assert_eq!(
757 Some(expected_signer_registration_round),
758 saved_current_round,
759 );
760 }
761
762 #[tokio::test]
763 async fn test_close_signer_registration_round() {
764 let mut deps = initialize_dependencies!().await;
765 let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
766 deps.verification_key_store.clone(),
767 deps.signer_recorder.clone(),
768 deps.signer_registration_verifier.clone(),
769 None,
770 ));
771 deps.signer_registration_round_opener = signer_registration_round_opener.clone();
772 let deps = Arc::new(deps);
773 let runner = AggregatorRunner::new(deps.clone());
774
775 let time_point = TimePoint::dummy();
776 runner
777 .open_signer_registration_round(&time_point)
778 .await
779 .expect("opening signer registration should not return an error");
780
781 runner
782 .close_signer_registration_round()
783 .await
784 .expect("closing signer registration should not return an error");
785
786 let saved_current_round = signer_registration_round_opener.get_current_round().await;
787 assert!(saved_current_round.is_none());
788 }
789
790 #[tokio::test]
791 async fn test_expire_open_message() {
792 let open_message_expected = OpenMessage {
793 signed_entity_type: SignedEntityType::dummy(),
794 is_certified: false,
795 is_expired: false,
796 expires_at: Some(
797 DateTime::parse_from_rfc3339("2000-01-19T13:43:05.618857482Z")
798 .unwrap()
799 .with_timezone(&Utc),
800 ),
801 ..OpenMessage::dummy()
802 };
803 let open_message_clone = open_message_expected.clone();
804
805 let mut mock_certifier_service = MockCertifierService::new();
806 mock_certifier_service
807 .expect_mark_open_message_if_expired()
808 .return_once(|_| Ok(Some(open_message_clone)));
809
810 let mut deps = initialize_dependencies!().await;
811 deps.certifier_service = Arc::new(mock_certifier_service);
812
813 let runner = build_runner_with_fixture_data(deps).await;
814 let open_message_expired = runner
815 .mark_open_message_if_expired(&open_message_expected.signed_entity_type)
816 .await
817 .expect("mark_open_message_if_expired should not fail");
818
819 assert_eq!(Some(open_message_expected), open_message_expired);
820 }
821
822 #[tokio::test]
823 async fn test_update_era_checker() {
824 let deps = initialize_dependencies!().await;
825 let ticker_service = deps.ticker_service.clone();
826 let era_checker = deps.era_checker.clone();
827 let mut time_point = ticker_service.get_current_time_point().await.unwrap();
828
829 assert_eq!(time_point.epoch, era_checker.current_epoch());
830 let runner = AggregatorRunner::new(Arc::new(deps));
831 time_point.epoch += 1;
832
833 runner.update_era_checker(time_point.epoch).await.unwrap();
834 assert_eq!(time_point.epoch, era_checker.current_epoch());
835 }
836
837 #[tokio::test]
838 async fn test_inform_new_epoch() {
839 let mut mock_certifier_service = MockCertifierService::new();
840 mock_certifier_service
841 .expect_inform_epoch()
842 .returning(|_| Ok(()))
843 .times(1);
844 let mut deps = initialize_dependencies!().await;
845 let current_epoch = deps
846 .chain_observer
847 .get_current_epoch()
848 .await
849 .unwrap()
850 .unwrap();
851
852 deps.certifier_service = Arc::new(mock_certifier_service);
853 deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
854 current_epoch,
855 &MithrilFixtureBuilder::default().build(),
856 )));
857
858 let runner = AggregatorRunner::new(Arc::new(deps));
859
860 runner.inform_new_epoch(current_epoch).await.unwrap();
861 }
862
863 #[tokio::test]
864 async fn test_upkeep_calls_run_on_upkeep_service() {
865 let mut upkeep_service = MockUpkeepService::new();
866 upkeep_service
867 .expect_run()
868 .with(eq(Epoch(5)))
869 .returning(|_| Ok(()))
870 .times(1);
871
872 let mut deps = initialize_dependencies!().await;
873 deps.upkeep_service = Arc::new(upkeep_service);
874
875 let runner = AggregatorRunner::new(Arc::new(deps));
876
877 runner.upkeep(Epoch(5)).await.unwrap();
878 }
879
880 #[tokio::test]
881 async fn test_update_epoch_settings() {
882 let mut mock_certifier_service = MockCertifierService::new();
883 mock_certifier_service
884 .expect_inform_epoch()
885 .returning(|_| Ok(()))
886 .times(1);
887
888 let config = Configuration::new_sample(temp_dir!());
889 let mut deps = DependenciesBuilder::new_with_stdout_logger(config.clone())
890 .build_dependency_container()
891 .await
892 .unwrap();
893 deps.certifier_service = Arc::new(mock_certifier_service);
894 let epoch_settings_storer = deps.epoch_settings_storer.clone();
895 let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
896 let insert_epoch = current_epoch.offset_to_epoch_settings_recording_epoch();
897
898 let runner = build_runner_with_fixture_data(deps).await;
899 runner.inform_new_epoch(current_epoch).await.unwrap();
900 runner
901 .update_epoch_settings()
902 .await
903 .expect("update_epoch_settings should not fail");
904
905 let saved_epoch_settings = epoch_settings_storer
906 .get_epoch_settings(insert_epoch)
907 .await
908 .unwrap()
909 .unwrap_or_else(|| panic!("should have epoch settings for epoch {insert_epoch}",));
910
911 assert_eq!(
912 AggregatorEpochSettings {
913 protocol_parameters: config.protocol_parameters.clone(),
914 cardano_transactions_signing_config: config
915 .cardano_transactions_signing_config
916 .clone(),
917 },
918 saved_epoch_settings
919 );
920 }
921
922 #[tokio::test]
923 async fn test_precompute_epoch_data() {
924 let mut deps = initialize_dependencies!().await;
925 let current_epoch = deps
926 .chain_observer
927 .get_current_epoch()
928 .await
929 .unwrap()
930 .unwrap();
931
932 deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
933 current_epoch,
934 &MithrilFixtureBuilder::default().build(),
935 )));
936
937 let runner = AggregatorRunner::new(Arc::new(deps));
938
939 runner.precompute_epoch_data().await.unwrap();
940 }
941
942 #[tokio::test]
943 async fn test_get_current_non_certified_open_message_should_create_new_open_message_if_none_exists(
944 ) {
945 let open_message_created = create_open_message(IsCertified::No, IsExpired::No);
946 let open_message_expected = open_message_created.clone();
947
948 let runner = {
949 let mut mock_certifier_service = MockCertifierService::new();
950 init_certifier_service_mock(&mut mock_certifier_service, vec![]);
951
952 mock_certifier_service
953 .expect_create_open_message()
954 .return_once(|_, _| Ok(open_message_created))
955 .times(1);
956 build_runner(temp_dir!(), mock_certifier_service).await
957 };
958
959 let open_message_returned = runner
960 .get_current_non_certified_open_message(&TimePoint::dummy())
961 .await
962 .unwrap();
963 assert_eq!(Some(open_message_expected), open_message_returned);
964 }
965
966 #[tokio::test]
967 async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_not_expired(
968 ) {
969 let not_certified_and_not_expired = create_open_message(IsCertified::No, IsExpired::No);
970
971 let open_message_expected = not_certified_and_not_expired.clone();
972
973 let runner = {
974 let mut mock_certifier_service = MockCertifierService::new();
975 init_certifier_service_mock(
976 &mut mock_certifier_service,
977 vec![not_certified_and_not_expired],
978 );
979
980 mock_certifier_service.expect_create_open_message().never();
981 build_runner(temp_dir!(), mock_certifier_service).await
982 };
983
984 let open_message_returned = runner
985 .get_current_non_certified_open_message(&TimePoint::dummy())
986 .await
987 .unwrap();
988
989 assert_eq!(Some(open_message_expected), open_message_returned);
990 }
991
992 #[tokio::test]
993 async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_open_message_already_certified(
994 ) {
995 let certified_and_not_expired = create_open_message(IsCertified::Yes, IsExpired::No);
996 let not_certified_and_not_expired = create_open_message(IsCertified::No, IsExpired::No);
997
998 let open_message_expected = not_certified_and_not_expired.clone();
999
1000 let runner = {
1001 let mut mock_certifier_service = MockCertifierService::new();
1002 init_certifier_service_mock(
1003 &mut mock_certifier_service,
1004 vec![certified_and_not_expired, not_certified_and_not_expired],
1005 );
1006
1007 mock_certifier_service.expect_create_open_message().never();
1008 build_runner(temp_dir!(), mock_certifier_service).await
1009 };
1010
1011 let open_message_returned = runner
1012 .get_current_non_certified_open_message(&TimePoint::dummy())
1013 .await
1014 .unwrap();
1015
1016 assert_eq!(Some(open_message_expected), open_message_returned);
1017 }
1018
1019 #[tokio::test]
1020 async fn test_get_current_non_certified_open_message_should_create_open_message_if_none_exists_and_open_message_already_certified(
1021 ) {
1022 let certified_and_not_expired = create_open_message(IsCertified::Yes, IsExpired::No);
1023
1024 let open_message_created = create_open_message(IsCertified::No, IsExpired::No);
1025 let open_message_expected = open_message_created.clone();
1026
1027 let runner = {
1028 let mut mock_certifier_service = MockCertifierService::new();
1029 init_certifier_service_mock(
1030 &mut mock_certifier_service,
1031 vec![certified_and_not_expired],
1032 );
1033
1034 mock_certifier_service
1035 .expect_create_open_message()
1036 .return_once(|_, _| Ok(open_message_created))
1037 .times(1);
1038 build_runner(temp_dir!(), mock_certifier_service).await
1039 };
1040
1041 let open_message_returned = runner
1042 .get_current_non_certified_open_message(&TimePoint::dummy())
1043 .await
1044 .unwrap();
1045
1046 assert_eq!(Some(open_message_expected), open_message_returned);
1047 }
1048
1049 #[tokio::test]
1050 async fn test_get_current_non_certified_open_message_should_return_none_if_all_open_message_already_certified(
1051 ) {
1052 let certified_and_not_expired_1 = create_open_message(IsCertified::Yes, IsExpired::No);
1053 let certified_and_not_expired_2 = create_open_message(IsCertified::Yes, IsExpired::No);
1054
1055 let runner = {
1056 let mut mock_certifier_service = MockCertifierService::new();
1057 init_certifier_service_mock(
1058 &mut mock_certifier_service,
1059 vec![certified_and_not_expired_1, certified_and_not_expired_2],
1060 );
1061
1062 mock_certifier_service.expect_create_open_message().never();
1063 build_runner(temp_dir!(), mock_certifier_service).await
1064 };
1065
1066 let open_message_returned = runner
1067 .get_current_non_certified_open_message(&TimePoint::dummy())
1068 .await
1069 .unwrap();
1070
1071 assert!(open_message_returned.is_none());
1072 }
1073
1074 #[tokio::test]
1075 async fn test_get_current_non_certified_open_message_should_return_first_not_certified_and_not_expired_open_message(
1076 ) {
1077 let not_certified_and_expired = create_open_message(IsCertified::No, IsExpired::Yes);
1078 let not_certified_and_not_expired = create_open_message(IsCertified::No, IsExpired::No);
1079
1080 let open_message_expected = not_certified_and_not_expired.clone();
1081
1082 let runner = {
1083 let mut mock_certifier_service = MockCertifierService::new();
1084 init_certifier_service_mock(
1085 &mut mock_certifier_service,
1086 vec![not_certified_and_expired, not_certified_and_not_expired],
1087 );
1088
1089 mock_certifier_service.expect_create_open_message().never();
1090 build_runner(temp_dir!(), mock_certifier_service).await
1091 };
1092
1093 let open_message_returned = runner
1094 .get_current_non_certified_open_message(&TimePoint::dummy())
1095 .await
1096 .unwrap();
1097
1098 assert_eq!(Some(open_message_expected), open_message_returned);
1099 }
1100
1101 #[tokio::test]
1102 async fn test_get_current_non_certified_open_message_called_for_mithril_stake_distribution_and_then_for_immutable_file(
1103 ) {
1104 let mut mock_certifier_service = MockCertifierService::new();
1105
1106 let mut seq = Sequence::new();
1107 mock_certifier_service
1108 .expect_get_open_message()
1109 .with(eq(SignedEntityType::MithrilStakeDistribution(
1110 TimePoint::dummy().epoch,
1111 )))
1112 .times(1)
1113 .in_sequence(&mut seq)
1114 .return_once(|_| Ok(Some(create_open_message(IsCertified::Yes, IsExpired::No))));
1115
1116 mock_certifier_service
1117 .expect_get_open_message()
1118 .with(eq(SignedEntityType::CardanoImmutableFilesFull(
1119 fake_data::beacon(),
1120 )))
1121 .times(1)
1122 .in_sequence(&mut seq)
1123 .return_once(|_| Ok(Some(create_open_message(IsCertified::Yes, IsExpired::No))));
1124
1125 mock_certifier_service.expect_create_open_message().never();
1126
1127 mock_certifier_service
1128 .expect_inform_epoch()
1129 .return_once(|_| Ok(()));
1130 mock_certifier_service
1131 .expect_mark_open_message_if_expired()
1132 .returning(|_| Ok(None));
1133
1134 let runner = build_runner(temp_dir!(), mock_certifier_service).await;
1135
1136 runner
1137 .get_current_non_certified_open_message(&TimePoint::dummy())
1138 .await
1139 .unwrap();
1140 }
1141
1142 #[tokio::test]
1143 async fn list_available_signed_entity_types_list_all_configured_entities_if_none_are_locked() {
1144 let runner = {
1145 let mut dependencies = initialize_dependencies!().await;
1146 let epoch_service = FakeEpochServiceBuilder {
1147 signed_entity_config: SignedEntityConfig {
1148 allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1149 ..SignedEntityConfig::dummy()
1150 },
1151 ..FakeEpochServiceBuilder::dummy(Epoch(32))
1152 }
1153 .build();
1154 dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1155 dependencies.signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1156 AggregatorRunner::new(Arc::new(dependencies))
1157 };
1158
1159 let time_point = TimePoint::dummy();
1160 let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1161 .list_available_signed_entity_types(&time_point)
1162 .await
1163 .unwrap()
1164 .into_iter()
1165 .map(Into::into)
1166 .collect();
1167
1168 assert_eq!(
1169 signed_entities,
1170 SignedEntityTypeDiscriminants::all()
1171 .into_iter()
1172 .collect::<Vec<_>>()
1173 );
1174 }
1175
1176 #[tokio::test]
1177 async fn list_available_signed_entity_types_exclude_locked_entities() {
1178 let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1179 let runner = {
1180 let mut dependencies = initialize_dependencies!().await;
1181 dependencies.signed_entity_type_lock = signed_entity_type_lock.clone();
1182 let epoch_service = FakeEpochServiceBuilder {
1183 signed_entity_config: SignedEntityConfig {
1184 allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1185 ..SignedEntityConfig::dummy()
1186 },
1187 ..FakeEpochServiceBuilder::dummy(Epoch(32))
1188 }
1189 .build();
1190 dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1191
1192 AggregatorRunner::new(Arc::new(dependencies))
1193 };
1194
1195 signed_entity_type_lock
1196 .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
1197 .await;
1198
1199 let time_point = TimePoint::dummy();
1200 let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1201 .list_available_signed_entity_types(&time_point)
1202 .await
1203 .unwrap()
1204 .into_iter()
1205 .map(Into::into)
1206 .collect();
1207
1208 assert!(!signed_entities.is_empty());
1209 assert!(!signed_entities.contains(&SignedEntityTypeDiscriminants::CardanoTransactions));
1210 }
1211
1212 #[tokio::test]
1213 async fn is_open_message_outdated_return_false_when_message_is_not_expired_and_no_newer_open_message(
1214 ) {
1215 assert!(!is_outdated_returned_when(temp_dir!(), IsExpired::No, false).await);
1216 }
1217
1218 #[tokio::test]
1219 async fn is_open_message_outdated_return_true_when_message_is_expired_and_no_newer_open_message(
1220 ) {
1221 assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, false).await);
1222 }
1223
1224 #[tokio::test]
1225 async fn is_open_message_outdated_return_true_when_message_is_not_expired_and_exists_newer_open_message(
1226 ) {
1227 assert!(is_outdated_returned_when(temp_dir!(), IsExpired::No, true).await);
1228 }
1229
1230 #[tokio::test]
1231 async fn is_open_message_outdated_return_true_when_message_is_expired_and_exists_newer_open_message(
1232 ) {
1233 assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, true).await);
1234 }
1235
1236 async fn is_outdated_returned_when(
1237 tmp_path: PathBuf,
1238 is_expired: IsExpired,
1239 newer_open_message: bool,
1240 ) -> bool {
1241 let current_time_point = TimePoint {
1242 epoch: Epoch(2),
1243 ..TimePoint::dummy()
1244 };
1245
1246 let message_epoch = if newer_open_message {
1247 current_time_point.epoch + 54
1248 } else {
1249 current_time_point.epoch
1250 };
1251 let open_message_to_verify = OpenMessage {
1252 signed_entity_type: SignedEntityType::MithrilStakeDistribution(message_epoch),
1253 is_expired: is_expired == IsExpired::Yes,
1254 ..OpenMessage::dummy()
1255 };
1256
1257 let runner = {
1258 let mut deps = initialize_dependencies(tmp_path).await;
1259 let mut mock_certifier_service = MockCertifierService::new();
1260
1261 let open_message_current = open_message_to_verify.clone();
1262 mock_certifier_service
1263 .expect_get_open_message()
1264 .times(1)
1265 .return_once(|_| Ok(Some(open_message_current)));
1266 mock_certifier_service
1267 .expect_mark_open_message_if_expired()
1268 .returning(|_| Ok(None));
1269
1270 deps.certifier_service = Arc::new(mock_certifier_service);
1271
1272 let epoch_service = FakeEpochServiceBuilder::dummy(current_time_point.epoch).build();
1273 deps.epoch_service = Arc::new(RwLock::new(epoch_service));
1274
1275 build_runner_with_fixture_data(deps).await
1276 };
1277
1278 runner
1279 .is_open_message_outdated(
1280 open_message_to_verify.signed_entity_type,
1281 ¤t_time_point,
1282 )
1283 .await
1284 .unwrap()
1285 }
1286}