mithril_aggregator/services/signer_registration/
follower.rs1use std::sync::Arc;
2
3use anyhow::Context;
4use async_trait::async_trait;
5
6use mithril_common::{
7 StdResult,
8 entities::{Epoch, Signer, SignerWithStake, StakeDistribution},
9};
10use mithril_persistence::store::StakeStorer;
11
12use crate::{
13 SignerRegistrationVerifier, VerificationKeyStorer, dependency_injection::EpochServiceWrapper,
14};
15
16use super::{
17 LeaderAggregatorClient, SignerRecorder, SignerRegisterer, SignerRegistrationError,
18 SignerRegistrationRound, SignerRegistrationRoundOpener, SignerSynchronizer,
19};
20
21pub struct MithrilSignerRegistrationFollower {
23 pub epoch_service: EpochServiceWrapper,
25
26 verification_key_store: Arc<dyn VerificationKeyStorer>,
28
29 signer_recorder: Arc<dyn SignerRecorder>,
31
32 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
34
35 leader_aggregator_client: Arc<dyn LeaderAggregatorClient>,
37
38 stake_store: Arc<dyn StakeStorer>,
40}
41
42impl MithrilSignerRegistrationFollower {
43 pub fn new(
45 epoch_service: EpochServiceWrapper,
46 verification_key_store: Arc<dyn VerificationKeyStorer>,
47 signer_recorder: Arc<dyn SignerRecorder>,
48 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
49 leader_aggregator_client: Arc<dyn LeaderAggregatorClient>,
50 stake_store: Arc<dyn StakeStorer>,
51 ) -> Self {
52 Self {
53 epoch_service,
54 verification_key_store,
55 signer_recorder,
56 signer_registration_verifier,
57 leader_aggregator_client,
58 stake_store,
59 }
60 }
61
62 async fn synchronize_signers(
63 &self,
64 epoch: Epoch,
65 signers: &[Signer],
66 stake_distribution: &StakeDistribution,
67 ) -> Result<(), SignerRegistrationError> {
68 for signer in signers {
69 let signer_with_stake = self
70 .signer_registration_verifier
71 .verify(signer, stake_distribution)
72 .await
73 .map_err(|err| {
74 SignerRegistrationError::InvalidSignerRegistration(
75 signer.party_id.clone(),
76 epoch,
77 err,
78 )
79 })?;
80
81 self.signer_recorder
82 .record_signer_registration(signer_with_stake.party_id.clone())
83 .await
84 .map_err(|err| {
85 SignerRegistrationError::FailedSignerRecorder(
86 signer_with_stake.party_id.clone(),
87 epoch,
88 err,
89 )
90 })?;
91
92 self
93 .verification_key_store
94 .save_verification_key(epoch, signer_with_stake.clone())
95 .await
96 .with_context(|| {
97 format!(
98 "VerificationKeyStorer can not save verification keys for party_id: '{}' for epoch: '{}'",
99 signer_with_stake.party_id,
100 epoch
101 )
102 })
103 .map_err(SignerRegistrationError::Store)?;
104 }
105
106 self.epoch_service
107 .write()
108 .await
109 .update_next_signers_with_stake()
110 .await
111 .map_err(SignerRegistrationError::EpochService)?;
112
113 Ok(())
114 }
115}
116
117#[async_trait]
118impl SignerSynchronizer for MithrilSignerRegistrationFollower {
119 async fn can_synchronize_signers(&self, epoch: Epoch) -> Result<bool, SignerRegistrationError> {
120 Ok(self
121 .leader_aggregator_client
122 .retrieve_epoch_settings()
123 .await
124 .with_context(|| "can_synchronize_signers failed")
125 .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?
126 .is_some_and(|leader_epoch_settings| epoch == leader_epoch_settings.epoch))
127 }
128
129 async fn synchronize_all_signers(&self) -> Result<(), SignerRegistrationError> {
130 let leader_epoch_settings = self
131 .leader_aggregator_client
132 .retrieve_epoch_settings()
133 .await
134 .with_context(|| "synchronize_all_signers failed")
135 .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?
136 .with_context(|| "Leader aggregator did not return any epoch settings")
137 .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?;
138 let registration_epoch =
139 leader_epoch_settings.epoch.offset_to_leader_synchronization_epoch();
140 let next_signers = leader_epoch_settings.next_signers;
141 let stake_distribution = self
142 .stake_store
143 .get_stakes(registration_epoch)
144 .await
145 .with_context(|| "synchronize_all_signers failed")
146 .map_err(SignerRegistrationError::Store)?
147 .with_context(|| "Follower aggregator did not return any stake distribution")
148 .map_err(SignerRegistrationError::Store)?;
149 self.synchronize_signers(registration_epoch, &next_signers, &stake_distribution)
150 .await?;
151
152 Ok(())
153 }
154}
155
156#[async_trait]
157impl SignerRegisterer for MithrilSignerRegistrationFollower {
158 async fn register_signer(
159 &self,
160 _epoch: Epoch,
161 _signer: &Signer,
162 ) -> Result<SignerWithStake, SignerRegistrationError> {
163 Err(SignerRegistrationError::RegistrationRoundAlwaysClosedOnFollowerAggregator)
164 }
165
166 async fn get_current_round(&self) -> Option<SignerRegistrationRound> {
167 None
168 }
169}
170
171#[async_trait]
172impl SignerRegistrationRoundOpener for MithrilSignerRegistrationFollower {
173 async fn open_registration_round(
174 &self,
175 _registration_epoch: Epoch,
176 _stake_distribution: StakeDistribution,
177 ) -> StdResult<()> {
178 Ok(())
179 }
180
181 async fn close_registration_round(&self) -> StdResult<()> {
182 Ok(())
183 }
184}
185
186#[cfg(test)]
187mod tests {
188 use anyhow::anyhow;
189
190 use mithril_common::messages::{
191 EpochSettingsMessage, SignerMessagePart, TryFromMessageAdapter,
192 };
193 use mithril_common::test::{builder::MithrilFixtureBuilder, double::Dummy};
194
195 use crate::{
196 database::{repository::SignerRegistrationStore, test_helper::main_db_connection},
197 message_adapters::FromEpochSettingsAdapter,
198 services::{
199 FakeEpochService, MockLeaderAggregatorClient, MockSignerRecorder,
200 MockSignerRegistrationVerifier,
201 },
202 test::double::mocks::MockStakeStore,
203 };
204
205 use super::*;
206
207 use test_utils::*;
208
209 mod test_utils {
210 use tokio::sync::RwLock;
211
212 use super::*;
213
214 pub struct MithrilSignerRegistrationFollowerBuilder {
216 epoch_service: EpochServiceWrapper,
217 signer_recorder: Arc<dyn SignerRecorder>,
218 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
219 leader_aggregator_client: Arc<dyn LeaderAggregatorClient>,
220 stake_store: Arc<dyn StakeStorer>,
221 verification_key_store: Arc<dyn VerificationKeyStorer>,
222 }
223
224 impl Default for MithrilSignerRegistrationFollowerBuilder {
225 fn default() -> Self {
226 Self {
227 epoch_service: Arc::new(RwLock::new(FakeEpochService::without_data())),
228 signer_recorder: Arc::new(MockSignerRecorder::new()),
229 signer_registration_verifier: Arc::new(MockSignerRegistrationVerifier::new()),
230 leader_aggregator_client: Arc::new(MockLeaderAggregatorClient::new()),
231 stake_store: Arc::new(MockStakeStore::new()),
232 verification_key_store: Arc::new(SignerRegistrationStore::new(
233 Arc::new(main_db_connection().unwrap()),
234 None,
235 )),
236 }
237 }
238 }
239
240 impl MithrilSignerRegistrationFollowerBuilder {
241 pub fn with_epoch_service(self, epoch_service: FakeEpochService) -> Self {
242 Self {
243 epoch_service: Arc::new(RwLock::new(epoch_service)),
244 ..self
245 }
246 }
247
248 pub fn with_signer_recorder(self, signer_recorder: Arc<dyn SignerRecorder>) -> Self {
249 Self {
250 signer_recorder,
251 ..self
252 }
253 }
254
255 pub fn with_signer_registration_verifier(
256 self,
257 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
258 ) -> Self {
259 Self {
260 signer_registration_verifier,
261 ..self
262 }
263 }
264
265 pub fn with_leader_aggregator_client(
266 self,
267 leader_aggregator_client: Arc<dyn LeaderAggregatorClient>,
268 ) -> Self {
269 Self {
270 leader_aggregator_client,
271 ..self
272 }
273 }
274
275 pub fn with_stake_store(self, stake_store: Arc<dyn StakeStorer>) -> Self {
276 Self {
277 stake_store,
278 ..self
279 }
280 }
281
282 pub fn build(self) -> MithrilSignerRegistrationFollower {
283 MithrilSignerRegistrationFollower {
284 epoch_service: self.epoch_service,
285 verification_key_store: self.verification_key_store,
286 signer_recorder: self.signer_recorder,
287 signer_registration_verifier: self.signer_registration_verifier,
288 leader_aggregator_client: self.leader_aggregator_client,
289 stake_store: self.stake_store,
290 }
291 }
292 }
293 }
294
295 #[tokio::test]
296 async fn open_close_registration_always_succeeds() {
297 let signer_registration_follower =
298 MithrilSignerRegistrationFollowerBuilder::default().build();
299 let registration_epoch = Epoch(1);
300 let fixture = MithrilFixtureBuilder::default().with_signers(1).build();
301 let stake_distribution = fixture.stake_distribution();
302
303 signer_registration_follower
304 .open_registration_round(registration_epoch, stake_distribution)
305 .await
306 .expect("signer registration round opening should not fail");
307
308 signer_registration_follower
309 .close_registration_round()
310 .await
311 .expect("signer registration round opening should not fail");
312 }
313
314 #[tokio::test]
315 async fn register_signer_always_fails() {
316 let signer_registration_follower =
317 MithrilSignerRegistrationFollowerBuilder::default().build();
318 let registration_epoch = Epoch(1);
319 let fixture = MithrilFixtureBuilder::default().with_signers(1).build();
320 let signer_to_register: Signer = fixture.signers()[0].to_owned();
321
322 signer_registration_follower
323 .register_signer(registration_epoch, &signer_to_register)
324 .await
325 .expect_err("signer registration should always fail");
326 }
327
328 #[tokio::test]
329 async fn synchronize_all_signers_succeeds() {
330 let registration_epoch = Epoch(1);
331 let fixture = MithrilFixtureBuilder::default()
332 .with_signers(5)
333 .disable_signers_certification()
334 .build();
335 let signers = fixture.signers();
336 let stake_distribution = fixture.stake_distribution();
337 let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
338 epoch: registration_epoch,
339 next_signers: SignerMessagePart::from_signers(signers),
340 ..EpochSettingsMessage::dummy()
341 })
342 .unwrap();
343 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
344 .with_signer_recorder({
345 let mut signer_recorder = MockSignerRecorder::new();
346 signer_recorder
347 .expect_record_signer_registration()
348 .returning(|_| Ok(()))
349 .times(5);
350
351 Arc::new(signer_recorder)
352 })
353 .with_signer_registration_verifier({
354 let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
355 signer_registration_verifier
356 .expect_verify()
357 .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
358 .times(5);
359
360 Arc::new(signer_registration_verifier)
361 })
362 .with_leader_aggregator_client({
363 let mut aggregator_client = MockLeaderAggregatorClient::new();
364 aggregator_client
365 .expect_retrieve_epoch_settings()
366 .returning(move || Ok(Some(epoch_settings_message.clone())))
367 .times(1);
368
369 Arc::new(aggregator_client)
370 })
371 .with_stake_store({
372 let mut stake_store = MockStakeStore::new();
373 stake_store
374 .expect_get_stakes()
375 .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
376 .times(1);
377
378 Arc::new(stake_store)
379 })
380 .build();
381
382 signer_registration_follower.synchronize_all_signers().await.unwrap();
383 }
384
385 #[tokio::test]
386 async fn synchronize_all_signers_fails_if_one_signer_registration_fails() {
387 let registration_epoch = Epoch(1);
388 let fixture = MithrilFixtureBuilder::default()
389 .with_signers(5)
390 .disable_signers_certification()
391 .build();
392 let signers = fixture.signers();
393 let stake_distribution = fixture.stake_distribution();
394 let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
395 epoch: registration_epoch,
396 next_signers: SignerMessagePart::from_signers(signers),
397 ..EpochSettingsMessage::dummy()
398 })
399 .unwrap();
400
401 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
402 .with_signer_recorder({
403 let mut signer_recorder = MockSignerRecorder::new();
404 signer_recorder
405 .expect_record_signer_registration()
406 .returning(|_| Ok(()))
407 .times(4);
408 signer_recorder
409 .expect_record_signer_registration()
410 .returning(|_| Err(anyhow!("an error")))
411 .times(1);
412
413 Arc::new(signer_recorder)
414 })
415 .with_signer_registration_verifier({
416 let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
417 signer_registration_verifier
418 .expect_verify()
419 .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
420 .times(5);
421
422 Arc::new(signer_registration_verifier)
423 })
424 .with_leader_aggregator_client({
425 let mut aggregator_client = MockLeaderAggregatorClient::new();
426 aggregator_client
427 .expect_retrieve_epoch_settings()
428 .returning(move || Ok(Some(epoch_settings_message.clone())))
429 .times(1);
430
431 Arc::new(aggregator_client)
432 })
433 .with_stake_store({
434 let mut stake_store = MockStakeStore::new();
435 stake_store
436 .expect_get_stakes()
437 .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
438 .times(1);
439
440 Arc::new(stake_store)
441 })
442 .build();
443
444 signer_registration_follower
445 .synchronize_all_signers()
446 .await
447 .expect_err("synchronize_all_signers should fail");
448 }
449
450 #[tokio::test]
451 async fn synchronize_all_signers_fails_if_epoch_service_update_next_signers_fails() {
452 let registration_epoch = Epoch(1);
453 let fixture = MithrilFixtureBuilder::default()
454 .with_signers(5)
455 .disable_signers_certification()
456 .build();
457 let signers = fixture.signers();
458 let stake_distribution = fixture.stake_distribution();
459 let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
460 epoch: registration_epoch,
461 next_signers: SignerMessagePart::from_signers(signers),
462 ..EpochSettingsMessage::dummy()
463 })
464 .unwrap();
465
466 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
467 .with_epoch_service({
468 let mut epoch_service = FakeEpochService::without_data();
469 epoch_service.toggle_errors(false, false, true);
470
471 epoch_service
472 })
473 .with_signer_recorder({
474 let mut signer_recorder = MockSignerRecorder::new();
475 signer_recorder
476 .expect_record_signer_registration()
477 .returning(|_| Ok(()))
478 .times(5);
479
480 Arc::new(signer_recorder)
481 })
482 .with_signer_registration_verifier({
483 let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
484 signer_registration_verifier
485 .expect_verify()
486 .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
487 .times(5);
488
489 Arc::new(signer_registration_verifier)
490 })
491 .with_leader_aggregator_client({
492 let mut aggregator_client = MockLeaderAggregatorClient::new();
493 aggregator_client
494 .expect_retrieve_epoch_settings()
495 .returning(move || Ok(Some(epoch_settings_message.clone())))
496 .times(1);
497
498 Arc::new(aggregator_client)
499 })
500 .with_stake_store({
501 let mut stake_store = MockStakeStore::new();
502 stake_store
503 .expect_get_stakes()
504 .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
505 .times(1);
506
507 Arc::new(stake_store)
508 })
509 .build();
510
511 signer_registration_follower
512 .synchronize_all_signers()
513 .await
514 .expect_err("synchronize_all_signers should fail");
515 }
516
517 #[tokio::test]
518 async fn synchronize_all_signers_fails_if_fetching_epoch_settings_fails() {
519 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
520 .with_leader_aggregator_client({
521 let mut aggregator_client = MockLeaderAggregatorClient::new();
522 aggregator_client
523 .expect_retrieve_epoch_settings()
524 .returning(move || Err(anyhow!("an error")))
525 .times(1);
526
527 Arc::new(aggregator_client)
528 })
529 .build();
530
531 signer_registration_follower
532 .synchronize_all_signers()
533 .await
534 .expect_err("synchronize_all_signers should fail");
535 }
536
537 #[tokio::test]
538 async fn synchronize_all_signers_fails_if_fetching_stakes_fails() {
539 let registration_epoch = Epoch(1);
540 let fixture = MithrilFixtureBuilder::default()
541 .with_signers(5)
542 .disable_signers_certification()
543 .build();
544 let signers = fixture.signers();
545 let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
546 epoch: registration_epoch,
547 next_signers: SignerMessagePart::from_signers(signers),
548 ..EpochSettingsMessage::dummy()
549 })
550 .unwrap();
551 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
552 .with_leader_aggregator_client({
553 let mut aggregator_client = MockLeaderAggregatorClient::new();
554 aggregator_client
555 .expect_retrieve_epoch_settings()
556 .returning(move || Ok(Some(epoch_settings_message.clone())))
557 .times(1);
558
559 Arc::new(aggregator_client)
560 })
561 .with_stake_store({
562 let mut stake_store = MockStakeStore::new();
563 stake_store
564 .expect_get_stakes()
565 .returning(move |_epoch| Err(anyhow!("an error")))
566 .times(1);
567
568 Arc::new(stake_store)
569 })
570 .build();
571
572 signer_registration_follower
573 .synchronize_all_signers()
574 .await
575 .expect_err("synchronize_all_signers should fail");
576 }
577}