mithril_aggregator/services/
signature_processor.rs1use std::{sync::Arc, time::Duration};
2
3use anyhow::anyhow;
4use slog::{Logger, error, trace, warn};
5
6use mithril_common::{
7 StdResult,
8 entities::{SingleSignature, SingleSignatureAuthenticationStatus},
9 logging::LoggerExtensions,
10};
11use tokio::{select, sync::watch::Receiver};
12
13use crate::MetricsService;
14
15use super::{CertifierService, SignatureConsumer};
16
17#[cfg_attr(test, mockall::automock)]
19#[async_trait::async_trait]
20pub trait SignatureProcessor: Sync + Send {
21 async fn process_signatures(&self) -> StdResult<()>;
23
24 async fn run(&self) -> StdResult<()>;
26}
27
28pub struct SequentialSignatureProcessor {
30 consumer: Arc<dyn SignatureConsumer>,
31 certifier: Arc<dyn CertifierService>,
32 stop_rx: Receiver<()>,
33 metrics_service: Arc<MetricsService>,
34 wait_delay_on_error: Duration,
35 logger: Logger,
36}
37
38impl SequentialSignatureProcessor {
39 pub fn new(
41 consumer: Arc<dyn SignatureConsumer>,
42 certifier: Arc<dyn CertifierService>,
43 stop_rx: Receiver<()>,
44 metrics_service: Arc<MetricsService>,
45 wait_delay_on_error_in_seconds: Duration,
46 logger: Logger,
47 ) -> Self {
48 Self {
49 consumer,
50 certifier,
51 stop_rx,
52 metrics_service,
53 wait_delay_on_error: wait_delay_on_error_in_seconds,
54 logger: logger.new_with_component_name::<Self>(),
55 }
56 }
57
58 fn authenticate_signature(&self, signature: &mut SingleSignature) {
62 signature.authentication_status = SingleSignatureAuthenticationStatus::Authenticated;
63 }
64}
65
66#[async_trait::async_trait]
67impl SignatureProcessor for SequentialSignatureProcessor {
68 async fn process_signatures(&self) -> StdResult<()> {
69 let origin_network = self.consumer.get_origin_tag();
70
71 let mut total_import_errors = 0;
72 match self.consumer.get_signatures().await {
73 Ok(signatures) => {
74 let number_of_signatures = signatures.len() as u32;
75 trace!(self.logger, "Received {} signatures", number_of_signatures);
76
77 self.metrics_service
78 .get_signature_registration_total_received_since_startup()
79 .increment_by(&[&origin_network], number_of_signatures);
80
81 for (mut signature, signed_entity_type) in signatures {
82 self.authenticate_signature(&mut signature);
83 match self
84 .certifier
85 .register_single_signature(&signed_entity_type, &signature)
86 .await
87 {
88 Ok(_registration_status) => {
89 self.metrics_service
90 .get_signature_registration_total_successful_since_startup()
91 .increment(&[&origin_network]);
92 }
93 Err(e) => {
94 total_import_errors += 1;
95 error!(
96 self.logger, "Error dispatching single signature";
97 "full_payload" => #?signature, "error" => ?e
98 );
99 }
100 }
101 }
102 }
103 Err(e) => {
104 error!(self.logger, "Error consuming single signatures"; "error" => ?e);
105 total_import_errors += 1;
106 }
107 }
108
109 if total_import_errors > 0 {
110 error!(
111 self.logger,
112 "Total import errors while processing signatures: {}", total_import_errors
113 );
114 return Err(anyhow!(
115 "Total import errors while processing signatures: {}",
116 total_import_errors
117 ));
118 }
119
120 Ok(())
121 }
122
123 async fn run(&self) -> StdResult<()> {
124 loop {
125 let mut stop_rx = self.stop_rx.clone();
126 select! {
127 _ = stop_rx.changed() => {
128 warn!(self.logger, "Stopping signature processor...");
129
130 return Ok(());
131 }
132 res = self.process_signatures() => {
133 if let Err(e) = res {
134 error!(self.logger, "Error processing signatures"; "error" => ?e);
135 error!(self.logger, "Sleep for {} seconds", self.wait_delay_on_error.as_secs());
136 tokio::time::sleep(self.wait_delay_on_error).await;
137 }
138 }
139 }
140 }
141 }
142}
143
#[cfg(test)]
mod tests {
    use anyhow::anyhow;
    use mockall::predicate::eq;
    use tokio::{
        sync::watch::channel,
        time::{Duration, sleep},
    };

