1use anyhow::Context;
2use async_trait::async_trait;
3use slog::{Logger, debug, warn};
4use std::sync::Arc;
5use std::time::Duration;
6
7use mithril_common::StdResult;
8use mithril_common::entities::{Certificate, Epoch, ProtocolMessage, SignedEntityType, TimePoint};
9use mithril_common::logging::LoggerExtensions;
10use mithril_persistence::store::StakeStorer;
11
12use crate::ServeCommandDependenciesContainer;
13use crate::entities::OpenMessage;
14
15#[derive(Debug, Clone)]
17pub struct AggregatorConfig {
18 pub interval: Duration,
20
21 pub is_follower: bool,
23}
24
25impl AggregatorConfig {
26 pub fn new(interval: Duration, is_follower: bool) -> Self {
28 Self {
29 interval,
30 is_follower,
31 }
32 }
33}
34
35#[async_trait]
38pub trait AggregatorRunnerTrait: Sync + Send {
39 async fn get_time_point_from_chain(&self) -> StdResult<TimePoint>;
41
42 async fn get_current_open_message_for_signed_entity_type(
44 &self,
45 signed_entity_type: &SignedEntityType,
46 ) -> StdResult<Option<OpenMessage>>;
47
48 async fn get_current_non_certified_open_message(
50 &self,
51 current_time_point: &TimePoint,
52 ) -> StdResult<Option<OpenMessage>>;
53
54 async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()>;
56
57 async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()>;
59
60 async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()>;
62
63 async fn close_signer_registration_round(&self) -> StdResult<()>;
65
66 async fn is_follower_aggregator_at_same_epoch_as_leader(
68 &self,
69 time_point: &TimePoint,
70 ) -> StdResult<bool>;
71
72 async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()>;
74
75 async fn synchronize_follower_aggregator_certificate_chain(
77 &self,
78 force_sync: bool,
79 ) -> StdResult<()>;
80
81 async fn compute_protocol_message(
83 &self,
84 signed_entity_type: &SignedEntityType,
85 ) -> StdResult<ProtocolMessage>;
86
87 async fn mark_open_message_if_expired(
89 &self,
90 signed_entity_type: &SignedEntityType,
91 ) -> StdResult<Option<OpenMessage>>;
92
93 async fn create_certificate(
95 &self,
96 signed_entity_type: &SignedEntityType,
97 ) -> StdResult<Option<Certificate>>;
98
99 async fn create_artifact(
101 &self,
102 signed_entity_type: &SignedEntityType,
103 certificate: &Certificate,
104 ) -> StdResult<()>;
105
106 async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()>;
108
109 async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()>;
111
112 async fn upkeep(&self, epoch: Epoch) -> StdResult<()>;
114
115 async fn precompute_epoch_data(&self) -> StdResult<()>;
117
118 async fn create_open_message(
120 &self,
121 signed_entity_type: &SignedEntityType,
122 protocol_message: &ProtocolMessage,
123 ) -> StdResult<OpenMessage>;
124
125 async fn is_open_message_outdated(
127 &self,
128 open_message_signed_entity_type: SignedEntityType,
129 last_time_point: &TimePoint,
130 ) -> StdResult<bool>;
131
132 fn increment_runtime_cycle_success_since_startup_counter(&self);
134
135 fn increment_runtime_cycle_total_since_startup_counter(&self);
137}
138
139pub struct AggregatorRunner {
142 dependencies: Arc<ServeCommandDependenciesContainer>,
143 logger: Logger,
144}
145
146impl AggregatorRunner {
147 pub fn new(dependencies: Arc<ServeCommandDependenciesContainer>) -> Self {
149 let logger = dependencies.root_logger.new_with_component_name::<Self>();
150 Self {
151 dependencies,
152 logger,
153 }
154 }
155
156 async fn list_available_signed_entity_types(
157 &self,
158 time_point: &TimePoint,
159 ) -> StdResult<Vec<SignedEntityType>> {
160 let signed_entity_types = self
161 .dependencies
162 .epoch_service
163 .read()
164 .await
165 .signed_entity_config()?
166 .list_allowed_signed_entity_types(time_point)?;
167 let unlocked_signed_entities = self
168 .dependencies
169 .signed_entity_type_lock
170 .filter_unlocked_entries(signed_entity_types)
171 .await;
172
173 Ok(unlocked_signed_entities)
174 }
175}
176
177#[cfg_attr(test, mockall::automock)]
178#[async_trait]
179impl AggregatorRunnerTrait for AggregatorRunner {
180 async fn get_time_point_from_chain(&self) -> StdResult<TimePoint> {
182 debug!(self.logger, ">> get_time_point_from_chain");
183 let time_point = self.dependencies.ticker_service.get_current_time_point().await?;
184
185 Ok(time_point)
186 }
187
188 async fn get_current_open_message_for_signed_entity_type(
189 &self,
190 signed_entity_type: &SignedEntityType,
191 ) -> StdResult<Option<OpenMessage>> {
192 debug!(self.logger,">> get_current_open_message_for_signed_entity_type"; "signed_entity_type" => ?signed_entity_type);
193 self.mark_open_message_if_expired(signed_entity_type).await?;
194
195 Ok(self
196 .dependencies
197 .certifier_service
198 .get_open_message(signed_entity_type)
199 .await
200 .with_context(|| format!("CertifierService can not get open message for signed_entity_type: '{signed_entity_type}'"))?)
201 }
202
203 async fn get_current_non_certified_open_message(
204 &self,
205 current_time_point: &TimePoint,
206 ) -> StdResult<Option<OpenMessage>> {
207 debug!(self.logger,">> get_current_non_certified_open_message"; "time_point" => #?current_time_point);
208 let signed_entity_types =
209 self.list_available_signed_entity_types(current_time_point).await?;
210
211 for signed_entity_type in signed_entity_types {
212 let current_open_message = self.get_current_open_message_for_signed_entity_type(&signed_entity_type)
213 .await
214 .with_context(|| format!("AggregatorRunner can not get current open message for signed entity type: '{}'", &signed_entity_type))?;
215 match current_open_message {
216 None => {
217 let protocol_message = self.compute_protocol_message(&signed_entity_type).await.with_context(|| format!("AggregatorRunner can not compute protocol message for signed_entity_type: '{signed_entity_type}'"))?;
218 let open_message_new = self.create_open_message(&signed_entity_type, &protocol_message)
219 .await
220 .with_context(|| format!("AggregatorRunner can not create open message for signed_entity_type: '{signed_entity_type}'"))?;
221
222 return Ok(Some(open_message_new));
223 }
224 Some(open_message) => {
225 if !open_message.is_certified && !open_message.is_expired {
226 return Ok(Some(open_message));
227 }
228 }
229 }
230 }
231
232 Ok(None)
233 }
234
235 async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()> {
236 debug!(self.logger, ">> is_certificate_chain_valid");
237 self.dependencies
238 .certifier_service
239 .verify_certificate_chain(time_point.epoch)
240 .await?;
241
242 Ok(())
243 }
244
245 async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()> {
246 debug!(self.logger,">> update_stake_distribution"; "time_point" => #?new_time_point);
247 self.dependencies
248 .stake_distribution_service
249 .update_stake_distribution()
250 .await
251 .with_context(|| format!("AggregatorRunner could not update stake distribution for time_point: '{new_time_point}'"))
252 }
253
254 async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()> {
255 debug!(self.logger,">> open_signer_registration_round"; "time_point" => #?new_time_point);
256 let registration_epoch = new_time_point.epoch.offset_to_recording_epoch();
257
258 let stakes = self
259 .dependencies
260 .stake_store
261 .get_stakes(registration_epoch)
262 .await?
263 .unwrap_or_default();
264
265 self.dependencies
266 .signer_registration_round_opener
267 .open_registration_round(registration_epoch, stakes)
268 .await
269 }
270
271 async fn close_signer_registration_round(&self) -> StdResult<()> {
272 debug!(self.logger, ">> close_signer_registration_round");
273 self.dependencies
274 .signer_registration_round_opener
275 .close_registration_round()
276 .await
277 }
278
279 async fn is_follower_aggregator_at_same_epoch_as_leader(
280 &self,
281 time_point: &TimePoint,
282 ) -> StdResult<bool> {
283 self.dependencies
284 .signer_synchronizer
285 .can_synchronize_signers(time_point.epoch)
286 .await
287 .map_err(|e| e.into())
288 }
289
290 async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()> {
291 self.dependencies
292 .signer_synchronizer
293 .synchronize_all_signers()
294 .await
295 .map_err(|e| e.into())
296 }
297
298 async fn compute_protocol_message(
299 &self,
300 signed_entity_type: &SignedEntityType,
301 ) -> StdResult<ProtocolMessage> {
302 debug!(self.logger, ">> compute_protocol_message");
303 let protocol_message = self
304 .dependencies
305 .signable_builder_service
306 .compute_protocol_message(signed_entity_type.to_owned())
307 .await
308 .with_context(|| format!("Runner can not compute protocol message for signed entity type: '{signed_entity_type}'"))?;
309
310 Ok(protocol_message)
311 }
312
313 async fn mark_open_message_if_expired(
314 &self,
315 signed_entity_type: &SignedEntityType,
316 ) -> StdResult<Option<OpenMessage>> {
317 debug!(self.logger, ">> mark_open_message_if_expired");
318 let expired_open_message = self
319 .dependencies
320 .certifier_service
321 .mark_open_message_if_expired(signed_entity_type)
322 .await
323 .with_context(|| "CertifierService can not mark expired open message")?;
324
325 debug!(
326 self.logger, "Marked expired open messages";
327 "expired_open_message" => ?expired_open_message
328 );
329
330 Ok(expired_open_message)
331 }
332
333 async fn create_certificate(
334 &self,
335 signed_entity_type: &SignedEntityType,
336 ) -> StdResult<Option<Certificate>> {
337 debug!(self.logger, ">> create_certificate"; "signed_entity_type" => ?signed_entity_type);
338
339 let certificate = self.dependencies
340 .certifier_service
341 .create_certificate(signed_entity_type)
342 .await
343 .with_context(|| {
344 format!(
345 "CertifierService can not create certificate for signed_entity_type: '{signed_entity_type}'"
346 )
347 })?;
348
349 if certificate.is_some() {
350 self.dependencies
351 .metrics_service
352 .get_certificate_total_produced_since_startup()
353 .increment();
354 }
355
356 Ok(certificate)
357 }
358
359 async fn create_artifact(
360 &self,
361 signed_entity_type: &SignedEntityType,
362 certificate: &Certificate,
363 ) -> StdResult<()> {
364 debug!(
365 self.logger, ">> create_artifact";
366 "signed_entity_type" => ?signed_entity_type,
367 "certificate_hash" => &certificate.hash
368 );
369
370 self.dependencies
371 .signed_entity_service
372 .create_artifact(signed_entity_type.to_owned(), certificate)
373 .await
374 .with_context(|| {
375 format!(
376 "SignedEntityService can not create artifact for signed_entity_type: '{signed_entity_type}' with certificate hash: '{}'",
377 certificate.hash
378 )
379 })?;
380
381 Ok(())
382 }
383
384 async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()> {
385 debug!(self.logger, ">> update_era_checker({epoch:?})");
386 let token = self
387 .dependencies
388 .era_reader
389 .read_era_epoch_token(epoch)
390 .await
391 .with_context(|| {
392 format!("EraReader can not get era epoch token for current epoch: '{epoch}'")
393 })?;
394
395 let current_era = token
396 .get_current_supported_era()
397 .with_context(|| "EraEpochToken can not get current supported era")?;
398 self.dependencies
399 .era_checker
400 .change_era(current_era, token.get_current_epoch());
401 debug!(
402 self.logger,
403 "Current Era is {current_era} (Epoch {}).",
404 token.get_current_epoch()
405 );
406
407 if token.get_next_supported_era().is_err() {
408 let era_name = &token.get_next_era_marker().unwrap().name;
409 warn!(
410 self.logger,
411 "Upcoming Era '{era_name}' is not supported by this version of the software. Please update!"
412 );
413 }
414
415 Ok(())
416 }
417
418 async fn precompute_epoch_data(&self) -> StdResult<()> {
419 debug!(self.logger, ">> precompute_epoch_data");
420 self.dependencies
421 .epoch_service
422 .write()
423 .await
424 .precompute_epoch_data()
425 .await?;
426
427 Ok(())
428 }
429
430 async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()> {
431 debug!(self.logger, ">> inform_new_epoch({epoch:?})");
432 self.dependencies.certifier_service.inform_epoch(epoch).await?;
433
434 self.dependencies
435 .epoch_service
436 .write()
437 .await
438 .inform_epoch(epoch)
439 .await?;
440
441 Ok(())
442 }
443
444 async fn upkeep(&self, epoch: Epoch) -> StdResult<()> {
445 debug!(self.logger, ">> upkeep");
446 self.dependencies.upkeep_service.run(epoch).await
447 }
448
449 async fn create_open_message(
450 &self,
451 signed_entity_type: &SignedEntityType,
452 protocol_message: &ProtocolMessage,
453 ) -> StdResult<OpenMessage> {
454 debug!(self.logger, ">> create_open_message");
455 self.dependencies
456 .certifier_service
457 .create_open_message(signed_entity_type, protocol_message)
458 .await
459 }
460
461 async fn is_open_message_outdated(
462 &self,
463 open_message_signed_entity_type: SignedEntityType,
464 last_time_point: &TimePoint,
465 ) -> StdResult<bool> {
466 let current_open_message = self
467 .get_current_open_message_for_signed_entity_type(
468 &open_message_signed_entity_type,
469 )
470 .await
471 .with_context(|| format!("AggregatorRuntime can not get the current open message for signed entity type: '{}'", &open_message_signed_entity_type))?;
472 let is_expired_open_message =
473 current_open_message.as_ref().map(|om| om.is_expired).unwrap_or(false);
474
475 let exists_newer_open_message = {
476 let new_signed_entity_type = self
477 .dependencies
478 .epoch_service
479 .read()
480 .await
481 .signed_entity_config()?
482 .time_point_to_signed_entity(&open_message_signed_entity_type, last_time_point)?;
483 new_signed_entity_type != open_message_signed_entity_type
484 };
485
486 Ok(exists_newer_open_message || is_expired_open_message)
487 }
488
489 fn increment_runtime_cycle_success_since_startup_counter(&self) {
490 self.dependencies
491 .metrics_service
492 .get_runtime_cycle_success_since_startup()
493 .increment();
494 }
495
496 fn increment_runtime_cycle_total_since_startup_counter(&self) {
497 self.dependencies
498 .metrics_service
499 .get_runtime_cycle_total_since_startup()
500 .increment();
501 }
502
503 async fn synchronize_follower_aggregator_certificate_chain(
504 &self,
505 force_sync: bool,
506 ) -> StdResult<()> {
507 debug!(
508 self.logger,
509 ">> synchronize_follower_aggregator_certificate_chain(force_sync:{force_sync})"
510 );
511 self.dependencies
512 .certificate_chain_synchronizer
513 .synchronize_certificate_chain(force_sync)
514 .await
515 }
516}
517
518#[cfg(test)]
519pub mod tests {
520 use async_trait::async_trait;
521 use chrono::{DateTime, Utc};
522 use mockall::mock;
523 use mockall::predicate::eq;
524 use std::path::PathBuf;
525 use std::sync::Arc;
526 use tokio::sync::RwLock;
527
528 use mithril_cardano_node_chain::test::double::FakeChainObserver;
529 use mithril_cardano_node_internal_database::test::double::DumbImmutableFileObserver;
530 use mithril_common::{
531 StdResult,
532 entities::{
533 ChainPoint, Epoch, ProtocolMessage, SignedEntityConfig, SignedEntityType,
534 SignedEntityTypeDiscriminants, StakeDistribution, TimePoint,
535 },
536 signable_builder::SignableBuilderService,
537 temp_dir,
538 test::{
539 builder::MithrilFixtureBuilder,
540 double::{Dummy, fake_data},
541 },
542 };
543 use mithril_persistence::store::StakeStorer;
544 use mithril_signed_entity_lock::SignedEntityTypeLock;
545 use mithril_ticker::MithrilTickerService;
546
547 use crate::{
548 MithrilSignerRegistrationLeader, ServeCommandConfiguration,
549 ServeCommandDependenciesContainer, SignerRegistrationRound,
550 dependency_injection::DependenciesBuilder,
551 entities::OpenMessage,
552 initialize_dependencies,
553 runtime::{AggregatorRunner, AggregatorRunnerTrait},
554 services::{
555 FakeEpochService, FakeEpochServiceBuilder, MockCertifierService, MockUpkeepService,
556 },
557 };
558
559 mock! {
560 SignableBuilderServiceImpl { }
561
562 #[async_trait]
563 impl SignableBuilderService for SignableBuilderServiceImpl
564 {
565
566 async fn compute_protocol_message(
567 &self,
568 signed_entity_type: SignedEntityType,
569 ) -> StdResult<ProtocolMessage>;
570 }
571 }
572
573 async fn build_runner_with_fixture_data(
574 deps: ServeCommandDependenciesContainer,
575 ) -> AggregatorRunner {
576 let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
577 let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
578 deps.init_state_from_fixture(&fixture, current_epoch).await;
579
580 AggregatorRunner::new(Arc::new(deps))
581 }
582
583 async fn build_runner(
584 temp_dir: PathBuf,
585 mock_certifier_service: MockCertifierService,
586 ) -> AggregatorRunner {
587 build_runner_with_discriminants(temp_dir, mock_certifier_service, vec![]).await
588 }
589
590 async fn build_runner_with_discriminants(
591 temp_dir: PathBuf,
592 mock_certifier_service: MockCertifierService,
593 allowed_discriminants: Vec<SignedEntityTypeDiscriminants>,
594 ) -> AggregatorRunner {
595 let mut deps = initialize_dependencies(temp_dir).await;
596 deps.certifier_service = Arc::new(mock_certifier_service);
597
598 let mut mock_signable_builder_service = MockSignableBuilderServiceImpl::new();
599 mock_signable_builder_service
600 .expect_compute_protocol_message()
601 .return_once(|_| Ok(ProtocolMessage::default()));
602 deps.signable_builder_service = Arc::new(mock_signable_builder_service);
603
604 if !allowed_discriminants.is_empty() {
606 let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
607 let epoch_service = FakeEpochServiceBuilder {
608 signed_entity_config: SignedEntityConfig {
609 allowed_discriminants: allowed_discriminants.into_iter().collect(),
610 ..SignedEntityConfig::dummy()
611 },
612 ..FakeEpochServiceBuilder::dummy(current_epoch)
613 }
614 .build();
615 deps.epoch_service = Arc::new(RwLock::new(epoch_service));
616 }
617
618 let runner = build_runner_with_fixture_data(deps).await;
619
620 let current_epoch = runner.dependencies.ticker_service.get_current_epoch().await.unwrap();
621 runner.inform_new_epoch(current_epoch).await.unwrap();
622 runner.precompute_epoch_data().await.unwrap();
623 runner
624 }
625
626 fn init_certifier_service_mock(
627 mock_certifier_service: &mut MockCertifierService,
628 messages: Vec<OpenMessage>,
629 ) {
630 for message in messages {
631 mock_certifier_service
632 .expect_get_open_message()
633 .return_once(|_| Ok(Some(message)))
634 .times(1);
635 }
636 mock_certifier_service
638 .expect_get_open_message()
639 .returning(|_| Ok(None));
640
641 mock_certifier_service.expect_inform_epoch().return_once(|_| Ok(()));
642 mock_certifier_service
643 .expect_mark_open_message_if_expired()
644 .returning(|_| Ok(None));
645 }
646
647 fn create_open_message(
648 is_certified: IsCertified,
649 is_expired: IsExpired,
650 signed_entity_type: SignedEntityType,
651 ) -> OpenMessage {
652 OpenMessage {
653 signed_entity_type,
654 is_certified: is_certified == IsCertified::Yes,
655 is_expired: is_expired == IsExpired::Yes,
656 ..OpenMessage::dummy()
657 }
658 }
659
660 #[derive(Eq, PartialEq)]
661 enum IsCertified {
662 Yes,
663 No,
664 }
665
666 #[derive(Eq, PartialEq)]
667 enum IsExpired {
668 Yes,
669 No,
670 }
671
672 #[tokio::test]
673 async fn test_get_time_point_from_chain() {
674 let expected = TimePoint::new(2, 17, ChainPoint::dummy());
675 let mut dependencies = initialize_dependencies!().await;
676 let immutable_file_observer = Arc::new(DumbImmutableFileObserver::default());
677 immutable_file_observer
678 .shall_return(Some(expected.immutable_file_number))
679 .await;
680 let ticker_service = Arc::new(MithrilTickerService::new(
681 Arc::new(FakeChainObserver::new(Some(expected.clone()))),
682 immutable_file_observer,
683 ));
684 dependencies.ticker_service = ticker_service;
685 let runner = AggregatorRunner::new(Arc::new(dependencies));
686
687 let res = runner.get_time_point_from_chain().await;
689 assert_eq!(expected, res.unwrap());
690 }
691
692 #[tokio::test]
693 async fn test_update_stake_distribution() {
694 let chain_observer = Arc::new(FakeChainObserver::default());
695 let deps = {
696 let config = ServeCommandConfiguration::new_sample(temp_dir!());
697 let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
698 builder.chain_observer = Some(chain_observer.clone());
699
700 Arc::new(builder.build_serve_dependencies_container().await.unwrap())
701 };
702 let runner = AggregatorRunner::new(deps.clone());
703 let time_point = runner.get_time_point_from_chain().await.unwrap();
704 let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
705 let expected = fixture.stake_distribution();
706
707 chain_observer.set_signers(fixture.signers_with_stake()).await;
708 runner
709 .update_stake_distribution(&time_point)
710 .await
711 .expect("updating stake distribution should not return an error");
712
713 let saved_stake_distribution = deps
714 .stake_store
715 .get_stakes(time_point.epoch.offset_to_recording_epoch())
716 .await
717 .unwrap()
718 .unwrap_or_else(|| {
719 panic!(
720 "I should have a stake distribution for the epoch {:?}",
721 time_point.epoch
722 )
723 });
724
725 assert_eq!(expected, saved_stake_distribution);
726 }
727
728 #[tokio::test]
729 async fn test_open_signer_registration_round() {
730 let config = ServeCommandConfiguration::new_sample(mithril_common::temp_dir!());
731 let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
732
733 let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
734 builder.get_verification_key_store().await.unwrap(),
735 builder.get_signer_store().await.unwrap(),
736 builder.get_signer_registration_verifier().await.unwrap(),
737 ));
738 let mut deps = builder.build_serve_dependencies_container().await.unwrap();
739 deps.signer_registration_round_opener = signer_registration_round_opener.clone();
740 let stake_store = deps.stake_store.clone();
741 let deps = Arc::new(deps);
742 let runner = AggregatorRunner::new(deps.clone());
743
744 let time_point = TimePoint::dummy();
745 let recording_epoch = time_point.epoch.offset_to_recording_epoch();
746 let stake_distribution: StakeDistribution =
747 StakeDistribution::from([("a".to_string(), 5), ("b".to_string(), 10)]);
748
749 stake_store
750 .save_stakes(recording_epoch, stake_distribution.clone())
751 .await
752 .expect("Save Stake distribution should not fail");
753
754 runner
755 .open_signer_registration_round(&time_point)
756 .await
757 .expect("opening signer registration should not return an error");
758
759 let saved_current_round = signer_registration_round_opener.get_current_round().await;
760
761 let expected_signer_registration_round =
762 SignerRegistrationRound::dummy(recording_epoch, stake_distribution);
763
764 assert_eq!(
765 Some(expected_signer_registration_round),
766 saved_current_round,
767 );
768 }
769
770 #[tokio::test]
771 async fn test_close_signer_registration_round() {
772 let config = ServeCommandConfiguration::new_sample(mithril_common::temp_dir!());
773 let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
774
775 let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
776 builder.get_verification_key_store().await.unwrap(),
777 builder.get_signer_store().await.unwrap(),
778 builder.get_signer_registration_verifier().await.unwrap(),
779 ));
780 let mut deps = builder.build_serve_dependencies_container().await.unwrap();
781 deps.signer_registration_round_opener = signer_registration_round_opener.clone();
782 let deps = Arc::new(deps);
783 let runner = AggregatorRunner::new(deps.clone());
784
785 let time_point = TimePoint::dummy();
786 runner
787 .open_signer_registration_round(&time_point)
788 .await
789 .expect("opening signer registration should not return an error");
790
791 runner
792 .close_signer_registration_round()
793 .await
794 .expect("closing signer registration should not return an error");
795
796 let saved_current_round = signer_registration_round_opener.get_current_round().await;
797 assert!(saved_current_round.is_none());
798 }
799
800 #[tokio::test]
801 async fn test_expire_open_message() {
802 let open_message_expected = OpenMessage {
803 signed_entity_type: SignedEntityType::dummy(),
804 is_certified: false,
805 is_expired: false,
806 expires_at: Some(
807 DateTime::parse_from_rfc3339("2000-01-19T13:43:05.618857482Z")
808 .unwrap()
809 .with_timezone(&Utc),
810 ),
811 ..OpenMessage::dummy()
812 };
813 let open_message_clone = open_message_expected.clone();
814
815 let mut mock_certifier_service = MockCertifierService::new();
816 mock_certifier_service
817 .expect_mark_open_message_if_expired()
818 .return_once(|_| Ok(Some(open_message_clone)));
819
820 let mut deps = initialize_dependencies!().await;
821 deps.certifier_service = Arc::new(mock_certifier_service);
822
823 let runner = build_runner_with_fixture_data(deps).await;
824 let open_message_expired = runner
825 .mark_open_message_if_expired(&open_message_expected.signed_entity_type)
826 .await
827 .expect("mark_open_message_if_expired should not fail");
828
829 assert_eq!(Some(open_message_expected), open_message_expired);
830 }
831
832 #[tokio::test]
833 async fn test_update_era_checker() {
834 let deps = initialize_dependencies!().await;
835 let ticker_service = deps.ticker_service.clone();
836 let era_checker = deps.era_checker.clone();
837 let mut time_point = ticker_service.get_current_time_point().await.unwrap();
838
839 assert_eq!(time_point.epoch, era_checker.current_epoch());
840 let runner = AggregatorRunner::new(Arc::new(deps));
841 time_point.epoch += 1;
842
843 runner.update_era_checker(time_point.epoch).await.unwrap();
844 assert_eq!(time_point.epoch, era_checker.current_epoch());
845 }
846
847 #[tokio::test]
848 async fn test_inform_new_epoch() {
849 let mut mock_certifier_service = MockCertifierService::new();
850 mock_certifier_service
851 .expect_inform_epoch()
852 .returning(|_| Ok(()))
853 .times(1);
854 let mut deps = initialize_dependencies!().await;
855 let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
856
857 deps.certifier_service = Arc::new(mock_certifier_service);
858 deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
859 current_epoch,
860 &MithrilFixtureBuilder::default().build(),
861 )));
862
863 let runner = AggregatorRunner::new(Arc::new(deps));
864
865 runner.inform_new_epoch(current_epoch).await.unwrap();
866 }
867
868 #[tokio::test]
869 async fn test_upkeep_calls_run_on_upkeep_service() {
870 let mut upkeep_service = MockUpkeepService::new();
871 upkeep_service
872 .expect_run()
873 .with(eq(Epoch(5)))
874 .returning(|_| Ok(()))
875 .times(1);
876
877 let mut deps = initialize_dependencies!().await;
878 deps.upkeep_service = Arc::new(upkeep_service);
879
880 let runner = AggregatorRunner::new(Arc::new(deps));
881
882 runner.upkeep(Epoch(5)).await.unwrap();
883 }
884
885 #[tokio::test]
886 async fn test_precompute_epoch_data() {
887 let mut deps = initialize_dependencies!().await;
888 let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
889
890 deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
891 current_epoch,
892 &MithrilFixtureBuilder::default().build(),
893 )));
894
895 let runner = AggregatorRunner::new(Arc::new(deps));
896
897 runner.precompute_epoch_data().await.unwrap();
898 }
899
900 #[tokio::test]
901 async fn test_get_current_non_certified_open_message_should_create_new_open_message_if_none_exists()
902 {
903 let open_message_created = create_open_message(
904 IsCertified::No,
905 IsExpired::No,
906 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
907 );
908 let open_message_expected = open_message_created.clone();
909
910 let runner = {
911 let mut mock_certifier_service = MockCertifierService::new();
912 init_certifier_service_mock(&mut mock_certifier_service, vec![]);
913
914 mock_certifier_service
915 .expect_create_open_message()
916 .return_once(|_, _| Ok(open_message_created))
917 .times(1);
918 build_runner(temp_dir!(), mock_certifier_service).await
919 };
920
921 let open_message_returned = runner
922 .get_current_non_certified_open_message(&TimePoint::dummy())
923 .await
924 .unwrap();
925 assert_eq!(Some(open_message_expected), open_message_returned);
926 }
927
928 #[tokio::test]
929 async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_not_expired()
930 {
931 let not_certified_and_not_expired = create_open_message(
932 IsCertified::No,
933 IsExpired::No,
934 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
935 );
936
937 let open_message_expected = not_certified_and_not_expired.clone();
938
939 let runner = {
940 let mut mock_certifier_service = MockCertifierService::new();
941 init_certifier_service_mock(
942 &mut mock_certifier_service,
943 vec![not_certified_and_not_expired],
944 );
945
946 mock_certifier_service.expect_create_open_message().never();
947 build_runner(temp_dir!(), mock_certifier_service).await
948 };
949
950 let open_message_returned = runner
951 .get_current_non_certified_open_message(&TimePoint::dummy())
952 .await
953 .unwrap();
954
955 assert_eq!(Some(open_message_expected), open_message_returned);
956 }
957
958 #[tokio::test]
959 async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_open_message_already_certified()
960 {
961 let certified_and_not_expired = create_open_message(
962 IsCertified::Yes,
963 IsExpired::No,
964 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
965 );
966 let not_certified_and_not_expired = create_open_message(
967 IsCertified::No,
968 IsExpired::No,
969 SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
970 );
971
972 let open_message_expected = not_certified_and_not_expired.clone();
973
974 let runner = {
975 let mut mock_certifier_service = MockCertifierService::new();
976 init_certifier_service_mock(
977 &mut mock_certifier_service,
978 vec![certified_and_not_expired, not_certified_and_not_expired],
979 );
980
981 mock_certifier_service.expect_create_open_message().never();
982 build_runner_with_discriminants(
983 temp_dir!(),
984 mock_certifier_service,
985 vec![
986 SignedEntityTypeDiscriminants::MithrilStakeDistribution,
987 SignedEntityTypeDiscriminants::CardanoStakeDistribution,
988 ],
989 )
990 .await
991 };
992
993 let open_message_returned = runner
994 .get_current_non_certified_open_message(&TimePoint::dummy())
995 .await
996 .unwrap();
997
998 assert_eq!(Some(open_message_expected), open_message_returned);
999 }
1000
1001 #[tokio::test]
1002 async fn test_get_current_non_certified_open_message_should_create_open_message_if_none_exists_and_open_message_already_certified()
1003 {
1004 let certified_and_not_expired = create_open_message(
1005 IsCertified::Yes,
1006 IsExpired::No,
1007 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1008 );
1009 let open_message_created = create_open_message(
1010 IsCertified::No,
1011 IsExpired::No,
1012 SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1013 );
1014 let open_message_expected = open_message_created.clone();
1015
1016 let runner = {
1017 let mut mock_certifier_service = MockCertifierService::new();
1018 init_certifier_service_mock(
1019 &mut mock_certifier_service,
1020 vec![certified_and_not_expired],
1021 );
1022
1023 mock_certifier_service
1024 .expect_create_open_message()
1025 .return_once(|_, _| Ok(open_message_created))
1026 .times(1);
1027 build_runner_with_discriminants(
1028 temp_dir!(),
1029 mock_certifier_service,
1030 vec![
1031 SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1032 SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1033 ],
1034 )
1035 .await
1036 };
1037
1038 let open_message_returned = runner
1039 .get_current_non_certified_open_message(&TimePoint::dummy())
1040 .await
1041 .unwrap();
1042
1043 assert_eq!(Some(open_message_expected), open_message_returned);
1044 }
1045
1046 #[tokio::test]
1047 async fn test_get_current_non_certified_open_message_should_return_none_if_all_open_message_already_certified()
1048 {
1049 let certified_and_not_expired_1 = create_open_message(
1050 IsCertified::Yes,
1051 IsExpired::No,
1052 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1053 );
1054 let certified_and_not_expired_2 = create_open_message(
1055 IsCertified::Yes,
1056 IsExpired::No,
1057 SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1058 );
1059
1060 let runner = {
1061 let mut mock_certifier_service = MockCertifierService::new();
1062 init_certifier_service_mock(
1063 &mut mock_certifier_service,
1064 vec![certified_and_not_expired_1, certified_and_not_expired_2],
1065 );
1066
1067 mock_certifier_service.expect_create_open_message().never();
1068 build_runner_with_discriminants(
1069 temp_dir!(),
1070 mock_certifier_service,
1071 vec![
1072 SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1073 SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1074 ],
1075 )
1076 .await
1077 };
1078
1079 let open_message_returned = runner
1080 .get_current_non_certified_open_message(&TimePoint::dummy())
1081 .await
1082 .unwrap();
1083
1084 assert!(open_message_returned.is_none());
1085 }
1086
1087 #[tokio::test]
1088 async fn test_get_current_non_certified_open_message_should_return_first_not_certified_and_not_expired_open_message()
1089 {
1090 let not_certified_and_expired = create_open_message(
1091 IsCertified::No,
1092 IsExpired::Yes,
1093 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1094 );
1095 let not_certified_and_not_expired = create_open_message(
1096 IsCertified::No,
1097 IsExpired::No,
1098 SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1099 );
1100
1101 let open_message_expected = not_certified_and_not_expired.clone();
1102
1103 let runner = {
1104 let mut mock_certifier_service = MockCertifierService::new();
1105 init_certifier_service_mock(
1106 &mut mock_certifier_service,
1107 vec![not_certified_and_expired, not_certified_and_not_expired],
1108 );
1109
1110 mock_certifier_service.expect_create_open_message().never();
1111 build_runner_with_discriminants(
1112 temp_dir!(),
1113 mock_certifier_service,
1114 vec![
1115 SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1116 SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1117 ],
1118 )
1119 .await
1120 };
1121
1122 let open_message_returned = runner
1123 .get_current_non_certified_open_message(&TimePoint::dummy())
1124 .await
1125 .unwrap();
1126
1127 assert_eq!(Some(open_message_expected), open_message_returned);
1128 }
1129
1130 #[tokio::test]
1131 async fn test_get_current_non_certified_open_message_called_for_mithril_stake_distribution() {
1132 let mut mock_certifier_service = MockCertifierService::new();
1133
1134 mock_certifier_service
1135 .expect_get_open_message()
1136 .with(eq(SignedEntityType::MithrilStakeDistribution(
1137 TimePoint::dummy().epoch,
1138 )))
1139 .times(1)
1140 .return_once(|_| {
1141 Ok(Some(create_open_message(
1142 IsCertified::Yes,
1143 IsExpired::No,
1144 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1145 )))
1146 });
1147
1148 mock_certifier_service.expect_create_open_message().never();
1149
1150 mock_certifier_service.expect_inform_epoch().return_once(|_| Ok(()));
1151 mock_certifier_service
1152 .expect_mark_open_message_if_expired()
1153 .returning(|_| Ok(None));
1154
1155 let runner = build_runner(temp_dir!(), mock_certifier_service).await;
1156
1157 runner
1158 .get_current_non_certified_open_message(&TimePoint::dummy())
1159 .await
1160 .unwrap();
1161 }
1162
1163 #[tokio::test]
1164 async fn list_available_signed_entity_types_list_all_configured_entities_if_none_are_locked() {
1165 let runner = {
1166 let mut dependencies = initialize_dependencies!().await;
1167 let epoch_service = FakeEpochServiceBuilder {
1168 signed_entity_config: SignedEntityConfig {
1169 allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1170 ..SignedEntityConfig::dummy()
1171 },
1172 ..FakeEpochServiceBuilder::dummy(Epoch(32))
1173 }
1174 .build();
1175 dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1176 dependencies.signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1177 AggregatorRunner::new(Arc::new(dependencies))
1178 };
1179
1180 let time_point = TimePoint::dummy();
1181 let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1182 .list_available_signed_entity_types(&time_point)
1183 .await
1184 .unwrap()
1185 .into_iter()
1186 .map(Into::into)
1187 .collect();
1188
1189 assert_eq!(
1190 signed_entities,
1191 SignedEntityTypeDiscriminants::all().into_iter().collect::<Vec<_>>()
1192 );
1193 }
1194
1195 #[tokio::test]
1196 async fn list_available_signed_entity_types_exclude_locked_entities() {
1197 let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1198 let runner = {
1199 let mut dependencies = initialize_dependencies!().await;
1200 dependencies.signed_entity_type_lock = signed_entity_type_lock.clone();
1201 let epoch_service = FakeEpochServiceBuilder {
1202 signed_entity_config: SignedEntityConfig {
1203 allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1204 ..SignedEntityConfig::dummy()
1205 },
1206 ..FakeEpochServiceBuilder::dummy(Epoch(32))
1207 }
1208 .build();
1209 dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1210
1211 AggregatorRunner::new(Arc::new(dependencies))
1212 };
1213
1214 signed_entity_type_lock
1215 .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
1216 .await;
1217
1218 let time_point = TimePoint::dummy();
1219 let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1220 .list_available_signed_entity_types(&time_point)
1221 .await
1222 .unwrap()
1223 .into_iter()
1224 .map(Into::into)
1225 .collect();
1226
1227 assert!(!signed_entities.is_empty());
1228 assert!(!signed_entities.contains(&SignedEntityTypeDiscriminants::CardanoTransactions));
1229 }
1230
1231 #[tokio::test]
1232 async fn is_open_message_outdated_return_false_when_message_is_not_expired_and_no_newer_open_message()
1233 {
1234 assert!(!is_outdated_returned_when(temp_dir!(), IsExpired::No, false).await);
1235 }
1236
1237 #[tokio::test]
1238 async fn is_open_message_outdated_return_true_when_message_is_expired_and_no_newer_open_message()
1239 {
1240 assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, false).await);
1241 }
1242
1243 #[tokio::test]
1244 async fn is_open_message_outdated_return_true_when_message_is_not_expired_and_exists_newer_open_message()
1245 {
1246 assert!(is_outdated_returned_when(temp_dir!(), IsExpired::No, true).await);
1247 }
1248
1249 #[tokio::test]
1250 async fn is_open_message_outdated_return_true_when_message_is_expired_and_exists_newer_open_message()
1251 {
1252 assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, true).await);
1253 }
1254
1255 async fn is_outdated_returned_when(
1256 tmp_path: PathBuf,
1257 is_expired: IsExpired,
1258 newer_open_message: bool,
1259 ) -> bool {
1260 let current_time_point = TimePoint {
1261 epoch: Epoch(2),
1262 ..TimePoint::dummy()
1263 };
1264
1265 let message_epoch = if newer_open_message {
1266 current_time_point.epoch + 54
1267 } else {
1268 current_time_point.epoch
1269 };
1270 let open_message_to_verify = OpenMessage {
1271 signed_entity_type: SignedEntityType::MithrilStakeDistribution(message_epoch),
1272 is_expired: is_expired == IsExpired::Yes,
1273 ..OpenMessage::dummy()
1274 };
1275
1276 let runner = {
1277 let mut deps = initialize_dependencies(tmp_path).await;
1278 let mut mock_certifier_service = MockCertifierService::new();
1279
1280 let open_message_current = open_message_to_verify.clone();
1281 mock_certifier_service
1282 .expect_get_open_message()
1283 .times(1)
1284 .return_once(|_| Ok(Some(open_message_current)));
1285 mock_certifier_service
1286 .expect_mark_open_message_if_expired()
1287 .returning(|_| Ok(None));
1288
1289 deps.certifier_service = Arc::new(mock_certifier_service);
1290
1291 let epoch_service = FakeEpochServiceBuilder::dummy(current_time_point.epoch).build();
1292 deps.epoch_service = Arc::new(RwLock::new(epoch_service));
1293
1294 build_runner_with_fixture_data(deps).await
1295 };
1296
1297 runner
1298 .is_open_message_outdated(
1299 open_message_to_verify.signed_entity_type,
1300 ¤t_time_point,
1301 )
1302 .await
1303 .unwrap()
1304 }
1305}