mithril_aggregator/services/
signature_processor.rs

1use std::{sync::Arc, time::Duration};
2
3use anyhow::anyhow;
4use slog::{Logger, debug, error, trace, warn};
5
6use mithril_common::{
7    StdResult,
8    entities::{SingleSignature, SingleSignatureAuthenticationStatus},
9    logging::LoggerExtensions,
10};
11use tokio::{select, sync::watch::Receiver};
12
13use crate::{MetricsService, services::CertifierServiceError};
14
15use super::{CertifierService, SignatureConsumer};
16
17/// A signature processor which receives signature and processes them.
18#[cfg_attr(test, mockall::automock)]
19#[async_trait::async_trait]
20pub trait SignatureProcessor: Sync + Send {
21    /// Processes the signatures received from the consumer.
22    async fn process_signatures(&self) -> StdResult<()>;
23
24    /// Starts the processor, which will run indefinitely, processing signatures as they arrive.
25    async fn run(&self) -> StdResult<()>;
26}
27
28/// A sequential signature processor receives messages and processes them sequentially
29pub struct SequentialSignatureProcessor {
30    consumer: Arc<dyn SignatureConsumer>,
31    certifier: Arc<dyn CertifierService>,
32    stop_rx: Receiver<()>,
33    metrics_service: Arc<MetricsService>,
34    wait_delay_on_error: Duration,
35    logger: Logger,
36}
37
38impl SequentialSignatureProcessor {
39    /// Creates a new `SignatureProcessor` instance.
40    pub fn new(
41        consumer: Arc<dyn SignatureConsumer>,
42        certifier: Arc<dyn CertifierService>,
43        stop_rx: Receiver<()>,
44        metrics_service: Arc<MetricsService>,
45        wait_delay_on_error_in_seconds: Duration,
46        logger: Logger,
47    ) -> Self {
48        Self {
49            consumer,
50            certifier,
51            stop_rx,
52            metrics_service,
53            wait_delay_on_error: wait_delay_on_error_in_seconds,
54            logger: logger.new_with_component_name::<Self>(),
55        }
56    }
57
58    /// Authenticates a single signature
59    ///
60    /// This is always the case with single signatures received from the DMQ network.
61    fn authenticate_signature(&self, signature: &mut SingleSignature) {
62        signature.authentication_status = SingleSignatureAuthenticationStatus::Authenticated;
63    }
64}
65
66#[async_trait::async_trait]
67impl SignatureProcessor for SequentialSignatureProcessor {
68    async fn process_signatures(&self) -> StdResult<()> {
69        let origin_network = self.consumer.get_origin_tag();
70
71        let mut total_import_errors = 0;
72        match self.consumer.get_signatures().await {
73            Ok(signatures) => {
74                let number_of_signatures = signatures.len() as u32;
75                trace!(self.logger, "Received {} signatures", number_of_signatures);
76
77                self.metrics_service
78                    .get_signature_registration_total_received_since_startup()
79                    .increment_by(&[&origin_network], number_of_signatures);
80
81                for (mut signature, signed_entity_type) in signatures {
82                    self.authenticate_signature(&mut signature);
83                    match self
84                        .certifier
85                        .register_single_signature(&signed_entity_type, &signature)
86                        .await
87                    {
88                        Ok(_registration_status) => {
89                            self.metrics_service
90                                .get_signature_registration_total_successful_since_startup()
91                                .increment(&[&origin_network]);
92                        }
93                        Err(err) => match err.downcast_ref::<CertifierServiceError>() {
94                            Some(CertifierServiceError::AlreadyCertified(
95                                error_signed_entity_type,
96                            )) => {
97                                debug!(self.logger, "process_signatures::open_message_already_certified"; "signed_entity_type" => ?error_signed_entity_type, "party_id" => &signature.party_id);
98                            }
99                            Some(CertifierServiceError::Expired(error_signed_entity_type)) => {
100                                debug!(self.logger, "process_signatures::open_message_expired"; "signed_entity_type" => ?error_signed_entity_type, "party_id" => &signature.party_id);
101                            }
102                            Some(CertifierServiceError::NotFound(error_signed_entity_type)) => {
103                                debug!(self.logger, "process_signatures::not_found"; "signed_entity_type" => ?error_signed_entity_type, "party_id" => &signature.party_id);
104                            }
105                            Some(_) | None => {
106                                total_import_errors += 1;
107                                error!(
108                                    self.logger, "Error dispatching single signature";
109                                    "full_payload" => #?signature, "error" => ?err
110                                );
111                            }
112                        },
113                    }
114                }
115            }
116            Err(e) => {
117                error!(self.logger, "Error consuming single signatures"; "error" => ?e);
118                total_import_errors += 1;
119            }
120        }
121
122        if total_import_errors > 0 {
123            error!(
124                self.logger,
125                "Total import errors while processing signatures: {}", total_import_errors
126            );
127            return Err(anyhow!(
128                "Total import errors while processing signatures: {}",
129                total_import_errors
130            ));
131        }
132
133        Ok(())
134    }
135
136    async fn run(&self) -> StdResult<()> {
137        loop {
138            let mut stop_rx = self.stop_rx.clone();
139            select! {
140                _ = stop_rx.changed() => {
141                    warn!(self.logger, "Stopping signature processor...");
142
143                    return Ok(());
144                }
145                res = self.process_signatures() => {
146                    if let Err(e) = res {
147                        error!(self.logger, "Error processing signatures"; "error" => ?e);
148                        error!(self.logger, "Sleep for {} seconds", self.wait_delay_on_error.as_secs());
149                        tokio::time::sleep(self.wait_delay_on_error).await;
150                    }
151                }
152            }
153        }
154    }
155}
156
157#[cfg(test)]
158mod tests {
159    use anyhow::anyhow;
160    use mockall::predicate::eq;
161    use tokio::{
162        sync::watch::channel,
163        time::{Duration, sleep},
164    };
165
166    use mithril_common::{
167        entities::{Epoch, SignedEntityType},
168        test::{double::fake_data, mock_extensions::MockBuilder},
169    };
170
171    use crate::services::{
172        FakeSignatureConsumer, MockCertifierService, MockSignatureConsumer,
173        SignatureRegistrationStatus,
174    };
175    use crate::test::TestLogger;
176
177    use super::*;
178
179    #[tokio::test]
180    async fn processor_process_signatures_succeeds_if_registration_succeeds() {
181        let logger = TestLogger::stdout();
182        let single_signatures = vec![
183            (
184                fake_data::single_signature(vec![1, 2, 3]),
185                SignedEntityType::MithrilStakeDistribution(Epoch(1)),
186            ),
187            (
188                fake_data::single_signature(vec![4, 5, 6]),
189                SignedEntityType::MithrilStakeDistribution(Epoch(2)),
190            ),
191        ];
192        let mock_consumer = MockBuilder::<MockSignatureConsumer>::configure(|mock| {
193            mock.expect_get_signatures()
194                .returning(move || Ok(single_signatures.clone()))
195                .times(1);
196            mock.expect_get_origin_tag()
197                .returning(|| "whatever".to_string())
198                .times(1);
199        });
200        let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
201            mock.expect_register_single_signature()
202                .with(
203                    eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
204                    eq(SingleSignature {
205                        authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
206                        ..fake_data::single_signature(vec![1, 2, 3])
207                    }),
208                )
209                .returning(|_, single_signature| {
210                    assert_eq!(
211                        single_signature.authentication_status,
212                        SingleSignatureAuthenticationStatus::Authenticated
213                    );
214                    Ok(SignatureRegistrationStatus::Registered)
215                })
216                .times(1);
217            mock.expect_register_single_signature()
218                .with(
219                    eq(SignedEntityType::MithrilStakeDistribution(Epoch(2))),
220                    eq(SingleSignature {
221                        authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
222                        ..fake_data::single_signature(vec![4, 5, 6])
223                    }),
224                )
225                .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
226                .times(1);
227        });
228        let (_stop_tx, stop_rx) = channel(());
229        let processor = SequentialSignatureProcessor::new(
230            mock_consumer,
231            mock_certifier,
232            stop_rx,
233            Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
234            Duration::from_millis(10),
235            logger,
236        );
237
238        processor
239            .process_signatures()
240            .await
241            .expect("Process signatures should succeed");
242    }
243
244    #[tokio::test]
245    async fn processor_process_signatures_succeeds_if_registration_fails_case_already_certified() {
246        let logger = TestLogger::stdout();
247        let single_signatures = vec![(
248            fake_data::single_signature(vec![4, 5, 6]),
249            SignedEntityType::MithrilStakeDistribution(Epoch(2)),
250        )];
251        let mock_consumer = MockBuilder::<MockSignatureConsumer>::configure(|mock| {
252            mock.expect_get_signatures()
253                .returning(move || Ok(single_signatures.clone()))
254                .times(1);
255            mock.expect_get_origin_tag()
256                .returning(|| "whatever".to_string())
257                .times(1);
258        });
259        let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
260            mock.expect_register_single_signature()
261                .returning(|_, _| {
262                    Err(anyhow!(CertifierServiceError::AlreadyCertified(
263                        SignedEntityType::MithrilStakeDistribution(Epoch(2)),
264                    )))
265                })
266                .times(1);
267        });
268        let (_stop_tx, stop_rx) = channel(());
269        let processor = SequentialSignatureProcessor::new(
270            mock_consumer,
271            mock_certifier,
272            stop_rx,
273            Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
274            Duration::from_millis(10),
275            logger,
276        );
277
278        processor
279            .process_signatures()
280            .await
281            .expect("Process signatures should succeed");
282    }
283
284    #[tokio::test]
285    async fn processor_process_signatures_succeeds_if_registration_fails_case_expired() {
286        let logger = TestLogger::stdout();
287        let single_signatures = vec![(
288            fake_data::single_signature(vec![4, 5, 6]),
289            SignedEntityType::MithrilStakeDistribution(Epoch(2)),
290        )];
291        let mock_consumer = MockBuilder::<MockSignatureConsumer>::configure(|mock| {
292            mock.expect_get_signatures()
293                .returning(move || Ok(single_signatures.clone()))
294                .times(1);
295            mock.expect_get_origin_tag()
296                .returning(|| "whatever".to_string())
297                .times(1);
298        });
299        let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
300            mock.expect_register_single_signature()
301                .returning(|_, _| {
302                    Err(anyhow!(CertifierServiceError::Expired(
303                        SignedEntityType::MithrilStakeDistribution(Epoch(2)),
304                    )))
305                })
306                .times(1);
307        });
308        let (_stop_tx, stop_rx) = channel(());
309        let processor = SequentialSignatureProcessor::new(
310            mock_consumer,
311            mock_certifier,
312            stop_rx,
313            Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
314            Duration::from_millis(10),
315            logger,
316        );
317
318        processor
319            .process_signatures()
320            .await
321            .expect("Process signatures should succeed");
322    }
323
324    #[tokio::test]
325    async fn processor_process_signatures_succeeds_if_registration_fails_case_not_found() {
326        let logger = TestLogger::stdout();
327        let single_signatures = vec![(
328            fake_data::single_signature(vec![4, 5, 6]),
329            SignedEntityType::MithrilStakeDistribution(Epoch(2)),
330        )];
331        let mock_consumer = MockBuilder::<MockSignatureConsumer>::configure(|mock| {
332            mock.expect_get_signatures()
333                .returning(move || Ok(single_signatures.clone()))
334                .times(1);
335            mock.expect_get_origin_tag()
336                .returning(|| "whatever".to_string())
337                .times(1);
338        });
339        let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
340            mock.expect_register_single_signature()
341                .returning(|_, _| {
342                    Err(anyhow!(CertifierServiceError::NotFound(
343                        SignedEntityType::MithrilStakeDistribution(Epoch(2)),
344                    )))
345                })
346                .times(1);
347        });
348        let (_stop_tx, stop_rx) = channel(());
349        let processor = SequentialSignatureProcessor::new(
350            mock_consumer,
351            mock_certifier,
352            stop_rx,
353            Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
354            Duration::from_millis(10),
355            logger,
356        );
357
358        processor
359            .process_signatures()
360            .await
361            .expect("Process signatures should succeed");
362    }
363
364    #[tokio::test]
365    async fn processor_process_signatures_fails_if_registration_fails_case_general_error() {
366        let logger = TestLogger::stdout();
367        let single_signatures = vec![(
368            fake_data::single_signature(vec![4, 5, 6]),
369            SignedEntityType::MithrilStakeDistribution(Epoch(2)),
370        )];
371        let mock_consumer = MockBuilder::<MockSignatureConsumer>::configure(|mock| {
372            mock.expect_get_signatures()
373                .returning(move || Ok(single_signatures.clone()))
374                .times(1);
375            mock.expect_get_origin_tag()
376                .returning(|| "whatever".to_string())
377                .times(1);
378        });
379        let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
380            mock.expect_register_single_signature()
381                .returning(|_, _| Err(anyhow!("Some general error occurred")))
382                .times(1);
383        });
384        let (_stop_tx, stop_rx) = channel(());
385        let processor = SequentialSignatureProcessor::new(
386            mock_consumer,
387            mock_certifier,
388            stop_rx,
389            Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
390            Duration::from_millis(10),
391            logger,
392        );
393
394        processor
395            .process_signatures()
396            .await
397            .expect_err("Process signatures should fail");
398    }
399
400    #[tokio::test]
401    async fn processor_process_signatures_send_total_received_and_successful_statistics_if_successful()
402     {
403        let logger = TestLogger::stdout();
404        let fake_consumer = FakeSignatureConsumer::new(vec![Ok(vec![(
405            fake_data::single_signature(vec![1, 2, 3]),
406            SignedEntityType::MithrilStakeDistribution(Epoch(1)),
407        )])]);
408        let network_origin = fake_consumer.get_origin_tag();
409
410        let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
411            mock.expect_register_single_signature()
412                .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
413        });
414        let (_stop_tx, stop_rx) = channel(());
415        let metrics_service = Arc::new(MetricsService::new(logger.clone()).unwrap());
416        let processor = SequentialSignatureProcessor::new(
417            Arc::new(fake_consumer),
418            mock_certifier,
419            stop_rx,
420            metrics_service.clone(),
421            Duration::from_millis(10),
422            logger,
423        );
424
425        let initial_received_counter_value = metrics_service
426            .get_signature_registration_total_received_since_startup()
427            .get(&[&network_origin]);
428        let initial_successful_counter_value = metrics_service
429            .get_signature_registration_total_successful_since_startup()
430            .get(&[&network_origin]);
431
432        processor.process_signatures().await.unwrap();
433
434        assert_eq!(
435            initial_received_counter_value + 1,
436            metrics_service
437                .get_signature_registration_total_received_since_startup()
438                .get(&[&network_origin])
439        );
440        assert_eq!(
441            initial_successful_counter_value + 1,
442            metrics_service
443                .get_signature_registration_total_successful_since_startup()
444                .get(&[&network_origin])
445        );
446    }
447
448    #[tokio::test]
449    async fn processor_process_signatures_send_only_total_received_statistic_if_failure() {
450        let logger = TestLogger::stdout();
451        let fake_consumer = FakeSignatureConsumer::new(vec![Ok(vec![(
452            fake_data::single_signature(vec![1, 2, 3]),
453            SignedEntityType::MithrilStakeDistribution(Epoch(1)),
454        )])]);
455        let network_origin = fake_consumer.get_origin_tag();
456
457        let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
458            mock.expect_register_single_signature()
459                .returning(|_, _| Err(anyhow!("Error registering signature")));
460        });
461        let (_stop_tx, stop_rx) = channel(());
462        let metrics_service = Arc::new(MetricsService::new(logger.clone()).unwrap());
463        let processor = SequentialSignatureProcessor::new(
464            Arc::new(fake_consumer),
465            mock_certifier,
466            stop_rx,
467            metrics_service.clone(),
468            Duration::from_millis(10),
469            logger,
470        );
471
472        let initial_received_counter_value = metrics_service
473            .get_signature_registration_total_received_since_startup()
474            .get(&[&network_origin]);
475        let initial_successful_counter_value = metrics_service
476            .get_signature_registration_total_successful_since_startup()
477            .get(&[&network_origin]);
478
479        processor.process_signatures().await.expect_err("Should have failed");
480
481        assert_eq!(
482            initial_received_counter_value + 1,
483            metrics_service
484                .get_signature_registration_total_received_since_startup()
485                .get(&[&network_origin])
486        );
487        assert_eq!(
488            initial_successful_counter_value,
489            metrics_service
490                .get_signature_registration_total_successful_since_startup()
491                .get(&[&network_origin])
492        );
493    }
494
495    #[tokio::test]
496    async fn processor_run_succeeds_even_if_processing_signatures_fails() {
497        let logger = TestLogger::stdout();
498        let fake_consumer = FakeSignatureConsumer::new(vec![
499            Err(anyhow!("Error consuming signatures")),
500            Ok(vec![(
501                fake_data::single_signature(vec![1, 2, 3]),
502                SignedEntityType::MithrilStakeDistribution(Epoch(1)),
503            )]),
504        ]);
505        let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
506            mock.expect_register_single_signature()
507                .with(
508                    eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
509                    eq(SingleSignature {
510                        authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
511                        ..fake_data::single_signature(vec![1, 2, 3])
512                    }),
513                )
514                .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
515                .times(1);
516        });
517        let (stop_tx, stop_rx) = channel(());
518        let metrics_service = MetricsService::new(logger.clone()).unwrap();
519        let processor = SequentialSignatureProcessor::new(
520            Arc::new(fake_consumer),
521            mock_certifier,
522            stop_rx,
523            Arc::new(metrics_service),
524            Duration::from_millis(1),
525            logger,
526        );
527
528        select!(
529            _res =  processor.run() => {},
530            _res = sleep(Duration::from_millis(500)) => {
531                println!("Stopping signature processor...");
532                stop_tx.send(()).unwrap();
533            },
534        );
535    }
536
537    #[tokio::test]
538    async fn processor_run_waits_before_resuming_if_processing_signatures_fails() {
539        let logger = TestLogger::stdout();
540        let fake_consumer = FakeSignatureConsumer::new(vec![
541            Err(anyhow!("Error consuming signatures")),
542            Ok(vec![(
543                fake_data::single_signature(vec![1, 2, 3]),
544                SignedEntityType::MithrilStakeDistribution(Epoch(1)),
545            )]),
546        ]);
547        let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
548            mock.expect_register_single_signature().never();
549        });
550        let (stop_tx, stop_rx) = channel(());
551        let metrics_service = MetricsService::new(logger.clone()).unwrap();
552        let processor = SequentialSignatureProcessor::new(
553            Arc::new(fake_consumer),
554            mock_certifier,
555            stop_rx,
556            Arc::new(metrics_service),
557            Duration::from_millis(1000),
558            logger,
559        );
560
561        select!(
562            _res =  processor.run() => {},
563            _res = sleep(Duration::from_millis(500)) => {
564                println!("Stopping signature processor...");
565                stop_tx.send(()).unwrap();
566            },
567        );
568    }
569}