mithril_aggregator/services/signer_registration/
follower.rs

1use std::sync::Arc;
2
3use anyhow::{Context, anyhow};
4use async_trait::async_trait;
5
6use mithril_common::{
7    StdResult,
8    entities::{Epoch, Signer, SignerWithStake, StakeDistribution},
9};
10use mithril_persistence::store::StakeStorer;
11
12use crate::{
13    SignerRegistrationVerifier, VerificationKeyStorer, dependency_injection::EpochServiceWrapper,
14};
15
16use super::{
17    LeaderAggregatorClient, SignerRecorder, SignerRegisterer, SignerRegistrationError,
18    SignerRegistrationRound, SignerRegistrationRoundOpener, SignerSynchronizer,
19};
20
21/// A [MithrilSignerRegistrationFollower] supports signer registrations in a follower aggregator
22pub struct MithrilSignerRegistrationFollower {
23    /// Epoch service
24    pub epoch_service: EpochServiceWrapper,
25
26    /// Verification key store
27    verification_key_store: Arc<dyn VerificationKeyStorer>,
28
29    /// Signer recorder
30    signer_recorder: Arc<dyn SignerRecorder>,
31
32    /// Signer registration verifier
33    signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
34
35    /// Leader aggregator client
36    leader_aggregator_client: Arc<dyn LeaderAggregatorClient>,
37
38    /// Stake store
39    stake_store: Arc<dyn StakeStorer>,
40}
41
42impl MithrilSignerRegistrationFollower {
43    /// MithrilSignerRegistererFollower factory
44    pub fn new(
45        epoch_service: EpochServiceWrapper,
46        verification_key_store: Arc<dyn VerificationKeyStorer>,
47        signer_recorder: Arc<dyn SignerRecorder>,
48        signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
49        leader_aggregator_client: Arc<dyn LeaderAggregatorClient>,
50        stake_store: Arc<dyn StakeStorer>,
51    ) -> Self {
52        Self {
53            epoch_service,
54            verification_key_store,
55            signer_recorder,
56            signer_registration_verifier,
57            leader_aggregator_client,
58            stake_store,
59        }
60    }
61
62    async fn synchronize_signers(
63        &self,
64        epoch: Epoch,
65        signers: &[Signer],
66        stake_distribution: &StakeDistribution,
67    ) -> Result<(), SignerRegistrationError> {
68        for signer in signers {
69            let signer_with_stake = self
70                .signer_registration_verifier
71                .verify(signer, stake_distribution)
72                .await
73                .map_err(|e| SignerRegistrationError::FailedSignerRegistration(anyhow!(e)))?;
74
75            self.signer_recorder
76                .record_signer_registration(signer_with_stake.party_id.clone())
77                .await
78                .map_err(|e| SignerRegistrationError::FailedSignerRecorder(e.to_string()))?;
79
80            self
81                .verification_key_store
82                .save_verification_key(epoch, signer_with_stake.clone())
83                .await
84                .with_context(|| {
85                    format!(
86                        "VerificationKeyStorer can not save verification keys for party_id: '{}' for epoch: '{}'",
87                        signer_with_stake.party_id,
88                        epoch
89                    )
90                })
91                .map_err(|e| SignerRegistrationError::Store(anyhow!(e)))?;
92        }
93
94        self.epoch_service
95            .write()
96            .await
97            .update_next_signers_with_stake()
98            .await
99            .map_err(|e| SignerRegistrationError::EpochService(anyhow!(e)))?;
100
101        Ok(())
102    }
103}
104
105#[async_trait]
106impl SignerSynchronizer for MithrilSignerRegistrationFollower {
107    async fn can_synchronize_signers(&self, epoch: Epoch) -> Result<bool, SignerRegistrationError> {
108        Ok(self
109            .leader_aggregator_client
110            .retrieve_epoch_settings()
111            .await
112            .with_context(|| "can_synchronize_signers failed")
113            .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?
114            .is_some_and(|leader_epoch_settings| epoch == leader_epoch_settings.epoch))
115    }
116
117    async fn synchronize_all_signers(&self) -> Result<(), SignerRegistrationError> {
118        let leader_epoch_settings = self
119            .leader_aggregator_client
120            .retrieve_epoch_settings()
121            .await
122            .with_context(|| "synchronize_all_signers failed")
123            .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?
124            .ok_or(
125                SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings(
126                    anyhow::anyhow!("Leader aggregator did not return any epoch settings"),
127                ),
128            )?;
129        let registration_epoch =
130            leader_epoch_settings.epoch.offset_to_leader_synchronization_epoch();
131        let next_signers = leader_epoch_settings.next_signers;
132        let stake_distribution = self
133            .stake_store
134            .get_stakes(registration_epoch)
135            .await
136            .with_context(|| "synchronize_all_signers failed")
137            .map_err(SignerRegistrationError::Store)?
138            .ok_or(SignerRegistrationError::Store(anyhow::anyhow!(
139                "Follower aggregator did not return any stake distribution"
140            )))?;
141        self.synchronize_signers(registration_epoch, &next_signers, &stake_distribution)
142            .await?;
143
144        Ok(())
145    }
146}
147
148#[async_trait]
149impl SignerRegisterer for MithrilSignerRegistrationFollower {
150    async fn register_signer(
151        &self,
152        _epoch: Epoch,
153        _signer: &Signer,
154    ) -> Result<SignerWithStake, SignerRegistrationError> {
155        Err(SignerRegistrationError::RegistrationRoundAlwaysClosedOnFollowerAggregator)
156    }
157
158    async fn get_current_round(&self) -> Option<SignerRegistrationRound> {
159        None
160    }
161}
162
163#[async_trait]
164impl SignerRegistrationRoundOpener for MithrilSignerRegistrationFollower {
165    async fn open_registration_round(
166        &self,
167        _registration_epoch: Epoch,
168        _stake_distribution: StakeDistribution,
169    ) -> StdResult<()> {
170        Ok(())
171    }
172
173    async fn close_registration_round(&self) -> StdResult<()> {
174        Ok(())
175    }
176}
177
178#[cfg(test)]
179mod tests {
180    use mithril_common::messages::{
181        EpochSettingsMessage, SignerMessagePart, TryFromMessageAdapter,
182    };
183    use mithril_common::test::{builder::MithrilFixtureBuilder, double::Dummy};
184
185    use crate::{
186        database::{repository::SignerRegistrationStore, test_helper::main_db_connection},
187        message_adapters::FromEpochSettingsAdapter,
188        services::{
189            FakeEpochService, MockLeaderAggregatorClient, MockSignerRecorder,
190            MockSignerRegistrationVerifier,
191        },
192        test::double::mocks::MockStakeStore,
193    };
194
195    use super::*;
196
197    use test_utils::*;
198
199    mod test_utils {
200        use tokio::sync::RwLock;
201
202        use super::*;
203
204        /// MithrilSignerRegistrationFollowerBuilder is a test builder for [MithrilSignerRegistrationFollower]
205        pub struct MithrilSignerRegistrationFollowerBuilder {
206            epoch_service: EpochServiceWrapper,
207            signer_recorder: Arc<dyn SignerRecorder>,
208            signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
209            leader_aggregator_client: Arc<dyn LeaderAggregatorClient>,
210            stake_store: Arc<dyn StakeStorer>,
211            verification_key_store: Arc<dyn VerificationKeyStorer>,
212        }
213
214        impl Default for MithrilSignerRegistrationFollowerBuilder {
215            fn default() -> Self {
216                Self {
217                    epoch_service: Arc::new(RwLock::new(FakeEpochService::without_data())),
218                    signer_recorder: Arc::new(MockSignerRecorder::new()),
219                    signer_registration_verifier: Arc::new(MockSignerRegistrationVerifier::new()),
220                    leader_aggregator_client: Arc::new(MockLeaderAggregatorClient::new()),
221                    stake_store: Arc::new(MockStakeStore::new()),
222                    verification_key_store: Arc::new(SignerRegistrationStore::new(
223                        Arc::new(main_db_connection().unwrap()),
224                        None,
225                    )),
226                }
227            }
228        }
229
230        impl MithrilSignerRegistrationFollowerBuilder {
231            pub fn with_epoch_service(self, epoch_service: FakeEpochService) -> Self {
232                Self {
233                    epoch_service: Arc::new(RwLock::new(epoch_service)),
234                    ..self
235                }
236            }
237
238            pub fn with_signer_recorder(self, signer_recorder: Arc<dyn SignerRecorder>) -> Self {
239                Self {
240                    signer_recorder,
241                    ..self
242                }
243            }
244
245            pub fn with_signer_registration_verifier(
246                self,
247                signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
248            ) -> Self {
249                Self {
250                    signer_registration_verifier,
251                    ..self
252                }
253            }
254
255            pub fn with_leader_aggregator_client(
256                self,
257                leader_aggregator_client: Arc<dyn LeaderAggregatorClient>,
258            ) -> Self {
259                Self {
260                    leader_aggregator_client,
261                    ..self
262                }
263            }
264
265            pub fn with_stake_store(self, stake_store: Arc<dyn StakeStorer>) -> Self {
266                Self {
267                    stake_store,
268                    ..self
269                }
270            }
271
272            pub fn build(self) -> MithrilSignerRegistrationFollower {
273                MithrilSignerRegistrationFollower {
274                    epoch_service: self.epoch_service,
275                    verification_key_store: self.verification_key_store,
276                    signer_recorder: self.signer_recorder,
277                    signer_registration_verifier: self.signer_registration_verifier,
278                    leader_aggregator_client: self.leader_aggregator_client,
279                    stake_store: self.stake_store,
280                }
281            }
282        }
283    }
284
285    #[tokio::test]
286    async fn open_close_registration_always_succeeds() {
287        let signer_registration_follower =
288            MithrilSignerRegistrationFollowerBuilder::default().build();
289        let registration_epoch = Epoch(1);
290        let fixture = MithrilFixtureBuilder::default().with_signers(1).build();
291        let stake_distribution = fixture.stake_distribution();
292
293        signer_registration_follower
294            .open_registration_round(registration_epoch, stake_distribution)
295            .await
296            .expect("signer registration round opening should not fail");
297
298        signer_registration_follower
299            .close_registration_round()
300            .await
301            .expect("signer registration round opening should not fail");
302    }
303
304    #[tokio::test]
305    async fn register_signer_always_fails() {
306        let signer_registration_follower =
307            MithrilSignerRegistrationFollowerBuilder::default().build();
308        let registration_epoch = Epoch(1);
309        let fixture = MithrilFixtureBuilder::default().with_signers(1).build();
310        let signer_to_register: Signer = fixture.signers()[0].to_owned();
311
312        signer_registration_follower
313            .register_signer(registration_epoch, &signer_to_register)
314            .await
315            .expect_err("signer registration should always fail");
316    }
317
318    #[tokio::test]
319    async fn synchronize_all_signers_succeeds() {
320        let registration_epoch = Epoch(1);
321        let fixture = MithrilFixtureBuilder::default()
322            .with_signers(5)
323            .disable_signers_certification()
324            .build();
325        let signers = fixture.signers();
326        let stake_distribution = fixture.stake_distribution();
327        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
328            epoch: registration_epoch,
329            next_signers: SignerMessagePart::from_signers(signers),
330            ..EpochSettingsMessage::dummy()
331        })
332        .unwrap();
333        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
334            .with_signer_recorder({
335                let mut signer_recorder = MockSignerRecorder::new();
336                signer_recorder
337                    .expect_record_signer_registration()
338                    .returning(|_| Ok(()))
339                    .times(5);
340
341                Arc::new(signer_recorder)
342            })
343            .with_signer_registration_verifier({
344                let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
345                signer_registration_verifier
346                    .expect_verify()
347                    .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
348                    .times(5);
349
350                Arc::new(signer_registration_verifier)
351            })
352            .with_leader_aggregator_client({
353                let mut aggregator_client = MockLeaderAggregatorClient::new();
354                aggregator_client
355                    .expect_retrieve_epoch_settings()
356                    .returning(move || Ok(Some(epoch_settings_message.clone())))
357                    .times(1);
358
359                Arc::new(aggregator_client)
360            })
361            .with_stake_store({
362                let mut stake_store = MockStakeStore::new();
363                stake_store
364                    .expect_get_stakes()
365                    .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
366                    .times(1);
367
368                Arc::new(stake_store)
369            })
370            .build();
371
372        signer_registration_follower.synchronize_all_signers().await.unwrap();
373    }
374
375    #[tokio::test]
376    async fn synchronize_all_signers_fails_if_one_signer_registration_fails() {
377        let registration_epoch = Epoch(1);
378        let fixture = MithrilFixtureBuilder::default()
379            .with_signers(5)
380            .disable_signers_certification()
381            .build();
382        let signers = fixture.signers();
383        let stake_distribution = fixture.stake_distribution();
384        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
385            epoch: registration_epoch,
386            next_signers: SignerMessagePart::from_signers(signers),
387            ..EpochSettingsMessage::dummy()
388        })
389        .unwrap();
390
391        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
392            .with_signer_recorder({
393                let mut signer_recorder = MockSignerRecorder::new();
394                signer_recorder
395                    .expect_record_signer_registration()
396                    .returning(|_| Ok(()))
397                    .times(4);
398                signer_recorder
399                    .expect_record_signer_registration()
400                    .returning(|_| Err(anyhow!("an error")))
401                    .times(1);
402
403                Arc::new(signer_recorder)
404            })
405            .with_signer_registration_verifier({
406                let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
407                signer_registration_verifier
408                    .expect_verify()
409                    .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
410                    .times(5);
411
412                Arc::new(signer_registration_verifier)
413            })
414            .with_leader_aggregator_client({
415                let mut aggregator_client = MockLeaderAggregatorClient::new();
416                aggregator_client
417                    .expect_retrieve_epoch_settings()
418                    .returning(move || Ok(Some(epoch_settings_message.clone())))
419                    .times(1);
420
421                Arc::new(aggregator_client)
422            })
423            .with_stake_store({
424                let mut stake_store = MockStakeStore::new();
425                stake_store
426                    .expect_get_stakes()
427                    .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
428                    .times(1);
429
430                Arc::new(stake_store)
431            })
432            .build();
433
434        signer_registration_follower
435            .synchronize_all_signers()
436            .await
437            .expect_err("synchronize_all_signers should fail");
438    }
439
440    #[tokio::test]
441    async fn synchronize_all_signers_fails_if_epoch_service_update_next_signers_fails() {
442        let registration_epoch = Epoch(1);
443        let fixture = MithrilFixtureBuilder::default()
444            .with_signers(5)
445            .disable_signers_certification()
446            .build();
447        let signers = fixture.signers();
448        let stake_distribution = fixture.stake_distribution();
449        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
450            epoch: registration_epoch,
451            next_signers: SignerMessagePart::from_signers(signers),
452            ..EpochSettingsMessage::dummy()
453        })
454        .unwrap();
455
456        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
457            .with_epoch_service({
458                let mut epoch_service = FakeEpochService::without_data();
459                epoch_service.toggle_errors(false, false, false, true);
460
461                epoch_service
462            })
463            .with_signer_recorder({
464                let mut signer_recorder = MockSignerRecorder::new();
465                signer_recorder
466                    .expect_record_signer_registration()
467                    .returning(|_| Ok(()))
468                    .times(5);
469
470                Arc::new(signer_recorder)
471            })
472            .with_signer_registration_verifier({
473                let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
474                signer_registration_verifier
475                    .expect_verify()
476                    .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
477                    .times(5);
478
479                Arc::new(signer_registration_verifier)
480            })
481            .with_leader_aggregator_client({
482                let mut aggregator_client = MockLeaderAggregatorClient::new();
483                aggregator_client
484                    .expect_retrieve_epoch_settings()
485                    .returning(move || Ok(Some(epoch_settings_message.clone())))
486                    .times(1);
487
488                Arc::new(aggregator_client)
489            })
490            .with_stake_store({
491                let mut stake_store = MockStakeStore::new();
492                stake_store
493                    .expect_get_stakes()
494                    .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
495                    .times(1);
496
497                Arc::new(stake_store)
498            })
499            .build();
500
501        signer_registration_follower
502            .synchronize_all_signers()
503            .await
504            .expect_err("synchronize_all_signers should fail");
505    }
506
507    #[tokio::test]
508    async fn synchronize_all_signers_fails_if_fetching_epoch_settings_fails() {
509        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
510            .with_leader_aggregator_client({
511                let mut aggregator_client = MockLeaderAggregatorClient::new();
512                aggregator_client
513                    .expect_retrieve_epoch_settings()
514                    .returning(move || Err(anyhow!("an error")))
515                    .times(1);
516
517                Arc::new(aggregator_client)
518            })
519            .build();
520
521        signer_registration_follower
522            .synchronize_all_signers()
523            .await
524            .expect_err("synchronize_all_signers should fail");
525    }
526
527    #[tokio::test]
528    async fn synchronize_all_signers_fails_if_fetching_stakes_fails() {
529        let registration_epoch = Epoch(1);
530        let fixture = MithrilFixtureBuilder::default()
531            .with_signers(5)
532            .disable_signers_certification()
533            .build();
534        let signers = fixture.signers();
535        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
536            epoch: registration_epoch,
537            next_signers: SignerMessagePart::from_signers(signers),
538            ..EpochSettingsMessage::dummy()
539        })
540        .unwrap();
541        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
542            .with_leader_aggregator_client({
543                let mut aggregator_client = MockLeaderAggregatorClient::new();
544                aggregator_client
545                    .expect_retrieve_epoch_settings()
546                    .returning(move || Ok(Some(epoch_settings_message.clone())))
547                    .times(1);
548
549                Arc::new(aggregator_client)
550            })
551            .with_stake_store({
552                let mut stake_store = MockStakeStore::new();
553                stake_store
554                    .expect_get_stakes()
555                    .returning(move |_epoch| Err(anyhow!("an error")))
556                    .times(1);
557
558                Arc::new(stake_store)
559            })
560            .build();
561
562        signer_registration_follower
563            .synchronize_all_signers()
564            .await
565            .expect_err("synchronize_all_signers should fail");
566    }
567}