1use async_trait::async_trait;
2use slog::{Logger, debug, trace, warn};
3use std::sync::Arc;
4
5use mithril_common::StdResult;
6use mithril_common::entities::{
7 Certificate, Epoch, ProtocolMessage, SignedEntityType, SignedEntityTypeDiscriminants,
8 SingleSignature,
9};
10use mithril_common::logging::LoggerExtensions;
11
12use crate::entities::OpenMessage;
13use crate::services::{
14 BufferedSingleSignatureStore, CertifierService, CertifierServiceError,
15 SignatureRegistrationStatus,
16};
17
/// Decorator around a [`CertifierService`] that buffers single signatures which cannot be
/// registered yet.
///
/// When the decorated certifier reports that no open message is available for a signed entity
/// type, authenticated single signatures are kept in the [`BufferedSingleSignatureStore`]
/// instead of being rejected. Once an open message is successfully created, registration of the
/// signatures buffered for its discriminant is attempted against the decorated certifier.
pub struct BufferedCertifierService {
    /// Decorated certifier, to which every operation is delegated.
    certifier_service: Arc<dyn CertifierService>,
    /// Storage for the single signatures waiting for an open message.
    buffered_single_signature_store: Arc<dyn BufferedSingleSignatureStore>,
    /// Logger, specialized with this component's name by the constructor.
    logger: Logger,
}
28
29impl BufferedCertifierService {
30 pub fn new(
32 certifier_service: Arc<dyn CertifierService>,
33 buffered_single_signature_store: Arc<dyn BufferedSingleSignatureStore>,
34 logger: Logger,
35 ) -> Self {
36 Self {
37 certifier_service,
38 buffered_single_signature_store,
39 logger: logger.new_with_component_name::<Self>(),
40 }
41 }
42
43 async fn try_register_buffered_signatures_to_current_open_message(
44 &self,
45 signed_entity_type: &SignedEntityType,
46 ) -> StdResult<()> {
47 let discriminant: SignedEntityTypeDiscriminants = signed_entity_type.into();
48 let buffered_signatures = self
49 .buffered_single_signature_store
50 .get_buffered_signatures(discriminant)
51 .await?;
52 let mut signatures_to_remove = vec![];
53
54 for signature in buffered_signatures {
55 match self
56 .certifier_service
57 .register_single_signature(signed_entity_type, &signature)
58 .await
59 {
60 Ok(..) => {
61 signatures_to_remove.push(signature);
62 }
63 Err(error) => match error.downcast_ref::<CertifierServiceError>() {
64 Some(CertifierServiceError::InvalidSingleSignature(..)) => {
65 trace!(self.logger, "Skipping invalid signature for signed entity '{signed_entity_type:?}'";
66 "party_id" => &signature.party_id,
67 "error" => ?error,
68 );
69 }
70 _ => {
71 anyhow::bail!(error);
72 }
73 },
74 }
75 }
76
77 self.buffered_single_signature_store
78 .remove_buffered_signatures(discriminant, signatures_to_remove)
79 .await?;
80
81 Ok(())
82 }
83}
84
85#[async_trait]
86impl CertifierService for BufferedCertifierService {
87 async fn inform_epoch(&self, epoch: Epoch) -> StdResult<()> {
88 self.certifier_service.inform_epoch(epoch).await
89 }
90
91 async fn register_single_signature(
92 &self,
93 signed_entity_type: &SignedEntityType,
94 signature: &SingleSignature,
95 ) -> StdResult<SignatureRegistrationStatus> {
96 match self
97 .certifier_service
98 .register_single_signature(signed_entity_type, signature)
99 .await
100 {
101 Ok(res) => Ok(res),
102 Err(error) => match error.downcast_ref::<CertifierServiceError>() {
103 Some(CertifierServiceError::NotFound(..)) if signature.is_authenticated() => {
104 debug!(
105 self.logger, "No OpenMessage available for signed entity - Buffering single signature";
106 "signed_entity_type" => ?signed_entity_type,
107 "party_id" => &signature.party_id
108 );
109
110 self.buffered_single_signature_store
111 .buffer_signature(signed_entity_type.into(), signature)
112 .await?;
113
114 Ok(SignatureRegistrationStatus::Buffered)
115 }
116 _ => Err(error),
117 },
118 }
119 }
120
121 async fn create_open_message(
122 &self,
123 signed_entity_type: &SignedEntityType,
124 protocol_message: &ProtocolMessage,
125 ) -> StdResult<OpenMessage> {
126 let creation_result = self
130 .certifier_service
131 .create_open_message(signed_entity_type, protocol_message)
132 .await;
133
134 if creation_result.is_ok() {
135 if let Err(error) = self
136 .try_register_buffered_signatures_to_current_open_message(signed_entity_type)
137 .await
138 {
139 warn!(self.logger, "Failed to register buffered signatures to the new open message";
140 "signed_entity_type" => ?signed_entity_type,
141 "error" => ?error
142 );
143 }
144 }
145
146 creation_result
147 }
148
149 async fn get_open_message(
150 &self,
151 signed_entity_type: &SignedEntityType,
152 ) -> StdResult<Option<OpenMessage>> {
153 self.certifier_service.get_open_message(signed_entity_type).await
154 }
155
156 async fn mark_open_message_if_expired(
157 &self,
158 signed_entity_type: &SignedEntityType,
159 ) -> StdResult<Option<OpenMessage>> {
160 self.certifier_service
161 .mark_open_message_if_expired(signed_entity_type)
162 .await
163 }
164
165 async fn create_certificate(
166 &self,
167 signed_entity_type: &SignedEntityType,
168 ) -> StdResult<Option<Certificate>> {
169 self.certifier_service.create_certificate(signed_entity_type).await
170 }
171
172 async fn get_certificate_by_hash(&self, hash: &str) -> StdResult<Option<Certificate>> {
173 self.certifier_service.get_certificate_by_hash(hash).await
174 }
175
176 async fn get_latest_certificates(&self, last_n: usize) -> StdResult<Vec<Certificate>> {
177 self.certifier_service.get_latest_certificates(last_n).await
178 }
179
180 async fn verify_certificate_chain(&self, epoch: Epoch) -> StdResult<()> {
181 self.certifier_service.verify_certificate_chain(epoch).await
182 }
183}
184
#[cfg(test)]
mod tests {
    use anyhow::anyhow;
    use mockall::predicate::eq;

