mithril_aggregator/services/signer_registration/
follower.rs

1use std::sync::Arc;
2
3use anyhow::{Context, anyhow};
4use async_trait::async_trait;
5
6use mithril_common::{
7    StdResult,
8    entities::{Epoch, Signer, SignerWithStake, StakeDistribution},
9};
10use mithril_persistence::store::StakeStorer;
11
12use crate::{
13    SignerRegistrationVerifier, VerificationKeyStorer, dependency_injection::EpochServiceWrapper,
14    services::AggregatorClient,
15};
16
17use super::{
18    SignerRecorder, SignerRegisterer, SignerRegistrationError, SignerRegistrationRound,
19    SignerRegistrationRoundOpener, SignerSynchronizer,
20};
21
22/// A [MithrilSignerRegistrationFollower] supports signer registrations in a follower aggregator
23pub struct MithrilSignerRegistrationFollower {
24    /// Epoch service
25    pub epoch_service: EpochServiceWrapper,
26
27    /// Verification key store
28    verification_key_store: Arc<dyn VerificationKeyStorer>,
29
30    /// Signer recorder
31    signer_recorder: Arc<dyn SignerRecorder>,
32
33    /// Signer registration verifier
34    signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
35
36    /// Leader aggregator client
37    leader_aggregator_client: Arc<dyn AggregatorClient>,
38
39    /// Stake store
40    stake_store: Arc<dyn StakeStorer>,
41}
42
43impl MithrilSignerRegistrationFollower {
44    /// MithrilSignerRegistererFollower factory
45    pub fn new(
46        epoch_service: EpochServiceWrapper,
47        verification_key_store: Arc<dyn VerificationKeyStorer>,
48        signer_recorder: Arc<dyn SignerRecorder>,
49        signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
50        leader_aggregator_client: Arc<dyn AggregatorClient>,
51        stake_store: Arc<dyn StakeStorer>,
52    ) -> Self {
53        Self {
54            epoch_service,
55            verification_key_store,
56            signer_recorder,
57            signer_registration_verifier,
58            leader_aggregator_client,
59            stake_store,
60        }
61    }
62
63    async fn synchronize_signers(
64        &self,
65        epoch: Epoch,
66        signers: &[Signer],
67        stake_distribution: &StakeDistribution,
68    ) -> Result<(), SignerRegistrationError> {
69        for signer in signers {
70            let signer_with_stake = self
71                .signer_registration_verifier
72                .verify(signer, stake_distribution)
73                .await
74                .map_err(|e| SignerRegistrationError::FailedSignerRegistration(anyhow!(e)))?;
75
76            self.signer_recorder
77                .record_signer_registration(signer_with_stake.party_id.clone())
78                .await
79                .map_err(|e| SignerRegistrationError::FailedSignerRecorder(e.to_string()))?;
80
81            self
82                .verification_key_store
83                .save_verification_key(epoch, signer_with_stake.clone())
84                .await
85                .with_context(|| {
86                    format!(
87                        "VerificationKeyStorer can not save verification keys for party_id: '{}' for epoch: '{}'",
88                        signer_with_stake.party_id,
89                        epoch
90                    )
91                })
92                .map_err(|e| SignerRegistrationError::Store(anyhow!(e)))?;
93        }
94
95        self.epoch_service
96            .write()
97            .await
98            .update_next_signers_with_stake()
99            .await
100            .map_err(|e| SignerRegistrationError::EpochService(anyhow!(e)))?;
101
102        Ok(())
103    }
104}
105
106#[async_trait]
107impl SignerSynchronizer for MithrilSignerRegistrationFollower {
108    async fn can_synchronize_signers(&self, epoch: Epoch) -> Result<bool, SignerRegistrationError> {
109        Ok(self
110            .leader_aggregator_client
111            .retrieve_epoch_settings()
112            .await
113            .with_context(|| "can_synchronize_signers failed")
114            .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?
115            .is_some_and(|leader_epoch_settings| epoch == leader_epoch_settings.epoch))
116    }
117
118    async fn synchronize_all_signers(&self) -> Result<(), SignerRegistrationError> {
119        let leader_epoch_settings = self
120            .leader_aggregator_client
121            .retrieve_epoch_settings()
122            .await
123            .with_context(|| "synchronize_all_signers failed")
124            .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?
125            .ok_or(
126                SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings(
127                    anyhow::anyhow!("Leader aggregator did not return any epoch settings"),
128                ),
129            )?;
130        let registration_epoch =
131            leader_epoch_settings.epoch.offset_to_leader_synchronization_epoch();
132        let next_signers = leader_epoch_settings.next_signers;
133        let stake_distribution = self
134            .stake_store
135            .get_stakes(registration_epoch)
136            .await
137            .with_context(|| "synchronize_all_signers failed")
138            .map_err(SignerRegistrationError::Store)?
139            .ok_or(SignerRegistrationError::Store(anyhow::anyhow!(
140                "Follower aggregator did not return any stake distribution"
141            )))?;
142        self.synchronize_signers(registration_epoch, &next_signers, &stake_distribution)
143            .await?;
144
145        Ok(())
146    }
147}
148
149#[async_trait]
150impl SignerRegisterer for MithrilSignerRegistrationFollower {
151    async fn register_signer(
152        &self,
153        _epoch: Epoch,
154        _signer: &Signer,
155    ) -> Result<SignerWithStake, SignerRegistrationError> {
156        Err(SignerRegistrationError::RegistrationRoundAlwaysClosedOnFollowerAggregator)
157    }
158
159    async fn get_current_round(&self) -> Option<SignerRegistrationRound> {
160        None
161    }
162}
163
164#[async_trait]
165impl SignerRegistrationRoundOpener for MithrilSignerRegistrationFollower {
166    async fn open_registration_round(
167        &self,
168        _registration_epoch: Epoch,
169        _stake_distribution: StakeDistribution,
170    ) -> StdResult<()> {
171        Ok(())
172    }
173
174    async fn close_registration_round(&self) -> StdResult<()> {
175        Ok(())
176    }
177}
178
179#[cfg(test)]
180mod tests {
181    use std::sync::Arc;
182
183    use anyhow::anyhow;
184    use mithril_persistence::store::StakeStorer;
185
186    use mithril_common::{
187        entities::{Epoch, Signer, SignerWithStake},
188        messages::{EpochSettingsMessage, SignerMessagePart, TryFromMessageAdapter},
189        test_utils::MithrilFixtureBuilder,
190    };
191
192    use crate::{
193        MithrilSignerRegistrationFollower, SignerRecorder, SignerRegisterer,
194        SignerRegistrationRoundOpener, SignerRegistrationVerifier, VerificationKeyStorer,
195        database::{repository::SignerRegistrationStore, test_helper::main_db_connection},
196        message_adapters::FromEpochSettingsAdapter,
197        services::{
198            AggregatorClient, AggregatorClientError, FakeEpochService, MockAggregatorClient,
199            MockSignerRecorder, MockSignerRegistrationVerifier, SignerSynchronizer,
200        },
201        tools::mocks::MockStakeStore,
202    };
203
204    use test_utils::*;
205
206    mod test_utils {
207        use tokio::sync::RwLock;
208
209        use crate::{dependency_injection::EpochServiceWrapper, services::FakeEpochService};
210
211        use super::*;
212
213        /// MithrilSignerRegistrationFollowerBuilder is a test builder for [MithrilSignerRegistrationFollower]
214        pub struct MithrilSignerRegistrationFollowerBuilder {
215            epoch_service: EpochServiceWrapper,
216            signer_recorder: Arc<dyn SignerRecorder>,
217            signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
218            leader_aggregator_client: Arc<dyn AggregatorClient>,
219            stake_store: Arc<dyn StakeStorer>,
220            verification_key_store: Arc<dyn VerificationKeyStorer>,
221        }
222
223        impl Default for MithrilSignerRegistrationFollowerBuilder {
224            fn default() -> Self {
225                Self {
226                    epoch_service: Arc::new(RwLock::new(FakeEpochService::without_data())),
227                    signer_recorder: Arc::new(MockSignerRecorder::new()),
228                    signer_registration_verifier: Arc::new(MockSignerRegistrationVerifier::new()),
229                    leader_aggregator_client: Arc::new(MockAggregatorClient::new()),
230                    stake_store: Arc::new(MockStakeStore::new()),
231                    verification_key_store: Arc::new(SignerRegistrationStore::new(
232                        Arc::new(main_db_connection().unwrap()),
233                        None,
234                    )),
235                }
236            }
237        }
238
239        impl MithrilSignerRegistrationFollowerBuilder {
240            pub fn with_epoch_service(self, epoch_service: FakeEpochService) -> Self {
241                Self {
242                    epoch_service: Arc::new(RwLock::new(epoch_service)),
243                    ..self
244                }
245            }
246
247            pub fn with_signer_recorder(self, signer_recorder: Arc<dyn SignerRecorder>) -> Self {
248                Self {
249                    signer_recorder,
250                    ..self
251                }
252            }
253
254            pub fn with_signer_registration_verifier(
255                self,
256                signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
257            ) -> Self {
258                Self {
259                    signer_registration_verifier,
260                    ..self
261                }
262            }
263
264            pub fn with_leader_aggregator_client(
265                self,
266                leader_aggregator_client: Arc<dyn AggregatorClient>,
267            ) -> Self {
268                Self {
269                    leader_aggregator_client,
270                    ..self
271                }
272            }
273
274            pub fn with_stake_store(self, stake_store: Arc<dyn StakeStorer>) -> Self {
275                Self {
276                    stake_store,
277                    ..self
278                }
279            }
280
281            pub fn build(self) -> MithrilSignerRegistrationFollower {
282                MithrilSignerRegistrationFollower {
283                    epoch_service: self.epoch_service,
284                    verification_key_store: self.verification_key_store,
285                    signer_recorder: self.signer_recorder,
286                    signer_registration_verifier: self.signer_registration_verifier,
287                    leader_aggregator_client: self.leader_aggregator_client,
288                    stake_store: self.stake_store,
289                }
290            }
291        }
292    }
293
294    #[tokio::test]
295    async fn open_close_registration_always_succeeds() {
296        let signer_registration_follower =
297            MithrilSignerRegistrationFollowerBuilder::default().build();
298        let registration_epoch = Epoch(1);
299        let fixture = MithrilFixtureBuilder::default().with_signers(1).build();
300        let stake_distribution = fixture.stake_distribution();
301
302        signer_registration_follower
303            .open_registration_round(registration_epoch, stake_distribution)
304            .await
305            .expect("signer registration round opening should not fail");
306
307        signer_registration_follower
308            .close_registration_round()
309            .await
310            .expect("signer registration round opening should not fail");
311    }
312
313    #[tokio::test]
314    async fn register_signer_always_fails() {
315        let signer_registration_follower =
316            MithrilSignerRegistrationFollowerBuilder::default().build();
317        let registration_epoch = Epoch(1);
318        let fixture = MithrilFixtureBuilder::default().with_signers(1).build();
319        let signer_to_register: Signer = fixture.signers()[0].to_owned();
320
321        signer_registration_follower
322            .register_signer(registration_epoch, &signer_to_register)
323            .await
324            .expect_err("signer registration should always fail");
325    }
326
327    #[tokio::test]
328    async fn synchronize_all_signers_succeeds() {
329        let registration_epoch = Epoch(1);
330        let fixture = MithrilFixtureBuilder::default()
331            .with_signers(5)
332            .disable_signers_certification()
333            .build();
334        let signers = fixture.signers();
335        let stake_distribution = fixture.stake_distribution();
336        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
337            epoch: registration_epoch,
338            next_signers: SignerMessagePart::from_signers(signers),
339            ..EpochSettingsMessage::dummy()
340        })
341        .unwrap();
342        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
343            .with_signer_recorder({
344                let mut signer_recorder = MockSignerRecorder::new();
345                signer_recorder
346                    .expect_record_signer_registration()
347                    .returning(|_| Ok(()))
348                    .times(5);
349
350                Arc::new(signer_recorder)
351            })
352            .with_signer_registration_verifier({
353                let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
354                signer_registration_verifier
355                    .expect_verify()
356                    .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
357                    .times(5);
358
359                Arc::new(signer_registration_verifier)
360            })
361            .with_leader_aggregator_client({
362                let mut aggregator_client = MockAggregatorClient::new();
363                aggregator_client
364                    .expect_retrieve_epoch_settings()
365                    .returning(move || Ok(Some(epoch_settings_message.clone())))
366                    .times(1);
367
368                Arc::new(aggregator_client)
369            })
370            .with_stake_store({
371                let mut stake_store = MockStakeStore::new();
372                stake_store
373                    .expect_get_stakes()
374                    .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
375                    .times(1);
376
377                Arc::new(stake_store)
378            })
379            .build();
380
381        signer_registration_follower.synchronize_all_signers().await.unwrap();
382    }
383
384    #[tokio::test]
385    async fn synchronize_all_signers_fails_if_one_signer_registration_fails() {
386        let registration_epoch = Epoch(1);
387        let fixture = MithrilFixtureBuilder::default()
388            .with_signers(5)
389            .disable_signers_certification()
390            .build();
391        let signers = fixture.signers();
392        let stake_distribution = fixture.stake_distribution();
393        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
394            epoch: registration_epoch,
395            next_signers: SignerMessagePart::from_signers(signers),
396            ..EpochSettingsMessage::dummy()
397        })
398        .unwrap();
399
400        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
401            .with_signer_recorder({
402                let mut signer_recorder = MockSignerRecorder::new();
403                signer_recorder
404                    .expect_record_signer_registration()
405                    .returning(|_| Ok(()))
406                    .times(4);
407                signer_recorder
408                    .expect_record_signer_registration()
409                    .returning(|_| Err(anyhow!("an error")))
410                    .times(1);
411
412                Arc::new(signer_recorder)
413            })
414            .with_signer_registration_verifier({
415                let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
416                signer_registration_verifier
417                    .expect_verify()
418                    .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
419                    .times(5);
420
421                Arc::new(signer_registration_verifier)
422            })
423            .with_leader_aggregator_client({
424                let mut aggregator_client = MockAggregatorClient::new();
425                aggregator_client
426                    .expect_retrieve_epoch_settings()
427                    .returning(move || Ok(Some(epoch_settings_message.clone())))
428                    .times(1);
429
430                Arc::new(aggregator_client)
431            })
432            .with_stake_store({
433                let mut stake_store = MockStakeStore::new();
434                stake_store
435                    .expect_get_stakes()
436                    .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
437                    .times(1);
438
439                Arc::new(stake_store)
440            })
441            .build();
442
443        signer_registration_follower
444            .synchronize_all_signers()
445            .await
446            .expect_err("synchronize_all_signers should fail");
447    }
448
449    #[tokio::test]
450    async fn synchronize_all_signers_fails_if_epoch_service_update_next_signers_fails() {
451        let registration_epoch = Epoch(1);
452        let fixture = MithrilFixtureBuilder::default()
453            .with_signers(5)
454            .disable_signers_certification()
455            .build();
456        let signers = fixture.signers();
457        let stake_distribution = fixture.stake_distribution();
458        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
459            epoch: registration_epoch,
460            next_signers: SignerMessagePart::from_signers(signers),
461            ..EpochSettingsMessage::dummy()
462        })
463        .unwrap();
464
465        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
466            .with_epoch_service({
467                let mut epoch_service = FakeEpochService::without_data();
468                epoch_service.toggle_errors(false, false, false, true);
469
470                epoch_service
471            })
472            .with_signer_recorder({
473                let mut signer_recorder = MockSignerRecorder::new();
474                signer_recorder
475                    .expect_record_signer_registration()
476                    .returning(|_| Ok(()))
477                    .times(5);
478
479                Arc::new(signer_recorder)
480            })
481            .with_signer_registration_verifier({
482                let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
483                signer_registration_verifier
484                    .expect_verify()
485                    .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
486                    .times(5);
487
488                Arc::new(signer_registration_verifier)
489            })
490            .with_leader_aggregator_client({
491                let mut aggregator_client = MockAggregatorClient::new();
492                aggregator_client
493                    .expect_retrieve_epoch_settings()
494                    .returning(move || Ok(Some(epoch_settings_message.clone())))
495                    .times(1);
496
497                Arc::new(aggregator_client)
498            })
499            .with_stake_store({
500                let mut stake_store = MockStakeStore::new();
501                stake_store
502                    .expect_get_stakes()
503                    .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
504                    .times(1);
505
506                Arc::new(stake_store)
507            })
508            .build();
509
510        signer_registration_follower
511            .synchronize_all_signers()
512            .await
513            .expect_err("synchronize_all_signers should fail");
514    }
515
516    #[tokio::test]
517    async fn synchronize_all_signers_fails_if_fetching_epoch_settings_fails() {
518        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
519            .with_leader_aggregator_client({
520                let mut aggregator_client = MockAggregatorClient::new();
521                aggregator_client
522                    .expect_retrieve_epoch_settings()
523                    .returning(move || {
524                        Err(AggregatorClientError::RemoteServerTechnical(anyhow!(
525                            "an error"
526                        )))
527                    })
528                    .times(1);
529
530                Arc::new(aggregator_client)
531            })
532            .build();
533
534        signer_registration_follower
535            .synchronize_all_signers()
536            .await
537            .expect_err("synchronize_all_signers should fail");
538    }
539
540    #[tokio::test]
541    async fn synchronize_all_signers_fails_if_fetching_stakes_fails() {
542        let registration_epoch = Epoch(1);
543        let fixture = MithrilFixtureBuilder::default()
544            .with_signers(5)
545            .disable_signers_certification()
546            .build();
547        let signers = fixture.signers();
548        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
549            epoch: registration_epoch,
550            next_signers: SignerMessagePart::from_signers(signers),
551            ..EpochSettingsMessage::dummy()
552        })
553        .unwrap();
554        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
555            .with_leader_aggregator_client({
556                let mut aggregator_client = MockAggregatorClient::new();
557                aggregator_client
558                    .expect_retrieve_epoch_settings()
559                    .returning(move || Ok(Some(epoch_settings_message.clone())))
560                    .times(1);
561
562                Arc::new(aggregator_client)
563            })
564            .with_stake_store({
565                let mut stake_store = MockStakeStore::new();
566                stake_store
567                    .expect_get_stakes()
568                    .returning(move |_epoch| Err(anyhow!("an error")))
569                    .times(1);
570
571                Arc::new(stake_store)
572            })
573            .build();
574
575        signer_registration_follower
576            .synchronize_all_signers()
577            .await
578            .expect_err("synchronize_all_signers should fail");
579    }
580}