mithril_aggregator/services/signer_registration/
follower.rs1use std::sync::Arc;
2
3use anyhow::{Context, anyhow};
4use async_trait::async_trait;
5
6use mithril_common::{
7 StdResult,
8 entities::{Epoch, Signer, SignerWithStake, StakeDistribution},
9};
10use mithril_persistence::store::StakeStorer;
11
12use crate::{
13 SignerRegistrationVerifier, VerificationKeyStorer, dependency_injection::EpochServiceWrapper,
14};
15
16use super::{
17 LeaderAggregatorClient, SignerRecorder, SignerRegisterer, SignerRegistrationError,
18 SignerRegistrationRound, SignerRegistrationRoundOpener, SignerSynchronizer,
19};
20
21pub struct MithrilSignerRegistrationFollower {
23 pub epoch_service: EpochServiceWrapper,
25
26 verification_key_store: Arc<dyn VerificationKeyStorer>,
28
29 signer_recorder: Arc<dyn SignerRecorder>,
31
32 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
34
35 leader_aggregator_client: Arc<dyn LeaderAggregatorClient>,
37
38 stake_store: Arc<dyn StakeStorer>,
40}
41
42impl MithrilSignerRegistrationFollower {
43 pub fn new(
45 epoch_service: EpochServiceWrapper,
46 verification_key_store: Arc<dyn VerificationKeyStorer>,
47 signer_recorder: Arc<dyn SignerRecorder>,
48 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
49 leader_aggregator_client: Arc<dyn LeaderAggregatorClient>,
50 stake_store: Arc<dyn StakeStorer>,
51 ) -> Self {
52 Self {
53 epoch_service,
54 verification_key_store,
55 signer_recorder,
56 signer_registration_verifier,
57 leader_aggregator_client,
58 stake_store,
59 }
60 }
61
62 async fn synchronize_signers(
63 &self,
64 epoch: Epoch,
65 signers: &[Signer],
66 stake_distribution: &StakeDistribution,
67 ) -> Result<(), SignerRegistrationError> {
68 for signer in signers {
69 let signer_with_stake = self
70 .signer_registration_verifier
71 .verify(signer, stake_distribution)
72 .await
73 .map_err(|e| SignerRegistrationError::FailedSignerRegistration(anyhow!(e)))?;
74
75 self.signer_recorder
76 .record_signer_registration(signer_with_stake.party_id.clone())
77 .await
78 .map_err(|e| SignerRegistrationError::FailedSignerRecorder(e.to_string()))?;
79
80 self
81 .verification_key_store
82 .save_verification_key(epoch, signer_with_stake.clone())
83 .await
84 .with_context(|| {
85 format!(
86 "VerificationKeyStorer can not save verification keys for party_id: '{}' for epoch: '{}'",
87 signer_with_stake.party_id,
88 epoch
89 )
90 })
91 .map_err(|e| SignerRegistrationError::Store(anyhow!(e)))?;
92 }
93
94 self.epoch_service
95 .write()
96 .await
97 .update_next_signers_with_stake()
98 .await
99 .map_err(|e| SignerRegistrationError::EpochService(anyhow!(e)))?;
100
101 Ok(())
102 }
103}
104
105#[async_trait]
106impl SignerSynchronizer for MithrilSignerRegistrationFollower {
107 async fn can_synchronize_signers(&self, epoch: Epoch) -> Result<bool, SignerRegistrationError> {
108 Ok(self
109 .leader_aggregator_client
110 .retrieve_epoch_settings()
111 .await
112 .with_context(|| "can_synchronize_signers failed")
113 .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?
114 .is_some_and(|leader_epoch_settings| epoch == leader_epoch_settings.epoch))
115 }
116
117 async fn synchronize_all_signers(&self) -> Result<(), SignerRegistrationError> {
118 let leader_epoch_settings = self
119 .leader_aggregator_client
120 .retrieve_epoch_settings()
121 .await
122 .with_context(|| "synchronize_all_signers failed")
123 .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?
124 .ok_or(
125 SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings(
126 anyhow::anyhow!("Leader aggregator did not return any epoch settings"),
127 ),
128 )?;
129 let registration_epoch =
130 leader_epoch_settings.epoch.offset_to_leader_synchronization_epoch();
131 let next_signers = leader_epoch_settings.next_signers;
132 let stake_distribution = self
133 .stake_store
134 .get_stakes(registration_epoch)
135 .await
136 .with_context(|| "synchronize_all_signers failed")
137 .map_err(SignerRegistrationError::Store)?
138 .ok_or(SignerRegistrationError::Store(anyhow::anyhow!(
139 "Follower aggregator did not return any stake distribution"
140 )))?;
141 self.synchronize_signers(registration_epoch, &next_signers, &stake_distribution)
142 .await?;
143
144 Ok(())
145 }
146}
147
148#[async_trait]
149impl SignerRegisterer for MithrilSignerRegistrationFollower {
150 async fn register_signer(
151 &self,
152 _epoch: Epoch,
153 _signer: &Signer,
154 ) -> Result<SignerWithStake, SignerRegistrationError> {
155 Err(SignerRegistrationError::RegistrationRoundAlwaysClosedOnFollowerAggregator)
156 }
157
158 async fn get_current_round(&self) -> Option<SignerRegistrationRound> {
159 None
160 }
161}
162
163#[async_trait]
164impl SignerRegistrationRoundOpener for MithrilSignerRegistrationFollower {
165 async fn open_registration_round(
166 &self,
167 _registration_epoch: Epoch,
168 _stake_distribution: StakeDistribution,
169 ) -> StdResult<()> {
170 Ok(())
171 }
172
173 async fn close_registration_round(&self) -> StdResult<()> {
174 Ok(())
175 }
176}
177
178#[cfg(test)]
179mod tests {
180 use mithril_common::messages::{
181 EpochSettingsMessage, SignerMessagePart, TryFromMessageAdapter,
182 };
183 use mithril_common::test::{builder::MithrilFixtureBuilder, double::Dummy};
184
185 use crate::{
186 database::{repository::SignerRegistrationStore, test_helper::main_db_connection},
187 message_adapters::FromEpochSettingsAdapter,
188 services::{
189 FakeEpochService, MockLeaderAggregatorClient, MockSignerRecorder,
190 MockSignerRegistrationVerifier,
191 },
192 test::double::mocks::MockStakeStore,
193 };
194
195 use super::*;
196
197 use test_utils::*;
198
199 mod test_utils {
200 use tokio::sync::RwLock;
201
202 use super::*;
203
204 pub struct MithrilSignerRegistrationFollowerBuilder {
206 epoch_service: EpochServiceWrapper,
207 signer_recorder: Arc<dyn SignerRecorder>,
208 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
209 leader_aggregator_client: Arc<dyn LeaderAggregatorClient>,
210 stake_store: Arc<dyn StakeStorer>,
211 verification_key_store: Arc<dyn VerificationKeyStorer>,
212 }
213
214 impl Default for MithrilSignerRegistrationFollowerBuilder {
215 fn default() -> Self {
216 Self {
217 epoch_service: Arc::new(RwLock::new(FakeEpochService::without_data())),
218 signer_recorder: Arc::new(MockSignerRecorder::new()),
219 signer_registration_verifier: Arc::new(MockSignerRegistrationVerifier::new()),
220 leader_aggregator_client: Arc::new(MockLeaderAggregatorClient::new()),
221 stake_store: Arc::new(MockStakeStore::new()),
222 verification_key_store: Arc::new(SignerRegistrationStore::new(
223 Arc::new(main_db_connection().unwrap()),
224 None,
225 )),
226 }
227 }
228 }
229
230 impl MithrilSignerRegistrationFollowerBuilder {
231 pub fn with_epoch_service(self, epoch_service: FakeEpochService) -> Self {
232 Self {
233 epoch_service: Arc::new(RwLock::new(epoch_service)),
234 ..self
235 }
236 }
237
238 pub fn with_signer_recorder(self, signer_recorder: Arc<dyn SignerRecorder>) -> Self {
239 Self {
240 signer_recorder,
241 ..self
242 }
243 }
244
245 pub fn with_signer_registration_verifier(
246 self,
247 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
248 ) -> Self {
249 Self {
250 signer_registration_verifier,
251 ..self
252 }
253 }
254
255 pub fn with_leader_aggregator_client(
256 self,
257 leader_aggregator_client: Arc<dyn LeaderAggregatorClient>,
258 ) -> Self {
259 Self {
260 leader_aggregator_client,
261 ..self
262 }
263 }
264
265 pub fn with_stake_store(self, stake_store: Arc<dyn StakeStorer>) -> Self {
266 Self {
267 stake_store,
268 ..self
269 }
270 }
271
272 pub fn build(self) -> MithrilSignerRegistrationFollower {
273 MithrilSignerRegistrationFollower {
274 epoch_service: self.epoch_service,
275 verification_key_store: self.verification_key_store,
276 signer_recorder: self.signer_recorder,
277 signer_registration_verifier: self.signer_registration_verifier,
278 leader_aggregator_client: self.leader_aggregator_client,
279 stake_store: self.stake_store,
280 }
281 }
282 }
283 }
284
285 #[tokio::test]
286 async fn open_close_registration_always_succeeds() {
287 let signer_registration_follower =
288 MithrilSignerRegistrationFollowerBuilder::default().build();
289 let registration_epoch = Epoch(1);
290 let fixture = MithrilFixtureBuilder::default().with_signers(1).build();
291 let stake_distribution = fixture.stake_distribution();
292
293 signer_registration_follower
294 .open_registration_round(registration_epoch, stake_distribution)
295 .await
296 .expect("signer registration round opening should not fail");
297
298 signer_registration_follower
299 .close_registration_round()
300 .await
301 .expect("signer registration round opening should not fail");
302 }
303
304 #[tokio::test]
305 async fn register_signer_always_fails() {
306 let signer_registration_follower =
307 MithrilSignerRegistrationFollowerBuilder::default().build();
308 let registration_epoch = Epoch(1);
309 let fixture = MithrilFixtureBuilder::default().with_signers(1).build();
310 let signer_to_register: Signer = fixture.signers()[0].to_owned();
311
312 signer_registration_follower
313 .register_signer(registration_epoch, &signer_to_register)
314 .await
315 .expect_err("signer registration should always fail");
316 }
317
318 #[tokio::test]
319 async fn synchronize_all_signers_succeeds() {
320 let registration_epoch = Epoch(1);
321 let fixture = MithrilFixtureBuilder::default()
322 .with_signers(5)
323 .disable_signers_certification()
324 .build();
325 let signers = fixture.signers();
326 let stake_distribution = fixture.stake_distribution();
327 let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
328 epoch: registration_epoch,
329 next_signers: SignerMessagePart::from_signers(signers),
330 ..EpochSettingsMessage::dummy()
331 })
332 .unwrap();
333 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
334 .with_signer_recorder({
335 let mut signer_recorder = MockSignerRecorder::new();
336 signer_recorder
337 .expect_record_signer_registration()
338 .returning(|_| Ok(()))
339 .times(5);
340
341 Arc::new(signer_recorder)
342 })
343 .with_signer_registration_verifier({
344 let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
345 signer_registration_verifier
346 .expect_verify()
347 .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
348 .times(5);
349
350 Arc::new(signer_registration_verifier)
351 })
352 .with_leader_aggregator_client({
353 let mut aggregator_client = MockLeaderAggregatorClient::new();
354 aggregator_client
355 .expect_retrieve_epoch_settings()
356 .returning(move || Ok(Some(epoch_settings_message.clone())))
357 .times(1);
358
359 Arc::new(aggregator_client)
360 })
361 .with_stake_store({
362 let mut stake_store = MockStakeStore::new();
363 stake_store
364 .expect_get_stakes()
365 .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
366 .times(1);
367
368 Arc::new(stake_store)
369 })
370 .build();
371
372 signer_registration_follower.synchronize_all_signers().await.unwrap();
373 }
374
375 #[tokio::test]
376 async fn synchronize_all_signers_fails_if_one_signer_registration_fails() {
377 let registration_epoch = Epoch(1);
378 let fixture = MithrilFixtureBuilder::default()
379 .with_signers(5)
380 .disable_signers_certification()
381 .build();
382 let signers = fixture.signers();
383 let stake_distribution = fixture.stake_distribution();
384 let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
385 epoch: registration_epoch,
386 next_signers: SignerMessagePart::from_signers(signers),
387 ..EpochSettingsMessage::dummy()
388 })
389 .unwrap();
390
391 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
392 .with_signer_recorder({
393 let mut signer_recorder = MockSignerRecorder::new();
394 signer_recorder
395 .expect_record_signer_registration()
396 .returning(|_| Ok(()))
397 .times(4);
398 signer_recorder
399 .expect_record_signer_registration()
400 .returning(|_| Err(anyhow!("an error")))
401 .times(1);
402
403 Arc::new(signer_recorder)
404 })
405 .with_signer_registration_verifier({
406 let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
407 signer_registration_verifier
408 .expect_verify()
409 .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
410 .times(5);
411
412 Arc::new(signer_registration_verifier)
413 })
414 .with_leader_aggregator_client({
415 let mut aggregator_client = MockLeaderAggregatorClient::new();
416 aggregator_client
417 .expect_retrieve_epoch_settings()
418 .returning(move || Ok(Some(epoch_settings_message.clone())))
419 .times(1);
420
421 Arc::new(aggregator_client)
422 })
423 .with_stake_store({
424 let mut stake_store = MockStakeStore::new();
425 stake_store
426 .expect_get_stakes()
427 .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
428 .times(1);
429
430 Arc::new(stake_store)
431 })
432 .build();
433
434 signer_registration_follower
435 .synchronize_all_signers()
436 .await
437 .expect_err("synchronize_all_signers should fail");
438 }
439
440 #[tokio::test]
441 async fn synchronize_all_signers_fails_if_epoch_service_update_next_signers_fails() {
442 let registration_epoch = Epoch(1);
443 let fixture = MithrilFixtureBuilder::default()
444 .with_signers(5)
445 .disable_signers_certification()
446 .build();
447 let signers = fixture.signers();
448 let stake_distribution = fixture.stake_distribution();
449 let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
450 epoch: registration_epoch,
451 next_signers: SignerMessagePart::from_signers(signers),
452 ..EpochSettingsMessage::dummy()
453 })
454 .unwrap();
455
456 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
457 .with_epoch_service({
458 let mut epoch_service = FakeEpochService::without_data();
459 epoch_service.toggle_errors(false, false, false, true);
460
461 epoch_service
462 })
463 .with_signer_recorder({
464 let mut signer_recorder = MockSignerRecorder::new();
465 signer_recorder
466 .expect_record_signer_registration()
467 .returning(|_| Ok(()))
468 .times(5);
469
470 Arc::new(signer_recorder)
471 })
472 .with_signer_registration_verifier({
473 let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
474 signer_registration_verifier
475 .expect_verify()
476 .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
477 .times(5);
478
479 Arc::new(signer_registration_verifier)
480 })
481 .with_leader_aggregator_client({
482 let mut aggregator_client = MockLeaderAggregatorClient::new();
483 aggregator_client
484 .expect_retrieve_epoch_settings()
485 .returning(move || Ok(Some(epoch_settings_message.clone())))
486 .times(1);
487
488 Arc::new(aggregator_client)
489 })
490 .with_stake_store({
491 let mut stake_store = MockStakeStore::new();
492 stake_store
493 .expect_get_stakes()
494 .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
495 .times(1);
496
497 Arc::new(stake_store)
498 })
499 .build();
500
501 signer_registration_follower
502 .synchronize_all_signers()
503 .await
504 .expect_err("synchronize_all_signers should fail");
505 }
506
507 #[tokio::test]
508 async fn synchronize_all_signers_fails_if_fetching_epoch_settings_fails() {
509 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
510 .with_leader_aggregator_client({
511 let mut aggregator_client = MockLeaderAggregatorClient::new();
512 aggregator_client
513 .expect_retrieve_epoch_settings()
514 .returning(move || Err(anyhow!("an error")))
515 .times(1);
516
517 Arc::new(aggregator_client)
518 })
519 .build();
520
521 signer_registration_follower
522 .synchronize_all_signers()
523 .await
524 .expect_err("synchronize_all_signers should fail");
525 }
526
527 #[tokio::test]
528 async fn synchronize_all_signers_fails_if_fetching_stakes_fails() {
529 let registration_epoch = Epoch(1);
530 let fixture = MithrilFixtureBuilder::default()
531 .with_signers(5)
532 .disable_signers_certification()
533 .build();
534 let signers = fixture.signers();
535 let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
536 epoch: registration_epoch,
537 next_signers: SignerMessagePart::from_signers(signers),
538 ..EpochSettingsMessage::dummy()
539 })
540 .unwrap();
541 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
542 .with_leader_aggregator_client({
543 let mut aggregator_client = MockLeaderAggregatorClient::new();
544 aggregator_client
545 .expect_retrieve_epoch_settings()
546 .returning(move || Ok(Some(epoch_settings_message.clone())))
547 .times(1);
548
549 Arc::new(aggregator_client)
550 })
551 .with_stake_store({
552 let mut stake_store = MockStakeStore::new();
553 stake_store
554 .expect_get_stakes()
555 .returning(move |_epoch| Err(anyhow!("an error")))
556 .times(1);
557
558 Arc::new(stake_store)
559 })
560 .build();
561
562 signer_registration_follower
563 .synchronize_all_signers()
564 .await
565 .expect_err("synchronize_all_signers should fail");
566 }
567}