1use anyhow::Context;
2use async_trait::async_trait;
3use slog::{debug, warn, Logger};
4use std::sync::Arc;
5use std::time::Duration;
6
7use mithril_common::entities::{Certificate, Epoch, ProtocolMessage, SignedEntityType, TimePoint};
8use mithril_common::logging::LoggerExtensions;
9use mithril_common::StdResult;
10use mithril_persistence::store::StakeStorer;
11
12use crate::entities::OpenMessage;
13use crate::ServeCommandDependenciesContainer;
14
15#[derive(Debug, Clone)]
17pub struct AggregatorConfig {
18 pub interval: Duration,
20
21 pub is_follower: bool,
23}
24
25impl AggregatorConfig {
26 pub fn new(interval: Duration, is_follower: bool) -> Self {
28 Self {
29 interval,
30 is_follower,
31 }
32 }
33}
34
35#[async_trait]
38pub trait AggregatorRunnerTrait: Sync + Send {
39 async fn get_time_point_from_chain(&self) -> StdResult<TimePoint>;
41
42 async fn get_current_open_message_for_signed_entity_type(
44 &self,
45 signed_entity_type: &SignedEntityType,
46 ) -> StdResult<Option<OpenMessage>>;
47
48 async fn get_current_non_certified_open_message(
50 &self,
51 current_time_point: &TimePoint,
52 ) -> StdResult<Option<OpenMessage>>;
53
54 async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()>;
56
57 async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()>;
59
60 async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()>;
62
63 async fn close_signer_registration_round(&self) -> StdResult<()>;
65
66 async fn is_follower_aggregator_at_same_epoch_as_leader(
68 &self,
69 time_point: &TimePoint,
70 ) -> StdResult<bool>;
71
72 async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()>;
74
75 async fn update_epoch_settings(&self) -> StdResult<()>;
77
78 async fn compute_protocol_message(
80 &self,
81 signed_entity_type: &SignedEntityType,
82 ) -> StdResult<ProtocolMessage>;
83
84 async fn mark_open_message_if_expired(
86 &self,
87 signed_entity_type: &SignedEntityType,
88 ) -> StdResult<Option<OpenMessage>>;
89
90 async fn create_certificate(
92 &self,
93 signed_entity_type: &SignedEntityType,
94 ) -> StdResult<Option<Certificate>>;
95
96 async fn create_artifact(
98 &self,
99 signed_entity_type: &SignedEntityType,
100 certificate: &Certificate,
101 ) -> StdResult<()>;
102
103 async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()>;
105
106 async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()>;
108
109 async fn upkeep(&self, epoch: Epoch) -> StdResult<()>;
111
112 async fn precompute_epoch_data(&self) -> StdResult<()>;
114
115 async fn create_open_message(
117 &self,
118 signed_entity_type: &SignedEntityType,
119 protocol_message: &ProtocolMessage,
120 ) -> StdResult<OpenMessage>;
121
122 async fn is_open_message_outdated(
124 &self,
125 open_message_signed_entity_type: SignedEntityType,
126 last_time_point: &TimePoint,
127 ) -> StdResult<bool>;
128
129 fn increment_runtime_cycle_success_since_startup_counter(&self);
131
132 fn increment_runtime_cycle_total_since_startup_counter(&self);
134}
135
136pub struct AggregatorRunner {
139 dependencies: Arc<ServeCommandDependenciesContainer>,
140 logger: Logger,
141}
142
143impl AggregatorRunner {
144 pub fn new(dependencies: Arc<ServeCommandDependenciesContainer>) -> Self {
146 let logger = dependencies.root_logger.new_with_component_name::<Self>();
147 Self {
148 dependencies,
149 logger,
150 }
151 }
152
153 async fn list_available_signed_entity_types(
154 &self,
155 time_point: &TimePoint,
156 ) -> StdResult<Vec<SignedEntityType>> {
157 let signed_entity_types = self
158 .dependencies
159 .epoch_service
160 .read()
161 .await
162 .signed_entity_config()?
163 .list_allowed_signed_entity_types(time_point)?;
164 let unlocked_signed_entities = self
165 .dependencies
166 .signed_entity_type_lock
167 .filter_unlocked_entries(signed_entity_types)
168 .await;
169
170 Ok(unlocked_signed_entities)
171 }
172}
173
174#[cfg_attr(test, mockall::automock)]
175#[async_trait]
176impl AggregatorRunnerTrait for AggregatorRunner {
177 async fn get_time_point_from_chain(&self) -> StdResult<TimePoint> {
179 debug!(self.logger, ">> get_time_point_from_chain");
180 let time_point = self
181 .dependencies
182 .ticker_service
183 .get_current_time_point()
184 .await?;
185
186 Ok(time_point)
187 }
188
189 async fn get_current_open_message_for_signed_entity_type(
190 &self,
191 signed_entity_type: &SignedEntityType,
192 ) -> StdResult<Option<OpenMessage>> {
193 debug!(self.logger,">> get_current_open_message_for_signed_entity_type"; "signed_entity_type" => ?signed_entity_type);
194 self.mark_open_message_if_expired(signed_entity_type)
195 .await?;
196
197 Ok(self
198 .dependencies
199 .certifier_service
200 .get_open_message(signed_entity_type)
201 .await
202 .with_context(|| format!("CertifierService can not get open message for signed_entity_type: '{signed_entity_type}'"))?)
203 }
204
205 async fn get_current_non_certified_open_message(
206 &self,
207 current_time_point: &TimePoint,
208 ) -> StdResult<Option<OpenMessage>> {
209 debug!(self.logger,">> get_current_non_certified_open_message"; "time_point" => #?current_time_point);
210 let signed_entity_types = self
211 .list_available_signed_entity_types(current_time_point)
212 .await?;
213
214 for signed_entity_type in signed_entity_types {
215 let current_open_message = self.get_current_open_message_for_signed_entity_type(&signed_entity_type)
216 .await
217 .with_context(|| format!("AggregatorRunner can not get current open message for signed entity type: '{}'", &signed_entity_type))?;
218 match current_open_message {
219 None => {
220 let protocol_message = self.compute_protocol_message(&signed_entity_type).await.with_context(|| format!("AggregatorRunner can not compute protocol message for signed_entity_type: '{signed_entity_type}'"))?;
221 let open_message_new = self.create_open_message(&signed_entity_type, &protocol_message)
222 .await
223 .with_context(|| format!("AggregatorRunner can not create open message for signed_entity_type: '{signed_entity_type}'"))?;
224
225 return Ok(Some(open_message_new));
226 }
227 Some(open_message) => {
228 if !open_message.is_certified && !open_message.is_expired {
229 return Ok(Some(open_message));
230 }
231 }
232 }
233 }
234
235 Ok(None)
236 }
237
238 async fn is_certificate_chain_valid(&self, time_point: &TimePoint) -> StdResult<()> {
239 debug!(self.logger, ">> is_certificate_chain_valid");
240 self.dependencies
241 .certifier_service
242 .verify_certificate_chain(time_point.epoch)
243 .await?;
244
245 Ok(())
246 }
247
248 async fn update_stake_distribution(&self, new_time_point: &TimePoint) -> StdResult<()> {
249 debug!(self.logger,">> update_stake_distribution"; "time_point" => #?new_time_point);
250 self.dependencies
251 .stake_distribution_service
252 .update_stake_distribution()
253 .await
254 .with_context(|| format!("AggregatorRunner could not update stake distribution for time_point: '{new_time_point}'"))
255 }
256
257 async fn open_signer_registration_round(&self, new_time_point: &TimePoint) -> StdResult<()> {
258 debug!(self.logger,">> open_signer_registration_round"; "time_point" => #?new_time_point);
259 let registration_epoch = new_time_point.epoch.offset_to_recording_epoch();
260
261 let stakes = self
262 .dependencies
263 .stake_store
264 .get_stakes(registration_epoch)
265 .await?
266 .unwrap_or_default();
267
268 self.dependencies
269 .signer_registration_round_opener
270 .open_registration_round(registration_epoch, stakes)
271 .await
272 }
273
274 async fn close_signer_registration_round(&self) -> StdResult<()> {
275 debug!(self.logger, ">> close_signer_registration_round");
276 self.dependencies
277 .signer_registration_round_opener
278 .close_registration_round()
279 .await
280 }
281
282 async fn is_follower_aggregator_at_same_epoch_as_leader(
283 &self,
284 time_point: &TimePoint,
285 ) -> StdResult<bool> {
286 self.dependencies
287 .signer_synchronizer
288 .can_synchronize_signers(time_point.epoch)
289 .await
290 .map_err(|e| e.into())
291 }
292
293 async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()> {
294 self.dependencies
295 .signer_synchronizer
296 .synchronize_all_signers()
297 .await
298 .map_err(|e| e.into())
299 }
300
301 async fn update_epoch_settings(&self) -> StdResult<()> {
302 debug!(self.logger, ">> update_epoch_settings");
303 self.dependencies
304 .epoch_service
305 .write()
306 .await
307 .update_epoch_settings()
308 .await
309 }
310
311 async fn compute_protocol_message(
312 &self,
313 signed_entity_type: &SignedEntityType,
314 ) -> StdResult<ProtocolMessage> {
315 debug!(self.logger, ">> compute_protocol_message");
316 let protocol_message = self
317 .dependencies
318 .signable_builder_service
319 .compute_protocol_message(signed_entity_type.to_owned())
320 .await
321 .with_context(|| format!("Runner can not compute protocol message for signed entity type: '{signed_entity_type}'"))?;
322
323 Ok(protocol_message)
324 }
325
326 async fn mark_open_message_if_expired(
327 &self,
328 signed_entity_type: &SignedEntityType,
329 ) -> StdResult<Option<OpenMessage>> {
330 debug!(self.logger, ">> mark_open_message_if_expired");
331 let expired_open_message = self
332 .dependencies
333 .certifier_service
334 .mark_open_message_if_expired(signed_entity_type)
335 .await
336 .with_context(|| "CertifierService can not mark expired open message")?;
337
338 debug!(
339 self.logger, "Marked expired open messages";
340 "expired_open_message" => ?expired_open_message
341 );
342
343 Ok(expired_open_message)
344 }
345
346 async fn create_certificate(
347 &self,
348 signed_entity_type: &SignedEntityType,
349 ) -> StdResult<Option<Certificate>> {
350 debug!(self.logger, ">> create_certificate"; "signed_entity_type" => ?signed_entity_type);
351
352 let certificate = self.dependencies
353 .certifier_service
354 .create_certificate(signed_entity_type)
355 .await
356 .with_context(|| {
357 format!(
358 "CertifierService can not create certificate for signed_entity_type: '{signed_entity_type}'"
359 )
360 })?;
361
362 if certificate.is_some() {
363 self.dependencies
364 .metrics_service
365 .get_certificate_total_produced_since_startup()
366 .increment();
367 }
368
369 Ok(certificate)
370 }
371
372 async fn create_artifact(
373 &self,
374 signed_entity_type: &SignedEntityType,
375 certificate: &Certificate,
376 ) -> StdResult<()> {
377 debug!(
378 self.logger, ">> create_artifact";
379 "signed_entity_type" => ?signed_entity_type,
380 "certificate_hash" => &certificate.hash
381 );
382
383 self.dependencies
384 .signed_entity_service
385 .create_artifact(signed_entity_type.to_owned(), certificate)
386 .await
387 .with_context(|| {
388 format!(
389 "SignedEntityService can not create artifact for signed_entity_type: '{signed_entity_type}' with certificate hash: '{}'",
390 certificate.hash
391 )
392 })?;
393
394 Ok(())
395 }
396
397 async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()> {
398 debug!(self.logger, ">> update_era_checker({epoch:?})");
399 let token = self
400 .dependencies
401 .era_reader
402 .read_era_epoch_token(epoch)
403 .await
404 .with_context(|| {
405 format!("EraReader can not get era epoch token for current epoch: '{epoch}'")
406 })?;
407
408 let current_era = token
409 .get_current_supported_era()
410 .with_context(|| "EraEpochToken can not get current supported era")?;
411 self.dependencies
412 .era_checker
413 .change_era(current_era, token.get_current_epoch());
414 debug!(
415 self.logger,
416 "Current Era is {current_era} (Epoch {}).",
417 token.get_current_epoch()
418 );
419
420 if token.get_next_supported_era().is_err() {
421 let era_name = &token.get_next_era_marker().unwrap().name;
422 warn!(self.logger,"Upcoming Era '{era_name}' is not supported by this version of the software. Please update!");
423 }
424
425 Ok(())
426 }
427
428 async fn precompute_epoch_data(&self) -> StdResult<()> {
429 debug!(self.logger, ">> precompute_epoch_data");
430 self.dependencies
431 .epoch_service
432 .write()
433 .await
434 .precompute_epoch_data()
435 .await?;
436
437 Ok(())
438 }
439
440 async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()> {
441 debug!(self.logger, ">> inform_new_epoch({epoch:?})");
442 self.dependencies
443 .certifier_service
444 .inform_epoch(epoch)
445 .await?;
446
447 self.dependencies
448 .epoch_service
449 .write()
450 .await
451 .inform_epoch(epoch)
452 .await?;
453
454 Ok(())
455 }
456
457 async fn upkeep(&self, epoch: Epoch) -> StdResult<()> {
458 debug!(self.logger, ">> upkeep");
459 self.dependencies.upkeep_service.run(epoch).await
460 }
461
462 async fn create_open_message(
463 &self,
464 signed_entity_type: &SignedEntityType,
465 protocol_message: &ProtocolMessage,
466 ) -> StdResult<OpenMessage> {
467 debug!(self.logger, ">> create_open_message");
468 self.dependencies
469 .certifier_service
470 .create_open_message(signed_entity_type, protocol_message)
471 .await
472 }
473
474 async fn is_open_message_outdated(
475 &self,
476 open_message_signed_entity_type: SignedEntityType,
477 last_time_point: &TimePoint,
478 ) -> StdResult<bool> {
479 let current_open_message = self
480 .get_current_open_message_for_signed_entity_type(
481 &open_message_signed_entity_type,
482 )
483 .await
484 .with_context(|| format!("AggregatorRuntime can not get the current open message for signed entity type: '{}'", &open_message_signed_entity_type))?;
485 let is_expired_open_message = current_open_message
486 .as_ref()
487 .map(|om| om.is_expired)
488 .unwrap_or(false);
489
490 let exists_newer_open_message = {
491 let new_signed_entity_type = self
492 .dependencies
493 .epoch_service
494 .read()
495 .await
496 .signed_entity_config()?
497 .time_point_to_signed_entity(&open_message_signed_entity_type, last_time_point)?;
498 new_signed_entity_type != open_message_signed_entity_type
499 };
500
501 Ok(exists_newer_open_message || is_expired_open_message)
502 }
503
504 fn increment_runtime_cycle_success_since_startup_counter(&self) {
505 self.dependencies
506 .metrics_service
507 .get_runtime_cycle_success_since_startup()
508 .increment();
509 }
510
511 fn increment_runtime_cycle_total_since_startup_counter(&self) {
512 self.dependencies
513 .metrics_service
514 .get_runtime_cycle_total_since_startup()
515 .increment();
516 }
517}
518
519#[cfg(test)]
520pub mod tests {
521 use async_trait::async_trait;
522 use chrono::{DateTime, Utc};
523 use mockall::predicate::eq;
524 use mockall::{mock, Sequence};
525 use std::path::PathBuf;
526 use std::sync::Arc;
527 use tokio::sync::RwLock;
528
529 use mithril_cardano_node_chain::test::double::FakeChainObserver;
530 use mithril_cardano_node_internal_database::test::double::DumbImmutableFileObserver;
531 use mithril_common::{
532 entities::{
533 CardanoTransactionsSigningConfig, ChainPoint, Epoch, ProtocolMessage,
534 SignedEntityConfig, SignedEntityType, SignedEntityTypeDiscriminants, StakeDistribution,
535 TimePoint,
536 },
537 signable_builder::SignableBuilderService,
538 temp_dir,
539 test_utils::{fake_data, MithrilFixtureBuilder},
540 StdResult,
541 };
542 use mithril_persistence::store::StakeStorer;
543 use mithril_signed_entity_lock::SignedEntityTypeLock;
544 use mithril_ticker::MithrilTickerService;
545
546 use crate::{
547 dependency_injection::DependenciesBuilder,
548 entities::{AggregatorEpochSettings, OpenMessage},
549 initialize_dependencies,
550 runtime::{AggregatorRunner, AggregatorRunnerTrait},
551 services::{
552 FakeEpochService, FakeEpochServiceBuilder, MithrilStakeDistributionService,
553 MockCertifierService, MockUpkeepService,
554 },
555 MithrilSignerRegistrationLeader, ServeCommandConfiguration,
556 ServeCommandDependenciesContainer, SignerRegistrationRound,
557 };
558
559 mock! {
560 SignableBuilderServiceImpl { }
561
562 #[async_trait]
563 impl SignableBuilderService for SignableBuilderServiceImpl
564 {
565
566 async fn compute_protocol_message(
567 &self,
568 signed_entity_type: SignedEntityType,
569 ) -> StdResult<ProtocolMessage>;
570 }
571 }
572
573 async fn build_runner_with_fixture_data(
574 deps: ServeCommandDependenciesContainer,
575 ) -> AggregatorRunner {
576 let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
577 let current_epoch = deps
578 .chain_observer
579 .get_current_epoch()
580 .await
581 .unwrap()
582 .unwrap();
583 deps.init_state_from_fixture(
584 &fixture,
585 &CardanoTransactionsSigningConfig::dummy(),
586 &[
587 current_epoch.offset_to_signer_retrieval_epoch().unwrap(),
588 current_epoch,
589 current_epoch.next(),
590 ],
591 )
592 .await;
593
594 AggregatorRunner::new(Arc::new(deps))
595 }
596
597 async fn build_runner(
598 temp_dir: PathBuf,
599 mock_certifier_service: MockCertifierService,
600 ) -> AggregatorRunner {
601 let mut deps = initialize_dependencies(temp_dir).await;
602 deps.certifier_service = Arc::new(mock_certifier_service);
603
604 let mut mock_signable_builder_service = MockSignableBuilderServiceImpl::new();
605 mock_signable_builder_service
606 .expect_compute_protocol_message()
607 .return_once(|_| Ok(ProtocolMessage::default()));
608 deps.signable_builder_service = Arc::new(mock_signable_builder_service);
609
610 let runner = build_runner_with_fixture_data(deps).await;
611
612 let current_epoch = runner
613 .dependencies
614 .ticker_service
615 .get_current_epoch()
616 .await
617 .unwrap();
618 runner.inform_new_epoch(current_epoch).await.unwrap();
619 runner.precompute_epoch_data().await.unwrap();
620 runner
621 }
622
623 fn init_certifier_service_mock(
624 mock_certifier_service: &mut MockCertifierService,
625 messages: Vec<OpenMessage>,
626 ) {
627 for message in messages {
628 mock_certifier_service
629 .expect_get_open_message()
630 .return_once(|_| Ok(Some(message)))
631 .times(1);
632 }
633 mock_certifier_service
635 .expect_get_open_message()
636 .returning(|_| Ok(None));
637
638 mock_certifier_service
639 .expect_inform_epoch()
640 .return_once(|_| Ok(()));
641 mock_certifier_service
642 .expect_mark_open_message_if_expired()
643 .returning(|_| Ok(None));
644 }
645
646 fn create_open_message(is_certified: IsCertified, is_expired: IsExpired) -> OpenMessage {
647 OpenMessage {
648 signed_entity_type: SignedEntityType::CardanoImmutableFilesFull(fake_data::beacon()),
649 is_certified: is_certified == IsCertified::Yes,
650 is_expired: is_expired == IsExpired::Yes,
651 ..OpenMessage::dummy()
652 }
653 }
654
655 #[derive(Eq, PartialEq)]
656 enum IsCertified {
657 Yes,
658 No,
659 }
660
661 #[derive(Eq, PartialEq)]
662 enum IsExpired {
663 Yes,
664 No,
665 }
666
667 #[tokio::test]
668 async fn test_get_time_point_from_chain() {
669 let expected = TimePoint::new(2, 17, ChainPoint::dummy());
670 let mut dependencies = initialize_dependencies!().await;
671 let immutable_file_observer = Arc::new(DumbImmutableFileObserver::default());
672 immutable_file_observer
673 .shall_return(Some(expected.immutable_file_number))
674 .await;
675 let ticker_service = Arc::new(MithrilTickerService::new(
676 Arc::new(FakeChainObserver::new(Some(expected.clone()))),
677 immutable_file_observer,
678 ));
679 dependencies.ticker_service = ticker_service;
680 let runner = AggregatorRunner::new(Arc::new(dependencies));
681
682 let res = runner.get_time_point_from_chain().await;
684 assert_eq!(expected, res.unwrap());
685 }
686
687 #[tokio::test]
688 async fn test_update_stake_distribution() {
689 let chain_observer = Arc::new(FakeChainObserver::default());
690 let deps = {
691 let mut deps = initialize_dependencies!().await;
692 deps.chain_observer = chain_observer.clone();
693 deps.stake_distribution_service = Arc::new(MithrilStakeDistributionService::new(
694 deps.stake_store.clone(),
695 chain_observer.clone(),
696 ));
697 Arc::new(deps)
698 };
699 let runner = AggregatorRunner::new(deps.clone());
700 let time_point = runner.get_time_point_from_chain().await.unwrap();
701 let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
702 let expected = fixture.stake_distribution();
703
704 chain_observer
705 .set_signers(fixture.signers_with_stake())
706 .await;
707 runner
708 .update_stake_distribution(&time_point)
709 .await
710 .expect("updating stake distribution should not return an error");
711
712 let saved_stake_distribution = deps
713 .stake_store
714 .get_stakes(time_point.epoch.offset_to_recording_epoch())
715 .await
716 .unwrap()
717 .unwrap_or_else(|| {
718 panic!(
719 "I should have a stake distribution for the epoch {:?}",
720 time_point.epoch
721 )
722 });
723
724 assert_eq!(expected, saved_stake_distribution);
725 }
726
727 #[tokio::test]
728 async fn test_open_signer_registration_round() {
729 let config = ServeCommandConfiguration::new_sample(mithril_common::temp_dir!());
730 let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
731
732 let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
733 builder.get_verification_key_store().await.unwrap(),
734 builder.get_signer_store().await.unwrap(),
735 builder.get_signer_registration_verifier().await.unwrap(),
736 ));
737 let mut deps = builder.build_serve_dependencies_container().await.unwrap();
738 deps.signer_registration_round_opener = signer_registration_round_opener.clone();
739 let stake_store = deps.stake_store.clone();
740 let deps = Arc::new(deps);
741 let runner = AggregatorRunner::new(deps.clone());
742
743 let time_point = TimePoint::dummy();
744 let recording_epoch = time_point.epoch.offset_to_recording_epoch();
745 let stake_distribution: StakeDistribution =
746 StakeDistribution::from([("a".to_string(), 5), ("b".to_string(), 10)]);
747
748 stake_store
749 .save_stakes(recording_epoch, stake_distribution.clone())
750 .await
751 .expect("Save Stake distribution should not fail");
752
753 runner
754 .open_signer_registration_round(&time_point)
755 .await
756 .expect("opening signer registration should not return an error");
757
758 let saved_current_round = signer_registration_round_opener.get_current_round().await;
759
760 let expected_signer_registration_round =
761 SignerRegistrationRound::dummy(recording_epoch, stake_distribution);
762
763 assert_eq!(
764 Some(expected_signer_registration_round),
765 saved_current_round,
766 );
767 }
768
769 #[tokio::test]
770 async fn test_close_signer_registration_round() {
771 let config = ServeCommandConfiguration::new_sample(mithril_common::temp_dir!());
772 let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
773
774 let signer_registration_round_opener = Arc::new(MithrilSignerRegistrationLeader::new(
775 builder.get_verification_key_store().await.unwrap(),
776 builder.get_signer_store().await.unwrap(),
777 builder.get_signer_registration_verifier().await.unwrap(),
778 ));
779 let mut deps = builder.build_serve_dependencies_container().await.unwrap();
780 deps.signer_registration_round_opener = signer_registration_round_opener.clone();
781 let deps = Arc::new(deps);
782 let runner = AggregatorRunner::new(deps.clone());
783
784 let time_point = TimePoint::dummy();
785 runner
786 .open_signer_registration_round(&time_point)
787 .await
788 .expect("opening signer registration should not return an error");
789
790 runner
791 .close_signer_registration_round()
792 .await
793 .expect("closing signer registration should not return an error");
794
795 let saved_current_round = signer_registration_round_opener.get_current_round().await;
796 assert!(saved_current_round.is_none());
797 }
798
799 #[tokio::test]
800 async fn test_expire_open_message() {
801 let open_message_expected = OpenMessage {
802 signed_entity_type: SignedEntityType::dummy(),
803 is_certified: false,
804 is_expired: false,
805 expires_at: Some(
806 DateTime::parse_from_rfc3339("2000-01-19T13:43:05.618857482Z")
807 .unwrap()
808 .with_timezone(&Utc),
809 ),
810 ..OpenMessage::dummy()
811 };
812 let open_message_clone = open_message_expected.clone();
813
814 let mut mock_certifier_service = MockCertifierService::new();
815 mock_certifier_service
816 .expect_mark_open_message_if_expired()
817 .return_once(|_| Ok(Some(open_message_clone)));
818
819 let mut deps = initialize_dependencies!().await;
820 deps.certifier_service = Arc::new(mock_certifier_service);
821
822 let runner = build_runner_with_fixture_data(deps).await;
823 let open_message_expired = runner
824 .mark_open_message_if_expired(&open_message_expected.signed_entity_type)
825 .await
826 .expect("mark_open_message_if_expired should not fail");
827
828 assert_eq!(Some(open_message_expected), open_message_expired);
829 }
830
831 #[tokio::test]
832 async fn test_update_era_checker() {
833 let deps = initialize_dependencies!().await;
834 let ticker_service = deps.ticker_service.clone();
835 let era_checker = deps.era_checker.clone();
836 let mut time_point = ticker_service.get_current_time_point().await.unwrap();
837
838 assert_eq!(time_point.epoch, era_checker.current_epoch());
839 let runner = AggregatorRunner::new(Arc::new(deps));
840 time_point.epoch += 1;
841
842 runner.update_era_checker(time_point.epoch).await.unwrap();
843 assert_eq!(time_point.epoch, era_checker.current_epoch());
844 }
845
846 #[tokio::test]
847 async fn test_inform_new_epoch() {
848 let mut mock_certifier_service = MockCertifierService::new();
849 mock_certifier_service
850 .expect_inform_epoch()
851 .returning(|_| Ok(()))
852 .times(1);
853 let mut deps = initialize_dependencies!().await;
854 let current_epoch = deps
855 .chain_observer
856 .get_current_epoch()
857 .await
858 .unwrap()
859 .unwrap();
860
861 deps.certifier_service = Arc::new(mock_certifier_service);
862 deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
863 current_epoch,
864 &MithrilFixtureBuilder::default().build(),
865 )));
866
867 let runner = AggregatorRunner::new(Arc::new(deps));
868
869 runner.inform_new_epoch(current_epoch).await.unwrap();
870 }
871
872 #[tokio::test]
873 async fn test_upkeep_calls_run_on_upkeep_service() {
874 let mut upkeep_service = MockUpkeepService::new();
875 upkeep_service
876 .expect_run()
877 .with(eq(Epoch(5)))
878 .returning(|_| Ok(()))
879 .times(1);
880
881 let mut deps = initialize_dependencies!().await;
882 deps.upkeep_service = Arc::new(upkeep_service);
883
884 let runner = AggregatorRunner::new(Arc::new(deps));
885
886 runner.upkeep(Epoch(5)).await.unwrap();
887 }
888
889 #[tokio::test]
890 async fn test_update_epoch_settings() {
891 let mut mock_certifier_service = MockCertifierService::new();
892 mock_certifier_service
893 .expect_inform_epoch()
894 .returning(|_| Ok(()))
895 .times(1);
896
897 let config = ServeCommandConfiguration::new_sample(temp_dir!());
898 let mut deps = DependenciesBuilder::new_with_stdout_logger(Arc::new(config.clone()))
899 .build_serve_dependencies_container()
900 .await
901 .unwrap();
902 deps.certifier_service = Arc::new(mock_certifier_service);
903 let epoch_settings_storer = deps.epoch_settings_storer.clone();
904 let current_epoch = deps.ticker_service.get_current_epoch().await.unwrap();
905 let insert_epoch = current_epoch.offset_to_epoch_settings_recording_epoch();
906
907 let runner = build_runner_with_fixture_data(deps).await;
908 runner.inform_new_epoch(current_epoch).await.unwrap();
909 runner
910 .update_epoch_settings()
911 .await
912 .expect("update_epoch_settings should not fail");
913
914 let saved_epoch_settings = epoch_settings_storer
915 .get_epoch_settings(insert_epoch)
916 .await
917 .unwrap()
918 .unwrap_or_else(|| panic!("should have epoch settings for epoch {insert_epoch}",));
919
920 assert_eq!(
921 AggregatorEpochSettings {
922 protocol_parameters: config.protocol_parameters.clone(),
923 cardano_transactions_signing_config: config
924 .cardano_transactions_signing_config
925 .clone(),
926 },
927 saved_epoch_settings
928 );
929 }
930
931 #[tokio::test]
932 async fn test_precompute_epoch_data() {
933 let mut deps = initialize_dependencies!().await;
934 let current_epoch = deps
935 .chain_observer
936 .get_current_epoch()
937 .await
938 .unwrap()
939 .unwrap();
940
941 deps.epoch_service = Arc::new(RwLock::new(FakeEpochService::from_fixture(
942 current_epoch,
943 &MithrilFixtureBuilder::default().build(),
944 )));
945
946 let runner = AggregatorRunner::new(Arc::new(deps));
947
948 runner.precompute_epoch_data().await.unwrap();
949 }
950
951 #[tokio::test]
952 async fn test_get_current_non_certified_open_message_should_create_new_open_message_if_none_exists(
953 ) {
954 let open_message_created = create_open_message(IsCertified::No, IsExpired::No);
955 let open_message_expected = open_message_created.clone();
956
957 let runner = {
958 let mut mock_certifier_service = MockCertifierService::new();
959 init_certifier_service_mock(&mut mock_certifier_service, vec![]);
960
961 mock_certifier_service
962 .expect_create_open_message()
963 .return_once(|_, _| Ok(open_message_created))
964 .times(1);
965 build_runner(temp_dir!(), mock_certifier_service).await
966 };
967
968 let open_message_returned = runner
969 .get_current_non_certified_open_message(&TimePoint::dummy())
970 .await
971 .unwrap();
972 assert_eq!(Some(open_message_expected), open_message_returned);
973 }
974
975 #[tokio::test]
976 async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_not_expired(
977 ) {
978 let not_certified_and_not_expired = create_open_message(IsCertified::No, IsExpired::No);
979
980 let open_message_expected = not_certified_and_not_expired.clone();
981
982 let runner = {
983 let mut mock_certifier_service = MockCertifierService::new();
984 init_certifier_service_mock(
985 &mut mock_certifier_service,
986 vec![not_certified_and_not_expired],
987 );
988
989 mock_certifier_service.expect_create_open_message().never();
990 build_runner(temp_dir!(), mock_certifier_service).await
991 };
992
993 let open_message_returned = runner
994 .get_current_non_certified_open_message(&TimePoint::dummy())
995 .await
996 .unwrap();
997
998 assert_eq!(Some(open_message_expected), open_message_returned);
999 }
1000
1001 #[tokio::test]
1002 async fn test_get_current_non_certified_open_message_should_return_existing_open_message_if_already_exists_and_open_message_already_certified(
1003 ) {
1004 let certified_and_not_expired = create_open_message(IsCertified::Yes, IsExpired::No);
1005 let not_certified_and_not_expired = create_open_message(IsCertified::No, IsExpired::No);
1006
1007 let open_message_expected = not_certified_and_not_expired.clone();
1008
1009 let runner = {
1010 let mut mock_certifier_service = MockCertifierService::new();
1011 init_certifier_service_mock(
1012 &mut mock_certifier_service,
1013 vec![certified_and_not_expired, not_certified_and_not_expired],
1014 );
1015
1016 mock_certifier_service.expect_create_open_message().never();
1017 build_runner(temp_dir!(), mock_certifier_service).await
1018 };
1019
1020 let open_message_returned = runner
1021 .get_current_non_certified_open_message(&TimePoint::dummy())
1022 .await
1023 .unwrap();
1024
1025 assert_eq!(Some(open_message_expected), open_message_returned);
1026 }
1027
1028 #[tokio::test]
1029 async fn test_get_current_non_certified_open_message_should_create_open_message_if_none_exists_and_open_message_already_certified(
1030 ) {
1031 let certified_and_not_expired = create_open_message(IsCertified::Yes, IsExpired::No);
1032
1033 let open_message_created = create_open_message(IsCertified::No, IsExpired::No);
1034 let open_message_expected = open_message_created.clone();
1035
1036 let runner = {
1037 let mut mock_certifier_service = MockCertifierService::new();
1038 init_certifier_service_mock(
1039 &mut mock_certifier_service,
1040 vec![certified_and_not_expired],
1041 );
1042
1043 mock_certifier_service
1044 .expect_create_open_message()
1045 .return_once(|_, _| Ok(open_message_created))
1046 .times(1);
1047 build_runner(temp_dir!(), mock_certifier_service).await
1048 };
1049
1050 let open_message_returned = runner
1051 .get_current_non_certified_open_message(&TimePoint::dummy())
1052 .await
1053 .unwrap();
1054
1055 assert_eq!(Some(open_message_expected), open_message_returned);
1056 }
1057
1058 #[tokio::test]
1059 async fn test_get_current_non_certified_open_message_should_return_none_if_all_open_message_already_certified(
1060 ) {
1061 let certified_and_not_expired_1 = create_open_message(IsCertified::Yes, IsExpired::No);
1062 let certified_and_not_expired_2 = create_open_message(IsCertified::Yes, IsExpired::No);
1063
1064 let runner = {
1065 let mut mock_certifier_service = MockCertifierService::new();
1066 init_certifier_service_mock(
1067 &mut mock_certifier_service,
1068 vec![certified_and_not_expired_1, certified_and_not_expired_2],
1069 );
1070
1071 mock_certifier_service.expect_create_open_message().never();
1072 build_runner(temp_dir!(), mock_certifier_service).await
1073 };
1074
1075 let open_message_returned = runner
1076 .get_current_non_certified_open_message(&TimePoint::dummy())
1077 .await
1078 .unwrap();
1079
1080 assert!(open_message_returned.is_none());
1081 }
1082
1083 #[tokio::test]
1084 async fn test_get_current_non_certified_open_message_should_return_first_not_certified_and_not_expired_open_message(
1085 ) {
1086 let not_certified_and_expired = create_open_message(IsCertified::No, IsExpired::Yes);
1087 let not_certified_and_not_expired = create_open_message(IsCertified::No, IsExpired::No);
1088
1089 let open_message_expected = not_certified_and_not_expired.clone();
1090
1091 let runner = {
1092 let mut mock_certifier_service = MockCertifierService::new();
1093 init_certifier_service_mock(
1094 &mut mock_certifier_service,
1095 vec![not_certified_and_expired, not_certified_and_not_expired],
1096 );
1097
1098 mock_certifier_service.expect_create_open_message().never();
1099 build_runner(temp_dir!(), mock_certifier_service).await
1100 };
1101
1102 let open_message_returned = runner
1103 .get_current_non_certified_open_message(&TimePoint::dummy())
1104 .await
1105 .unwrap();
1106
1107 assert_eq!(Some(open_message_expected), open_message_returned);
1108 }
1109
1110 #[tokio::test]
1111 async fn test_get_current_non_certified_open_message_called_for_mithril_stake_distribution_and_then_for_immutable_file(
1112 ) {
1113 let mut mock_certifier_service = MockCertifierService::new();
1114
1115 let mut seq = Sequence::new();
1116 mock_certifier_service
1117 .expect_get_open_message()
1118 .with(eq(SignedEntityType::MithrilStakeDistribution(
1119 TimePoint::dummy().epoch,
1120 )))
1121 .times(1)
1122 .in_sequence(&mut seq)
1123 .return_once(|_| Ok(Some(create_open_message(IsCertified::Yes, IsExpired::No))));
1124
1125 mock_certifier_service
1126 .expect_get_open_message()
1127 .with(eq(SignedEntityType::CardanoImmutableFilesFull(
1128 fake_data::beacon(),
1129 )))
1130 .times(1)
1131 .in_sequence(&mut seq)
1132 .return_once(|_| Ok(Some(create_open_message(IsCertified::Yes, IsExpired::No))));
1133
1134 mock_certifier_service.expect_create_open_message().never();
1135
1136 mock_certifier_service
1137 .expect_inform_epoch()
1138 .return_once(|_| Ok(()));
1139 mock_certifier_service
1140 .expect_mark_open_message_if_expired()
1141 .returning(|_| Ok(None));
1142
1143 let runner = build_runner(temp_dir!(), mock_certifier_service).await;
1144
1145 runner
1146 .get_current_non_certified_open_message(&TimePoint::dummy())
1147 .await
1148 .unwrap();
1149 }
1150
1151 #[tokio::test]
1152 async fn list_available_signed_entity_types_list_all_configured_entities_if_none_are_locked() {
1153 let runner = {
1154 let mut dependencies = initialize_dependencies!().await;
1155 let epoch_service = FakeEpochServiceBuilder {
1156 signed_entity_config: SignedEntityConfig {
1157 allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1158 ..SignedEntityConfig::dummy()
1159 },
1160 ..FakeEpochServiceBuilder::dummy(Epoch(32))
1161 }
1162 .build();
1163 dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1164 dependencies.signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1165 AggregatorRunner::new(Arc::new(dependencies))
1166 };
1167
1168 let time_point = TimePoint::dummy();
1169 let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1170 .list_available_signed_entity_types(&time_point)
1171 .await
1172 .unwrap()
1173 .into_iter()
1174 .map(Into::into)
1175 .collect();
1176
1177 assert_eq!(
1178 signed_entities,
1179 SignedEntityTypeDiscriminants::all()
1180 .into_iter()
1181 .collect::<Vec<_>>()
1182 );
1183 }
1184
1185 #[tokio::test]
1186 async fn list_available_signed_entity_types_exclude_locked_entities() {
1187 let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
1188 let runner = {
1189 let mut dependencies = initialize_dependencies!().await;
1190 dependencies.signed_entity_type_lock = signed_entity_type_lock.clone();
1191 let epoch_service = FakeEpochServiceBuilder {
1192 signed_entity_config: SignedEntityConfig {
1193 allowed_discriminants: SignedEntityTypeDiscriminants::all(),
1194 ..SignedEntityConfig::dummy()
1195 },
1196 ..FakeEpochServiceBuilder::dummy(Epoch(32))
1197 }
1198 .build();
1199 dependencies.epoch_service = Arc::new(RwLock::new(epoch_service));
1200
1201 AggregatorRunner::new(Arc::new(dependencies))
1202 };
1203
1204 signed_entity_type_lock
1205 .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
1206 .await;
1207
1208 let time_point = TimePoint::dummy();
1209 let signed_entities: Vec<SignedEntityTypeDiscriminants> = runner
1210 .list_available_signed_entity_types(&time_point)
1211 .await
1212 .unwrap()
1213 .into_iter()
1214 .map(Into::into)
1215 .collect();
1216
1217 assert!(!signed_entities.is_empty());
1218 assert!(!signed_entities.contains(&SignedEntityTypeDiscriminants::CardanoTransactions));
1219 }
1220
1221 #[tokio::test]
1222 async fn is_open_message_outdated_return_false_when_message_is_not_expired_and_no_newer_open_message(
1223 ) {
1224 assert!(!is_outdated_returned_when(temp_dir!(), IsExpired::No, false).await);
1225 }
1226
1227 #[tokio::test]
1228 async fn is_open_message_outdated_return_true_when_message_is_expired_and_no_newer_open_message(
1229 ) {
1230 assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, false).await);
1231 }
1232
1233 #[tokio::test]
1234 async fn is_open_message_outdated_return_true_when_message_is_not_expired_and_exists_newer_open_message(
1235 ) {
1236 assert!(is_outdated_returned_when(temp_dir!(), IsExpired::No, true).await);
1237 }
1238
1239 #[tokio::test]
1240 async fn is_open_message_outdated_return_true_when_message_is_expired_and_exists_newer_open_message(
1241 ) {
1242 assert!(is_outdated_returned_when(temp_dir!(), IsExpired::Yes, true).await);
1243 }
1244
1245 async fn is_outdated_returned_when(
1246 tmp_path: PathBuf,
1247 is_expired: IsExpired,
1248 newer_open_message: bool,
1249 ) -> bool {
1250 let current_time_point = TimePoint {
1251 epoch: Epoch(2),
1252 ..TimePoint::dummy()
1253 };
1254
1255 let message_epoch = if newer_open_message {
1256 current_time_point.epoch + 54
1257 } else {
1258 current_time_point.epoch
1259 };
1260 let open_message_to_verify = OpenMessage {
1261 signed_entity_type: SignedEntityType::MithrilStakeDistribution(message_epoch),
1262 is_expired: is_expired == IsExpired::Yes,
1263 ..OpenMessage::dummy()
1264 };
1265
1266 let runner = {
1267 let mut deps = initialize_dependencies(tmp_path).await;
1268 let mut mock_certifier_service = MockCertifierService::new();
1269
1270 let open_message_current = open_message_to_verify.clone();
1271 mock_certifier_service
1272 .expect_get_open_message()
1273 .times(1)
1274 .return_once(|_| Ok(Some(open_message_current)));
1275 mock_certifier_service
1276 .expect_mark_open_message_if_expired()
1277 .returning(|_| Ok(None));
1278
1279 deps.certifier_service = Arc::new(mock_certifier_service);
1280
1281 let epoch_service = FakeEpochServiceBuilder::dummy(current_time_point.epoch).build();
1282 deps.epoch_service = Arc::new(RwLock::new(epoch_service));
1283
1284 build_runner_with_fixture_data(deps).await
1285 };
1286
1287 runner
1288 .is_open_message_outdated(
1289 open_message_to_verify.signed_entity_type,
1290 ¤t_time_point,
1291 )
1292 .await
1293 .unwrap()
1294 }
1295}