1use anyhow::Context;
2use async_trait::async_trait;
3use slog::{Logger, debug, warn};
4use std::sync::Arc;
5use std::time::Duration;
6
7use mithril_common::StdResult;
8use mithril_common::entities::{Certificate, Epoch, ProtocolMessage, SignedEntityType, TimePoint};
9use mithril_common::logging::LoggerExtensions;
10use mithril_persistence::store::StakeStorer;
11
12use crate::ServeCommandDependenciesContainer;
13use crate::entities::OpenMessage;
14
15#[derive(Debug, Clone)]
17pub struct AggregatorConfig {
18 pub interval: Duration,
20
21 pub is_follower: bool,
23}
24
25impl AggregatorConfig {
26 pub fn new(interval: Duration, is_follower: bool) -> Self {
28 Self {
29 interval,
30 is_follower,
31 }
32 }
33}
34
35#[async_trait]
38pub trait AggregatorRunnerTrait: Sync + Send {
39 async fn get_time_point_from_chain(&self) -> StdResult<TimePoint>;
41
42 async fn get_current_open_message_for_signed_entity_type(
44 &self,
45 signed_entity_type: &SignedEntityType,
46 ) -> StdResult<Option<OpenMessage>>;
47
48 async fn get_current_non_certified_open_message(
50 &self,
51 current_time_point: &TimePoint,
52 ) -> StdResult<Option<OpenMessage>>;
53
54 async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()>;
56
57 async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()>;
59
60 async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()>;
62
63 async fn close_signer_registration_round(&self) -> StdResult<()>;
65
66 async fn is_follower_aggregator_at_same_epoch_as_leader(
68 &self,
69 time_point: &TimePoint,
70 ) -> StdResult<bool>;
71
72 async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()>;
74
75 async fn synchronize_follower_aggregator_certificate_chain(
77 &self,
78 force_sync: bool,
79 ) -> StdResult<()>;
80
81 async fn update_epoch_settings(&self) -> StdResult<()>;
83
84 async fn compute_protocol_message(
86 &self,
87 signed_entity_type: &SignedEntityType,
88 ) -> StdResult<ProtocolMessage>;
89
90 async fn mark_open_message_if_expired(
92 &self,
93 signed_entity_type: &SignedEntityType,
94 ) -> StdResult<Option<OpenMessage>>;
95
96 async fn create_certificate(
98 &self,
99 signed_entity_type: &SignedEntityType,
100 ) -> StdResult<Option<Certificate>>;
101
102 async fn create_artifact(
104 &self,
105 signed_entity_type: &SignedEntityType,
106 certificate: &Certificate,
107 ) -> StdResult<()>;
108
109 async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()>;
111
112 async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()>;
114
115 async fn upkeep(&self, epoch: Epoch) -> StdResult<()>;
117
118 async fn precompute_epoch_data(&self) -> StdResult<()>;
120
121 async fn create_open_message(
123 &self,
124 signed_entity_type: &SignedEntityType,
125 protocol_message: &ProtocolMessage,
126 ) -> StdResult<OpenMessage>;
127
128 async fn is_open_message_outdated(
130 &self,
131 open_message_signed_entity_type: SignedEntityType,
132 last_time_point: &TimePoint,
133 ) -> StdResult<bool>;
134
135 fn increment_runtime_cycle_success_since_startup_counter(&self);
137
138 fn increment_runtime_cycle_total_since_startup_counter(&self);
140}
141
142pub struct AggregatorRunner {
145 dependencies: Arc<ServeCommandDependenciesContainer>,
146 logger: Logger,
147}
148
149impl AggregatorRunner {
150 pub fn new(dependencies: Arc<ServeCommandDependenciesContainer>) -> Self {
152 let logger = dependencies.root_logger.new_with_component_name::<Self>();
153 Self {
154 dependencies,
155 logger,
156 }
157 }
158
159 async fn list_available_signed_entity_types(
160 &self,
161 time_point: &TimePoint,
162 ) -> StdResult<Vec<SignedEntityType>> {
163 let signed_entity_types = self
164 .dependencies
165 .epoch_service
166 .read()
167 .await
168 .signed_entity_config()?
169 .list_allowed_signed_entity_types(time_point)?;
170 let unlocked_signed_entities = self
171 .dependencies
172 .signed_entity_type_lock
173 .filter_unlocked_entries(signed_entity_types)
174 .await;
175
176 Ok(unlocked_signed_entities)
177 }
178}
179
180#[cfg_attr(test, mockall::automock)]
181#[async_trait]
182impl AggregatorRunnerTrait for AggregatorRunner {
183 async fn get_time_point_from_chain(&self) -> StdResult<TimePoint> {
185 debug!(self.logger, ">> get_time_point_from_chain");
186 let time_point = self.dependencies.ticker_service.get_current_time_point().await?;
187
188 Ok(time_point)
189 }
190
191 async fn get_current_open_message_for_signed_entity_type(
192 &self,
193 signed_entity_type: &SignedEntityType,
194 ) -> StdResult<Option<OpenMessage>> {
195 debug!(self.logger,">> get_current_open_message_for_signed_entity_type"; "signed_entity_type" => ?signed_entity_type);
196 self.mark_open_message_if_expired(signed_entity_type).await?;
197
198 Ok(self
199 .dependencies
200 .certifier_service
201 .get_open_message(signed_entity_type)
202 .await
203 .with_context(|| format!("CertifierService can not get open message for signed_entity_type: '{signed_entity_type}'"))?)
204 }
205
206 async fn get_current_non_certified_open_message(
207 &self,
208 current_time_point: &TimePoint,
209 ) -> StdResult<Option<OpenMessage>> {
210 debug!(self.logger,">> get_current_non_certified_open_message"; "time_point" => #?current_time_point);
211 let signed_entity_types =
212 self.list_available_signed_entity_types(current_time_point).await?;
213
214 for signed_entity_type in signed_entity_types {
215 let current_open_message = self.get_current_open_message_for_signed_entity_type(&signed_entity_type)
216 .await
217 .with_context(|| format!("AggregatorRunner can not get current open message for signed entity type: '{}'", &signed_entity_type))?;
218 match current_open_message {
219 None => {
220 let protocol_message = self.compute_protocol_message(&signed_entity_type).await.with_context(|| format!("AggregatorRunner can not compute protocol message for signed_entity_type: '{signed_entity_type}'"))?;
221 let open_message_new = self.create_open_message(&signed_entity_type, &protocol_message)
222 .await
223 .with_context(|| format!("AggregatorRunner can not create open message for signed_entity_type: '{signed_entity_type}'"))?;
224
225 return Ok(Some(open_message_new));
226 }
227 Some(open_message) => {
228 if !open_message.is_certified && !open_message.is_expired {
229 return Ok(Some(open_message));
230 }
231 }
232 }
233 }
234
235 Ok(None)
236 }
237
238 async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()> {
239 debug!(self.logger, ">> is_certificate_chain_valid");
240 self.dependencies
241 .certifier_service
242 .verify_certificate_chain(time_point.epoch)
243 .await?;
244
245 Ok(())
246 }
247
248 async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()> {
249 debug!(self.logger,">> update_stake_distribution"; "time_point" => #?new_time_point);
250 self.dependencies
251 .stake_distribution_service
252 .update_stake_distribution()
253 .await
254 .with_context(|| format!("AggregatorRunner could not update stake distribution for time_point: '{new_time_point}'"))
255 }
256
257 async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()> {
258 debug!(self.logger,">> open_signer_registration_round"; "time_point" => #?new_time_point);
259 let registration_epoch = new_time_point.epoch.offset_to_recording_epoch();
260
261 let stakes = self
262 .dependencies
263 .stake_store
264 .get_stakes(registration_epoch)
265 .await?
266 .unwrap_or_default();
267
268 self.dependencies
269 .signer_registration_round_opener
270 .open_registration_round(registration_epoch, stakes)
271 .await
272 }
273
274 async fn close_signer_registration_round(&self) -> StdResult<()> {
275 debug!(self.logger, ">> close_signer_registration_round");
276 self.dependencies
277 .signer_registration_round_opener
278 .close_registration_round()
279 .await
280 }
281
282 async fn is_follower_aggregator_at_same_epoch_as_leader(
283 &self,
284 time_point: &TimePoint,
285 ) -> StdResult<bool> {
286 self.dependencies
287 .signer_synchronizer
288 .can_synchronize_signers(time_point.epoch)
289 .await
290 .map_err(|e| e.into())
291 }
292
293 async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()> {
294 self.dependencies
295 .signer_synchronizer
296 .synchronize_all_signers()
297 .await
298 .map_err(|e| e.into())
299 }
300
301 async fn update_epoch_settings(&self) -> StdResult<()> {
302 debug!(self.logger, ">> update_epoch_settings");
303 self.dependencies
304 .epoch_service
305 .write()
306 .await
307 .update_epoch_settings()
308 .await
309 }
310
311 async fn compute_protocol_message(
312 &self,
313 signed_entity_type: &SignedEntityType,
314 ) -> StdResult<ProtocolMessage> {
315 debug!(self.logger, ">> compute_protocol_message");
316 let protocol_message = self
317 .dependencies
318 .signable_builder_service
319 .compute_protocol_message(signed_entity_type.to_owned())
320 .await
321 .with_context(|| format!("Runner can not compute protocol message for signed entity type: '{signed_entity_type}'"))?;
322
323 Ok(protocol_message)
324 }
325
326 async fn mark_open_message_if_expired(
327 &self,
328 signed_entity_type: &SignedEntityType,
329 ) -> StdResult<Option<OpenMessage>> {
330 debug!(self.logger, ">> mark_open_message_if_expired");
331 let expired_open_message = self
332 .dependencies
333 .certifier_service
334 .mark_open_message_if_expired(signed_entity_type)
335 .await
336 .with_context(|| "CertifierService can not mark expired open message")?;
337
338 debug!(
339 self.logger, "Marked expired open messages";
340 "expired_open_message" => ?expired_open_message
341 );
342
343 Ok(expired_open_message)
344 }
345
346 async fn create_certificate(
347 &self,
348 signed_entity_type: &SignedEntityType,
349 ) -> StdResult<Option<Certificate>> {
350 debug!(self.logger, ">> create_certificate"; "signed_entity_type" => ?signed_entity_type);
351
352 let certificate = self.dependencies
353 .certifier_service
354 .create_certificate(signed_entity_type)
355 .await
356 .with_context(|| {
357 format!(
358 "CertifierService can not create certificate for signed_entity_type: '{signed_entity_type}'"
359 )
360 })?;
361
362 if certificate.is_some() {
363 self.dependencies
364 .metrics_service
365 .get_certificate_total_produced_since_startup()
366 .increment();
367 }
368
369 Ok(certificate)
370 }
371
372 async fn create_artifact(
373 &self,
374 signed_entity_type: &SignedEntityType,
375 certificate: &Certificate,
376 ) -> StdResult<()> {
377 debug!(
378 self.logger, ">> create_artifact";
379 "signed_entity_type" => ?signed_entity_type,
380 "certificate_hash" => &certificate.hash
381 );
382
383 self.dependencies
384 .signed_entity_service
385 .create_artifact(signed_entity_type.to_owned(), certificate)
386 .await
387 .with_context(|| {
388 format!(
389 "SignedEntityService can not create artifact for signed_entity_type: '{signed_entity_type}' with certificate hash: '{}'",
390 certificate.hash
391 )
392 })?;
393
394 Ok(())
395 }
396
397 async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()> {
398 debug!(self.logger, ">> update_era_checker({epoch:?})");
399 let token = self
400 .dependencies
401 .era_reader
402 .read_era_epoch_token(epoch)
403 .await
404 .with_context(|| {
405 format!("EraReader can not get era epoch token for current epoch: '{epoch}'")
406 })?;
407
408 let current_era = token
409 .get_current_supported_era()
410 .with_context(|| "EraEpochToken can not get current supported era")?;
411 self.dependencies
412 .era_checker
413 .change_era(current_era, token.get_current_epoch());
414 debug!(
415 self.logger,
416 "Current Era is {current_era} (Epoch {}).",
417 token.get_current_epoch()
418 );
419
420 if token.get_next_supported_era().is_err() {
421 let era_name = &token.get_next_era_marker().unwrap().name;
422 warn!(
423 self.logger,
424 "Upcoming Era '{era_name}' is not supported by this version of the software. Please update!"
425 );
426 }
427
428 Ok(())
429 }
430
431 async fn precompute_epoch_data(&self) -> StdResult<()> {
432 debug!(self.logger, ">> precompute_epoch_data");
433 self.dependencies
434 .epoch_service
435 .write()
436 .await
437 .precompute_epoch_data()
438 .await?;
439
440 Ok(())
441 }
442
443 async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()> {
444 debug!(self.logger, ">> inform_new_epoch({epoch:?})");
445 self.dependencies.certifier_service.inform_epoch(epoch).await?;
446
447 self.dependencies
448 .epoch_service
449 .write()
450 .await
451 .inform_epoch(epoch)
452 .await?;
453
454 Ok(())
455 }
456
457 async fn upkeep(&self, epoch: Epoch) -> StdResult<()> {
458 debug!(self.logger, ">> upkeep");
459 self.dependencies.upkeep_service.run(epoch).await
460 }
461
462 async fn create_open_message(
463 &self,
464 signed_entity_type: &SignedEntityType,
465 protocol_message: &ProtocolMessage,
466 ) -> StdResult<OpenMessage> {
467 debug!(self.logger, ">> create_open_message");
468 self.dependencies
469 .certifier_service
470 .create_open_message(signed_entity_type, protocol_message)
471 .await
472 }
473
474 async fn is_open_message_outdated(
475 &self,
476 open_message_signed_entity_type: SignedEntityType,
477 last_time_point: &TimePoint,
478 ) -> StdResult<bool> {
479 let current_open_message = self
480 .get_current_open_message_for_signed_entity_type(
481 &open_message_signed_entity_type,
482 )
483 .await
484 .with_context(|| format!("AggregatorRuntime can not get the current open message for signed entity type: '{}'", &open_message_signed_entity_type))?;
485 let is_expired_open_message =
486 current_open_message.as_ref().map(|om| om.is_expired).unwrap_or(false);
487
488 let exists_newer_open_message = {
489 let new_signed_entity_type = self
490 .dependencies
491 .epoch_service
492 .read()
493 .await
494 .signed_entity_config()?
495 .time_point_to_signed_entity(&open_message_signed_entity_type, last_time_point)?;
496 new_signed_entity_type != open_message_signed_entity_type
497 };
498
499 Ok(exists_newer_open_message || is_expired_open_message)
500 }
501
502 fn increment_runtime_cycle_success_since_startup_counter(&self) {
503 self.dependencies
504 .metrics_service
505 .get_runtime_cycle_success_since_startup()
506 .increment();
507 }
508
509 fn increment_runtime_cycle_total_since_startup_counter(&self) {
510 self.dependencies
511 .metrics_service
512 .get_runtime_cycle_total_since_startup()
513 .increment();
514 }
515
516 async fn synchronize_follower_aggregator_certificate_chain(
517 &self,
518 force_sync: bool,
519 ) -> StdResult<()> {
520 debug!(
521 self.logger,
522 ">> synchronize_follower_aggregator_certificate_chain(force_sync:{force_sync})"
523 );
524 self.dependencies
525 .certificate_chain_synchronizer
526 .synchronize_certificate_chain(force_sync)
527 .await
528 }
529}
530
531#[cfg(test)]
532pub mod tests {
533 use async_trait::async_trait;
534 use chrono::{DateTime, Utc};
535 use mockall::mock;
536 use mockall::predicate::eq;
537 use std::path::PathBuf;
538 use std::sync::Arc;
539 use tokio::sync::RwLock;
540
541 use mithril_cardano_node_chain::test::double::FakeChainObserver;
542 use mithril_cardano_node_internal_database::test::double::DumbImmutableFileObserver;
543 use mithril_common::{
544 StdResult,
545 entities::{
546 CardanoTransactionsSigningConfig, ChainPoint, Epoch, ProtocolMessage,
547 SignedEntityConfig, SignedEntityType, SignedEntityTypeDiscriminants, StakeDistribution,
548 TimePoint,
549 },
550 signable_builder::SignableBuilderService,
551 temp_dir,
552 test::{
553 builder::MithrilFixtureBuilder,
554 double::{Dummy, fake_data},
555 },
556 };
557 use mithril_persistence::store::StakeStorer;
558 use mithril_signed_entity_lock::SignedEntityTypeLock;
559 use mithril_ticker::MithrilTickerService;
560
561 use crate::{
562 MithrilSignerRegistrationLeader, ServeCommandConfiguration,
563 ServeCommandDependenciesContainer, SignerRegistrationRound,
564 dependency_injection::DependenciesBuilder,
565 entities::{AggregatorEpochSettings, OpenMessage},
566 initialize_dependencies,
567 runtime::{AggregatorRunner, AggregatorRunnerTrait},
568 services::{
569 FakeEpochService, FakeEpochServiceBuilder, MithrilStakeDistributionService,
570 MockCertifierService, MockUpkeepService,
571 },
572 };
573
574 mock! {
575 SignableBuilderServiceImpl { }
576
577 #[async_trait]
578 impl SignableBuilderService for SignableBuilderServiceImpl
579 {
580
581 async fn compute_protocol_message(
582 &self,
583 signed_entity_type: SignedEntityType,
584 ) -> StdResult<ProtocolMessage>;
585 }
586 }
587
588 async fn build_runner_with_fixture_data(
589 deps: ServeCommandDependenciesContainer,
590 ) -> AggregatorRunner {
591 let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
592 let current_epoch = deps.chain_observer.get_current_epoch().await.unwrap().unwrap();
593 deps.init_state_from_fixture(
594 &fixture,
595 &CardanoTransactionsSigningConfig::dummy(),
596 &[
597 current_epoch.offset_to_signer_retrieval_epoch().unwrap(),
598 current_epoch,
599 current_epoch.next(),
600 ],
601 )
602 .await;
603
604 AggregatorRunner::new(Arc::new(deps))
605 }
606
607 async fn build_runner(
608 temp_dir: PathBuf,
609 mock_certifier_service: MockCertifierService,
610 ) -> AggregatorRunner {
611 build_runner_with_discriminants(temp_dir, mock_certifier_service, vec![]).await
612 }
613
614 async fn build_runner_with_discriminants(
615 temp_dir: PathBuf,
616 mock_certifier_service: MockCertifierService,
617 allowed_discriminants: Vec<SignedEntityTypeDiscriminants>,
618 ) -> AggregatorRunner {
619 let mut deps = initialize_dependencies(temp_dir).await;
620 deps.certifier_service = Arc::new(mock_certifier_service);
621
622 let mut mock_signable_builder_service = MockSignableBuilderServiceImpl::new();
623 mock_signable_builder_service
624 .expect_compute_protocol_message()
625 .return_once(|_| Ok(ProtocolMessage::default()));
626 deps.signable_builder_service = Arc::new(mock_signable_builder_service);
627
628 if !allowed_discriminants.is_empty() {
630 let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
631 let epoch_service = FakeEpochServiceBuilder {
632 signed_entity_config: SignedEntityConfig {
633 allowed_discriminants: allowed_discriminants.into_iter().collect(),
634 ..SignedEntityConfig::dummy()
635 },
636 ..FakeEpochServiceBuilder::dummy(current_epoch)
637 }
638 .build();
639 deps.epoch_service = Arc::new(RwLock::new(epoch_service));
640 }
641
642 let runner = build_runner_with_fixture_data(deps).await;
643
644 let current_epoch = runner.dependencies.ticker_service.get_current_epoch().await.unwrap();
645 runner.inform_new_epoch(current_epoch).await.unwrap();
646 runner.precompute_epoch_data().await.unwrap();
647 runner
648 }
649
650 fn init_certifier_service_mock(
651 mock_certifier_service: &mut MockCertifierService,
652 messages: Vec<OpenMessage>,
653 ) {
654 for message in messages {
655 mock_certifier_service
656 .expect_get_open_message()
657 .return_once(|_| Ok(Some(message)))
658 .times(1);
659 }
660 mock_certifier_service
662 .expect_get_open_message()
663 .returning(|_| Ok(None));
664
665 mock_certifier_service.expect_inform_epoch().return_once(|_| Ok(()));
666 mock_certifier_service
667 .expect_mark_open_message_if_expired()
668 .returning(|_| Ok(None));
669 }
670
671 fn create_open_message(
672 is_certified: IsCertified,
673 is_expired: IsExpired,
674 signed_entity_type: SignedEntityType,
675 ) -> OpenMessage {
676 OpenMessage {
677 signed_entity_type,
678 is_certified: is_certified == IsCertified::Yes,
679 is_expired: is_expired == IsExpired::Yes,
680 ..OpenMessage::dummy()
681 }
682 }
683
684 #[derive(Eq, PartialEq)]
685 enum IsCertified {
686 Yes,
687 No,
688 }
689
690 #[derive(Eq, PartialEq)]
691 enum IsExpired {
692 Yes,
693 No,
694 }
695
696 #[tokio::test]
697 async fn test_get_time_point_from_chain() {
698 let expected = TimePoint::new(2, 17, ChainPoint::dummy());
699 let mut dependencies = initialize_dependencies!().await;
700 let immutable_file_observer = Arc::new(DumbImmutableFileObserver::default());
701 immutable_file_observer
702 .shall_return(Some(expected.immutable_file_number))
703 .await;
704 let ticker_service = Arc::new(MithrilTickerService::new(
705 Arc::new(FakeChainObserver::new(Some(expected.clone()))),
706 immutable_file_observer,
707 ));
708 dependencies.ticker_service = ticker_service;
709 let runner = AggregatorRunner::new(Arc::new(dependencies));
710
711 let res = runner.get_time_point_from_chain().await;
713 assert_eq!(expected, res.unwrap());
714 }
715
716 #[tokio::test]
717 async fn test_update_stake_distribution() {
718 let chain_observer = Arc::new(FakeChainObserver::default());
719 let deps = {
720 let mut deps = initialize_dependencies!().await;
721 deps.chain_observer = chain_observer.clone();
722 deps.stake_distribution_service = Arc::new(MithrilStakeDistributionService::new(
723 deps.stake_store.clone(),
724 chain_observer.clone(),
725 ));
726 Arc::new(deps)
727 };
728 let runner = AggregatorRunner::new(deps.clone());
729 let time_point = runner.get_time_point_from_chain().await.unwrap();
730 let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
731 let expected = fixture.stake_distribution();
732
733 chain_observer.set_signers(fixture.signers_with_stake()).await;
734 runner
735 .update_stake_distribution(&time_point)
736 .await
737 .expect("updating stake distribution should not return an error");
738
739 let saved_stake_distribution = deps
740 .stake_store
741 .get_stakes(time_point.epoch.offset_to_recording_epoch())
742 .await
743 .unwrap()
744 .unwrap_or_else(|| {
745 panic!(
746 "I should have a stake distribution for the epoch {:?}",
747 time_point.epoch
748 )
749 });
750
751 assert_eq!(expected, saved_stake_distribution);
752 }
753
754 #[tokio::test]
755 async fn test_open_signer_registration_round() {
756 let config = ServeCommandConfiguration::new_sample(mithril_common::temp_dir!());
757 let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
758
759 let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
760 builder.get_verification_key_store().await.unwrap(),
761 builder.get_signer_store().await.unwrap(),
762 builder.get_signer_registration_verifier().await.unwrap(),
763 ));
764 let mut deps = builder.build_serve_dependencies_container().await.unwrap();
765 deps.signer_registration_round_opener = signer_registration_round_opener.clone();
766 let stake_store = deps.stake_store.clone();
767 let deps = Arc::new(deps);
768 let runner = AggregatorRunner::new(deps.clone());
769
770 let time_point = TimePoint::dummy();
771 let recording_epoch = time_point.epoch.offset_to_recording_epoch();
772 let stake_distribution: StakeDistribution =
773 StakeDistribution::from([("a".to_string(), 5), ("b".to_string(), 10)]);
774
775 stake_store
776 .save_stakes(recording_epoch, stake_distribution.clone())
777 .await
778 .expect("Save Stake distribution should not fail");
779
780 runner
781 .open_signer_registration_round(&time_point)
782 .await
783 .expect("opening signer registration should not return an error");
784
785 let saved_current_round = signer_registration_round_opener.get_current_round().await;
786
787 let expected_signer_registration_round =
788 SignerRegistrationRound::dummy(recording_epoch, stake_distribution);
789
790 assert_eq!(
791 Some(expected_signer_registration_round),
792 saved_current_round,
793 );
794 }
795
796 #[tokio::test]
797 async fn test_close_signer_registration_round() {
798 let config = ServeCommandConfiguration::new_sample(mithril_common::temp_dir!());
799 let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
800
801 let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
802 builder.get_verification_key_store().await.unwrap(),
803 builder.get_signer_store().await.unwrap(),
804 builder.get_signer_registration_verifier().await.unwrap(),
805 ));
806 let mut deps = builder.build_serve_dependencies_container().await.unwrap();
807 deps.signer_registration_round_opener = signer_registration_round_opener.clone();
808 let deps = Arc::new(deps);
809 let runner = AggregatorRunner::new(deps.clone());
810
811 let time_point = TimePoint::dummy();
812 runner
813 .open_signer_registration_round(&time_point)
814 .await
815 .expect("opening signer registration should not return an error");
816
817 runner
818 .close_signer_registration_round()
819 .await
820 .expect("closing signer registration should not return an error");
821
822 let saved_current_round = signer_registration_round_opener.get_current_round().await;
823 assert!(saved_current_round.is_none());
824 }
825
826 #[tokio::test]
827 async fn test_expire_open_message() {
828 let open_message_expected = OpenMessage {
829 signed_entity_type: SignedEntityType::dummy(),
830 is_certified: false,
831 is_expired: false,
832 expires_at: Some(
833 DateTime::parse_from_rfc3339("2000-01-19T13:43:05.618857482Z")
834 .unwrap()
835 .with_timezone(&Utc),
836 ),
837 ..OpenMessage::dummy()
838 };
839 let open_message_clone = open_message_expected.clone();
840
841 let mut mock_certifier_service = MockCertifierService::new();
842 mock_certifier_service
843 .expect_mark_open_message_if_expired()
844 .return_once(|_| Ok(Some(open_message_clone)));
845
846 let mut deps = initialize_dependencies!().await;
847 deps.certifier_service = Arc::new(mock_certifier_service);
848
849 let runner = build_runner_with_fixture_data(deps).await;
850 let open_message_expired = runner
851 .mark_open_message_if_expired(&open_message_expected.signed_entity_type)
852 .await
853 .expect("mark_open_message_if_expired should not fail");
854
855 assert_eq!(Some(open_message_expected), open_message_expired);
856 }
857
858 #[tokio::test]
859 async fn test_update_era_checker() {
860 let deps = initialize_dependencies!().await;
861 let ticker_service = deps.ticker_service.clone();
862 let era_checker = deps.era_checker.clone();
863 let mut time_point = ticker_service.get_current_time_point().await.unwrap();
864
865 assert_eq!(time_point.epoch, era_checker.current_epoch());
866 let runner = AggregatorRunner::new(Arc::new(deps));
867 time_point.epoch += 1;
868
869 runner.update_era_checker(time_point.epoch).await.unwrap();
870 assert_eq!(time_point.epoch, era_checker.current_epoch());
871 }
872
873 #[tokio::test]
874 async fn test_inform_new_epoch() {
875 let mut mock_certifier_service = MockCertifierService::new();
876 mock_certifier_service
877 .expect_inform_epoch()
878 .returning(|_| Ok(()))
879 .times(1);
880 let mut deps = initialize_dependencies!().await;
881 let current_epoch = deps.chain_observer.get_current_epoch().await.unwrap().unwrap();
882
883 deps.certifier_service = Arc::new(mock_certifier_service);
884 deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
885 current_epoch,
886 &MithrilFixtureBuilder::default().build(),
887 )));
888
889 let runner = AggregatorRunner::new(Arc::new(deps));
890
891 runner.inform_new_epoch(current_epoch).await.unwrap();
892 }
893
894 #[tokio::test]
895 async fn test_upkeep_calls_run_on_upkeep_service() {
896 let mut upkeep_service = MockUpkeepService::new();
897 upkeep_service
898 .expect_run()
899 .with(eq(Epoch(5)))
900 .returning(|_| Ok(()))
901 .times(1);
902
903 let mut deps = initialize_dependencies!().await;
904 deps.upkeep_service = Arc::new(upkeep_service);
905
906 let runner = AggregatorRunner::new(Arc::new(deps));
907
908 runner.upkeep(Epoch(5)).await.unwrap();
909 }
910
911 #[tokio::test]
912 async fn test_update_epoch_settings() {
913 let mut mock_certifier_service = MockCertifierService::new();
914 mock_certifier_service
915 .expect_inform_epoch()
916 .returning(|_| Ok(()))
917 .times(1);
918
919 let config = ServeCommandConfiguration::new_sample(temp_dir!());
920 let mut deps = DependenciesBuilder::new_with_stdout_logger(Arc::new(config.clone()))
921 .build_serve_dependencies_container()
922 .await
923 .unwrap();
924 deps.certifier_service = Arc::new(mock_certifier_service);
925 let epoch_settings_storer = deps.epoch_settings_storer.clone();
926 let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
927 let insert_epoch = current_epoch.offset_to_epoch_settings_recording_epoch();
928
929 let runner = build_runner_with_fixture_data(deps).await;
930 runner.inform_new_epoch(current_epoch).await.unwrap();
931 runner
932 .update_epoch_settings()
933 .await
934 .expect("update_epoch_settings should not fail");
935
936 let saved_epoch_settings = epoch_settings_storer
937 .get_epoch_settings(insert_epoch)
938 .await
939 .unwrap()
940 .unwrap_or_else(|| panic!("should have epoch settings for epoch {insert_epoch}",));
941
942 assert_eq!(
943 AggregatorEpochSettings {
944 protocol_parameters: config.protocol_parameters.clone(),
945 cardano_transactions_signing_config: config
946 .cardano_transactions_signing_config
947 .clone(),
948 },
949 saved_epoch_settings
950 );
951 }
952
953 #[tokio::test]
954 async fn test_precompute_epoch_data() {
955 let mut deps = initialize_dependencies!().await;
956 let current_epoch = deps.chain_observer.get_current_epoch().await.unwrap().unwrap();
957
958 deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
959 current_epoch,
960 &MithrilFixtureBuilder::default().build(),
961 )));
962
963 let runner = AggregatorRunner::new(Arc::new(deps));
964
965 runner.precompute_epoch_data().await.unwrap();
966 }
967
968 #[tokio::test]
969 async fn test_get_current_non_certified_open_message_should_create_new_open_message_if_none_exists()
970 {
971 let open_message_created = create_open_message(
972 IsCertified::No,
973 IsExpired::No,
974 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
975 );
976 let open_message_expected = open_message_created.clone();
977
978 let runner = {
979 let mut mock_certifier_service = MockCertifierService::new();
980 init_certifier_service_mock(&mut mock_certifier_service, vec![]);
981
982 mock_certifier_service
983 .expect_create_open_message()
984 .return_once(|_, _| Ok(open_message_created))
985 .times(1);
986 build_runner(temp_dir!(), mock_certifier_service).await
987 };
988
989 let open_message_returned = runner
990 .get_current_non_certified_open_message(&TimePoint::dummy())
991 .await
992 .unwrap();
993 assert_eq!(Some(open_message_expected), open_message_returned);
994 }
995
996 #[tokio::test]
997 async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_not_expired()
998 {
999 let not_certified_and_not_expired = create_open_message(
1000 IsCertified::No,
1001 IsExpired::No,
1002 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1003 );
1004
1005 let open_message_expected = not_certified_and_not_expired.clone();
1006
1007 let runner = {
1008 let mut mock_certifier_service = MockCertifierService::new();
1009 init_certifier_service_mock(
1010 &mut mock_certifier_service,
1011 vec![not_certified_and_not_expired],
1012 );
1013
1014 mock_certifier_service.expect_create_open_message().never();
1015 build_runner(temp_dir!(), mock_certifier_service).await
1016 };
1017
1018 let open_message_returned = runner
1019 .get_current_non_certified_open_message(&TimePoint::dummy())
1020 .await
1021 .unwrap();
1022
1023 assert_eq!(Some(open_message_expected), open_message_returned);
1024 }
1025
1026 #[tokio::test]
1027 async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_open_message_already_certified()
1028 {
1029 let certified_and_not_expired = create_open_message(
1030 IsCertified::Yes,
1031 IsExpired::No,
1032 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1033 );
1034 let not_certified_and_not_expired = create_open_message(
1035 IsCertified::No,
1036 IsExpired::No,
1037 SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1038 );
1039
1040 let open_message_expected = not_certified_and_not_expired.clone();
1041
1042 let runner = {
1043 let mut mock_certifier_service = MockCertifierService::new();
1044 init_certifier_service_mock(
1045 &mut mock_certifier_service,
1046 vec![certified_and_not_expired, not_certified_and_not_expired],
1047 );
1048
1049 mock_certifier_service.expect_create_open_message().never();
1050 build_runner_with_discriminants(
1051 temp_dir!(),
1052 mock_certifier_service,
1053 vec![
1054 SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1055 SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1056 ],
1057 )
1058 .await
1059 };
1060
1061 let open_message_returned = runner
1062 .get_current_non_certified_open_message(&TimePoint::dummy())
1063 .await
1064 .unwrap();
1065
1066 assert_eq!(Some(open_message_expected), open_message_returned);
1067 }
1068
1069 #[tokio::test]
1070 async fn test_get_current_non_certified_open_message_should_create_open_message_if_none_exists_and_open_message_already_certified()
1071 {
1072 let certified_and_not_expired = create_open_message(
1073 IsCertified::Yes,
1074 IsExpired::No,
1075 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1076 );
1077 let open_message_created = create_open_message(
1078 IsCertified::No,
1079 IsExpired::No,
1080 SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1081 );
1082 let open_message_expected = open_message_created.clone();
1083
1084 let runner = {
1085 let mut mock_certifier_service = MockCertifierService::new();
1086 init_certifier_service_mock(
1087 &mut mock_certifier_service,
1088 vec![certified_and_not_expired],
1089 );
1090
1091 mock_certifier_service
1092 .expect_create_open_message()
1093 .return_once(|_, _| Ok(open_message_created))
1094 .times(1);
1095 build_runner_with_discriminants(
1096 temp_dir!(),
1097 mock_certifier_service,
1098 vec![
1099 SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1100 SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1101 ],
1102 )
1103 .await
1104 };
1105
1106 let open_message_returned = runner
1107 .get_current_non_certified_open_message(&TimePoint::dummy())
1108 .await
1109 .unwrap();
1110
1111 assert_eq!(Some(open_message_expected), open_message_returned);
1112 }
1113
1114 #[tokio::test]
1115 async fn test_get_current_non_certified_open_message_should_return_none_if_all_open_message_already_certified()
1116 {
1117 let certified_and_not_expired_1 = create_open_message(
1118 IsCertified::Yes,
1119 IsExpired::No,
1120 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1121 );
1122 let certified_and_not_expired_2 = create_open_message(
1123 IsCertified::Yes,
1124 IsExpired::No,
1125 SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1126 );
1127
1128 let runner = {
1129 let mut mock_certifier_service = MockCertifierService::new();
1130 init_certifier_service_mock(
1131 &mut mock_certifier_service,
1132 vec![certified_and_not_expired_1, certified_and_not_expired_2],
1133 );
1134
1135 mock_certifier_service.expect_create_open_message().never();
1136 build_runner_with_discriminants(
1137 temp_dir!(),
1138 mock_certifier_service,
1139 vec![
1140 SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1141 SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1142 ],
1143 )
1144 .await
1145 };
1146
1147 let open_message_returned = runner
1148 .get_current_non_certified_open_message(&TimePoint::dummy())
1149 .await
1150 .unwrap();
1151
1152 assert!(open_message_returned.is_none());
1153 }
1154
1155 #[tokio::test]
1156 async fn test_get_current_non_certified_open_message_should_return_first_not_certified_and_not_expired_open_message()
1157 {
1158 let not_certified_and_expired = create_open_message(
1159 IsCertified::No,
1160 IsExpired::Yes,
1161 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1162 );
1163 let not_certified_and_not_expired = create_open_message(
1164 IsCertified::No,
1165 IsExpired::No,
1166 SignedEntityType::CardanoStakeDistribution(fake_data::epoch()),
1167 );
1168
1169 let open_message_expected = not_certified_and_not_expired.clone();
1170
1171 let runner = {
1172 let mut mock_certifier_service = MockCertifierService::new();
1173 init_certifier_service_mock(
1174 &mut mock_certifier_service,
1175 vec![not_certified_and_expired, not_certified_and_not_expired],
1176 );
1177
1178 mock_certifier_service.expect_create_open_message().never();
1179 build_runner_with_discriminants(
1180 temp_dir!(),
1181 mock_certifier_service,
1182 vec![
1183 SignedEntityTypeDiscriminants::MithrilStakeDistribution,
1184 SignedEntityTypeDiscriminants::CardanoStakeDistribution,
1185 ],
1186 )
1187 .await
1188 };
1189
1190 let open_message_returned = runner
1191 .get_current_non_certified_open_message(&TimePoint::dummy())
1192 .await
1193 .unwrap();
1194
1195 assert_eq!(Some(open_message_expected), open_message_returned);
1196 }
1197
1198 #[tokio::test]
1199 async fn test_get_current_non_certified_open_message_called_for_mithril_stake_distribution() {
1200 let mut mock_certifier_service = MockCertifierService::new();
1201
1202 mock_certifier_service
1203 .expect_get_open_message()
1204 .with(eq(SignedEntityType::MithrilStakeDistribution(
1205 TimePoint::dummy().epoch,
1206 )))
1207 .times(1)
1208 .return_once(|_| {
1209 Ok(Some(create_open_message(
1210 IsCertified::Yes,
1211 IsExpired::No,
1212 SignedEntityType::MithrilStakeDistribution(fake_data::epoch()),
1213 )))
1214 });
1215
1216 mock_certifier_service.expect_create_open_message().never();
1217
1218 mock_certifier_service.expect_inform_epoch().return_once(|_| Ok(()));
1219 mock_certifier_service
1220 .expect_mark_open_message_if_expired()
1221 .returning(|_| Ok(None));
1222
1223 let runner = build_runner(temp_dir!(), mock_certifier_service).await;
1224
1225 runner
1226 .get_current_non_certified_open_message(&TimePoint::dummy())
1227 .await
1228 .unwrap();
1229 }
1230
1231 #[tokio::test]
1232 async fn list_available_signed_entity_types_list_all_configured_entities_if_none_are_locked() {
1233 let runner = {
1234 let mut dependencies = initialize_dependencies!().await;
1235 let epoch_service = FakeEpochServiceBuilder {
1236 signed_entity_config: SignedEntityConfig {
1237 allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1238 ..SignedEntityConfig::dummy()
1239 },
1240 ..FakeEpochServiceBuilder::dummy(Epoch(32))
1241 }
1242 .build();
1243 dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1244 dependencies.signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1245 AggregatorRunner::new(Arc::new(dependencies))
1246 };
1247
1248 let time_point = TimePoint::dummy();
1249 let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1250 .list_available_signed_entity_types(&time_point)
1251 .await
1252 .unwrap()
1253 .into_iter()
1254 .map(Into::into)
1255 .collect();
1256
1257 assert_eq!(
1258 signed_entities,
1259 SignedEntityTypeDiscriminants::all().into_iter().collect::<Vec<_>>()
1260 );
1261 }
1262
1263 #[tokio::test]
1264 async fn list_available_signed_entity_types_exclude_locked_entities() {
1265 let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1266 let runner = {
1267 let mut dependencies = initialize_dependencies!().await;
1268 dependencies.signed_entity_type_lock = signed_entity_type_lock.clone();
1269 let epoch_service = FakeEpochServiceBuilder {
1270 signed_entity_config: SignedEntityConfig {
1271 allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1272 ..SignedEntityConfig::dummy()
1273 },
1274 ..FakeEpochServiceBuilder::dummy(Epoch(32))
1275 }
1276 .build();
1277 dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1278
1279 AggregatorRunner::new(Arc::new(dependencies))
1280 };
1281
1282 signed_entity_type_lock
1283 .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
1284 .await;
1285
1286 let time_point = TimePoint::dummy();
1287 let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1288 .list_available_signed_entity_types(&time_point)
1289 .await
1290 .unwrap()
1291 .into_iter()
1292 .map(Into::into)
1293 .collect();
1294
1295 assert!(!signed_entities.is_empty());
1296 assert!(!signed_entities.contains(&SignedEntityTypeDiscriminants::CardanoTransactions));
1297 }
1298
1299 #[tokio::test]
1300 async fn is_open_message_outdated_return_false_when_message_is_not_expired_and_no_newer_open_message()
1301 {
1302 assert!(!is_outdated_returned_when(temp_dir!(), IsExpired::No, false).await);
1303 }
1304
1305 #[tokio::test]
1306 async fn is_open_message_outdated_return_true_when_message_is_expired_and_no_newer_open_message()
1307 {
1308 assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, false).await);
1309 }
1310
1311 #[tokio::test]
1312 async fn is_open_message_outdated_return_true_when_message_is_not_expired_and_exists_newer_open_message()
1313 {
1314 assert!(is_outdated_returned_when(temp_dir!(), IsExpired::No, true).await);
1315 }
1316
1317 #[tokio::test]
1318 async fn is_open_message_outdated_return_true_when_message_is_expired_and_exists_newer_open_message()
1319 {
1320 assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, true).await);
1321 }
1322
1323 async fn is_outdated_returned_when(
1324 tmp_path: PathBuf,
1325 is_expired: IsExpired,
1326 newer_open_message: bool,
1327 ) -> bool {
1328 let current_time_point = TimePoint {
1329 epoch: Epoch(2),
1330 ..TimePoint::dummy()
1331 };
1332
1333 let message_epoch = if newer_open_message {
1334 current_time_point.epoch + 54
1335 } else {
1336 current_time_point.epoch
1337 };
1338 let open_message_to_verify = OpenMessage {
1339 signed_entity_type: SignedEntityType::MithrilStakeDistribution(message_epoch),
1340 is_expired: is_expired == IsExpired::Yes,
1341 ..OpenMessage::dummy()
1342 };
1343
1344 let runner = {
1345 let mut deps = initialize_dependencies(tmp_path).await;
1346 let mut mock_certifier_service = MockCertifierService::new();
1347
1348 let open_message_current = open_message_to_verify.clone();
1349 mock_certifier_service
1350 .expect_get_open_message()
1351 .times(1)
1352 .return_once(|_| Ok(Some(open_message_current)));
1353 mock_certifier_service
1354 .expect_mark_open_message_if_expired()
1355 .returning(|_| Ok(None));
1356
1357 deps.certifier_service = Arc::new(mock_certifier_service);
1358
1359 let epoch_service = FakeEpochServiceBuilder::dummy(current_time_point.epoch).build();
1360 deps.epoch_service = Arc::new(RwLock::new(epoch_service));
1361
1362 build_runner_with_fixture_data(deps).await
1363 };
1364
1365 runner
1366 .is_open_message_outdated(
1367 open_message_to_verify.signed_entity_type,
1368 ¤t_time_point,
1369 )
1370 .await
1371 .unwrap()
1372 }
1373}