mithril_aggregator/services/signer_registration/
follower.rs1use std::sync::Arc;
2
3use anyhow::{anyhow, Context};
4use async_trait::async_trait;
5
6use mithril_common::{
7 entities::{Epoch, Signer, SignerWithStake, StakeDistribution},
8 StdResult,
9};
10use mithril_persistence::store::StakeStorer;
11
12use crate::{
13 dependency_injection::EpochServiceWrapper, services::AggregatorClient,
14 SignerRegistrationVerifier, VerificationKeyStorer,
15};
16
17use super::{
18 SignerRecorder, SignerRegisterer, SignerRegistrationError, SignerRegistrationRound,
19 SignerRegistrationRoundOpener, SignerSynchronizer,
20};
21
/// Signer registration service used by a follower aggregator.
///
/// Instead of registering signers itself, it synchronizes the next signers published in the
/// epoch settings retrieved through its leader aggregator client.
pub struct MithrilSignerRegistrationFollower {
    /// Epoch service, asked to update its next signers with stake once the signers are synchronized
    pub epoch_service: EpochServiceWrapper,

    /// Store in which the verification key of each synchronized signer is saved
    verification_key_store: Arc<dyn VerificationKeyStorer>,

    /// Recorder notified with the party id of each synchronized signer
    signer_recorder: Arc<dyn SignerRecorder>,

    /// Verifier run on each signer against the stake distribution before it is recorded and stored
    signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,

    /// Client used to retrieve the epoch settings of the leader aggregator
    leader_aggregator_client: Arc<dyn AggregatorClient>,

