mithril_aggregator/services/certificate_chain_synchronizer/
synchronizer_service.rs1use anyhow::{Context, anyhow};
23use async_trait::async_trait;
24use chrono::Utc;
25use slog::{Logger, debug, info};
26use std::collections::VecDeque;
27use std::sync::Arc;
28
29use mithril_common::StdResult;
30use mithril_common::certificate_chain::CertificateVerifier;
31use mithril_common::crypto_helper::ProtocolGenesisVerifier;
32use mithril_common::entities::{Certificate, SignedEntityType};
33use mithril_common::logging::LoggerExtensions;
34
35use crate::entities::OpenMessage;
36
37use super::{
38 CertificateChainSynchronizer, OpenMessageStorer, RemoteCertificateRetriever,
39 SynchronizedCertificateStorer,
40};
41
42pub struct MithrilCertificateChainSynchronizer {
44 remote_certificate_retriever: Arc<dyn RemoteCertificateRetriever>,
45 certificate_storer: Arc<dyn SynchronizedCertificateStorer>,
46 certificate_verifier: Arc<dyn CertificateVerifier>,
47 genesis_verifier: Arc<ProtocolGenesisVerifier>,
48 open_message_storer: Arc<dyn OpenMessageStorer>,
49 logger: Logger,
50}
51
52#[derive(Debug, Copy, Clone, PartialEq, Eq)]
53enum SyncStatus {
54 Forced,
55 NoLocalGenesis,
56 RemoteGenesisMatchesLocalGenesis,
57 RemoteGenesisDoesntMatchLocalGenesis,
58}
59
60impl SyncStatus {
61 fn should_sync(&self) -> bool {
62 match self {
63 SyncStatus::Forced => true,
64 SyncStatus::NoLocalGenesis => true,
65 SyncStatus::RemoteGenesisMatchesLocalGenesis => false,
66 SyncStatus::RemoteGenesisDoesntMatchLocalGenesis => true,
67 }
68 }
69}
70
71impl MithrilCertificateChainSynchronizer {
72 pub fn new(
74 remote_certificate_retriever: Arc<dyn RemoteCertificateRetriever>,
75 certificate_storer: Arc<dyn SynchronizedCertificateStorer>,
76 certificate_verifier: Arc<dyn CertificateVerifier>,
77 genesis_verifier: Arc<ProtocolGenesisVerifier>,
78 open_message_storer: Arc<dyn OpenMessageStorer>,
79 logger: Logger,
80 ) -> Self {
81 Self {
82 remote_certificate_retriever,
83 certificate_storer,
84 certificate_verifier,
85 genesis_verifier,
86 open_message_storer,
87 logger: logger.new_with_component_name::<Self>(),
88 }
89 }
90
91 async fn check_sync_state(&self, force: bool) -> StdResult<SyncStatus> {
92 if force {
93 return Ok(SyncStatus::Forced);
94 }
95
96 match self.certificate_storer.get_latest_genesis().await? {
97 Some(local_genesis) => {
98 match self
99 .remote_certificate_retriever
100 .get_genesis_certificate_details()
101 .await?
102 {
103 Some(remote_genesis) if (local_genesis == remote_genesis) => {
104 Ok(SyncStatus::RemoteGenesisMatchesLocalGenesis)
105 }
106 Some(_) => Ok(SyncStatus::RemoteGenesisDoesntMatchLocalGenesis),
107 None => Err(anyhow!("Remote aggregator doesn't have a chain yet")),
109 }
110 }
111 None => Ok(SyncStatus::NoLocalGenesis),
112 }
113 }
114
115 async fn retrieve_and_validate_remote_certificate_chain(
116 &self,
117 starting_point: Certificate,
118 ) -> StdResult<Vec<Certificate>> {
119 let mut validated_certificates = VecDeque::new();
122 let mut certificate = starting_point;
123
124 loop {
125 let parent_certificate = self
126 .certificate_verifier
127 .verify_certificate(&certificate, &self.genesis_verifier.to_verification_key())
128 .await
129 .with_context(
130 || format!("Failed to verify certificate: `{}`", certificate.hash,),
131 )?;
132
133 match parent_certificate {
134 None => {
135 validated_certificates.push_front(certificate);
136 break;
137 }
138 Some(parent) => {
139 if !validated_certificates.is_empty() || parent.epoch != certificate.epoch {
142 validated_certificates.push_front(certificate);
143 }
144
145 certificate = parent;
146 }
147 }
148 }
149
150 Ok(validated_certificates.into())
151 }
152
153 async fn store_certificate_chain(&self, certificate_chain: Vec<Certificate>) -> StdResult<()> {
154 self.certificate_storer
155 .insert_or_replace_many(certificate_chain)
156 .await?;
157 Ok(())
158 }
159}
160
161#[async_trait]
162impl CertificateChainSynchronizer for MithrilCertificateChainSynchronizer {
163 async fn synchronize_certificate_chain(&self, force: bool) -> StdResult<()> {
164 debug!(self.logger, ">> synchronize_certificate_chain"; "force" => force);
165
166 let sync_state = self.check_sync_state(force).await.with_context(|| {
167 format!("Failed to check if certificate chain should be sync (force: `{force}`)")
168 })?;
169 if sync_state.should_sync() {
170 info!(self.logger, "Start synchronizing certificate chain"; "sync_state" => ?sync_state);
171 } else {
172 info!(self.logger, "No need to synchronize certificate chain"; "sync_state" => ?sync_state);
173 return Ok(());
174 }
175
176 let starting_point = self
177 .remote_certificate_retriever
178 .get_latest_certificate_details()
179 .await?
180 .ok_or(
181 anyhow!("Remote aggregator doesn't have a chain yet")
182 .context("Failed to retrieve latest remote certificate details"),
183 )?;
184 let remote_certificate_chain = self
185 .retrieve_and_validate_remote_certificate_chain(starting_point)
186 .await
187 .with_context(|| "Failed to retrieve and validate remote certificate chain")?;
188 let open_message = prepare_open_message_to_store(
189 remote_certificate_chain
190 .last()
191 .ok_or(anyhow!("Retrieved certificate chain is empty"))?,
192 );
193 self.store_certificate_chain(remote_certificate_chain)
194 .await
195 .with_context(|| "Failed to store remote retrieved certificate chain")?;
196 self.open_message_storer
197 .insert_or_replace_open_message(open_message)
198 .await
199 .with_context(|| "Failed to store open message when synchronizing certificate chain")?;
200
201 info!(
202 self.logger,
203 "Certificate chain synchronized with remote source"
204 );
205 Ok(())
206 }
207}
208
209fn prepare_open_message_to_store(latest_certificate: &Certificate) -> OpenMessage {
210 OpenMessage {
211 epoch: latest_certificate.epoch,
212 signed_entity_type: SignedEntityType::MithrilStakeDistribution(latest_certificate.epoch),
213 protocol_message: latest_certificate.protocol_message.clone(),
214 is_certified: true,
215 is_expired: false,
216 single_signatures: Vec::new(),
217 created_at: Utc::now(),
218 expires_at: None,
219 }
220}
221
222#[cfg(test)]
223mod tests {
224 use anyhow::anyhow;
225 use std::sync::RwLock;
226
227 use mithril_common::certificate_chain::{
228 FakeCertificaterRetriever, MithrilCertificateVerifier,
229 };
230 use mithril_common::test_utils::{
231 CertificateChainBuilder, CertificateChainFixture, fake_data, fake_keys,
232 mock_extensions::MockBuilder,
233 };
234
235 use crate::services::{
236 MockOpenMessageStorer, MockRemoteCertificateRetriever, MockSynchronizedCertificateStorer,
237 };
238 use crate::test_tools::TestLogger;
239 use crate::tools::mocks::MockCertificateVerifier;
240
241 use super::*;
242
243 impl MithrilCertificateChainSynchronizer {
244 fn default_for_test() -> Self {
245 let genesis_verification_key =
246 fake_keys::genesis_verification_key()[0].try_into().unwrap();
247 Self::new(
248 Arc::new(MockRemoteCertificateRetriever::new()),
249 Arc::new(MockSynchronizedCertificateStorer::new()),
250 Arc::new(MockCertificateVerifier::new()),
251 Arc::new(ProtocolGenesisVerifier::from_verification_key(
252 genesis_verification_key,
253 )),
254 Arc::new(MockOpenMessageStorer::new()),
255 TestLogger::stdout(),
256 )
257 }
258 }
259
260 macro_rules! mocked_synchronizer {
261 (with_remote_genesis: $remote_genesis_result:expr) => {
262 MithrilCertificateChainSynchronizer {
263 remote_certificate_retriever:
264 MockBuilder::<MockRemoteCertificateRetriever>::configure(|retriever| {
265 retriever
266 .expect_get_genesis_certificate_details()
267 .return_once(move || $remote_genesis_result);
268 }),
269 ..MithrilCertificateChainSynchronizer::default_for_test()
270 }
271 };
272 (with_local_genesis: $local_genesis_result:expr) => {
273 MithrilCertificateChainSynchronizer {
274 certificate_storer: MockBuilder::<MockSynchronizedCertificateStorer>::configure(
275 |storer| {
276 storer
277 .expect_get_latest_genesis()
278 .return_once(move || $local_genesis_result);
279 },
280 ),
281 ..MithrilCertificateChainSynchronizer::default_for_test()
282 }
283 };
284 (with_remote_genesis: $remote_genesis_result:expr, with_local_genesis: $local_genesis_result:expr) => {
285 MithrilCertificateChainSynchronizer {
286 remote_certificate_retriever:
287 MockBuilder::<MockRemoteCertificateRetriever>::configure(|retriever| {
288 retriever
289 .expect_get_genesis_certificate_details()
290 .return_once(move || $remote_genesis_result);
291 }),
292 certificate_storer: MockBuilder::<MockSynchronizedCertificateStorer>::configure(
293 |storer| {
294 storer
295 .expect_get_latest_genesis()
296 .return_once(move || $local_genesis_result);
297 },
298 ),
299 ..MithrilCertificateChainSynchronizer::default_for_test()
300 }
301 };
302 (with_verify_certificate_result: $verify_certificate_result:expr) => {
303 MithrilCertificateChainSynchronizer {
304 certificate_verifier: MockBuilder::<MockCertificateVerifier>::configure(
305 |verifier| {
306 verifier
307 .expect_verify_certificate()
308 .return_once(move |_, _| $verify_certificate_result);
309 },
310 ),
311 ..MithrilCertificateChainSynchronizer::default_for_test()
312 }
313 };
314 }
315
316 fn fake_verifier(remote_certificate_chain: &[Certificate]) -> Arc<dyn CertificateVerifier> {
317 let verifier = MithrilCertificateVerifier::new(
318 TestLogger::stdout(),
319 Arc::new(FakeCertificaterRetriever::from_certificates(
320 remote_certificate_chain,
321 )),
322 );
323 Arc::new(verifier)
324 }
325
326 #[derive(Default)]
327 struct DumbCertificateStorer {
328 certificates: RwLock<Vec<Certificate>>,
329 genesis_certificate: Option<Certificate>,
330 }
331
332 impl DumbCertificateStorer {
333 fn new(genesis: Certificate, already_stored: Vec<Certificate>) -> Self {
334 Self {
335 certificates: RwLock::new(already_stored),
336 genesis_certificate: Some(genesis),
337 }
338 }
339
340 fn stored_certificates(&self) -> Vec<Certificate> {
341 self.certificates.read().unwrap().clone()
342 }
343 }
344
345 #[async_trait]
346 impl SynchronizedCertificateStorer for DumbCertificateStorer {
347 async fn insert_or_replace_many(
348 &self,
349 certificates_chain: Vec<Certificate>,
350 ) -> StdResult<()> {
351 let mut certificates = self.certificates.write().unwrap();
352 *certificates = certificates_chain;
353 Ok(())
354 }
355
356 async fn get_latest_genesis(&self) -> StdResult<Option<Certificate>> {
357 Ok(self.genesis_certificate.clone())
358 }
359 }
360
361 mod check_sync_state {
362 use super::*;
363
364 #[test]
365 fn sync_state_should_sync() {
366 assert!(SyncStatus::Forced.should_sync());
367 assert!(!SyncStatus::RemoteGenesisMatchesLocalGenesis.should_sync());
368 assert!(SyncStatus::RemoteGenesisDoesntMatchLocalGenesis.should_sync());
369 assert!(SyncStatus::NoLocalGenesis.should_sync());
370 }
371
372 #[tokio::test]
373 async fn state_when_force_true() {
374 let synchronizer = MithrilCertificateChainSynchronizer::default_for_test();
375
376 let sync_state = synchronizer.check_sync_state(true).await.unwrap();
377 assert_eq!(SyncStatus::Forced, sync_state);
378 }
379
380 #[tokio::test]
381 async fn state_when_force_false_and_no_local_genesis_certificate_found() {
382 let synchronizer = mocked_synchronizer!(with_local_genesis: Ok(None));
383
384 let sync_state = synchronizer.check_sync_state(false).await.unwrap();
385 assert_eq!(SyncStatus::NoLocalGenesis, sync_state);
386 }
387
388 #[tokio::test]
389 async fn state_when_force_false_and_remote_genesis_dont_matches_local_genesis() {
390 let synchronizer = mocked_synchronizer!(
391 with_remote_genesis: Ok(Some(fake_data::genesis_certificate("remote_genesis"))),
392 with_local_genesis: Ok(Some(fake_data::genesis_certificate("local_genesis")))
393 );
394
395 let sync_state = synchronizer.check_sync_state(false).await.unwrap();
396 assert_eq!(SyncStatus::RemoteGenesisDoesntMatchLocalGenesis, sync_state);
397 }
398
399 #[tokio::test]
400 async fn state_when_force_false_and_remote_genesis_matches_local_genesis() {
401 let remote_genesis = fake_data::genesis_certificate("genesis");
402 let local_genesis = remote_genesis.clone();
403 let synchronizer = mocked_synchronizer!(
404 with_remote_genesis: Ok(Some(remote_genesis)),
405 with_local_genesis: Ok(Some(local_genesis))
406 );
407
408 let sync_state = synchronizer.check_sync_state(false).await.unwrap();
409 assert_eq!(SyncStatus::RemoteGenesisMatchesLocalGenesis, sync_state);
410 }
411
412 #[tokio::test]
413 async fn if_force_true_it_should_not_fetch_remote_genesis_certificate() {
414 let synchronizer = mocked_synchronizer!(with_remote_genesis: Err(anyhow!(
415 "should not fetch genesis"
416 )));
417
418 synchronizer.check_sync_state(true).await.unwrap();
419 }
420
421 #[tokio::test]
422 async fn should_abort_with_error_if_force_false_and_fails_to_retrieve_local_genesis() {
423 let synchronizer = mocked_synchronizer!(with_local_genesis: Err(anyhow!("failure")));
424 synchronizer
425 .check_sync_state(false)
426 .await
427 .expect_err("Expected an error but was:");
428 }
429
430 #[tokio::test]
431 async fn should_abort_with_error_if_force_false_and_fails_to_retrieve_remote_genesis() {
432 let synchronizer = mocked_synchronizer!(
433 with_remote_genesis: Err(anyhow!("failure")),
434 with_local_genesis: Ok(Some(fake_data::genesis_certificate("local_genesis")))
435 );
436 synchronizer
437 .check_sync_state(false)
438 .await
439 .expect_err("Expected an error but was:");
440 }
441
442 #[tokio::test]
443 async fn should_abort_with_error_if_force_false_and_remote_genesis_is_none() {
444 let synchronizer = mocked_synchronizer!(
445 with_remote_genesis: Ok(None),
446 with_local_genesis: Ok(Some(fake_data::genesis_certificate("local_genesis")))
447 );
448 let error = synchronizer
449 .check_sync_state(false)
450 .await
451 .expect_err("Expected an error but was:");
452
453 assert!(
454 error
455 .to_string()
456 .contains("Remote aggregator doesn't have a chain yet"),
457 "Unexpected error:\n{error:?}"
458 );
459 }
460 }
461
462 mod retrieve_validate_remote_certificate_chain {
463 use mockall::predicate::{always, eq};
464
465 use mithril_common::entities::Epoch;
466
467 use super::*;
468
469 #[tokio::test]
470 async fn succeed_if_the_remote_chain_only_contains_a_genesis_certificate() {
471 let chain = CertificateChainBuilder::new().with_total_certificates(1).build();
472 let synchronizer = MithrilCertificateChainSynchronizer {
473 certificate_verifier: fake_verifier(&chain),
474 genesis_verifier: Arc::new(chain.genesis_verifier.clone()),
475 ..MithrilCertificateChainSynchronizer::default_for_test()
476 };
477
478 let starting_point = chain[0].clone();
479 let remote_certificate_chain = synchronizer
480 .retrieve_and_validate_remote_certificate_chain(starting_point)
481 .await
482 .unwrap();
483
484 assert_eq!(remote_certificate_chain, chain.certificates_chained);
485 }
486
487 #[tokio::test]
488 async fn abort_with_error_if_a_certificate_is_invalid() {
489 let synchronizer = mocked_synchronizer!(with_verify_certificate_result: Err(anyhow!("invalid certificate")));
490
491 let starting_point = fake_data::certificate("certificate");
492 synchronizer
493 .retrieve_and_validate_remote_certificate_chain(starting_point)
494 .await
495 .expect_err("Expected an error but was:");
496 }
497
498 #[tokio::test]
499 async fn succeed_with_a_valid_certificate_chain_and_only_get_first_certificate_of_each_epoch_plus_genesis()
500 {
501 let chain = CertificateChainBuilder::new()
505 .with_total_certificates(9)
506 .with_certificates_per_epoch(2)
507 .build();
508 let synchronizer = MithrilCertificateChainSynchronizer {
509 certificate_verifier: fake_verifier(&chain),
510 genesis_verifier: Arc::new(chain.genesis_verifier.clone()),
511 ..MithrilCertificateChainSynchronizer::default_for_test()
512 };
513
514 let starting_point = chain[0].clone();
515 let remote_certificate_chain = synchronizer
516 .retrieve_and_validate_remote_certificate_chain(starting_point.clone())
517 .await
518 .unwrap();
519
520 let mut expected = chain.certificate_path_to_genesis(&starting_point.hash);
521 expected.reverse();
523 expected.pop();
525 assert_eq!(remote_certificate_chain, expected);
526 }
527
528 #[tokio::test]
529 async fn return_chain_ordered_from_genesis_to_latest() {
530 let base_certificate = fake_data::certificate("whatever");
531 let chain = vec![
532 Certificate {
533 epoch: Epoch(2),
534 ..fake_data::genesis_certificate("genesis")
535 },
536 Certificate {
537 epoch: Epoch(3),
538 hash: "hash1".to_string(),
539 previous_hash: "genesis".to_string(),
540 ..base_certificate.clone()
541 },
542 Certificate {
543 epoch: Epoch(4),
544 hash: "hash2".to_string(),
545 previous_hash: "hash1".to_string(),
546 ..base_certificate
547 },
548 ];
549 let synchronizer = MithrilCertificateChainSynchronizer {
550 certificate_verifier: MockBuilder::<MockCertificateVerifier>::configure(|mock| {
551 let cert_1 = chain[1].clone();
552 mock.expect_verify_certificate()
553 .with(eq(chain[2].clone()), always())
554 .return_once(move |_, _| Ok(Some(cert_1)));
555 let genesis = chain[0].clone();
556 mock.expect_verify_certificate()
557 .with(eq(chain[1].clone()), always())
558 .return_once(move |_, _| Ok(Some(genesis)));
559 mock.expect_verify_certificate()
560 .with(eq(chain[0].clone()), always())
561 .return_once(move |_, _| Ok(None));
562 }),
563 ..MithrilCertificateChainSynchronizer::default_for_test()
564 };
565
566 let starting_point = chain[2].clone();
567 let remote_certificate_chain = synchronizer
568 .retrieve_and_validate_remote_certificate_chain(starting_point.clone())
569 .await
570 .unwrap();
571
572 assert_eq!(
573 remote_certificate_chain
574 .into_iter()
575 .map(|c| c.hash)
576 .collect::<Vec<_>>(),
577 vec!["genesis".to_string(), "hash1".to_string(), "hash2".to_string()]
578 );
579 }
580 }
581
582 mod store_remote_certificate_chain {
583 use super::*;
584
585 #[tokio::test]
586 async fn do_store_given_certificates() {
587 let certificates_chain = vec![
588 fake_data::genesis_certificate("genesis"),
589 fake_data::certificate("certificate1"),
590 fake_data::certificate("certificate2"),
591 ];
592 let storer = Arc::new(DumbCertificateStorer::default());
593 let synchronizer = MithrilCertificateChainSynchronizer {
594 certificate_storer: storer.clone(),
595 ..MithrilCertificateChainSynchronizer::default_for_test()
596 };
597
598 assert_eq!(Vec::<Certificate>::new(), storer.stored_certificates());
599
600 synchronizer
601 .store_certificate_chain(certificates_chain.clone())
602 .await
603 .unwrap();
604
605 assert_eq!(certificates_chain, storer.stored_certificates());
606 }
607
608 #[tokio::test]
609 async fn fail_on_storer_error() {
610 let synchronizer = MithrilCertificateChainSynchronizer {
611 certificate_storer: MockBuilder::<MockSynchronizedCertificateStorer>::configure(
612 |mock| {
613 mock.expect_insert_or_replace_many()
614 .return_once(move |_| Err(anyhow!("failure")));
615 },
616 ),
617 ..MithrilCertificateChainSynchronizer::default_for_test()
618 };
619
620 synchronizer
621 .store_certificate_chain(vec![fake_data::certificate("certificate")])
622 .await
623 .unwrap_err();
624 }
625 }
626
627 mod synchronize_certificate_chain {
628 use mockall::predicate::function;
629
630 use super::*;
631
632 fn build_synchronizer(
633 remote_chain: &CertificateChainFixture,
634 storer: Arc<dyn SynchronizedCertificateStorer>,
635 ) -> MithrilCertificateChainSynchronizer {
636 MithrilCertificateChainSynchronizer {
637 certificate_storer: storer.clone(),
638 remote_certificate_retriever:
639 MockBuilder::<MockRemoteCertificateRetriever>::configure(|mock| {
640 let genesis = remote_chain.genesis_certificate().clone();
641 mock.expect_get_genesis_certificate_details()
642 .return_once(move || Ok(Some(genesis)));
643 let latest = remote_chain.latest_certificate().clone();
644 mock.expect_get_latest_certificate_details()
645 .return_once(move || Ok(Some(latest)));
646 }),
647 certificate_verifier: fake_verifier(remote_chain),
648 open_message_storer: MockBuilder::<MockOpenMessageStorer>::configure(|mock| {
649 let expected_msd_epoch = remote_chain.latest_certificate().epoch;
651 mock.expect_insert_or_replace_open_message()
652 .with(function(move |open_message: &OpenMessage| {
653 open_message.signed_entity_type
654 == SignedEntityType::MithrilStakeDistribution(expected_msd_epoch)
655 }))
656 .times(1..)
657 .returning(|_| Ok(()));
658 }),
659 ..MithrilCertificateChainSynchronizer::default_for_test()
660 }
661 }
662
663 #[tokio::test]
664 async fn store_all() {
665 let remote_chain = CertificateChainBuilder::default()
666 .with_certificates_per_epoch(3)
667 .with_total_certificates(8)
668 .build();
669 let storer = Arc::new(DumbCertificateStorer::default());
670 let synchronizer = build_synchronizer(&remote_chain, storer.clone());
671
672 synchronizer.synchronize_certificate_chain(false).await.unwrap();
674
675 let mut expected =
676 remote_chain.certificate_path_to_genesis(&remote_chain.latest_certificate().hash);
677 expected.reverse();
678 assert_eq!(expected, storer.stored_certificates());
679 }
680
681 #[tokio::test]
682 async fn store_partial() {
683 let remote_chain = CertificateChainBuilder::default()
684 .with_certificates_per_epoch(1)
685 .with_total_certificates(8)
686 .build();
687 let existing_certificates =
688 remote_chain.certificate_path_to_genesis(&remote_chain[5].hash);
689 let storer = Arc::new(DumbCertificateStorer::new(
690 remote_chain.genesis_certificate().clone(),
691 existing_certificates.clone(),
692 ));
693 let synchronizer = build_synchronizer(&remote_chain, storer.clone());
694
695 synchronizer.synchronize_certificate_chain(false).await.unwrap();
697
698 assert_eq!(&existing_certificates, &storer.stored_certificates());
699
700 synchronizer.synchronize_certificate_chain(true).await.unwrap();
702
703 let mut expected =
704 remote_chain.certificate_path_to_genesis(&remote_chain.latest_certificate().hash);
705 expected.reverse();
706 assert_eq!(expected, storer.stored_certificates());
707 }
708 }
709}