1use async_trait::async_trait;
2use slog::{debug, trace, warn, Logger};
3use std::sync::Arc;
4
5use mithril_common::entities::{
6 Certificate, Epoch, ProtocolMessage, SignedEntityType, SignedEntityTypeDiscriminants,
7 SingleSignature,
8};
9use mithril_common::logging::LoggerExtensions;
10use mithril_common::StdResult;
11
12use crate::entities::OpenMessage;
13use crate::services::{
14 BufferedSingleSignatureStore, CertifierService, CertifierServiceError,
15 SignatureRegistrationStatus,
16};
17
18pub struct BufferedCertifierService {
24 certifier_service: Arc<dyn CertifierService>,
25 buffered_single_signature_store: Arc<dyn BufferedSingleSignatureStore>,
26 logger: Logger,
27}
28
29impl BufferedCertifierService {
30 pub fn new(
32 certifier_service: Arc<dyn CertifierService>,
33 buffered_single_signature_store: Arc<dyn BufferedSingleSignatureStore>,
34 logger: Logger,
35 ) -> Self {
36 Self {
37 certifier_service,
38 buffered_single_signature_store,
39 logger: logger.new_with_component_name::<Self>(),
40 }
41 }
42
43 async fn try_register_buffered_signatures_to_current_open_message(
44 &self,
45 signed_entity_type: &SignedEntityType,
46 ) -> StdResult<()> {
47 let discriminant: SignedEntityTypeDiscriminants = signed_entity_type.into();
48 let buffered_signatures = self
49 .buffered_single_signature_store
50 .get_buffered_signatures(discriminant)
51 .await?;
52 let mut signatures_to_remove = vec![];
53
54 for signature in buffered_signatures {
55 match self
56 .certifier_service
57 .register_single_signature(signed_entity_type, &signature)
58 .await
59 {
60 Ok(..) => {
61 signatures_to_remove.push(signature);
62 }
63 Err(error) => match error.downcast_ref::<CertifierServiceError>() {
64 Some(CertifierServiceError::InvalidSingleSignature(..)) => {
65 trace!(self.logger, "Skipping invalid signature for signed entity '{signed_entity_type:?}'";
66 "party_id" => &signature.party_id,
67 "error" => ?error,
68 );
69 }
70 _ => {
71 anyhow::bail!(error);
72 }
73 },
74 }
75 }
76
77 self.buffered_single_signature_store
78 .remove_buffered_signatures(discriminant, signatures_to_remove)
79 .await?;
80
81 Ok(())
82 }
83}
84
85#[async_trait]
86impl CertifierService for BufferedCertifierService {
87 async fn inform_epoch(&self, epoch: Epoch) -> StdResult<()> {
88 self.certifier_service.inform_epoch(epoch).await
89 }
90
91 async fn register_single_signature(
92 &self,
93 signed_entity_type: &SignedEntityType,
94 signature: &SingleSignature,
95 ) -> StdResult<SignatureRegistrationStatus> {
96 match self
97 .certifier_service
98 .register_single_signature(signed_entity_type, signature)
99 .await
100 {
101 Ok(res) => Ok(res),
102 Err(error) => match error.downcast_ref::<CertifierServiceError>() {
103 Some(CertifierServiceError::NotFound(..)) if signature.is_authenticated() => {
104 debug!(
105 self.logger, "No OpenMessage available for signed entity - Buffering single signature";
106 "signed_entity_type" => ?signed_entity_type,
107 "party_id" => &signature.party_id
108 );
109
110 self.buffered_single_signature_store
111 .buffer_signature(signed_entity_type.into(), signature)
112 .await?;
113
114 Ok(SignatureRegistrationStatus::Buffered)
115 }
116 _ => Err(error),
117 },
118 }
119 }
120
121 async fn create_open_message(
122 &self,
123 signed_entity_type: &SignedEntityType,
124 protocol_message: &ProtocolMessage,
125 ) -> StdResult<OpenMessage> {
126 let creation_result = self
130 .certifier_service
131 .create_open_message(signed_entity_type, protocol_message)
132 .await;
133
134 if creation_result.is_ok() {
135 if let Err(error) = self
136 .try_register_buffered_signatures_to_current_open_message(signed_entity_type)
137 .await
138 {
139 warn!(self.logger, "Failed to register buffered signatures to the new open message";
140 "signed_entity_type" => ?signed_entity_type,
141 "error" => ?error
142 );
143 }
144 }
145
146 creation_result
147 }
148
149 async fn get_open_message(
150 &self,
151 signed_entity_type: &SignedEntityType,
152 ) -> StdResult<Option<OpenMessage>> {
153 self.certifier_service
154 .get_open_message(signed_entity_type)
155 .await
156 }
157
158 async fn mark_open_message_if_expired(
159 &self,
160 signed_entity_type: &SignedEntityType,
161 ) -> StdResult<Option<OpenMessage>> {
162 self.certifier_service
163 .mark_open_message_if_expired(signed_entity_type)
164 .await
165 }
166
167 async fn create_certificate(
168 &self,
169 signed_entity_type: &SignedEntityType,
170 ) -> StdResult<Option<Certificate>> {
171 self.certifier_service
172 .create_certificate(signed_entity_type)
173 .await
174 }
175
176 async fn get_certificate_by_hash(&self, hash: &str) -> StdResult<Option<Certificate>> {
177 self.certifier_service.get_certificate_by_hash(hash).await
178 }
179
180 async fn get_latest_certificates(&self, last_n: usize) -> StdResult<Vec<Certificate>> {
181 self.certifier_service.get_latest_certificates(last_n).await
182 }
183
184 async fn verify_certificate_chain(&self, epoch: Epoch) -> StdResult<()> {
185 self.certifier_service.verify_certificate_chain(epoch).await
186 }
187}
188
189#[cfg(test)]
190mod tests {
191 use anyhow::anyhow;
192 use mockall::predicate::eq;
193
194 use mithril_common::entities::SignedEntityTypeDiscriminants::{
195 CardanoTransactions, MithrilStakeDistribution,
196 };
197 use mithril_common::entities::SingleSignatureAuthenticationStatus;
198 use mithril_common::test_utils::fake_data;
199
200 use crate::database::repository::BufferedSingleSignatureRepository;
201 use crate::database::test_helper::main_db_connection;
202 use crate::services::{
203 CertifierServiceError, MockBufferedSingleSignatureStore, MockCertifierService,
204 };
205 use crate::test_tools::TestLogger;
206
207 use super::*;
208
209 fn mock_certifier(
210 certifier_mock_config: impl FnOnce(&mut MockCertifierService),
211 ) -> Arc<MockCertifierService> {
212 let mut certifier = MockCertifierService::new();
213 certifier_mock_config(&mut certifier);
214 Arc::new(certifier)
215 }
216
217 fn mock_store<F>(store_mock_config: F) -> Arc<MockBufferedSingleSignatureStore>
218 where
219 F: FnOnce(&mut MockBufferedSingleSignatureStore),
220 {
221 let mut store = MockBufferedSingleSignatureStore::new();
222 store_mock_config(&mut store);
223 Arc::new(store)
224 }
225
226 async fn run_register_signature_scenario(
230 decorated_certifier_mock_config: impl FnOnce(&mut MockCertifierService),
231 signature_to_register: &SingleSignature,
232 ) -> (StdResult<SignatureRegistrationStatus>, Vec<SingleSignature>) {
233 let store = Arc::new(BufferedSingleSignatureRepository::new(Arc::new(
234 main_db_connection().unwrap(),
235 )));
236 let certifier = BufferedCertifierService::new(
237 mock_certifier(decorated_certifier_mock_config),
238 store.clone(),
239 TestLogger::stdout(),
240 );
241
242 let registration_result = certifier
243 .register_single_signature(
244 &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
245 signature_to_register,
246 )
247 .await;
248
249 let buffered_signatures = store
250 .get_buffered_signatures(MithrilStakeDistribution)
251 .await
252 .unwrap();
253
254 (registration_result, buffered_signatures)
255 }
256
257 #[tokio::test]
258 async fn when_registering_single_signature_dont_buffer_signature_if_decorated_certifier_succeed(
259 ) {
260 let (registration_result, buffered_signatures_after_registration) =
261 run_register_signature_scenario(
262 |mock_certifier| {
263 mock_certifier
264 .expect_register_single_signature()
265 .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
266 },
267 &SingleSignature::fake("party_1", "a message"),
268 )
269 .await;
270
271 let status = registration_result.expect("Registration should have succeed");
272 assert_eq!(status, SignatureRegistrationStatus::Registered);
273 assert_eq!(
274 buffered_signatures_after_registration,
275 Vec::<SingleSignature>::new()
276 );
277 }
278
279 mod when_registering_single_signature_if_decorated_certifier_as_no_opened_message {
280 use super::*;
281
282 #[tokio::test]
283 async fn buffer_signature_if_authenticated() {
284 let (registration_result, buffered_signatures_after_registration) =
285 run_register_signature_scenario(
286 |mock_certifier| {
287 mock_certifier
288 .expect_register_single_signature()
289 .returning(|_, _| {
290 Err(CertifierServiceError::NotFound(
291 SignedEntityType::MithrilStakeDistribution(Epoch(5)),
292 )
293 .into())
294 });
295 },
296 &SingleSignature {
297 authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
298 ..SingleSignature::fake("party_1", "a message")
299 },
300 )
301 .await;
302
303 let status = registration_result.expect("Registration should have succeed");
304 assert_eq!(status, SignatureRegistrationStatus::Buffered);
305 assert_eq!(
306 buffered_signatures_after_registration,
307 vec![SingleSignature::fake("party_1", "a message")]
308 );
309 }
310
311 #[tokio::test]
312 async fn dont_buffer_signature_if_not_authenticated() {
313 let (registration_result, buffered_signatures_after_registration) =
314 run_register_signature_scenario(
315 |mock_certifier| {
316 mock_certifier
317 .expect_register_single_signature()
318 .returning(|_, _| {
319 Err(CertifierServiceError::NotFound(
320 SignedEntityType::MithrilStakeDistribution(Epoch(5)),
321 )
322 .into())
323 });
324 },
325 &SingleSignature {
326 authentication_status: SingleSignatureAuthenticationStatus::Unauthenticated,
327 ..SingleSignature::fake("party_1", "a message")
328 },
329 )
330 .await;
331
332 registration_result.expect_err("Registration should have failed");
333 assert_eq!(
334 buffered_signatures_after_registration,
335 Vec::<SingleSignature>::new()
336 );
337 }
338 }
339
340 #[tokio::test]
341 async fn buffered_signatures_are_moved_to_newly_opened_message() {
342 let store = Arc::new(BufferedSingleSignatureRepository::new(Arc::new(
343 main_db_connection().unwrap(),
344 )));
345 for (signed_type, signature) in [
346 (
347 MithrilStakeDistribution,
348 SingleSignature::fake("party_1", "message 1"),
349 ),
350 (
351 MithrilStakeDistribution,
352 SingleSignature::fake("party_2", "message 2"),
353 ),
354 (
355 CardanoTransactions,
356 SingleSignature::fake("party_3", "message 3"),
357 ),
358 ] {
359 store
360 .buffer_signature(signed_type, &signature)
361 .await
362 .unwrap();
363 }
364
365 let certifier = BufferedCertifierService::new(
366 mock_certifier(|mock| {
367 mock.expect_create_open_message()
368 .returning(|_, _| Ok(OpenMessage::dummy()));
369
370 mock.expect_register_single_signature()
372 .with(
373 eq(SignedEntityType::MithrilStakeDistribution(Epoch(5))),
374 eq(SingleSignature::fake("party_1", "message 1")),
375 )
376 .once()
377 .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
378 mock.expect_register_single_signature()
379 .with(
380 eq(SignedEntityType::MithrilStakeDistribution(Epoch(5))),
381 eq(SingleSignature::fake("party_2", "message 2")),
382 )
383 .once()
384 .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
385 }),
386 store.clone(),
387 TestLogger::stdout(),
388 );
389
390 certifier
391 .create_open_message(
392 &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
393 &ProtocolMessage::new(),
394 )
395 .await
396 .unwrap();
397
398 let remaining_sigs = store
399 .get_buffered_signatures(MithrilStakeDistribution)
400 .await
401 .unwrap();
402 assert!(remaining_sigs.is_empty());
403 }
404
405 mod when_failing_to_transfer_buffered_signature_to_new_open_message {
406 use mockall::predicate::always;
407
408 use super::*;
409
410 async fn run_scenario(
411 certifier_mock_config: impl FnOnce(&mut MockCertifierService),
412 store_mock_config: impl FnOnce(&mut MockBufferedSingleSignatureStore),
413 ) {
414 let store = mock_store(store_mock_config);
415 let certifier = BufferedCertifierService::new(
416 mock_certifier(certifier_mock_config),
417 store,
418 TestLogger::stdout(),
419 );
420
421 certifier
422 .create_open_message(
423 &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
424 &ProtocolMessage::new(),
425 )
426 .await
427 .expect("Transferring buffered signatures to new open message should not fail");
428 }
429
430 #[tokio::test]
431 async fn skip_invalid_signatures() {
432 run_scenario(
433 |mock| {
434 mock.expect_create_open_message()
435 .returning(|_, _| Ok(OpenMessage::dummy()));
436
437 mock.expect_register_single_signature()
438 .with(always(), eq(fake_data::single_signature(vec![1])))
439 .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
440 .once();
441 mock.expect_register_single_signature()
442 .with(always(), eq(fake_data::single_signature(vec![2])))
443 .returning(|_, _| {
444 Err(CertifierServiceError::InvalidSingleSignature(
445 OpenMessage::dummy().signed_entity_type,
446 anyhow!("Invalid signature"),
447 )
448 .into())
449 })
450 .once();
451 mock.expect_register_single_signature()
452 .with(always(), eq(fake_data::single_signature(vec![3])))
453 .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
454 .once();
455 },
456 |mock| {
457 mock.expect_get_buffered_signatures().returning(|_| {
458 Ok(vec![
459 fake_data::single_signature(vec![1]),
460 fake_data::single_signature(vec![2]),
461 fake_data::single_signature(vec![3]),
462 ])
463 });
464 mock.expect_remove_buffered_signatures()
465 .withf(|_, sig_to_remove| sig_to_remove.len() == 2)
467 .returning(|_, _| Ok(()));
468 },
469 )
470 .await;
471 }
472
473 #[tokio::test]
474 async fn do_not_return_an_error_if_getting_buffer_signatures_fail() {
475 run_scenario(
476 |mock| {
477 mock.expect_create_open_message()
478 .returning(|_, _| Ok(OpenMessage::dummy()));
479 mock.expect_register_single_signature()
480 .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
481 },
482 |mock| {
483 mock.expect_get_buffered_signatures()
484 .returning(|_| Err(anyhow!("get_buffered_signatures error")));
485 },
486 )
487 .await;
488 }
489
490 #[tokio::test]
491 async fn do_not_return_an_error_if_registering_signature_fail() {
492 run_scenario(
493 |mock| {
494 mock.expect_create_open_message()
495 .returning(|_, _| Ok(OpenMessage::dummy()));
496 mock.expect_register_single_signature()
497 .returning(|_, _| Err(anyhow!("register_single_signature error")));
498 },
499 |mock| {
500 mock.expect_get_buffered_signatures()
501 .returning(|_| Ok(vec![fake_data::single_signature(vec![1])]));
502 },
503 )
504 .await;
505 }
506
507 #[tokio::test]
508 async fn do_not_return_an_error_if_removing_buffered_signatures_fail() {
509 run_scenario(
510 |mock| {
511 mock.expect_create_open_message()
512 .returning(|_, _| Ok(OpenMessage::dummy()));
513 mock.expect_register_single_signature()
514 .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
515 },
516 |mock| {
517 mock.expect_get_buffered_signatures()
518 .returning(|_| Ok(vec![fake_data::single_signature(vec![1])]));
519 mock.expect_remove_buffered_signatures()
520 .returning(|_, _| Err(anyhow!("remove_buffered_signatures error")));
521 },
522 )
523 .await;
524 }
525 }
526}