    use mithril_common::{
        entities::{Epoch, SignedEntityType},
        test::{double::fake_data, mock_extensions::MockBuilder},
    };

    use crate::services::{
        FakeSignatureConsumer, MockCertifierService, MockSignatureConsumer,
        SignatureRegistrationStatus,
    };
    use crate::test::TestLogger;

    use super::*;

    /// Returns `signature` with its authentication status set to `Authenticated`, i.e. the
    /// form in which the processor is expected to hand it over to the certifier.
    fn authenticated(signature: SingleSignature) -> SingleSignature {
        SingleSignature {
            authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
            ..signature
        }
    }

    /// Every signature returned by the consumer is authenticated and registered exactly once.
    #[tokio::test]
    async fn processor_process_signatures_succeeds() {
        let logger = TestLogger::stdout();
        let incoming_signatures = vec![
            (
                fake_data::single_signature(vec![1, 2, 3]),
                SignedEntityType::MithrilStakeDistribution(Epoch(1)),
            ),
            (
                fake_data::single_signature(vec![4, 5, 6]),
                SignedEntityType::MithrilStakeDistribution(Epoch(2)),
            ),
        ];
        let consumer = MockBuilder::<MockSignatureConsumer>::configure(|mock| {
            mock.expect_get_signatures()
                .times(1)
                .returning(move || Ok(incoming_signatures.clone()));
            mock.expect_get_origin_tag()
                .times(1)
                .returning(|| "whatever".to_string());
        });
        let certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
            mock.expect_register_single_signature()
                .with(
                    eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
                    eq(authenticated(fake_data::single_signature(vec![1, 2, 3]))),
                )
                .times(1)
                .returning(|_, registered_signature| {
                    assert_eq!(
                        registered_signature.authentication_status,
                        SingleSignatureAuthenticationStatus::Authenticated
                    );
                    Ok(SignatureRegistrationStatus::Registered)
                });
            mock.expect_register_single_signature()
                .with(
                    eq(SignedEntityType::MithrilStakeDistribution(Epoch(2))),
                    eq(authenticated(fake_data::single_signature(vec![4, 5, 6]))),
                )
                .times(1)
                .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
        });
        let (_stop_tx, stop_rx) = channel(());
        let metrics_service = Arc::new(MetricsService::new(TestLogger::stdout()).unwrap());
        let processor = SequentialSignatureProcessor::new(
            consumer,
            certifier,
            stop_rx,
            metrics_service,
            Duration::from_millis(10),
            logger,
        );

        let outcome = processor.process_signatures().await;

        outcome.expect("Failed to process signatures");
    }

    /// A successful registration bumps both the "received" and the "successful" counters.
    #[tokio::test]
    async fn processor_process_signatures_send_total_received_and_successful_statistics_if_successful()
     {
        let logger = TestLogger::stdout();
        let consumer = FakeSignatureConsumer::new(vec![Ok(vec![(
            fake_data::single_signature(vec![1, 2, 3]),
            SignedEntityType::MithrilStakeDistribution(Epoch(1)),
        )])]);
        let network_origin = consumer.get_origin_tag();

        let certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
            mock.expect_register_single_signature()
                .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
        });
        let (_stop_tx, stop_rx) = channel(());
        let metrics_service = Arc::new(MetricsService::new(logger.clone()).unwrap());
        let processor = SequentialSignatureProcessor::new(
            Arc::new(consumer),
            certifier,
            stop_rx,
            metrics_service.clone(),
            Duration::from_millis(10),
            logger,
        );

        let received_total = || {
            metrics_service
                .get_signature_registration_total_received_since_startup()
                .get(&[&network_origin])
        };
        let successful_total = || {
            metrics_service
                .get_signature_registration_total_successful_since_startup()
                .get(&[&network_origin])
        };
        let received_before = received_total();
        let successful_before = successful_total();

        processor.process_signatures().await.unwrap();

        assert_eq!(received_before + 1, received_total());
        assert_eq!(successful_before + 1, successful_total());
    }

    /// A failed registration still counts as "received" but not as "successful".
    #[tokio::test]
    async fn processor_process_signatures_send_only_total_received_statistic_if_failure() {
        let logger = TestLogger::stdout();
        let consumer = FakeSignatureConsumer::new(vec![Ok(vec![(
            fake_data::single_signature(vec![1, 2, 3]),
            SignedEntityType::MithrilStakeDistribution(Epoch(1)),
        )])]);
        let network_origin = consumer.get_origin_tag();

        let certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
            mock.expect_register_single_signature()
                .returning(|_, _| Err(anyhow!("Error registering signature")));
        });
        let (_stop_tx, stop_rx) = channel(());
        let metrics_service = Arc::new(MetricsService::new(logger.clone()).unwrap());
        let processor = SequentialSignatureProcessor::new(
            Arc::new(consumer),
            certifier,
            stop_rx,
            metrics_service.clone(),
            Duration::from_millis(10),
            logger,
        );

        let received_total = || {
            metrics_service
                .get_signature_registration_total_received_since_startup()
                .get(&[&network_origin])
        };
        let successful_total = || {
            metrics_service
                .get_signature_registration_total_successful_since_startup()
                .get(&[&network_origin])
        };
        let received_before = received_total();
        let successful_before = successful_total();

        processor.process_signatures().await.expect_err("Should have failed");

        assert_eq!(received_before + 1, received_total());
        assert_eq!(successful_before, successful_total());
    }

    /// After a failed round and a short pause, `run` resumes and registers the next batch.
    #[tokio::test]
    async fn processor_run_succeeds_even_if_processing_signatures_fails() {
        let logger = TestLogger::stdout();
        let consumer = FakeSignatureConsumer::new(vec![
            Err(anyhow!("Error consuming signatures")),
            Ok(vec![(
                fake_data::single_signature(vec![1, 2, 3]),
                SignedEntityType::MithrilStakeDistribution(Epoch(1)),
            )]),
        ]);
        let certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
            mock.expect_register_single_signature()
                .with(
                    eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
                    eq(authenticated(fake_data::single_signature(vec![1, 2, 3]))),
                )
                .times(1)
                .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
        });
        let (stop_tx, stop_rx) = channel(());
        let metrics_service = Arc::new(MetricsService::new(logger.clone()).unwrap());
        let processor = SequentialSignatureProcessor::new(
            Arc::new(consumer),
            certifier,
            stop_rx,
            metrics_service,
            Duration::from_millis(1),
            logger,
        );

        select! {
            _res = processor.run() => {},
            _res = sleep(Duration::from_millis(500)) => {
                println!("Stopping signature processor...");
                stop_tx.send(()).unwrap();
            },
        }
    }

    /// With a pause longer than the test window, the second batch must never be registered.
    #[tokio::test]
    async fn processor_run_waits_before_resuming_if_processing_signatures_fails() {
        let logger = TestLogger::stdout();
        let consumer = FakeSignatureConsumer::new(vec![
            Err(anyhow!("Error consuming signatures")),
            Ok(vec![(
                fake_data::single_signature(vec![1, 2, 3]),
                SignedEntityType::MithrilStakeDistribution(Epoch(1)),
            )]),
        ]);
        let certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
            mock.expect_register_single_signature().never();
        });
        let (stop_tx, stop_rx) = channel(());
        let metrics_service = Arc::new(MetricsService::new(logger.clone()).unwrap());
        let processor = SequentialSignatureProcessor::new(
            Arc::new(consumer),
            certifier,
            stop_rx,
            metrics_service,
            Duration::from_millis(1000),
            logger,
        );

        select! {
            _res = processor.run() => {},
            _res = sleep(Duration::from_millis(500)) => {
                println!("Stopping signature processor...");
                stop_tx.send(()).unwrap();
            },
        }
    }
}