mithril_aggregator/services/certifier/
buffered_certifier.rs

1use async_trait::async_trait;
2use slog::{Logger, debug, trace, warn};
3use std::sync::Arc;
4
5use mithril_common::StdResult;
6use mithril_common::entities::{
7    Certificate, Epoch, ProtocolMessage, SignedEntityType, SignedEntityTypeDiscriminants,
8    SingleSignature,
9};
10use mithril_common::logging::LoggerExtensions;
11
12use crate::entities::OpenMessage;
13use crate::services::{
14    BufferedSingleSignatureStore, CertifierService, CertifierServiceError,
15    SignatureRegistrationStatus,
16};
17
18/// A decorator of [CertifierService] that can buffer registration of single signatures
19/// when the open message is not yet created.
20///
21/// When an open message is created, buffered single signatures for the open message type are
22/// registered.
23pub struct BufferedCertifierService {
24    certifier_service: Arc<dyn CertifierService>,
25    buffered_single_signature_store: Arc<dyn BufferedSingleSignatureStore>,
26    logger: Logger,
27}
28
29impl BufferedCertifierService {
30    /// Create a new instance of `BufferedCertifierService`.
31    pub fn new(
32        certifier_service: Arc<dyn CertifierService>,
33        buffered_single_signature_store: Arc<dyn BufferedSingleSignatureStore>,
34        logger: Logger,
35    ) -> Self {
36        Self {
37            certifier_service,
38            buffered_single_signature_store,
39            logger: logger.new_with_component_name::<Self>(),
40        }
41    }
42
43    async fn try_register_buffered_signatures_to_current_open_message(
44        &self,
45        signed_entity_type: &SignedEntityType,
46    ) -> StdResult<()> {
47        let discriminant: SignedEntityTypeDiscriminants = signed_entity_type.into();
48        let buffered_signatures = self
49            .buffered_single_signature_store
50            .get_buffered_signatures(discriminant)
51            .await?;
52        let mut signatures_to_remove = vec![];
53
54        for signature in buffered_signatures {
55            match self
56                .certifier_service
57                .register_single_signature(signed_entity_type, &signature)
58                .await
59            {
60                Ok(..) => {
61                    signatures_to_remove.push(signature);
62                }
63                Err(error) => match error.downcast_ref::<CertifierServiceError>() {
64                    Some(CertifierServiceError::InvalidSingleSignature(..)) => {
65                        trace!(self.logger, "Skipping invalid signature for signed entity '{signed_entity_type:?}'";
66                            "party_id" => &signature.party_id,
67                            "error" => ?error,
68                        );
69                    }
70                    _ => {
71                        anyhow::bail!(error);
72                    }
73                },
74            }
75        }
76
77        self.buffered_single_signature_store
78            .remove_buffered_signatures(discriminant, signatures_to_remove)
79            .await?;
80
81        Ok(())
82    }
83}
84
85#[async_trait]
86impl CertifierService for BufferedCertifierService {
87    async fn inform_epoch(&self, epoch: Epoch) -> StdResult<()> {
88        self.certifier_service.inform_epoch(epoch).await
89    }
90
91    async fn register_single_signature(
92        &self,
93        signed_entity_type: &SignedEntityType,
94        signature: &SingleSignature,
95    ) -> StdResult<SignatureRegistrationStatus> {
96        match self
97            .certifier_service
98            .register_single_signature(signed_entity_type, signature)
99            .await
100        {
101            Ok(res) => Ok(res),
102            Err(error) => match error.downcast_ref::<CertifierServiceError>() {
103                Some(CertifierServiceError::NotFound(..)) if signature.is_authenticated() => {
104                    debug!(
105                        self.logger, "No OpenMessage available for signed entity - Buffering single signature";
106                        "signed_entity_type" => ?signed_entity_type,
107                        "party_id" => &signature.party_id
108                    );
109
110                    self.buffered_single_signature_store
111                        .buffer_signature(signed_entity_type.into(), signature)
112                        .await?;
113
114                    Ok(SignatureRegistrationStatus::Buffered)
115                }
116                _ => Err(error),
117            },
118        }
119    }
120
121    async fn create_open_message(
122        &self,
123        signed_entity_type: &SignedEntityType,
124        protocol_message: &ProtocolMessage,
125    ) -> StdResult<OpenMessage> {
126        // IMPORTANT: this method should not fail if the open message creation succeeds
127        // Otherwise, the state machine won't aggregate signatures for this open message.
128
129        let creation_result = self
130            .certifier_service
131            .create_open_message(signed_entity_type, protocol_message)
132            .await;
133
134        if creation_result.is_ok() {
135            if let Err(error) = self
136                .try_register_buffered_signatures_to_current_open_message(signed_entity_type)
137                .await
138            {
139                warn!(self.logger, "Failed to register buffered signatures to the new open message";
140                    "signed_entity_type" => ?signed_entity_type,
141                    "error" => ?error
142                );
143            }
144        }
145
146        creation_result
147    }
148
149    async fn get_open_message(
150        &self,
151        signed_entity_type: &SignedEntityType,
152    ) -> StdResult<Option<OpenMessage>> {
153        self.certifier_service.get_open_message(signed_entity_type).await
154    }
155
156    async fn mark_open_message_if_expired(
157        &self,
158        signed_entity_type: &SignedEntityType,
159    ) -> StdResult<Option<OpenMessage>> {
160        self.certifier_service
161            .mark_open_message_if_expired(signed_entity_type)
162            .await
163    }
164
165    async fn create_certificate(
166        &self,
167        signed_entity_type: &SignedEntityType,
168    ) -> StdResult<Option<Certificate>> {
169        self.certifier_service.create_certificate(signed_entity_type).await
170    }
171
172    async fn get_certificate_by_hash(&self, hash: &str) -> StdResult<Option<Certificate>> {
173        self.certifier_service.get_certificate_by_hash(hash).await
174    }
175
176    async fn get_latest_certificates(&self, last_n: usize) -> StdResult<Vec<Certificate>> {
177        self.certifier_service.get_latest_certificates(last_n).await
178    }
179
180    async fn verify_certificate_chain(&self, epoch: Epoch) -> StdResult<()> {
181        self.certifier_service.verify_certificate_chain(epoch).await
182    }
183}
184
185#[cfg(test)]
186mod tests {
187    use anyhow::anyhow;
188    use mockall::predicate::eq;
189
190    use mithril_common::entities::SignedEntityTypeDiscriminants::{
191        CardanoTransactions, MithrilStakeDistribution,
192    };
193    use mithril_common::entities::SingleSignatureAuthenticationStatus;
194    use mithril_common::test_utils::fake_data;
195
196    use crate::database::repository::BufferedSingleSignatureRepository;
197    use crate::database::test_helper::main_db_connection;
198    use crate::services::{
199        CertifierServiceError, MockBufferedSingleSignatureStore, MockCertifierService,
200    };
201    use crate::test_tools::TestLogger;
202
203    use super::*;
204
205    fn mock_certifier(
206        certifier_mock_config: impl FnOnce(&mut MockCertifierService),
207    ) -> Arc<MockCertifierService> {
208        let mut certifier = MockCertifierService::new();
209        certifier_mock_config(&mut certifier);
210        Arc::new(certifier)
211    }
212
213    fn mock_store<F>(store_mock_config: F) -> Arc<MockBufferedSingleSignatureStore>
214    where
215        F: FnOnce(&mut MockBufferedSingleSignatureStore),
216    {
217        let mut store = MockBufferedSingleSignatureStore::new();
218        store_mock_config(&mut store);
219        Arc::new(store)
220    }
221
222    /// Run a scenario where we try to register a signature (using a fixed signed entity type).
223    ///
224    /// Return the registration result and the list of buffered signatures after the registration.
225    async fn run_register_signature_scenario(
226        decorated_certifier_mock_config: impl FnOnce(&mut MockCertifierService),
227        signature_to_register: &SingleSignature,
228    ) -> (StdResult<SignatureRegistrationStatus>, Vec<SingleSignature>) {
229        let store = Arc::new(BufferedSingleSignatureRepository::new(Arc::new(
230            main_db_connection().unwrap(),
231        )));
232        let certifier = BufferedCertifierService::new(
233            mock_certifier(decorated_certifier_mock_config),
234            store.clone(),
235            TestLogger::stdout(),
236        );
237
238        let registration_result = certifier
239            .register_single_signature(
240                &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
241                signature_to_register,
242            )
243            .await;
244
245        let buffered_signatures =
246            store.get_buffered_signatures(MithrilStakeDistribution).await.unwrap();
247
248        (registration_result, buffered_signatures)
249    }
250
251    #[tokio::test]
252    async fn when_registering_single_signature_dont_buffer_signature_if_decorated_certifier_succeed()
253     {
254        let (registration_result, buffered_signatures_after_registration) =
255            run_register_signature_scenario(
256                |mock_certifier| {
257                    mock_certifier
258                        .expect_register_single_signature()
259                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
260                },
261                &SingleSignature::fake("party_1", "a message"),
262            )
263            .await;
264
265        let status = registration_result.expect("Registration should have succeed");
266        assert_eq!(status, SignatureRegistrationStatus::Registered);
267        assert_eq!(
268            buffered_signatures_after_registration,
269            Vec::<SingleSignature>::new()
270        );
271    }
272
273    mod when_registering_single_signature_if_decorated_certifier_as_no_opened_message {
274        use super::*;
275
276        #[tokio::test]
277        async fn buffer_signature_if_authenticated() {
278            let (registration_result, buffered_signatures_after_registration) =
279                run_register_signature_scenario(
280                    |mock_certifier| {
281                        mock_certifier.expect_register_single_signature().returning(|_, _| {
282                            Err(CertifierServiceError::NotFound(
283                                SignedEntityType::MithrilStakeDistribution(Epoch(5)),
284                            )
285                            .into())
286                        });
287                    },
288                    &SingleSignature {
289                        authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
290                        ..SingleSignature::fake("party_1", "a message")
291                    },
292                )
293                .await;
294
295            let status = registration_result.expect("Registration should have succeed");
296            assert_eq!(status, SignatureRegistrationStatus::Buffered);
297            assert_eq!(
298                buffered_signatures_after_registration,
299                vec![SingleSignature::fake("party_1", "a message")]
300            );
301        }
302
303        #[tokio::test]
304        async fn dont_buffer_signature_if_not_authenticated() {
305            let (registration_result, buffered_signatures_after_registration) =
306                run_register_signature_scenario(
307                    |mock_certifier| {
308                        mock_certifier.expect_register_single_signature().returning(|_, _| {
309                            Err(CertifierServiceError::NotFound(
310                                SignedEntityType::MithrilStakeDistribution(Epoch(5)),
311                            )
312                            .into())
313                        });
314                    },
315                    &SingleSignature {
316                        authentication_status: SingleSignatureAuthenticationStatus::Unauthenticated,
317                        ..SingleSignature::fake("party_1", "a message")
318                    },
319                )
320                .await;
321
322            registration_result.expect_err("Registration should have failed");
323            assert_eq!(
324                buffered_signatures_after_registration,
325                Vec::<SingleSignature>::new()
326            );
327        }
328    }
329
330    #[tokio::test]
331    async fn buffered_signatures_are_moved_to_newly_opened_message() {
332        let store = Arc::new(BufferedSingleSignatureRepository::new(Arc::new(
333            main_db_connection().unwrap(),
334        )));
335        for (signed_type, signature) in [
336            (
337                MithrilStakeDistribution,
338                SingleSignature::fake("party_1", "message 1"),
339            ),
340            (
341                MithrilStakeDistribution,
342                SingleSignature::fake("party_2", "message 2"),
343            ),
344            (
345                CardanoTransactions,
346                SingleSignature::fake("party_3", "message 3"),
347            ),
348        ] {
349            store.buffer_signature(signed_type, &signature).await.unwrap();
350        }
351
352        let certifier = BufferedCertifierService::new(
353            mock_certifier(|mock| {
354                mock.expect_create_open_message()
355                    .returning(|_, _| Ok(OpenMessage::dummy()));
356
357                // Those configuration Asserts that the buffered signatures are registered
358                mock.expect_register_single_signature()
359                    .with(
360                        eq(SignedEntityType::MithrilStakeDistribution(Epoch(5))),
361                        eq(SingleSignature::fake("party_1", "message 1")),
362                    )
363                    .once()
364                    .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
365                mock.expect_register_single_signature()
366                    .with(
367                        eq(SignedEntityType::MithrilStakeDistribution(Epoch(5))),
368                        eq(SingleSignature::fake("party_2", "message 2")),
369                    )
370                    .once()
371                    .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
372            }),
373            store.clone(),
374            TestLogger::stdout(),
375        );
376
377        certifier
378            .create_open_message(
379                &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
380                &ProtocolMessage::new(),
381            )
382            .await
383            .unwrap();
384
385        let remaining_sigs = store.get_buffered_signatures(MithrilStakeDistribution).await.unwrap();
386        assert!(remaining_sigs.is_empty());
387    }
388
389    mod when_failing_to_transfer_buffered_signature_to_new_open_message {
390        use mockall::predicate::always;
391
392        use super::*;
393
394        async fn run_scenario(
395            certifier_mock_config: impl FnOnce(&mut MockCertifierService),
396            store_mock_config: impl FnOnce(&mut MockBufferedSingleSignatureStore),
397        ) {
398            let store = mock_store(store_mock_config);
399            let certifier = BufferedCertifierService::new(
400                mock_certifier(certifier_mock_config),
401                store,
402                TestLogger::stdout(),
403            );
404
405            certifier
406                .create_open_message(
407                    &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
408                    &ProtocolMessage::new(),
409                )
410                .await
411                .expect("Transferring buffered signatures to new open message should not fail");
412        }
413
414        #[tokio::test]
415        async fn skip_invalid_signatures() {
416            run_scenario(
417                |mock| {
418                    mock.expect_create_open_message()
419                        .returning(|_, _| Ok(OpenMessage::dummy()));
420
421                    mock.expect_register_single_signature()
422                        .with(always(), eq(fake_data::single_signature(vec![1])))
423                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
424                        .once();
425                    mock.expect_register_single_signature()
426                        .with(always(), eq(fake_data::single_signature(vec![2])))
427                        .returning(|_, _| {
428                            Err(CertifierServiceError::InvalidSingleSignature(
429                                OpenMessage::dummy().signed_entity_type,
430                                anyhow!("Invalid signature"),
431                            )
432                            .into())
433                        })
434                        .once();
435                    mock.expect_register_single_signature()
436                        .with(always(), eq(fake_data::single_signature(vec![3])))
437                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
438                        .once();
439                },
440                |mock| {
441                    mock.expect_get_buffered_signatures().returning(|_| {
442                        Ok(vec![
443                            fake_data::single_signature(vec![1]),
444                            fake_data::single_signature(vec![2]),
445                            fake_data::single_signature(vec![3]),
446                        ])
447                    });
448                    mock.expect_remove_buffered_signatures()
449                        // Only non-skipped signatures should be removed
450                        .withf(|_, sig_to_remove| sig_to_remove.len() == 2)
451                        .returning(|_, _| Ok(()));
452                },
453            )
454            .await;
455        }
456
457        #[tokio::test]
458        async fn do_not_return_an_error_if_getting_buffer_signatures_fail() {
459            run_scenario(
460                |mock| {
461                    mock.expect_create_open_message()
462                        .returning(|_, _| Ok(OpenMessage::dummy()));
463                    mock.expect_register_single_signature()
464                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
465                },
466                |mock| {
467                    mock.expect_get_buffered_signatures()
468                        .returning(|_| Err(anyhow!("get_buffered_signatures error")));
469                },
470            )
471            .await;
472        }
473
474        #[tokio::test]
475        async fn do_not_return_an_error_if_registering_signature_fail() {
476            run_scenario(
477                |mock| {
478                    mock.expect_create_open_message()
479                        .returning(|_, _| Ok(OpenMessage::dummy()));
480                    mock.expect_register_single_signature()
481                        .returning(|_, _| Err(anyhow!("register_single_signature error")));
482                },
483                |mock| {
484                    mock.expect_get_buffered_signatures()
485                        .returning(|_| Ok(vec![fake_data::single_signature(vec![1])]));
486                },
487            )
488            .await;
489        }
490
491        #[tokio::test]
492        async fn do_not_return_an_error_if_removing_buffered_signatures_fail() {
493            run_scenario(
494                |mock| {
495                    mock.expect_create_open_message()
496                        .returning(|_, _| Ok(OpenMessage::dummy()));
497                    mock.expect_register_single_signature()
498                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
499                },
500                |mock| {
501                    mock.expect_get_buffered_signatures()
502                        .returning(|_| Ok(vec![fake_data::single_signature(vec![1])]));
503                    mock.expect_remove_buffered_signatures()
504                        .returning(|_, _| Err(anyhow!("remove_buffered_signatures error")));
505                },
506            )
507            .await;
508        }
509    }
510}