    use mithril_common::entities::SignedEntityTypeDiscriminants::{
        CardanoTransactions, MithrilStakeDistribution,
    };
    use mithril_common::entities::SingleSignatureAuthenticationStatus;
    use mithril_common::test::double::{Dummy, fake_data};
    use mithril_common::test::entities_extensions::SingleSignatureTestExtension;

    use crate::database::repository::BufferedSingleSignatureRepository;
    use crate::database::test_helper::main_db_connection;
    use crate::services::{
        CertifierServiceError, MockBufferedSingleSignatureStore, MockCertifierService,
    };
    use crate::test::TestLogger;

    use super::*;

    /// Signed entity type targeted by the scenarios of this module.
    fn stake_distribution_at_epoch_5() -> SignedEntityType {
        SignedEntityType::MithrilStakeDistribution(Epoch(5))
    }

    /// Builds a mocked decorated certifier whose expectations are set by the given closure.
    fn mock_certifier<F>(certifier_mock_config: F) -> Arc<MockCertifierService>
    where
        F: FnOnce(&mut MockCertifierService),
    {
        let mut mock = MockCertifierService::new();
        certifier_mock_config(&mut mock);
        Arc::new(mock)
    }

    /// Builds a mocked signature buffer whose expectations are set by the given closure.
    fn mock_store(
        store_mock_config: impl FnOnce(&mut MockBufferedSingleSignatureStore),
    ) -> Arc<MockBufferedSingleSignatureStore> {
        let mut mock = MockBufferedSingleSignatureStore::new();
        store_mock_config(&mut mock);
        Arc::new(mock)
    }

    /// Builds a signature buffer backed by the repository over a test database connection.
    fn repository_backed_store() -> Arc<BufferedSingleSignatureRepository> {
        let connection = main_db_connection().unwrap();
        Arc::new(BufferedSingleSignatureRepository::new(Arc::new(connection)))
    }

