mithril_aggregator/services/certifier/
buffered_certifier.rs

1use async_trait::async_trait;
2use slog::{Logger, debug, trace, warn};
3use std::sync::Arc;
4
5use mithril_common::StdResult;
6use mithril_common::entities::{
7    Certificate, Epoch, ProtocolMessage, SignedEntityType, SignedEntityTypeDiscriminants,
8    SingleSignature,
9};
10use mithril_common::logging::LoggerExtensions;
11
12use crate::entities::OpenMessage;
13use crate::services::{
14    BufferedSingleSignatureStore, CertifierService, CertifierServiceError,
15    SignatureRegistrationStatus,
16};
17
/// A decorator of [CertifierService] that can buffer registration of single signatures
/// when the open message is not yet created.
///
/// When an open message is created, buffered single signatures for the open message type are
/// registered.
pub struct BufferedCertifierService {
    /// Decorated certifier: every [CertifierService] operation is forwarded to it.
    certifier_service: Arc<dyn CertifierService>,
    /// Store holding the single signatures that could not be registered yet because no open
    /// message was available for their signed entity type.
    buffered_single_signature_store: Arc<dyn BufferedSingleSignatureStore>,
    /// Logger used to report buffering and transfer events.
    logger: Logger,
}
28
29impl BufferedCertifierService {
30    /// Create a new instance of `BufferedCertifierService`.
31    pub fn new(
32        certifier_service: Arc<dyn CertifierService>,
33        buffered_single_signature_store: Arc<dyn BufferedSingleSignatureStore>,
34        logger: Logger,
35    ) -> Self {
36        Self {
37            certifier_service,
38            buffered_single_signature_store,
39            logger: logger.new_with_component_name::<Self>(),
40        }
41    }
42
43    async fn try_register_buffered_signatures_to_current_open_message(
44        &self,
45        signed_entity_type: &SignedEntityType,
46    ) -> StdResult<()> {
47        let discriminant: SignedEntityTypeDiscriminants = signed_entity_type.into();
48        let buffered_signatures = self
49            .buffered_single_signature_store
50            .get_buffered_signatures(discriminant)
51            .await?;
52        let mut signatures_to_remove = vec![];
53
54        for signature in buffered_signatures {
55            match self
56                .certifier_service
57                .register_single_signature(signed_entity_type, &signature)
58                .await
59            {
60                Ok(..) => {
61                    signatures_to_remove.push(signature);
62                }
63                Err(error) => match error.downcast_ref::<CertifierServiceError>() {
64                    Some(CertifierServiceError::InvalidSingleSignature(..)) => {
65                        trace!(self.logger, "Skipping invalid signature for signed entity '{signed_entity_type:?}'";
66                            "party_id" => &signature.party_id,
67                            "error" => ?error,
68                        );
69                    }
70                    _ => {
71                        anyhow::bail!(error);
72                    }
73                },
74            }
75        }
76
77        self.buffered_single_signature_store
78            .remove_buffered_signatures(discriminant, signatures_to_remove)
79            .await?;
80
81        Ok(())
82    }
83}
84
85#[async_trait]
86impl CertifierService for BufferedCertifierService {
87    async fn inform_epoch(&self, epoch: Epoch) -> StdResult<()> {
88        self.certifier_service.inform_epoch(epoch).await
89    }
90
91    async fn register_single_signature(
92        &self,
93        signed_entity_type: &SignedEntityType,
94        signature: &SingleSignature,
95    ) -> StdResult<SignatureRegistrationStatus> {
96        match self
97            .certifier_service
98            .register_single_signature(signed_entity_type, signature)
99            .await
100        {
101            Ok(res) => Ok(res),
102            Err(error) => match error.downcast_ref::<CertifierServiceError>() {
103                Some(CertifierServiceError::NotFound(..)) if signature.is_authenticated() => {
104                    debug!(
105                        self.logger, "No OpenMessage available for signed entity - Buffering single signature";
106                        "signed_entity_type" => ?signed_entity_type,
107                        "party_id" => &signature.party_id
108                    );
109
110                    self.buffered_single_signature_store
111                        .buffer_signature(signed_entity_type.into(), signature)
112                        .await?;
113
114                    Ok(SignatureRegistrationStatus::Buffered)
115                }
116                _ => Err(error),
117            },
118        }
119    }
120
121    async fn create_open_message(
122        &self,
123        signed_entity_type: &SignedEntityType,
124        protocol_message: &ProtocolMessage,
125    ) -> StdResult<OpenMessage> {
126        // IMPORTANT: this method should not fail if the open message creation succeeds
127        // Otherwise, the state machine won't aggregate signatures for this open message.
128
129        let creation_result = self
130            .certifier_service
131            .create_open_message(signed_entity_type, protocol_message)
132            .await;
133
134        if creation_result.is_ok() {
135            if let Err(error) = self
136                .try_register_buffered_signatures_to_current_open_message(signed_entity_type)
137                .await
138            {
139                warn!(self.logger, "Failed to register buffered signatures to the new open message";
140                    "signed_entity_type" => ?signed_entity_type,
141                    "error" => ?error
142                );
143            }
144        }
145
146        creation_result
147    }
148
149    async fn get_open_message(
150        &self,
151        signed_entity_type: &SignedEntityType,
152    ) -> StdResult<Option<OpenMessage>> {
153        self.certifier_service.get_open_message(signed_entity_type).await
154    }
155
156    async fn mark_open_message_if_expired(
157        &self,
158        signed_entity_type: &SignedEntityType,
159    ) -> StdResult<Option<OpenMessage>> {
160        self.certifier_service
161            .mark_open_message_if_expired(signed_entity_type)
162            .await
163    }
164
165    async fn create_certificate(
166        &self,
167        signed_entity_type: &SignedEntityType,
168    ) -> StdResult<Option<Certificate>> {
169        self.certifier_service.create_certificate(signed_entity_type).await
170    }
171
172    async fn get_certificate_by_hash(&self, hash: &str) -> StdResult<Option<Certificate>> {
173        self.certifier_service.get_certificate_by_hash(hash).await
174    }
175
176    async fn get_latest_certificates(&self, last_n: usize) -> StdResult<Vec<Certificate>> {
177        self.certifier_service.get_latest_certificates(last_n).await
178    }
179
180    async fn verify_certificate_chain(&self, epoch: Epoch) -> StdResult<()> {
181        self.certifier_service.verify_certificate_chain(epoch).await
182    }
183}
184
185#[cfg(test)]
186mod tests {
187    use anyhow::anyhow;
188    use mockall::predicate::eq;
189
190    use mithril_common::entities::SignedEntityTypeDiscriminants::{
191        CardanoTransactions, MithrilStakeDistribution,
192    };
193    use mithril_common::entities::SingleSignatureAuthenticationStatus;
194    use mithril_common::test::double::{Dummy, fake_data};
195    use mithril_common::test::entities_extensions::SingleSignatureTestExtension;
196
197    use crate::database::repository::BufferedSingleSignatureRepository;
198    use crate::database::test_helper::main_db_connection;
199    use crate::services::{
200        CertifierServiceError, MockBufferedSingleSignatureStore, MockCertifierService,
201    };
202    use crate::test::TestLogger;
203
204    use super::*;
205
206    fn mock_certifier(
207        certifier_mock_config: impl FnOnce(&mut MockCertifierService),
208    ) -> Arc<MockCertifierService> {
209        let mut certifier = MockCertifierService::new();
210        certifier_mock_config(&mut certifier);
211        Arc::new(certifier)
212    }
213
214    fn mock_store<F>(store_mock_config: F) -> Arc<MockBufferedSingleSignatureStore>
215    where
216        F: FnOnce(&mut MockBufferedSingleSignatureStore),
217    {
218        let mut store = MockBufferedSingleSignatureStore::new();
219        store_mock_config(&mut store);
220        Arc::new(store)
221    }
222
223    /// Run a scenario where we try to register a signature (using a fixed signed entity type).
224    ///
225    /// Return the registration result and the list of buffered signatures after the registration.
226    async fn run_register_signature_scenario(
227        decorated_certifier_mock_config: impl FnOnce(&mut MockCertifierService),
228        signature_to_register: &SingleSignature,
229    ) -> (StdResult<SignatureRegistrationStatus>, Vec<SingleSignature>) {
230        let store = Arc::new(BufferedSingleSignatureRepository::new(Arc::new(
231            main_db_connection().unwrap(),
232        )));
233        let certifier = BufferedCertifierService::new(
234            mock_certifier(decorated_certifier_mock_config),
235            store.clone(),
236            TestLogger::stdout(),
237        );
238
239        let registration_result = certifier
240            .register_single_signature(
241                &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
242                signature_to_register,
243            )
244            .await;
245
246        let buffered_signatures =
247            store.get_buffered_signatures(MithrilStakeDistribution).await.unwrap();
248
249        (registration_result, buffered_signatures)
250    }
251
252    #[tokio::test]
253    async fn when_registering_single_signature_dont_buffer_signature_if_decorated_certifier_succeed()
254     {
255        let (registration_result, buffered_signatures_after_registration) =
256            run_register_signature_scenario(
257                |mock_certifier| {
258                    mock_certifier
259                        .expect_register_single_signature()
260                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
261                },
262                &SingleSignature::fake("party_1", "a message"),
263            )
264            .await;
265
266        let status = registration_result.expect("Registration should have succeed");
267        assert_eq!(status, SignatureRegistrationStatus::Registered);
268        assert_eq!(
269            buffered_signatures_after_registration,
270            Vec::<SingleSignature>::new()
271        );
272    }
273
274    mod when_registering_single_signature_if_decorated_certifier_as_no_opened_message {
275        use super::*;
276
277        #[tokio::test]
278        async fn buffer_signature_if_authenticated() {
279            let (registration_result, buffered_signatures_after_registration) =
280                run_register_signature_scenario(
281                    |mock_certifier| {
282                        mock_certifier.expect_register_single_signature().returning(|_, _| {
283                            Err(CertifierServiceError::NotFound(
284                                SignedEntityType::MithrilStakeDistribution(Epoch(5)),
285                            )
286                            .into())
287                        });
288                    },
289                    &SingleSignature {
290                        authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
291                        ..SingleSignature::fake("party_1", "a message")
292                    },
293                )
294                .await;
295
296            let status = registration_result.expect("Registration should have succeed");
297            assert_eq!(status, SignatureRegistrationStatus::Buffered);
298            assert_eq!(
299                buffered_signatures_after_registration,
300                vec![SingleSignature::fake("party_1", "a message")]
301            );
302        }
303
304        #[tokio::test]
305        async fn dont_buffer_signature_if_not_authenticated() {
306            let (registration_result, buffered_signatures_after_registration) =
307                run_register_signature_scenario(
308                    |mock_certifier| {
309                        mock_certifier.expect_register_single_signature().returning(|_, _| {
310                            Err(CertifierServiceError::NotFound(
311                                SignedEntityType::MithrilStakeDistribution(Epoch(5)),
312                            )
313                            .into())
314                        });
315                    },
316                    &SingleSignature {
317                        authentication_status: SingleSignatureAuthenticationStatus::Unauthenticated,
318                        ..SingleSignature::fake("party_1", "a message")
319                    },
320                )
321                .await;
322
323            registration_result.expect_err("Registration should have failed");
324            assert_eq!(
325                buffered_signatures_after_registration,
326                Vec::<SingleSignature>::new()
327            );
328        }
329    }
330
331    #[tokio::test]
332    async fn buffered_signatures_are_moved_to_newly_opened_message() {
333        let store = Arc::new(BufferedSingleSignatureRepository::new(Arc::new(
334            main_db_connection().unwrap(),
335        )));
336        for (signed_type, signature) in [
337            (
338                MithrilStakeDistribution,
339                SingleSignature::fake("party_1", "message 1"),
340            ),
341            (
342                MithrilStakeDistribution,
343                SingleSignature::fake("party_2", "message 2"),
344            ),
345            (
346                CardanoTransactions,
347                SingleSignature::fake("party_3", "message 3"),
348            ),
349        ] {
350            store.buffer_signature(signed_type, &signature).await.unwrap();
351        }
352
353        let certifier = BufferedCertifierService::new(
354            mock_certifier(|mock| {
355                mock.expect_create_open_message()
356                    .returning(|_, _| Ok(OpenMessage::dummy()));
357
358                // Those configuration Asserts that the buffered signatures are registered
359                mock.expect_register_single_signature()
360                    .with(
361                        eq(SignedEntityType::MithrilStakeDistribution(Epoch(5))),
362                        eq(SingleSignature::fake("party_1", "message 1")),
363                    )
364                    .once()
365                    .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
366                mock.expect_register_single_signature()
367                    .with(
368                        eq(SignedEntityType::MithrilStakeDistribution(Epoch(5))),
369                        eq(SingleSignature::fake("party_2", "message 2")),
370                    )
371                    .once()
372                    .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
373            }),
374            store.clone(),
375            TestLogger::stdout(),
376        );
377
378        certifier
379            .create_open_message(
380                &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
381                &ProtocolMessage::new(),
382            )
383            .await
384            .unwrap();
385
386        let remaining_sigs = store.get_buffered_signatures(MithrilStakeDistribution).await.unwrap();
387        assert!(remaining_sigs.is_empty());
388    }
389
    // Scenarios checking that `create_open_message` still succeeds when the transfer of the
    // buffered signatures to the new open message goes wrong.
    mod when_failing_to_transfer_buffered_signature_to_new_open_message {
        use mockall::predicate::always;

        use super::*;

        /// Create an open message (fixed signed entity type) through a buffered certifier built
        /// from the two given mock configurations, and panic if the creation returns an error.
        async fn run_scenario(
            certifier_mock_config: impl FnOnce(&mut MockCertifierService),
            store_mock_config: impl FnOnce(&mut MockBufferedSingleSignatureStore),
        ) {
            let store = mock_store(store_mock_config);
            let certifier = BufferedCertifierService::new(
                mock_certifier(certifier_mock_config),
                store,
                TestLogger::stdout(),
            );

            certifier
                .create_open_message(
                    &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
                    &ProtocolMessage::new(),
                )
                .await
                .expect("Transferring buffered signatures to new open message should not fail");
        }

        /// Three signatures are buffered and the second one is rejected as invalid: the two
        /// others must still be registered, and only those two are removed from the buffer.
        #[tokio::test]
        async fn skip_invalid_signatures() {
            run_scenario(
                |mock| {
                    mock.expect_create_open_message()
                        .returning(|_, _| Ok(OpenMessage::dummy()));

                    // One expectation per buffered signature, each must be matched exactly once.
                    mock.expect_register_single_signature()
                        .with(always(), eq(fake_data::single_signature(vec![1])))
                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
                        .once();
                    mock.expect_register_single_signature()
                        .with(always(), eq(fake_data::single_signature(vec![2])))
                        .returning(|_, _| {
                            Err(CertifierServiceError::InvalidSingleSignature(
                                OpenMessage::dummy().signed_entity_type,
                                anyhow!("Invalid signature"),
                            )
                            .into())
                        })
                        .once();
                    mock.expect_register_single_signature()
                        .with(always(), eq(fake_data::single_signature(vec![3])))
                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
                        .once();
                },
                |mock| {
                    mock.expect_get_buffered_signatures().returning(|_| {
                        Ok(vec![
                            fake_data::single_signature(vec![1]),
                            fake_data::single_signature(vec![2]),
                            fake_data::single_signature(vec![3]),
                        ])
                    });
                    mock.expect_remove_buffered_signatures()
                        // Only non-skipped signatures should be removed
                        .withf(|_, sig_to_remove| sig_to_remove.len() == 2)
                        .returning(|_, _| Ok(()));
                },
            )
            .await;
        }

        /// The store fails to return the buffered signatures: the open message creation must
        /// still succeed.
        #[tokio::test]
        async fn do_not_return_an_error_if_getting_buffer_signatures_fail() {
            run_scenario(
                |mock| {
                    mock.expect_create_open_message()
                        .returning(|_, _| Ok(OpenMessage::dummy()));
                    mock.expect_register_single_signature()
                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
                },
                |mock| {
                    mock.expect_get_buffered_signatures()
                        .returning(|_| Err(anyhow!("get_buffered_signatures error")));
                },
            )
            .await;
        }

        /// The decorated certifier fails to register the only buffered signature with an error
        /// that is not an invalid signature error: the open message creation must still succeed.
        #[tokio::test]
        async fn do_not_return_an_error_if_registering_signature_fail() {
            run_scenario(
                |mock| {
                    mock.expect_create_open_message()
                        .returning(|_, _| Ok(OpenMessage::dummy()));
                    mock.expect_register_single_signature()
                        .returning(|_, _| Err(anyhow!("register_single_signature error")));
                },
                |mock| {
                    mock.expect_get_buffered_signatures()
                        .returning(|_| Ok(vec![fake_data::single_signature(vec![1])]));
                },
            )
            .await;
        }

        /// The store fails to remove the registered signature from the buffer: the open message
        /// creation must still succeed.
        #[tokio::test]
        async fn do_not_return_an_error_if_removing_buffered_signatures_fail() {
            run_scenario(
                |mock| {
                    mock.expect_create_open_message()
                        .returning(|_, _| Ok(OpenMessage::dummy()));
                    mock.expect_register_single_signature()
                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
                },
                |mock| {
                    mock.expect_get_buffered_signatures()
                        .returning(|_| Ok(vec![fake_data::single_signature(vec![1])]));
                    mock.expect_remove_buffered_signatures()
                        .returning(|_, _| Err(anyhow!("remove_buffered_signatures error")));
                },
            )
            .await;
        }
    }
511}