mithril_aggregator/services/signer_registration/
follower.rs

1use std::sync::Arc;
2
3use anyhow::Context;
4use async_trait::async_trait;
5
6use mithril_common::{
7    StdResult,
8    entities::{Epoch, Signer, SignerWithStake, StakeDistribution},
9};
10use mithril_persistence::store::StakeStorer;
11
12use crate::{
13    SignerRegistrationVerifier, VerificationKeyStorer, dependency_injection::EpochServiceWrapper,
14};
15
16use super::{
17    LeaderAggregatorClient, SignerRecorder, SignerRegisterer, SignerRegistrationError,
18    SignerRegistrationRound, SignerRegistrationRoundOpener, SignerSynchronizer,
19};
20
21/// A [MithrilSignerRegistrationFollower] supports signer registrations in a follower aggregator
22pub struct MithrilSignerRegistrationFollower {
23    /// Epoch service
24    pub epoch_service: EpochServiceWrapper,
25
26    /// Verification key store
27    verification_key_store: Arc<dyn VerificationKeyStorer>,
28
29    /// Signer recorder
30    signer_recorder: Arc<dyn SignerRecorder>,
31
32    /// Signer registration verifier
33    signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
34
35    /// Leader aggregator client
36    leader_aggregator_client: Arc<dyn LeaderAggregatorClient>,
37
38    /// Stake store
39    stake_store: Arc<dyn StakeStorer>,
40}
41
42impl MithrilSignerRegistrationFollower {
43    /// MithrilSignerRegistererFollower factory
44    pub fn new(
45        epoch_service: EpochServiceWrapper,
46        verification_key_store: Arc<dyn VerificationKeyStorer>,
47        signer_recorder: Arc<dyn SignerRecorder>,
48        signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
49        leader_aggregator_client: Arc<dyn LeaderAggregatorClient>,
50        stake_store: Arc<dyn StakeStorer>,
51    ) -> Self {
52        Self {
53            epoch_service,
54            verification_key_store,
55            signer_recorder,
56            signer_registration_verifier,
57            leader_aggregator_client,
58            stake_store,
59        }
60    }
61
62    async fn synchronize_signers(
63        &self,
64        epoch: Epoch,
65        signers: &[Signer],
66        stake_distribution: &StakeDistribution,
67    ) -> Result<(), SignerRegistrationError> {
68        for signer in signers {
69            let signer_with_stake = self
70                .signer_registration_verifier
71                .verify(signer, stake_distribution)
72                .await
73                .map_err(|err| {
74                    SignerRegistrationError::InvalidSignerRegistration(
75                        signer.party_id.clone(),
76                        epoch,
77                        err,
78                    )
79                })?;
80
81            self.signer_recorder
82                .record_signer_registration(signer_with_stake.party_id.clone())
83                .await
84                .map_err(|err| {
85                    SignerRegistrationError::FailedSignerRecorder(
86                        signer_with_stake.party_id.clone(),
87                        epoch,
88                        err,
89                    )
90                })?;
91
92            self
93                .verification_key_store
94                .save_verification_key(epoch, signer_with_stake.clone())
95                .await
96                .with_context(|| {
97                    format!(
98                        "VerificationKeyStorer can not save verification keys for party_id: '{}' for epoch: '{}'",
99                        signer_with_stake.party_id,
100                        epoch
101                    )
102                })
103                .map_err(SignerRegistrationError::Store)?;
104        }
105
106        self.epoch_service
107            .write()
108            .await
109            .update_next_signers_with_stake()
110            .await
111            .map_err(SignerRegistrationError::EpochService)?;
112
113        Ok(())
114    }
115}
116
117#[async_trait]
118impl SignerSynchronizer for MithrilSignerRegistrationFollower {
119    async fn can_synchronize_signers(&self, epoch: Epoch) -> Result<bool, SignerRegistrationError> {
120        Ok(self
121            .leader_aggregator_client
122            .retrieve_epoch_settings()
123            .await
124            .with_context(|| "can_synchronize_signers failed")
125            .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?
126            .is_some_and(|leader_epoch_settings| epoch == leader_epoch_settings.epoch))
127    }
128
129    async fn synchronize_all_signers(&self) -> Result<(), SignerRegistrationError> {
130        let leader_epoch_settings = self
131            .leader_aggregator_client
132            .retrieve_epoch_settings()
133            .await
134            .with_context(|| "synchronize_all_signers failed")
135            .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?
136            .with_context(|| "Leader aggregator did not return any epoch settings")
137            .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?;
138        let registration_epoch =
139            leader_epoch_settings.epoch.offset_to_leader_synchronization_epoch();
140        let next_signers = leader_epoch_settings.next_signers;
141        let stake_distribution = self
142            .stake_store
143            .get_stakes(registration_epoch)
144            .await
145            .with_context(|| "synchronize_all_signers failed")
146            .map_err(SignerRegistrationError::Store)?
147            .with_context(|| "Follower aggregator did not return any stake distribution")
148            .map_err(SignerRegistrationError::Store)?;
149        self.synchronize_signers(registration_epoch, &next_signers, &stake_distribution)
150            .await?;
151
152        Ok(())
153    }
154}
155
156#[async_trait]
157impl SignerRegisterer for MithrilSignerRegistrationFollower {
158    async fn register_signer(
159        &self,
160        _epoch: Epoch,
161        _signer: &Signer,
162    ) -> Result<SignerWithStake, SignerRegistrationError> {
163        Err(SignerRegistrationError::RegistrationRoundAlwaysClosedOnFollowerAggregator)
164    }
165
166    async fn get_current_round(&self) -> Option<SignerRegistrationRound> {
167        None
168    }
169}
170
171#[async_trait]
172impl SignerRegistrationRoundOpener for MithrilSignerRegistrationFollower {
173    async fn open_registration_round(
174        &self,
175        _registration_epoch: Epoch,
176        _stake_distribution: StakeDistribution,
177    ) -> StdResult<()> {
178        Ok(())
179    }
180
181    async fn close_registration_round(&self) -> StdResult<()> {
182        Ok(())
183    }
184}
185
186#[cfg(test)]
187mod tests {
188    use anyhow::anyhow;
189
190    use mithril_common::messages::{
191        EpochSettingsMessage, SignerMessagePart, TryFromMessageAdapter,
192    };
193    use mithril_common::test::{builder::MithrilFixtureBuilder, double::Dummy};
194
195    use crate::{
196        database::{repository::SignerRegistrationStore, test_helper::main_db_connection},
197        message_adapters::FromEpochSettingsAdapter,
198        services::{
199            FakeEpochService, MockLeaderAggregatorClient, MockSignerRecorder,
200            MockSignerRegistrationVerifier,
201        },
202        test::double::mocks::MockStakeStore,
203    };
204
205    use super::*;
206
207    use test_utils::*;
208
209    mod test_utils {
210        use tokio::sync::RwLock;
211
212        use super::*;
213
214        /// MithrilSignerRegistrationFollowerBuilder is a test builder for [MithrilSignerRegistrationFollower]
215        pub struct MithrilSignerRegistrationFollowerBuilder {
216            epoch_service: EpochServiceWrapper,
217            signer_recorder: Arc<dyn SignerRecorder>,
218            signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
219            leader_aggregator_client: Arc<dyn LeaderAggregatorClient>,
220            stake_store: Arc<dyn StakeStorer>,
221            verification_key_store: Arc<dyn VerificationKeyStorer>,
222        }
223
224        impl Default for MithrilSignerRegistrationFollowerBuilder {
225            fn default() -> Self {
226                Self {
227                    epoch_service: Arc::new(RwLock::new(FakeEpochService::without_data())),
228                    signer_recorder: Arc::new(MockSignerRecorder::new()),
229                    signer_registration_verifier: Arc::new(MockSignerRegistrationVerifier::new()),
230                    leader_aggregator_client: Arc::new(MockLeaderAggregatorClient::new()),
231                    stake_store: Arc::new(MockStakeStore::new()),
232                    verification_key_store: Arc::new(SignerRegistrationStore::new(
233                        Arc::new(main_db_connection().unwrap()),
234                        None,
235                    )),
236                }
237            }
238        }
239
240        impl MithrilSignerRegistrationFollowerBuilder {
241            pub fn with_epoch_service(self, epoch_service: FakeEpochService) -> Self {
242                Self {
243                    epoch_service: Arc::new(RwLock::new(epoch_service)),
244                    ..self
245                }
246            }
247
248            pub fn with_signer_recorder(self, signer_recorder: Arc<dyn SignerRecorder>) -> Self {
249                Self {
250                    signer_recorder,
251                    ..self
252                }
253            }
254
255            pub fn with_signer_registration_verifier(
256                self,
257                signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
258            ) -> Self {
259                Self {
260                    signer_registration_verifier,
261                    ..self
262                }
263            }
264
265            pub fn with_leader_aggregator_client(
266                self,
267                leader_aggregator_client: Arc<dyn LeaderAggregatorClient>,
268            ) -> Self {
269                Self {
270                    leader_aggregator_client,
271                    ..self
272                }
273            }
274
275            pub fn with_stake_store(self, stake_store: Arc<dyn StakeStorer>) -> Self {
276                Self {
277                    stake_store,
278                    ..self
279                }
280            }
281
282            pub fn build(self) -> MithrilSignerRegistrationFollower {
283                MithrilSignerRegistrationFollower {
284                    epoch_service: self.epoch_service,
285                    verification_key_store: self.verification_key_store,
286                    signer_recorder: self.signer_recorder,
287                    signer_registration_verifier: self.signer_registration_verifier,
288                    leader_aggregator_client: self.leader_aggregator_client,
289                    stake_store: self.stake_store,
290                }
291            }
292        }
293    }
294
295    #[tokio::test]
296    async fn open_close_registration_always_succeeds() {
297        let signer_registration_follower =
298            MithrilSignerRegistrationFollowerBuilder::default().build();
299        let registration_epoch = Epoch(1);
300        let fixture = MithrilFixtureBuilder::default().with_signers(1).build();
301        let stake_distribution = fixture.stake_distribution();
302
303        signer_registration_follower
304            .open_registration_round(registration_epoch, stake_distribution)
305            .await
306            .expect("signer registration round opening should not fail");
307
308        signer_registration_follower
309            .close_registration_round()
310            .await
311            .expect("signer registration round opening should not fail");
312    }
313
314    #[tokio::test]
315    async fn register_signer_always_fails() {
316        let signer_registration_follower =
317            MithrilSignerRegistrationFollowerBuilder::default().build();
318        let registration_epoch = Epoch(1);
319        let fixture = MithrilFixtureBuilder::default().with_signers(1).build();
320        let signer_to_register: Signer = fixture.signers()[0].to_owned();
321
322        signer_registration_follower
323            .register_signer(registration_epoch, &signer_to_register)
324            .await
325            .expect_err("signer registration should always fail");
326    }
327
328    #[tokio::test]
329    async fn synchronize_all_signers_succeeds() {
330        let registration_epoch = Epoch(1);
331        let fixture = MithrilFixtureBuilder::default()
332            .with_signers(5)
333            .disable_signers_certification()
334            .build();
335        let signers = fixture.signers();
336        let stake_distribution = fixture.stake_distribution();
337        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
338            epoch: registration_epoch,
339            next_signers: SignerMessagePart::from_signers(signers),
340            ..EpochSettingsMessage::dummy()
341        })
342        .unwrap();
343        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
344            .with_signer_recorder({
345                let mut signer_recorder = MockSignerRecorder::new();
346                signer_recorder
347                    .expect_record_signer_registration()
348                    .returning(|_| Ok(()))
349                    .times(5);
350
351                Arc::new(signer_recorder)
352            })
353            .with_signer_registration_verifier({
354                let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
355                signer_registration_verifier
356                    .expect_verify()
357                    .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
358                    .times(5);
359
360                Arc::new(signer_registration_verifier)
361            })
362            .with_leader_aggregator_client({
363                let mut aggregator_client = MockLeaderAggregatorClient::new();
364                aggregator_client
365                    .expect_retrieve_epoch_settings()
366                    .returning(move || Ok(Some(epoch_settings_message.clone())))
367                    .times(1);
368
369                Arc::new(aggregator_client)
370            })
371            .with_stake_store({
372                let mut stake_store = MockStakeStore::new();
373                stake_store
374                    .expect_get_stakes()
375                    .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
376                    .times(1);
377
378                Arc::new(stake_store)
379            })
380            .build();
381
382        signer_registration_follower.synchronize_all_signers().await.unwrap();
383    }
384
385    #[tokio::test]
386    async fn synchronize_all_signers_fails_if_one_signer_registration_fails() {
387        let registration_epoch = Epoch(1);
388        let fixture = MithrilFixtureBuilder::default()
389            .with_signers(5)
390            .disable_signers_certification()
391            .build();
392        let signers = fixture.signers();
393        let stake_distribution = fixture.stake_distribution();
394        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
395            epoch: registration_epoch,
396            next_signers: SignerMessagePart::from_signers(signers),
397            ..EpochSettingsMessage::dummy()
398        })
399        .unwrap();
400
401        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
402            .with_signer_recorder({
403                let mut signer_recorder = MockSignerRecorder::new();
404                signer_recorder
405                    .expect_record_signer_registration()
406                    .returning(|_| Ok(()))
407                    .times(4);
408                signer_recorder
409                    .expect_record_signer_registration()
410                    .returning(|_| Err(anyhow!("an error")))
411                    .times(1);
412
413                Arc::new(signer_recorder)
414            })
415            .with_signer_registration_verifier({
416                let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
417                signer_registration_verifier
418                    .expect_verify()
419                    .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
420                    .times(5);
421
422                Arc::new(signer_registration_verifier)
423            })
424            .with_leader_aggregator_client({
425                let mut aggregator_client = MockLeaderAggregatorClient::new();
426                aggregator_client
427                    .expect_retrieve_epoch_settings()
428                    .returning(move || Ok(Some(epoch_settings_message.clone())))
429                    .times(1);
430
431                Arc::new(aggregator_client)
432            })
433            .with_stake_store({
434                let mut stake_store = MockStakeStore::new();
435                stake_store
436                    .expect_get_stakes()
437                    .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
438                    .times(1);
439
440                Arc::new(stake_store)
441            })
442            .build();
443
444        signer_registration_follower
445            .synchronize_all_signers()
446            .await
447            .expect_err("synchronize_all_signers should fail");
448    }
449
450    #[tokio::test]
451    async fn synchronize_all_signers_fails_if_epoch_service_update_next_signers_fails() {
452        let registration_epoch = Epoch(1);
453        let fixture = MithrilFixtureBuilder::default()
454            .with_signers(5)
455            .disable_signers_certification()
456            .build();
457        let signers = fixture.signers();
458        let stake_distribution = fixture.stake_distribution();
459        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
460            epoch: registration_epoch,
461            next_signers: SignerMessagePart::from_signers(signers),
462            ..EpochSettingsMessage::dummy()
463        })
464        .unwrap();
465
466        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
467            .with_epoch_service({
468                let mut epoch_service = FakeEpochService::without_data();
469                epoch_service.toggle_errors(false, false, true);
470
471                epoch_service
472            })
473            .with_signer_recorder({
474                let mut signer_recorder = MockSignerRecorder::new();
475                signer_recorder
476                    .expect_record_signer_registration()
477                    .returning(|_| Ok(()))
478                    .times(5);
479
480                Arc::new(signer_recorder)
481            })
482            .with_signer_registration_verifier({
483                let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
484                signer_registration_verifier
485                    .expect_verify()
486                    .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
487                    .times(5);
488
489                Arc::new(signer_registration_verifier)
490            })
491            .with_leader_aggregator_client({
492                let mut aggregator_client = MockLeaderAggregatorClient::new();
493                aggregator_client
494                    .expect_retrieve_epoch_settings()
495                    .returning(move || Ok(Some(epoch_settings_message.clone())))
496                    .times(1);
497
498                Arc::new(aggregator_client)
499            })
500            .with_stake_store({
501                let mut stake_store = MockStakeStore::new();
502                stake_store
503                    .expect_get_stakes()
504                    .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
505                    .times(1);
506
507                Arc::new(stake_store)
508            })
509            .build();
510
511        signer_registration_follower
512            .synchronize_all_signers()
513            .await
514            .expect_err("synchronize_all_signers should fail");
515    }
516
517    #[tokio::test]
518    async fn synchronize_all_signers_fails_if_fetching_epoch_settings_fails() {
519        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
520            .with_leader_aggregator_client({
521                let mut aggregator_client = MockLeaderAggregatorClient::new();
522                aggregator_client
523                    .expect_retrieve_epoch_settings()
524                    .returning(move || Err(anyhow!("an error")))
525                    .times(1);
526
527                Arc::new(aggregator_client)
528            })
529            .build();
530
531        signer_registration_follower
532            .synchronize_all_signers()
533            .await
534            .expect_err("synchronize_all_signers should fail");
535    }
536
537    #[tokio::test]
538    async fn synchronize_all_signers_fails_if_fetching_stakes_fails() {
539        let registration_epoch = Epoch(1);
540        let fixture = MithrilFixtureBuilder::default()
541            .with_signers(5)
542            .disable_signers_certification()
543            .build();
544        let signers = fixture.signers();
545        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
546            epoch: registration_epoch,
547            next_signers: SignerMessagePart::from_signers(signers),
548            ..EpochSettingsMessage::dummy()
549        })
550        .unwrap();
551        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
552            .with_leader_aggregator_client({
553                let mut aggregator_client = MockLeaderAggregatorClient::new();
554                aggregator_client
555                    .expect_retrieve_epoch_settings()
556                    .returning(move || Ok(Some(epoch_settings_message.clone())))
557                    .times(1);
558
559                Arc::new(aggregator_client)
560            })
561            .with_stake_store({
562                let mut stake_store = MockStakeStore::new();
563                stake_store
564                    .expect_get_stakes()
565                    .returning(move |_epoch| Err(anyhow!("an error")))
566                    .times(1);
567
568                Arc::new(stake_store)
569            })
570            .build();
571
572        signer_registration_follower
573            .synchronize_all_signers()
574            .await
575            .expect_err("synchronize_all_signers should fail");
576    }
577}