1use async_trait::async_trait;
2use slog::{debug, trace, warn, Logger};
3use std::sync::Arc;
4
5use mithril_common::entities::{
6 Certificate, Epoch, ProtocolMessage, SignedEntityType, SignedEntityTypeDiscriminants,
7 SingleSignatures,
8};
9use mithril_common::logging::LoggerExtensions;
10use mithril_common::StdResult;
11
12use crate::entities::OpenMessage;
13use crate::services::{
14 BufferedSingleSignatureStore, CertifierService, CertifierServiceError,
15 SignatureRegistrationStatus,
16};
17
/// A decorator of [CertifierService] that can buffer single signatures.
///
/// When the decorated certifier reports [CertifierServiceError::NotFound] for an
/// authenticated single signature, the signature is stored in the buffered single
/// signature store instead of failing the registration. When an open message is
/// successfully created, the signatures buffered for its signed entity type
/// discriminant are registered through the decorated certifier.
pub struct BufferedCertifierService {
    /// The decorated certifier: every trait call ends up delegated to it.
    certifier_service: Arc<dyn CertifierService>,
    /// Store where single signatures are kept while they can't be registered.
    buffered_single_signature_store: Arc<dyn BufferedSingleSignatureStore>,
    /// Logger, specialised with this type as component name by the constructor.
    logger: Logger,
}
28
29impl BufferedCertifierService {
30 pub fn new(
32 certifier_service: Arc<dyn CertifierService>,
33 buffered_single_signature_store: Arc<dyn BufferedSingleSignatureStore>,
34 logger: Logger,
35 ) -> Self {
36 Self {
37 certifier_service,
38 buffered_single_signature_store,
39 logger: logger.new_with_component_name::<Self>(),
40 }
41 }
42
43 async fn try_register_buffered_signatures_to_current_open_message(
44 &self,
45 signed_entity_type: &SignedEntityType,
46 ) -> StdResult<()> {
47 let discriminant: SignedEntityTypeDiscriminants = signed_entity_type.into();
48 let buffered_signatures = self
49 .buffered_single_signature_store
50 .get_buffered_signatures(discriminant)
51 .await?;
52 let mut signatures_to_remove = vec![];
53
54 for signature in buffered_signatures {
55 match self
56 .certifier_service
57 .register_single_signature(signed_entity_type, &signature)
58 .await
59 {
60 Ok(..) => {
61 signatures_to_remove.push(signature);
62 }
63 Err(error) => match error.downcast_ref::<CertifierServiceError>() {
64 Some(CertifierServiceError::InvalidSingleSignature(..)) => {
65 trace!(self.logger, "Skipping invalid signature for signed entity '{signed_entity_type:?}'";
66 "party_id" => &signature.party_id,
67 "error" => ?error,
68 );
69 }
70 _ => {
71 anyhow::bail!(error);
72 }
73 },
74 }
75 }
76
77 self.buffered_single_signature_store
78 .remove_buffered_signatures(discriminant, signatures_to_remove)
79 .await?;
80
81 Ok(())
82 }
83}
84
85#[async_trait]
86impl CertifierService for BufferedCertifierService {
87 async fn inform_epoch(&self, epoch: Epoch) -> StdResult<()> {
88 self.certifier_service.inform_epoch(epoch).await
89 }
90
91 async fn register_single_signature(
92 &self,
93 signed_entity_type: &SignedEntityType,
94 signature: &SingleSignatures,
95 ) -> StdResult<SignatureRegistrationStatus> {
96 match self
97 .certifier_service
98 .register_single_signature(signed_entity_type, signature)
99 .await
100 {
101 Ok(res) => Ok(res),
102 Err(error) => match error.downcast_ref::<CertifierServiceError>() {
103 Some(CertifierServiceError::NotFound(..)) if signature.is_authenticated() => {
104 debug!(
105 self.logger, "No OpenMessage available for signed entity - Buffering single signature";
106 "signed_entity_type" => ?signed_entity_type,
107 "party_id" => &signature.party_id
108 );
109
110 self.buffered_single_signature_store
111 .buffer_signature(signed_entity_type.into(), signature)
112 .await?;
113
114 Ok(SignatureRegistrationStatus::Buffered)
115 }
116 _ => Err(error),
117 },
118 }
119 }
120
121 async fn create_open_message(
122 &self,
123 signed_entity_type: &SignedEntityType,
124 protocol_message: &ProtocolMessage,
125 ) -> StdResult<OpenMessage> {
126 let creation_result = self
130 .certifier_service
131 .create_open_message(signed_entity_type, protocol_message)
132 .await;
133
134 if creation_result.is_ok() {
135 if let Err(error) = self
136 .try_register_buffered_signatures_to_current_open_message(signed_entity_type)
137 .await
138 {
139 warn!(self.logger, "Failed to register buffered signatures to the new open message";
140 "signed_entity_type" => ?signed_entity_type,
141 "error" => ?error
142 );
143 }
144 }
145
146 creation_result
147 }
148
149 async fn get_open_message(
150 &self,
151 signed_entity_type: &SignedEntityType,
152 ) -> StdResult<Option<OpenMessage>> {
153 self.certifier_service
154 .get_open_message(signed_entity_type)
155 .await
156 }
157
158 async fn mark_open_message_if_expired(
159 &self,
160 signed_entity_type: &SignedEntityType,
161 ) -> StdResult<Option<OpenMessage>> {
162 self.certifier_service
163 .mark_open_message_if_expired(signed_entity_type)
164 .await
165 }
166
167 async fn create_certificate(
168 &self,
169 signed_entity_type: &SignedEntityType,
170 ) -> StdResult<Option<Certificate>> {
171 self.certifier_service
172 .create_certificate(signed_entity_type)
173 .await
174 }
175
176 async fn get_certificate_by_hash(&self, hash: &str) -> StdResult<Option<Certificate>> {
177 self.certifier_service.get_certificate_by_hash(hash).await
178 }
179
180 async fn get_latest_certificates(&self, last_n: usize) -> StdResult<Vec<Certificate>> {
181 self.certifier_service.get_latest_certificates(last_n).await
182 }
183
184 async fn verify_certificate_chain(&self, epoch: Epoch) -> StdResult<()> {
185 self.certifier_service.verify_certificate_chain(epoch).await
186 }
187}
188
189#[cfg(test)]
190mod tests {
191 use anyhow::anyhow;
192 use mockall::predicate::eq;
193
194 use mithril_common::entities::SignedEntityTypeDiscriminants::{
195 CardanoTransactions, MithrilStakeDistribution,
196 };
197 use mithril_common::entities::SingleSignatureAuthenticationStatus;
198 use mithril_common::test_utils::fake_data;
199
200 use crate::database::repository::BufferedSingleSignatureRepository;
201 use crate::database::test_helper::main_db_connection;
202 use crate::services::{
203 CertifierServiceError, MockBufferedSingleSignatureStore, MockCertifierService,
204 };
205 use crate::test_tools::TestLogger;
206
207 use super::*;
208
209 fn mock_certifier(
210 certifier_mock_config: impl FnOnce(&mut MockCertifierService),
211 ) -> Arc<MockCertifierService> {
212 let mut certifier = MockCertifierService::new();
213 certifier_mock_config(&mut certifier);
214 Arc::new(certifier)
215 }
216
217 fn mock_store<F>(store_mock_config: F) -> Arc<MockBufferedSingleSignatureStore>
218 where
219 F: FnOnce(&mut MockBufferedSingleSignatureStore),
220 {
221 let mut store = MockBufferedSingleSignatureStore::new();
222 store_mock_config(&mut store);
223 Arc::new(store)
224 }
225
226 async fn run_register_signature_scenario(
230 decorated_certifier_mock_config: impl FnOnce(&mut MockCertifierService),
231 signature_to_register: &SingleSignatures,
232 ) -> (
233 StdResult<SignatureRegistrationStatus>,
234 Vec<SingleSignatures>,
235 ) {
236 let store = Arc::new(BufferedSingleSignatureRepository::new(Arc::new(
237 main_db_connection().unwrap(),
238 )));
239 let certifier = BufferedCertifierService::new(
240 mock_certifier(decorated_certifier_mock_config),
241 store.clone(),
242 TestLogger::stdout(),
243 );
244
245 let registration_result = certifier
246 .register_single_signature(
247 &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
248 signature_to_register,
249 )
250 .await;
251
252 let buffered_signatures = store
253 .get_buffered_signatures(MithrilStakeDistribution)
254 .await
255 .unwrap();
256
257 (registration_result, buffered_signatures)
258 }
259
260 #[tokio::test]
261 async fn when_registering_single_signature_dont_buffer_signature_if_decorated_certifier_succeed(
262 ) {
263 let (registration_result, buffered_signatures_after_registration) =
264 run_register_signature_scenario(
265 |mock_certifier| {
266 mock_certifier
267 .expect_register_single_signature()
268 .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
269 },
270 &SingleSignatures::fake("party_1", "a message"),
271 )
272 .await;
273
274 let status = registration_result.expect("Registration should have succeed");
275 assert_eq!(status, SignatureRegistrationStatus::Registered);
276 assert_eq!(
277 buffered_signatures_after_registration,
278 Vec::<SingleSignatures>::new()
279 );
280 }
281
282 mod when_registering_single_signature_if_decorated_certifier_as_no_opened_message {
283 use super::*;
284
285 #[tokio::test]
286 async fn buffer_signature_if_authenticated() {
287 let (registration_result, buffered_signatures_after_registration) =
288 run_register_signature_scenario(
289 |mock_certifier| {
290 mock_certifier
291 .expect_register_single_signature()
292 .returning(|_, _| {
293 Err(CertifierServiceError::NotFound(
294 SignedEntityType::MithrilStakeDistribution(Epoch(5)),
295 )
296 .into())
297 });
298 },
299 &SingleSignatures {
300 authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
301 ..SingleSignatures::fake("party_1", "a message")
302 },
303 )
304 .await;
305
306 let status = registration_result.expect("Registration should have succeed");
307 assert_eq!(status, SignatureRegistrationStatus::Buffered);
308 assert_eq!(
309 buffered_signatures_after_registration,
310 vec![SingleSignatures::fake("party_1", "a message")]
311 );
312 }
313
314 #[tokio::test]
315 async fn dont_buffer_signature_if_not_authenticated() {
316 let (registration_result, buffered_signatures_after_registration) =
317 run_register_signature_scenario(
318 |mock_certifier| {
319 mock_certifier
320 .expect_register_single_signature()
321 .returning(|_, _| {
322 Err(CertifierServiceError::NotFound(
323 SignedEntityType::MithrilStakeDistribution(Epoch(5)),
324 )
325 .into())
326 });
327 },
328 &SingleSignatures {
329 authentication_status: SingleSignatureAuthenticationStatus::Unauthenticated,
330 ..SingleSignatures::fake("party_1", "a message")
331 },
332 )
333 .await;
334
335 registration_result.expect_err("Registration should have failed");
336 assert_eq!(
337 buffered_signatures_after_registration,
338 Vec::<SingleSignatures>::new()
339 );
340 }
341 }
342
343 #[tokio::test]
344 async fn buffered_signatures_are_moved_to_newly_opened_message() {
345 let store = Arc::new(BufferedSingleSignatureRepository::new(Arc::new(
346 main_db_connection().unwrap(),
347 )));
348 for (signed_type, signature) in [
349 (
350 MithrilStakeDistribution,
351 SingleSignatures::fake("party_1", "message 1"),
352 ),
353 (
354 MithrilStakeDistribution,
355 SingleSignatures::fake("party_2", "message 2"),
356 ),
357 (
358 CardanoTransactions,
359 SingleSignatures::fake("party_3", "message 3"),
360 ),
361 ] {
362 store
363 .buffer_signature(signed_type, &signature)
364 .await
365 .unwrap();
366 }
367
368 let certifier = BufferedCertifierService::new(
369 mock_certifier(|mock| {
370 mock.expect_create_open_message()
371 .returning(|_, _| Ok(OpenMessage::dummy()));
372
373 mock.expect_register_single_signature()
375 .with(
376 eq(SignedEntityType::MithrilStakeDistribution(Epoch(5))),
377 eq(SingleSignatures::fake("party_1", "message 1")),
378 )
379 .once()
380 .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
381 mock.expect_register_single_signature()
382 .with(
383 eq(SignedEntityType::MithrilStakeDistribution(Epoch(5))),
384 eq(SingleSignatures::fake("party_2", "message 2")),
385 )
386 .once()
387 .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
388 }),
389 store.clone(),
390 TestLogger::stdout(),
391 );
392
393 certifier
394 .create_open_message(
395 &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
396 &ProtocolMessage::new(),
397 )
398 .await
399 .unwrap();
400
401 let remaining_sigs = store
402 .get_buffered_signatures(MithrilStakeDistribution)
403 .await
404 .unwrap();
405 assert!(remaining_sigs.is_empty());
406 }
407
408 mod when_failing_to_transfer_buffered_signature_to_new_open_message {
409 use mockall::predicate::always;
410
411 use super::*;
412
413 async fn run_scenario(
414 certifier_mock_config: impl FnOnce(&mut MockCertifierService),
415 store_mock_config: impl FnOnce(&mut MockBufferedSingleSignatureStore),
416 ) {
417 let store = mock_store(store_mock_config);
418 let certifier = BufferedCertifierService::new(
419 mock_certifier(certifier_mock_config),
420 store,
421 TestLogger::stdout(),
422 );
423
424 certifier
425 .create_open_message(
426 &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
427 &ProtocolMessage::new(),
428 )
429 .await
430 .expect("Transferring buffered signatures to new open message should not fail");
431 }
432
433 #[tokio::test]
434 async fn skip_invalid_signatures() {
435 run_scenario(
436 |mock| {
437 mock.expect_create_open_message()
438 .returning(|_, _| Ok(OpenMessage::dummy()));
439
440 mock.expect_register_single_signature()
441 .with(always(), eq(fake_data::single_signatures(vec![1])))
442 .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
443 .once();
444 mock.expect_register_single_signature()
445 .with(always(), eq(fake_data::single_signatures(vec![2])))
446 .returning(|_, _| {
447 Err(CertifierServiceError::InvalidSingleSignature(
448 OpenMessage::dummy().signed_entity_type,
449 anyhow!("Invalid signature"),
450 )
451 .into())
452 })
453 .once();
454 mock.expect_register_single_signature()
455 .with(always(), eq(fake_data::single_signatures(vec![3])))
456 .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
457 .once();
458 },
459 |mock| {
460 mock.expect_get_buffered_signatures().returning(|_| {
461 Ok(vec![
462 fake_data::single_signatures(vec![1]),
463 fake_data::single_signatures(vec![2]),
464 fake_data::single_signatures(vec![3]),
465 ])
466 });
467 mock.expect_remove_buffered_signatures()
468 .withf(|_, sig_to_remove| sig_to_remove.len() == 2)
470 .returning(|_, _| Ok(()));
471 },
472 )
473 .await;
474 }
475
476 #[tokio::test]
477 async fn do_not_return_an_error_if_getting_buffer_signatures_fail() {
478 run_scenario(
479 |mock| {
480 mock.expect_create_open_message()
481 .returning(|_, _| Ok(OpenMessage::dummy()));
482 mock.expect_register_single_signature()
483 .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
484 },
485 |mock| {
486 mock.expect_get_buffered_signatures()
487 .returning(|_| Err(anyhow!("get_buffered_signatures error")));
488 },
489 )
490 .await;
491 }
492
493 #[tokio::test]
494 async fn do_not_return_an_error_if_registering_signature_fail() {
495 run_scenario(
496 |mock| {
497 mock.expect_create_open_message()
498 .returning(|_, _| Ok(OpenMessage::dummy()));
499 mock.expect_register_single_signature()
500 .returning(|_, _| Err(anyhow!("register_single_signature error")));
501 },
502 |mock| {
503 mock.expect_get_buffered_signatures()
504 .returning(|_| Ok(vec![fake_data::single_signatures(vec![1])]));
505 },
506 )
507 .await;
508 }
509
510 #[tokio::test]
511 async fn do_not_return_an_error_if_removing_buffered_signatures_fail() {
512 run_scenario(
513 |mock| {
514 mock.expect_create_open_message()
515 .returning(|_, _| Ok(OpenMessage::dummy()));
516 mock.expect_register_single_signature()
517 .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
518 },
519 |mock| {
520 mock.expect_get_buffered_signatures()
521 .returning(|_| Ok(vec![fake_data::single_signatures(vec![1])]));
522 mock.expect_remove_buffered_signatures()
523 .returning(|_, _| Err(anyhow!("remove_buffered_signatures error")));
524 },
525 )
526 .await;
527 }
528 }
529}