1use std::{sync::Arc, time::Duration};
2
3use anyhow::anyhow;
4use slog::{Logger, debug, error, trace, warn};
5
6use mithril_common::{
7 StdResult,
8 entities::{SingleSignature, SingleSignatureAuthenticationStatus},
9 logging::LoggerExtensions,
10};
11use tokio::{select, sync::watch::Receiver};
12
13use crate::{MetricsService, services::CertifierServiceError};
14
15use super::{CertifierService, SignatureConsumer};
16
17#[cfg_attr(test, mockall::automock)]
19#[async_trait::async_trait]
20pub trait SignatureProcessor: Sync + Send {
21 async fn process_signatures(&self) -> StdResult<()>;
23
24 async fn run(&self) -> StdResult<()>;
26}
27
28pub struct SequentialSignatureProcessor {
30 consumer: Arc<dyn SignatureConsumer>,
31 certifier: Arc<dyn CertifierService>,
32 stop_rx: Receiver<()>,
33 metrics_service: Arc<MetricsService>,
34 wait_delay_on_error: Duration,
35 logger: Logger,
36}
37
38impl SequentialSignatureProcessor {
39 pub fn new(
41 consumer: Arc<dyn SignatureConsumer>,
42 certifier: Arc<dyn CertifierService>,
43 stop_rx: Receiver<()>,
44 metrics_service: Arc<MetricsService>,
45 wait_delay_on_error_in_seconds: Duration,
46 logger: Logger,
47 ) -> Self {
48 Self {
49 consumer,
50 certifier,
51 stop_rx,
52 metrics_service,
53 wait_delay_on_error: wait_delay_on_error_in_seconds,
54 logger: logger.new_with_component_name::<Self>(),
55 }
56 }
57
58 fn authenticate_signature(&self, signature: &mut SingleSignature) {
62 signature.authentication_status = SingleSignatureAuthenticationStatus::Authenticated;
63 }
64}
65
66#[async_trait::async_trait]
67impl SignatureProcessor for SequentialSignatureProcessor {
68 async fn process_signatures(&self) -> StdResult<()> {
69 let origin_network = self.consumer.get_origin_tag();
70
71 let mut total_import_errors = 0;
72 match self.consumer.get_signatures().await {
73 Ok(signatures) => {
74 let number_of_signatures = signatures.len() as u32;
75 trace!(self.logger, "Received {} signatures", number_of_signatures);
76
77 self.metrics_service
78 .get_signature_registration_total_received_since_startup()
79 .increment_by(&[&origin_network], number_of_signatures);
80
81 for (mut signature, signed_entity_type) in signatures {
82 self.authenticate_signature(&mut signature);
83 match self
84 .certifier
85 .register_single_signature(&signed_entity_type, &signature)
86 .await
87 {
88 Ok(_registration_status) => {
89 self.metrics_service
90 .get_signature_registration_total_successful_since_startup()
91 .increment(&[&origin_network]);
92 }
93 Err(err) => match err.downcast_ref::<CertifierServiceError>() {
94 Some(CertifierServiceError::AlreadyCertified(
95 error_signed_entity_type,
96 )) => {
97 debug!(self.logger, "process_signatures::open_message_already_certified"; "signed_entity_type" => ?error_signed_entity_type, "party_id" => &signature.party_id);
98 }
99 Some(CertifierServiceError::Expired(error_signed_entity_type)) => {
100 debug!(self.logger, "process_signatures::open_message_expired"; "signed_entity_type" => ?error_signed_entity_type, "party_id" => &signature.party_id);
101 }
102 Some(CertifierServiceError::NotFound(error_signed_entity_type)) => {
103 debug!(self.logger, "process_signatures::not_found"; "signed_entity_type" => ?error_signed_entity_type, "party_id" => &signature.party_id);
104 }
105 Some(_) | None => {
106 total_import_errors += 1;
107 error!(
108 self.logger, "Error dispatching single signature";
109 "full_payload" => #?signature, "error" => ?err
110 );
111 }
112 },
113 }
114 }
115 }
116 Err(e) => {
117 error!(self.logger, "Error consuming single signatures"; "error" => ?e);
118 total_import_errors += 1;
119 }
120 }
121
122 if total_import_errors > 0 {
123 error!(
124 self.logger,
125 "Total import errors while processing signatures: {}", total_import_errors
126 );
127 return Err(anyhow!(
128 "Total import errors while processing signatures: {}",
129 total_import_errors
130 ));
131 }
132
133 Ok(())
134 }
135
136 async fn run(&self) -> StdResult<()> {
137 loop {
138 let mut stop_rx = self.stop_rx.clone();
139 select! {
140 _ = stop_rx.changed() => {
141 warn!(self.logger, "Stopping signature processor...");
142
143 return Ok(());
144 }
145 res = self.process_signatures() => {
146 if let Err(e) = res {
147 error!(self.logger, "Error processing signatures"; "error" => ?e);
148 error!(self.logger, "Sleep for {} seconds", self.wait_delay_on_error.as_secs());
149 tokio::time::sleep(self.wait_delay_on_error).await;
150 }
151 }
152 }
153 }
154 }
155}
156
157#[cfg(test)]
158mod tests {
159 use anyhow::anyhow;
160 use mockall::predicate::eq;
161 use tokio::{
162 sync::watch::channel,
163 time::{Duration, sleep},
164 };
165
166 use mithril_common::{
167 entities::{Epoch, SignedEntityType},
168 test::{double::fake_data, mock_extensions::MockBuilder},
169 };
170
171 use crate::services::{
172 FakeSignatureConsumer, MockCertifierService, MockSignatureConsumer,
173 SignatureRegistrationStatus,
174 };
175 use crate::test::TestLogger;
176
177 use super::*;
178
179 #[tokio::test]
180 async fn processor_process_signatures_succeeds_if_registration_succeeds() {
181 let logger = TestLogger::stdout();
182 let single_signatures = vec![
183 (
184 fake_data::single_signature(vec![1, 2, 3]),
185 SignedEntityType::MithrilStakeDistribution(Epoch(1)),
186 ),
187 (
188 fake_data::single_signature(vec![4, 5, 6]),
189 SignedEntityType::MithrilStakeDistribution(Epoch(2)),
190 ),
191 ];
192 let mock_consumer = MockBuilder::<MockSignatureConsumer>::configure(|mock| {
193 mock.expect_get_signatures()
194 .returning(move || Ok(single_signatures.clone()))
195 .times(1);
196 mock.expect_get_origin_tag()
197 .returning(|| "whatever".to_string())
198 .times(1);
199 });
200 let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
201 mock.expect_register_single_signature()
202 .with(
203 eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
204 eq(SingleSignature {
205 authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
206 ..fake_data::single_signature(vec![1, 2, 3])
207 }),
208 )
209 .returning(|_, single_signature| {
210 assert_eq!(
211 single_signature.authentication_status,
212 SingleSignatureAuthenticationStatus::Authenticated
213 );
214 Ok(SignatureRegistrationStatus::Registered)
215 })
216 .times(1);
217 mock.expect_register_single_signature()
218 .with(
219 eq(SignedEntityType::MithrilStakeDistribution(Epoch(2))),
220 eq(SingleSignature {
221 authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
222 ..fake_data::single_signature(vec![4, 5, 6])
223 }),
224 )
225 .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
226 .times(1);
227 });
228 let (_stop_tx, stop_rx) = channel(());
229 let processor = SequentialSignatureProcessor::new(
230 mock_consumer,
231 mock_certifier,
232 stop_rx,
233 Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
234 Duration::from_millis(10),
235 logger,
236 );
237
238 processor
239 .process_signatures()
240 .await
241 .expect("Process signatures should succeed");
242 }
243
244 #[tokio::test]
245 async fn processor_process_signatures_succeeds_if_registration_fails_case_already_certified() {
246 let logger = TestLogger::stdout();
247 let single_signatures = vec![(
248 fake_data::single_signature(vec![4, 5, 6]),
249 SignedEntityType::MithrilStakeDistribution(Epoch(2)),
250 )];
251 let mock_consumer = MockBuilder::<MockSignatureConsumer>::configure(|mock| {
252 mock.expect_get_signatures()
253 .returning(move || Ok(single_signatures.clone()))
254 .times(1);
255 mock.expect_get_origin_tag()
256 .returning(|| "whatever".to_string())
257 .times(1);
258 });
259 let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
260 mock.expect_register_single_signature()
261 .returning(|_, _| {
262 Err(anyhow!(CertifierServiceError::AlreadyCertified(
263 SignedEntityType::MithrilStakeDistribution(Epoch(2)),
264 )))
265 })
266 .times(1);
267 });
268 let (_stop_tx, stop_rx) = channel(());
269 let processor = SequentialSignatureProcessor::new(
270 mock_consumer,
271 mock_certifier,
272 stop_rx,
273 Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
274 Duration::from_millis(10),
275 logger,
276 );
277
278 processor
279 .process_signatures()
280 .await
281 .expect("Process signatures should succeed");
282 }
283
284 #[tokio::test]
285 async fn processor_process_signatures_succeeds_if_registration_fails_case_expired() {
286 let logger = TestLogger::stdout();
287 let single_signatures = vec![(
288 fake_data::single_signature(vec![4, 5, 6]),
289 SignedEntityType::MithrilStakeDistribution(Epoch(2)),
290 )];
291 let mock_consumer = MockBuilder::<MockSignatureConsumer>::configure(|mock| {
292 mock.expect_get_signatures()
293 .returning(move || Ok(single_signatures.clone()))
294 .times(1);
295 mock.expect_get_origin_tag()
296 .returning(|| "whatever".to_string())
297 .times(1);
298 });
299 let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
300 mock.expect_register_single_signature()
301 .returning(|_, _| {
302 Err(anyhow!(CertifierServiceError::Expired(
303 SignedEntityType::MithrilStakeDistribution(Epoch(2)),
304 )))
305 })
306 .times(1);
307 });
308 let (_stop_tx, stop_rx) = channel(());
309 let processor = SequentialSignatureProcessor::new(
310 mock_consumer,
311 mock_certifier,
312 stop_rx,
313 Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
314 Duration::from_millis(10),
315 logger,
316 );
317
318 processor
319 .process_signatures()
320 .await
321 .expect("Process signatures should succeed");
322 }
323
324 #[tokio::test]
325 async fn processor_process_signatures_succeeds_if_registration_fails_case_not_found() {
326 let logger = TestLogger::stdout();
327 let single_signatures = vec![(
328 fake_data::single_signature(vec![4, 5, 6]),
329 SignedEntityType::MithrilStakeDistribution(Epoch(2)),
330 )];
331 let mock_consumer = MockBuilder::<MockSignatureConsumer>::configure(|mock| {
332 mock.expect_get_signatures()
333 .returning(move || Ok(single_signatures.clone()))
334 .times(1);
335 mock.expect_get_origin_tag()
336 .returning(|| "whatever".to_string())
337 .times(1);
338 });
339 let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
340 mock.expect_register_single_signature()
341 .returning(|_, _| {
342 Err(anyhow!(CertifierServiceError::NotFound(
343 SignedEntityType::MithrilStakeDistribution(Epoch(2)),
344 )))
345 })
346 .times(1);
347 });
348 let (_stop_tx, stop_rx) = channel(());
349 let processor = SequentialSignatureProcessor::new(
350 mock_consumer,
351 mock_certifier,
352 stop_rx,
353 Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
354 Duration::from_millis(10),
355 logger,
356 );
357
358 processor
359 .process_signatures()
360 .await
361 .expect("Process signatures should succeed");
362 }
363
364 #[tokio::test]
365 async fn processor_process_signatures_fails_if_registration_fails_case_general_error() {
366 let logger = TestLogger::stdout();
367 let single_signatures = vec![(
368 fake_data::single_signature(vec![4, 5, 6]),
369 SignedEntityType::MithrilStakeDistribution(Epoch(2)),
370 )];
371 let mock_consumer = MockBuilder::<MockSignatureConsumer>::configure(|mock| {
372 mock.expect_get_signatures()
373 .returning(move || Ok(single_signatures.clone()))
374 .times(1);
375 mock.expect_get_origin_tag()
376 .returning(|| "whatever".to_string())
377 .times(1);
378 });
379 let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
380 mock.expect_register_single_signature()
381 .returning(|_, _| Err(anyhow!("Some general error occurred")))
382 .times(1);
383 });
384 let (_stop_tx, stop_rx) = channel(());
385 let processor = SequentialSignatureProcessor::new(
386 mock_consumer,
387 mock_certifier,
388 stop_rx,
389 Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
390 Duration::from_millis(10),
391 logger,
392 );
393
394 processor
395 .process_signatures()
396 .await
397 .expect_err("Process signatures should fail");
398 }
399
400 #[tokio::test]
401 async fn processor_process_signatures_send_total_received_and_successful_statistics_if_successful()
402 {
403 let logger = TestLogger::stdout();
404 let fake_consumer = FakeSignatureConsumer::new(vec![Ok(vec![(
405 fake_data::single_signature(vec![1, 2, 3]),
406 SignedEntityType::MithrilStakeDistribution(Epoch(1)),
407 )])]);
408 let network_origin = fake_consumer.get_origin_tag();
409
410 let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
411 mock.expect_register_single_signature()
412 .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
413 });
414 let (_stop_tx, stop_rx) = channel(());
415 let metrics_service = Arc::new(MetricsService::new(logger.clone()).unwrap());
416 let processor = SequentialSignatureProcessor::new(
417 Arc::new(fake_consumer),
418 mock_certifier,
419 stop_rx,
420 metrics_service.clone(),
421 Duration::from_millis(10),
422 logger,
423 );
424
425 let initial_received_counter_value = metrics_service
426 .get_signature_registration_total_received_since_startup()
427 .get(&[&network_origin]);
428 let initial_successful_counter_value = metrics_service
429 .get_signature_registration_total_successful_since_startup()
430 .get(&[&network_origin]);
431
432 processor.process_signatures().await.unwrap();
433
434 assert_eq!(
435 initial_received_counter_value + 1,
436 metrics_service
437 .get_signature_registration_total_received_since_startup()
438 .get(&[&network_origin])
439 );
440 assert_eq!(
441 initial_successful_counter_value + 1,
442 metrics_service
443 .get_signature_registration_total_successful_since_startup()
444 .get(&[&network_origin])
445 );
446 }
447
448 #[tokio::test]
449 async fn processor_process_signatures_send_only_total_received_statistic_if_failure() {
450 let logger = TestLogger::stdout();
451 let fake_consumer = FakeSignatureConsumer::new(vec![Ok(vec![(
452 fake_data::single_signature(vec![1, 2, 3]),
453 SignedEntityType::MithrilStakeDistribution(Epoch(1)),
454 )])]);
455 let network_origin = fake_consumer.get_origin_tag();
456
457 let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
458 mock.expect_register_single_signature()
459 .returning(|_, _| Err(anyhow!("Error registering signature")));
460 });
461 let (_stop_tx, stop_rx) = channel(());
462 let metrics_service = Arc::new(MetricsService::new(logger.clone()).unwrap());
463 let processor = SequentialSignatureProcessor::new(
464 Arc::new(fake_consumer),
465 mock_certifier,
466 stop_rx,
467 metrics_service.clone(),
468 Duration::from_millis(10),
469 logger,
470 );
471
472 let initial_received_counter_value = metrics_service
473 .get_signature_registration_total_received_since_startup()
474 .get(&[&network_origin]);
475 let initial_successful_counter_value = metrics_service
476 .get_signature_registration_total_successful_since_startup()
477 .get(&[&network_origin]);
478
479 processor.process_signatures().await.expect_err("Should have failed");
480
481 assert_eq!(
482 initial_received_counter_value + 1,
483 metrics_service
484 .get_signature_registration_total_received_since_startup()
485 .get(&[&network_origin])
486 );
487 assert_eq!(
488 initial_successful_counter_value,
489 metrics_service
490 .get_signature_registration_total_successful_since_startup()
491 .get(&[&network_origin])
492 );
493 }
494
495 #[tokio::test]
496 async fn processor_run_succeeds_even_if_processing_signatures_fails() {
497 let logger = TestLogger::stdout();
498 let fake_consumer = FakeSignatureConsumer::new(vec![
499 Err(anyhow!("Error consuming signatures")),
500 Ok(vec![(
501 fake_data::single_signature(vec![1, 2, 3]),
502 SignedEntityType::MithrilStakeDistribution(Epoch(1)),
503 )]),
504 ]);
505 let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
506 mock.expect_register_single_signature()
507 .with(
508 eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
509 eq(SingleSignature {
510 authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
511 ..fake_data::single_signature(vec![1, 2, 3])
512 }),
513 )
514 .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
515 .times(1);
516 });
517 let (stop_tx, stop_rx) = channel(());
518 let metrics_service = MetricsService::new(logger.clone()).unwrap();
519 let processor = SequentialSignatureProcessor::new(
520 Arc::new(fake_consumer),
521 mock_certifier,
522 stop_rx,
523 Arc::new(metrics_service),
524 Duration::from_millis(1),
525 logger,
526 );
527
528 select!(
529 _res = processor.run() => {},
530 _res = sleep(Duration::from_millis(500)) => {
531 println!("Stopping signature processor...");
532 stop_tx.send(()).unwrap();
533 },
534 );
535 }
536
537 #[tokio::test]
538 async fn processor_run_waits_before_resuming_if_processing_signatures_fails() {
539 let logger = TestLogger::stdout();
540 let fake_consumer = FakeSignatureConsumer::new(vec![
541 Err(anyhow!("Error consuming signatures")),
542 Ok(vec![(
543 fake_data::single_signature(vec![1, 2, 3]),
544 SignedEntityType::MithrilStakeDistribution(Epoch(1)),
545 )]),
546 ]);
547 let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
548 mock.expect_register_single_signature().never();
549 });
550 let (stop_tx, stop_rx) = channel(());
551 let metrics_service = MetricsService::new(logger.clone()).unwrap();
552 let processor = SequentialSignatureProcessor::new(
553 Arc::new(fake_consumer),
554 mock_certifier,
555 stop_rx,
556 Arc::new(metrics_service),
557 Duration::from_millis(1000),
558 logger,
559 );
560
561 select!(
562 _res = processor.run() => {},
563 _res = sleep(Duration::from_millis(500)) => {
564 println!("Stopping signature processor...");
565 stop_tx.send(()).unwrap();
566 },
567 );
568 }
569}