mithril_aggregator/services/certifier/
buffered_certifier.rs

1use async_trait::async_trait;
2use slog::{debug, trace, warn, Logger};
3use std::sync::Arc;
4
5use mithril_common::entities::{
6    Certificate, Epoch, ProtocolMessage, SignedEntityType, SignedEntityTypeDiscriminants,
7    SingleSignature,
8};
9use mithril_common::logging::LoggerExtensions;
10use mithril_common::StdResult;
11
12use crate::entities::OpenMessage;
13use crate::services::{
14    BufferedSingleSignatureStore, CertifierService, CertifierServiceError,
15    SignatureRegistrationStatus,
16};
17
18/// A decorator of [CertifierService] that can buffer registration of single signatures
19/// when the open message is not yet created.
20///
21/// When an open message is created, buffered single signatures for the open message type are
22/// registered.
23pub struct BufferedCertifierService {
24    certifier_service: Arc<dyn CertifierService>,
25    buffered_single_signature_store: Arc<dyn BufferedSingleSignatureStore>,
26    logger: Logger,
27}
28
29impl BufferedCertifierService {
30    /// Create a new instance of `BufferedCertifierService`.
31    pub fn new(
32        certifier_service: Arc<dyn CertifierService>,
33        buffered_single_signature_store: Arc<dyn BufferedSingleSignatureStore>,
34        logger: Logger,
35    ) -> Self {
36        Self {
37            certifier_service,
38            buffered_single_signature_store,
39            logger: logger.new_with_component_name::<Self>(),
40        }
41    }
42
43    async fn try_register_buffered_signatures_to_current_open_message(
44        &self,
45        signed_entity_type: &SignedEntityType,
46    ) -> StdResult<()> {
47        let discriminant: SignedEntityTypeDiscriminants = signed_entity_type.into();
48        let buffered_signatures = self
49            .buffered_single_signature_store
50            .get_buffered_signatures(discriminant)
51            .await?;
52        let mut signatures_to_remove = vec![];
53
54        for signature in buffered_signatures {
55            match self
56                .certifier_service
57                .register_single_signature(signed_entity_type, &signature)
58                .await
59            {
60                Ok(..) => {
61                    signatures_to_remove.push(signature);
62                }
63                Err(error) => match error.downcast_ref::<CertifierServiceError>() {
64                    Some(CertifierServiceError::InvalidSingleSignature(..)) => {
65                        trace!(self.logger, "Skipping invalid signature for signed entity '{signed_entity_type:?}'";
66                            "party_id" => &signature.party_id,
67                            "error" => ?error,
68                        );
69                    }
70                    _ => {
71                        anyhow::bail!(error);
72                    }
73                },
74            }
75        }
76
77        self.buffered_single_signature_store
78            .remove_buffered_signatures(discriminant, signatures_to_remove)
79            .await?;
80
81        Ok(())
82    }
83}
84
85#[async_trait]
86impl CertifierService for BufferedCertifierService {
87    async fn inform_epoch(&self, epoch: Epoch) -> StdResult<()> {
88        self.certifier_service.inform_epoch(epoch).await
89    }
90
91    async fn register_single_signature(
92        &self,
93        signed_entity_type: &SignedEntityType,
94        signature: &SingleSignature,
95    ) -> StdResult<SignatureRegistrationStatus> {
96        match self
97            .certifier_service
98            .register_single_signature(signed_entity_type, signature)
99            .await
100        {
101            Ok(res) => Ok(res),
102            Err(error) => match error.downcast_ref::<CertifierServiceError>() {
103                Some(CertifierServiceError::NotFound(..)) if signature.is_authenticated() => {
104                    debug!(
105                        self.logger, "No OpenMessage available for signed entity - Buffering single signature";
106                        "signed_entity_type" => ?signed_entity_type,
107                        "party_id" => &signature.party_id
108                    );
109
110                    self.buffered_single_signature_store
111                        .buffer_signature(signed_entity_type.into(), signature)
112                        .await?;
113
114                    Ok(SignatureRegistrationStatus::Buffered)
115                }
116                _ => Err(error),
117            },
118        }
119    }
120
121    async fn create_open_message(
122        &self,
123        signed_entity_type: &SignedEntityType,
124        protocol_message: &ProtocolMessage,
125    ) -> StdResult<OpenMessage> {
126        // IMPORTANT: this method should not fail if the open message creation succeeds
127        // Otherwise, the state machine won't aggregate signatures for this open message.
128
129        let creation_result = self
130            .certifier_service
131            .create_open_message(signed_entity_type, protocol_message)
132            .await;
133
134        if creation_result.is_ok() {
135            if let Err(error) = self
136                .try_register_buffered_signatures_to_current_open_message(signed_entity_type)
137                .await
138            {
139                warn!(self.logger, "Failed to register buffered signatures to the new open message";
140                    "signed_entity_type" => ?signed_entity_type,
141                    "error" => ?error
142                );
143            }
144        }
145
146        creation_result
147    }
148
149    async fn get_open_message(
150        &self,
151        signed_entity_type: &SignedEntityType,
152    ) -> StdResult<Option<OpenMessage>> {
153        self.certifier_service
154            .get_open_message(signed_entity_type)
155            .await
156    }
157
158    async fn mark_open_message_if_expired(
159        &self,
160        signed_entity_type: &SignedEntityType,
161    ) -> StdResult<Option<OpenMessage>> {
162        self.certifier_service
163            .mark_open_message_if_expired(signed_entity_type)
164            .await
165    }
166
167    async fn create_certificate(
168        &self,
169        signed_entity_type: &SignedEntityType,
170    ) -> StdResult<Option<Certificate>> {
171        self.certifier_service
172            .create_certificate(signed_entity_type)
173            .await
174    }
175
176    async fn get_certificate_by_hash(&self, hash: &str) -> StdResult<Option<Certificate>> {
177        self.certifier_service.get_certificate_by_hash(hash).await
178    }
179
180    async fn get_latest_certificates(&self, last_n: usize) -> StdResult<Vec<Certificate>> {
181        self.certifier_service.get_latest_certificates(last_n).await
182    }
183
184    async fn verify_certificate_chain(&self, epoch: Epoch) -> StdResult<()> {
185        self.certifier_service.verify_certificate_chain(epoch).await
186    }
187}
188
189#[cfg(test)]
190mod tests {
191    use anyhow::anyhow;
192    use mockall::predicate::eq;
193
194    use mithril_common::entities::SignedEntityTypeDiscriminants::{
195        CardanoTransactions, MithrilStakeDistribution,
196    };
197    use mithril_common::entities::SingleSignatureAuthenticationStatus;
198    use mithril_common::test_utils::fake_data;
199
200    use crate::database::repository::BufferedSingleSignatureRepository;
201    use crate::database::test_helper::main_db_connection;
202    use crate::services::{
203        CertifierServiceError, MockBufferedSingleSignatureStore, MockCertifierService,
204    };
205    use crate::test_tools::TestLogger;
206
207    use super::*;
208
209    fn mock_certifier(
210        certifier_mock_config: impl FnOnce(&mut MockCertifierService),
211    ) -> Arc<MockCertifierService> {
212        let mut certifier = MockCertifierService::new();
213        certifier_mock_config(&mut certifier);
214        Arc::new(certifier)
215    }
216
217    fn mock_store<F>(store_mock_config: F) -> Arc<MockBufferedSingleSignatureStore>
218    where
219        F: FnOnce(&mut MockBufferedSingleSignatureStore),
220    {
221        let mut store = MockBufferedSingleSignatureStore::new();
222        store_mock_config(&mut store);
223        Arc::new(store)
224    }
225
226    /// Run a scenario where we try to register a signature (using a fixed signed entity type).
227    ///
228    /// Return the registration result and the list of buffered signatures after the registration.
229    async fn run_register_signature_scenario(
230        decorated_certifier_mock_config: impl FnOnce(&mut MockCertifierService),
231        signature_to_register: &SingleSignature,
232    ) -> (StdResult<SignatureRegistrationStatus>, Vec<SingleSignature>) {
233        let store = Arc::new(BufferedSingleSignatureRepository::new(Arc::new(
234            main_db_connection().unwrap(),
235        )));
236        let certifier = BufferedCertifierService::new(
237            mock_certifier(decorated_certifier_mock_config),
238            store.clone(),
239            TestLogger::stdout(),
240        );
241
242        let registration_result = certifier
243            .register_single_signature(
244                &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
245                signature_to_register,
246            )
247            .await;
248
249        let buffered_signatures = store
250            .get_buffered_signatures(MithrilStakeDistribution)
251            .await
252            .unwrap();
253
254        (registration_result, buffered_signatures)
255    }
256
257    #[tokio::test]
258    async fn when_registering_single_signature_dont_buffer_signature_if_decorated_certifier_succeed(
259    ) {
260        let (registration_result, buffered_signatures_after_registration) =
261            run_register_signature_scenario(
262                |mock_certifier| {
263                    mock_certifier
264                        .expect_register_single_signature()
265                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
266                },
267                &SingleSignature::fake("party_1", "a message"),
268            )
269            .await;
270
271        let status = registration_result.expect("Registration should have succeed");
272        assert_eq!(status, SignatureRegistrationStatus::Registered);
273        assert_eq!(
274            buffered_signatures_after_registration,
275            Vec::<SingleSignature>::new()
276        );
277    }
278
279    mod when_registering_single_signature_if_decorated_certifier_as_no_opened_message {
280        use super::*;
281
282        #[tokio::test]
283        async fn buffer_signature_if_authenticated() {
284            let (registration_result, buffered_signatures_after_registration) =
285                run_register_signature_scenario(
286                    |mock_certifier| {
287                        mock_certifier
288                            .expect_register_single_signature()
289                            .returning(|_, _| {
290                                Err(CertifierServiceError::NotFound(
291                                    SignedEntityType::MithrilStakeDistribution(Epoch(5)),
292                                )
293                                .into())
294                            });
295                    },
296                    &SingleSignature {
297                        authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
298                        ..SingleSignature::fake("party_1", "a message")
299                    },
300                )
301                .await;
302
303            let status = registration_result.expect("Registration should have succeed");
304            assert_eq!(status, SignatureRegistrationStatus::Buffered);
305            assert_eq!(
306                buffered_signatures_after_registration,
307                vec![SingleSignature::fake("party_1", "a message")]
308            );
309        }
310
311        #[tokio::test]
312        async fn dont_buffer_signature_if_not_authenticated() {
313            let (registration_result, buffered_signatures_after_registration) =
314                run_register_signature_scenario(
315                    |mock_certifier| {
316                        mock_certifier
317                            .expect_register_single_signature()
318                            .returning(|_, _| {
319                                Err(CertifierServiceError::NotFound(
320                                    SignedEntityType::MithrilStakeDistribution(Epoch(5)),
321                                )
322                                .into())
323                            });
324                    },
325                    &SingleSignature {
326                        authentication_status: SingleSignatureAuthenticationStatus::Unauthenticated,
327                        ..SingleSignature::fake("party_1", "a message")
328                    },
329                )
330                .await;
331
332            registration_result.expect_err("Registration should have failed");
333            assert_eq!(
334                buffered_signatures_after_registration,
335                Vec::<SingleSignature>::new()
336            );
337        }
338    }
339
340    #[tokio::test]
341    async fn buffered_signatures_are_moved_to_newly_opened_message() {
342        let store = Arc::new(BufferedSingleSignatureRepository::new(Arc::new(
343            main_db_connection().unwrap(),
344        )));
345        for (signed_type, signature) in [
346            (
347                MithrilStakeDistribution,
348                SingleSignature::fake("party_1", "message 1"),
349            ),
350            (
351                MithrilStakeDistribution,
352                SingleSignature::fake("party_2", "message 2"),
353            ),
354            (
355                CardanoTransactions,
356                SingleSignature::fake("party_3", "message 3"),
357            ),
358        ] {
359            store
360                .buffer_signature(signed_type, &signature)
361                .await
362                .unwrap();
363        }
364
365        let certifier = BufferedCertifierService::new(
366            mock_certifier(|mock| {
367                mock.expect_create_open_message()
368                    .returning(|_, _| Ok(OpenMessage::dummy()));
369
370                // Those configuration Asserts that the buffered signatures are registered
371                mock.expect_register_single_signature()
372                    .with(
373                        eq(SignedEntityType::MithrilStakeDistribution(Epoch(5))),
374                        eq(SingleSignature::fake("party_1", "message 1")),
375                    )
376                    .once()
377                    .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
378                mock.expect_register_single_signature()
379                    .with(
380                        eq(SignedEntityType::MithrilStakeDistribution(Epoch(5))),
381                        eq(SingleSignature::fake("party_2", "message 2")),
382                    )
383                    .once()
384                    .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
385            }),
386            store.clone(),
387            TestLogger::stdout(),
388        );
389
390        certifier
391            .create_open_message(
392                &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
393                &ProtocolMessage::new(),
394            )
395            .await
396            .unwrap();
397
398        let remaining_sigs = store
399            .get_buffered_signatures(MithrilStakeDistribution)
400            .await
401            .unwrap();
402        assert!(remaining_sigs.is_empty());
403    }
404
405    mod when_failing_to_transfer_buffered_signature_to_new_open_message {
406        use mockall::predicate::always;
407
408        use super::*;
409
410        async fn run_scenario(
411            certifier_mock_config: impl FnOnce(&mut MockCertifierService),
412            store_mock_config: impl FnOnce(&mut MockBufferedSingleSignatureStore),
413        ) {
414            let store = mock_store(store_mock_config);
415            let certifier = BufferedCertifierService::new(
416                mock_certifier(certifier_mock_config),
417                store,
418                TestLogger::stdout(),
419            );
420
421            certifier
422                .create_open_message(
423                    &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
424                    &ProtocolMessage::new(),
425                )
426                .await
427                .expect("Transferring buffered signatures to new open message should not fail");
428        }
429
430        #[tokio::test]
431        async fn skip_invalid_signatures() {
432            run_scenario(
433                |mock| {
434                    mock.expect_create_open_message()
435                        .returning(|_, _| Ok(OpenMessage::dummy()));
436
437                    mock.expect_register_single_signature()
438                        .with(always(), eq(fake_data::single_signature(vec![1])))
439                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
440                        .once();
441                    mock.expect_register_single_signature()
442                        .with(always(), eq(fake_data::single_signature(vec![2])))
443                        .returning(|_, _| {
444                            Err(CertifierServiceError::InvalidSingleSignature(
445                                OpenMessage::dummy().signed_entity_type,
446                                anyhow!("Invalid signature"),
447                            )
448                            .into())
449                        })
450                        .once();
451                    mock.expect_register_single_signature()
452                        .with(always(), eq(fake_data::single_signature(vec![3])))
453                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
454                        .once();
455                },
456                |mock| {
457                    mock.expect_get_buffered_signatures().returning(|_| {
458                        Ok(vec![
459                            fake_data::single_signature(vec![1]),
460                            fake_data::single_signature(vec![2]),
461                            fake_data::single_signature(vec![3]),
462                        ])
463                    });
464                    mock.expect_remove_buffered_signatures()
465                        // Only non-skipped signatures should be removed
466                        .withf(|_, sig_to_remove| sig_to_remove.len() == 2)
467                        .returning(|_, _| Ok(()));
468                },
469            )
470            .await;
471        }
472
473        #[tokio::test]
474        async fn do_not_return_an_error_if_getting_buffer_signatures_fail() {
475            run_scenario(
476                |mock| {
477                    mock.expect_create_open_message()
478                        .returning(|_, _| Ok(OpenMessage::dummy()));
479                    mock.expect_register_single_signature()
480                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
481                },
482                |mock| {
483                    mock.expect_get_buffered_signatures()
484                        .returning(|_| Err(anyhow!("get_buffered_signatures error")));
485                },
486            )
487            .await;
488        }
489
490        #[tokio::test]
491        async fn do_not_return_an_error_if_registering_signature_fail() {
492            run_scenario(
493                |mock| {
494                    mock.expect_create_open_message()
495                        .returning(|_, _| Ok(OpenMessage::dummy()));
496                    mock.expect_register_single_signature()
497                        .returning(|_, _| Err(anyhow!("register_single_signature error")));
498                },
499                |mock| {
500                    mock.expect_get_buffered_signatures()
501                        .returning(|_| Ok(vec![fake_data::single_signature(vec![1])]));
502                },
503            )
504            .await;
505        }
506
507        #[tokio::test]
508        async fn do_not_return_an_error_if_removing_buffered_signatures_fail() {
509            run_scenario(
510                |mock| {
511                    mock.expect_create_open_message()
512                        .returning(|_, _| Ok(OpenMessage::dummy()));
513                    mock.expect_register_single_signature()
514                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
515                },
516                |mock| {
517                    mock.expect_get_buffered_signatures()
518                        .returning(|_| Ok(vec![fake_data::single_signature(vec![1])]));
519                    mock.expect_remove_buffered_signatures()
520                        .returning(|_, _| Err(anyhow!("remove_buffered_signatures error")));
521                },
522            )
523            .await;
524        }
525    }
526}