    /// Runs a scenario where we try to register a signature (using a fixed signed entity type).
    ///
    /// Returns the registration result and the list of buffered signatures after the
    /// registration.
    async fn run_register_signature_scenario(
        decorated_certifier_mock_config: impl FnOnce(&mut MockCertifierService),
        signature_to_register: &SingleSignature,
    ) -> (StdResult<SignatureRegistrationStatus>, Vec<SingleSignature>) {
        let store = repository_backed_store();
        let decorated_certifier = mock_certifier(decorated_certifier_mock_config);
        let certifier = BufferedCertifierService::new(
            decorated_certifier,
            store.clone(),
            TestLogger::stdout(),
        );

        let registration_result = certifier
            .register_single_signature(&stake_distribution_at_epoch_5(), signature_to_register)
            .await;
        let buffered_signatures = store
            .get_buffered_signatures(MithrilStakeDistribution)
            .await
            .unwrap();

        (registration_result, buffered_signatures)
    }

    #[tokio::test]
    async fn when_registering_single_signature_dont_buffer_signature_if_decorated_certifier_succeed()
    {
        let signature = SingleSignature::fake("party_1", "a message");

        let (registration_result, buffered_signatures_after_registration) =
            run_register_signature_scenario(
                |mock_certifier| {
                    mock_certifier
                        .expect_register_single_signature()
                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
                },
                &signature,
            )
            .await;

        let status = registration_result.expect("Registration should have succeed");
        assert_eq!(status, SignatureRegistrationStatus::Registered);
        assert_eq!(
            buffered_signatures_after_registration,
            Vec::<SingleSignature>::new()
        );
    }

    mod when_registering_single_signature_if_decorated_certifier_as_no_opened_message {
        use super::*;

        /// Makes the decorated certifier reject every registration with a `NotFound` error.
        fn reject_with_not_found(mock_certifier: &mut MockCertifierService) {
            mock_certifier
                .expect_register_single_signature()
                .returning(|_, _| {
                    Err(CertifierServiceError::NotFound(stake_distribution_at_epoch_5()).into())
                });
        }

        /// Builds the signature used by the scenarios with the given authentication status.
        fn signature_with_status(
            authentication_status: SingleSignatureAuthenticationStatus,
        ) -> SingleSignature {
            SingleSignature {
                authentication_status,
                ..SingleSignature::fake("party_1", "a message")
            }
        }

        #[tokio::test]
        async fn buffer_signature_if_authenticated() {
            let signature =
                signature_with_status(SingleSignatureAuthenticationStatus::Authenticated);

            let (registration_result, buffered_signatures_after_registration) =
                run_register_signature_scenario(reject_with_not_found, &signature).await;

            let status = registration_result.expect("Registration should have succeed");
            assert_eq!(status, SignatureRegistrationStatus::Buffered);
            assert_eq!(
                buffered_signatures_after_registration,
                vec![SingleSignature::fake("party_1", "a message")]
            );
        }

        #[tokio::test]
        async fn dont_buffer_signature_if_not_authenticated() {
            let signature =
                signature_with_status(SingleSignatureAuthenticationStatus::Unauthenticated);

            let (registration_result, buffered_signatures_after_registration) =
                run_register_signature_scenario(reject_with_not_found, &signature).await;

            registration_result.expect_err("Registration should have failed");
            assert_eq!(
                buffered_signatures_after_registration,
                Vec::<SingleSignature>::new()
            );
        }
    }

    #[tokio::test]
    async fn buffered_signatures_are_moved_to_newly_opened_message() {
        let store = repository_backed_store();
        let signatures_to_buffer = [
            (
                MithrilStakeDistribution,
                SingleSignature::fake("party_1", "message 1"),
            ),
            (
                MithrilStakeDistribution,
                SingleSignature::fake("party_2", "message 2"),
            ),
            (
                CardanoTransactions,
                SingleSignature::fake("party_3", "message 3"),
            ),
        ];
        for (discriminant, buffered_signature) in signatures_to_buffer {
            store
                .buffer_signature(discriminant, &buffered_signature)
                .await
                .unwrap();
        }

        let decorated_certifier = mock_certifier(|mock| {
            mock.expect_create_open_message()
                .returning(|_, _| Ok(OpenMessage::dummy()));

            // Only the two signatures buffered for the stake distribution are expected.
            for (party_id, message) in [("party_1", "message 1"), ("party_2", "message 2")] {
                mock.expect_register_single_signature()
                    .with(
                        eq(stake_distribution_at_epoch_5()),
                        eq(SingleSignature::fake(party_id, message)),
                    )
                    .once()
                    .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
            }
        });
        let certifier = BufferedCertifierService::new(
            decorated_certifier,
            store.clone(),
            TestLogger::stdout(),
        );

        certifier
            .create_open_message(&stake_distribution_at_epoch_5(), &ProtocolMessage::new())
            .await
            .unwrap();

        let remaining_sigs = store
            .get_buffered_signatures(MithrilStakeDistribution)
            .await
            .unwrap();
        assert!(remaining_sigs.is_empty());
    }

