1use async_trait::async_trait;
2use slog::{Logger, debug, trace, warn};
3use std::sync::Arc;
4
5use mithril_common::StdResult;
6use mithril_common::entities::{
7 Certificate, Epoch, ProtocolMessage, SignedEntityType, SignedEntityTypeDiscriminants,
8 SingleSignature,
9};
10use mithril_common::logging::LoggerExtensions;
11
12use crate::entities::OpenMessage;
13use crate::services::{
14 BufferedSingleSignatureStore, CertifierService, CertifierServiceError,
15 SignatureRegistrationStatus,
16};
17
/// A decorator of [CertifierService] that buffers single signatures which cannot be registered
/// yet because the decorated certifier reports that their open message does not exist.
///
/// When an open message is successfully created through this decorator, the signatures buffered
/// for the same signed entity type discriminant are replayed against the decorated certifier.
pub struct BufferedCertifierService {
    /// The decorated certifier, to which every call is forwarded.
    certifier_service: Arc<dyn CertifierService>,
    /// Store holding the signatures that are waiting for their open message.
    buffered_single_signature_store: Arc<dyn BufferedSingleSignatureStore>,
    /// Logger used to report buffering and replay events.
    logger: Logger,
}
28
29impl BufferedCertifierService {
30 pub fn new(
32 certifier_service: Arc<dyn CertifierService>,
33 buffered_single_signature_store: Arc<dyn BufferedSingleSignatureStore>,
34 logger: Logger,
35 ) -> Self {
36 Self {
37 certifier_service,
38 buffered_single_signature_store,
39 logger: logger.new_with_component_name::<Self>(),
40 }
41 }
42
43 async fn try_register_buffered_signatures_to_current_open_message(
44 &self,
45 signed_entity_type: &SignedEntityType,
46 ) -> StdResult<()> {
47 let discriminant: SignedEntityTypeDiscriminants = signed_entity_type.into();
48 let buffered_signatures = self
49 .buffered_single_signature_store
50 .get_buffered_signatures(discriminant)
51 .await?;
52 let mut signatures_to_remove = vec![];
53
54 for signature in buffered_signatures {
55 match self
56 .certifier_service
57 .register_single_signature(signed_entity_type, &signature)
58 .await
59 {
60 Ok(..) => {
61 signatures_to_remove.push(signature);
62 }
63 Err(error) => match error.downcast_ref::<CertifierServiceError>() {
64 Some(CertifierServiceError::InvalidSingleSignature(..)) => {
65 trace!(self.logger, "Skipping invalid signature for signed entity '{signed_entity_type:?}'";
66 "party_id" => &signature.party_id,
67 "error" => ?error,
68 );
69 }
70 _ => {
71 anyhow::bail!(error);
72 }
73 },
74 }
75 }
76
77 self.buffered_single_signature_store
78 .remove_buffered_signatures(discriminant, signatures_to_remove)
79 .await?;
80
81 Ok(())
82 }
83}
84
85#[async_trait]
86impl CertifierService for BufferedCertifierService {
87 async fn inform_epoch(&self, epoch: Epoch) -> StdResult<()> {
88 self.certifier_service.inform_epoch(epoch).await
89 }
90
91 async fn register_single_signature(
92 &self,
93 signed_entity_type: &SignedEntityType,
94 signature: &SingleSignature,
95 ) -> StdResult<SignatureRegistrationStatus> {
96 match self
97 .certifier_service
98 .register_single_signature(signed_entity_type, signature)
99 .await
100 {
101 Ok(res) => Ok(res),
102 Err(error) => match error.downcast_ref::<CertifierServiceError>() {
103 Some(CertifierServiceError::NotFound(..)) if signature.is_authenticated() => {
104 debug!(
105 self.logger, "No OpenMessage available for signed entity - Buffering single signature";
106 "signed_entity_type" => ?signed_entity_type,
107 "party_id" => &signature.party_id
108 );
109
110 self.buffered_single_signature_store
111 .buffer_signature(signed_entity_type.into(), signature)
112 .await?;
113
114 Ok(SignatureRegistrationStatus::Buffered)
115 }
116 _ => Err(error),
117 },
118 }
119 }
120
121 async fn create_open_message(
122 &self,
123 signed_entity_type: &SignedEntityType,
124 protocol_message: &ProtocolMessage,
125 ) -> StdResult<OpenMessage> {
126 let creation_result = self
130 .certifier_service
131 .create_open_message(signed_entity_type, protocol_message)
132 .await;
133
134 if creation_result.is_ok() {
135 if let Err(error) = self
136 .try_register_buffered_signatures_to_current_open_message(signed_entity_type)
137 .await
138 {
139 warn!(self.logger, "Failed to register buffered signatures to the new open message";
140 "signed_entity_type" => ?signed_entity_type,
141 "error" => ?error
142 );
143 }
144 }
145
146 creation_result
147 }
148
149 async fn get_open_message(
150 &self,
151 signed_entity_type: &SignedEntityType,
152 ) -> StdResult<Option<OpenMessage>> {
153 self.certifier_service.get_open_message(signed_entity_type).await
154 }
155
156 async fn mark_open_message_if_expired(
157 &self,
158 signed_entity_type: &SignedEntityType,
159 ) -> StdResult<Option<OpenMessage>> {
160 self.certifier_service
161 .mark_open_message_if_expired(signed_entity_type)
162 .await
163 }
164
165 async fn create_certificate(
166 &self,
167 signed_entity_type: &SignedEntityType,
168 ) -> StdResult<Option<Certificate>> {
169 self.certifier_service.create_certificate(signed_entity_type).await
170 }
171
172 async fn get_certificate_by_hash(&self, hash: &str) -> StdResult<Option<Certificate>> {
173 self.certifier_service.get_certificate_by_hash(hash).await
174 }
175
176 async fn get_latest_certificates(&self, last_n: usize) -> StdResult<Vec<Certificate>> {
177 self.certifier_service.get_latest_certificates(last_n).await
178 }
179
180 async fn verify_certificate_chain(&self, epoch: Epoch) -> StdResult<()> {
181 self.certifier_service.verify_certificate_chain(epoch).await
182 }
183}
184
185#[cfg(test)]
186mod tests {
187 use anyhow::anyhow;
188 use mockall::predicate::eq;
189
190 use mithril_common::entities::SignedEntityTypeDiscriminants::{
191 CardanoTransactions, MithrilStakeDistribution,
192 };
193 use mithril_common::entities::SingleSignatureAuthenticationStatus;
194 use mithril_common::test_utils::fake_data;
195
196 use crate::database::repository::BufferedSingleSignatureRepository;
197 use crate::database::test_helper::main_db_connection;
198 use crate::services::{
199 CertifierServiceError, MockBufferedSingleSignatureStore, MockCertifierService,
200 };
201 use crate::test_tools::TestLogger;
202
203 use super::*;
204
205 fn mock_certifier(
206 certifier_mock_config: impl FnOnce(&mut MockCertifierService),
207 ) -> Arc<MockCertifierService> {
208 let mut certifier = MockCertifierService::new();
209 certifier_mock_config(&mut certifier);
210 Arc::new(certifier)
211 }
212
213 fn mock_store<F>(store_mock_config: F) -> Arc<MockBufferedSingleSignatureStore>
214 where
215 F: FnOnce(&mut MockBufferedSingleSignatureStore),
216 {
217 let mut store = MockBufferedSingleSignatureStore::new();
218 store_mock_config(&mut store);
219 Arc::new(store)
220 }
221
222 async fn run_register_signature_scenario(
226 decorated_certifier_mock_config: impl FnOnce(&mut MockCertifierService),
227 signature_to_register: &SingleSignature,
228 ) -> (StdResult<SignatureRegistrationStatus>, Vec<SingleSignature>) {
229 let store = Arc::new(BufferedSingleSignatureRepository::new(Arc::new(
230 main_db_connection().unwrap(),
231 )));
232 let certifier = BufferedCertifierService::new(
233 mock_certifier(decorated_certifier_mock_config),
234 store.clone(),
235 TestLogger::stdout(),
236 );
237
238 let registration_result = certifier
239 .register_single_signature(
240 &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
241 signature_to_register,
242 )
243 .await;
244
245 let buffered_signatures =
246 store.get_buffered_signatures(MithrilStakeDistribution).await.unwrap();
247
248 (registration_result, buffered_signatures)
249 }
250
    // When the decorated certifier accepts the signature, its status is returned unchanged and
    // nothing is written to the buffer.
    #[tokio::test]
    async fn when_registering_single_signature_dont_buffer_signature_if_decorated_certifier_succeed()
    {
        let (registration_result, buffered_signatures_after_registration) =
            run_register_signature_scenario(
                |mock_certifier| {
                    // The decorated certifier accepts every signature.
                    mock_certifier
                        .expect_register_single_signature()
                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
                },
                &SingleSignature::fake("party_1", "a message"),
            )
            .await;

        let status = registration_result.expect("Registration should have succeed");
        assert_eq!(status, SignatureRegistrationStatus::Registered);
        // The buffer must stay empty.
        assert_eq!(
            buffered_signatures_after_registration,
            Vec::<SingleSignature>::new()
        );
    }
272
    // Scenarios where the decorated certifier fails with `CertifierServiceError::NotFound`:
    // only authenticated signatures are expected to end up in the buffer.
    mod when_registering_single_signature_if_decorated_certifier_as_no_opened_message {
        use super::*;

        // An authenticated signature is buffered and reported as `Buffered`.
        #[tokio::test]
        async fn buffer_signature_if_authenticated() {
            let (registration_result, buffered_signatures_after_registration) =
                run_register_signature_scenario(
                    |mock_certifier| {
                        // The decorated certifier reports that no open message exists.
                        mock_certifier.expect_register_single_signature().returning(|_, _| {
                            Err(CertifierServiceError::NotFound(
                                SignedEntityType::MithrilStakeDistribution(Epoch(5)),
                            )
                            .into())
                        });
                    },
                    &SingleSignature {
                        authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
                        ..SingleSignature::fake("party_1", "a message")
                    },
                )
                .await;

            let status = registration_result.expect("Registration should have succeed");
            assert_eq!(status, SignatureRegistrationStatus::Buffered);
            // NOTE(review): the expected signature is built without the `Authenticated`
            // override - presumably the comparison or the store does not retain the
            // authentication status; confirm against `SingleSignature` and the repository.
            assert_eq!(
                buffered_signatures_after_registration,
                vec![SingleSignature::fake("party_1", "a message")]
            );
        }

        // An unauthenticated signature is rejected with the original error and not buffered.
        #[tokio::test]
        async fn dont_buffer_signature_if_not_authenticated() {
            let (registration_result, buffered_signatures_after_registration) =
                run_register_signature_scenario(
                    |mock_certifier| {
                        // The decorated certifier reports that no open message exists.
                        mock_certifier.expect_register_single_signature().returning(|_, _| {
                            Err(CertifierServiceError::NotFound(
                                SignedEntityType::MithrilStakeDistribution(Epoch(5)),
                            )
                            .into())
                        });
                    },
                    &SingleSignature {
                        authentication_status: SingleSignatureAuthenticationStatus::Unauthenticated,
                        ..SingleSignature::fake("party_1", "a message")
                    },
                )
                .await;

            registration_result.expect_err("Registration should have failed");
            // The buffer must stay empty.
            assert_eq!(
                buffered_signatures_after_registration,
                Vec::<SingleSignature>::new()
            );
        }
    }
329
    // Creating an open message replays the signatures buffered for its discriminant against the
    // decorated certifier and removes them from the buffer.
    #[tokio::test]
    async fn buffered_signatures_are_moved_to_newly_opened_message() {
        let store = Arc::new(BufferedSingleSignatureRepository::new(Arc::new(
            main_db_connection().unwrap(),
        )));
        // Two signatures are buffered for `MithrilStakeDistribution` and one for
        // `CardanoTransactions`.
        for (signed_type, signature) in [
            (
                MithrilStakeDistribution,
                SingleSignature::fake("party_1", "message 1"),
            ),
            (
                MithrilStakeDistribution,
                SingleSignature::fake("party_2", "message 2"),
            ),
            (
                CardanoTransactions,
                SingleSignature::fake("party_3", "message 3"),
            ),
        ] {
            store.buffer_signature(signed_type, &signature).await.unwrap();
        }

        let certifier = BufferedCertifierService::new(
            mock_certifier(|mock| {
                mock.expect_create_open_message()
                    .returning(|_, _| Ok(OpenMessage::dummy()));

                // These expectations assert that each buffered `MithrilStakeDistribution`
                // signature is registered exactly once; no expectation is declared for the
                // `party_3` signature buffered under `CardanoTransactions`.
                mock.expect_register_single_signature()
                    .with(
                        eq(SignedEntityType::MithrilStakeDistribution(Epoch(5))),
                        eq(SingleSignature::fake("party_1", "message 1")),
                    )
                    .once()
                    .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
                mock.expect_register_single_signature()
                    .with(
                        eq(SignedEntityType::MithrilStakeDistribution(Epoch(5))),
                        eq(SingleSignature::fake("party_2", "message 2")),
                    )
                    .once()
                    .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
            }),
            store.clone(),
            TestLogger::stdout(),
        );

        certifier
            .create_open_message(
                &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
                &ProtocolMessage::new(),
            )
            .await
            .unwrap();

        // Every replayed signature must have been removed from the buffer.
        let remaining_sigs = store.get_buffered_signatures(MithrilStakeDistribution).await.unwrap();
        assert!(remaining_sigs.is_empty());
    }
388
    // Scenarios where replaying the buffered signatures goes wrong: creating the open message
    // must still succeed.
    mod when_failing_to_transfer_buffered_signature_to_new_open_message {
        use mockall::predicate::always;

        use super::*;

        // Create an open message for `MithrilStakeDistribution(Epoch(5))` through a
        // `BufferedCertifierService` built from the given mock configurations, and panic if the
        // creation returns an error.
        async fn run_scenario(
            certifier_mock_config: impl FnOnce(&mut MockCertifierService),
            store_mock_config: impl FnOnce(&mut MockBufferedSingleSignatureStore),
        ) {
            let store = mock_store(store_mock_config);
            let certifier = BufferedCertifierService::new(
                mock_certifier(certifier_mock_config),
                store,
                TestLogger::stdout(),
            );

            certifier
                .create_open_message(
                    &SignedEntityType::MithrilStakeDistribution(Epoch(5)),
                    &ProtocolMessage::new(),
                )
                .await
                .expect("Transferring buffered signatures to new open message should not fail");
        }

        // An invalid signature in the middle of the buffer does not stop the replay: the
        // signatures before and after it are still registered.
        #[tokio::test]
        async fn skip_invalid_signatures() {
            run_scenario(
                |mock| {
                    mock.expect_create_open_message()
                        .returning(|_, _| Ok(OpenMessage::dummy()));

                    // First signature: accepted.
                    mock.expect_register_single_signature()
                        .with(always(), eq(fake_data::single_signature(vec![1])))
                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
                        .once();
                    // Second signature: rejected as invalid.
                    mock.expect_register_single_signature()
                        .with(always(), eq(fake_data::single_signature(vec![2])))
                        .returning(|_, _| {
                            Err(CertifierServiceError::InvalidSingleSignature(
                                OpenMessage::dummy().signed_entity_type,
                                anyhow!("Invalid signature"),
                            )
                            .into())
                        })
                        .once();
                    // Third signature: accepted, which shows the replay continued.
                    mock.expect_register_single_signature()
                        .with(always(), eq(fake_data::single_signature(vec![3])))
                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
                        .once();
                },
                |mock| {
                    mock.expect_get_buffered_signatures().returning(|_| {
                        Ok(vec![
                            fake_data::single_signature(vec![1]),
                            fake_data::single_signature(vec![2]),
                            fake_data::single_signature(vec![3]),
                        ])
                    });
                    mock.expect_remove_buffered_signatures()
                        // Only the two accepted signatures must be removed from the buffer.
                        .withf(|_, sig_to_remove| sig_to_remove.len() == 2)
                        .returning(|_, _| Ok(()));
                },
            )
            .await;
        }

        // The store fails to return the buffered signatures.
        #[tokio::test]
        async fn do_not_return_an_error_if_getting_buffer_signatures_fail() {
            run_scenario(
                |mock| {
                    mock.expect_create_open_message()
                        .returning(|_, _| Ok(OpenMessage::dummy()));
                    mock.expect_register_single_signature()
                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
                },
                |mock| {
                    mock.expect_get_buffered_signatures()
                        .returning(|_| Err(anyhow!("get_buffered_signatures error")));
                },
            )
            .await;
        }

        // The decorated certifier fails with an error that is not an invalid signature.
        #[tokio::test]
        async fn do_not_return_an_error_if_registering_signature_fail() {
            run_scenario(
                |mock| {
                    mock.expect_create_open_message()
                        .returning(|_, _| Ok(OpenMessage::dummy()));
                    mock.expect_register_single_signature()
                        .returning(|_, _| Err(anyhow!("register_single_signature error")));
                },
                |mock| {
                    // No expectation is declared for `remove_buffered_signatures` here.
                    mock.expect_get_buffered_signatures()
                        .returning(|_| Ok(vec![fake_data::single_signature(vec![1])]));
                },
            )
            .await;
        }

        // The store fails to remove the signatures that were registered.
        #[tokio::test]
        async fn do_not_return_an_error_if_removing_buffered_signatures_fail() {
            run_scenario(
                |mock| {
                    mock.expect_create_open_message()
                        .returning(|_, _| Ok(OpenMessage::dummy()));
                    mock.expect_register_single_signature()
                        .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
                },
                |mock| {
                    mock.expect_get_buffered_signatures()
                        .returning(|_| Ok(vec![fake_data::single_signature(vec![1])]));
                    mock.expect_remove_buffered_signatures()
                        .returning(|_, _| Err(anyhow!("remove_buffered_signatures error")));
                },
            )
            .await;
        }
    }
510}