mithril_aggregator/services/signer_registration/
follower.rs

1use std::sync::Arc;
2
3use anyhow::{anyhow, Context};
4use async_trait::async_trait;
5
6use mithril_common::{
7    entities::{Epoch, Signer, SignerWithStake, StakeDistribution},
8    StdResult,
9};
10use mithril_persistence::store::StakeStorer;
11
12use crate::{
13    dependency_injection::EpochServiceWrapper,
14    services::{AggregatorClient, EpochPruningTask},
15    SignerRegistrationVerifier, VerificationKeyStorer,
16};
17
18use super::{
19    SignerRecorder, SignerRegisterer, SignerRegistrationError, SignerRegistrationRound,
20    SignerRegistrationRoundOpener, SignerSynchronizer,
21};
22
23/// A [MithrilSignerRegistrationFollower] supports signer registrations in a follower aggregator
24pub struct MithrilSignerRegistrationFollower {
25    /// Epoch service
26    pub epoch_service: EpochServiceWrapper,
27
28    /// Verification key store
29    verification_key_store: Arc<dyn VerificationKeyStorer>,
30
31    /// Signer recorder
32    signer_recorder: Arc<dyn SignerRecorder>,
33
34    /// Signer registration verifier
35    signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
36
37    /// Leader aggregator client
38    leader_aggregator_client: Arc<dyn AggregatorClient>,
39
40    /// Stake store
41    stake_store: Arc<dyn StakeStorer>,
42
43    /// Number of epochs before previous records will be deleted at the next registration round
44    /// opening
45    verification_key_epoch_retention_limit: Option<u64>,
46}
47
48impl MithrilSignerRegistrationFollower {
49    /// MithrilSignerRegistererFollower factory
50    pub fn new(
51        epoch_service: EpochServiceWrapper,
52        verification_key_store: Arc<dyn VerificationKeyStorer>,
53        signer_recorder: Arc<dyn SignerRecorder>,
54        signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
55        leader_aggregator_client: Arc<dyn AggregatorClient>,
56        stake_store: Arc<dyn StakeStorer>,
57        verification_key_epoch_retention_limit: Option<u64>,
58    ) -> Self {
59        Self {
60            epoch_service,
61            verification_key_store,
62            signer_recorder,
63            signer_registration_verifier,
64            leader_aggregator_client,
65            stake_store,
66            verification_key_epoch_retention_limit,
67        }
68    }
69
70    async fn synchronize_signers(
71        &self,
72        epoch: Epoch,
73        signers: &[Signer],
74        stake_distribution: &StakeDistribution,
75    ) -> Result<(), SignerRegistrationError> {
76        for signer in signers {
77            let signer_with_stake = self
78                .signer_registration_verifier
79                .verify(signer, stake_distribution)
80                .await
81                .map_err(|e| SignerRegistrationError::FailedSignerRegistration(anyhow!(e)))?;
82
83            self.signer_recorder
84                .record_signer_registration(signer_with_stake.party_id.clone())
85                .await
86                .map_err(|e| SignerRegistrationError::FailedSignerRecorder(e.to_string()))?;
87
88            self
89                .verification_key_store
90                .save_verification_key(epoch, signer_with_stake.clone())
91                .await
92                .with_context(|| {
93                    format!(
94                        "VerificationKeyStorer can not save verification keys for party_id: '{}' for epoch: '{}'",
95                        signer_with_stake.party_id,
96                        epoch
97                    )
98                })
99                .map_err(|e| SignerRegistrationError::Store(anyhow!(e)))?;
100        }
101
102        self.epoch_service
103            .write()
104            .await
105            .update_next_signers_with_stake()
106            .await
107            .map_err(|e| SignerRegistrationError::EpochService(anyhow!(e)))?;
108
109        Ok(())
110    }
111}
112
113#[async_trait]
114impl SignerSynchronizer for MithrilSignerRegistrationFollower {
115    async fn can_synchronize_signers(&self, epoch: Epoch) -> Result<bool, SignerRegistrationError> {
116        Ok(self
117            .leader_aggregator_client
118            .retrieve_epoch_settings()
119            .await
120            .with_context(|| "can_synchronize_signers failed")
121            .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?
122            .is_some_and(|leader_epoch_settings| epoch == leader_epoch_settings.epoch))
123    }
124
125    async fn synchronize_all_signers(&self) -> Result<(), SignerRegistrationError> {
126        let leader_epoch_settings = self
127            .leader_aggregator_client
128            .retrieve_epoch_settings()
129            .await
130            .with_context(|| "synchronize_all_signers failed")
131            .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?
132            .ok_or(
133                SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings(
134                    anyhow::anyhow!("Leader aggregator did not return any epoch settings"),
135                ),
136            )?;
137        let registration_epoch = leader_epoch_settings
138            .epoch
139            .offset_to_leader_synchronization_epoch();
140        let next_signers = leader_epoch_settings.next_signers;
141        let stake_distribution = self
142            .stake_store
143            .get_stakes(registration_epoch)
144            .await
145            .with_context(|| "synchronize_all_signers failed")
146            .map_err(SignerRegistrationError::Store)?
147            .ok_or(SignerRegistrationError::Store(anyhow::anyhow!(
148                "Follower aggregator did not return any stake distribution"
149            )))?;
150        self.synchronize_signers(registration_epoch, &next_signers, &stake_distribution)
151            .await?;
152
153        Ok(())
154    }
155}
156
157#[async_trait]
158impl SignerRegisterer for MithrilSignerRegistrationFollower {
159    async fn register_signer(
160        &self,
161        _epoch: Epoch,
162        _signer: &Signer,
163    ) -> Result<SignerWithStake, SignerRegistrationError> {
164        Err(SignerRegistrationError::RegistrationRoundAlwaysClosedOnFollowerAggregator)
165    }
166
167    async fn get_current_round(&self) -> Option<SignerRegistrationRound> {
168        None
169    }
170}
171
172#[async_trait]
173impl SignerRegistrationRoundOpener for MithrilSignerRegistrationFollower {
174    async fn open_registration_round(
175        &self,
176        _registration_epoch: Epoch,
177        _stake_distribution: StakeDistribution,
178    ) -> StdResult<()> {
179        Ok(())
180    }
181
182    async fn close_registration_round(&self) -> StdResult<()> {
183        Ok(())
184    }
185}
186
187#[async_trait]
188impl EpochPruningTask for MithrilSignerRegistrationFollower {
189    fn pruned_data(&self) -> &'static str {
190        "Signer registration"
191    }
192
193    async fn prune(&self, epoch: Epoch) -> StdResult<()> {
194        let registration_epoch = epoch.offset_to_recording_epoch();
195
196        if let Some(retention_limit) = self.verification_key_epoch_retention_limit {
197            self.verification_key_store
198                .prune_verification_keys(registration_epoch - retention_limit)
199                .await
200                .with_context(|| {
201                    format!(
202                        "VerificationKeyStorer can not prune verification keys below epoch: '{}'",
203                        registration_epoch - retention_limit
204                    )
205                })?;
206        }
207
208        Ok(())
209    }
210}
211
212#[cfg(test)]
213mod tests {
214    use std::sync::Arc;
215
216    use anyhow::anyhow;
217    use mithril_persistence::store::StakeStorer;
218    use mockall::predicate::eq;
219
220    use mithril_common::{
221        entities::{Epoch, Signer, SignerWithStake},
222        messages::{EpochSettingsMessage, SignerMessagePart, TryFromMessageAdapter},
223        test_utils::MithrilFixtureBuilder,
224    };
225
226    use crate::{
227        database::{repository::SignerRegistrationStore, test_helper::main_db_connection},
228        message_adapters::FromEpochSettingsAdapter,
229        services::{
230            AggregatorClient, AggregatorClientError, EpochPruningTask, FakeEpochService,
231            MockAggregatorClient, MockSignerRecorder, MockSignerRegistrationVerifier,
232            SignerSynchronizer,
233        },
234        store::MockVerificationKeyStorer,
235        tools::mocks::MockStakeStore,
236        MithrilSignerRegistrationFollower, SignerRecorder, SignerRegisterer,
237        SignerRegistrationRoundOpener, SignerRegistrationVerifier, VerificationKeyStorer,
238    };
239
240    use test_utils::*;
241
242    mod test_utils {
243        use tokio::sync::RwLock;
244
245        use crate::{dependency_injection::EpochServiceWrapper, services::FakeEpochService};
246
247        use super::*;
248
249        /// MithrilSignerRegistrationFollowerBuilder is a test builder for [MithrilSignerRegistrationFollower]
250        pub struct MithrilSignerRegistrationFollowerBuilder {
251            epoch_service: EpochServiceWrapper,
252            signer_recorder: Arc<dyn SignerRecorder>,
253            signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
254            leader_aggregator_client: Arc<dyn AggregatorClient>,
255            stake_store: Arc<dyn StakeStorer>,
256            verification_key_store: Arc<dyn VerificationKeyStorer>,
257            verification_key_epoch_retention_limit: Option<u64>,
258        }
259
260        impl Default for MithrilSignerRegistrationFollowerBuilder {
261            fn default() -> Self {
262                Self {
263                    epoch_service: Arc::new(RwLock::new(FakeEpochService::without_data())),
264                    signer_recorder: Arc::new(MockSignerRecorder::new()),
265                    signer_registration_verifier: Arc::new(MockSignerRegistrationVerifier::new()),
266                    leader_aggregator_client: Arc::new(MockAggregatorClient::new()),
267                    stake_store: Arc::new(MockStakeStore::new()),
268                    verification_key_store: Arc::new(SignerRegistrationStore::new(Arc::new(
269                        main_db_connection().unwrap(),
270                    ))),
271                    verification_key_epoch_retention_limit: None,
272                }
273            }
274        }
275
276        impl MithrilSignerRegistrationFollowerBuilder {
277            pub fn with_epoch_service(self, epoch_service: FakeEpochService) -> Self {
278                Self {
279                    epoch_service: Arc::new(RwLock::new(epoch_service)),
280                    ..self
281                }
282            }
283
284            pub fn with_verification_key_store(
285                self,
286                verification_key_store: Arc<dyn VerificationKeyStorer>,
287            ) -> Self {
288                Self {
289                    verification_key_store,
290                    ..self
291                }
292            }
293
294            pub fn with_signer_recorder(self, signer_recorder: Arc<dyn SignerRecorder>) -> Self {
295                Self {
296                    signer_recorder,
297                    ..self
298                }
299            }
300
301            pub fn with_signer_registration_verifier(
302                self,
303                signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
304            ) -> Self {
305                Self {
306                    signer_registration_verifier,
307                    ..self
308                }
309            }
310
311            pub fn with_leader_aggregator_client(
312                self,
313                leader_aggregator_client: Arc<dyn AggregatorClient>,
314            ) -> Self {
315                Self {
316                    leader_aggregator_client,
317                    ..self
318                }
319            }
320
321            pub fn with_stake_store(self, stake_store: Arc<dyn StakeStorer>) -> Self {
322                Self {
323                    stake_store,
324                    ..self
325                }
326            }
327
328            pub fn with_verification_key_epoch_retention_limit(
329                self,
330                verification_key_epoch_retention_limit: Option<u64>,
331            ) -> Self {
332                Self {
333                    verification_key_epoch_retention_limit,
334                    ..self
335                }
336            }
337
338            pub fn build(self) -> MithrilSignerRegistrationFollower {
339                MithrilSignerRegistrationFollower {
340                    epoch_service: self.epoch_service,
341                    verification_key_store: self.verification_key_store,
342                    signer_recorder: self.signer_recorder,
343                    signer_registration_verifier: self.signer_registration_verifier,
344                    leader_aggregator_client: self.leader_aggregator_client,
345                    stake_store: self.stake_store,
346                    verification_key_epoch_retention_limit: self
347                        .verification_key_epoch_retention_limit,
348                }
349            }
350        }
351    }
352
353    #[tokio::test]
354    async fn open_close_registration_always_succeeds() {
355        let signer_registration_follower =
356            MithrilSignerRegistrationFollowerBuilder::default().build();
357        let registration_epoch = Epoch(1);
358        let fixture = MithrilFixtureBuilder::default().with_signers(1).build();
359        let stake_distribution = fixture.stake_distribution();
360
361        signer_registration_follower
362            .open_registration_round(registration_epoch, stake_distribution)
363            .await
364            .expect("signer registration round opening should not fail");
365
366        signer_registration_follower
367            .close_registration_round()
368            .await
369            .expect("signer registration round opening should not fail");
370    }
371
372    #[tokio::test]
373    async fn register_signer_always_fails() {
374        let signer_registration_follower =
375            MithrilSignerRegistrationFollowerBuilder::default().build();
376        let registration_epoch = Epoch(1);
377        let fixture = MithrilFixtureBuilder::default().with_signers(1).build();
378        let signer_to_register: Signer = fixture.signers()[0].to_owned();
379
380        signer_registration_follower
381            .register_signer(registration_epoch, &signer_to_register)
382            .await
383            .expect_err("signer registration should always fail");
384    }
385
386    #[tokio::test]
387    async fn synchronize_all_signers_succeeds() {
388        let registration_epoch = Epoch(1);
389        let fixture = MithrilFixtureBuilder::default()
390            .with_signers(5)
391            .disable_signers_certification()
392            .build();
393        let signers = fixture.signers();
394        let stake_distribution = fixture.stake_distribution();
395        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
396            epoch: registration_epoch,
397            next_signers: SignerMessagePart::from_signers(signers),
398            ..EpochSettingsMessage::dummy()
399        })
400        .unwrap();
401        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
402            .with_signer_recorder({
403                let mut signer_recorder = MockSignerRecorder::new();
404                signer_recorder
405                    .expect_record_signer_registration()
406                    .returning(|_| Ok(()))
407                    .times(5);
408
409                Arc::new(signer_recorder)
410            })
411            .with_signer_registration_verifier({
412                let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
413                signer_registration_verifier
414                    .expect_verify()
415                    .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
416                    .times(5);
417
418                Arc::new(signer_registration_verifier)
419            })
420            .with_leader_aggregator_client({
421                let mut aggregator_client = MockAggregatorClient::new();
422                aggregator_client
423                    .expect_retrieve_epoch_settings()
424                    .returning(move || Ok(Some(epoch_settings_message.clone())))
425                    .times(1);
426
427                Arc::new(aggregator_client)
428            })
429            .with_stake_store({
430                let mut stake_store = MockStakeStore::new();
431                stake_store
432                    .expect_get_stakes()
433                    .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
434                    .times(1);
435
436                Arc::new(stake_store)
437            })
438            .build();
439
440        signer_registration_follower
441            .synchronize_all_signers()
442            .await
443            .unwrap();
444    }
445
446    #[tokio::test]
447    async fn synchronize_all_signers_fails_if_one_signer_registration_fails() {
448        let registration_epoch = Epoch(1);
449        let fixture = MithrilFixtureBuilder::default()
450            .with_signers(5)
451            .disable_signers_certification()
452            .build();
453        let signers = fixture.signers();
454        let stake_distribution = fixture.stake_distribution();
455        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
456            epoch: registration_epoch,
457            next_signers: SignerMessagePart::from_signers(signers),
458            ..EpochSettingsMessage::dummy()
459        })
460        .unwrap();
461
462        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
463            .with_signer_recorder({
464                let mut signer_recorder = MockSignerRecorder::new();
465                signer_recorder
466                    .expect_record_signer_registration()
467                    .returning(|_| Ok(()))
468                    .times(4);
469                signer_recorder
470                    .expect_record_signer_registration()
471                    .returning(|_| Err(anyhow!("an error")))
472                    .times(1);
473
474                Arc::new(signer_recorder)
475            })
476            .with_signer_registration_verifier({
477                let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
478                signer_registration_verifier
479                    .expect_verify()
480                    .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
481                    .times(5);
482
483                Arc::new(signer_registration_verifier)
484            })
485            .with_leader_aggregator_client({
486                let mut aggregator_client = MockAggregatorClient::new();
487                aggregator_client
488                    .expect_retrieve_epoch_settings()
489                    .returning(move || Ok(Some(epoch_settings_message.clone())))
490                    .times(1);
491
492                Arc::new(aggregator_client)
493            })
494            .with_stake_store({
495                let mut stake_store = MockStakeStore::new();
496                stake_store
497                    .expect_get_stakes()
498                    .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
499                    .times(1);
500
501                Arc::new(stake_store)
502            })
503            .build();
504
505        signer_registration_follower
506            .synchronize_all_signers()
507            .await
508            .expect_err("synchronize_all_signers should fail");
509    }
510
511    #[tokio::test]
512    async fn synchronize_all_signers_fails_if_epoch_service_update_next_signers_fails() {
513        let registration_epoch = Epoch(1);
514        let fixture = MithrilFixtureBuilder::default()
515            .with_signers(5)
516            .disable_signers_certification()
517            .build();
518        let signers = fixture.signers();
519        let stake_distribution = fixture.stake_distribution();
520        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
521            epoch: registration_epoch,
522            next_signers: SignerMessagePart::from_signers(signers),
523            ..EpochSettingsMessage::dummy()
524        })
525        .unwrap();
526
527        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
528            .with_epoch_service({
529                let mut epoch_service = FakeEpochService::without_data();
530                epoch_service.toggle_errors(false, false, false, true);
531
532                epoch_service
533            })
534            .with_signer_recorder({
535                let mut signer_recorder = MockSignerRecorder::new();
536                signer_recorder
537                    .expect_record_signer_registration()
538                    .returning(|_| Ok(()))
539                    .times(5);
540
541                Arc::new(signer_recorder)
542            })
543            .with_signer_registration_verifier({
544                let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
545                signer_registration_verifier
546                    .expect_verify()
547                    .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
548                    .times(5);
549
550                Arc::new(signer_registration_verifier)
551            })
552            .with_leader_aggregator_client({
553                let mut aggregator_client = MockAggregatorClient::new();
554                aggregator_client
555                    .expect_retrieve_epoch_settings()
556                    .returning(move || Ok(Some(epoch_settings_message.clone())))
557                    .times(1);
558
559                Arc::new(aggregator_client)
560            })
561            .with_stake_store({
562                let mut stake_store = MockStakeStore::new();
563                stake_store
564                    .expect_get_stakes()
565                    .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
566                    .times(1);
567
568                Arc::new(stake_store)
569            })
570            .build();
571
572        signer_registration_follower
573            .synchronize_all_signers()
574            .await
575            .expect_err("synchronize_all_signers should fail");
576    }
577
578    #[tokio::test]
579    async fn synchronize_all_signers_fails_if_fetching_epoch_settings_fails() {
580        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
581            .with_leader_aggregator_client({
582                let mut aggregator_client = MockAggregatorClient::new();
583                aggregator_client
584                    .expect_retrieve_epoch_settings()
585                    .returning(move || {
586                        Err(AggregatorClientError::RemoteServerTechnical(anyhow!(
587                            "an error"
588                        )))
589                    })
590                    .times(1);
591
592                Arc::new(aggregator_client)
593            })
594            .build();
595
596        signer_registration_follower
597            .synchronize_all_signers()
598            .await
599            .expect_err("synchronize_all_signers should fail");
600    }
601
602    #[tokio::test]
603    async fn synchronize_all_signers_fails_if_fetching_stakes_fails() {
604        let registration_epoch = Epoch(1);
605        let fixture = MithrilFixtureBuilder::default()
606            .with_signers(5)
607            .disable_signers_certification()
608            .build();
609        let signers = fixture.signers();
610        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
611            epoch: registration_epoch,
612            next_signers: SignerMessagePart::from_signers(signers),
613            ..EpochSettingsMessage::dummy()
614        })
615        .unwrap();
616        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
617            .with_leader_aggregator_client({
618                let mut aggregator_client = MockAggregatorClient::new();
619                aggregator_client
620                    .expect_retrieve_epoch_settings()
621                    .returning(move || Ok(Some(epoch_settings_message.clone())))
622                    .times(1);
623
624                Arc::new(aggregator_client)
625            })
626            .with_stake_store({
627                let mut stake_store = MockStakeStore::new();
628                stake_store
629                    .expect_get_stakes()
630                    .returning(move |_epoch| Err(anyhow!("an error")))
631                    .times(1);
632
633                Arc::new(stake_store)
634            })
635            .build();
636
637        signer_registration_follower
638            .synchronize_all_signers()
639            .await
640            .expect_err("synchronize_all_signers should fail");
641    }
642
643    #[tokio::test]
644    async fn prune_epoch_older_than_threshold() {
645        const PROTOCOL_INITIALIZER_PRUNE_EPOCH_THRESHOLD: u64 = 10;
646        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
647            .with_verification_key_store({
648                let mut verification_key_store = MockVerificationKeyStorer::new();
649                verification_key_store
650                    .expect_prune_verification_keys()
651                    .with(eq(Epoch(4).offset_to_recording_epoch()))
652                    .times(1)
653                    .returning(|_| Ok(()));
654
655                Arc::new(verification_key_store)
656            })
657            .with_verification_key_epoch_retention_limit(Some(
658                PROTOCOL_INITIALIZER_PRUNE_EPOCH_THRESHOLD,
659            ))
660            .build();
661
662        let current_epoch = Epoch(4) + PROTOCOL_INITIALIZER_PRUNE_EPOCH_THRESHOLD;
663        signer_registration_follower
664            .prune(current_epoch)
665            .await
666            .unwrap();
667    }
668
669    #[tokio::test]
670    async fn without_threshold_nothing_is_pruned() {
671        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
672            .with_verification_key_store({
673                let mut verification_key_store = MockVerificationKeyStorer::new();
674                verification_key_store
675                    .expect_prune_verification_keys()
676                    .never();
677
678                Arc::new(verification_key_store)
679            })
680            .with_verification_key_epoch_retention_limit(None)
681            .build();
682
683        signer_registration_follower
684            .prune(Epoch(100))
685            .await
686            .unwrap();
687    }
688}