mithril_aggregator/services/certifier/
buffered_certifier.rs

1use async_trait::async_trait;
2use slog::{debug, trace, warn, Logger};
3use std::sync::Arc;
4
5use mithril_common::entities::{
6    Certificate, Epoch, ProtocolMessage, SignedEntityType, SignedEntityTypeDiscriminants,
7    SingleSignatures,
8};
9use mithril_common::logging::LoggerExtensions;
10use mithril_common::StdResult;
11
12use crate::entities::OpenMessage;
13use crate::services::{
14    BufferedSingleSignatureStore, CertifierService, CertifierServiceError,
15    SignatureRegistrationStatus,
16};
17
18/// A decorator of [CertifierService] that can buffer registration of single signatures
19/// when the open message is not yet created.
20///
21/// When an open message is created, buffered single signatures for the open message type are
22/// registered.
23pub struct BufferedCertifierService {
24    certifier_service: Arc<dyn CertifierService>,
25    buffered_single_signature_store: Arc<dyn BufferedSingleSignatureStore>,
26    logger: Logger,
27}
28
29impl BufferedCertifierService {
30    /// Create a new instance of `BufferedCertifierService`.
31    pub fn new(
32        certifier_service: Arc<dyn CertifierService>,
33        buffered_single_signature_store: Arc<dyn BufferedSingleSignatureStore>,
34        logger: Logger,
35    ) -> Self {
36        Self {
37            certifier_service,
38            buffered_single_signature_store,
39            logger: logger.new_with_component_name::<Self>(),
40        }
41    }
42
43    async fn try_register_buffered_signatures_to_current_open_message(
44        &self,
45        signed_entity_type: &SignedEntityType,
46    ) -> StdResult<()> {
47        let discriminant: SignedEntityTypeDiscriminants = signed_entity_type.into();
48        let buffered_signatures = self
49            .buffered_single_signature_store
50            .get_buffered_signatures(discriminant)
51            .await?;
52        let mut signatures_to_remove = vec![];
53
54        for signature in buffered_signatures {
55            match self
56                .certifier_service
57                .register_single_signature(signed_entity_type, &signature)
58                .await
59            {
60                Ok(..) => {
61                    signatures_to_remove.push(signature);
62                }
63                Err(error) => match error.downcast_ref::<CertifierServiceError>() {
64                    Some(CertifierServiceError::InvalidSingleSignature(..)) => {
65                        trace!(self.logger, "Skipping invalid signature for signed entity '{signed_entity_type:?}'";
66                            "party_id" => &signature.party_id,
67                            "error" => ?error,
68                        );
69                    }
70                    _ => {
71                        anyhow::bail!(error);
72                    }
73                },
74            }
75        }
76
77        self.buffered_single_signature_store
78            .remove_buffered_signatures(discriminant, signatures_to_remove)
79            .await?;
80
81        Ok(())
82    }
83}
84
85#[async_trait]
86impl CertifierService for BufferedCertifierService {
87    async fn inform_epoch(&self, epoch: Epoch) -> StdResult<()> {
88        self.certifier_service.inform_epoch(epoch).await
89    }
90
91    async fn register_single_signature(
92        &self,
93        signed_entity_type: &SignedEntityType,
94        signature: &SingleSignatures,
95    ) -> StdResult<SignatureRegistrationStatus> {
96        match self
97            .certifier_service
98            .register_single_signature(signed_entity_type, signature)
99            .await
100        {
101            Ok(res) => Ok(res),
102            Err(error) => match error.downcast_ref::<CertifierServiceError>() {
103                Some(CertifierServiceError::NotFound(..)) if signature.is_authenticated() => {
104                    debug!(
105                        self.logger, "No OpenMessage available for signed entity - Buffering single signature";
106                        "signed_entity_type" => ?signed_entity_type,
107                        "party_id" => &signature.party_id
108                    );
109
110                    self.buffered_single_signature_store
111                        .buffer_signature(signed_entity_type.into(), signature)
112                        .await?;
113
114                    Ok(SignatureRegistrationStatus::Buffered)
115                }
116                _ => Err(error),
117            },
118        }
119    }
120
121    async fn create_open_message(
122        &self,
123        signed_entity_type: &SignedEntityType,
124        protocol_message: &ProtocolMessage,
125    ) -> StdResult<OpenMessage> {
126        // IMPORTANT: this method should not fail if the open message creation succeeds
127        // Otherwise, the state machine won't aggregate signatures for this open message.
128
129        let creation_result = self
130            .certifier_service
131            .create_open_message(signed_entity_type, protocol_message)
132            .await;
133
134        if creation_result.is_ok() {
135            if let Err(error) = self
136                .try_register_buffered_signatures_to_current_open_message(signed_entity_type)
137                .await
138            {
139                warn!(self.logger, "Failed to register buffered signatures to the new open message";
140                    "signed_entity_type" => ?signed_entity_type,
141                    "error" => ?error
142                );
143            }
144        }
145
146        creation_result
147    }
148
149    async fn get_open_message(
150        &self,
151        signed_entity_type: &SignedEntityType,
152    ) -> StdResult<Option<OpenMessage>> {
153        self.certifier_service
154            .get_open_message(signed_entity_type)
155            .await
156    }
157
158    async fn mark_open_message_if_expired(
159        &self,
160        signed_entity_type: &SignedEntityType,
161    ) -> StdResult<Option<OpenMessage>> {
162        self.certifier_service
163            .mark_open_message_if_expired(signed_entity_type)
164            .await
165    }
166
167    async fn create_certificate(
168        &self,
169        signed_entity_type: &SignedEntityType,
170    ) -> StdResult<Option<Certificate>> {
171        self.certifier_service
172            .create_certificate(signed_entity_type)
173            .await
174    }
175
176    async fn get_certificate_by_hash(&self, hash: &str) -> StdResult<Option<Certificate>> {
177        self.certifier_service.get_certificate_by_hash(hash).await
178    }
179
180    async fn get_latest_certificates(&self, last_n: usize) -> StdResult<Vec<Certificate>> {
181        self.certifier_service.get_latest_certificates(last_n).await
182    }
183
184    async fn verify_certificate_chain(&self, epoch: Epoch) -> StdResult<()> {
185        self.certifier_service.verify_certificate_chain(epoch).await
186    }
187}
188
189#[cfg(test)]
190mod tests {
191    use anyhow::anyhow;
192    use mockall::predicate::eq;
193
194    use mithril_common::entities::SignedEntityTypeDiscriminants::{
195        CardanoTransactions, MithrilStakeDistribution,
196    };
197    use mithril_common::entities::SingleSignatureAuthenticationStatus;
198    use mithril_common::test_utils::fake_data;
199
200    use crate::database::repository::BufferedSingleSignatureRepository;
201    use crate::database::test_helper::main_db_connection;
202    use crate::services::{
203        CertifierServiceError, MockBufferedSingleSignatureStore, MockCertifierService,
204    };
205    use crate::test_tools::TestLogger;
206
207    use super::*;
208
209    fn mock_certifier(
210        certifier_mock_config: impl FnOnce(&mut MockCertifierService),
211    ) -> Arc<MockCertifierService> {
212        let mut certifier = MockCertifierService::new();
213        certifier_mock_config(&mut certifier);
214        Arc::new(certifier)
215    }
216
217    fn mock_store<F>(store_mock_config: F) -> Arc<MockBufferedSingleSignatureStore>
218    where
219        F: FnOnce(&mut MockBufferedSingleSignatureStore),
220    {
221        let mut store = MockBufferedSingleSignatureStore::new();
222        store_mock_config(&mut store);
223        Arc::new(store)
224    }
225
226    /// Run a scenario where we try to register a signature (using a fixed signed entity type).
227    ///
228    /// Return the registration result and the list of buffered signatures after the registration.
229    async fn run_register_signature_scenario(
230        decorated_certifier_mock_config: impl FnOnce(&mut MockCertifierService),
231        signature_to_register: &SingleSignatures,
232    ) -> (
233        StdResult<SignatureRegistrationStatus>,
234        Vec<SingleSignatures>,
235    ) {
236        let store = Arc::new(BufferedSingleSignatureRepository::new(Arc::new(
237            main_db_connection().unwrap(),
238        )));
239        let certifier = BufferedCertifierService::new(
240            mock_certifier(decorated_certifier_mock_config),
241            store.clone(),
242            TestLogger::stdout(),
243        );
244
245        let registration_result = certifier
246            .register_single_signature(
247                &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
248                signature_to_register,
249            )
250            .await;
251
252        let buffered_signatures = store
253            .get_buffered_signatures(MithrilStakeDistribution)
254            .await
255            .unwrap();
256
257        (registration_result, buffered_signatures)
258    }
259
260    #[tokio::test]
261    async fn when_registering_single_signature_dont_buffer_signature_if_decorated_certifier_succeed(
262    ) {
263        let (registration_result, buffered_signatures_after_registration) =
264            run_register_signature_scenario(
265                |mock_certifier| {
266                    mock_certifier
267                        .expect_register_single_signature()
268                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
269                },
270                &SingleSignatures::fake("party_1", "a message"),
271            )
272            .await;
273
274        let status = registration_result.expect("Registration should have succeed");
275        assert_eq!(status, SignatureRegistrationStatus::Registered);
276        assert_eq!(
277            buffered_signatures_after_registration,
278            Vec::<SingleSignatures>::new()
279        );
280    }
281
282    mod when_registering_single_signature_if_decorated_certifier_as_no_opened_message {
283        use super::*;
284
285        #[tokio::test]
286        async fn buffer_signature_if_authenticated() {
287            let (registration_result, buffered_signatures_after_registration) =
288                run_register_signature_scenario(
289                    |mock_certifier| {
290                        mock_certifier
291                            .expect_register_single_signature()
292                            .returning(|_, _| {
293                                Err(CertifierServiceError::NotFound(
294                                    SignedEntityType::MithrilStakeDistribution(Epoch(5)),
295                                )
296                                .into())
297                            });
298                    },
299                    &SingleSignatures {
300                        authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
301                        ..SingleSignatures::fake("party_1", "a message")
302                    },
303                )
304                .await;
305
306            let status = registration_result.expect("Registration should have succeed");
307            assert_eq!(status, SignatureRegistrationStatus::Buffered);
308            assert_eq!(
309                buffered_signatures_after_registration,
310                vec![SingleSignatures::fake("party_1", "a message")]
311            );
312        }
313
314        #[tokio::test]
315        async fn dont_buffer_signature_if_not_authenticated() {
316            let (registration_result, buffered_signatures_after_registration) =
317                run_register_signature_scenario(
318                    |mock_certifier| {
319                        mock_certifier
320                            .expect_register_single_signature()
321                            .returning(|_, _| {
322                                Err(CertifierServiceError::NotFound(
323                                    SignedEntityType::MithrilStakeDistribution(Epoch(5)),
324                                )
325                                .into())
326                            });
327                    },
328                    &SingleSignatures {
329                        authentication_status: SingleSignatureAuthenticationStatus::Unauthenticated,
330                        ..SingleSignatures::fake("party_1", "a message")
331                    },
332                )
333                .await;
334
335            registration_result.expect_err("Registration should have failed");
336            assert_eq!(
337                buffered_signatures_after_registration,
338                Vec::<SingleSignatures>::new()
339            );
340        }
341    }
342
343    #[tokio::test]
344    async fn buffered_signatures_are_moved_to_newly_opened_message() {
345        let store = Arc::new(BufferedSingleSignatureRepository::new(Arc::new(
346            main_db_connection().unwrap(),
347        )));
348        for (signed_type, signature) in [
349            (
350                MithrilStakeDistribution,
351                SingleSignatures::fake("party_1", "message 1"),
352            ),
353            (
354                MithrilStakeDistribution,
355                SingleSignatures::fake("party_2", "message 2"),
356            ),
357            (
358                CardanoTransactions,
359                SingleSignatures::fake("party_3", "message 3"),
360            ),
361        ] {
362            store
363                .buffer_signature(signed_type, &signature)
364                .await
365                .unwrap();
366        }
367
368        let certifier = BufferedCertifierService::new(
369            mock_certifier(|mock| {
370                mock.expect_create_open_message()
371                    .returning(|_, _| Ok(OpenMessage::dummy()));
372
373                // Those configuration Asserts that the buffered signatures are registered
374                mock.expect_register_single_signature()
375                    .with(
376                        eq(SignedEntityType::MithrilStakeDistribution(Epoch(5))),
377                        eq(SingleSignatures::fake("party_1", "message 1")),
378                    )
379                    .once()
380                    .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
381                mock.expect_register_single_signature()
382                    .with(
383                        eq(SignedEntityType::MithrilStakeDistribution(Epoch(5))),
384                        eq(SingleSignatures::fake("party_2", "message 2")),
385                    )
386                    .once()
387                    .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
388            }),
389            store.clone(),
390            TestLogger::stdout(),
391        );
392
393        certifier
394            .create_open_message(
395                &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
396                &ProtocolMessage::new(),
397            )
398            .await
399            .unwrap();
400
401        let remaining_sigs = store
402            .get_buffered_signatures(MithrilStakeDistribution)
403            .await
404            .unwrap();
405        assert!(remaining_sigs.is_empty());
406    }
407
408    mod when_failing_to_transfer_buffered_signature_to_new_open_message {
409        use mockall::predicate::always;
410
411        use super::*;
412
413        async fn run_scenario(
414            certifier_mock_config: impl FnOnce(&mut MockCertifierService),
415            store_mock_config: impl FnOnce(&mut MockBufferedSingleSignatureStore),
416        ) {
417            let store = mock_store(store_mock_config);
418            let certifier = BufferedCertifierService::new(
419                mock_certifier(certifier_mock_config),
420                store,
421                TestLogger::stdout(),
422            );
423
424            certifier
425                .create_open_message(
426                    &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
427                    &ProtocolMessage::new(),
428                )
429                .await
430                .expect("Transferring buffered signatures to new open message should not fail");
431        }
432
433        #[tokio::test]
434        async fn skip_invalid_signatures() {
435            run_scenario(
436                |mock| {
437                    mock.expect_create_open_message()
438                        .returning(|_, _| Ok(OpenMessage::dummy()));
439
440                    mock.expect_register_single_signature()
441                        .with(always(), eq(fake_data::single_signatures(vec![1])))
442                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
443                        .once();
444                    mock.expect_register_single_signature()
445                        .with(always(), eq(fake_data::single_signatures(vec![2])))
446                        .returning(|_, _| {
447                            Err(CertifierServiceError::InvalidSingleSignature(
448                                OpenMessage::dummy().signed_entity_type,
449                                anyhow!("Invalid signature"),
450                            )
451                            .into())
452                        })
453                        .once();
454                    mock.expect_register_single_signature()
455                        .with(always(), eq(fake_data::single_signatures(vec![3])))
456                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
457                        .once();
458                },
459                |mock| {
460                    mock.expect_get_buffered_signatures().returning(|_| {
461                        Ok(vec![
462                            fake_data::single_signatures(vec![1]),
463                            fake_data::single_signatures(vec![2]),
464                            fake_data::single_signatures(vec![3]),
465                        ])
466                    });
467                    mock.expect_remove_buffered_signatures()
468                        // Only non-skipped signatures should be removed
469                        .withf(|_, sig_to_remove| sig_to_remove.len() == 2)
470                        .returning(|_, _| Ok(()));
471                },
472            )
473            .await;
474        }
475
476        #[tokio::test]
477        async fn do_not_return_an_error_if_getting_buffer_signatures_fail() {
478            run_scenario(
479                |mock| {
480                    mock.expect_create_open_message()
481                        .returning(|_, _| Ok(OpenMessage::dummy()));
482                    mock.expect_register_single_signature()
483                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
484                },
485                |mock| {
486                    mock.expect_get_buffered_signatures()
487                        .returning(|_| Err(anyhow!("get_buffered_signatures error")));
488                },
489            )
490            .await;
491        }
492
493        #[tokio::test]
494        async fn do_not_return_an_error_if_registering_signature_fail() {
495            run_scenario(
496                |mock| {
497                    mock.expect_create_open_message()
498                        .returning(|_, _| Ok(OpenMessage::dummy()));
499                    mock.expect_register_single_signature()
500                        .returning(|_, _| Err(anyhow!("register_single_signature error")));
501                },
502                |mock| {
503                    mock.expect_get_buffered_signatures()
504                        .returning(|_| Ok(vec![fake_data::single_signatures(vec![1])]));
505                },
506            )
507            .await;
508        }
509
510        #[tokio::test]
511        async fn do_not_return_an_error_if_removing_buffered_signatures_fail() {
512            run_scenario(
513                |mock| {
514                    mock.expect_create_open_message()
515                        .returning(|_, _| Ok(OpenMessage::dummy()));
516                    mock.expect_register_single_signature()
517                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
518                },
519                |mock| {
520                    mock.expect_get_buffered_signatures()
521                        .returning(|_| Ok(vec![fake_data::single_signatures(vec![1])]));
522                    mock.expect_remove_buffered_signatures()
523                        .returning(|_, _| Err(anyhow!("remove_buffered_signatures error")));
524                },
525            )
526            .await;
527        }
528    }
529}