mithril_aggregator/services/certificate_chain_synchronizer/
synchronizer_service.rs1use anyhow::{Context, anyhow};
23use async_trait::async_trait;
24use chrono::Utc;
25use slog::{Logger, debug, info};
26use std::collections::VecDeque;
27use std::sync::Arc;
28
29use mithril_common::StdResult;
30use mithril_common::certificate_chain::CertificateVerifier;
31use mithril_common::crypto_helper::ProtocolGenesisVerifier;
32use mithril_common::entities::{Certificate, SignedEntityType};
33use mithril_common::logging::LoggerExtensions;
34
35use crate::entities::OpenMessage;
36
37use super::{
38 CertificateChainSynchronizer, OpenMessageStorer, RemoteCertificateRetriever,
39 SynchronizedCertificateStorer,
40};
41
42pub struct MithrilCertificateChainSynchronizer {
44 remote_certificate_retriever: Arc<dyn RemoteCertificateRetriever>,
45 certificate_storer: Arc<dyn SynchronizedCertificateStorer>,
46 certificate_verifier: Arc<dyn CertificateVerifier>,
47 genesis_verifier: Arc<ProtocolGenesisVerifier>,
48 open_message_storer: Arc<dyn OpenMessageStorer>,
49 logger: Logger,
50}
51
52#[derive(Debug, Copy, Clone, PartialEq, Eq)]
53enum SyncStatus {
54 Forced,
55 NoLocalGenesis,
56 RemoteGenesisMatchesLocalGenesis,
57 RemoteGenesisDoesntMatchLocalGenesis,
58}
59
60impl SyncStatus {
61 fn should_sync(&self) -> bool {
62 match self {
63 SyncStatus::Forced => true,
64 SyncStatus::NoLocalGenesis => true,
65 SyncStatus::RemoteGenesisMatchesLocalGenesis => false,
66 SyncStatus::RemoteGenesisDoesntMatchLocalGenesis => true,
67 }
68 }
69}
70
71impl MithrilCertificateChainSynchronizer {
72 pub fn new(
74 remote_certificate_retriever: Arc<dyn RemoteCertificateRetriever>,
75 certificate_storer: Arc<dyn SynchronizedCertificateStorer>,
76 certificate_verifier: Arc<dyn CertificateVerifier>,
77 genesis_verifier: Arc<ProtocolGenesisVerifier>,
78 open_message_storer: Arc<dyn OpenMessageStorer>,
79 logger: Logger,
80 ) -> Self {
81 Self {
82 remote_certificate_retriever,
83 certificate_storer,
84 certificate_verifier,
85 genesis_verifier,
86 open_message_storer,
87 logger: logger.new_with_component_name::<Self>(),
88 }
89 }
90
91 async fn check_sync_state(&self, force: bool) -> StdResult<SyncStatus> {
92 if force {
93 return Ok(SyncStatus::Forced);
94 }
95
96 match self.certificate_storer.get_latest_genesis().await? {
97 Some(local_genesis) => {
98 match self
99 .remote_certificate_retriever
100 .get_genesis_certificate_details()
101 .await?
102 {
103 Some(remote_genesis) if (local_genesis == remote_genesis) => {
104 Ok(SyncStatus::RemoteGenesisMatchesLocalGenesis)
105 }
106 Some(_) => Ok(SyncStatus::RemoteGenesisDoesntMatchLocalGenesis),
107 None => Err(anyhow!("Remote aggregator doesn't have a chain yet")),
109 }
110 }
111 None => Ok(SyncStatus::NoLocalGenesis),
112 }
113 }
114
115 async fn retrieve_and_validate_remote_certificate_chain(
116 &self,
117 starting_point: Certificate,
118 ) -> StdResult<Vec<Certificate>> {
119 let mut validated_certificates = VecDeque::new();
122 let mut certificate = starting_point;
123
124 loop {
125 let parent_certificate = self
126 .certificate_verifier
127 .verify_certificate(&certificate, &self.genesis_verifier.to_verification_key())
128 .await
129 .with_context(
130 || format!("Failed to verify certificate: `{}`", certificate.hash,),
131 )?;
132
133 match parent_certificate {
134 None => {
135 validated_certificates.push_front(certificate);
136 break;
137 }
138 Some(parent) => {
139 if !validated_certificates.is_empty() || parent.epoch != certificate.epoch {
142 validated_certificates.push_front(certificate);
143 }
144
145 certificate = parent;
146 }
147 }
148 }
149
150 Ok(validated_certificates.into())
151 }
152
153 async fn store_certificate_chain(&self, certificate_chain: Vec<Certificate>) -> StdResult<()> {
154 self.certificate_storer
155 .insert_or_replace_many(certificate_chain)
156 .await?;
157 Ok(())
158 }
159}
160
161#[async_trait]
162impl CertificateChainSynchronizer for MithrilCertificateChainSynchronizer {
163 async fn synchronize_certificate_chain(&self, force: bool) -> StdResult<()> {
164 debug!(self.logger, ">> synchronize_certificate_chain"; "force" => force);
165
166 let sync_state = self.check_sync_state(force).await.with_context(|| {
167 format!("Failed to check if certificate chain should be sync (force: `{force}`)")
168 })?;
169 if sync_state.should_sync() {
170 info!(self.logger, "Start synchronizing certificate chain"; "sync_state" => ?sync_state);
171 } else {
172 info!(self.logger, "No need to synchronize certificate chain"; "sync_state" => ?sync_state);
173 return Ok(());
174 }
175
176 let starting_point = self
177 .remote_certificate_retriever
178 .get_latest_certificate_details()
179 .await?
180 .with_context(|| "Failed to retrieve latest remote certificate details")
181 .with_context(|| "Remote aggregator doesn't have a chain yet")?;
182 let remote_certificate_chain = self
183 .retrieve_and_validate_remote_certificate_chain(starting_point)
184 .await
185 .with_context(|| "Failed to retrieve and validate remote certificate chain")?;
186 let open_message = prepare_open_message_to_store(
187 remote_certificate_chain
188 .last()
189 .with_context(|| "Retrieved certificate chain is empty")?,
190 );
191 self.store_certificate_chain(remote_certificate_chain)
192 .await
193 .with_context(|| "Failed to store remote retrieved certificate chain")?;
194 self.open_message_storer
195 .insert_or_replace_open_message(open_message)
196 .await
197 .with_context(|| "Failed to store open message when synchronizing certificate chain")?;
198
199 info!(
200 self.logger,
201 "Certificate chain synchronized with remote source"
202 );
203 Ok(())
204 }
205}
206
207fn prepare_open_message_to_store(latest_certificate: &Certificate) -> OpenMessage {
208 OpenMessage {
209 epoch: latest_certificate.epoch,
210 signed_entity_type: SignedEntityType::MithrilStakeDistribution(latest_certificate.epoch),
211 protocol_message: latest_certificate.protocol_message.clone(),
212 is_certified: true,
213 is_expired: false,
214 single_signatures: Vec::new(),
215 created_at: Utc::now(),
216 expires_at: None,
217 }
218}
219
220#[cfg(test)]
221mod tests {
222 use anyhow::anyhow;
223 use std::sync::RwLock;
224
225 use mithril_common::certificate_chain::MithrilCertificateVerifier;
226 use mithril_common::test::{
227 builder::{CertificateChainBuilder, CertificateChainFixture},
228 double::{FakeCertificaterRetriever, fake_data, fake_keys},
229 mock_extensions::MockBuilder,
230 };
231
232 use crate::services::{
233 MockOpenMessageStorer, MockRemoteCertificateRetriever, MockSynchronizedCertificateStorer,
234 };
235 use crate::test::TestLogger;
236 use crate::test::double::mocks::MockCertificateVerifier;
237
238 use super::*;
239
240 impl MithrilCertificateChainSynchronizer {
241 fn default_for_test() -> Self {
242 let genesis_verification_key =
243 fake_keys::genesis_verification_key()[0].try_into().unwrap();
244 Self::new(
245 Arc::new(MockRemoteCertificateRetriever::new()),
246 Arc::new(MockSynchronizedCertificateStorer::new()),
247 Arc::new(MockCertificateVerifier::new()),
248 Arc::new(ProtocolGenesisVerifier::from_verification_key(
249 genesis_verification_key,
250 )),
251 Arc::new(MockOpenMessageStorer::new()),
252 TestLogger::stdout(),
253 )
254 }
255 }
256
257 macro_rules! mocked_synchronizer {
258 (with_remote_genesis: $remote_genesis_result:expr) => {
259 MithrilCertificateChainSynchronizer {
260 remote_certificate_retriever:
261 MockBuilder::<MockRemoteCertificateRetriever>::configure(|retriever| {
262 retriever
263 .expect_get_genesis_certificate_details()
264 .return_once(move || $remote_genesis_result);
265 }),
266 ..MithrilCertificateChainSynchronizer::default_for_test()
267 }
268 };
269 (with_local_genesis: $local_genesis_result:expr) => {
270 MithrilCertificateChainSynchronizer {
271 certificate_storer: MockBuilder::<MockSynchronizedCertificateStorer>::configure(
272 |storer| {
273 storer
274 .expect_get_latest_genesis()
275 .return_once(move || $local_genesis_result);
276 },
277 ),
278 ..MithrilCertificateChainSynchronizer::default_for_test()
279 }
280 };
281 (with_remote_genesis: $remote_genesis_result:expr, with_local_genesis: $local_genesis_result:expr) => {
282 MithrilCertificateChainSynchronizer {
283 remote_certificate_retriever:
284 MockBuilder::<MockRemoteCertificateRetriever>::configure(|retriever| {
285 retriever
286 .expect_get_genesis_certificate_details()
287 .return_once(move || $remote_genesis_result);
288 }),
289 certificate_storer: MockBuilder::<MockSynchronizedCertificateStorer>::configure(
290 |storer| {
291 storer
292 .expect_get_latest_genesis()
293 .return_once(move || $local_genesis_result);
294 },
295 ),
296 ..MithrilCertificateChainSynchronizer::default_for_test()
297 }
298 };
299 (with_verify_certificate_result: $verify_certificate_result:expr) => {
300 MithrilCertificateChainSynchronizer {
301 certificate_verifier: MockBuilder::<MockCertificateVerifier>::configure(
302 |verifier| {
303 verifier
304 .expect_verify_certificate()
305 .return_once(move |_, _| $verify_certificate_result);
306 },
307 ),
308 ..MithrilCertificateChainSynchronizer::default_for_test()
309 }
310 };
311 }
312
313 fn fake_verifier(remote_certificate_chain: &[Certificate]) -> Arc<dyn CertificateVerifier> {
314 let verifier = MithrilCertificateVerifier::new(
315 TestLogger::stdout(),
316 Arc::new(FakeCertificaterRetriever::from_certificates(
317 remote_certificate_chain,
318 )),
319 );
320 Arc::new(verifier)
321 }
322
323 #[derive(Default)]
324 struct DumbCertificateStorer {
325 certificates: RwLock<Vec<Certificate>>,
326 genesis_certificate: Option<Certificate>,
327 }
328
329 impl DumbCertificateStorer {
330 fn new(genesis: Certificate, already_stored: Vec<Certificate>) -> Self {
331 Self {
332 certificates: RwLock::new(already_stored),
333 genesis_certificate: Some(genesis),
334 }
335 }
336
337 fn stored_certificates(&self) -> Vec<Certificate> {
338 self.certificates.read().unwrap().clone()
339 }
340 }
341
342 #[async_trait]
343 impl SynchronizedCertificateStorer for DumbCertificateStorer {
344 async fn insert_or_replace_many(
345 &self,
346 certificates_chain: Vec<Certificate>,
347 ) -> StdResult<()> {
348 let mut certificates = self.certificates.write().unwrap();
349 *certificates = certificates_chain;
350 Ok(())
351 }
352
353 async fn get_latest_genesis(&self) -> StdResult<Option<Certificate>> {
354 Ok(self.genesis_certificate.clone())
355 }
356 }
357
358 mod check_sync_state {
359 use super::*;
360
361 #[test]
362 fn sync_state_should_sync() {
363 assert!(SyncStatus::Forced.should_sync());
364 assert!(!SyncStatus::RemoteGenesisMatchesLocalGenesis.should_sync());
365 assert!(SyncStatus::RemoteGenesisDoesntMatchLocalGenesis.should_sync());
366 assert!(SyncStatus::NoLocalGenesis.should_sync());
367 }
368
369 #[tokio::test]
370 async fn state_when_force_true() {
371 let synchronizer = MithrilCertificateChainSynchronizer::default_for_test();
372
373 let sync_state = synchronizer.check_sync_state(true).await.unwrap();
374 assert_eq!(SyncStatus::Forced, sync_state);
375 }
376
377 #[tokio::test]
378 async fn state_when_force_false_and_no_local_genesis_certificate_found() {
379 let synchronizer = mocked_synchronizer!(with_local_genesis: Ok(None));
380
381 let sync_state = synchronizer.check_sync_state(false).await.unwrap();
382 assert_eq!(SyncStatus::NoLocalGenesis, sync_state);
383 }
384
385 #[tokio::test]
386 async fn state_when_force_false_and_remote_genesis_dont_matches_local_genesis() {
387 let synchronizer = mocked_synchronizer!(
388 with_remote_genesis: Ok(Some(fake_data::genesis_certificate("remote_genesis"))),
389 with_local_genesis: Ok(Some(fake_data::genesis_certificate("local_genesis")))
390 );
391
392 let sync_state = synchronizer.check_sync_state(false).await.unwrap();
393 assert_eq!(SyncStatus::RemoteGenesisDoesntMatchLocalGenesis, sync_state);
394 }
395
396 #[tokio::test]
397 async fn state_when_force_false_and_remote_genesis_matches_local_genesis() {
398 let remote_genesis = fake_data::genesis_certificate("genesis");
399 let local_genesis = remote_genesis.clone();
400 let synchronizer = mocked_synchronizer!(
401 with_remote_genesis: Ok(Some(remote_genesis)),
402 with_local_genesis: Ok(Some(local_genesis))
403 );
404
405 let sync_state = synchronizer.check_sync_state(false).await.unwrap();
406 assert_eq!(SyncStatus::RemoteGenesisMatchesLocalGenesis, sync_state);
407 }
408
409 #[tokio::test]
410 async fn if_force_true_it_should_not_fetch_remote_genesis_certificate() {
411 let synchronizer = mocked_synchronizer!(with_remote_genesis: Err(anyhow!(
412 "should not fetch genesis"
413 )));
414
415 synchronizer.check_sync_state(true).await.unwrap();
416 }
417
418 #[tokio::test]
419 async fn should_abort_with_error_if_force_false_and_fails_to_retrieve_local_genesis() {
420 let synchronizer = mocked_synchronizer!(with_local_genesis: Err(anyhow!("failure")));
421 synchronizer
422 .check_sync_state(false)
423 .await
424 .expect_err("Expected an error but was:");
425 }
426
427 #[tokio::test]
428 async fn should_abort_with_error_if_force_false_and_fails_to_retrieve_remote_genesis() {
429 let synchronizer = mocked_synchronizer!(
430 with_remote_genesis: Err(anyhow!("failure")),
431 with_local_genesis: Ok(Some(fake_data::genesis_certificate("local_genesis")))
432 );
433 synchronizer
434 .check_sync_state(false)
435 .await
436 .expect_err("Expected an error but was:");
437 }
438
439 #[tokio::test]
440 async fn should_abort_with_error_if_force_false_and_remote_genesis_is_none() {
441 let synchronizer = mocked_synchronizer!(
442 with_remote_genesis: Ok(None),
443 with_local_genesis: Ok(Some(fake_data::genesis_certificate("local_genesis")))
444 );
445 let error = synchronizer
446 .check_sync_state(false)
447 .await
448 .expect_err("Expected an error but was:");
449
450 assert!(
451 error
452 .to_string()
453 .contains("Remote aggregator doesn't have a chain yet"),
454 "Unexpected error:\n{error:?}"
455 );
456 }
457 }
458
459 mod retrieve_validate_remote_certificate_chain {
460 use mockall::predicate::{always, eq};
461
462 use mithril_common::entities::Epoch;
463
464 use super::*;
465
466 #[tokio::test]
467 async fn succeed_if_the_remote_chain_only_contains_a_genesis_certificate() {
468 let chain = CertificateChainBuilder::new().with_total_certificates(1).build();
469 let synchronizer = MithrilCertificateChainSynchronizer {
470 certificate_verifier: fake_verifier(&chain),
471 genesis_verifier: Arc::new(chain.genesis_verifier.clone()),
472 ..MithrilCertificateChainSynchronizer::default_for_test()
473 };
474
475 let starting_point = chain[0].clone();
476 let remote_certificate_chain = synchronizer
477 .retrieve_and_validate_remote_certificate_chain(starting_point)
478 .await
479 .unwrap();
480
481 assert_eq!(remote_certificate_chain, chain.certificates_chained);
482 }
483
484 #[tokio::test]
485 async fn abort_with_error_if_a_certificate_is_invalid() {
486 let synchronizer = mocked_synchronizer!(with_verify_certificate_result: Err(anyhow!("invalid certificate")));
487
488 let starting_point = fake_data::certificate("certificate");
489 synchronizer
490 .retrieve_and_validate_remote_certificate_chain(starting_point)
491 .await
492 .expect_err("Expected an error but was:");
493 }
494
495 #[tokio::test]
496 async fn succeed_with_a_valid_certificate_chain_and_only_get_first_certificate_of_each_epoch_plus_genesis()
497 {
498 let chain = CertificateChainBuilder::new()
502 .with_total_certificates(9)
503 .with_certificates_per_epoch(2)
504 .build();
505 let synchronizer = MithrilCertificateChainSynchronizer {
506 certificate_verifier: fake_verifier(&chain),
507 genesis_verifier: Arc::new(chain.genesis_verifier.clone()),
508 ..MithrilCertificateChainSynchronizer::default_for_test()
509 };
510
511 let starting_point = chain[0].clone();
512 let remote_certificate_chain = synchronizer
513 .retrieve_and_validate_remote_certificate_chain(starting_point.clone())
514 .await
515 .unwrap();
516
517 let mut expected = chain.certificate_path_to_genesis(&starting_point.hash);
518 expected.reverse();
520 expected.pop();
522 assert_eq!(remote_certificate_chain, expected);
523 }
524
525 #[tokio::test]
526 async fn return_chain_ordered_from_genesis_to_latest() {
527 let base_certificate = fake_data::certificate("whatever");
528 let chain = [
529 Certificate {
530 epoch: Epoch(2),
531 ..fake_data::genesis_certificate("genesis")
532 },
533 Certificate {
534 epoch: Epoch(3),
535 hash: "hash1".to_string(),
536 previous_hash: "genesis".to_string(),
537 ..base_certificate.clone()
538 },
539 Certificate {
540 epoch: Epoch(4),
541 hash: "hash2".to_string(),
542 previous_hash: "hash1".to_string(),
543 ..base_certificate
544 },
545 ];
546 let synchronizer = MithrilCertificateChainSynchronizer {
547 certificate_verifier: MockBuilder::<MockCertificateVerifier>::configure(|mock| {
548 let cert_1 = chain[1].clone();
549 mock.expect_verify_certificate()
550 .with(eq(chain[2].clone()), always())
551 .return_once(move |_, _| Ok(Some(cert_1)));
552 let genesis = chain[0].clone();
553 mock.expect_verify_certificate()
554 .with(eq(chain[1].clone()), always())
555 .return_once(move |_, _| Ok(Some(genesis)));
556 mock.expect_verify_certificate()
557 .with(eq(chain[0].clone()), always())
558 .return_once(move |_, _| Ok(None));
559 }),
560 ..MithrilCertificateChainSynchronizer::default_for_test()
561 };
562
563 let starting_point = chain[2].clone();
564 let remote_certificate_chain = synchronizer
565 .retrieve_and_validate_remote_certificate_chain(starting_point.clone())
566 .await
567 .unwrap();
568
569 assert_eq!(
570 remote_certificate_chain
571 .into_iter()
572 .map(|c| c.hash)
573 .collect::<Vec<_>>(),
574 vec!["genesis".to_string(), "hash1".to_string(), "hash2".to_string()]
575 );
576 }
577 }
578
579 mod store_remote_certificate_chain {
580 use super::*;
581
582 #[tokio::test]
583 async fn do_store_given_certificates() {
584 let certificates_chain = vec![
585 fake_data::genesis_certificate("genesis"),
586 fake_data::certificate("certificate1"),
587 fake_data::certificate("certificate2"),
588 ];
589 let storer = Arc::new(DumbCertificateStorer::default());
590 let synchronizer = MithrilCertificateChainSynchronizer {
591 certificate_storer: storer.clone(),
592 ..MithrilCertificateChainSynchronizer::default_for_test()
593 };
594
595 assert_eq!(Vec::<Certificate>::new(), storer.stored_certificates());
596
597 synchronizer
598 .store_certificate_chain(certificates_chain.clone())
599 .await
600 .unwrap();
601
602 assert_eq!(certificates_chain, storer.stored_certificates());
603 }
604
605 #[tokio::test]
606 async fn fail_on_storer_error() {
607 let synchronizer = MithrilCertificateChainSynchronizer {
608 certificate_storer: MockBuilder::<MockSynchronizedCertificateStorer>::configure(
609 |mock| {
610 mock.expect_insert_or_replace_many()
611 .return_once(move |_| Err(anyhow!("failure")));
612 },
613 ),
614 ..MithrilCertificateChainSynchronizer::default_for_test()
615 };
616
617 synchronizer
618 .store_certificate_chain(vec![fake_data::certificate("certificate")])
619 .await
620 .unwrap_err();
621 }
622 }
623
624 mod synchronize_certificate_chain {
625 use mockall::predicate::function;
626
627 use super::*;
628
629 fn build_synchronizer(
630 remote_chain: &CertificateChainFixture,
631 storer: Arc<dyn SynchronizedCertificateStorer>,
632 ) -> MithrilCertificateChainSynchronizer {
633 MithrilCertificateChainSynchronizer {
634 certificate_storer: storer.clone(),
635 remote_certificate_retriever:
636 MockBuilder::<MockRemoteCertificateRetriever>::configure(|mock| {
637 let genesis = remote_chain.genesis_certificate().clone();
638 mock.expect_get_genesis_certificate_details()
639 .return_once(move || Ok(Some(genesis)));
640 let latest = remote_chain.latest_certificate().clone();
641 mock.expect_get_latest_certificate_details()
642 .return_once(move || Ok(Some(latest)));
643 }),
644 certificate_verifier: fake_verifier(remote_chain),
645 open_message_storer: MockBuilder::<MockOpenMessageStorer>::configure(|mock| {
646 let expected_msd_epoch = remote_chain.latest_certificate().epoch;
648 mock.expect_insert_or_replace_open_message()
649 .with(function(move |open_message: &OpenMessage| {
650 open_message.signed_entity_type
651 == SignedEntityType::MithrilStakeDistribution(expected_msd_epoch)
652 }))
653 .times(1..)
654 .returning(|_| Ok(()));
655 }),
656 ..MithrilCertificateChainSynchronizer::default_for_test()
657 }
658 }
659
660 #[tokio::test]
661 async fn store_all() {
662 let remote_chain = CertificateChainBuilder::default()
663 .with_certificates_per_epoch(3)
664 .with_total_certificates(8)
665 .build();
666 let storer = Arc::new(DumbCertificateStorer::default());
667 let synchronizer = build_synchronizer(&remote_chain, storer.clone());
668
669 synchronizer.synchronize_certificate_chain(false).await.unwrap();
671
672 let mut expected =
673 remote_chain.certificate_path_to_genesis(&remote_chain.latest_certificate().hash);
674 expected.reverse();
675 assert_eq!(expected, storer.stored_certificates());
676 }
677
678 #[tokio::test]
679 async fn store_partial() {
680 let remote_chain = CertificateChainBuilder::default()
681 .with_certificates_per_epoch(1)
682 .with_total_certificates(8)
683 .build();
684 let existing_certificates =
685 remote_chain.certificate_path_to_genesis(&remote_chain[5].hash);
686 let storer = Arc::new(DumbCertificateStorer::new(
687 remote_chain.genesis_certificate().clone(),
688 existing_certificates.clone(),
689 ));
690 let synchronizer = build_synchronizer(&remote_chain, storer.clone());
691
692 synchronizer.synchronize_certificate_chain(false).await.unwrap();
694
695 assert_eq!(&existing_certificates, &storer.stored_certificates());
696
697 synchronizer.synchronize_certificate_chain(true).await.unwrap();
699
700 let mut expected =
701 remote_chain.certificate_path_to_genesis(&remote_chain.latest_certificate().hash);
702 expected.reverse();
703 assert_eq!(expected, storer.stored_certificates());
704 }
705 }
706}