mithril_aggregator/services/signer_registration/
follower.rs

1use std::sync::Arc;
2
3use anyhow::{anyhow, Context};
4use async_trait::async_trait;
5
6use mithril_common::{
7    entities::{Epoch, Signer, SignerWithStake, StakeDistribution},
8    StdResult,
9};
10use mithril_persistence::store::StakeStorer;
11
12use crate::{
13    dependency_injection::EpochServiceWrapper, services::AggregatorClient,
14    SignerRegistrationVerifier, VerificationKeyStorer,
15};
16
17use super::{
18    SignerRecorder, SignerRegisterer, SignerRegistrationError, SignerRegistrationRound,
19    SignerRegistrationRoundOpener, SignerSynchronizer,
20};
21
22/// A [MithrilSignerRegistrationFollower] supports signer registrations in a follower aggregator
23pub struct MithrilSignerRegistrationFollower {
24    /// Epoch service
25    pub epoch_service: EpochServiceWrapper,
26
27    /// Verification key store
28    verification_key_store: Arc<dyn VerificationKeyStorer>,
29
30    /// Signer recorder
31    signer_recorder: Arc<dyn SignerRecorder>,
32
33    /// Signer registration verifier
34    signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
35
36    /// Leader aggregator client
37    leader_aggregator_client: Arc<dyn AggregatorClient>,
38
39    /// Stake store
40    stake_store: Arc<dyn StakeStorer>,
41}
42
43impl MithrilSignerRegistrationFollower {
44    /// MithrilSignerRegistererFollower factory
45    pub fn new(
46        epoch_service: EpochServiceWrapper,
47        verification_key_store: Arc<dyn VerificationKeyStorer>,
48        signer_recorder: Arc<dyn SignerRecorder>,
49        signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
50        leader_aggregator_client: Arc<dyn AggregatorClient>,
51        stake_store: Arc<dyn StakeStorer>,
52    ) -> Self {
53        Self {
54            epoch_service,
55            verification_key_store,
56            signer_recorder,
57            signer_registration_verifier,
58            leader_aggregator_client,
59            stake_store,
60        }
61    }
62
63    async fn synchronize_signers(
64        &self,
65        epoch: Epoch,
66        signers: &[Signer],
67        stake_distribution: &StakeDistribution,
68    ) -> Result<(), SignerRegistrationError> {
69        for signer in signers {
70            let signer_with_stake = self
71                .signer_registration_verifier
72                .verify(signer, stake_distribution)
73                .await
74                .map_err(|e| SignerRegistrationError::FailedSignerRegistration(anyhow!(e)))?;
75
76            self.signer_recorder
77                .record_signer_registration(signer_with_stake.party_id.clone())
78                .await
79                .map_err(|e| SignerRegistrationError::FailedSignerRecorder(e.to_string()))?;
80
81            self
82                .verification_key_store
83                .save_verification_key(epoch, signer_with_stake.clone())
84                .await
85                .with_context(|| {
86                    format!(
87                        "VerificationKeyStorer can not save verification keys for party_id: '{}' for epoch: '{}'",
88                        signer_with_stake.party_id,
89                        epoch
90                    )
91                })
92                .map_err(|e| SignerRegistrationError::Store(anyhow!(e)))?;
93        }
94
95        self.epoch_service
96            .write()
97            .await
98            .update_next_signers_with_stake()
99            .await
100            .map_err(|e| SignerRegistrationError::EpochService(anyhow!(e)))?;
101
102        Ok(())
103    }
104}
105
106#[async_trait]
107impl SignerSynchronizer for MithrilSignerRegistrationFollower {
108    async fn can_synchronize_signers(&self, epoch: Epoch) -> Result<bool, SignerRegistrationError> {
109        Ok(self
110            .leader_aggregator_client
111            .retrieve_epoch_settings()
112            .await
113            .with_context(|| "can_synchronize_signers failed")
114            .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?
115            .is_some_and(|leader_epoch_settings| epoch == leader_epoch_settings.epoch))
116    }
117
118    async fn synchronize_all_signers(&self) -> Result<(), SignerRegistrationError> {
119        let leader_epoch_settings = self
120            .leader_aggregator_client
121            .retrieve_epoch_settings()
122            .await
123            .with_context(|| "synchronize_all_signers failed")
124            .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?
125            .ok_or(
126                SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings(
127                    anyhow::anyhow!("Leader aggregator did not return any epoch settings"),
128                ),
129            )?;
130        let registration_epoch = leader_epoch_settings
131            .epoch
132            .offset_to_leader_synchronization_epoch();
133        let next_signers = leader_epoch_settings.next_signers;
134        let stake_distribution = self
135            .stake_store
136            .get_stakes(registration_epoch)
137            .await
138            .with_context(|| "synchronize_all_signers failed")
139            .map_err(SignerRegistrationError::Store)?
140            .ok_or(SignerRegistrationError::Store(anyhow::anyhow!(
141                "Follower aggregator did not return any stake distribution"
142            )))?;
143        self.synchronize_signers(registration_epoch, &next_signers, &stake_distribution)
144            .await?;
145
146        Ok(())
147    }
148}
149
150#[async_trait]
151impl SignerRegisterer for MithrilSignerRegistrationFollower {
152    async fn register_signer(
153        &self,
154        _epoch: Epoch,
155        _signer: &Signer,
156    ) -> Result<SignerWithStake, SignerRegistrationError> {
157        Err(SignerRegistrationError::RegistrationRoundAlwaysClosedOnFollowerAggregator)
158    }
159
160    async fn get_current_round(&self) -> Option<SignerRegistrationRound> {
161        None
162    }
163}
164
165#[async_trait]
166impl SignerRegistrationRoundOpener for MithrilSignerRegistrationFollower {
167    async fn open_registration_round(
168        &self,
169        _registration_epoch: Epoch,
170        _stake_distribution: StakeDistribution,
171    ) -> StdResult<()> {
172        Ok(())
173    }
174
175    async fn close_registration_round(&self) -> StdResult<()> {
176        Ok(())
177    }
178}
179
180#[cfg(test)]
181mod tests {
182    use std::sync::Arc;
183
184    use anyhow::anyhow;
185    use mithril_persistence::store::StakeStorer;
186
187    use mithril_common::{
188        entities::{Epoch, Signer, SignerWithStake},
189        messages::{EpochSettingsMessage, SignerMessagePart, TryFromMessageAdapter},
190        test_utils::MithrilFixtureBuilder,
191    };
192
193    use crate::{
194        database::{repository::SignerRegistrationStore, test_helper::main_db_connection},
195        message_adapters::FromEpochSettingsAdapter,
196        services::{
197            AggregatorClient, AggregatorClientError, FakeEpochService, MockAggregatorClient,
198            MockSignerRecorder, MockSignerRegistrationVerifier, SignerSynchronizer,
199        },
200        tools::mocks::MockStakeStore,
201        MithrilSignerRegistrationFollower, SignerRecorder, SignerRegisterer,
202        SignerRegistrationRoundOpener, SignerRegistrationVerifier, VerificationKeyStorer,
203    };
204
205    use test_utils::*;
206
207    mod test_utils {
208        use tokio::sync::RwLock;
209
210        use crate::{dependency_injection::EpochServiceWrapper, services::FakeEpochService};
211
212        use super::*;
213
214        /// MithrilSignerRegistrationFollowerBuilder is a test builder for [MithrilSignerRegistrationFollower]
215        pub struct MithrilSignerRegistrationFollowerBuilder {
216            epoch_service: EpochServiceWrapper,
217            signer_recorder: Arc<dyn SignerRecorder>,
218            signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
219            leader_aggregator_client: Arc<dyn AggregatorClient>,
220            stake_store: Arc<dyn StakeStorer>,
221            verification_key_store: Arc<dyn VerificationKeyStorer>,
222        }
223
224        impl Default for MithrilSignerRegistrationFollowerBuilder {
225            fn default() -> Self {
226                Self {
227                    epoch_service: Arc::new(RwLock::new(FakeEpochService::without_data())),
228                    signer_recorder: Arc::new(MockSignerRecorder::new()),
229                    signer_registration_verifier: Arc::new(MockSignerRegistrationVerifier::new()),
230                    leader_aggregator_client: Arc::new(MockAggregatorClient::new()),
231                    stake_store: Arc::new(MockStakeStore::new()),
232                    verification_key_store: Arc::new(SignerRegistrationStore::new(
233                        Arc::new(main_db_connection().unwrap()),
234                        None,
235                    )),
236                }
237            }
238        }
239
240        impl MithrilSignerRegistrationFollowerBuilder {
241            pub fn with_epoch_service(self, epoch_service: FakeEpochService) -> Self {
242                Self {
243                    epoch_service: Arc::new(RwLock::new(epoch_service)),
244                    ..self
245                }
246            }
247
248            pub fn with_signer_recorder(self, signer_recorder: Arc<dyn SignerRecorder>) -> Self {
249                Self {
250                    signer_recorder,
251                    ..self
252                }
253            }
254
255            pub fn with_signer_registration_verifier(
256                self,
257                signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
258            ) -> Self {
259                Self {
260                    signer_registration_verifier,
261                    ..self
262                }
263            }
264
265            pub fn with_leader_aggregator_client(
266                self,
267                leader_aggregator_client: Arc<dyn AggregatorClient>,
268            ) -> Self {
269                Self {
270                    leader_aggregator_client,
271                    ..self
272                }
273            }
274
275            pub fn with_stake_store(self, stake_store: Arc<dyn StakeStorer>) -> Self {
276                Self {
277                    stake_store,
278                    ..self
279                }
280            }
281
282            pub fn build(self) -> MithrilSignerRegistrationFollower {
283                MithrilSignerRegistrationFollower {
284                    epoch_service: self.epoch_service,
285                    verification_key_store: self.verification_key_store,
286                    signer_recorder: self.signer_recorder,
287                    signer_registration_verifier: self.signer_registration_verifier,
288                    leader_aggregator_client: self.leader_aggregator_client,
289                    stake_store: self.stake_store,
290                }
291            }
292        }
293    }
294
295    #[tokio::test]
296    async fn open_close_registration_always_succeeds() {
297        let signer_registration_follower =
298            MithrilSignerRegistrationFollowerBuilder::default().build();
299        let registration_epoch = Epoch(1);
300        let fixture = MithrilFixtureBuilder::default().with_signers(1).build();
301        let stake_distribution = fixture.stake_distribution();
302
303        signer_registration_follower
304            .open_registration_round(registration_epoch, stake_distribution)
305            .await
306            .expect("signer registration round opening should not fail");
307
308        signer_registration_follower
309            .close_registration_round()
310            .await
311            .expect("signer registration round opening should not fail");
312    }
313
314    #[tokio::test]
315    async fn register_signer_always_fails() {
316        let signer_registration_follower =
317            MithrilSignerRegistrationFollowerBuilder::default().build();
318        let registration_epoch = Epoch(1);
319        let fixture = MithrilFixtureBuilder::default().with_signers(1).build();
320        let signer_to_register: Signer = fixture.signers()[0].to_owned();
321
322        signer_registration_follower
323            .register_signer(registration_epoch, &signer_to_register)
324            .await
325            .expect_err("signer registration should always fail");
326    }
327
328    #[tokio::test]
329    async fn synchronize_all_signers_succeeds() {
330        let registration_epoch = Epoch(1);
331        let fixture = MithrilFixtureBuilder::default()
332            .with_signers(5)
333            .disable_signers_certification()
334            .build();
335        let signers = fixture.signers();
336        let stake_distribution = fixture.stake_distribution();
337        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
338            epoch: registration_epoch,
339            next_signers: SignerMessagePart::from_signers(signers),
340            ..EpochSettingsMessage::dummy()
341        })
342        .unwrap();
343        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
344            .with_signer_recorder({
345                let mut signer_recorder = MockSignerRecorder::new();
346                signer_recorder
347                    .expect_record_signer_registration()
348                    .returning(|_| Ok(()))
349                    .times(5);
350
351                Arc::new(signer_recorder)
352            })
353            .with_signer_registration_verifier({
354                let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
355                signer_registration_verifier
356                    .expect_verify()
357                    .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
358                    .times(5);
359
360                Arc::new(signer_registration_verifier)
361            })
362            .with_leader_aggregator_client({
363                let mut aggregator_client = MockAggregatorClient::new();
364                aggregator_client
365                    .expect_retrieve_epoch_settings()
366                    .returning(move || Ok(Some(epoch_settings_message.clone())))
367                    .times(1);
368
369                Arc::new(aggregator_client)
370            })
371            .with_stake_store({
372                let mut stake_store = MockStakeStore::new();
373                stake_store
374                    .expect_get_stakes()
375                    .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
376                    .times(1);
377
378                Arc::new(stake_store)
379            })
380            .build();
381
382        signer_registration_follower
383            .synchronize_all_signers()
384            .await
385            .unwrap();
386    }
387
388    #[tokio::test]
389    async fn synchronize_all_signers_fails_if_one_signer_registration_fails() {
390        let registration_epoch = Epoch(1);
391        let fixture = MithrilFixtureBuilder::default()
392            .with_signers(5)
393            .disable_signers_certification()
394            .build();
395        let signers = fixture.signers();
396        let stake_distribution = fixture.stake_distribution();
397        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
398            epoch: registration_epoch,
399            next_signers: SignerMessagePart::from_signers(signers),
400            ..EpochSettingsMessage::dummy()
401        })
402        .unwrap();
403
404        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
405            .with_signer_recorder({
406                let mut signer_recorder = MockSignerRecorder::new();
407                signer_recorder
408                    .expect_record_signer_registration()
409                    .returning(|_| Ok(()))
410                    .times(4);
411                signer_recorder
412                    .expect_record_signer_registration()
413                    .returning(|_| Err(anyhow!("an error")))
414                    .times(1);
415
416                Arc::new(signer_recorder)
417            })
418            .with_signer_registration_verifier({
419                let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
420                signer_registration_verifier
421                    .expect_verify()
422                    .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
423                    .times(5);
424
425                Arc::new(signer_registration_verifier)
426            })
427            .with_leader_aggregator_client({
428                let mut aggregator_client = MockAggregatorClient::new();
429                aggregator_client
430                    .expect_retrieve_epoch_settings()
431                    .returning(move || Ok(Some(epoch_settings_message.clone())))
432                    .times(1);
433
434                Arc::new(aggregator_client)
435            })
436            .with_stake_store({
437                let mut stake_store = MockStakeStore::new();
438                stake_store
439                    .expect_get_stakes()
440                    .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
441                    .times(1);
442
443                Arc::new(stake_store)
444            })
445            .build();
446
447        signer_registration_follower
448            .synchronize_all_signers()
449            .await
450            .expect_err("synchronize_all_signers should fail");
451    }
452
453    #[tokio::test]
454    async fn synchronize_all_signers_fails_if_epoch_service_update_next_signers_fails() {
455        let registration_epoch = Epoch(1);
456        let fixture = MithrilFixtureBuilder::default()
457            .with_signers(5)
458            .disable_signers_certification()
459            .build();
460        let signers = fixture.signers();
461        let stake_distribution = fixture.stake_distribution();
462        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
463            epoch: registration_epoch,
464            next_signers: SignerMessagePart::from_signers(signers),
465            ..EpochSettingsMessage::dummy()
466        })
467        .unwrap();
468
469        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
470            .with_epoch_service({
471                let mut epoch_service = FakeEpochService::without_data();
472                epoch_service.toggle_errors(false, false, false, true);
473
474                epoch_service
475            })
476            .with_signer_recorder({
477                let mut signer_recorder = MockSignerRecorder::new();
478                signer_recorder
479                    .expect_record_signer_registration()
480                    .returning(|_| Ok(()))
481                    .times(5);
482
483                Arc::new(signer_recorder)
484            })
485            .with_signer_registration_verifier({
486                let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
487                signer_registration_verifier
488                    .expect_verify()
489                    .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
490                    .times(5);
491
492                Arc::new(signer_registration_verifier)
493            })
494            .with_leader_aggregator_client({
495                let mut aggregator_client = MockAggregatorClient::new();
496                aggregator_client
497                    .expect_retrieve_epoch_settings()
498                    .returning(move || Ok(Some(epoch_settings_message.clone())))
499                    .times(1);
500
501                Arc::new(aggregator_client)
502            })
503            .with_stake_store({
504                let mut stake_store = MockStakeStore::new();
505                stake_store
506                    .expect_get_stakes()
507                    .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
508                    .times(1);
509
510                Arc::new(stake_store)
511            })
512            .build();
513
514        signer_registration_follower
515            .synchronize_all_signers()
516            .await
517            .expect_err("synchronize_all_signers should fail");
518    }
519
520    #[tokio::test]
521    async fn synchronize_all_signers_fails_if_fetching_epoch_settings_fails() {
522        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
523            .with_leader_aggregator_client({
524                let mut aggregator_client = MockAggregatorClient::new();
525                aggregator_client
526                    .expect_retrieve_epoch_settings()
527                    .returning(move || {
528                        Err(AggregatorClientError::RemoteServerTechnical(anyhow!(
529                            "an error"
530                        )))
531                    })
532                    .times(1);
533
534                Arc::new(aggregator_client)
535            })
536            .build();
537
538        signer_registration_follower
539            .synchronize_all_signers()
540            .await
541            .expect_err("synchronize_all_signers should fail");
542    }
543
544    #[tokio::test]
545    async fn synchronize_all_signers_fails_if_fetching_stakes_fails() {
546        let registration_epoch = Epoch(1);
547        let fixture = MithrilFixtureBuilder::default()
548            .with_signers(5)
549            .disable_signers_certification()
550            .build();
551        let signers = fixture.signers();
552        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
553            epoch: registration_epoch,
554            next_signers: SignerMessagePart::from_signers(signers),
555            ..EpochSettingsMessage::dummy()
556        })
557        .unwrap();
558        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
559            .with_leader_aggregator_client({
560                let mut aggregator_client = MockAggregatorClient::new();
561                aggregator_client
562                    .expect_retrieve_epoch_settings()
563                    .returning(move || Ok(Some(epoch_settings_message.clone())))
564                    .times(1);
565
566                Arc::new(aggregator_client)
567            })
568            .with_stake_store({
569                let mut stake_store = MockStakeStore::new();
570                stake_store
571                    .expect_get_stakes()
572                    .returning(move |_epoch| Err(anyhow!("an error")))
573                    .times(1);
574
575                Arc::new(stake_store)
576            })
577            .build();
578
579        signer_registration_follower
580            .synchronize_all_signers()
581            .await
582            .expect_err("synchronize_all_signers should fail");
583    }
584}