    /// Store from which the stake distribution of the registration epoch is read
    stake_store: Arc<dyn StakeStorer>,
}
42
43impl MithrilSignerRegistrationFollower {
44 pub fn new(
46 epoch_service: EpochServiceWrapper,
47 verification_key_store: Arc<dyn VerificationKeyStorer>,
48 signer_recorder: Arc<dyn SignerRecorder>,
49 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
50 leader_aggregator_client: Arc<dyn AggregatorClient>,
51 stake_store: Arc<dyn StakeStorer>,
52 ) -> Self {
53 Self {
54 epoch_service,
55 verification_key_store,
56 signer_recorder,
57 signer_registration_verifier,
58 leader_aggregator_client,
59 stake_store,
60 }
61 }
62
63 async fn synchronize_signers(
64 &self,
65 epoch: Epoch,
66 signers: &[Signer],
67 stake_distribution: &StakeDistribution,
68 ) -> Result<(), SignerRegistrationError> {
69 for signer in signers {
70 let signer_with_stake = self
71 .signer_registration_verifier
72 .verify(signer, stake_distribution)
73 .await
74 .map_err(|e| SignerRegistrationError::FailedSignerRegistration(anyhow!(e)))?;
75
76 self.signer_recorder
77 .record_signer_registration(signer_with_stake.party_id.clone())
78 .await
79 .map_err(|e| SignerRegistrationError::FailedSignerRecorder(e.to_string()))?;
80
81 self
82 .verification_key_store
83 .save_verification_key(epoch, signer_with_stake.clone())
84 .await
85 .with_context(|| {
86 format!(
87 "VerificationKeyStorer can not save verification keys for party_id: '{}' for epoch: '{}'",
88 signer_with_stake.party_id,
89 epoch
90 )
91 })
92 .map_err(|e| SignerRegistrationError::Store(anyhow!(e)))?;
93 }
94
95 self.epoch_service
96 .write()
97 .await
98 .update_next_signers_with_stake()
99 .await
100 .map_err(|e| SignerRegistrationError::EpochService(anyhow!(e)))?;
101
102 Ok(())
103 }
104}
105
106#[async_trait]
107impl SignerSynchronizer for MithrilSignerRegistrationFollower {
108 async fn can_synchronize_signers(&self, epoch: Epoch) -> Result<bool, SignerRegistrationError> {
109 Ok(self
110 .leader_aggregator_client
111 .retrieve_epoch_settings()
112 .await
113 .with_context(|| "can_synchronize_signers failed")
114 .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?
115 .is_some_and(|leader_epoch_settings| epoch == leader_epoch_settings.epoch))
116 }
117
118 async fn synchronize_all_signers(&self) -> Result<(), SignerRegistrationError> {
119 let leader_epoch_settings = self
120 .leader_aggregator_client
121 .retrieve_epoch_settings()
122 .await
123 .with_context(|| "synchronize_all_signers failed")
124 .map_err(SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings)?
125 .ok_or(
126 SignerRegistrationError::FailedFetchingLeaderAggregatorEpochSettings(
127 anyhow::anyhow!("Leader aggregator did not return any epoch settings"),
128 ),
129 )?;
130 let registration_epoch = leader_epoch_settings
131 .epoch
132 .offset_to_leader_synchronization_epoch();
133 let next_signers = leader_epoch_settings.next_signers;
134 let stake_distribution = self
135 .stake_store
136 .get_stakes(registration_epoch)
137 .await
138 .with_context(|| "synchronize_all_signers failed")
139 .map_err(SignerRegistrationError::Store)?
140 .ok_or(SignerRegistrationError::Store(anyhow::anyhow!(
141 "Follower aggregator did not return any stake distribution"
142 )))?;
143 self.synchronize_signers(registration_epoch, &next_signers, &stake_distribution)
144 .await?;
145
146 Ok(())
147 }
148}
149
150#[async_trait]
151impl SignerRegisterer for MithrilSignerRegistrationFollower {
152 async fn register_signer(
153 &self,
154 _epoch: Epoch,
155 _signer: &Signer,
156 ) -> Result<SignerWithStake, SignerRegistrationError> {
157 Err(SignerRegistrationError::RegistrationRoundAlwaysClosedOnFollowerAggregator)
158 }
159
160 async fn get_current_round(&self) -> Option<SignerRegistrationRound> {
161 None
162 }
163}
164
165#[async_trait]
166impl SignerRegistrationRoundOpener for MithrilSignerRegistrationFollower {
167 async fn open_registration_round(
168 &self,
169 _registration_epoch: Epoch,
170 _stake_distribution: StakeDistribution,
171 ) -> StdResult<()> {
172 Ok(())
173 }
174
175 async fn close_registration_round(&self) -> StdResult<()> {
176 Ok(())
177 }
178}
179
180#[cfg(test)]
181mod tests {
182 use std::sync::Arc;
183
184 use anyhow::anyhow;
185 use mithril_persistence::store::StakeStorer;
186
187 use mithril_common::{
188 entities::{Epoch, Signer, SignerWithStake},
189 messages::{EpochSettingsMessage, SignerMessagePart, TryFromMessageAdapter},
190 test_utils::MithrilFixtureBuilder,
191 };
192
193 use crate::{
194 database::{repository::SignerRegistrationStore, test_helper::main_db_connection},
195 message_adapters::FromEpochSettingsAdapter,
196 services::{
197 AggregatorClient, AggregatorClientError, FakeEpochService, MockAggregatorClient,
198 MockSignerRecorder, MockSignerRegistrationVerifier, SignerSynchronizer,
199 },
200 tools::mocks::MockStakeStore,
201 MithrilSignerRegistrationFollower, SignerRecorder, SignerRegisterer,
202 SignerRegistrationRoundOpener, SignerRegistrationVerifier, VerificationKeyStorer,
203 };
204
205 use test_utils::*;
206
207 mod test_utils {
208 use tokio::sync::RwLock;
209
210 use crate::{dependency_injection::EpochServiceWrapper, services::FakeEpochService};
211
212 use super::*;
213
214 pub struct MithrilSignerRegistrationFollowerBuilder {
216 epoch_service: EpochServiceWrapper,
217 signer_recorder: Arc<dyn SignerRecorder>,
218 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
219 leader_aggregator_client: Arc<dyn AggregatorClient>,
220 stake_store: Arc<dyn StakeStorer>,
221 verification_key_store: Arc<dyn VerificationKeyStorer>,
222 }
223
224 impl Default for MithrilSignerRegistrationFollowerBuilder {
225 fn default() -> Self {
226 Self {
227 epoch_service: Arc::new(RwLock::new(FakeEpochService::without_data())),
228 signer_recorder: Arc::new(MockSignerRecorder::new()),
229 signer_registration_verifier: Arc::new(MockSignerRegistrationVerifier::new()),
230 leader_aggregator_client: Arc::new(MockAggregatorClient::new()),
231 stake_store: Arc::new(MockStakeStore::new()),
232 verification_key_store: Arc::new(SignerRegistrationStore::new(
233 Arc::new(main_db_connection().unwrap()),
234 None,
235 )),
236 }
237 }
238 }
239
240 impl MithrilSignerRegistrationFollowerBuilder {
241 pub fn with_epoch_service(self, epoch_service: FakeEpochService) -> Self {
242 Self {
243 epoch_service: Arc::new(RwLock::new(epoch_service)),
244 ..self
245 }
246 }
247
248 pub fn with_signer_recorder(self, signer_recorder: Arc<dyn SignerRecorder>) -> Self {
249 Self {
250 signer_recorder,
251 ..self
252 }
253 }
254
255 pub fn with_signer_registration_verifier(
256 self,
257 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
258 ) -> Self {
259 Self {
260 signer_registration_verifier,
261 ..self
262 }
263 }
264
265 pub fn with_leader_aggregator_client(
266 self,
267 leader_aggregator_client: Arc<dyn AggregatorClient>,
268 ) -> Self {
269 Self {
270 leader_aggregator_client,
271 ..self
272 }
273 }
274
275 pub fn with_stake_store(self, stake_store: Arc<dyn StakeStorer>) -> Self {
276 Self {
277 stake_store,
278 ..self
279 }
280 }
281
282 pub fn build(self) -> MithrilSignerRegistrationFollower {
283 MithrilSignerRegistrationFollower {
284 epoch_service: self.epoch_service,
285 verification_key_store: self.verification_key_store,
286 signer_recorder: self.signer_recorder,
287 signer_registration_verifier: self.signer_registration_verifier,
288 leader_aggregator_client: self.leader_aggregator_client,
289 stake_store: self.stake_store,
290 }
291 }
292 }
293 }
294
295 #[tokio::test]
296 async fn open_close_registration_always_succeeds() {
297 let signer_registration_follower =
298 MithrilSignerRegistrationFollowerBuilder::default().build();
299 let registration_epoch = Epoch(1);
300 let fixture = MithrilFixtureBuilder::default().with_signers(1).build();
301 let stake_distribution = fixture.stake_distribution();
302
303 signer_registration_follower
304 .open_registration_round(registration_epoch, stake_distribution)
305 .await
306 .expect("signer registration round opening should not fail");
307
308 signer_registration_follower
309 .close_registration_round()
310 .await
311 .expect("signer registration round opening should not fail");
312 }
313
314 #[tokio::test]
315 async fn register_signer_always_fails() {
316 let signer_registration_follower =
317 MithrilSignerRegistrationFollowerBuilder::default().build();
318 let registration_epoch = Epoch(1);
319 let fixture = MithrilFixtureBuilder::default().with_signers(1).build();
320 let signer_to_register: Signer = fixture.signers()[0].to_owned();
321
322 signer_registration_follower
323 .register_signer(registration_epoch, &signer_to_register)
324 .await
325 .expect_err("signer registration should always fail");
326 }
327
328 #[tokio::test]
329 async fn synchronize_all_signers_succeeds() {
330 let registration_epoch = Epoch(1);
331 let fixture = MithrilFixtureBuilder::default()
332 .with_signers(5)
333 .disable_signers_certification()
334 .build();
335 let signers = fixture.signers();
336 let stake_distribution = fixture.stake_distribution();
337 let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
338 epoch: registration_epoch,
339 next_signers: SignerMessagePart::from_signers(signers),
340 ..EpochSettingsMessage::dummy()
341 })
342 .unwrap();
343 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
344 .with_signer_recorder({
345 let mut signer_recorder = MockSignerRecorder::new();
346 signer_recorder
347 .expect_record_signer_registration()
348 .returning(|_| Ok(()))
349 .times(5);
350
351 Arc::new(signer_recorder)
352 })
353 .with_signer_registration_verifier({
354 let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
355 signer_registration_verifier
356 .expect_verify()
357 .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
358 .times(5);
359
360 Arc::new(signer_registration_verifier)
361 })
362 .with_leader_aggregator_client({
363 let mut aggregator_client = MockAggregatorClient::new();
364 aggregator_client
365 .expect_retrieve_epoch_settings()
366 .returning(move || Ok(Some(epoch_settings_message.clone())))
367 .times(1);
368
369 Arc::new(aggregator_client)
370 })
371 .with_stake_store({
372 let mut stake_store = MockStakeStore::new();
373 stake_store
374 .expect_get_stakes()
375 .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
376 .times(1);
377
378 Arc::new(stake_store)
379 })
380 .build();
381
382 signer_registration_follower
383 .synchronize_all_signers()
384 .await
385 .unwrap();
386 }
387
388 #[tokio::test]
389 async fn synchronize_all_signers_fails_if_one_signer_registration_fails() {
390 let registration_epoch = Epoch(1);
391 let fixture = MithrilFixtureBuilder::default()
392 .with_signers(5)
393 .disable_signers_certification()
394 .build();
395 let signers = fixture.signers();
396 let stake_distribution = fixture.stake_distribution();
397 let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
398 epoch: registration_epoch,
399 next_signers: SignerMessagePart::from_signers(signers),
400 ..EpochSettingsMessage::dummy()
401 })
402 .unwrap();
403
404 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
405 .with_signer_recorder({
406 let mut signer_recorder = MockSignerRecorder::new();
407 signer_recorder
408 .expect_record_signer_registration()
409 .returning(|_| Ok(()))
410 .times(4);
411 signer_recorder
412 .expect_record_signer_registration()
413 .returning(|_| Err(anyhow!("an error")))
414 .times(1);
415
416 Arc::new(signer_recorder)
417 })
418 .with_signer_registration_verifier({
419 let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
420 signer_registration_verifier
421 .expect_verify()
422 .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
423 .times(5);
424
425 Arc::new(signer_registration_verifier)
426 })
427 .with_leader_aggregator_client({
428 let mut aggregator_client = MockAggregatorClient::new();
429 aggregator_client
430 .expect_retrieve_epoch_settings()
431 .returning(move || Ok(Some(epoch_settings_message.clone())))
432 .times(1);
433
434 Arc::new(aggregator_client)
435 })
436 .with_stake_store({
437 let mut stake_store = MockStakeStore::new();
438 stake_store
439 .expect_get_stakes()
440 .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
441 .times(1);
442
443 Arc::new(stake_store)
444 })
445 .build();
446
447 signer_registration_follower
448 .synchronize_all_signers()
449 .await
450 .expect_err("synchronize_all_signers should fail");
451 }
452
453 #[tokio::test]
454 async fn synchronize_all_signers_fails_if_epoch_service_update_next_signers_fails() {
455 let registration_epoch = Epoch(1);
456 let fixture = MithrilFixtureBuilder::default()
457 .with_signers(5)
458 .disable_signers_certification()
459 .build();
460 let signers = fixture.signers();
461 let stake_distribution = fixture.stake_distribution();
462 let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
463 epoch: registration_epoch,
464 next_signers: SignerMessagePart::from_signers(signers),
465 ..EpochSettingsMessage::dummy()
466 })
467 .unwrap();
468
469 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
470 .with_epoch_service({
471 let mut epoch_service = FakeEpochService::without_data();
472 epoch_service.toggle_errors(false, false, false, true);
473
474 epoch_service
475 })
476 .with_signer_recorder({
477 let mut signer_recorder = MockSignerRecorder::new();
478 signer_recorder
479 .expect_record_signer_registration()
480 .returning(|_| Ok(()))
481 .times(5);
482
483 Arc::new(signer_recorder)
484 })
485 .with_signer_registration_verifier({
486 let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
487 signer_registration_verifier
488 .expect_verify()
489 .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
490 .times(5);
491
492 Arc::new(signer_registration_verifier)
493 })
494 .with_leader_aggregator_client({
495 let mut aggregator_client = MockAggregatorClient::new();
496 aggregator_client
497 .expect_retrieve_epoch_settings()
498 .returning(move || Ok(Some(epoch_settings_message.clone())))
499 .times(1);
500
501 Arc::new(aggregator_client)
502 })
503 .with_stake_store({
504 let mut stake_store = MockStakeStore::new();
505 stake_store
506 .expect_get_stakes()
507 .returning(move |_epoch| Ok(Some(stake_distribution.clone())))
508 .times(1);
509
510 Arc::new(stake_store)
511 })
512 .build();
513
514 signer_registration_follower
515 .synchronize_all_signers()
516 .await
517 .expect_err("synchronize_all_signers should fail");
518 }
519
520 #[tokio::test]
521 async fn synchronize_all_signers_fails_if_fetching_epoch_settings_fails() {
522 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
523 .with_leader_aggregator_client({
524 let mut aggregator_client = MockAggregatorClient::new();
525 aggregator_client
526 .expect_retrieve_epoch_settings()
527 .returning(move || {
528 Err(AggregatorClientError::RemoteServerTechnical(anyhow!(
529 "an error"
530 )))
531 })
532 .times(1);
533
534 Arc::new(aggregator_client)
535 })
536 .build();
537
538 signer_registration_follower
539 .synchronize_all_signers()
540 .await
541 .expect_err("synchronize_all_signers should fail");
542 }
543
544 #[tokio::test]
545 async fn synchronize_all_signers_fails_if_fetching_stakes_fails() {
546 let registration_epoch = Epoch(1);
547 let fixture = MithrilFixtureBuilder::default()
548 .with_signers(5)
549 .disable_signers_certification()
550 .build();
551 let signers = fixture.signers();
552 let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
553 epoch: registration_epoch,
554 next_signers: SignerMessagePart::from_signers(signers),
555 ..EpochSettingsMessage::dummy()
556 })
557 .unwrap();
558 let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default()
559 .with_leader_aggregator_client({
560 let mut aggregator_client = MockAggregatorClient::new();
561 aggregator_client
562 .expect_retrieve_epoch_settings()
563 .returning(move || Ok(Some(epoch_settings_message.clone())))
564 .times(1);
565
566 Arc::new(aggregator_client)
567 })
568 .with_stake_store({
569 let mut stake_store = MockStakeStore::new();
570 stake_store
571 .expect_get_stakes()
572 .returning(move |_epoch| Err(anyhow!("an error")))
573 .times(1);
574
575 Arc::new(stake_store)
576 })
577 .build();
578
579 signer_registration_follower
580 .synchronize_all_signers()
581 .await
582 .expect_err("synchronize_all_signers should fail");
583 }
584}