mithril_aggregator/services/signer_registration/
follower.rs1use std::sync::Arc;
2
3use anyhow::{Context, anyhow};
4use async_trait::async_trait;
5
6use mithril_common::{
7 StdResult,
8 entities::{Epoch, Signer, SignerWithStake, StakeDistribution},
9};
10use mithril_persistence::store::StakeStorer;
11
12use crate::{
13 SignerRegistrationVerifier, VerificationKeyStorer, dependency_injection::EpochServiceWrapper,
14 services::AggregatorClient,
15};
16
17use super::{
18 SignerRecorder, SignerRegisterer, SignerRegistrationError, SignerRegistrationRound,
19 SignerRegistrationRoundOpener, SignerSynchronizer,
20};
21
22pub struct MithrilSignerRegistrationFollower {
24 pub epoch_service: EpochServiceWrapper,
26
27 verification_key_store: Arc<dyn VerificationKeyStorer>,
29
30 signer_recorder: Arc<dyn SignerRecorder>,
32
33 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
35
36 leader_aggregator_client: Arc<dyn AggregatorClient>,
38
39 stake_store: Arc<dyn StakeStorer>,
41}
42
43impl MithrilSignerRegistrationFollower {
44 pub fn new(
46 epoch_service: EpochServiceWrapper,
47 verification_key_store: Arc<dyn VerificationKeyStorer>,
48 signer_recorder: Arc<dyn SignerRecorder>,
49 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
50 leader_aggregator_client: Arc<dyn AggregatorClient>,
51 stake_store: Arc<dyn StakeStorer>,
52 ) -> Self {
53 Self {
54 epoch_service,
55 verification_key_store,
56 signer_recorder,
57 signer_registration_verifier,
58 leader_aggregator_client,
59 stake_store,
60 }
61 }
62
63 async fn synchronize_signers(
64 &self,
65 epoch: Epoch,
66 signers: &[Signer],
67 stake_distribution: &StakeDistribution,
68 ) -> Result<(), SignerRegistrationError> {
69 for signer in signers {
70 let signer_with_stake = self
71 .signer_registration_verifier
72 .verify(signer, stake_distribution)
73 .await
74 .map_err(|e| SignerRegistrationError::FailedSignerRegistration(anyhow!(e)))?;
75
76 self.signer_recorder
77 .record_signer_registration(signer_with_stake.party_id.clone())
78 .await
79 .map_err(|e| SignerRegistrationError::FailedSignerRecorder(e.to_string()))?;
80
81 self
82 .verification_key_store
83 .save_verification_key(epoch, signer_with_stake.clone())
84 .await
85 .with_context(|| {
86 format!(
87 "VerificationKeyStorer can not save verification keys for party_id: '{}' for epoch: '{}'",
88 signer_with_stake.party_id,
89 epoch
90 )
91 })
92 .map_err(|e| SignerRegistrationError::Store(anyhow!(e)))?;
93 }
94
95 self.epoch_service
96 .write()
97 .await
98 .update_next_signers_with_stake()
99 .await
100 .map_err(|e| SignerRegistrationError::EpochService(anyhow!(e)))?;
101
102 Ok(())
103 }
104}
105
106#[async_trait]
107impl SignerSynchronizer for MithrilSignerRegistrationFollower {
108 async fn can_synchronize_signers(&self, epoch: Epoch) -> Result<bool, SignerRegistrationError> {
109 Ok(self
110 .leader_aggregator_client
111 .retrieve_epoch_settings()
112 .await
113 .with_context(|| "can_synchronize_signers failed")
114 .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?
115 .is_some_and(|leader_epoch_settings| epoch == leader_epoch_settings.epoch))
116 }
117
118 async fn synchronize_all_signers(&self) -> Result<(), SignerRegistrationError> {
119 let leader_epoch_settings = self
120 .leader_aggregator_client
121 .retrieve_epoch_settings()
122 .await
123 .with_context(|| "synchronize_all_signers failed")
124 .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?
125 .ok_or(
126 SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings(
127 anyhow::anyhow!("Leader aggregator did not return any epoch settings"),
128 ),
129 )?;
130 let registration_epoch =
131 leader_epoch_settings.epoch.offset_to_leader_synchronization_epoch();
132 let next_signers = leader_epoch_settings.next_signers;
133 let stake_distribution = self
134 .stake_store
135 .get_stakes(registration_epoch)
136 .await
137 .with_context(|| "synchronize_all_signers failed")
138 .map_err(SignerRegistrationError::Store)?
139 .ok_or(SignerRegistrationError::Store(anyhow::anyhow!(
140 "Follower aggregator did not return any stake distribution"
141 )))?;
142 self.synchronize_signers(registration_epoch, &next_signers, &stake_distribution)
143 .await?;
144
145 Ok(())
146 }
147}
148
149#[async_trait]
150impl SignerRegisterer for MithrilSignerRegistrationFollower {
151 async fn register_signer(
152 &self,
153 _epoch: Epoch,
154 _signer: &Signer,
155 ) -> Result<SignerWithStake, SignerRegistrationError> {
156 Err(SignerRegistrationError::RegistrationRoundAlwaysClosedOnFollowerAggregator)
157 }
158
159 async fn get_current_round(&self) -> Option<SignerRegistrationRound> {
160 None
161 }
162}
163
164#[async_trait]
165impl SignerRegistrationRoundOpener for MithrilSignerRegistrationFollower {
166 async fn open_registration_round(
167 &self,
168 _registration_epoch: Epoch,
169 _stake_distribution: StakeDistribution,
170 ) -> StdResult<()> {
171 Ok(())
172 }
173
174 async fn close_registration_round(&self) -> StdResult<()> {
175 Ok(())
176 }
177}
178
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use anyhow::anyhow;
    use mithril_persistence::store::StakeStorer;

    use mithril_common::{
        entities::{Epoch, Signer, SignerWithStake},
        messages::{EpochSettingsMessage, SignerMessagePart, TryFromMessageAdapter},
        test_utils::MithrilFixtureBuilder,
    };

    use crate::{
        MithrilSignerRegistrationFollower, SignerRecorder, SignerRegisterer,
        SignerRegistrationRoundOpener, SignerRegistrationVerifier, VerificationKeyStorer,
        database::{repository::SignerRegistrationStore, test_helper::main_db_connection},
        message_adapters::FromEpochSettingsAdapter,
        services::{
            AggregatorClient, AggregatorClientError, FakeEpochService, MockAggregatorClient,
            MockSignerRecorder, MockSignerRegistrationVerifier, SignerSynchronizer,
        },
        tools::mocks::MockStakeStore,
    };

    use test_utils::*;

    mod test_utils {
        use tokio::sync::RwLock;

        use crate::{dependency_injection::EpochServiceWrapper, services::FakeEpochService};

        use super::*;

        /// Test builder of [MithrilSignerRegistrationFollower].
        ///
        /// By default every collaborator is a mock without expectations, except the epoch
        /// service (a `FakeEpochService` without data) and the verification key store (a
        /// `SignerRegistrationStore` on the connection given by `main_db_connection`).
        pub struct MithrilSignerRegistrationFollowerBuilder {
            epoch_service: EpochServiceWrapper,
            signer_recorder: Arc<dyn SignerRecorder>,
            signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
            leader_aggregator_client: Arc<dyn AggregatorClient>,
            stake_store: Arc<dyn StakeStorer>,
            verification_key_store: Arc<dyn VerificationKeyStorer>,
        }

        impl Default for MithrilSignerRegistrationFollowerBuilder {
            fn default() -> Self {
                Self {
                    epoch_service: Arc::new(RwLock::new(FakeEpochService::without_data())),
                    signer_recorder: Arc::new(MockSignerRecorder::new()),
                    signer_registration_verifier: Arc::new(MockSignerRegistrationVerifier::new()),
                    leader_aggregator_client: Arc::new(MockAggregatorClient::new()),
                    stake_store: Arc::new(MockStakeStore::new()),
                    verification_key_store: Arc::new(SignerRegistrationStore::new(
                        Arc::new(main_db_connection().unwrap()),
                        None,
                    )),
                }
            }
        }

        impl MithrilSignerRegistrationFollowerBuilder {
            /// Replaces the epoch service.
            pub fn with_epoch_service(self, epoch_service: FakeEpochService) -> Self {
                Self {
                    epoch_service: Arc::new(RwLock::new(epoch_service)),
                    ..self
                }
            }

            /// Replaces the signer recorder.
            pub fn with_signer_recorder(self, signer_recorder: Arc<dyn SignerRecorder>) -> Self {
                Self {
                    signer_recorder,
                    ..self
                }
            }

            /// Replaces the signer registration verifier.
            pub fn with_signer_registration_verifier(
                self,
                signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
            ) -> Self {
                Self {
                    signer_registration_verifier,
                    ..self
                }
            }

            /// Replaces the leader aggregator client.
            pub fn with_leader_aggregator_client(
                self,
                leader_aggregator_client: Arc<dyn AggregatorClient>,
            ) -> Self {
                Self {
                    leader_aggregator_client,
                    ..self
                }
            }

            /// Replaces the stake store.
            pub fn with_stake_store(self, stake_store: Arc<dyn StakeStorer>) -> Self {
                Self {
                    stake_store,
                    ..self
                }
            }

            /// Builds the follower from the configured collaborators.
            pub fn build(self) -> MithrilSignerRegistrationFollower {
                MithrilSignerRegistrationFollower {
                    epoch_service: self.epoch_service,
                    verification_key_store: self.verification_key_store,
                    signer_recorder: self.signer_recorder,
                    signer_registration_verifier: self.signer_registration_verifier,
                    leader_aggregator_client: self.leader_aggregator_client,
                    stake_store: self.stake_store,
                }
            }
        }
    }

    #[tokio::test]
    async fn open_close_registration_always_succeeds() {
        let signer_registration_follower =
            MithrilSignerRegistrationFollowerBuilder::default().build();
        let registration_epoch = Epoch(1);
        let fixture = MithrilFixtureBuilder::default().with_signers(1).build();
        let stake_distribution = fixture.stake_distribution();

        signer_registration_follower
            .open_registration_round(registration_epoch, stake_distribution)
            .await
            .expect("signer registration round opening should not fail");

        signer_registration_follower
            .close_registration_round()
            .await
            .expect("signer registration round closing should not fail");
    }

    #[tokio::test]
    async fn register_signer_always_fails() {
        let signer_registration_follower =
            MithrilSignerRegistrationFollowerBuilder::default().build();
        let registration_epoch = Epoch(1);
        let fixture = MithrilFixtureBuilder::default().with_signers(1).build();
        let signer_to_register: Signer = fixture.signers()[0].to_owned();

        signer_registration_follower
            .register_signer(registration_epoch, &signer_to_register)
            .await
            .expect_err("signer registration should always fail");
    }

    #[tokio::test]
    async fn synchronize_all_signers_succeeds() {
        let registration_epoch = Epoch(1);
        let fixture = MithrilFixtureBuilder::default()
            .with_signers(5)
            .disable_signers_certification()
            .build();
        let signers = fixture.signers();
        let stake_distribution = fixture.stake_distribution();
        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
            epoch: registration_epoch,
            next_signers: SignerMessagePart::from_signers(signers),
            ..EpochSettingsMessage::dummy()
        })
        .unwrap();
        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
            .with_signer_recorder({
                let mut signer_recorder = MockSignerRecorder::new();
                signer_recorder
                    .expect_record_signer_registration()
                    .returning(|_| Ok(()))
                    .times(5);

                Arc::new(signer_recorder)
            })
            .with_signer_registration_verifier({
                let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
                signer_registration_verifier
                    .expect_verify()
                    .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
                    .times(5);

                Arc::new(signer_registration_verifier)
            })
            .with_leader_aggregator_client({
                let mut aggregator_client = MockAggregatorClient::new();
                aggregator_client
                    .expect_retrieve_epoch_settings()
                    .returning(move || Ok(Some(epoch_settings_message.clone())))
                    .times(1);

                Arc::new(aggregator_client)
            })
            .with_stake_store({
                let mut stake_store = MockStakeStore::new();
                stake_store
                    .expect_get_stakes()
                    .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
                    .times(1);

                Arc::new(stake_store)
            })
            .build();

        signer_registration_follower.synchronize_all_signers().await.unwrap();
    }

    #[tokio::test]
    async fn synchronize_all_signers_fails_if_one_signer_registration_fails() {
        let registration_epoch = Epoch(1);
        let fixture = MithrilFixtureBuilder::default()
            .with_signers(5)
            .disable_signers_certification()
            .build();
        let signers = fixture.signers();
        let stake_distribution = fixture.stake_distribution();
        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
            epoch: registration_epoch,
            next_signers: SignerMessagePart::from_signers(signers),
            ..EpochSettingsMessage::dummy()
        })
        .unwrap();

        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
            .with_signer_recorder({
                // Four recordings succeed, then one fails.
                let mut signer_recorder = MockSignerRecorder::new();
                signer_recorder
                    .expect_record_signer_registration()
                    .returning(|_| Ok(()))
                    .times(4);
                signer_recorder
                    .expect_record_signer_registration()
                    .returning(|_| Err(anyhow!("an error")))
                    .times(1);

                Arc::new(signer_recorder)
            })
            .with_signer_registration_verifier({
                let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
                signer_registration_verifier
                    .expect_verify()
                    .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
                    .times(5);

                Arc::new(signer_registration_verifier)
            })
            .with_leader_aggregator_client({
                let mut aggregator_client = MockAggregatorClient::new();
                aggregator_client
                    .expect_retrieve_epoch_settings()
                    .returning(move || Ok(Some(epoch_settings_message.clone())))
                    .times(1);

                Arc::new(aggregator_client)
            })
            .with_stake_store({
                let mut stake_store = MockStakeStore::new();
                stake_store
                    .expect_get_stakes()
                    .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
                    .times(1);

                Arc::new(stake_store)
            })
            .build();

        signer_registration_follower
            .synchronize_all_signers()
            .await
            .expect_err("synchronize_all_signers should fail");
    }

    #[tokio::test]
    async fn synchronize_all_signers_fails_if_epoch_service_update_next_signers_fails() {
        let registration_epoch = Epoch(1);
        let fixture = MithrilFixtureBuilder::default()
            .with_signers(5)
            .disable_signers_certification()
            .build();
        let signers = fixture.signers();
        let stake_distribution = fixture.stake_distribution();
        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
            epoch: registration_epoch,
            next_signers: SignerMessagePart::from_signers(signers),
            ..EpochSettingsMessage::dummy()
        })
        .unwrap();

        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
            .with_epoch_service({
                let mut epoch_service = FakeEpochService::without_data();
                // NOTE(review): presumably the fourth flag is the one that makes
                // `update_next_signers_with_stake` fail — confirm against
                // `FakeEpochService::toggle_errors`.
                epoch_service.toggle_errors(false, false, false, true);

                epoch_service
            })
            .with_signer_recorder({
                let mut signer_recorder = MockSignerRecorder::new();
                signer_recorder
                    .expect_record_signer_registration()
                    .returning(|_| Ok(()))
                    .times(5);

                Arc::new(signer_recorder)
            })
            .with_signer_registration_verifier({
                let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
                signer_registration_verifier
                    .expect_verify()
                    .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
                    .times(5);

                Arc::new(signer_registration_verifier)
            })
            .with_leader_aggregator_client({
                let mut aggregator_client = MockAggregatorClient::new();
                aggregator_client
                    .expect_retrieve_epoch_settings()
                    .returning(move || Ok(Some(epoch_settings_message.clone())))
                    .times(1);

                Arc::new(aggregator_client)
            })
            .with_stake_store({
                let mut stake_store = MockStakeStore::new();
                stake_store
                    .expect_get_stakes()
                    .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
                    .times(1);

                Arc::new(stake_store)
            })
            .build();

        signer_registration_follower
            .synchronize_all_signers()
            .await
            .expect_err("synchronize_all_signers should fail");
    }

    #[tokio::test]
    async fn synchronize_all_signers_fails_if_fetching_epoch_settings_fails() {
        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
            .with_leader_aggregator_client({
                let mut aggregator_client = MockAggregatorClient::new();
                aggregator_client
                    .expect_retrieve_epoch_settings()
                    .returning(move || {
                        Err(AggregatorClientError::RemoteServerTechnical(anyhow!(
                            "an error"
                        )))
                    })
                    .times(1);

                Arc::new(aggregator_client)
            })
            .build();

        signer_registration_follower
            .synchronize_all_signers()
            .await
            .expect_err("synchronize_all_signers should fail");
    }

    #[tokio::test]
    async fn synchronize_all_signers_fails_if_fetching_stakes_fails() {
        let registration_epoch = Epoch(1);
        let fixture = MithrilFixtureBuilder::default()
            .with_signers(5)
            .disable_signers_certification()
            .build();
        let signers = fixture.signers();
        let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
            epoch: registration_epoch,
            next_signers: SignerMessagePart::from_signers(signers),
            ..EpochSettingsMessage::dummy()
        })
        .unwrap();
        let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
            .with_leader_aggregator_client({
                let mut aggregator_client = MockAggregatorClient::new();
                aggregator_client
                    .expect_retrieve_epoch_settings()
                    .returning(move || Ok(Some(epoch_settings_message.clone())))
                    .times(1);

                Arc::new(aggregator_client)
            })
            .with_stake_store({
                let mut stake_store = MockStakeStore::new();
                stake_store
                    .expect_get_stakes()
                    .returning(move |_epoch| Err(anyhow!("an error")))
                    .times(1);

                Arc::new(stake_store)
            })
            .build();

        signer_registration_follower
            .synchronize_all_signers()
            .await
            .expect_err("synchronize_all_signers should fail");
    }
}