    mod when_failing_to_transfer_buffered_signature_to_new_open_message {
        use mockall::predicate::always;

        use super::*;

        /// Makes the decorated certifier accept every open message creation.
        fn accept_open_message_creation(mock: &mut MockCertifierService) {
            mock.expect_create_open_message()
                .returning(|_, _| Ok(OpenMessage::dummy()));
        }

        /// Creates an open message with the given mocks and asserts that the creation succeeds,
        /// whatever happens while transferring the buffered signatures.
        async fn run_scenario(
            certifier_mock_config: impl FnOnce(&mut MockCertifierService),
            store_mock_config: impl FnOnce(&mut MockBufferedSingleSignatureStore),
        ) {
            let decorated_certifier = mock_certifier(certifier_mock_config);
            let store = mock_store(store_mock_config);
            let certifier =
                BufferedCertifierService::new(decorated_certifier, store, TestLogger::stdout());

            certifier
                .create_open_message(&stake_distribution_at_epoch_5(), &ProtocolMessage::new())
                .await
                .expect("Transferring buffered signatures to new open message should not fail");
        }

        #[tokio::test]
        async fn skip_invalid_signatures() {
            run_scenario(
                |mock| {
                    accept_open_message_creation(mock);

                    mock.expect_register_single_signature()
                        .with(always(), eq(fake_data::single_signature(vec![1])))
                        .once()
                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
                    mock.expect_register_single_signature()
                        .with(always(), eq(fake_data::single_signature(vec![2])))
                        .once()
                        .returning(|_, _| {
                            Err(CertifierServiceError::InvalidSingleSignature(
                                OpenMessage::dummy().signed_entity_type,
                                anyhow!("Invalid signature"),
                            )
                            .into())
                        });
                    mock.expect_register_single_signature()
                        .with(always(), eq(fake_data::single_signature(vec![3])))
                        .once()
                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
                },
                |mock| {
                    mock.expect_get_buffered_signatures().returning(|_| {
                        Ok(vec![
                            fake_data::single_signature(vec![1]),
                            fake_data::single_signature(vec![2]),
                            fake_data::single_signature(vec![3]),
                        ])
                    });
                    // Only the two successfully registered signatures should be removed.
                    mock.expect_remove_buffered_signatures()
                        .withf(|_, sig_to_remove| sig_to_remove.len() == 2)
                        .returning(|_, _| Ok(()));
                },
            )
            .await;
        }

        #[tokio::test]
        async fn do_not_return_an_error_if_getting_buffer_signatures_fail() {
            run_scenario(
                |mock| {
                    accept_open_message_creation(mock);
                    mock.expect_register_single_signature()
                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
                },
                |mock| {
                    mock.expect_get_buffered_signatures()
                        .returning(|_| Err(anyhow!("get_buffered_signatures error")));
                },
            )
            .await;
        }

        #[tokio::test]
        async fn do_not_return_an_error_if_registering_signature_fail() {
            run_scenario(
                |mock| {
                    accept_open_message_creation(mock);
                    mock.expect_register_single_signature()
                        .returning(|_, _| Err(anyhow!("register_single_signature error")));
                },
                |mock| {
                    mock.expect_get_buffered_signatures()
                        .returning(|_| Ok(vec![fake_data::single_signature(vec![1])]));
                },
            )
            .await;
        }

        #[tokio::test]
        async fn do_not_return_an_error_if_removing_buffered_signatures_fail() {
            run_scenario(
                |mock| {
                    accept_open_message_creation(mock);
                    mock.expect_register_single_signature()
                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
                },
                |mock| {
                    mock.expect_get_buffered_signatures()
                        .returning(|_| Ok(vec![fake_data::single_signature(vec![1])]));
                    mock.expect_remove_buffered_signatures()
                        .returning(|_, _| Err(anyhow!("remove_buffered_signatures error")));
                },
            )
            .await;
        }
    }
}