1use anyhow::Context;
2use async_trait::async_trait;
3use slog::{Logger, debug, warn};
4use std::sync::Arc;
5use std::time::Duration;
6
7use mithril_common::StdResult;
8use mithril_common::entities::{Certificate, Epoch, ProtocolMessage, SignedEntityType, TimePoint};
9use mithril_common::logging::LoggerExtensions;
10use mithril_persistence::store::StakeStorer;
11
12use crate::ServeCommandDependenciesContainer;
13use crate::entities::OpenMessage;
14
15#[derive(Debug, Clone)]
17pub struct AggregatorConfig {
18 pub interval: Duration,
20
21 pub is_follower: bool,
23}
24
25impl AggregatorConfig {
26 pub fn new(interval: Duration, is_follower: bool) -> Self {
28 Self {
29 interval,
30 is_follower,
31 }
32 }
33}
34
35#[async_trait]
38pub trait AggregatorRunnerTrait: Sync + Send {
39 async fn get_time_point_from_chain(&self) -> StdResult<TimePoint>;
41
42 async fn get_current_open_message_for_signed_entity_type(
44 &self,
45 signed_entity_type: &SignedEntityType,
46 ) -> StdResult<Option<OpenMessage>>;
47
48 async fn get_current_non_certified_open_message(
50 &self,
51 current_time_point: &TimePoint,
52 ) -> StdResult<Option<OpenMessage>>;
53
54 async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()>;
56
57 async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()>;
59
60 async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()>;
62
63 async fn close_signer_registration_round(&self) -> StdResult<()>;
65
66 async fn is_follower_aggregator_at_same_epoch_as_leader(
68 &self,
69 time_point: &TimePoint,
70 ) -> StdResult<bool>;
71
72 async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()>;
74
75 async fn update_epoch_settings(&self) -> StdResult<()>;
77
78 async fn compute_protocol_message(
80 &self,
81 signed_entity_type: &SignedEntityType,
82 ) -> StdResult<ProtocolMessage>;
83
84 async fn mark_open_message_if_expired(
86 &self,
87 signed_entity_type: &SignedEntityType,
88 ) -> StdResult<Option<OpenMessage>>;
89
90 async fn create_certificate(
92 &self,
93 signed_entity_type: &SignedEntityType,
94 ) -> StdResult<Option<Certificate>>;
95
96 async fn create_artifact(
98 &self,
99 signed_entity_type: &SignedEntityType,
100 certificate: &Certificate,
101 ) -> StdResult<()>;
102
103 async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()>;
105
106 async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()>;
108
109 async fn upkeep(&self, epoch: Epoch) -> StdResult<()>;
111
112 async fn precompute_epoch_data(&self) -> StdResult<()>;
114
115 async fn create_open_message(
117 &self,
118 signed_entity_type: &SignedEntityType,
119 protocol_message: &ProtocolMessage,
120 ) -> StdResult<OpenMessage>;
121
122 async fn is_open_message_outdated(
124 &self,
125 open_message_signed_entity_type: SignedEntityType,
126 last_time_point: &TimePoint,
127 ) -> StdResult<bool>;
128
129 fn increment_runtime_cycle_success_since_startup_counter(&self);
131
132 fn increment_runtime_cycle_total_since_startup_counter(&self);
134}
135
136pub struct AggregatorRunner {
139 dependencies: Arc<ServeCommandDependenciesContainer>,
140 logger: Logger,
141}
142
143impl AggregatorRunner {
144 pub fn new(dependencies: Arc<ServeCommandDependenciesContainer>) -> Self {
146 let logger = dependencies.root_logger.new_with_component_name::<Self>();
147 Self {
148 dependencies,
149 logger,
150 }
151 }
152
153 async fn list_available_signed_entity_types(
154 &self,
155 time_point: &TimePoint,
156 ) -> StdResult<Vec<SignedEntityType>> {
157 let signed_entity_types = self
158 .dependencies
159 .epoch_service
160 .read()
161 .await
162 .signed_entity_config()?
163 .list_allowed_signed_entity_types(time_point)?;
164 let unlocked_signed_entities = self
165 .dependencies
166 .signed_entity_type_lock
167 .filter_unlocked_entries(signed_entity_types)
168 .await;
169
170 Ok(unlocked_signed_entities)
171 }
172}
173
174#[cfg_attr(test, mockall::automock)]
175#[async_trait]
176impl AggregatorRunnerTrait for AggregatorRunner {
177 async fn get_time_point_from_chain(&self) -> StdResult<TimePoint> {
179 debug!(self.logger, ">> get_time_point_from_chain");
180 let time_point = self.dependencies.ticker_service.get_current_time_point().await?;
181
182 Ok(time_point)
183 }
184
185 async fn get_current_open_message_for_signed_entity_type(
186 &self,
187 signed_entity_type: &SignedEntityType,
188 ) -> StdResult<Option<OpenMessage>> {
189 debug!(self.logger,">> get_current_open_message_for_signed_entity_type"; "signed_entity_type" => ?signed_entity_type);
190 self.mark_open_message_if_expired(signed_entity_type).await?;
191
192 Ok(self
193 .dependencies
194 .certifier_service
195 .get_open_message(signed_entity_type)
196 .await
197 .with_context(|| format!("CertifierService can not get open message for signed_entity_type: '{signed_entity_type}'"))?)
198 }
199
200 async fn get_current_non_certified_open_message(
201 &self,
202 current_time_point: &TimePoint,
203 ) -> StdResult<Option<OpenMessage>> {
204 debug!(self.logger,">> get_current_non_certified_open_message"; "time_point" => #?current_time_point);
205 let signed_entity_types =
206 self.list_available_signed_entity_types(current_time_point).await?;
207
208 for signed_entity_type in signed_entity_types {
209 let current_open_message = self.get_current_open_message_for_signed_entity_type(&signed_entity_type)
210 .await
211 .with_context(|| format!("AggregatorRunner can not get current open message for signed entity type: '{}'", &signed_entity_type))?;
212 match current_open_message {
213 None => {
214 let protocol_message = self.compute_protocol_message(&signed_entity_type).await.with_context(|| format!("AggregatorRunner can not compute protocol message for signed_entity_type: '{signed_entity_type}'"))?;
215 let open_message_new = self.create_open_message(&signed_entity_type, &protocol_message)
216 .await
217 .with_context(|| format!("AggregatorRunner can not create open message for signed_entity_type: '{signed_entity_type}'"))?;
218
219 return Ok(Some(open_message_new));
220 }
221 Some(open_message) => {
222 if !open_message.is_certified && !open_message.is_expired {
223 return Ok(Some(open_message));
224 }
225 }
226 }
227 }
228
229 Ok(None)
230 }
231
232 async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()> {
233 debug!(self.logger, ">> is_certificate_chain_valid");
234 self.dependencies
235 .certifier_service
236 .verify_certificate_chain(time_point.epoch)
237 .await?;
238
239 Ok(())
240 }
241
242 async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()> {
243 debug!(self.logger,">> update_stake_distribution"; "time_point" => #?new_time_point);
244 self.dependencies
245 .stake_distribution_service
246 .update_stake_distribution()
247 .await
248 .with_context(|| format!("AggregatorRunner could not update stake distribution for time_point: '{new_time_point}'"))
249 }
250
251 async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()> {
252 debug!(self.logger,">> open_signer_registration_round"; "time_point" => #?new_time_point);
253 let registration_epoch = new_time_point.epoch.offset_to_recording_epoch();
254
255 let stakes = self
256 .dependencies
257 .stake_store
258 .get_stakes(registration_epoch)
259 .await?
260 .unwrap_or_default();
261
262 self.dependencies
263 .signer_registration_round_opener
264 .open_registration_round(registration_epoch, stakes)
265 .await
266 }
267
268 async fn close_signer_registration_round(&self) -> StdResult<()> {
269 debug!(self.logger, ">> close_signer_registration_round");
270 self.dependencies
271 .signer_registration_round_opener
272 .close_registration_round()
273 .await
274 }
275
276 async fn is_follower_aggregator_at_same_epoch_as_leader(
277 &self,
278 time_point: &TimePoint,
279 ) -> StdResult<bool> {
280 self.dependencies
281 .signer_synchronizer
282 .can_synchronize_signers(time_point.epoch)
283 .await
284 .map_err(|e| e.into())
285 }
286
287 async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()> {
288 self.dependencies
289 .signer_synchronizer
290 .synchronize_all_signers()
291 .await
292 .map_err(|e| e.into())
293 }
294
295 async fn update_epoch_settings(&self) -> StdResult<()> {
296 debug!(self.logger, ">> update_epoch_settings");
297 self.dependencies
298 .epoch_service
299 .write()
300 .await
301 .update_epoch_settings()
302 .await
303 }
304
305 async fn compute_protocol_message(
306 &self,
307 signed_entity_type: &SignedEntityType,
308 ) -> StdResult<ProtocolMessage> {
309 debug!(self.logger, ">> compute_protocol_message");
310 let protocol_message = self
311 .dependencies
312 .signable_builder_service
313 .compute_protocol_message(signed_entity_type.to_owned())
314 .await
315 .with_context(|| format!("Runner can not compute protocol message for signed entity type: '{signed_entity_type}'"))?;
316
317 Ok(protocol_message)
318 }
319
320 async fn mark_open_message_if_expired(
321 &self,
322 signed_entity_type: &SignedEntityType,
323 ) -> StdResult<Option<OpenMessage>> {
324 debug!(self.logger, ">> mark_open_message_if_expired");
325 let expired_open_message = self
326 .dependencies
327 .certifier_service
328 .mark_open_message_if_expired(signed_entity_type)
329 .await
330 .with_context(|| "CertifierService can not mark expired open message")?;
331
332 debug!(
333 self.logger, "Marked expired open messages";
334 "expired_open_message" => ?expired_open_message
335 );
336
337 Ok(expired_open_message)
338 }
339
340 async fn create_certificate(
341 &self,
342 signed_entity_type: &SignedEntityType,
343 ) -> StdResult<Option<Certificate>> {
344 debug!(self.logger, ">> create_certificate"; "signed_entity_type" => ?signed_entity_type);
345
346 let certificate = self.dependencies
347 .certifier_service
348 .create_certificate(signed_entity_type)
349 .await
350 .with_context(|| {
351 format!(
352 "CertifierService can not create certificate for signed_entity_type: '{signed_entity_type}'"
353 )
354 })?;
355
356 if certificate.is_some() {
357 self.dependencies
358 .metrics_service
359 .get_certificate_total_produced_since_startup()
360 .increment();
361 }
362
363 Ok(certificate)
364 }
365
366 async fn create_artifact(
367 &self,
368 signed_entity_type: &SignedEntityType,
369 certificate: &Certificate,
370 ) -> StdResult<()> {
371 debug!(
372 self.logger, ">> create_artifact";
373 "signed_entity_type" => ?signed_entity_type,
374 "certificate_hash" => &certificate.hash
375 );
376
377 self.dependencies
378 .signed_entity_service
379 .create_artifact(signed_entity_type.to_owned(), certificate)
380 .await
381 .with_context(|| {
382 format!(
383 "SignedEntityService can not create artifact for signed_entity_type: '{signed_entity_type}' with certificate hash: '{}'",
384 certificate.hash
385 )
386 })?;
387
388 Ok(())
389 }
390
391 async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()> {
392 debug!(self.logger, ">> update_era_checker({epoch:?})");
393 let token = self
394 .dependencies
395 .era_reader
396 .read_era_epoch_token(epoch)
397 .await
398 .with_context(|| {
399 format!("EraReader can not get era epoch token for current epoch: '{epoch}'")
400 })?;
401
402 let current_era = token
403 .get_current_supported_era()
404 .with_context(|| "EraEpochToken can not get current supported era")?;
405 self.dependencies
406 .era_checker
407 .change_era(current_era, token.get_current_epoch());
408 debug!(
409 self.logger,
410 "Current Era is {current_era} (Epoch {}).",
411 token.get_current_epoch()
412 );
413
414 if token.get_next_supported_era().is_err() {
415 let era_name = &token.get_next_era_marker().unwrap().name;
416 warn!(
417 self.logger,
418 "Upcoming Era '{era_name}' is not supported by this version of the software. Please update!"
419 );
420 }
421
422 Ok(())
423 }
424
425 async fn precompute_epoch_data(&self) -> StdResult<()> {
426 debug!(self.logger, ">> precompute_epoch_data");
427 self.dependencies
428 .epoch_service
429 .write()
430 .await
431 .precompute_epoch_data()
432 .await?;
433
434 Ok(())
435 }
436
437 async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()> {
438 debug!(self.logger, ">> inform_new_epoch({epoch:?})");
439 self.dependencies.certifier_service.inform_epoch(epoch).await?;
440
441 self.dependencies
442 .epoch_service
443 .write()
444 .await
445 .inform_epoch(epoch)
446 .await?;
447
448 Ok(())
449 }
450
451 async fn upkeep(&self, epoch: Epoch) -> StdResult<()> {
452 debug!(self.logger, ">> upkeep");
453 self.dependencies.upkeep_service.run(epoch).await
454 }
455
456 async fn create_open_message(
457 &self,
458 signed_entity_type: &SignedEntityType,
459 protocol_message: &ProtocolMessage,
460 ) -> StdResult<OpenMessage> {
461 debug!(self.logger, ">> create_open_message");
462 self.dependencies
463 .certifier_service
464 .create_open_message(signed_entity_type, protocol_message)
465 .await
466 }
467
468 async fn is_open_message_outdated(
469 &self,
470 open_message_signed_entity_type: SignedEntityType,
471 last_time_point: &TimePoint,
472 ) -> StdResult<bool> {
473 let current_open_message = self
474 .get_current_open_message_for_signed_entity_type(
475 &open_message_signed_entity_type,
476 )
477 .await
478 .with_context(|| format!("AggregatorRuntime can not get the current open message for signed entity type: '{}'", &open_message_signed_entity_type))?;
479 let is_expired_open_message =
480 current_open_message.as_ref().map(|om| om.is_expired).unwrap_or(false);
481
482 let exists_newer_open_message = {
483 let new_signed_entity_type = self
484 .dependencies
485 .epoch_service
486 .read()
487 .await
488 .signed_entity_config()?
489 .time_point_to_signed_entity(&open_message_signed_entity_type, last_time_point)?;
490 new_signed_entity_type != open_message_signed_entity_type
491 };
492
493 Ok(exists_newer_open_message || is_expired_open_message)
494 }
495
496 fn increment_runtime_cycle_success_since_startup_counter(&self) {
497 self.dependencies
498 .metrics_service
499 .get_runtime_cycle_success_since_startup()
500 .increment();
501 }
502
503 fn increment_runtime_cycle_total_since_startup_counter(&self) {
504 self.dependencies
505 .metrics_service
506 .get_runtime_cycle_total_since_startup()
507 .increment();
508 }
509}
510
511#[cfg(test)]
512pub mod tests {
513 use async_trait::async_trait;
514 use chrono::{DateTime, Utc};
515 use mockall::mock;
516 use mockall::predicate::eq;
517 use std::path::PathBuf;
518 use std::sync::Arc;
519 use tokio::sync::RwLock;
520
521 use mithril_cardano_node_chain::test::double::FakeChainObserver;
522 use mithril_cardano_node_internal_database::test::double::DumbImmutableFileObserver;
523 use mithril_common::{
524 StdResult,
525 entities::{
526 CardanoTransactionsSigningConfig, ChainPoint, Epoch, ProtocolMessage,
527 SignedEntityConfig, SignedEntityType, SignedEntityTypeDiscriminants, StakeDistribution,
528 TimePoint,
529 },
530 signable_builder::SignableBuilderService,
531 temp_dir,
532 test_utils::{MithrilFixtureBuilder, fake_data},
533 };
534 use mithril_persistence::store::StakeStorer;
535 use mithril_signed_entity_lock::SignedEntityTypeLock;
536 use mithril_ticker::MithrilTickerService;
537
538 use crate::{
539 MithrilSignerRegistrationLeader, ServeCommandConfiguration,
540 ServeCommandDependenciesContainer, SignerRegistrationRound,
541 dependency_injection::DependenciesBuilder,
542 entities::{AggregatorEpochSettings, OpenMessage},
543 initialize_dependencies,
544 runtime::{AggregatorRunner, AggregatorRunnerTrait},
545 services::{
546 FakeEpochService, FakeEpochServiceBuilder, MithrilStakeDistributionService,
547 MockCertifierService, MockUpkeepService,
548 },
549 };
550
551 mock! {
552 SignableBuilderServiceImpl { }
553
554 #[async_trait]
555 impl SignableBuilderService for SignableBuilderServiceImpl
556 {
557
558 async fn compute_protocol_message(
559 &self,
560 signed_entity_type: SignedEntityType,
561 ) -> StdResult<ProtocolMessage>;
562 }
563 }
564
565 async fn build_runner_with_fixture_data(
566 deps: ServeCommandDependenciesContainer,
567 ) -> AggregatorRunner {
568 let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
569 let current_epoch = deps.chain_observer.get_current_epoch().await.unwrap().unwrap();
570 deps.init_state_from_fixture(
571 &fixture,
572 &CardanoTransactionsSigningConfig::dummy(),
573 &[
574 current_epoch.offset_to_signer_retrieval_epoch().unwrap(),
575 current_epoch,
576 current_epoch.next(),
577 ],
578 )
579 .await;
580
581 AggregatorRunner::new(Arc::new(deps))
582 }
583
584 async fn build_runner(
585 temp_dir: PathBuf,
586 mock_certifier_service: MockCertifierService,
587 ) -> AggregatorRunner {
588 build_runner_with_discriminants(temp_dir, mock_certifier_service, vec![]).await
589 }
590
591 async fn build_runner_with_discriminants(
592 temp_dir: PathBuf,
593 mock_certifier_service: MockCertifierService,
594 allowed_discriminants: Vec<SignedEntityTypeDiscriminants>,
595 ) -> AggregatorRunner {
596 let mut deps = initialize_dependencies(temp_dir).await;
597 deps.certifier_service = Arc::new(mock_certifier_service);
598
599 let mut mock_signable_builder_service = MockSignableBuilderServiceImpl::new();
600 mock_signable_builder_service
601 .expect_compute_protocol_message()
602 .return_once(|_| Ok(ProtocolMessage::default()));
603 deps.signable_builder_service = Arc::new(mock_signable_builder_service);
604
605 if !allowed_discriminants.is_empty() {
607 let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
608 let epoch_service = FakeEpochServiceBuilder {
609 signed_entity_config: SignedEntityConfig {
610 allowed_discriminants: allowed_discriminants.into_iter().collect(),
611 ..SignedEntityConfig::dummy()
612 },
613 ..FakeEpochServiceBuilder::dummy(current_epoch)
614 }
615 .build();
616 deps.epoch_service = Arc::new(RwLock::new(epoch_service));
617 }
618
619 let runner = build_runner_with_fixture_data(deps).await;
620
621 let current_epoch = runner.dependencies.ticker_service.get_current_epoch().await.unwrap();
622 runner.inform_new_epoch(current_epoch).await.unwrap();
623 runner.precompute_epoch_data().await.unwrap();
624 runner
625 }
626
627 fn init_certifier_service_mock(
628 mock_certifier_service: &mut MockCertifierService,
629 messages: Vec<OpenMessage>,
630 ) {
631 for message in messages {
632 mock_certifier_service
633 .expect_get_open_message()
634 .return_once(|_| Ok(Some(message)))
635 .times(1);
636 }
637 mock_certifier_service
639 .expect_get_open_message()
640 .returning(|_| Ok(None));
641
642 mock_certifier_service.expect_inform_epoch().return_once(|_| Ok(()));
643 mock_certifier_service
644 .expect_mark_open_message_if_expired()
645 .returning(|_| Ok(None));
646 }
647
648 fn create_open_message(
649 is_certified: IsCertified,
650 is_expired: IsExpired,
651 signed_entity_type: SignedEntityType,
652 ) -> OpenMessage {
653 OpenMessage {
654 signed_entity_type,
655 is_certified: is_certified == IsCertified::Yes,
656 is_expired: is_expired == IsExpired::Yes,
657 ..OpenMessage::dummy()
658 }
659 }
660
661 #[derive(Eq, PartialEq)]
662 enum IsCertified {
663 Yes,
664 No,
665 }
666
667 #[derive(Eq, PartialEq)]
668 enum IsExpired {
669 Yes,
670 No,
671 }
672
673 #[tokio::test]
674 async fn test_get_time_point_from_chain() {
675 let expected = TimePoint::new(2, 17, ChainPoint::dummy());
676 let mut dependencies = initialize_dependencies!().await;
677 let immutable_file_observer = Arc::new(DumbImmutableFileObserver::default());
678 immutable_file_observer
679 .shall_return(Some(expected.immutable_file_number))
680 .await;
681 let ticker_service = Arc::new(MithrilTickerService::new(
682 Arc::new(FakeChainObserver::new(Some(expected.clone()))),
683 immutable_file_observer,
684 ));
685 dependencies.ticker_service = ticker_service;
686 let runner = AggregatorRunner::new(Arc::new(dependencies));
687
688 let res = runner.get_time_point_from_chain().await;
690 assert_eq!(expected, res.unwrap());
691 }
692
693 #[tokio::test]
694 async fn test_update_stake_distribution() {
695 let chain_observer = Arc::new(FakeChainObserver::default());
696 let deps = {
697 let mut deps = initialize_dependencies!().await;
698 deps.chain_observer = chain_observer.clone();
699 deps.stake_distribution_service = Arc::new(MithrilStakeDistributionService::new(
700 deps.stake_store.clone(),
701 chain_observer.clone(),
702 ));
703 Arc::new(deps)
704 };
705 let runner = AggregatorRunner::new(deps.clone());
706 let time_point = runner.get_time_point_from_chain().await.unwrap();
707 let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
708 let expected = fixture.stake_distribution();
709
710 chain_observer.set_signers(fixture.signers_with_stake()).await;
711 runner
712 .update_stake_distribution(&time_point)
713 .await
714 .expect("updating stake distribution should not return an error");
715
716 let saved_stake_distribution = deps
717 .stake_store
718 .get_stakes(time_point.epoch.offset_to_recording_epoch())
719 .await
720 .unwrap()
721 .unwrap_or_else(|| {
722 panic!(
723 "I should have a stake distribution for the epoch {:?}",
724 time_point.epoch
725 )
726 });
727
728 assert_eq!(expected, saved_stake_distribution);
729 }
730
731 #[tokio::test]
732 async fn test_open_signer_registration_round() {
733 let config = ServeCommandConfiguration::new_sample(mithril_common::temp_dir!());
734 let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
735
736 let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
737 builder.get_verification_key_store().await.unwrap(),
738 builder.get_signer_store().await.unwrap(),
739 builder.get_signer_registration_verifier().await.unwrap(),
740 ));
741 let mut deps = builder.build_serve_dependencies_container().await.unwrap();
742 deps.signer_registration_round_opener = signer_registration_round_opener.clone();
743 let stake_store = deps.stake_store.clone();
744 let deps = Arc::new(deps);
745 let runner = AggregatorRunner::new(deps.clone());
746
747 let time_point = TimePoint::dummy();
748 let recording_epoch = time_point.epoch.offset_to_recording_epoch();
749 let stake_distribution: StakeDistribution =
750 StakeDistribution::from([("a".to_string(), 5), ("b".to_string(), 10)]);
751
752 stake_store
753 .save_stakes(recording_epoch, stake_distribution.clone())
754 .await
755 .expect("Save Stake distribution should not fail");
756
757 runner
758 .open_signer_registration_round(&time_point)
759 .await
760 .expect("opening signer registration should not return an error");
761
762 let saved_current_round = signer_registration_round_opener.get_current_round().await;
763
764 let expected_signer_registration_round =
765 SignerRegistrationRound::dummy(recording_epoch, stake_distribution);
766
767 assert_eq!(
768 Some(expected_signer_registration_round),
769 saved_current_round,
770 );
771 }
772
773 #[tokio::test]
774 async fn test_close_signer_registration_round() {
775 let config = ServeCommandConfiguration::new_sample(mithril_common::temp_dir!());
776 let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
777
778 let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
779 builder.get_verification_key_store().await.unwrap(),
780 builder.get_signer_store().await.unwrap(),
781 builder.get_signer_registration_verifier().await.unwrap(),
782 ));
783 let mut deps = builder.build_serve_dependencies_container().await.unwrap();
784 deps.signer_registration_round_opener = signer_registration_round_opener.clone();
785 let deps = Arc::new(deps);
786 let runner = AggregatorRunner::new(deps.clone());
787
788 let time_point = TimePoint::dummy();
789 runner
790 .open_signer_registration_round(&time_point)
791 .await
792 .expect("opening signer registration should not return an error");
793
794 runner
795 .close_signer_registration_round()
796 .await
797 .expect("closing signer registration should not return an error");
798
799 let saved_current_round = signer_registration_round_opener.get_current_round().await;
800 assert!(saved_current_round.is_none());
801 }
802
803 #[tokio::test]
804 async fn test_expire_open_message() {
805 let open_message_expected = OpenMessage {
806 signed_entity_type: SignedEntityType::dummy(),
807 is_certified: false,
808 is_expired: false,
809 expires_at: Some(
810 DateTime::parse_from_rfc3339("2000-01-19T13:43:05.618857482Z")
811 .unwrap()
812 .with_timezone(&Utc),
813 ),
814 ..OpenMessage::dummy()
815 };
816 let open_message_clone = open_message_expected.clone();
817
818 let mut mock_certifier_service = MockCertifierService::new();
819 mock_certifier_service
820 .expect_mark_open_message_if_expired()
821 .return_once(|_| Ok(Some(open_message_clone)));
822
823 let mut deps = initialize_dependencies!().await;
824 deps.certifier_service = Arc::new(mock_certifier_service);
825
826 let runner = build_runner_with_fixture_data(deps).await;
827 let open_message_expired = runner
828 .mark_open_message_if_expired(&open_message_expected.signed_entity_type)
829 .await
830 .expect("mark_open_message_if_expired should not fail");
831
832 assert_eq!(Some(open_message_expected), open_message_expired);
833 }
834
835 #[tokio::test]
836 async fn test_update_era_checker() {
837 let deps = initialize_dependencies!().await;
838 let ticker_service = deps.ticker_service.clone();
839 let era_checker = deps.era_checker.clone();
840 let mut time_point = ticker_service.get_current_time_point().await.unwrap();
841
842 assert_eq!(time_point.epoch, era_checker.current_epoch());
843 let runner = AggregatorRunner::new(Arc::new(deps));
844 time_point.epoch += 1;
845
846 runner.update_era_checker(time_point.epoch).await.unwrap();
847 assert_eq!(time_point.epoch, era_checker.current_epoch());
848 }
849
850 #[tokio::test]
851 async fn test_inform_new_epoch() {
852 let mut mock_certifier_service = MockCertifierService::new();
853 mock_certifier_service
854 .expect_inform_epoch()
855 .returning(|_| Ok(()))
856 .times(1);
857 let mut deps = initialize_dependencies!().await;
858 let current_epoch = deps.chain_observer.get_current_epoch().await.unwrap().unwrap();
859
860 deps.certifier_service = Arc::new(mock_certifier_service);
861 deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
862 current_epoch,
863 &MithrilFixtureBuilder::default().build(),
864 )));
865
866 let runner = AggregatorRunner::new(Arc::new(deps));
867
868 runner.inform_new_epoch(current_epoch).await.unwrap();
869 }
870
871 #[tokio::test]
872 async fn test_upkeep_calls_run_on_upkeep_service() {
873 let mut upkeep_service = MockUpkeepService::new();
874 upkeep_service
875 .expect_run()
876 .with(eq(Epoch(5)))
877 .returning(|_| Ok(()))
878 .times(1);
879
880 let mut deps = initialize_dependencies!().await;
881 deps.upkeep_service = Arc::new(upkeep_service);
882
883 let runner = AggregatorRunner::new(Arc::new(deps));
884
885 runner.upkeep(Epoch(5)).await.unwrap();
886 }
887
888 #[tokio::test]
889 async fn test_update_epoch_settings() {
890 let mut mock_certifier_service = MockCertifierService::new();
891 mock_certifier_service
892 .expect_inform_epoch()
893 .returning(|_| Ok(()))
894 .times(1);
895
896 let config = ServeCommandConfiguration::new_sample(temp_dir!());
897 let mut deps = DependenciesBuilder::new_with_stdout_logger(Arc::new(config.clone()))
898 .build_serve_dependencies_container()
899 .await
900 .unwrap();
901 deps.certifier_service = Arc::new(mock_certifier_service);
902 let epoch_settings_storer = deps.epoch_settings_storer.clone();
903 let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
904 let insert_epoch = current_epoch.offset_to_epoch_settings_recording_epoch();
905
906 let runner = build_runner_with_fixture_data(deps).await;
907 runner.inform_new_epoch(current_epoch).await.unwrap();
908 runner
909 .update_epoch_settings()
910 .await
911 .expect("update_epoch_settings should not fail");
912
913 let saved_epoch_settings = epoch_settings_storer
914 .get_epoch_settings(insert_epoch)
915 .await
916 .unwrap()
917 .unwrap_or_else(|| panic!("should have epoch settings for epoch {insert_epoch}",));
918
919 assert_eq!(
920 AggregatorEpochSettings {
921 protocol_parameters: config.protocol_parameters.clone(),
922 cardano_transactions_signing_config: config
923 .cardano_transactions_signing_config
924 .clone(),
925 },
926 saved_epoch_settings
927 );
928 }
929
930 #[tokio::test]
931 async fn test_precompute_epoch_data() {
932 let mut deps = initialize_dependencies!().await;
933 let current_epoch = deps.chain_observer.get_current_epoch().await.unwrap().unwrap();
934
935 deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
936 current_epoch,
937 &MithrilFixtureBuilder::default().build(),
938 )));
939
940 let runner = AggregatorRunner::new(Arc::new(deps));
941
942 runner.precompute_epoch_data().await.unwrap();
943 }
944
945 #[tokio::test]
946 async fn test_get_current_non_certified_open_message_should_create_new_open_message_if_none_exists()
947 {
948 let open_message_created = create_open_message(
949 IsCertified::No,
950 IsExpired::No,
951 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
952 );
953 let open_message_expected = open_message_created.clone();
954
955 let runner = {
956 let mut mock_certifier_service = MockCertifierService::new();
957 init_certifier_service_mock(&mut mock_certifier_service, vec![]);
958
959 mock_certifier_service
960 .expect_create_open_message()
961 .return_once(|_, _| Ok(open_message_created))
962 .times(1);
963 build_runner(temp_dir!(), mock_certifier_service).await
964 };
965
966 let open_message_returned = runner
967 .get_current_non_certified_open_message(&TimePoint::dummy())
968 .await
969 .unwrap();
970 assert_eq!(Some(open_message_expected), open_message_returned);
971 }
972
973 #[tokio::test]
974 async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_not_expired()
975 {
976 let not_certified_and_not_expired = create_open_message(
977 IsCertified::No,
978 IsExpired::No,
979 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
980 );
981
982 let open_message_expected = not_certified_and_not_expired.clone();
983
984 let runner = {
985 let mut mock_certifier_service = MockCertifierService::new();
986 init_certifier_service_mock(
987 &mut mock_certifier_service,
988 vec![not_certified_and_not_expired],
989 );
990
991 mock_certifier_service.expect_create_open_message().never();
992 build_runner(temp_dir!(), mock_certifier_service).await
993 };
994
995 let open_message_returned = runner
996 .get_current_non_certified_open_message(&TimePoint::dummy())
997 .await
998 .unwrap();
999
1000 assert_eq!(Some(open_message_expected), open_message_returned);
1001 }
1002
1003 #[tokio::test]
1004 async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_open_message_already_certified()
1005 {
1006 let certified_and_not_expired = create_open_message(
1007 IsCertified::Yes,
1008 IsExpired::No,
1009 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1010 );
1011 let not_certified_and_not_expired = create_open_message(
1012 IsCertified::No,
1013 IsExpired::No,
1014 SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1015 );
1016
1017 let open_message_expected = not_certified_and_not_expired.clone();
1018
1019 let runner = {
1020 let mut mock_certifier_service = MockCertifierService::new();
1021 init_certifier_service_mock(
1022 &mut mock_certifier_service,
1023 vec![certified_and_not_expired, not_certified_and_not_expired],
1024 );
1025
1026 mock_certifier_service.expect_create_open_message().never();
1027 build_runner_with_discriminants(
1028 temp_dir!(),
1029 mock_certifier_service,
1030 vec![
1031 SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1032 SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1033 ],
1034 )
1035 .await
1036 };
1037
1038 let open_message_returned = runner
1039 .get_current_non_certified_open_message(&TimePoint::dummy())
1040 .await
1041 .unwrap();
1042
1043 assert_eq!(Some(open_message_expected), open_message_returned);
1044 }
1045
1046 #[tokio::test]
1047 async fn test_get_current_non_certified_open_message_should_create_open_message_if_none_exists_and_open_message_already_certified()
1048 {
1049 let certified_and_not_expired = create_open_message(
1050 IsCertified::Yes,
1051 IsExpired::No,
1052 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1053 );
1054 let open_message_created = create_open_message(
1055 IsCertified::No,
1056 IsExpired::No,
1057 SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1058 );
1059 let open_message_expected = open_message_created.clone();
1060
1061 let runner = {
1062 let mut mock_certifier_service = MockCertifierService::new();
1063 init_certifier_service_mock(
1064 &mut mock_certifier_service,
1065 vec![certified_and_not_expired],
1066 );
1067
1068 mock_certifier_service
1069 .expect_create_open_message()
1070 .return_once(|_, _| Ok(open_message_created))
1071 .times(1);
1072 build_runner_with_discriminants(
1073 temp_dir!(),
1074 mock_certifier_service,
1075 vec![
1076 SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1077 SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1078 ],
1079 )
1080 .await
1081 };
1082
1083 let open_message_returned = runner
1084 .get_current_non_certified_open_message(&TimePoint::dummy())
1085 .await
1086 .unwrap();
1087
1088 assert_eq!(Some(open_message_expected), open_message_returned);
1089 }
1090
1091 #[tokio::test]
1092 async fn test_get_current_non_certified_open_message_should_return_none_if_all_open_message_already_certified()
1093 {
1094 let certified_and_not_expired_1 = create_open_message(
1095 IsCertified::Yes,
1096 IsExpired::No,
1097 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1098 );
1099 let certified_and_not_expired_2 = create_open_message(
1100 IsCertified::Yes,
1101 IsExpired::No,
1102 SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1103 );
1104
1105 let runner = {
1106 let mut mock_certifier_service = MockCertifierService::new();
1107 init_certifier_service_mock(
1108 &mut mock_certifier_service,
1109 vec![certified_and_not_expired_1, certified_and_not_expired_2],
1110 );
1111
1112 mock_certifier_service.expect_create_open_message().never();
1113 build_runner_with_discriminants(
1114 temp_dir!(),
1115 mock_certifier_service,
1116 vec![
1117 SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1118 SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1119 ],
1120 )
1121 .await
1122 };
1123
1124 let open_message_returned = runner
1125 .get_current_non_certified_open_message(&TimePoint::dummy())
1126 .await
1127 .unwrap();
1128
1129 assert!(open_message_returned.is_none());
1130 }
1131
1132 #[tokio::test]
1133 async fn test_get_current_non_certified_open_message_should_return_first_not_certified_and_not_expired_open_message()
1134 {
1135 let not_certified_and_expired = create_open_message(
1136 IsCertified::No,
1137 IsExpired::Yes,
1138 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1139 );
1140 let not_certified_and_not_expired = create_open_message(
1141 IsCertified::No,
1142 IsExpired::No,
1143 SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1144 );
1145
1146 let open_message_expected = not_certified_and_not_expired.clone();
1147
1148 let runner = {
1149 let mut mock_certifier_service = MockCertifierService::new();
1150 init_certifier_service_mock(
1151 &mut mock_certifier_service,
1152 vec![not_certified_and_expired, not_certified_and_not_expired],
1153 );
1154
1155 mock_certifier_service.expect_create_open_message().never();
1156 build_runner_with_discriminants(
1157 temp_dir!(),
1158 mock_certifier_service,
1159 vec![
1160 SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1161 SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1162 ],
1163 )
1164 .await
1165 };
1166
1167 let open_message_returned = runner
1168 .get_current_non_certified_open_message(&TimePoint::dummy())
1169 .await
1170 .unwrap();
1171
1172 assert_eq!(Some(open_message_expected), open_message_returned);
1173 }
1174
1175 #[tokio::test]
1176 async fn test_get_current_non_certified_open_message_called_for_mithril_stake_distribution() {
1177 let mut mock_certifier_service = MockCertifierService::new();
1178
1179 mock_certifier_service
1180 .expect_get_open_message()
1181 .with(eq(SignedEntityType::MithrilStakeDistribution(
1182 TimePoint::dummy().epoch,
1183 )))
1184 .times(1)
1185 .return_once(|_| {
1186 Ok(Some(create_open_message(
1187 IsCertified::Yes,
1188 IsExpired::No,
1189 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1190 )))
1191 });
1192
1193 mock_certifier_service.expect_create_open_message().never();
1194
1195 mock_certifier_service.expect_inform_epoch().return_once(|_| Ok(()));
1196 mock_certifier_service
1197 .expect_mark_open_message_if_expired()
1198 .returning(|_| Ok(None));
1199
1200 let runner = build_runner(temp_dir!(), mock_certifier_service).await;
1201
1202 runner
1203 .get_current_non_certified_open_message(&TimePoint::dummy())
1204 .await
1205 .unwrap();
1206 }
1207
1208 #[tokio::test]
1209 async fn list_available_signed_entity_types_list_all_configured_entities_if_none_are_locked() {
1210 let runner = {
1211 let mut dependencies = initialize_dependencies!().await;
1212 let epoch_service = FakeEpochServiceBuilder {
1213 signed_entity_config: SignedEntityConfig {
1214 allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1215 ..SignedEntityConfig::dummy()
1216 },
1217 ..FakeEpochServiceBuilder::dummy(Epoch(32))
1218 }
1219 .build();
1220 dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1221 dependencies.signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1222 AggregatorRunner::new(Arc::new(dependencies))
1223 };
1224
1225 let time_point = TimePoint::dummy();
1226 let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1227 .list_available_signed_entity_types(&time_point)
1228 .await
1229 .unwrap()
1230 .into_iter()
1231 .map(Into::into)
1232 .collect();
1233
1234 assert_eq!(
1235 signed_entities,
1236 SignedEntityTypeDiscriminants::all().into_iter().collect::<Vec<_>>()
1237 );
1238 }
1239
1240 #[tokio::test]
1241 async fn list_available_signed_entity_types_exclude_locked_entities() {
1242 let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1243 let runner = {
1244 let mut dependencies = initialize_dependencies!().await;
1245 dependencies.signed_entity_type_lock = signed_entity_type_lock.clone();
1246 let epoch_service = FakeEpochServiceBuilder {
1247 signed_entity_config: SignedEntityConfig {
1248 allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1249 ..SignedEntityConfig::dummy()
1250 },
1251 ..FakeEpochServiceBuilder::dummy(Epoch(32))
1252 }
1253 .build();
1254 dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1255
1256 AggregatorRunner::new(Arc::new(dependencies))
1257 };
1258
1259 signed_entity_type_lock
1260 .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
1261 .await;
1262
1263 let time_point = TimePoint::dummy();
1264 let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1265 .list_available_signed_entity_types(&time_point)
1266 .await
1267 .unwrap()
1268 .into_iter()
1269 .map(Into::into)
1270 .collect();
1271
1272 assert!(!signed_entities.is_empty());
1273 assert!(!signed_entities.contains(&SignedEntityTypeDiscriminants::CardanoTransactions));
1274 }
1275
1276 #[tokio::test]
1277 async fn is_open_message_outdated_return_false_when_message_is_not_expired_and_no_newer_open_message()
1278 {
1279 assert!(!is_outdated_returned_when(temp_dir!(), IsExpired::No, false).await);
1280 }
1281
1282 #[tokio::test]
1283 async fn is_open_message_outdated_return_true_when_message_is_expired_and_no_newer_open_message()
1284 {
1285 assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, false).await);
1286 }
1287
1288 #[tokio::test]
1289 async fn is_open_message_outdated_return_true_when_message_is_not_expired_and_exists_newer_open_message()
1290 {
1291 assert!(is_outdated_returned_when(temp_dir!(), IsExpired::No, true).await);
1292 }
1293
1294 #[tokio::test]
1295 async fn is_open_message_outdated_return_true_when_message_is_expired_and_exists_newer_open_message()
1296 {
1297 assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, true).await);
1298 }
1299
1300 async fn is_outdated_returned_when(
1301 tmp_path: PathBuf,
1302 is_expired: IsExpired,
1303 newer_open_message: bool,
1304 ) -> bool {
1305 let current_time_point = TimePoint {
1306 epoch: Epoch(2),
1307 ..TimePoint::dummy()
1308 };
1309
1310 let message_epoch = if newer_open_message {
1311 current_time_point.epoch + 54
1312 } else {
1313 current_time_point.epoch
1314 };
1315 let open_message_to_verify = OpenMessage {
1316 signed_entity_type: SignedEntityType::MithrilStakeDistribution(message_epoch),
1317 is_expired: is_expired == IsExpired::Yes,
1318 ..OpenMessage::dummy()
1319 };
1320
1321 let runner = {
1322 let mut deps = initialize_dependencies(tmp_path).await;
1323 let mut mock_certifier_service = MockCertifierService::new();
1324
1325 let open_message_current = open_message_to_verify.clone();
1326 mock_certifier_service
1327 .expect_get_open_message()
1328 .times(1)
1329 .return_once(|_| Ok(Some(open_message_current)));
1330 mock_certifier_service
1331 .expect_mark_open_message_if_expired()
1332 .returning(|_| Ok(None));
1333
1334 deps.certifier_service = Arc::new(mock_certifier_service);
1335
1336 let epoch_service = FakeEpochServiceBuilder::dummy(current_time_point.epoch).build();
1337 deps.epoch_service = Arc::new(RwLock::new(epoch_service));
1338
1339 build_runner_with_fixture_data(deps).await
1340 };
1341
1342 runner
1343 .is_open_message_outdated(
1344 open_message_to_verify.signed_entity_type,
1345 ¤t_time_point,
1346 )
1347 .await
1348 .unwrap()
1349 }
1350}