mithril_aggregator/services/certificate_chain_synchronizer/
synchronizer_service.rs1use anyhow::{Context, anyhow};
23use async_trait::async_trait;
24use chrono::Utc;
25use slog::{Logger, debug, info};
26use std::collections::VecDeque;
27use std::sync::Arc;
28
29use mithril_common::StdResult;
30use mithril_common::certificate_chain::CertificateVerifier;
31use mithril_common::crypto_helper::ProtocolGenesisVerifier;
32use mithril_common::entities::{Certificate, SignedEntityType};
33use mithril_common::logging::LoggerExtensions;
34
35use crate::entities::OpenMessage;
36
37use super::{
38 CertificateChainSynchronizer, OpenMessageStorer, RemoteCertificateRetriever,
39 SynchronizedCertificateStorer,
40};
41
42pub struct MithrilCertificateChainSynchronizer {
44 remote_certificate_retriever: Arc<dyn RemoteCertificateRetriever>,
45 certificate_storer: Arc<dyn SynchronizedCertificateStorer>,
46 certificate_verifier: Arc<dyn CertificateVerifier>,
47 genesis_verifier: Arc<ProtocolGenesisVerifier>,
48 open_message_storer: Arc<dyn OpenMessageStorer>,
49 logger: Logger,
50}
51
52#[derive(Debug, Copy, Clone, PartialEq, Eq)]
53enum SyncStatus {
54 Forced,
55 NoLocalGenesis,
56 RemoteGenesisMatchesLocalGenesis,
57 RemoteGenesisDoesntMatchLocalGenesis,
58}
59
60impl SyncStatus {
61 fn should_sync(&self) -> bool {
62 match self {
63 SyncStatus::Forced => true,
64 SyncStatus::NoLocalGenesis => true,
65 SyncStatus::RemoteGenesisMatchesLocalGenesis => false,
66 SyncStatus::RemoteGenesisDoesntMatchLocalGenesis => true,
67 }
68 }
69}
70
71impl MithrilCertificateChainSynchronizer {
72 pub fn new(
74 remote_certificate_retriever: Arc<dyn RemoteCertificateRetriever>,
75 certificate_storer: Arc<dyn SynchronizedCertificateStorer>,
76 certificate_verifier: Arc<dyn CertificateVerifier>,
77 genesis_verifier: Arc<ProtocolGenesisVerifier>,
78 open_message_storer: Arc<dyn OpenMessageStorer>,
79 logger: Logger,
80 ) -> Self {
81 Self {
82 remote_certificate_retriever,
83 certificate_storer,
84 certificate_verifier,
85 genesis_verifier,
86 open_message_storer,
87 logger: logger.new_with_component_name::<Self>(),
88 }
89 }
90
91 async fn check_sync_state(&self, force: bool) -> StdResult<SyncStatus> {
92 if force {
93 return Ok(SyncStatus::Forced);
94 }
95
96 match self.certificate_storer.get_latest_genesis().await? {
97 Some(local_genesis) => {
98 match self
99 .remote_certificate_retriever
100 .get_genesis_certificate_details()
101 .await?
102 {
103 Some(remote_genesis) if (local_genesis == remote_genesis) => {
104 Ok(SyncStatus::RemoteGenesisMatchesLocalGenesis)
105 }
106 Some(_) => Ok(SyncStatus::RemoteGenesisDoesntMatchLocalGenesis),
107 None => Err(anyhow!("Remote aggregator doesn't have a chain yet")),
109 }
110 }
111 None => Ok(SyncStatus::NoLocalGenesis),
112 }
113 }
114
115 async fn retrieve_and_validate_remote_certificate_chain(
116 &self,
117 starting_point: Certificate,
118 ) -> StdResult<Vec<Certificate>> {
119 let mut validated_certificates = VecDeque::new();
122 let mut certificate = starting_point;
123
124 loop {
125 let parent_certificate = self
126 .certificate_verifier
127 .verify_certificate(&certificate, &self.genesis_verifier.to_verification_key())
128 .await
129 .with_context(
130 || format!("Failed to verify certificate: `{}`", certificate.hash,),
131 )?;
132
133 match parent_certificate {
134 None => {
135 validated_certificates.push_front(certificate);
136 break;
137 }
138 Some(parent) => {
139 if !validated_certificates.is_empty() || parent.epoch != certificate.epoch {
142 validated_certificates.push_front(certificate);
143 }
144
145 certificate = parent;
146 }
147 }
148 }
149
150 Ok(validated_certificates.into())
151 }
152
153 async fn store_certificate_chain(&self, certificate_chain: Vec<Certificate>) -> StdResult<()> {
154 self.certificate_storer
155 .insert_or_replace_many(certificate_chain)
156 .await?;
157 Ok(())
158 }
159}
160
161#[async_trait]
162impl CertificateChainSynchronizer for MithrilCertificateChainSynchronizer {
163 async fn synchronize_certificate_chain(&self, force: bool) -> StdResult<()> {
164 debug!(self.logger, ">> synchronize_certificate_chain"; "force" => force);
165
166 let sync_state = self.check_sync_state(force).await.with_context(|| {
167 format!("Failed to check if certificate chain should be sync (force: `{force}`)")
168 })?;
169 if sync_state.should_sync() {
170 info!(self.logger, "Start synchronizing certificate chain"; "sync_state" => ?sync_state);
171 } else {
172 info!(self.logger, "No need to synchronize certificate chain"; "sync_state" => ?sync_state);
173 return Ok(());
174 }
175
176 let starting_point = self
177 .remote_certificate_retriever
178 .get_latest_certificate_details()
179 .await?
180 .ok_or(
181 anyhow!("Remote aggregator doesn't have a chain yet")
182 .context("Failed to retrieve latest remote certificate details"),
183 )?;
184 let remote_certificate_chain = self
185 .retrieve_and_validate_remote_certificate_chain(starting_point)
186 .await
187 .with_context(|| "Failed to retrieve and validate remote certificate chain")?;
188 let open_message = prepare_open_message_to_store(
189 remote_certificate_chain
190 .last()
191 .ok_or(anyhow!("Retrieved certificate chain is empty"))?,
192 );
193 self.store_certificate_chain(remote_certificate_chain)
194 .await
195 .with_context(|| "Failed to store remote retrieved certificate chain")?;
196 self.open_message_storer
197 .insert_or_replace_open_message(open_message)
198 .await
199 .with_context(|| "Failed to store open message when synchronizing certificate chain")?;
200
201 info!(
202 self.logger,
203 "Certificate chain synchronized with remote source"
204 );
205 Ok(())
206 }
207}
208
209fn prepare_open_message_to_store(latest_certificate: &Certificate) -> OpenMessage {
210 OpenMessage {
211 epoch: latest_certificate.epoch,
212 signed_entity_type: SignedEntityType::MithrilStakeDistribution(latest_certificate.epoch),
213 protocol_message: latest_certificate.protocol_message.clone(),
214 is_certified: true,
215 is_expired: false,
216 single_signatures: Vec::new(),
217 created_at: Utc::now(),
218 expires_at: None,
219 }
220}
221
222#[cfg(test)]
223mod tests {
224 use anyhow::anyhow;
225 use std::sync::RwLock;
226
227 use mithril_common::certificate_chain::MithrilCertificateVerifier;
228 use mithril_common::test::{
229 builder::{CertificateChainBuilder, CertificateChainFixture},
230 double::{FakeCertificaterRetriever, fake_data, fake_keys},
231 mock_extensions::MockBuilder,
232 };
233
234 use crate::services::{
235 MockOpenMessageStorer, MockRemoteCertificateRetriever, MockSynchronizedCertificateStorer,
236 };
237 use crate::test::TestLogger;
238 use crate::test::double::mocks::MockCertificateVerifier;
239
240 use super::*;
241
242 impl MithrilCertificateChainSynchronizer {
243 fn default_for_test() -> Self {
244 let genesis_verification_key =
245 fake_keys::genesis_verification_key()[0].try_into().unwrap();
246 Self::new(
247 Arc::new(MockRemoteCertificateRetriever::new()),
248 Arc::new(MockSynchronizedCertificateStorer::new()),
249 Arc::new(MockCertificateVerifier::new()),
250 Arc::new(ProtocolGenesisVerifier::from_verification_key(
251 genesis_verification_key,
252 )),
253 Arc::new(MockOpenMessageStorer::new()),
254 TestLogger::stdout(),
255 )
256 }
257 }
258
259 macro_rules! mocked_synchronizer {
260 (with_remote_genesis: $remote_genesis_result:expr) => {
261 MithrilCertificateChainSynchronizer {
262 remote_certificate_retriever:
263 MockBuilder::<MockRemoteCertificateRetriever>::configure(|retriever| {
264 retriever
265 .expect_get_genesis_certificate_details()
266 .return_once(move || $remote_genesis_result);
267 }),
268 ..MithrilCertificateChainSynchronizer::default_for_test()
269 }
270 };
271 (with_local_genesis: $local_genesis_result:expr) => {
272 MithrilCertificateChainSynchronizer {
273 certificate_storer: MockBuilder::<MockSynchronizedCertificateStorer>::configure(
274 |storer| {
275 storer
276 .expect_get_latest_genesis()
277 .return_once(move || $local_genesis_result);
278 },
279 ),
280 ..MithrilCertificateChainSynchronizer::default_for_test()
281 }
282 };
283 (with_remote_genesis: $remote_genesis_result:expr, with_local_genesis: $local_genesis_result:expr) => {
284 MithrilCertificateChainSynchronizer {
285 remote_certificate_retriever:
286 MockBuilder::<MockRemoteCertificateRetriever>::configure(|retriever| {
287 retriever
288 .expect_get_genesis_certificate_details()
289 .return_once(move || $remote_genesis_result);
290 }),
291 certificate_storer: MockBuilder::<MockSynchronizedCertificateStorer>::configure(
292 |storer| {
293 storer
294 .expect_get_latest_genesis()
295 .return_once(move || $local_genesis_result);
296 },
297 ),
298 ..MithrilCertificateChainSynchronizer::default_for_test()
299 }
300 };
301 (with_verify_certificate_result: $verify_certificate_result:expr) => {
302 MithrilCertificateChainSynchronizer {
303 certificate_verifier: MockBuilder::<MockCertificateVerifier>::configure(
304 |verifier| {
305 verifier
306 .expect_verify_certificate()
307 .return_once(move |_, _| $verify_certificate_result);
308 },
309 ),
310 ..MithrilCertificateChainSynchronizer::default_for_test()
311 }
312 };
313 }
314
315 fn fake_verifier(remote_certificate_chain: &[Certificate]) -> Arc<dyn CertificateVerifier> {
316 let verifier = MithrilCertificateVerifier::new(
317 TestLogger::stdout(),
318 Arc::new(FakeCertificaterRetriever::from_certificates(
319 remote_certificate_chain,
320 )),
321 );
322 Arc::new(verifier)
323 }
324
325 #[derive(Default)]
326 struct DumbCertificateStorer {
327 certificates: RwLock<Vec<Certificate>>,
328 genesis_certificate: Option<Certificate>,
329 }
330
331 impl DumbCertificateStorer {
332 fn new(genesis: Certificate, already_stored: Vec<Certificate>) -> Self {
333 Self {
334 certificates: RwLock::new(already_stored),
335 genesis_certificate: Some(genesis),
336 }
337 }
338
339 fn stored_certificates(&self) -> Vec<Certificate> {
340 self.certificates.read().unwrap().clone()
341 }
342 }
343
344 #[async_trait]
345 impl SynchronizedCertificateStorer for DumbCertificateStorer {
346 async fn insert_or_replace_many(
347 &self,
348 certificates_chain: Vec<Certificate>,
349 ) -> StdResult<()> {
350 let mut certificates = self.certificates.write().unwrap();
351 *certificates = certificates_chain;
352 Ok(())
353 }
354
355 async fn get_latest_genesis(&self) -> StdResult<Option<Certificate>> {
356 Ok(self.genesis_certificate.clone())
357 }
358 }
359
360 mod check_sync_state {
361 use super::*;
362
363 #[test]
364 fn sync_state_should_sync() {
365 assert!(SyncStatus::Forced.should_sync());
366 assert!(!SyncStatus::RemoteGenesisMatchesLocalGenesis.should_sync());
367 assert!(SyncStatus::RemoteGenesisDoesntMatchLocalGenesis.should_sync());
368 assert!(SyncStatus::NoLocalGenesis.should_sync());
369 }
370
371 #[tokio::test]
372 async fn state_when_force_true() {
373 let synchronizer = MithrilCertificateChainSynchronizer::default_for_test();
374
375 let sync_state = synchronizer.check_sync_state(true).await.unwrap();
376 assert_eq!(SyncStatus::Forced, sync_state);
377 }
378
379 #[tokio::test]
380 async fn state_when_force_false_and_no_local_genesis_certificate_found() {
381 let synchronizer = mocked_synchronizer!(with_local_genesis: Ok(None));
382
383 let sync_state = synchronizer.check_sync_state(false).await.unwrap();
384 assert_eq!(SyncStatus::NoLocalGenesis, sync_state);
385 }
386
387 #[tokio::test]
388 async fn state_when_force_false_and_remote_genesis_dont_matches_local_genesis() {
389 let synchronizer = mocked_synchronizer!(
390 with_remote_genesis: Ok(Some(fake_data::genesis_certificate("remote_genesis"))),
391 with_local_genesis: Ok(Some(fake_data::genesis_certificate("local_genesis")))
392 );
393
394 let sync_state = synchronizer.check_sync_state(false).await.unwrap();
395 assert_eq!(SyncStatus::RemoteGenesisDoesntMatchLocalGenesis, sync_state);
396 }
397
398 #[tokio::test]
399 async fn state_when_force_false_and_remote_genesis_matches_local_genesis() {
400 let remote_genesis = fake_data::genesis_certificate("genesis");
401 let local_genesis = remote_genesis.clone();
402 let synchronizer = mocked_synchronizer!(
403 with_remote_genesis: Ok(Some(remote_genesis)),
404 with_local_genesis: Ok(Some(local_genesis))
405 );
406
407 let sync_state = synchronizer.check_sync_state(false).await.unwrap();
408 assert_eq!(SyncStatus::RemoteGenesisMatchesLocalGenesis, sync_state);
409 }
410
411 #[tokio::test]
412 async fn if_force_true_it_should_not_fetch_remote_genesis_certificate() {
413 let synchronizer = mocked_synchronizer!(with_remote_genesis: Err(anyhow!(
414 "should not fetch genesis"
415 )));
416
417 synchronizer.check_sync_state(true).await.unwrap();
418 }
419
420 #[tokio::test]
421 async fn should_abort_with_error_if_force_false_and_fails_to_retrieve_local_genesis() {
422 let synchronizer = mocked_synchronizer!(with_local_genesis: Err(anyhow!("failure")));
423 synchronizer
424 .check_sync_state(false)
425 .await
426 .expect_err("Expected an error but was:");
427 }
428
429 #[tokio::test]
430 async fn should_abort_with_error_if_force_false_and_fails_to_retrieve_remote_genesis() {
431 let synchronizer = mocked_synchronizer!(
432 with_remote_genesis: Err(anyhow!("failure")),
433 with_local_genesis: Ok(Some(fake_data::genesis_certificate("local_genesis")))
434 );
435 synchronizer
436 .check_sync_state(false)
437 .await
438 .expect_err("Expected an error but was:");
439 }
440
441 #[tokio::test]
442 async fn should_abort_with_error_if_force_false_and_remote_genesis_is_none() {
443 let synchronizer = mocked_synchronizer!(
444 with_remote_genesis: Ok(None),
445 with_local_genesis: Ok(Some(fake_data::genesis_certificate("local_genesis")))
446 );
447 let error = synchronizer
448 .check_sync_state(false)
449 .await
450 .expect_err("Expected an error but was:");
451
452 assert!(
453 error
454 .to_string()
455 .contains("Remote aggregator doesn't have a chain yet"),
456 "Unexpected error:\n{error:?}"
457 );
458 }
459 }
460
461 mod retrieve_validate_remote_certificate_chain {
462 use mockall::predicate::{always, eq};
463
464 use mithril_common::entities::Epoch;
465
466 use super::*;
467
468 #[tokio::test]
469 async fn succeed_if_the_remote_chain_only_contains_a_genesis_certificate() {
470 let chain = CertificateChainBuilder::new().with_total_certificates(1).build();
471 let synchronizer = MithrilCertificateChainSynchronizer {
472 certificate_verifier: fake_verifier(&chain),
473 genesis_verifier: Arc::new(chain.genesis_verifier.clone()),
474 ..MithrilCertificateChainSynchronizer::default_for_test()
475 };
476
477 let starting_point = chain[0].clone();
478 let remote_certificate_chain = synchronizer
479 .retrieve_and_validate_remote_certificate_chain(starting_point)
480 .await
481 .unwrap();
482
483 assert_eq!(remote_certificate_chain, chain.certificates_chained);
484 }
485
486 #[tokio::test]
487 async fn abort_with_error_if_a_certificate_is_invalid() {
488 let synchronizer = mocked_synchronizer!(with_verify_certificate_result: Err(anyhow!("invalid certificate")));
489
490 let starting_point = fake_data::certificate("certificate");
491 synchronizer
492 .retrieve_and_validate_remote_certificate_chain(starting_point)
493 .await
494 .expect_err("Expected an error but was:");
495 }
496
497 #[tokio::test]
498 async fn succeed_with_a_valid_certificate_chain_and_only_get_first_certificate_of_each_epoch_plus_genesis()
499 {
500 let chain = CertificateChainBuilder::new()
504 .with_total_certificates(9)
505 .with_certificates_per_epoch(2)
506 .build();
507 let synchronizer = MithrilCertificateChainSynchronizer {
508 certificate_verifier: fake_verifier(&chain),
509 genesis_verifier: Arc::new(chain.genesis_verifier.clone()),
510 ..MithrilCertificateChainSynchronizer::default_for_test()
511 };
512
513 let starting_point = chain[0].clone();
514 let remote_certificate_chain = synchronizer
515 .retrieve_and_validate_remote_certificate_chain(starting_point.clone())
516 .await
517 .unwrap();
518
519 let mut expected = chain.certificate_path_to_genesis(&starting_point.hash);
520 expected.reverse();
522 expected.pop();
524 assert_eq!(remote_certificate_chain, expected);
525 }
526
527 #[tokio::test]
528 async fn return_chain_ordered_from_genesis_to_latest() {
529 let base_certificate = fake_data::certificate("whatever");
530 let chain = vec![
531 Certificate {
532 epoch: Epoch(2),
533 ..fake_data::genesis_certificate("genesis")
534 },
535 Certificate {
536 epoch: Epoch(3),
537 hash: "hash1".to_string(),
538 previous_hash: "genesis".to_string(),
539 ..base_certificate.clone()
540 },
541 Certificate {
542 epoch: Epoch(4),
543 hash: "hash2".to_string(),
544 previous_hash: "hash1".to_string(),
545 ..base_certificate
546 },
547 ];
548 let synchronizer = MithrilCertificateChainSynchronizer {
549 certificate_verifier: MockBuilder::<MockCertificateVerifier>::configure(|mock| {
550 let cert_1 = chain[1].clone();
551 mock.expect_verify_certificate()
552 .with(eq(chain[2].clone()), always())
553 .return_once(move |_, _| Ok(Some(cert_1)));
554 let genesis = chain[0].clone();
555 mock.expect_verify_certificate()
556 .with(eq(chain[1].clone()), always())
557 .return_once(move |_, _| Ok(Some(genesis)));
558 mock.expect_verify_certificate()
559 .with(eq(chain[0].clone()), always())
560 .return_once(move |_, _| Ok(None));
561 }),
562 ..MithrilCertificateChainSynchronizer::default_for_test()
563 };
564
565 let starting_point = chain[2].clone();
566 let remote_certificate_chain = synchronizer
567 .retrieve_and_validate_remote_certificate_chain(starting_point.clone())
568 .await
569 .unwrap();
570
571 assert_eq!(
572 remote_certificate_chain
573 .into_iter()
574 .map(|c| c.hash)
575 .collect::<Vec<_>>(),
576 vec!["genesis".to_string(), "hash1".to_string(), "hash2".to_string()]
577 );
578 }
579 }
580
581 mod store_remote_certificate_chain {
582 use super::*;
583
584 #[tokio::test]
585 async fn do_store_given_certificates() {
586 let certificates_chain = vec![
587 fake_data::genesis_certificate("genesis"),
588 fake_data::certificate("certificate1"),
589 fake_data::certificate("certificate2"),
590 ];
591 let storer = Arc::new(DumbCertificateStorer::default());
592 let synchronizer = MithrilCertificateChainSynchronizer {
593 certificate_storer: storer.clone(),
594 ..MithrilCertificateChainSynchronizer::default_for_test()
595 };
596
597 assert_eq!(Vec::<Certificate>::new(), storer.stored_certificates());
598
599 synchronizer
600 .store_certificate_chain(certificates_chain.clone())
601 .await
602 .unwrap();
603
604 assert_eq!(certificates_chain, storer.stored_certificates());
605 }
606
607 #[tokio::test]
608 async fn fail_on_storer_error() {
609 let synchronizer = MithrilCertificateChainSynchronizer {
610 certificate_storer: MockBuilder::<MockSynchronizedCertificateStorer>::configure(
611 |mock| {
612 mock.expect_insert_or_replace_many()
613 .return_once(move |_| Err(anyhow!("failure")));
614 },
615 ),
616 ..MithrilCertificateChainSynchronizer::default_for_test()
617 };
618
619 synchronizer
620 .store_certificate_chain(vec![fake_data::certificate("certificate")])
621 .await
622 .unwrap_err();
623 }
624 }
625
626 mod synchronize_certificate_chain {
627 use mockall::predicate::function;
628
629 use super::*;
630
631 fn build_synchronizer(
632 remote_chain: &CertificateChainFixture,
633 storer: Arc<dyn SynchronizedCertificateStorer>,
634 ) -> MithrilCertificateChainSynchronizer {
635 MithrilCertificateChainSynchronizer {
636 certificate_storer: storer.clone(),
637 remote_certificate_retriever:
638 MockBuilder::<MockRemoteCertificateRetriever>::configure(|mock| {
639 let genesis = remote_chain.genesis_certificate().clone();
640 mock.expect_get_genesis_certificate_details()
641 .return_once(move || Ok(Some(genesis)));
642 let latest = remote_chain.latest_certificate().clone();
643 mock.expect_get_latest_certificate_details()
644 .return_once(move || Ok(Some(latest)));
645 }),
646 certificate_verifier: fake_verifier(remote_chain),
647 open_message_storer: MockBuilder::<MockOpenMessageStorer>::configure(|mock| {
648 let expected_msd_epoch = remote_chain.latest_certificate().epoch;
650 mock.expect_insert_or_replace_open_message()
651 .with(function(move |open_message: &OpenMessage| {
652 open_message.signed_entity_type
653 == SignedEntityType::MithrilStakeDistribution(expected_msd_epoch)
654 }))
655 .times(1..)
656 .returning(|_| Ok(()));
657 }),
658 ..MithrilCertificateChainSynchronizer::default_for_test()
659 }
660 }
661
662 #[tokio::test]
663 async fn store_all() {
664 let remote_chain = CertificateChainBuilder::default()
665 .with_certificates_per_epoch(3)
666 .with_total_certificates(8)
667 .build();
668 let storer = Arc::new(DumbCertificateStorer::default());
669 let synchronizer = build_synchronizer(&remote_chain, storer.clone());
670
671 synchronizer.synchronize_certificate_chain(false).await.unwrap();
673
674 let mut expected =
675 remote_chain.certificate_path_to_genesis(&remote_chain.latest_certificate().hash);
676 expected.reverse();
677 assert_eq!(expected, storer.stored_certificates());
678 }
679
680 #[tokio::test]
681 async fn store_partial() {
682 let remote_chain = CertificateChainBuilder::default()
683 .with_certificates_per_epoch(1)
684 .with_total_certificates(8)
685 .build();
686 let existing_certificates =
687 remote_chain.certificate_path_to_genesis(&remote_chain[5].hash);
688 let storer = Arc::new(DumbCertificateStorer::new(
689 remote_chain.genesis_certificate().clone(),
690 existing_certificates.clone(),
691 ));
692 let synchronizer = build_synchronizer(&remote_chain, storer.clone());
693
694 synchronizer.synchronize_certificate_chain(false).await.unwrap();
696
697 assert_eq!(&existing_certificates, &storer.stored_certificates());
698
699 synchronizer.synchronize_certificate_chain(true).await.unwrap();
701
702 let mut expected =
703 remote_chain.certificate_path_to_genesis(&remote_chain.latest_certificate().hash);
704 expected.reverse();
705 assert_eq!(expected, storer.stored_certificates());
706 }
707 }
708}