mithril_aggregator/services/signer_registration/
leader.rs1use std::sync::Arc;
2
3use anyhow::{anyhow, Context};
4use async_trait::async_trait;
5use tokio::sync::RwLock;
6
7use mithril_common::{
8 entities::{Epoch, Signer, SignerWithStake, StakeDistribution},
9 StdResult,
10};
11
12use crate::{services::EpochPruningTask, SignerRegistrationVerifier, VerificationKeyStorer};
13
14use super::{
15 SignerRecorder, SignerRegisterer, SignerRegistrationError, SignerRegistrationRound,
16 SignerRegistrationRoundOpener, SignerSynchronizer,
17};
18
19pub struct MithrilSignerRegistrationLeader {
21 current_round: RwLock<Option<SignerRegistrationRound>>,
23
24 verification_key_store: Arc<dyn VerificationKeyStorer>,
26
27 signer_recorder: Arc<dyn SignerRecorder>,
29
30 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
32
33 verification_key_epoch_retention_limit: Option<u64>,
36}
37
38impl MithrilSignerRegistrationLeader {
39 pub fn new(
41 verification_key_store: Arc<dyn VerificationKeyStorer>,
42 signer_recorder: Arc<dyn SignerRecorder>,
43 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
44 verification_key_epoch_retention_limit: Option<u64>,
45 ) -> Self {
46 Self {
47 current_round: RwLock::new(None),
48 verification_key_store,
49 signer_recorder,
50 signer_registration_verifier,
51 verification_key_epoch_retention_limit,
52 }
53 }
54
55 #[cfg(test)]
56 pub(crate) async fn get_current_round(&self) -> Option<SignerRegistrationRound> {
57 self.current_round.read().await.as_ref().cloned()
58 }
59}
60
61#[async_trait]
62impl SignerRegistrationRoundOpener for MithrilSignerRegistrationLeader {
63 async fn open_registration_round(
64 &self,
65 registration_epoch: Epoch,
66 stake_distribution: StakeDistribution,
67 ) -> StdResult<()> {
68 let mut current_round = self.current_round.write().await;
69 *current_round = Some(SignerRegistrationRound {
70 epoch: registration_epoch,
71 stake_distribution,
72 });
73
74 Ok(())
75 }
76
77 async fn close_registration_round(&self) -> StdResult<()> {
78 let mut current_round = self.current_round.write().await;
79 *current_round = None;
80
81 Ok(())
82 }
83}
84
85#[async_trait]
86impl EpochPruningTask for MithrilSignerRegistrationLeader {
87 fn pruned_data(&self) -> &'static str {
88 "Signer registration"
89 }
90
91 async fn prune(&self, epoch: Epoch) -> StdResult<()> {
92 let registration_epoch = epoch.offset_to_recording_epoch();
93
94 if let Some(retention_limit) = self.verification_key_epoch_retention_limit {
95 self.verification_key_store
96 .prune_verification_keys(registration_epoch - retention_limit)
97 .await
98 .with_context(|| {
99 format!(
100 "VerificationKeyStorer can not prune verification keys below epoch: '{}'",
101 registration_epoch - retention_limit
102 )
103 })?;
104 }
105
106 Ok(())
107 }
108}
109
110#[async_trait]
111impl SignerRegisterer for MithrilSignerRegistrationLeader {
112 async fn register_signer(
113 &self,
114 epoch: Epoch,
115 signer: &Signer,
116 ) -> Result<SignerWithStake, SignerRegistrationError> {
117 let registration_round = self.current_round.read().await;
118 let registration_round = registration_round
119 .as_ref()
120 .ok_or(SignerRegistrationError::RegistrationRoundNotYetOpened)?;
121 if registration_round.epoch != epoch {
122 return Err(SignerRegistrationError::RegistrationRoundUnexpectedEpoch {
123 current_round_epoch: registration_round.epoch,
124 received_epoch: epoch,
125 });
126 }
127
128 let signer_save = self
129 .signer_registration_verifier
130 .verify(signer, ®istration_round.stake_distribution)
131 .await
132 .map_err(|e| SignerRegistrationError::FailedSignerRegistration(anyhow!(e)))?;
133
134 self.signer_recorder
135 .record_signer_registration(signer_save.party_id.clone())
136 .await
137 .map_err(|e| SignerRegistrationError::FailedSignerRecorder(e.to_string()))?;
138
139 match self
140 .verification_key_store
141 .save_verification_key(registration_round.epoch, signer_save.clone())
142 .await
143 .with_context(|| {
144 format!(
145 "VerificationKeyStorer can not save verification keys for party_id: '{}' for epoch: '{}'",
146 signer_save.party_id,
147 registration_round.epoch
148 )
149 })
150 .map_err(|e| SignerRegistrationError::Store(anyhow!(e)))?
151 {
152 Some(_) => Err(SignerRegistrationError::ExistingSigner(Box::new(
153 signer_save,
154 ))),
155 None => Ok(signer_save),
156 }
157 }
158
159 async fn get_current_round(&self) -> Option<SignerRegistrationRound> {
160 self.current_round.read().await.as_ref().cloned()
161 }
162}
163
164#[async_trait]
165impl SignerSynchronizer for MithrilSignerRegistrationLeader {
166 async fn can_synchronize_signers(
167 &self,
168 _epoch: Epoch,
169 ) -> Result<bool, SignerRegistrationError> {
170 Ok(false)
171 }
172
173 async fn synchronize_all_signers(&self) -> Result<(), SignerRegistrationError> {
174 Err(SignerRegistrationError::SignerSynchronizationUnavailableOnLeaderAggregator)
175 }
176}
177
178#[cfg(test)]
179mod tests {
180 use std::{collections::HashMap, sync::Arc};
181
182 use mockall::predicate::eq;
183
184 use mithril_common::{
185 entities::{Epoch, Signer, SignerWithStake},
186 test_utils::MithrilFixtureBuilder,
187 };
188
189 use crate::{
190 database::{repository::SignerRegistrationStore, test_helper::main_db_connection},
191 services::{
192 EpochPruningTask, MockSignerRecorder, MockSignerRegistrationVerifier,
193 SignerSynchronizer,
194 },
195 store::MockVerificationKeyStorer,
196 MithrilSignerRegistrationLeader, SignerRegisterer, SignerRegistrationRoundOpener,
197 VerificationKeyStorer,
198 };
199
200 use test_utils::*;
201
202 use super::*;
203
204 mod test_utils {
205 use super::*;
206
207 pub struct MithrilSignerRegistrationLeaderBuilder {
209 signer_recorder: Arc<dyn SignerRecorder>,
210 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
211 verification_key_store: Arc<dyn VerificationKeyStorer>,
212 verification_key_epoch_retention_limit: Option<u64>,
213 }
214
215 impl Default for MithrilSignerRegistrationLeaderBuilder {
216 fn default() -> Self {
217 Self {
218 signer_recorder: Arc::new(MockSignerRecorder::new()),
219 signer_registration_verifier: Arc::new(MockSignerRegistrationVerifier::new()),
220 verification_key_store: Arc::new(SignerRegistrationStore::new(Arc::new(
221 main_db_connection().unwrap(),
222 ))),
223 verification_key_epoch_retention_limit: None,
224 }
225 }
226 }
227
228 impl MithrilSignerRegistrationLeaderBuilder {
229 pub fn with_verification_key_store(
230 self,
231 verification_key_store: Arc<dyn VerificationKeyStorer>,
232 ) -> Self {
233 Self {
234 verification_key_store,
235 ..self
236 }
237 }
238
239 pub fn with_signer_recorder(self, signer_recorder: Arc<dyn SignerRecorder>) -> Self {
240 Self {
241 signer_recorder,
242 ..self
243 }
244 }
245
246 pub fn with_signer_registration_verifier(
247 self,
248 signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
249 ) -> Self {
250 Self {
251 signer_registration_verifier,
252 ..self
253 }
254 }
255
256 pub fn with_verification_key_epoch_retention_limit(
257 self,
258 verification_key_epoch_retention_limit: Option<u64>,
259 ) -> Self {
260 Self {
261 verification_key_epoch_retention_limit,
262 ..self
263 }
264 }
265
266 pub fn build(self) -> MithrilSignerRegistrationLeader {
267 MithrilSignerRegistrationLeader {
268 current_round: RwLock::new(None),
269 verification_key_store: self.verification_key_store,
270 signer_recorder: self.signer_recorder,
271 signer_registration_verifier: self.signer_registration_verifier,
272 verification_key_epoch_retention_limit: self
273 .verification_key_epoch_retention_limit,
274 }
275 }
276 }
277 }
278
279 #[tokio::test]
280 async fn can_register_signer_if_registration_round_is_opened_with_operational_certificate() {
281 let registration_epoch = Epoch(1);
282 let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
283 let signer_to_register: Signer = fixture.signers()[0].to_owned();
284 let stake_distribution = fixture.stake_distribution();
285 let signer_registration_leader = MithrilSignerRegistrationLeaderBuilder::default()
286 .with_signer_recorder({
287 let mut signer_recorder = MockSignerRecorder::new();
288 signer_recorder
289 .expect_record_signer_registration()
290 .returning(|_| Ok(()))
291 .once();
292
293 Arc::new(signer_recorder)
294 })
295 .with_signer_registration_verifier({
296 let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
297 signer_registration_verifier
298 .expect_verify()
299 .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
300 .once();
301
302 Arc::new(signer_registration_verifier)
303 })
304 .build();
305
306 signer_registration_leader
307 .open_registration_round(registration_epoch, stake_distribution)
308 .await
309 .expect("signer registration round opening should not fail");
310
311 signer_registration_leader
312 .register_signer(registration_epoch, &signer_to_register)
313 .await
314 .expect("signer registration should not fail");
315
316 let registered_signers = &signer_registration_leader
317 .verification_key_store
318 .get_verification_keys(registration_epoch)
319 .await
320 .expect("registered signers retrieval should not fail");
321
322 assert_eq!(
323 &Some(HashMap::from([(
324 signer_to_register.party_id.clone(),
325 signer_to_register
326 )])),
327 registered_signers
328 );
329 }
330
331 #[tokio::test]
332 async fn can_register_signer_if_registration_round_is_opened_without_operational_certificate() {
333 let registration_epoch = Epoch(1);
334 let fixture = MithrilFixtureBuilder::default()
335 .with_signers(5)
336 .disable_signers_certification()
337 .build();
338 let signer_to_register: Signer = fixture.signers()[0].to_owned();
339 let stake_distribution = fixture.stake_distribution();
340 let signer_registration_leader = MithrilSignerRegistrationLeaderBuilder::default()
341 .with_signer_recorder({
342 let mut signer_recorder = MockSignerRecorder::new();
343 signer_recorder
344 .expect_record_signer_registration()
345 .returning(|_| Ok(()))
346 .once();
347
348 Arc::new(signer_recorder)
349 })
350 .with_signer_registration_verifier({
351 let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
352 signer_registration_verifier
353 .expect_verify()
354 .returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
355 .once();
356
357 Arc::new(signer_registration_verifier)
358 })
359 .build();
360
361 signer_registration_leader
362 .open_registration_round(registration_epoch, stake_distribution)
363 .await
364 .expect("signer registration round opening should not fail");
365
366 signer_registration_leader
367 .register_signer(registration_epoch, &signer_to_register)
368 .await
369 .expect("signer registration should not fail");
370
371 let registered_signers = &signer_registration_leader
372 .verification_key_store
373 .get_verification_keys(registration_epoch)
374 .await
375 .expect("registered signers retrieval should not fail");
376
377 assert_eq!(
378 &Some(HashMap::from([(
379 signer_to_register.party_id.clone(),
380 signer_to_register
381 )])),
382 registered_signers
383 );
384 }
385
386 #[tokio::test]
387 async fn cant_register_signer_if_registration_round_is_not_opened() {
388 let registration_epoch = Epoch(1);
389 let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
390 let signer_to_register: Signer = fixture.signers()[0].to_owned();
391 let signer_registration_leader = MithrilSignerRegistrationLeaderBuilder::default().build();
392
393 signer_registration_leader
394 .register_signer(registration_epoch, &signer_to_register)
395 .await
396 .expect_err("signer registration should fail if no round opened");
397 }
398
399 #[tokio::test]
400 async fn synchronize_all_signers_always_fails() {
401 let signer_registration_leader = MithrilSignerRegistrationLeaderBuilder::default().build();
402
403 signer_registration_leader
404 .synchronize_all_signers()
405 .await
406 .expect_err("synchronize_signers");
407 }
408
409 #[tokio::test]
410 async fn prune_epoch_older_than_threshold() {
411 const PROTOCOL_INITIALIZER_PRUNE_EPOCH_THRESHOLD: u64 = 10;
412 let signer_registration_leader = MithrilSignerRegistrationLeaderBuilder::default()
413 .with_verification_key_store({
414 let mut verification_key_store = MockVerificationKeyStorer::new();
415 verification_key_store
416 .expect_prune_verification_keys()
417 .with(eq(Epoch(4).offset_to_recording_epoch()))
418 .times(1)
419 .returning(|_| Ok(()));
420
421 Arc::new(verification_key_store)
422 })
423 .with_verification_key_epoch_retention_limit(Some(
424 PROTOCOL_INITIALIZER_PRUNE_EPOCH_THRESHOLD,
425 ))
426 .build();
427
428 let current_epoch = Epoch(4) + PROTOCOL_INITIALIZER_PRUNE_EPOCH_THRESHOLD;
429 signer_registration_leader
430 .prune(current_epoch)
431 .await
432 .unwrap();
433 }
434
435 #[tokio::test]
436 async fn without_threshold_nothing_is_pruned() {
437 let signer_registration_leader = MithrilSignerRegistrationLeaderBuilder::default()
438 .with_verification_key_store({
439 let mut verification_key_store = MockVerificationKeyStorer::new();
440 verification_key_store
441 .expect_prune_verification_keys()
442 .never();
443
444 Arc::new(verification_key_store)
445 })
446 .with_verification_key_epoch_retention_limit(None)
447 .build();
448
449 signer_registration_leader.prune(Epoch(100)).await.unwrap();
450 }
451}