1use anyhow::Context;
2use async_trait::async_trait;
3use slog::{debug, warn, Logger};
4use std::sync::Arc;
5use std::time::Duration;
6
7use mithril_common::entities::{Certificate, Epoch, ProtocolMessage, SignedEntityType, TimePoint};
8use mithril_common::logging::LoggerExtensions;
9use mithril_common::StdResult;
10use mithril_persistence::store::StakeStorer;
11
12use crate::entities::OpenMessage;
13use crate::ServeCommandDependenciesContainer;
14
/// Settings handed to the aggregator runtime.
#[derive(Clone, Debug)]
pub struct AggregatorConfig {
    /// Time interval used by the runtime.
    // NOTE(review): presumably the pause between two runtime cycles — confirm
    // against the code that reads this field.
    pub interval: Duration,

    /// `true` when the aggregator is configured as a follower.
    pub is_follower: bool,
}

impl AggregatorConfig {
    /// Builds a configuration from the given interval and follower flag.
    pub fn new(interval: Duration, is_follower: bool) -> Self {
        AggregatorConfig {
            is_follower,
            interval,
        }
    }
}
34
35#[async_trait]
38pub trait AggregatorRunnerTrait: Sync + Send {
39 async fn get_time_point_from_chain(&self) -> StdResult<TimePoint>;
41
42 async fn get_current_open_message_for_signed_entity_type(
44 &self,
45 signed_entity_type: &SignedEntityType,
46 ) -> StdResult<Option<OpenMessage>>;
47
48 async fn get_current_non_certified_open_message(
50 &self,
51 current_time_point: &TimePoint,
52 ) -> StdResult<Option<OpenMessage>>;
53
54 async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()>;
56
57 async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()>;
59
60 async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()>;
62
63 async fn close_signer_registration_round(&self) -> StdResult<()>;
65
66 async fn is_follower_aggregator_at_same_epoch_as_leader(
68 &self,
69 time_point: &TimePoint,
70 ) -> StdResult<bool>;
71
72 async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()>;
74
75 async fn update_epoch_settings(&self) -> StdResult<()>;
77
78 async fn compute_protocol_message(
80 &self,
81 signed_entity_type: &SignedEntityType,
82 ) -> StdResult<ProtocolMessage>;
83
84 async fn mark_open_message_if_expired(
86 &self,
87 signed_entity_type: &SignedEntityType,
88 ) -> StdResult<Option<OpenMessage>>;
89
90 async fn create_certificate(
92 &self,
93 signed_entity_type: &SignedEntityType,
94 ) -> StdResult<Option<Certificate>>;
95
96 async fn create_artifact(
98 &self,
99 signed_entity_type: &SignedEntityType,
100 certificate: &Certificate,
101 ) -> StdResult<()>;
102
103 async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()>;
105
106 async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()>;
108
109 async fn upkeep(&self, epoch: Epoch) -> StdResult<()>;
111
112 async fn precompute_epoch_data(&self) -> StdResult<()>;
114
115 async fn create_open_message(
117 &self,
118 signed_entity_type: &SignedEntityType,
119 protocol_message: &ProtocolMessage,
120 ) -> StdResult<OpenMessage>;
121
122 async fn is_open_message_outdated(
124 &self,
125 open_message_signed_entity_type: SignedEntityType,
126 last_time_point: &TimePoint,
127 ) -> StdResult<bool>;
128
129 fn increment_runtime_cycle_success_since_startup_counter(&self);
131
132 fn increment_runtime_cycle_total_since_startup_counter(&self);
134}
135
136pub struct AggregatorRunner {
139 dependencies: Arc<ServeCommandDependenciesContainer>,
140 logger: Logger,
141}
142
143impl AggregatorRunner {
144 pub fn new(dependencies: Arc<ServeCommandDependenciesContainer>) -> Self {
146 let logger = dependencies.root_logger.new_with_component_name::<Self>();
147 Self {
148 dependencies,
149 logger,
150 }
151 }
152
153 async fn list_available_signed_entity_types(
154 &self,
155 time_point: &TimePoint,
156 ) -> StdResult<Vec<SignedEntityType>> {
157 let signed_entity_types = self
158 .dependencies
159 .epoch_service
160 .read()
161 .await
162 .signed_entity_config()?
163 .list_allowed_signed_entity_types(time_point)?;
164 let unlocked_signed_entities = self
165 .dependencies
166 .signed_entity_type_lock
167 .filter_unlocked_entries(signed_entity_types)
168 .await;
169
170 Ok(unlocked_signed_entities)
171 }
172}
173
174#[cfg_attr(test, mockall::automock)]
175#[async_trait]
176impl AggregatorRunnerTrait for AggregatorRunner {
177 async fn get_time_point_from_chain(&self) -> StdResult<TimePoint> {
179 debug!(self.logger, ">> get_time_point_from_chain");
180 let time_point = self
181 .dependencies
182 .ticker_service
183 .get_current_time_point()
184 .await?;
185
186 Ok(time_point)
187 }
188
189 async fn get_current_open_message_for_signed_entity_type(
190 &self,
191 signed_entity_type: &SignedEntityType,
192 ) -> StdResult<Option<OpenMessage>> {
193 debug!(self.logger,">> get_current_open_message_for_signed_entity_type"; "signed_entity_type" => ?signed_entity_type);
194 self.mark_open_message_if_expired(signed_entity_type)
195 .await?;
196
197 Ok(self
198 .dependencies
199 .certifier_service
200 .get_open_message(signed_entity_type)
201 .await
202 .with_context(|| format!("CertifierService can not get open message for signed_entity_type: '{signed_entity_type}'"))?)
203 }
204
205 async fn get_current_non_certified_open_message(
206 &self,
207 current_time_point: &TimePoint,
208 ) -> StdResult<Option<OpenMessage>> {
209 debug!(self.logger,">> get_current_non_certified_open_message"; "time_point" => #?current_time_point);
210 let signed_entity_types = self
211 .list_available_signed_entity_types(current_time_point)
212 .await?;
213
214 for signed_entity_type in signed_entity_types {
215 let current_open_message = self.get_current_open_message_for_signed_entity_type(&signed_entity_type)
216 .await
217 .with_context(|| format!("AggregatorRunner can not get current open message for signed entity type: '{}'", &signed_entity_type))?;
218 match current_open_message {
219 None => {
220 let protocol_message = self.compute_protocol_message(&signed_entity_type).await.with_context(|| format!("AggregatorRunner can not compute protocol message for signed_entity_type: '{signed_entity_type}'"))?;
221 let open_message_new = self.create_open_message(&signed_entity_type, &protocol_message)
222 .await
223 .with_context(|| format!("AggregatorRunner can not create open message for signed_entity_type: '{signed_entity_type}'"))?;
224
225 return Ok(Some(open_message_new));
226 }
227 Some(open_message) => {
228 if !open_message.is_certified && !open_message.is_expired {
229 return Ok(Some(open_message));
230 }
231 }
232 }
233 }
234
235 Ok(None)
236 }
237
238 async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()> {
239 debug!(self.logger, ">> is_certificate_chain_valid");
240 self.dependencies
241 .certifier_service
242 .verify_certificate_chain(time_point.epoch)
243 .await?;
244
245 Ok(())
246 }
247
248 async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()> {
249 debug!(self.logger,">> update_stake_distribution"; "time_point" => #?new_time_point);
250 self.dependencies
251 .stake_distribution_service
252 .update_stake_distribution()
253 .await
254 .with_context(|| format!("AggregatorRunner could not update stake distribution for time_point: '{new_time_point}'"))
255 }
256
257 async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()> {
258 debug!(self.logger,">> open_signer_registration_round"; "time_point" => #?new_time_point);
259 let registration_epoch = new_time_point.epoch.offset_to_recording_epoch();
260
261 let stakes = self
262 .dependencies
263 .stake_store
264 .get_stakes(registration_epoch)
265 .await?
266 .unwrap_or_default();
267
268 self.dependencies
269 .signer_registration_round_opener
270 .open_registration_round(registration_epoch, stakes)
271 .await
272 }
273
274 async fn close_signer_registration_round(&self) -> StdResult<()> {
275 debug!(self.logger, ">> close_signer_registration_round");
276 self.dependencies
277 .signer_registration_round_opener
278 .close_registration_round()
279 .await
280 }
281
282 async fn is_follower_aggregator_at_same_epoch_as_leader(
283 &self,
284 time_point: &TimePoint,
285 ) -> StdResult<bool> {
286 self.dependencies
287 .signer_synchronizer
288 .can_synchronize_signers(time_point.epoch)
289 .await
290 .map_err(|e| e.into())
291 }
292
293 async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()> {
294 self.dependencies
295 .signer_synchronizer
296 .synchronize_all_signers()
297 .await
298 .map_err(|e| e.into())
299 }
300
301 async fn update_epoch_settings(&self) -> StdResult<()> {
302 debug!(self.logger, ">> update_epoch_settings");
303 self.dependencies
304 .epoch_service
305 .write()
306 .await
307 .update_epoch_settings()
308 .await
309 }
310
311 async fn compute_protocol_message(
312 &self,
313 signed_entity_type: &SignedEntityType,
314 ) -> StdResult<ProtocolMessage> {
315 debug!(self.logger, ">> compute_protocol_message");
316 let protocol_message = self
317 .dependencies
318 .signable_builder_service
319 .compute_protocol_message(signed_entity_type.to_owned())
320 .await
321 .with_context(|| format!("Runner can not compute protocol message for signed entity type: '{signed_entity_type}'"))?;
322
323 Ok(protocol_message)
324 }
325
326 async fn mark_open_message_if_expired(
327 &self,
328 signed_entity_type: &SignedEntityType,
329 ) -> StdResult<Option<OpenMessage>> {
330 debug!(self.logger, ">> mark_open_message_if_expired");
331 let expired_open_message = self
332 .dependencies
333 .certifier_service
334 .mark_open_message_if_expired(signed_entity_type)
335 .await
336 .with_context(|| "CertifierService can not mark expired open message")?;
337
338 debug!(
339 self.logger, "Marked expired open messages";
340 "expired_open_message" => ?expired_open_message
341 );
342
343 Ok(expired_open_message)
344 }
345
346 async fn create_certificate(
347 &self,
348 signed_entity_type: &SignedEntityType,
349 ) -> StdResult<Option<Certificate>> {
350 debug!(self.logger, ">> create_certificate"; "signed_entity_type" => ?signed_entity_type);
351
352 let certificate = self.dependencies
353 .certifier_service
354 .create_certificate(signed_entity_type)
355 .await
356 .with_context(|| {
357 format!(
358 "CertifierService can not create certificate for signed_entity_type: '{signed_entity_type}'"
359 )
360 })?;
361
362 if certificate.is_some() {
363 self.dependencies
364 .metrics_service
365 .get_certificate_total_produced_since_startup()
366 .increment();
367 }
368
369 Ok(certificate)
370 }
371
372 async fn create_artifact(
373 &self,
374 signed_entity_type: &SignedEntityType,
375 certificate: &Certificate,
376 ) -> StdResult<()> {
377 debug!(
378 self.logger, ">> create_artifact";
379 "signed_entity_type" => ?signed_entity_type,
380 "certificate_hash" => &certificate.hash
381 );
382
383 self.dependencies
384 .signed_entity_service
385 .create_artifact(signed_entity_type.to_owned(), certificate)
386 .await
387 .with_context(|| {
388 format!(
389 "SignedEntityService can not create artifact for signed_entity_type: '{signed_entity_type}' with certificate hash: '{}'",
390 certificate.hash
391 )
392 })?;
393
394 Ok(())
395 }
396
397 async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()> {
398 debug!(self.logger, ">> update_era_checker({epoch:?})");
399 let token = self
400 .dependencies
401 .era_reader
402 .read_era_epoch_token(epoch)
403 .await
404 .with_context(|| {
405 format!(
406 "EraReader can not get era epoch token for current epoch: '{}'",
407 epoch
408 )
409 })?;
410
411 let current_era = token
412 .get_current_supported_era()
413 .with_context(|| "EraEpochToken can not get current supported era")?;
414 self.dependencies
415 .era_checker
416 .change_era(current_era, token.get_current_epoch());
417 debug!(
418 self.logger,
419 "Current Era is {current_era} (Epoch {}).",
420 token.get_current_epoch()
421 );
422
423 if token.get_next_supported_era().is_err() {
424 let era_name = &token.get_next_era_marker().unwrap().name;
425 warn!(self.logger,"Upcoming Era '{era_name}' is not supported by this version of the software. Please update!");
426 }
427
428 Ok(())
429 }
430
431 async fn precompute_epoch_data(&self) -> StdResult<()> {
432 debug!(self.logger, ">> precompute_epoch_data");
433 self.dependencies
434 .epoch_service
435 .write()
436 .await
437 .precompute_epoch_data()
438 .await?;
439
440 Ok(())
441 }
442
443 async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()> {
444 debug!(self.logger, ">> inform_new_epoch({epoch:?})");
445 self.dependencies
446 .certifier_service
447 .inform_epoch(epoch)
448 .await?;
449
450 self.dependencies
451 .epoch_service
452 .write()
453 .await
454 .inform_epoch(epoch)
455 .await?;
456
457 Ok(())
458 }
459
460 async fn upkeep(&self, epoch: Epoch) -> StdResult<()> {
461 debug!(self.logger, ">> upkeep");
462 self.dependencies.upkeep_service.run(epoch).await
463 }
464
465 async fn create_open_message(
466 &self,
467 signed_entity_type: &SignedEntityType,
468 protocol_message: &ProtocolMessage,
469 ) -> StdResult<OpenMessage> {
470 debug!(self.logger, ">> create_open_message");
471 self.dependencies
472 .certifier_service
473 .create_open_message(signed_entity_type, protocol_message)
474 .await
475 }
476
477 async fn is_open_message_outdated(
478 &self,
479 open_message_signed_entity_type: SignedEntityType,
480 last_time_point: &TimePoint,
481 ) -> StdResult<bool> {
482 let current_open_message = self
483 .get_current_open_message_for_signed_entity_type(
484 &open_message_signed_entity_type,
485 )
486 .await
487 .with_context(|| format!("AggregatorRuntime can not get the current open message for signed entity type: '{}'", &open_message_signed_entity_type))?;
488 let is_expired_open_message = current_open_message
489 .as_ref()
490 .map(|om| om.is_expired)
491 .unwrap_or(false);
492
493 let exists_newer_open_message = {
494 let new_signed_entity_type = self
495 .dependencies
496 .epoch_service
497 .read()
498 .await
499 .signed_entity_config()?
500 .time_point_to_signed_entity(&open_message_signed_entity_type, last_time_point)?;
501 new_signed_entity_type != open_message_signed_entity_type
502 };
503
504 Ok(exists_newer_open_message || is_expired_open_message)
505 }
506
507 fn increment_runtime_cycle_success_since_startup_counter(&self) {
508 self.dependencies
509 .metrics_service
510 .get_runtime_cycle_success_since_startup()
511 .increment();
512 }
513
514 fn increment_runtime_cycle_total_since_startup_counter(&self) {
515 self.dependencies
516 .metrics_service
517 .get_runtime_cycle_total_since_startup()
518 .increment();
519 }
520}
521
522#[cfg(test)]
523pub mod tests {
524 use crate::dependency_injection::DependenciesBuilder;
525 use crate::entities::AggregatorEpochSettings;
526 use crate::services::{FakeEpochService, FakeEpochServiceBuilder, MockUpkeepService};
527 use crate::{
528 entities::OpenMessage,
529 initialize_dependencies,
530 runtime::{AggregatorRunner, AggregatorRunnerTrait},
531 services::{MithrilStakeDistributionService, MockCertifierService},
532 MithrilSignerRegistrationLeader, ServeCommandConfiguration,
533 ServeCommandDependenciesContainer, SignerRegistrationRound,
534 };
535 use async_trait::async_trait;
536 use chrono::{DateTime, Utc};
537 use mithril_common::entities::{
538 CardanoTransactionsSigningConfig, ChainPoint, Epoch, SignedEntityConfig,
539 SignedEntityTypeDiscriminants,
540 };
541 use mithril_common::temp_dir;
542 use mithril_common::{
543 chain_observer::FakeObserver,
544 digesters::DumbImmutableFileObserver,
545 entities::{ProtocolMessage, SignedEntityType, StakeDistribution, TimePoint},
546 signable_builder::SignableBuilderService,
547 test_utils::{fake_data, MithrilFixtureBuilder},
548 MithrilTickerService, StdResult,
549 };
550 use mithril_persistence::store::StakeStorer;
551 use mithril_signed_entity_lock::SignedEntityTypeLock;
552 use mockall::predicate::eq;
553 use mockall::{mock, Sequence};
554 use std::path::PathBuf;
555 use std::sync::Arc;
556 use tokio::sync::RwLock;
557
    // Generates `MockSignableBuilderServiceImpl`, a mockall double of the
    // `SignableBuilderService` trait; `build_runner` injects it into the
    // dependency container.
    mock! {
        SignableBuilderServiceImpl { }

        #[async_trait]
        impl SignableBuilderService for SignableBuilderServiceImpl
        {

            async fn compute_protocol_message(
                &self,
                signed_entity_type: SignedEntityType,
            ) -> StdResult<ProtocolMessage>;
        }
    }
571
572 async fn build_runner_with_fixture_data(
573 deps: ServeCommandDependenciesContainer,
574 ) -> AggregatorRunner {
575 let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
576 let current_epoch = deps
577 .chain_observer
578 .get_current_epoch()
579 .await
580 .unwrap()
581 .unwrap();
582 deps.init_state_from_fixture(
583 &fixture,
584 &CardanoTransactionsSigningConfig::dummy(),
585 &[
586 current_epoch.offset_to_signer_retrieval_epoch().unwrap(),
587 current_epoch,
588 current_epoch.next(),
589 ],
590 )
591 .await;
592
593 AggregatorRunner::new(Arc::new(deps))
594 }
595
596 async fn build_runner(
597 temp_dir: PathBuf,
598 mock_certifier_service: MockCertifierService,
599 ) -> AggregatorRunner {
600 let mut deps = initialize_dependencies(temp_dir).await;
601 deps.certifier_service = Arc::new(mock_certifier_service);
602
603 let mut mock_signable_builder_service = MockSignableBuilderServiceImpl::new();
604 mock_signable_builder_service
605 .expect_compute_protocol_message()
606 .return_once(|_| Ok(ProtocolMessage::default()));
607 deps.signable_builder_service = Arc::new(mock_signable_builder_service);
608
609 let runner = build_runner_with_fixture_data(deps).await;
610
611 let current_epoch = runner
612 .dependencies
613 .ticker_service
614 .get_current_epoch()
615 .await
616 .unwrap();
617 runner.inform_new_epoch(current_epoch).await.unwrap();
618 runner.precompute_epoch_data().await.unwrap();
619 runner
620 }
621
622 fn init_certifier_service_mock(
623 mock_certifier_service: &mut MockCertifierService,
624 messages: Vec<OpenMessage>,
625 ) {
626 for message in messages {
627 mock_certifier_service
628 .expect_get_open_message()
629 .return_once(|_| Ok(Some(message)))
630 .times(1);
631 }
632 mock_certifier_service
634 .expect_get_open_message()
635 .returning(|_| Ok(None));
636
637 mock_certifier_service
638 .expect_inform_epoch()
639 .return_once(|_| Ok(()));
640 mock_certifier_service
641 .expect_mark_open_message_if_expired()
642 .returning(|_| Ok(None));
643 }
644
645 fn create_open_message(is_certified: IsCertified, is_expired: IsExpired) -> OpenMessage {
646 OpenMessage {
647 signed_entity_type: SignedEntityType::CardanoImmutableFilesFull(fake_data::beacon()),
648 is_certified: is_certified == IsCertified::Yes,
649 is_expired: is_expired == IsExpired::Yes,
650 ..OpenMessage::dummy()
651 }
652 }
653
654 #[derive(Eq, PartialEq)]
655 enum IsCertified {
656 Yes,
657 No,
658 }
659
660 #[derive(Eq, PartialEq)]
661 enum IsExpired {
662 Yes,
663 No,
664 }
665
666 #[tokio::test]
667 async fn test_get_time_point_from_chain() {
668 let expected = TimePoint::new(2, 17, ChainPoint::dummy());
669 let mut dependencies = initialize_dependencies!().await;
670 let immutable_file_observer = Arc::new(DumbImmutableFileObserver::default());
671 immutable_file_observer
672 .shall_return(Some(expected.immutable_file_number))
673 .await;
674 let ticker_service = Arc::new(MithrilTickerService::new(
675 Arc::new(FakeObserver::new(Some(expected.clone()))),
676 immutable_file_observer,
677 ));
678 dependencies.ticker_service = ticker_service;
679 let runner = AggregatorRunner::new(Arc::new(dependencies));
680
681 let res = runner.get_time_point_from_chain().await;
683 assert_eq!(expected, res.unwrap());
684 }
685
686 #[tokio::test]
687 async fn test_update_stake_distribution() {
688 let chain_observer = Arc::new(FakeObserver::default());
689 let deps = {
690 let mut deps = initialize_dependencies!().await;
691 deps.chain_observer = chain_observer.clone();
692 deps.stake_distribution_service = Arc::new(MithrilStakeDistributionService::new(
693 deps.stake_store.clone(),
694 chain_observer.clone(),
695 ));
696 Arc::new(deps)
697 };
698 let runner = AggregatorRunner::new(deps.clone());
699 let time_point = runner.get_time_point_from_chain().await.unwrap();
700 let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
701 let expected = fixture.stake_distribution();
702
703 chain_observer
704 .set_signers(fixture.signers_with_stake())
705 .await;
706 runner
707 .update_stake_distribution(&time_point)
708 .await
709 .expect("updating stake distribution should not return an error");
710
711 let saved_stake_distribution = deps
712 .stake_store
713 .get_stakes(time_point.epoch.offset_to_recording_epoch())
714 .await
715 .unwrap()
716 .unwrap_or_else(|| {
717 panic!(
718 "I should have a stake distribution for the epoch {:?}",
719 time_point.epoch
720 )
721 });
722
723 assert_eq!(expected, saved_stake_distribution);
724 }
725
726 #[tokio::test]
727 async fn test_open_signer_registration_round() {
728 let config = ServeCommandConfiguration::new_sample(mithril_common::temp_dir!());
729 let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
730
731 let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
732 builder.get_verification_key_store().await.unwrap(),
733 builder.get_signer_store().await.unwrap(),
734 builder.get_signer_registration_verifier().await.unwrap(),
735 ));
736 let mut deps = builder.build_serve_dependencies_container().await.unwrap();
737 deps.signer_registration_round_opener = signer_registration_round_opener.clone();
738 let stake_store = deps.stake_store.clone();
739 let deps = Arc::new(deps);
740 let runner = AggregatorRunner::new(deps.clone());
741
742 let time_point = TimePoint::dummy();
743 let recording_epoch = time_point.epoch.offset_to_recording_epoch();
744 let stake_distribution: StakeDistribution =
745 StakeDistribution::from([("a".to_string(), 5), ("b".to_string(), 10)]);
746
747 stake_store
748 .save_stakes(recording_epoch, stake_distribution.clone())
749 .await
750 .expect("Save Stake distribution should not fail");
751
752 runner
753 .open_signer_registration_round(&time_point)
754 .await
755 .expect("opening signer registration should not return an error");
756
757 let saved_current_round = signer_registration_round_opener.get_current_round().await;
758
759 let expected_signer_registration_round =
760 SignerRegistrationRound::dummy(recording_epoch, stake_distribution);
761
762 assert_eq!(
763 Some(expected_signer_registration_round),
764 saved_current_round,
765 );
766 }
767
768 #[tokio::test]
769 async fn test_close_signer_registration_round() {
770 let config = ServeCommandConfiguration::new_sample(mithril_common::temp_dir!());
771 let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
772
773 let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
774 builder.get_verification_key_store().await.unwrap(),
775 builder.get_signer_store().await.unwrap(),
776 builder.get_signer_registration_verifier().await.unwrap(),
777 ));
778 let mut deps = builder.build_serve_dependencies_container().await.unwrap();
779 deps.signer_registration_round_opener = signer_registration_round_opener.clone();
780 let deps = Arc::new(deps);
781 let runner = AggregatorRunner::new(deps.clone());
782
783 let time_point = TimePoint::dummy();
784 runner
785 .open_signer_registration_round(&time_point)
786 .await
787 .expect("opening signer registration should not return an error");
788
789 runner
790 .close_signer_registration_round()
791 .await
792 .expect("closing signer registration should not return an error");
793
794 let saved_current_round = signer_registration_round_opener.get_current_round().await;
795 assert!(saved_current_round.is_none());
796 }
797
798 #[tokio::test]
799 async fn test_expire_open_message() {
800 let open_message_expected = OpenMessage {
801 signed_entity_type: SignedEntityType::dummy(),
802 is_certified: false,
803 is_expired: false,
804 expires_at: Some(
805 DateTime::parse_from_rfc3339("2000-01-19T13:43:05.618857482Z")
806 .unwrap()
807 .with_timezone(&Utc),
808 ),
809 ..OpenMessage::dummy()
810 };
811 let open_message_clone = open_message_expected.clone();
812
813 let mut mock_certifier_service = MockCertifierService::new();
814 mock_certifier_service
815 .expect_mark_open_message_if_expired()
816 .return_once(|_| Ok(Some(open_message_clone)));
817
818 let mut deps = initialize_dependencies!().await;
819 deps.certifier_service = Arc::new(mock_certifier_service);
820
821 let runner = build_runner_with_fixture_data(deps).await;
822 let open_message_expired = runner
823 .mark_open_message_if_expired(&open_message_expected.signed_entity_type)
824 .await
825 .expect("mark_open_message_if_expired should not fail");
826
827 assert_eq!(Some(open_message_expected), open_message_expired);
828 }
829
830 #[tokio::test]
831 async fn test_update_era_checker() {
832 let deps = initialize_dependencies!().await;
833 let ticker_service = deps.ticker_service.clone();
834 let era_checker = deps.era_checker.clone();
835 let mut time_point = ticker_service.get_current_time_point().await.unwrap();
836
837 assert_eq!(time_point.epoch, era_checker.current_epoch());
838 let runner = AggregatorRunner::new(Arc::new(deps));
839 time_point.epoch += 1;
840
841 runner.update_era_checker(time_point.epoch).await.unwrap();
842 assert_eq!(time_point.epoch, era_checker.current_epoch());
843 }
844
845 #[tokio::test]
846 async fn test_inform_new_epoch() {
847 let mut mock_certifier_service = MockCertifierService::new();
848 mock_certifier_service
849 .expect_inform_epoch()
850 .returning(|_| Ok(()))
851 .times(1);
852 let mut deps = initialize_dependencies!().await;
853 let current_epoch = deps
854 .chain_observer
855 .get_current_epoch()
856 .await
857 .unwrap()
858 .unwrap();
859
860 deps.certifier_service = Arc::new(mock_certifier_service);
861 deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
862 current_epoch,
863 &MithrilFixtureBuilder::default().build(),
864 )));
865
866 let runner = AggregatorRunner::new(Arc::new(deps));
867
868 runner.inform_new_epoch(current_epoch).await.unwrap();
869 }
870
871 #[tokio::test]
872 async fn test_upkeep_calls_run_on_upkeep_service() {
873 let mut upkeep_service = MockUpkeepService::new();
874 upkeep_service
875 .expect_run()
876 .with(eq(Epoch(5)))
877 .returning(|_| Ok(()))
878 .times(1);
879
880 let mut deps = initialize_dependencies!().await;
881 deps.upkeep_service = Arc::new(upkeep_service);
882
883 let runner = AggregatorRunner::new(Arc::new(deps));
884
885 runner.upkeep(Epoch(5)).await.unwrap();
886 }
887
888 #[tokio::test]
889 async fn test_update_epoch_settings() {
890 let mut mock_certifier_service = MockCertifierService::new();
891 mock_certifier_service
892 .expect_inform_epoch()
893 .returning(|_| Ok(()))
894 .times(1);
895
896 let config = ServeCommandConfiguration::new_sample(temp_dir!());
897 let mut deps = DependenciesBuilder::new_with_stdout_logger(Arc::new(config.clone()))
898 .build_serve_dependencies_container()
899 .await
900 .unwrap();
901 deps.certifier_service = Arc::new(mock_certifier_service);
902 let epoch_settings_storer = deps.epoch_settings_storer.clone();
903 let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
904 let insert_epoch = current_epoch.offset_to_epoch_settings_recording_epoch();
905
906 let runner = build_runner_with_fixture_data(deps).await;
907 runner.inform_new_epoch(current_epoch).await.unwrap();
908 runner
909 .update_epoch_settings()
910 .await
911 .expect("update_epoch_settings should not fail");
912
913 let saved_epoch_settings = epoch_settings_storer
914 .get_epoch_settings(insert_epoch)
915 .await
916 .unwrap()
917 .unwrap_or_else(|| panic!("should have epoch settings for epoch {insert_epoch}",));
918
919 assert_eq!(
920 AggregatorEpochSettings {
921 protocol_parameters: config.protocol_parameters.clone(),
922 cardano_transactions_signing_config: config
923 .cardano_transactions_signing_config
924 .clone(),
925 },
926 saved_epoch_settings
927 );
928 }
929
930 #[tokio::test]
931 async fn test_precompute_epoch_data() {
932 let mut deps = initialize_dependencies!().await;
933 let current_epoch = deps
934 .chain_observer
935 .get_current_epoch()
936 .await
937 .unwrap()
938 .unwrap();
939
940 deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
941 current_epoch,
942 &MithrilFixtureBuilder::default().build(),
943 )));
944
945 let runner = AggregatorRunner::new(Arc::new(deps));
946
947 runner.precompute_epoch_data().await.unwrap();
948 }
949
950 #[tokio::test]
951 async fn test_get_current_non_certified_open_message_should_create_new_open_message_if_none_exists(
952 ) {
953 let open_message_created = create_open_message(IsCertified::No, IsExpired::No);
954 let open_message_expected = open_message_created.clone();
955
956 let runner = {
957 let mut mock_certifier_service = MockCertifierService::new();
958 init_certifier_service_mock(&mut mock_certifier_service, vec![]);
959
960 mock_certifier_service
961 .expect_create_open_message()
962 .return_once(|_, _| Ok(open_message_created))
963 .times(1);
964 build_runner(temp_dir!(), mock_certifier_service).await
965 };
966
967 let open_message_returned = runner
968 .get_current_non_certified_open_message(&TimePoint::dummy())
969 .await
970 .unwrap();
971 assert_eq!(Some(open_message_expected), open_message_returned);
972 }
973
974 #[tokio::test]
975 async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_not_expired(
976 ) {
977 let not_certified_and_not_expired = create_open_message(IsCertified::No, IsExpired::No);
978
979 let open_message_expected = not_certified_and_not_expired.clone();
980
981 let runner = {
982 let mut mock_certifier_service = MockCertifierService::new();
983 init_certifier_service_mock(
984 &mut mock_certifier_service,
985 vec![not_certified_and_not_expired],
986 );
987
988 mock_certifier_service.expect_create_open_message().never();
989 build_runner(temp_dir!(), mock_certifier_service).await
990 };
991
992 let open_message_returned = runner
993 .get_current_non_certified_open_message(&TimePoint::dummy())
994 .await
995 .unwrap();
996
997 assert_eq!(Some(open_message_expected), open_message_returned);
998 }
999
1000 #[tokio::test]
1001 async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_open_message_already_certified(
1002 ) {
1003 let certified_and_not_expired = create_open_message(IsCertified::Yes, IsExpired::No);
1004 let not_certified_and_not_expired = create_open_message(IsCertified::No, IsExpired::No);
1005
1006 let open_message_expected = not_certified_and_not_expired.clone();
1007
1008 let runner = {
1009 let mut mock_certifier_service = MockCertifierService::new();
1010 init_certifier_service_mock(
1011 &mut mock_certifier_service,
1012 vec![certified_and_not_expired, not_certified_and_not_expired],
1013 );
1014
1015 mock_certifier_service.expect_create_open_message().never();
1016 build_runner(temp_dir!(), mock_certifier_service).await
1017 };
1018
1019 let open_message_returned = runner
1020 .get_current_non_certified_open_message(&TimePoint::dummy())
1021 .await
1022 .unwrap();
1023
1024 assert_eq!(Some(open_message_expected), open_message_returned);
1025 }
1026
1027 #[tokio::test]
1028 async fn test_get_current_non_certified_open_message_should_create_open_message_if_none_exists_and_open_message_already_certified(
1029 ) {
1030 let certified_and_not_expired = create_open_message(IsCertified::Yes, IsExpired::No);
1031
1032 let open_message_created = create_open_message(IsCertified::No, IsExpired::No);
1033 let open_message_expected = open_message_created.clone();
1034
1035 let runner = {
1036 let mut mock_certifier_service = MockCertifierService::new();
1037 init_certifier_service_mock(
1038 &mut mock_certifier_service,
1039 vec![certified_and_not_expired],
1040 );
1041
1042 mock_certifier_service
1043 .expect_create_open_message()
1044 .return_once(|_, _| Ok(open_message_created))
1045 .times(1);
1046 build_runner(temp_dir!(), mock_certifier_service).await
1047 };
1048
1049 let open_message_returned = runner
1050 .get_current_non_certified_open_message(&TimePoint::dummy())
1051 .await
1052 .unwrap();
1053
1054 assert_eq!(Some(open_message_expected), open_message_returned);
1055 }
1056
1057 #[tokio::test]
1058 async fn test_get_current_non_certified_open_message_should_return_none_if_all_open_message_already_certified(
1059 ) {
1060 let certified_and_not_expired_1 = create_open_message(IsCertified::Yes, IsExpired::No);
1061 let certified_and_not_expired_2 = create_open_message(IsCertified::Yes, IsExpired::No);
1062
1063 let runner = {
1064 let mut mock_certifier_service = MockCertifierService::new();
1065 init_certifier_service_mock(
1066 &mut mock_certifier_service,
1067 vec![certified_and_not_expired_1, certified_and_not_expired_2],
1068 );
1069
1070 mock_certifier_service.expect_create_open_message().never();
1071 build_runner(temp_dir!(), mock_certifier_service).await
1072 };
1073
1074 let open_message_returned = runner
1075 .get_current_non_certified_open_message(&TimePoint::dummy())
1076 .await
1077 .unwrap();
1078
1079 assert!(open_message_returned.is_none());
1080 }
1081
1082 #[tokio::test]
1083 async fn test_get_current_non_certified_open_message_should_return_first_not_certified_and_not_expired_open_message(
1084 ) {
1085 let not_certified_and_expired = create_open_message(IsCertified::No, IsExpired::Yes);
1086 let not_certified_and_not_expired = create_open_message(IsCertified::No, IsExpired::No);
1087
1088 let open_message_expected = not_certified_and_not_expired.clone();
1089
1090 let runner = {
1091 let mut mock_certifier_service = MockCertifierService::new();
1092 init_certifier_service_mock(
1093 &mut mock_certifier_service,
1094 vec![not_certified_and_expired, not_certified_and_not_expired],
1095 );
1096
1097 mock_certifier_service.expect_create_open_message().never();
1098 build_runner(temp_dir!(), mock_certifier_service).await
1099 };
1100
1101 let open_message_returned = runner
1102 .get_current_non_certified_open_message(&TimePoint::dummy())
1103 .await
1104 .unwrap();
1105
1106 assert_eq!(Some(open_message_expected), open_message_returned);
1107 }
1108
1109 #[tokio::test]
1110 async fn test_get_current_non_certified_open_message_called_for_mithril_stake_distribution_and_then_for_immutable_file(
1111 ) {
1112 let mut mock_certifier_service = MockCertifierService::new();
1113
1114 let mut seq = Sequence::new();
1115 mock_certifier_service
1116 .expect_get_open_message()
1117 .with(eq(SignedEntityType::MithrilStakeDistribution(
1118 TimePoint::dummy().epoch,
1119 )))
1120 .times(1)
1121 .in_sequence(&mut seq)
1122 .return_once(|_| Ok(Some(create_open_message(IsCertified::Yes, IsExpired::No))));
1123
1124 mock_certifier_service
1125 .expect_get_open_message()
1126 .with(eq(SignedEntityType::CardanoImmutableFilesFull(
1127 fake_data::beacon(),
1128 )))
1129 .times(1)
1130 .in_sequence(&mut seq)
1131 .return_once(|_| Ok(Some(create_open_message(IsCertified::Yes, IsExpired::No))));
1132
1133 mock_certifier_service.expect_create_open_message().never();
1134
1135 mock_certifier_service
1136 .expect_inform_epoch()
1137 .return_once(|_| Ok(()));
1138 mock_certifier_service
1139 .expect_mark_open_message_if_expired()
1140 .returning(|_| Ok(None));
1141
1142 let runner = build_runner(temp_dir!(), mock_certifier_service).await;
1143
1144 runner
1145 .get_current_non_certified_open_message(&TimePoint::dummy())
1146 .await
1147 .unwrap();
1148 }
1149
1150 #[tokio::test]
1151 async fn list_available_signed_entity_types_list_all_configured_entities_if_none_are_locked() {
1152 let runner = {
1153 let mut dependencies = initialize_dependencies!().await;
1154 let epoch_service = FakeEpochServiceBuilder {
1155 signed_entity_config: SignedEntityConfig {
1156 allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1157 ..SignedEntityConfig::dummy()
1158 },
1159 ..FakeEpochServiceBuilder::dummy(Epoch(32))
1160 }
1161 .build();
1162 dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1163 dependencies.signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1164 AggregatorRunner::new(Arc::new(dependencies))
1165 };
1166
1167 let time_point = TimePoint::dummy();
1168 let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1169 .list_available_signed_entity_types(&time_point)
1170 .await
1171 .unwrap()
1172 .into_iter()
1173 .map(Into::into)
1174 .collect();
1175
1176 assert_eq!(
1177 signed_entities,
1178 SignedEntityTypeDiscriminants::all()
1179 .into_iter()
1180 .collect::<Vec<_>>()
1181 );
1182 }
1183
1184 #[tokio::test]
1185 async fn list_available_signed_entity_types_exclude_locked_entities() {
1186 let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1187 let runner = {
1188 let mut dependencies = initialize_dependencies!().await;
1189 dependencies.signed_entity_type_lock = signed_entity_type_lock.clone();
1190 let epoch_service = FakeEpochServiceBuilder {
1191 signed_entity_config: SignedEntityConfig {
1192 allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1193 ..SignedEntityConfig::dummy()
1194 },
1195 ..FakeEpochServiceBuilder::dummy(Epoch(32))
1196 }
1197 .build();
1198 dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1199
1200 AggregatorRunner::new(Arc::new(dependencies))
1201 };
1202
1203 signed_entity_type_lock
1204 .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
1205 .await;
1206
1207 let time_point = TimePoint::dummy();
1208 let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1209 .list_available_signed_entity_types(&time_point)
1210 .await
1211 .unwrap()
1212 .into_iter()
1213 .map(Into::into)
1214 .collect();
1215
1216 assert!(!signed_entities.is_empty());
1217 assert!(!signed_entities.contains(&SignedEntityTypeDiscriminants::CardanoTransactions));
1218 }
1219
1220 #[tokio::test]
1221 async fn is_open_message_outdated_return_false_when_message_is_not_expired_and_no_newer_open_message(
1222 ) {
1223 assert!(!is_outdated_returned_when(temp_dir!(), IsExpired::No, false).await);
1224 }
1225
1226 #[tokio::test]
1227 async fn is_open_message_outdated_return_true_when_message_is_expired_and_no_newer_open_message(
1228 ) {
1229 assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, false).await);
1230 }
1231
1232 #[tokio::test]
1233 async fn is_open_message_outdated_return_true_when_message_is_not_expired_and_exists_newer_open_message(
1234 ) {
1235 assert!(is_outdated_returned_when(temp_dir!(), IsExpired::No, true).await);
1236 }
1237
1238 #[tokio::test]
1239 async fn is_open_message_outdated_return_true_when_message_is_expired_and_exists_newer_open_message(
1240 ) {
1241 assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, true).await);
1242 }
1243
1244 async fn is_outdated_returned_when(
1245 tmp_path: PathBuf,
1246 is_expired: IsExpired,
1247 newer_open_message: bool,
1248 ) -> bool {
1249 let current_time_point = TimePoint {
1250 epoch: Epoch(2),
1251 ..TimePoint::dummy()
1252 };
1253
1254 let message_epoch = if newer_open_message {
1255 current_time_point.epoch + 54
1256 } else {
1257 current_time_point.epoch
1258 };
1259 let open_message_to_verify = OpenMessage {
1260 signed_entity_type: SignedEntityType::MithrilStakeDistribution(message_epoch),
1261 is_expired: is_expired == IsExpired::Yes,
1262 ..OpenMessage::dummy()
1263 };
1264
1265 let runner = {
1266 let mut deps = initialize_dependencies(tmp_path).await;
1267 let mut mock_certifier_service = MockCertifierService::new();
1268
1269 let open_message_current = open_message_to_verify.clone();
1270 mock_certifier_service
1271 .expect_get_open_message()
1272 .times(1)
1273 .return_once(|_| Ok(Some(open_message_current)));
1274 mock_certifier_service
1275 .expect_mark_open_message_if_expired()
1276 .returning(|_| Ok(None));
1277
1278 deps.certifier_service = Arc::new(mock_certifier_service);
1279
1280 let epoch_service = FakeEpochServiceBuilder::dummy(current_time_point.epoch).build();
1281 deps.epoch_service = Arc::new(RwLock::new(epoch_service));
1282
1283 build_runner_with_fixture_data(deps).await
1284 };
1285
1286 runner
1287 .is_open_message_outdated(
1288 open_message_to_verify.signed_entity_type,
1289 ¤t_time_point,
1290 )
1291 .await
1292 .unwrap()
1293 }
1294}