mithril_aggregator/services/
signature_processor.rs

1use std::sync::Arc;
2
3use slog::{Logger, error, warn};
4
5use mithril_common::{
6    StdResult,
7    entities::{SingleSignature, SingleSignatureAuthenticationStatus},
8    logging::LoggerExtensions,
9};
10use tokio::{select, sync::watch::Receiver};
11
12use crate::MetricsService;
13
14use super::{CertifierService, SignatureConsumer};
15
/// A signature processor which receives signatures and processes them.
///
/// Implementors expose a single processing pass ([`process_signatures`](Self::process_signatures))
/// and a long-running entry point ([`run`](Self::run)).
#[cfg_attr(test, mockall::automock)]
#[async_trait::async_trait]
pub trait SignatureProcessor: Sync + Send {
    /// Processes the signatures received from the consumer.
    ///
    /// Performs one processing pass and returns once it is done.
    async fn process_signatures(&self) -> StdResult<()>;

    /// Starts the processor, which will run indefinitely, processing signatures as they arrive.
    async fn run(&self) -> StdResult<()>;
}
26
/// A sequential signature processor receives messages and processes them sequentially
///
/// Signatures are pulled in batches from the consumer and registered one after the other
/// with the certifier.
pub struct SequentialSignatureProcessor {
    /// Source of the single signatures (and of the origin tag used as metric label).
    consumer: Arc<dyn SignatureConsumer>,
    /// Service every received single signature is registered with.
    certifier: Arc<dyn CertifierService>,
    /// Watch channel receiver observed by `run` to know when to stop.
    stop_rx: Receiver<()>,
    /// Logger used to report consumption and registration errors.
    logger: Logger,
    /// Metrics service holding the counter of signature registrations received since startup.
    metrics_service: Arc<MetricsService>,
}
35
36impl SequentialSignatureProcessor {
37    /// Creates a new `SignatureProcessor` instance.
38    pub fn new(
39        consumer: Arc<dyn SignatureConsumer>,
40        certifier: Arc<dyn CertifierService>,
41        stop_rx: Receiver<()>,
42        logger: Logger,
43        metrics_service: Arc<MetricsService>,
44    ) -> Self {
45        Self {
46            consumer,
47            certifier,
48            stop_rx,
49            logger: logger.new_with_component_name::<Self>(),
50            metrics_service,
51        }
52    }
53
54    /// Authenticates a single signature
55    ///
56    /// This is always the case with single signatures received from the DMQ network.
57    fn authenticate_signature(&self, signature: &mut SingleSignature) {
58        signature.authentication_status = SingleSignatureAuthenticationStatus::Authenticated;
59    }
60}
61
62#[async_trait::async_trait]
63impl SignatureProcessor for SequentialSignatureProcessor {
64    async fn process_signatures(&self) -> StdResult<()> {
65        match self.consumer.get_signatures().await {
66            Ok(signatures) => {
67                for (mut signature, signed_entity_type) in signatures {
68                    self.authenticate_signature(&mut signature);
69                    match self
70                        .certifier
71                        .register_single_signature(&signed_entity_type, &signature)
72                        .await
73                    {
74                        Err(e) => {
75                            error!(self.logger, "Error dispatching single signature"; "error" => ?e);
76                        }
77                        _ => {
78                            let origin_network = self.consumer.get_origin_tag();
79                            self.metrics_service
80                                .get_signature_registration_total_received_since_startup()
81                                .increment(&[&origin_network]);
82                        }
83                    }
84                }
85            }
86            Err(e) => {
87                error!(self.logger, "Error consuming single signatures"; "error" => ?e);
88            }
89        }
90
91        Ok(())
92    }
93
94    async fn run(&self) -> StdResult<()> {
95        loop {
96            let mut stop_rx = self.stop_rx.clone();
97            select! {
98                _ = stop_rx.changed() => {
99                    warn!(self.logger, "Stopping signature processor...");
100
101                    return Ok(());
102                }
103                _ = self.process_signatures() => {}
104            }
105        }
106    }
107}
108
// Unit tests of the `SequentialSignatureProcessor`, relying on mocked/fake consumers and a
// mocked certifier.
#[cfg(test)]
mod tests {
    use anyhow::anyhow;
    use mithril_common::{
        entities::{Epoch, SignedEntityType},
        test::double::fake_data,
    };
    use mockall::predicate::eq;
    use tokio::{
        sync::watch::channel,
        time::{Duration, sleep},
    };

    use crate::{
        services::{
            FakeSignatureConsumer, MockCertifierService, MockSignatureConsumer,
            SignatureRegistrationStatus,
        },
        test::TestLogger,
    };

    use super::*;

