mithril_aggregator/services/
signature_processor.rs

1use std::{sync::Arc, time::Duration};
2
3use anyhow::anyhow;
4use slog::{Logger, error, trace, warn};
5
6use mithril_common::{
7    StdResult,
8    entities::{SingleSignature, SingleSignatureAuthenticationStatus},
9    logging::LoggerExtensions,
10};
11use tokio::{select, sync::watch::Receiver};
12
13use crate::MetricsService;
14
15use super::{CertifierService, SignatureConsumer};
16
17/// A signature processor which receives signature and processes them.
18#[cfg_attr(test, mockall::automock)]
19#[async_trait::async_trait]
20pub trait SignatureProcessor: Sync + Send {
21    /// Processes the signatures received from the consumer.
22    async fn process_signatures(&self) -> StdResult<()>;
23
24    /// Starts the processor, which will run indefinitely, processing signatures as they arrive.
25    async fn run(&self) -> StdResult<()>;
26}
27
28/// A sequential signature processor receives messages and processes them sequentially
29pub struct SequentialSignatureProcessor {
30    consumer: Arc<dyn SignatureConsumer>,
31    certifier: Arc<dyn CertifierService>,
32    stop_rx: Receiver<()>,
33    metrics_service: Arc<MetricsService>,
34    wait_delay_on_error: Duration,
35    logger: Logger,
36}
37
38impl SequentialSignatureProcessor {
39    /// Creates a new `SignatureProcessor` instance.
40    pub fn new(
41        consumer: Arc<dyn SignatureConsumer>,
42        certifier: Arc<dyn CertifierService>,
43        stop_rx: Receiver<()>,
44        metrics_service: Arc<MetricsService>,
45        wait_delay_on_error_in_seconds: Duration,
46        logger: Logger,
47    ) -> Self {
48        Self {
49            consumer,
50            certifier,
51            stop_rx,
52            metrics_service,
53            wait_delay_on_error: wait_delay_on_error_in_seconds,
54            logger: logger.new_with_component_name::<Self>(),
55        }
56    }
57
58    /// Authenticates a single signature
59    ///
60    /// This is always the case with single signatures received from the DMQ network.
61    fn authenticate_signature(&self, signature: &mut SingleSignature) {
62        signature.authentication_status = SingleSignatureAuthenticationStatus::Authenticated;
63    }
64}
65
66#[async_trait::async_trait]
67impl SignatureProcessor for SequentialSignatureProcessor {
68    async fn process_signatures(&self) -> StdResult<()> {
69        let origin_network = self.consumer.get_origin_tag();
70
71        let mut total_import_errors = 0;
72        match self.consumer.get_signatures().await {
73            Ok(signatures) => {
74                let number_of_signatures = signatures.len() as u32;
75                trace!(self.logger, "Received {} signatures", number_of_signatures);
76
77                self.metrics_service
78                    .get_signature_registration_total_received_since_startup()
79                    .increment_by(&[&origin_network], number_of_signatures);
80
81                for (mut signature, signed_entity_type) in signatures {
82                    self.authenticate_signature(&mut signature);
83                    match self
84                        .certifier
85                        .register_single_signature(&signed_entity_type, &signature)
86                        .await
87                    {
88                        Ok(_registration_status) => {
89                            self.metrics_service
90                                .get_signature_registration_total_successful_since_startup()
91                                .increment(&[&origin_network]);
92                        }
93                        Err(e) => {
94                            total_import_errors += 1;
95                            error!(
96                                self.logger, "Error dispatching single signature";
97                                "full_payload" => #?signature, "error" => ?e
98                            );
99                        }
100                    }
101                }
102            }
103            Err(e) => {
104                error!(self.logger, "Error consuming single signatures"; "error" => ?e);
105                total_import_errors += 1;
106            }
107        }
108
109        if total_import_errors > 0 {
110            error!(
111                self.logger,
112                "Total import errors while processing signatures: {}", total_import_errors
113            );
114            return Err(anyhow!(
115                "Total import errors while processing signatures: {}",
116                total_import_errors
117            ));
118        }
119
120        Ok(())
121    }
122
123    async fn run(&self) -> StdResult<()> {
124        loop {
125            let mut stop_rx = self.stop_rx.clone();
126            select! {
127                _ = stop_rx.changed() => {
128                    warn!(self.logger, "Stopping signature processor...");
129
130                    return Ok(());
131                }
132                res = self.process_signatures() => {
133                    if let Err(e) = res {
134                        error!(self.logger, "Error processing signatures"; "error" => ?e);
135                        error!(self.logger, "Sleep for {} seconds", self.wait_delay_on_error.as_secs());
136                        tokio::time::sleep(self.wait_delay_on_error).await;
137                    }
138                }
139            }
140        }
141    }
142}
143
144#[cfg(test)]
145mod tests {
146    use anyhow::anyhow;
147    use mockall::predicate::eq;
148    use tokio::{
149        sync::watch::channel,
150        time::{Duration, sleep},
151    };
152
153    use mithril_common::{
154        entities::{Epoch, SignedEntityType},
155        test::{double::fake_data, mock_extensions::MockBuilder},
156    };
157
158    use crate::services::{
159        FakeSignatureConsumer, MockCertifierService, MockSignatureConsumer,
160        SignatureRegistrationStatus,
161    };
162    use crate::test::TestLogger;
163
164    use super::*;
165
166    #[tokio::test]
167    async fn processor_process_signatures_succeeds() {
168        let logger = TestLogger::stdout();
169        let single_signatures = vec![
170            (
171                fake_data::single_signature(vec![1, 2, 3]),
172                SignedEntityType::MithrilStakeDistribution(Epoch(1)),
173            ),
174            (
175                fake_data::single_signature(vec![4, 5, 6]),
176                SignedEntityType::MithrilStakeDistribution(Epoch(2)),
177            ),
178        ];
179        let mock_consumer = MockBuilder::<MockSignatureConsumer>::configure(|mock| {
180            mock.expect_get_signatures()
181                .returning(move || Ok(single_signatures.clone()))
182                .times(1);
183            mock.expect_get_origin_tag()
184                .returning(|| "whatever".to_string())
185                .times(1);
186        });
187        let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
188            mock.expect_register_single_signature()
189                .with(
190                    eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
191                    eq(SingleSignature {
192                        authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
193                        ..fake_data::single_signature(vec![1, 2, 3])
194                    }),
195                )
196                .returning(|_, single_signature| {
197                    assert_eq!(
198                        single_signature.authentication_status,
199                        SingleSignatureAuthenticationStatus::Authenticated
200                    );
201                    Ok(SignatureRegistrationStatus::Registered)
202                })
203                .times(1);
204            mock.expect_register_single_signature()
205                .with(
206                    eq(SignedEntityType::MithrilStakeDistribution(Epoch(2))),
207                    eq(SingleSignature {
208                        authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
209                        ..fake_data::single_signature(vec![4, 5, 6])
210                    }),
211                )
212                .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
213                .times(1);
214        });
215        let (_stop_tx, stop_rx) = channel(());
216        let processor = SequentialSignatureProcessor::new(
217            mock_consumer,
218            mock_certifier,
219            stop_rx,
220            Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
221            Duration::from_millis(10),
222            logger,
223        );
224
225        processor
226            .process_signatures()
227            .await
228            .expect("Failed to process signatures");
229    }
230
231    #[tokio::test]
232    async fn processor_process_signatures_send_total_received_and_successful_statistics_if_successful()
233     {
234        let logger = TestLogger::stdout();
235        let fake_consumer = FakeSignatureConsumer::new(vec![Ok(vec![(
236            fake_data::single_signature(vec![1, 2, 3]),
237            SignedEntityType::MithrilStakeDistribution(Epoch(1)),
238        )])]);
239        let network_origin = fake_consumer.get_origin_tag();
240
241        let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
242            mock.expect_register_single_signature()
243                .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
244        });
245        let (_stop_tx, stop_rx) = channel(());
246        let metrics_service = Arc::new(MetricsService::new(logger.clone()).unwrap());
247        let processor = SequentialSignatureProcessor::new(
248            Arc::new(fake_consumer),
249            mock_certifier,
250            stop_rx,
251            metrics_service.clone(),
252            Duration::from_millis(10),
253            logger,
254        );
255
256        let initial_received_counter_value = metrics_service
257            .get_signature_registration_total_received_since_startup()
258            .get(&[&network_origin]);
259        let initial_successful_counter_value = metrics_service
260            .get_signature_registration_total_successful_since_startup()
261            .get(&[&network_origin]);
262
263        processor.process_signatures().await.unwrap();
264
265        assert_eq!(
266            initial_received_counter_value + 1,
267            metrics_service
268                .get_signature_registration_total_received_since_startup()
269                .get(&[&network_origin])
270        );
271        assert_eq!(
272            initial_successful_counter_value + 1,
273            metrics_service
274                .get_signature_registration_total_successful_since_startup()
275                .get(&[&network_origin])
276        );
277    }
278
279    #[tokio::test]
280    async fn processor_process_signatures_send_only_total_received_statistic_if_failure() {
281        let logger = TestLogger::stdout();
282        let fake_consumer = FakeSignatureConsumer::new(vec![Ok(vec![(
283            fake_data::single_signature(vec![1, 2, 3]),
284            SignedEntityType::MithrilStakeDistribution(Epoch(1)),
285        )])]);
286        let network_origin = fake_consumer.get_origin_tag();
287
288        let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
289            mock.expect_register_single_signature()
290                .returning(|_, _| Err(anyhow!("Error registering signature")));
291        });
292        let (_stop_tx, stop_rx) = channel(());
293        let metrics_service = Arc::new(MetricsService::new(logger.clone()).unwrap());
294        let processor = SequentialSignatureProcessor::new(
295            Arc::new(fake_consumer),
296            mock_certifier,
297            stop_rx,
298            metrics_service.clone(),
299            Duration::from_millis(10),
300            logger,
301        );
302
303        let initial_received_counter_value = metrics_service
304            .get_signature_registration_total_received_since_startup()
305            .get(&[&network_origin]);
306        let initial_successful_counter_value = metrics_service
307            .get_signature_registration_total_successful_since_startup()
308            .get(&[&network_origin]);
309
310        processor.process_signatures().await.expect_err("Should have failed");
311
312        assert_eq!(
313            initial_received_counter_value + 1,
314            metrics_service
315                .get_signature_registration_total_received_since_startup()
316                .get(&[&network_origin])
317        );
318        assert_eq!(
319            initial_successful_counter_value,
320            metrics_service
321                .get_signature_registration_total_successful_since_startup()
322                .get(&[&network_origin])
323        );
324    }
325
326    #[tokio::test]
327    async fn processor_run_succeeds_even_if_processing_signatures_fails() {
328        let logger = TestLogger::stdout();
329        let fake_consumer = FakeSignatureConsumer::new(vec![
330            Err(anyhow!("Error consuming signatures")),
331            Ok(vec![(
332                fake_data::single_signature(vec![1, 2, 3]),
333                SignedEntityType::MithrilStakeDistribution(Epoch(1)),
334            )]),
335        ]);
336        let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
337            mock.expect_register_single_signature()
338                .with(
339                    eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
340                    eq(SingleSignature {
341                        authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
342                        ..fake_data::single_signature(vec![1, 2, 3])
343                    }),
344                )
345                .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
346                .times(1);
347        });
348        let (stop_tx, stop_rx) = channel(());
349        let metrics_service = MetricsService::new(logger.clone()).unwrap();
350        let processor = SequentialSignatureProcessor::new(
351            Arc::new(fake_consumer),
352            mock_certifier,
353            stop_rx,
354            Arc::new(metrics_service),
355            Duration::from_millis(1),
356            logger,
357        );
358
359        select!(
360            _res =  processor.run() => {},
361            _res = sleep(Duration::from_millis(500)) => {
362                println!("Stopping signature processor...");
363                stop_tx.send(()).unwrap();
364            },
365        );
366    }
367
368    #[tokio::test]
369    async fn processor_run_waits_before_resuming_if_processing_signatures_fails() {
370        let logger = TestLogger::stdout();
371        let fake_consumer = FakeSignatureConsumer::new(vec![
372            Err(anyhow!("Error consuming signatures")),
373            Ok(vec![(
374                fake_data::single_signature(vec![1, 2, 3]),
375                SignedEntityType::MithrilStakeDistribution(Epoch(1)),
376            )]),
377        ]);
378        let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
379            mock.expect_register_single_signature().never();
380        });
381        let (stop_tx, stop_rx) = channel(());
382        let metrics_service = MetricsService::new(logger.clone()).unwrap();
383        let processor = SequentialSignatureProcessor::new(
384            Arc::new(fake_consumer),
385            mock_certifier,
386            stop_rx,
387            Arc::new(metrics_service),
388            Duration::from_millis(1000),
389            logger,
390        );
391
392        select!(
393            _res =  processor.run() => {},
394            _res = sleep(Duration::from_millis(500)) => {
395                println!("Stopping signature processor...");
396                stop_tx.send(()).unwrap();
397            },
398        );
399    }
400}