mithril_aggregator/services/
signature_processor.rs1use std::sync::Arc;
2
3use slog::{error, warn, Logger};
4
5use mithril_common::{logging::LoggerExtensions, StdResult};
6use tokio::{select, sync::watch::Receiver};
7
8use crate::MetricsService;
9
10use super::{CertifierService, SignatureConsumer};
11
12#[cfg_attr(test, mockall::automock)]
14#[async_trait::async_trait]
15pub trait SignatureProcessor: Sync + Send {
16 async fn process_signatures(&self) -> StdResult<()>;
18
19 async fn run(&self) -> StdResult<()>;
21}
22
23pub struct SequentialSignatureProcessor {
25 consumer: Arc<dyn SignatureConsumer>,
26 certifier: Arc<dyn CertifierService>,
27 stop_rx: Receiver<()>,
28 logger: Logger,
29 metrics_service: Arc<MetricsService>,
30}
31
32impl SequentialSignatureProcessor {
33 pub fn new(
35 consumer: Arc<dyn SignatureConsumer>,
36 certifier: Arc<dyn CertifierService>,
37 stop_rx: Receiver<()>,
38 logger: Logger,
39 metrics_service: Arc<MetricsService>,
40 ) -> Self {
41 Self {
42 consumer,
43 certifier,
44 stop_rx,
45 logger: logger.new_with_component_name::<Self>(),
46 metrics_service,
47 }
48 }
49}
50
51#[async_trait::async_trait]
52impl SignatureProcessor for SequentialSignatureProcessor {
53 async fn process_signatures(&self) -> StdResult<()> {
54 match self.consumer.get_signatures().await {
55 Ok(signatures) => {
56 for (signature, signed_entity_type) in signatures {
57 if let Err(e) = self
58 .certifier
59 .register_single_signature(&signed_entity_type, &signature)
60 .await
61 {
62 error!(self.logger, "Error dispatching single signature"; "error" => ?e);
63 } else {
64 let origin_network = self.consumer.get_origin_tag();
65 self.metrics_service
66 .get_signature_registration_total_received_since_startup()
67 .increment(&[&origin_network]);
68 }
69 }
70 }
71 Err(e) => {
72 error!(self.logger, "Error consuming single signatures"; "error" => ?e);
73 }
74 }
75
76 Ok(())
77 }
78
79 async fn run(&self) -> StdResult<()> {
80 loop {
81 let mut stop_rx = self.stop_rx.clone();
82 select! {
83 _ = stop_rx.changed() => {
84 warn!(self.logger, "Stopping signature processor...");
85
86 return Ok(());
87 }
88 _ = self.process_signatures() => {}
89 }
90 }
91 }
92}
93
#[cfg(test)]
mod tests {
    use anyhow::anyhow;
    use mithril_common::{
        entities::{Epoch, SignedEntityType},
        test_utils::fake_data,
    };
    use mockall::predicate::eq;
    use tokio::{
        sync::watch::channel,
        time::{sleep, Duration},
    };

    use crate::{
        services::{
            FakeSignatureConsumer, MockCertifierService, MockSignatureConsumer,
            SignatureRegistrationStatus,
        },
        test_tools::TestLogger,
    };

    use super::*;

    /// One pass over a batch of two signatures registers both of them, in
    /// order, and bumps the metrics counter once per signature.
    #[tokio::test]
    async fn processor_process_signatures_succeeds() {
        let logger = TestLogger::stdout();
        let network_origin = "test_network";
        let single_signatures = vec![
            (
                fake_data::single_signature(vec![1, 2, 3]),
                SignedEntityType::MithrilStakeDistribution(Epoch(1)),
            ),
            (
                fake_data::single_signature(vec![4, 5, 6]),
                SignedEntityType::MithrilStakeDistribution(Epoch(2)),
            ),
        ];
        let single_signatures_length = single_signatures.len();

        // The certifier must receive each signature of the batch exactly once.
        let mut mock_certifier = MockCertifierService::new();
        for (signature, signed_entity_type) in single_signatures.clone() {
            mock_certifier
                .expect_register_single_signature()
                .with(eq(signed_entity_type), eq(signature))
                .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
                .times(1);
        }

        // The consumer serves the batch once and is asked for its origin tag
        // once per signature.
        let mut mock_consumer = MockSignatureConsumer::new();
        mock_consumer
            .expect_get_signatures()
            .returning(move || Ok(single_signatures.clone()))
            .times(1);
        mock_consumer
            .expect_get_origin_tag()
            .returning(|| network_origin.to_string())
            .times(single_signatures_length);

        let (_stop_tx, stop_rx) = channel(());
        let metrics_service = Arc::new(MetricsService::new(TestLogger::stdout()).unwrap());
        let initial_counter_value = metrics_service
            .get_signature_registration_total_received_since_startup()
            .get(&[network_origin]);
        let processor = SequentialSignatureProcessor::new(
            Arc::new(mock_consumer),
            Arc::new(mock_certifier),
            stop_rx,
            logger,
            metrics_service.clone(),
        );

        processor
            .process_signatures()
            .await
            .expect("Failed to process signatures");

        let final_counter_value = metrics_service
            .get_signature_registration_total_received_since_startup()
            .get(&[network_origin]);
        assert_eq!(
            initial_counter_value + single_signatures_length as u32,
            final_counter_value
        );
    }

    /// `run` keeps going after a consumer error: the consumer is given an
    /// error followed by a valid batch, and the certifier must be called once
    /// for the signature of that batch.
    #[tokio::test]
    async fn processor_run_succeeds() {
        let logger = TestLogger::stdout();
        let consumer_results = vec![
            Err(anyhow!("Error consuming signatures")),
            Ok(vec![(
                fake_data::single_signature(vec![1, 2, 3]),
                SignedEntityType::MithrilStakeDistribution(Epoch(1)),
            )]),
        ];
        let fake_consumer = FakeSignatureConsumer::new(consumer_results);

        let mut mock_certifier = MockCertifierService::new();
        mock_certifier
            .expect_register_single_signature()
            .with(
                eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
                eq(fake_data::single_signature(vec![1, 2, 3])),
            )
            .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
            .times(1);

        let (stop_tx, stop_rx) = channel(());
        let metrics_service = MetricsService::new(TestLogger::stdout()).unwrap();
        let processor = SequentialSignatureProcessor::new(
            Arc::new(fake_consumer),
            Arc::new(mock_certifier),
            stop_rx,
            logger,
            Arc::new(metrics_service),
        );

        // Let the processor run for a short while, then send the stop signal.
        let run_window = Duration::from_millis(10);
        tokio::select! {
            _res = processor.run() => {},
            _res = sleep(run_window) => {
                println!("Stopping signature processor...");
                stop_tx.send(()).unwrap();
            },
        }
    }
}