    // A single processing pass must register every consumed signature, flagged as
    // authenticated, and increment the reception counter once per signature.
    #[tokio::test]
    async fn processor_process_signatures_succeeds() {
        let logger = TestLogger::stdout();
        let single_signatures = vec![
            (
                fake_data::single_signature(vec![1, 2, 3]),
                SignedEntityType::MithrilStakeDistribution(Epoch(1)),
            ),
            (
                fake_data::single_signature(vec![4, 5, 6]),
                SignedEntityType::MithrilStakeDistribution(Epoch(2)),
            ),
        ];
        let single_signatures_length = single_signatures.len();
        let network_origin = "test_network";
        // The consumer yields the batch exactly once and is asked for its origin tag once per
        // signature of the batch.
        let mock_consumer = {
            let mut mock_consumer = MockSignatureConsumer::new();
            mock_consumer
                .expect_get_signatures()
                .returning(move || Ok(single_signatures.clone()))
                .times(1);
            mock_consumer
                .expect_get_origin_tag()
                .returning(|| network_origin.to_string())
                .times(single_signatures_length);
            mock_consumer
        };
        // The certifier must receive each signature exactly once, with its authentication
        // status already set to `Authenticated`.
        let mock_certifier = {
            let mut mock_certifier = MockCertifierService::new();
            mock_certifier
                .expect_register_single_signature()
                .with(
                    eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
                    eq(SingleSignature {
                        authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
                        ..fake_data::single_signature(vec![1, 2, 3])
                    }),
                )
                .returning(|_, single_signature| {
                    assert_eq!(
                        single_signature.authentication_status,
                        SingleSignatureAuthenticationStatus::Authenticated
                    );
                    Ok(SignatureRegistrationStatus::Registered)
                })
                .times(1);
            mock_certifier
                .expect_register_single_signature()
                .with(
                    eq(SignedEntityType::MithrilStakeDistribution(Epoch(2))),
                    eq(SingleSignature {
                        authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
                        ..fake_data::single_signature(vec![4, 5, 6])
                    }),
                )
                .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
                .times(1);
            mock_certifier
        };
        // The sender is kept alive (bound to `_stop_tx`) for the whole test.
        let (_stop_tx, stop_rx) = channel(());
        let metrics_service = MetricsService::new(TestLogger::stdout()).unwrap();
        // Read the counter before processing so the assertion below checks the increase only.
        let initial_counter_value = metrics_service
            .get_signature_registration_total_received_since_startup()
            .get(&[network_origin]);
        let metrics_service = Arc::new(metrics_service);
        let processor = SequentialSignatureProcessor::new(
            Arc::new(mock_consumer),
            Arc::new(mock_certifier),
            stop_rx,
            logger,
            metrics_service.clone(),
        );

        processor
            .process_signatures()
            .await
            .expect("Failed to process signatures");

        // One increment per processed signature, under the consumer origin tag label.
        assert_eq!(
            initial_counter_value + single_signatures_length as u32,
            metrics_service
                .get_signature_registration_total_received_since_startup()
                .get(&[network_origin])
        )
    }

    // `run` must survive a consumer error and still register the signature delivered by the
    // following consumer result.
    #[tokio::test]
    async fn processor_run_succeeds() {
        let logger = TestLogger::stdout();
        // First result is an error, second one is a batch holding a single signature.
        let fake_consumer = FakeSignatureConsumer::new(vec![
            Err(anyhow!("Error consuming signatures")),
            Ok(vec![(
                fake_data::single_signature(vec![1, 2, 3]),
                SignedEntityType::MithrilStakeDistribution(Epoch(1)),
            )]),
        ]);
        // Exactly one registration is expected: the signature of the second consumer result.
        let mock_certifier = {
            let mut mock_certifier = MockCertifierService::new();
            mock_certifier
                .expect_register_single_signature()
                .with(
                    eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
                    eq(SingleSignature {
                        authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
                        ..fake_data::single_signature(vec![1, 2, 3])
                    }),
                )
                .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
                .times(1);

            mock_certifier
        };
        let (stop_tx, stop_rx) = channel(());
        let metrics_service = MetricsService::new(TestLogger::stdout()).unwrap();
        let processor = SequentialSignatureProcessor::new(
            Arc::new(fake_consumer),
            Arc::new(mock_certifier),
            stop_rx,
            logger,
            Arc::new(metrics_service),
        );

        // Let the processor run for at most 10ms.
        // NOTE(review): the stop signal is sent from the timeout branch, i.e. when `run` is no
        // longer polled by this `select!`, so this test does not appear to verify that `run`
        // returns upon the signal — confirm.
        tokio::select!(
            _res =  processor.run() => {},
            _res = sleep(Duration::from_millis(10)) => {
                println!("Stopping signature processor...");
                stop_tx.send(()).unwrap();
            },
        );
    }
}