mithril_aggregator/services/
signature_processor.rs

1use std::sync::Arc;
2
3use slog::{error, warn, Logger};
4
5use mithril_common::{logging::LoggerExtensions, StdResult};
6use tokio::{select, sync::watch::Receiver};
7
8use crate::MetricsService;
9
10use super::{CertifierService, SignatureConsumer};
11
12/// A signature processor which receives signature and processes them.
13#[cfg_attr(test, mockall::automock)]
14#[async_trait::async_trait]
15pub trait SignatureProcessor: Sync + Send {
16    /// Processes the signatures received from the consumer.
17    async fn process_signatures(&self) -> StdResult<()>;
18
19    /// Starts the processor, which will run indefinitely, processing signatures as they arrive.
20    async fn run(&self) -> StdResult<()>;
21}
22
23/// A sequential signature processor receives messages and processes them sequentially
24pub struct SequentialSignatureProcessor {
25    consumer: Arc<dyn SignatureConsumer>,
26    certifier: Arc<dyn CertifierService>,
27    stop_rx: Receiver<()>,
28    logger: Logger,
29    metrics_service: Arc<MetricsService>,
30}
31
32impl SequentialSignatureProcessor {
33    /// Creates a new `SignatureProcessor` instance.
34    pub fn new(
35        consumer: Arc<dyn SignatureConsumer>,
36        certifier: Arc<dyn CertifierService>,
37        stop_rx: Receiver<()>,
38        logger: Logger,
39        metrics_service: Arc<MetricsService>,
40    ) -> Self {
41        Self {
42            consumer,
43            certifier,
44            stop_rx,
45            logger: logger.new_with_component_name::<Self>(),
46            metrics_service,
47        }
48    }
49}
50
51#[async_trait::async_trait]
52impl SignatureProcessor for SequentialSignatureProcessor {
53    async fn process_signatures(&self) -> StdResult<()> {
54        match self.consumer.get_signatures().await {
55            Ok(signatures) => {
56                for (signature, signed_entity_type) in signatures {
57                    if let Err(e) = self
58                        .certifier
59                        .register_single_signature(&signed_entity_type, &signature)
60                        .await
61                    {
62                        error!(self.logger, "Error dispatching single signature"; "error" => ?e);
63                    } else {
64                        let origin_network = self.consumer.get_origin_tag();
65                        self.metrics_service
66                            .get_signature_registration_total_received_since_startup()
67                            .increment(&[&origin_network]);
68                    }
69                }
70            }
71            Err(e) => {
72                error!(self.logger, "Error consuming single signatures"; "error" => ?e);
73            }
74        }
75
76        Ok(())
77    }
78
79    async fn run(&self) -> StdResult<()> {
80        loop {
81            let mut stop_rx = self.stop_rx.clone();
82            select! {
83                _ = stop_rx.changed() => {
84                    warn!(self.logger, "Stopping signature processor...");
85
86                    return Ok(());
87                }
88                _ = self.process_signatures() => {}
89            }
90        }
91    }
92}
93
94#[cfg(test)]
95mod tests {
96    use anyhow::anyhow;
97    use mithril_common::{
98        entities::{Epoch, SignedEntityType},
99        test_utils::fake_data,
100    };
101    use mockall::predicate::eq;
102    use tokio::{
103        sync::watch::channel,
104        time::{sleep, Duration},
105    };
106
107    use crate::{
108        services::{
109            FakeSignatureConsumer, MockCertifierService, MockSignatureConsumer,
110            SignatureRegistrationStatus,
111        },
112        test_tools::TestLogger,
113    };
114
115    use super::*;
116
117    #[tokio::test]
118    async fn processor_process_signatures_succeeds() {
119        let logger = TestLogger::stdout();
120        let single_signatures = vec![
121            (
122                fake_data::single_signature(vec![1, 2, 3]),
123                SignedEntityType::MithrilStakeDistribution(Epoch(1)),
124            ),
125            (
126                fake_data::single_signature(vec![4, 5, 6]),
127                SignedEntityType::MithrilStakeDistribution(Epoch(2)),
128            ),
129        ];
130        let single_signatures_length = single_signatures.len();
131        let network_origin = "test_network";
132        let mock_consumer = {
133            let mut mock_consumer = MockSignatureConsumer::new();
134            mock_consumer
135                .expect_get_signatures()
136                .returning(move || Ok(single_signatures.clone()))
137                .times(1);
138            mock_consumer
139                .expect_get_origin_tag()
140                .returning(|| network_origin.to_string())
141                .times(single_signatures_length);
142            mock_consumer
143        };
144        let mock_certifier = {
145            let mut mock_certifier = MockCertifierService::new();
146            mock_certifier
147                .expect_register_single_signature()
148                .with(
149                    eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
150                    eq(fake_data::single_signature(vec![1, 2, 3])),
151                )
152                .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
153                .times(1);
154            mock_certifier
155                .expect_register_single_signature()
156                .with(
157                    eq(SignedEntityType::MithrilStakeDistribution(Epoch(2))),
158                    eq(fake_data::single_signature(vec![4, 5, 6])),
159                )
160                .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
161                .times(1);
162            mock_certifier
163        };
164        let (_stop_tx, stop_rx) = channel(());
165        let metrics_service = MetricsService::new(TestLogger::stdout()).unwrap();
166        let initial_counter_value = metrics_service
167            .get_signature_registration_total_received_since_startup()
168            .get(&[network_origin]);
169        let metrics_service = Arc::new(metrics_service);
170        let processor = SequentialSignatureProcessor::new(
171            Arc::new(mock_consumer),
172            Arc::new(mock_certifier),
173            stop_rx,
174            logger,
175            metrics_service.clone(),
176        );
177
178        processor
179            .process_signatures()
180            .await
181            .expect("Failed to process signatures");
182
183        assert_eq!(
184            initial_counter_value + single_signatures_length as u32,
185            metrics_service
186                .get_signature_registration_total_received_since_startup()
187                .get(&[network_origin])
188        )
189    }
190
191    #[tokio::test]
192    async fn processor_run_succeeds() {
193        let logger = TestLogger::stdout();
194        let fake_consumer = FakeSignatureConsumer::new(vec![
195            Err(anyhow!("Error consuming signatures")),
196            Ok(vec![(
197                fake_data::single_signature(vec![1, 2, 3]),
198                SignedEntityType::MithrilStakeDistribution(Epoch(1)),
199            )]),
200        ]);
201        let mock_certifier = {
202            let mut mock_certifier = MockCertifierService::new();
203            mock_certifier
204                .expect_register_single_signature()
205                .with(
206                    eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
207                    eq(fake_data::single_signature(vec![1, 2, 3])),
208                )
209                .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
210                .times(1);
211
212            mock_certifier
213        };
214        let (stop_tx, stop_rx) = channel(());
215        let metrics_service = MetricsService::new(TestLogger::stdout()).unwrap();
216        let processor = SequentialSignatureProcessor::new(
217            Arc::new(fake_consumer),
218            Arc::new(mock_certifier),
219            stop_rx,
220            logger,
221            Arc::new(metrics_service),
222        );
223
224        tokio::select!(
225            _res =  processor.run() => {},
226            _res = sleep(Duration::from_millis(10)) => {
227                println!("Stopping signature processor...");
228                stop_tx.send(()).unwrap();
229            },
230        );
231    }
232}