mithril_aggregator/services/signer_registration/
follower.rs1use std::sync::Arc;
2
3use anyhow::{anyhow, Context};
4use async_trait::async_trait;
5
6use mithril_common::{
7 entities::{Epoch, Signer, SignerWithStake, StakeDistribution},
8 StdResult,
9};
10use mithril_persistence::store::StakeStorer;
11
12use crate::{
13 dependency_injection::EpochServiceWrapper,
14 services::{AggregatorClient, EpochPruningTask},
15 SignerRegistrationVerifier, VerificationKeyStorer,
16};
17
18use super::{
19 SignerRecorder, SignerRegisterer, SignerRegistrationError, SignerRegistrationRound,
20 SignerRegistrationRoundOpener, SignerSynchronizer,
21};
22
23pub struct MithrilSignerRegistrationFollower {
25 pub epoch_service: EpochServiceWrapper,
27
28 verification_key_store: Arc<dyn VerificationKeyStorer>,
30
31 signer_recorder: Arc<dyn SignerRecorder>,
33
34 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
36
37 leader_aggregator_client: Arc<dyn AggregatorClient>,
39
40 stake_store: Arc<dyn StakeStorer>,
42
43 verification_key_epoch_retention_limit: Option<u64>,
46}
47
48impl MithrilSignerRegistrationFollower {
49 pub fn new(
51 epoch_service: EpochServiceWrapper,
52 verification_key_store: Arc<dyn VerificationKeyStorer>,
53 signer_recorder: Arc<dyn SignerRecorder>,
54 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
55 leader_aggregator_client: Arc<dyn AggregatorClient>,
56 stake_store: Arc<dyn StakeStorer>,
57 verification_key_epoch_retention_limit: Option<u64>,
58 ) -> Self {
59 Self {
60 epoch_service,
61 verification_key_store,
62 signer_recorder,
63 signer_registration_verifier,
64 leader_aggregator_client,
65 stake_store,
66 verification_key_epoch_retention_limit,
67 }
68 }
69
70 async fn synchronize_signers(
71 &self,
72 epoch: Epoch,
73 signers: &[Signer],
74 stake_distribution: &StakeDistribution,
75 ) -> Result<(), SignerRegistrationError> {
76 for signer in signers {
77 let signer_with_stake = self
78 .signer_registration_verifier
79 .verify(signer, stake_distribution)
80 .await
81 .map_err(|e| SignerRegistrationError::FailedSignerRegistration(anyhow!(e)))?;
82
83 self.signer_recorder
84 .record_signer_registration(signer_with_stake.party_id.clone())
85 .await
86 .map_err(|e| SignerRegistrationError::FailedSignerRecorder(e.to_string()))?;
87
88 self
89 .verification_key_store
90 .save_verification_key(epoch, signer_with_stake.clone())
91 .await
92 .with_context(|| {
93 format!(
94 "VerificationKeyStorer can not save verification keys for party_id: '{}' for epoch: '{}'",
95 signer_with_stake.party_id,
96 epoch
97 )
98 })
99 .map_err(|e| SignerRegistrationError::Store(anyhow!(e)))?;
100 }
101
102 self.epoch_service
103 .write()
104 .await
105 .update_next_signers_with_stake()
106 .await
107 .map_err(|e| SignerRegistrationError::EpochService(anyhow!(e)))?;
108
109 Ok(())
110 }
111}
112
113#[async_trait]
114impl SignerSynchronizer for MithrilSignerRegistrationFollower {
115 async fn can_synchronize_signers(&self, epoch: Epoch) -> Result<bool, SignerRegistrationError> {
116 Ok(self
117 .leader_aggregator_client
118 .retrieve_epoch_settings()
119 .await
120 .with_context(|| "can_synchronize_signers failed")
121 .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?
122 .is_some_and(|leader_epoch_settings| epoch == leader_epoch_settings.epoch))
123 }
124
125 async fn synchronize_all_signers(&self) -> Result<(), SignerRegistrationError> {
126 let leader_epoch_settings = self
127 .leader_aggregator_client
128 .retrieve_epoch_settings()
129 .await
130 .with_context(|| "synchronize_all_signers failed")
131 .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?
132 .ok_or(
133 SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings(
134 anyhow::anyhow!("Leader aggregator did not return any epoch settings"),
135 ),
136 )?;
137 let registration_epoch = leader_epoch_settings
138 .epoch
139 .offset_to_leader_synchronization_epoch();
140 let next_signers = leader_epoch_settings.next_signers;
141 let stake_distribution = self
142 .stake_store
143 .get_stakes(registration_epoch)
144 .await
145 .with_context(|| "synchronize_all_signers failed")
146 .map_err(SignerRegistrationError::Store)?
147 .ok_or(SignerRegistrationError::Store(anyhow::anyhow!(
148 "Follower aggregator did not return any stake distribution"
149 )))?;
150 self.synchronize_signers(registration_epoch, &next_signers, &stake_distribution)
151 .await?;
152
153 Ok(())
154 }
155}
156
157#[async_trait]
158impl SignerRegisterer for MithrilSignerRegistrationFollower {
159 async fn register_signer(
160 &self,
161 _epoch: Epoch,
162 _signer: &Signer,
163 ) -> Result<SignerWithStake, SignerRegistrationError> {
164 Err(SignerRegistrationError::RegistrationRoundAlwaysClosedOnFollowerAggregator)
165 }
166
167 async fn get_current_round(&self) -> Option<SignerRegistrationRound> {
168 None
169 }
170}
171
172#[async_trait]
173impl SignerRegistrationRoundOpener for MithrilSignerRegistrationFollower {
174 async fn open_registration_round(
175 &self,
176 _registration_epoch: Epoch,
177 _stake_distribution: StakeDistribution,
178 ) -> StdResult<()> {
179 Ok(())
180 }
181
182 async fn close_registration_round(&self) -> StdResult<()> {
183 Ok(())
184 }
185}
186
187#[async_trait]
188impl EpochPruningTask for MithrilSignerRegistrationFollower {
189 fn pruned_data(&self) -> &'static str {
190 "Signer registration"
191 }
192
193 async fn prune(&self, epoch: Epoch) -> StdResult<()> {
194 let registration_epoch = epoch.offset_to_recording_epoch();
195
196 if let Some(retention_limit) = self.verification_key_epoch_retention_limit {
197 self.verification_key_store
198 .prune_verification_keys(registration_epoch - retention_limit)
199 .await
200 .with_context(|| {
201 format!(
202 "VerificationKeyStorer can not prune verification keys below epoch: '{}'",
203 registration_epoch - retention_limit
204 )
205 })?;
206 }
207
208 Ok(())
209 }
210}
211
212#[cfg(test)]
213mod tests {
214 use std::sync::Arc;
215
216 use anyhow::anyhow;
217 use mithril_persistence::store::StakeStorer;
218 use mockall::predicate::eq;
219
220 use mithril_common::{
221 entities::{Epoch, Signer, SignerWithStake},
222 messages::{EpochSettingsMessage, SignerMessagePart, TryFromMessageAdapter},
223 test_utils::MithrilFixtureBuilder,
224 };
225
226 use crate::{
227 database::{repository::SignerRegistrationStore, test_helper::main_db_connection},
228 message_adapters::FromEpochSettingsAdapter,
229 services::{
230 AggregatorClient, AggregatorClientError, EpochPruningTask, FakeEpochService,
231 MockAggregatorClient, MockSignerRecorder, MockSignerRegistrationVerifier,
232 SignerSynchronizer,
233 },
234 store::MockVerificationKeyStorer,
235 tools::mocks::MockStakeStore,
236 MithrilSignerRegistrationFollower, SignerRecorder, SignerRegisterer,
237 SignerRegistrationRoundOpener, SignerRegistrationVerifier, VerificationKeyStorer,
238 };
239
240 use test_utils::*;
241
242 mod test_utils {
243 use tokio::sync::RwLock;
244
245 use crate::{dependency_injection::EpochServiceWrapper, services::FakeEpochService};
246
247 use super::*;
248
249 pub struct MithrilSignerRegistrationFollowerBuilder {
251 epoch_service: EpochServiceWrapper,
252 signer_recorder: Arc<dyn SignerRecorder>,
253 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
254 leader_aggregator_client: Arc<dyn AggregatorClient>,
255 stake_store: Arc<dyn StakeStorer>,
256 verification_key_store: Arc<dyn VerificationKeyStorer>,
257 verification_key_epoch_retention_limit: Option<u64>,
258 }
259
260 impl Default for MithrilSignerRegistrationFollowerBuilder {
261 fn default() -> Self {
262 Self {
263 epoch_service: Arc::new(RwLock::new(FakeEpochService::without_data())),
264 signer_recorder: Arc::new(MockSignerRecorder::new()),
265 signer_registration_verifier: Arc::new(MockSignerRegistrationVerifier::new()),
266 leader_aggregator_client: Arc::new(MockAggregatorClient::new()),
267 stake_store: Arc::new(MockStakeStore::new()),
268 verification_key_store: Arc::new(SignerRegistrationStore::new(Arc::new(
269 main_db_connection().unwrap(),
270 ))),
271 verification_key_epoch_retention_limit: None,
272 }
273 }
274 }
275
276 impl MithrilSignerRegistrationFollowerBuilder {
277 pub fn with_epoch_service(self, epoch_service: FakeEpochService) -> Self {
278 Self {
279 epoch_service: Arc::new(RwLock::new(epoch_service)),
280 ..self
281 }
282 }
283
284 pub fn with_verification_key_store(
285 self,
286 verification_key_store: Arc<dyn VerificationKeyStorer>,
287 ) -> Self {
288 Self {
289 verification_key_store,
290 ..self
291 }
292 }
293
294 pub fn with_signer_recorder(self, signer_recorder: Arc<dyn SignerRecorder>) -> Self {
295 Self {
296 signer_recorder,
297 ..self
298 }
299 }
300
301 pub fn with_signer_registration_verifier(
302 self,
303 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
304 ) -> Self {
305 Self {
306 signer_registration_verifier,
307 ..self
308 }
309 }
310
311 pub fn with_leader_aggregator_client(
312 self,
313 leader_aggregator_client: Arc<dyn AggregatorClient>,
314 ) -> Self {
315 Self {
316 leader_aggregator_client,
317 ..self
318 }
319 }
320
321 pub fn with_stake_store(self, stake_store: Arc<dyn StakeStorer>) -> Self {
322 Self {
323 stake_store,
324 ..self
325 }
326 }
327
328 pub fn with_verification_key_epoch_retention_limit(
329 self,
330 verification_key_epoch_retention_limit: Option<u64>,
331 ) -> Self {
332 Self {
333 verification_key_epoch_retention_limit,
334 ..self
335 }
336 }
337
338 pub fn build(self) -> MithrilSignerRegistrationFollower {
339 MithrilSignerRegistrationFollower {
340 epoch_service: self.epoch_service,
341 verification_key_store: self.verification_key_store,
342 signer_recorder: self.signer_recorder,
343 signer_registration_verifier: self.signer_registration_verifier,
344 leader_aggregator_client: self.leader_aggregator_client,
345 stake_store: self.stake_store,
346 verification_key_epoch_retention_limit: self
347 .verification_key_epoch_retention_limit,
348 }
349 }
350 }
351 }
352
353 #[tokio::test]
354 async fn open_close_registration_always_succeeds() {
355 let signer_registration_follower =
356 MithrilSignerRegistrationFollowerBuilder::default().build();
357 let registration_epoch = Epoch(1);
358 let fixture = MithrilFixtureBuilder::default().with_signers(1).build();
359 let stake_distribution = fixture.stake_distribution();
360
361 signer_registration_follower
362 .open_registration_round(registration_epoch, stake_distribution)
363 .await
364 .expect("signer registration round opening should not fail");
365
366 signer_registration_follower
367 .close_registration_round()
368 .await
369 .expect("signer registration round opening should not fail");
370 }
371
372 #[tokio::test]
373 async fn register_signer_always_fails() {
374 let signer_registration_follower =
375 MithrilSignerRegistrationFollowerBuilder::default().build();
376 let registration_epoch = Epoch(1);
377 let fixture = MithrilFixtureBuilder::default().with_signers(1).build();
378 let signer_to_register: Signer = fixture.signers()[0].to_owned();
379
380 signer_registration_follower
381 .register_signer(registration_epoch, &signer_to_register)
382 .await
383 .expect_err("signer registration should always fail");
384 }
385
386 #[tokio::test]
387 async fn synchronize_all_signers_succeeds() {
388 let registration_epoch = Epoch(1);
389 let fixture = MithrilFixtureBuilder::default()
390 .with_signers(5)
391 .disable_signers_certification()
392 .build();
393 let signers = fixture.signers();
394 let stake_distribution = fixture.stake_distribution();
395 let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
396 epoch: registration_epoch,
397 next_signers: SignerMessagePart::from_signers(signers),
398 ..EpochSettingsMessage::dummy()
399 })
400 .unwrap();
401 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
402 .with_signer_recorder({
403 let mut signer_recorder = MockSignerRecorder::new();
404 signer_recorder
405 .expect_record_signer_registration()
406 .returning(|_| Ok(()))
407 .times(5);
408
409 Arc::new(signer_recorder)
410 })
411 .with_signer_registration_verifier({
412 let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
413 signer_registration_verifier
414 .expect_verify()
415 .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
416 .times(5);
417
418 Arc::new(signer_registration_verifier)
419 })
420 .with_leader_aggregator_client({
421 let mut aggregator_client = MockAggregatorClient::new();
422 aggregator_client
423 .expect_retrieve_epoch_settings()
424 .returning(move || Ok(Some(epoch_settings_message.clone())))
425 .times(1);
426
427 Arc::new(aggregator_client)
428 })
429 .with_stake_store({
430 let mut stake_store = MockStakeStore::new();
431 stake_store
432 .expect_get_stakes()
433 .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
434 .times(1);
435
436 Arc::new(stake_store)
437 })
438 .build();
439
440 signer_registration_follower
441 .synchronize_all_signers()
442 .await
443 .unwrap();
444 }
445
446 #[tokio::test]
447 async fn synchronize_all_signers_fails_if_one_signer_registration_fails() {
448 let registration_epoch = Epoch(1);
449 let fixture = MithrilFixtureBuilder::default()
450 .with_signers(5)
451 .disable_signers_certification()
452 .build();
453 let signers = fixture.signers();
454 let stake_distribution = fixture.stake_distribution();
455 let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
456 epoch: registration_epoch,
457 next_signers: SignerMessagePart::from_signers(signers),
458 ..EpochSettingsMessage::dummy()
459 })
460 .unwrap();
461
462 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
463 .with_signer_recorder({
464 let mut signer_recorder = MockSignerRecorder::new();
465 signer_recorder
466 .expect_record_signer_registration()
467 .returning(|_| Ok(()))
468 .times(4);
469 signer_recorder
470 .expect_record_signer_registration()
471 .returning(|_| Err(anyhow!("an error")))
472 .times(1);
473
474 Arc::new(signer_recorder)
475 })
476 .with_signer_registration_verifier({
477 let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
478 signer_registration_verifier
479 .expect_verify()
480 .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
481 .times(5);
482
483 Arc::new(signer_registration_verifier)
484 })
485 .with_leader_aggregator_client({
486 let mut aggregator_client = MockAggregatorClient::new();
487 aggregator_client
488 .expect_retrieve_epoch_settings()
489 .returning(move || Ok(Some(epoch_settings_message.clone())))
490 .times(1);
491
492 Arc::new(aggregator_client)
493 })
494 .with_stake_store({
495 let mut stake_store = MockStakeStore::new();
496 stake_store
497 .expect_get_stakes()
498 .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
499 .times(1);
500
501 Arc::new(stake_store)
502 })
503 .build();
504
505 signer_registration_follower
506 .synchronize_all_signers()
507 .await
508 .expect_err("synchronize_all_signers should fail");
509 }
510
511 #[tokio::test]
512 async fn synchronize_all_signers_fails_if_epoch_service_update_next_signers_fails() {
513 let registration_epoch = Epoch(1);
514 let fixture = MithrilFixtureBuilder::default()
515 .with_signers(5)
516 .disable_signers_certification()
517 .build();
518 let signers = fixture.signers();
519 let stake_distribution = fixture.stake_distribution();
520 let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
521 epoch: registration_epoch,
522 next_signers: SignerMessagePart::from_signers(signers),
523 ..EpochSettingsMessage::dummy()
524 })
525 .unwrap();
526
527 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
528 .with_epoch_service({
529 let mut epoch_service = FakeEpochService::without_data();
530 epoch_service.toggle_errors(false, false, false, true);
531
532 epoch_service
533 })
534 .with_signer_recorder({
535 let mut signer_recorder = MockSignerRecorder::new();
536 signer_recorder
537 .expect_record_signer_registration()
538 .returning(|_| Ok(()))
539 .times(5);
540
541 Arc::new(signer_recorder)
542 })
543 .with_signer_registration_verifier({
544 let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
545 signer_registration_verifier
546 .expect_verify()
547 .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
548 .times(5);
549
550 Arc::new(signer_registration_verifier)
551 })
552 .with_leader_aggregator_client({
553 let mut aggregator_client = MockAggregatorClient::new();
554 aggregator_client
555 .expect_retrieve_epoch_settings()
556 .returning(move || Ok(Some(epoch_settings_message.clone())))
557 .times(1);
558
559 Arc::new(aggregator_client)
560 })
561 .with_stake_store({
562 let mut stake_store = MockStakeStore::new();
563 stake_store
564 .expect_get_stakes()
565 .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
566 .times(1);
567
568 Arc::new(stake_store)
569 })
570 .build();
571
572 signer_registration_follower
573 .synchronize_all_signers()
574 .await
575 .expect_err("synchronize_all_signers should fail");
576 }
577
578 #[tokio::test]
579 async fn synchronize_all_signers_fails_if_fetching_epoch_settings_fails() {
580 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
581 .with_leader_aggregator_client({
582 let mut aggregator_client = MockAggregatorClient::new();
583 aggregator_client
584 .expect_retrieve_epoch_settings()
585 .returning(move || {
586 Err(AggregatorClientError::RemoteServerTechnical(anyhow!(
587 "an error"
588 )))
589 })
590 .times(1);
591
592 Arc::new(aggregator_client)
593 })
594 .build();
595
596 signer_registration_follower
597 .synchronize_all_signers()
598 .await
599 .expect_err("synchronize_all_signers should fail");
600 }
601
602 #[tokio::test]
603 async fn synchronize_all_signers_fails_if_fetching_stakes_fails() {
604 let registration_epoch = Epoch(1);
605 let fixture = MithrilFixtureBuilder::default()
606 .with_signers(5)
607 .disable_signers_certification()
608 .build();
609 let signers = fixture.signers();
610 let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
611 epoch: registration_epoch,
612 next_signers: SignerMessagePart::from_signers(signers),
613 ..EpochSettingsMessage::dummy()
614 })
615 .unwrap();
616 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
617 .with_leader_aggregator_client({
618 let mut aggregator_client = MockAggregatorClient::new();
619 aggregator_client
620 .expect_retrieve_epoch_settings()
621 .returning(move || Ok(Some(epoch_settings_message.clone())))
622 .times(1);
623
624 Arc::new(aggregator_client)
625 })
626 .with_stake_store({
627 let mut stake_store = MockStakeStore::new();
628 stake_store
629 .expect_get_stakes()
630 .returning(move |_epoch| Err(anyhow!("an error")))
631 .times(1);
632
633 Arc::new(stake_store)
634 })
635 .build();
636
637 signer_registration_follower
638 .synchronize_all_signers()
639 .await
640 .expect_err("synchronize_all_signers should fail");
641 }
642
643 #[tokio::test]
644 async fn prune_epoch_older_than_threshold() {
645 const PROTOCOL_INITIALIZER_PRUNE_EPOCH_THRESHOLD: u64 = 10;
646 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
647 .with_verification_key_store({
648 let mut verification_key_store = MockVerificationKeyStorer::new();
649 verification_key_store
650 .expect_prune_verification_keys()
651 .with(eq(Epoch(4).offset_to_recording_epoch()))
652 .times(1)
653 .returning(|_| Ok(()));
654
655 Arc::new(verification_key_store)
656 })
657 .with_verification_key_epoch_retention_limit(Some(
658 PROTOCOL_INITIALIZER_PRUNE_EPOCH_THRESHOLD,
659 ))
660 .build();
661
662 let current_epoch = Epoch(4) + PROTOCOL_INITIALIZER_PRUNE_EPOCH_THRESHOLD;
663 signer_registration_follower
664 .prune(current_epoch)
665 .await
666 .unwrap();
667 }
668
669 #[tokio::test]
670 async fn without_threshold_nothing_is_pruned() {
671 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
672 .with_verification_key_store({
673 let mut verification_key_store = MockVerificationKeyStorer::new();
674 verification_key_store
675 .expect_prune_verification_keys()
676 .never();
677
678 Arc::new(verification_key_store)
679 })
680 .with_verification_key_epoch_retention_limit(None)
681 .build();
682
683 signer_registration_follower
684 .prune(Epoch(100))
685 .await
686 .unwrap();
687 }
688}