mithril_aggregator/services/
signature_processor.rs

1use std::sync::Arc;
2
3use slog::{Logger, error, warn};
4
5use mithril_common::{StdResult, logging::LoggerExtensions};
6use tokio::{select, sync::watch::Receiver};
7
8use crate::MetricsService;
9
10use super::{CertifierService, SignatureConsumer};
11
12/// A signature processor which receives signature and processes them.
13#[cfg_attr(test, mockall::automock)]
14#[async_trait::async_trait]
15pub trait SignatureProcessor: Sync + Send {
16    /// Processes the signatures received from the consumer.
17    async fn process_signatures(&self) -> StdResult<()>;
18
19    /// Starts the processor, which will run indefinitely, processing signatures as they arrive.
20    async fn run(&self) -> StdResult<()>;
21}
22
23/// A sequential signature processor receives messages and processes them sequentially
24pub struct SequentialSignatureProcessor {
25    consumer: Arc<dyn SignatureConsumer>,
26    certifier: Arc<dyn CertifierService>,
27    stop_rx: Receiver<()>,
28    logger: Logger,
29    metrics_service: Arc<MetricsService>,
30}
31
32impl SequentialSignatureProcessor {
33    /// Creates a new `SignatureProcessor` instance.
34    pub fn new(
35        consumer: Arc<dyn SignatureConsumer>,
36        certifier: Arc<dyn CertifierService>,
37        stop_rx: Receiver<()>,
38        logger: Logger,
39        metrics_service: Arc<MetricsService>,
40    ) -> Self {
41        Self {
42            consumer,
43            certifier,
44            stop_rx,
45            logger: logger.new_with_component_name::<Self>(),
46            metrics_service,
47        }
48    }
49}
50
51#[async_trait::async_trait]
52impl SignatureProcessor for SequentialSignatureProcessor {
53    async fn process_signatures(&self) -> StdResult<()> {
54        match self.consumer.get_signatures().await {
55            Ok(signatures) => {
56                for (signature, signed_entity_type) in signatures {
57                    match self
58                        .certifier
59                        .register_single_signature(&signed_entity_type, &signature)
60                        .await
61                    {
62                        Err(e) => {
63                            error!(self.logger, "Error dispatching single signature"; "error" => ?e);
64                        }
65                        _ => {
66                            let origin_network = self.consumer.get_origin_tag();
67                            self.metrics_service
68                                .get_signature_registration_total_received_since_startup()
69                                .increment(&[&origin_network]);
70                        }
71                    }
72                }
73            }
74            Err(e) => {
75                error!(self.logger, "Error consuming single signatures"; "error" => ?e);
76            }
77        }
78
79        Ok(())
80    }
81
82    async fn run(&self) -> StdResult<()> {
83        loop {
84            let mut stop_rx = self.stop_rx.clone();
85            select! {
86                _ = stop_rx.changed() => {
87                    warn!(self.logger, "Stopping signature processor...");
88
89                    return Ok(());
90                }
91                _ = self.process_signatures() => {}
92            }
93        }
94    }
95}
96
97#[cfg(test)]
98mod tests {
99    use anyhow::anyhow;
100    use mithril_common::{
101        entities::{Epoch, SignedEntityType},
102        test::double::fake_data,
103    };
104    use mockall::predicate::eq;
105    use tokio::{
106        sync::watch::channel,
107        time::{Duration, sleep},
108    };
109
110    use crate::{
111        services::{
112            FakeSignatureConsumer, MockCertifierService, MockSignatureConsumer,
113            SignatureRegistrationStatus,
114        },
115        test::TestLogger,
116    };
117
118    use super::*;
119
120    #[tokio::test]
121    async fn processor_process_signatures_succeeds() {
122        let logger = TestLogger::stdout();
123        let single_signatures = vec![
124            (
125                fake_data::single_signature(vec![1, 2, 3]),
126                SignedEntityType::MithrilStakeDistribution(Epoch(1)),
127            ),
128            (
129                fake_data::single_signature(vec![4, 5, 6]),
130                SignedEntityType::MithrilStakeDistribution(Epoch(2)),
131            ),
132        ];
133        let single_signatures_length = single_signatures.len();
134        let network_origin = "test_network";
135        let mock_consumer = {
136            let mut mock_consumer = MockSignatureConsumer::new();
137            mock_consumer
138                .expect_get_signatures()
139                .returning(move || Ok(single_signatures.clone()))
140                .times(1);
141            mock_consumer
142                .expect_get_origin_tag()
143                .returning(|| network_origin.to_string())
144                .times(single_signatures_length);
145            mock_consumer
146        };
147        let mock_certifier = {
148            let mut mock_certifier = MockCertifierService::new();
149            mock_certifier
150                .expect_register_single_signature()
151                .with(
152                    eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
153                    eq(fake_data::single_signature(vec![1, 2, 3])),
154                )
155                .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
156                .times(1);
157            mock_certifier
158                .expect_register_single_signature()
159                .with(
160                    eq(SignedEntityType::MithrilStakeDistribution(Epoch(2))),
161                    eq(fake_data::single_signature(vec![4, 5, 6])),
162                )
163                .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
164                .times(1);
165            mock_certifier
166        };
167        let (_stop_tx, stop_rx) = channel(());
168        let metrics_service = MetricsService::new(TestLogger::stdout()).unwrap();
169        let initial_counter_value = metrics_service
170            .get_signature_registration_total_received_since_startup()
171            .get(&[network_origin]);
172        let metrics_service = Arc::new(metrics_service);
173        let processor = SequentialSignatureProcessor::new(
174            Arc::new(mock_consumer),
175            Arc::new(mock_certifier),
176            stop_rx,
177            logger,
178            metrics_service.clone(),
179        );
180
181        processor
182            .process_signatures()
183            .await
184            .expect("Failed to process signatures");
185
186        assert_eq!(
187            initial_counter_value + single_signatures_length as u32,
188            metrics_service
189                .get_signature_registration_total_received_since_startup()
190                .get(&[network_origin])
191        )
192    }
193
194    #[tokio::test]
195    async fn processor_run_succeeds() {
196        let logger = TestLogger::stdout();
197        let fake_consumer = FakeSignatureConsumer::new(vec![
198            Err(anyhow!("Error consuming signatures")),
199            Ok(vec![(
200                fake_data::single_signature(vec![1, 2, 3]),
201                SignedEntityType::MithrilStakeDistribution(Epoch(1)),
202            )]),
203        ]);
204        let mock_certifier = {
205            let mut mock_certifier = MockCertifierService::new();
206            mock_certifier
207                .expect_register_single_signature()
208                .with(
209                    eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
210                    eq(fake_data::single_signature(vec![1, 2, 3])),
211                )
212                .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
213                .times(1);
214
215            mock_certifier
216        };
217        let (stop_tx, stop_rx) = channel(());
218        let metrics_service = MetricsService::new(TestLogger::stdout()).unwrap();
219        let processor = SequentialSignatureProcessor::new(
220            Arc::new(fake_consumer),
221            Arc::new(mock_certifier),
222            stop_rx,
223            logger,
224            Arc::new(metrics_service),
225        );
226
227        tokio::select!(
228            _res =  processor.run() => {},
229            _res = sleep(Duration::from_millis(10)) => {
230                println!("Stopping signature processor...");
231                stop_tx.send(()).unwrap();
232            },
233        );
234    }
235}