mithril_aggregator/services/
signature_processor.rs1use std::sync::Arc;
2
3use slog::{Logger, error, warn};
4
5use mithril_common::{
6 StdResult,
7 entities::{SingleSignature, SingleSignatureAuthenticationStatus},
8 logging::LoggerExtensions,
9};
10use tokio::{select, sync::watch::Receiver};
11
12use crate::MetricsService;
13
14use super::{CertifierService, SignatureConsumer};
15
16#[cfg_attr(test, mockall::automock)]
18#[async_trait::async_trait]
19pub trait SignatureProcessor: Sync + Send {
20 async fn process_signatures(&self) -> StdResult<()>;
22
23 async fn run(&self) -> StdResult<()>;
25}
26
27pub struct SequentialSignatureProcessor {
29 consumer: Arc<dyn SignatureConsumer>,
30 certifier: Arc<dyn CertifierService>,
31 stop_rx: Receiver<()>,
32 logger: Logger,
33 metrics_service: Arc<MetricsService>,
34}
35
36impl SequentialSignatureProcessor {
37 pub fn new(
39 consumer: Arc<dyn SignatureConsumer>,
40 certifier: Arc<dyn CertifierService>,
41 stop_rx: Receiver<()>,
42 logger: Logger,
43 metrics_service: Arc<MetricsService>,
44 ) -> Self {
45 Self {
46 consumer,
47 certifier,
48 stop_rx,
49 logger: logger.new_with_component_name::<Self>(),
50 metrics_service,
51 }
52 }
53
54 fn authenticate_signature(&self, signature: &mut SingleSignature) {
58 signature.authentication_status = SingleSignatureAuthenticationStatus::Authenticated;
59 }
60}
61
62#[async_trait::async_trait]
63impl SignatureProcessor for SequentialSignatureProcessor {
64 async fn process_signatures(&self) -> StdResult<()> {
65 match self.consumer.get_signatures().await {
66 Ok(signatures) => {
67 for (mut signature, signed_entity_type) in signatures {
68 self.authenticate_signature(&mut signature);
69 match self
70 .certifier
71 .register_single_signature(&signed_entity_type, &signature)
72 .await
73 {
74 Err(e) => {
75 error!(self.logger, "Error dispatching single signature"; "error" => ?e);
76 }
77 _ => {
78 let origin_network = self.consumer.get_origin_tag();
79 self.metrics_service
80 .get_signature_registration_total_received_since_startup()
81 .increment(&[&origin_network]);
82 }
83 }
84 }
85 }
86 Err(e) => {
87 error!(self.logger, "Error consuming single signatures"; "error" => ?e);
88 }
89 }
90
91 Ok(())
92 }
93
94 async fn run(&self) -> StdResult<()> {
95 loop {
96 let mut stop_rx = self.stop_rx.clone();
97 select! {
98 _ = stop_rx.changed() => {
99 warn!(self.logger, "Stopping signature processor...");
100
101 return Ok(());
102 }
103 _ = self.process_signatures() => {}
104 }
105 }
106 }
107}
108
109#[cfg(test)]
110mod tests {
111 use anyhow::anyhow;
112 use mithril_common::{
113 entities::{Epoch, SignedEntityType},
114 test::double::fake_data,
115 };
116 use mockall::predicate::eq;
117 use tokio::{
118 sync::watch::channel,
119 time::{Duration, sleep},
120 };
121
122 use crate::{
123 services::{
124 FakeSignatureConsumer, MockCertifierService, MockSignatureConsumer,
125 SignatureRegistrationStatus,
126 },
127 test::TestLogger,
128 };
129
130 use super::*;
131
132 #[tokio::test]
133 async fn processor_process_signatures_succeeds() {
134 let logger = TestLogger::stdout();
135 let single_signatures = vec![
136 (
137 fake_data::single_signature(vec![1, 2, 3]),
138 SignedEntityType::MithrilStakeDistribution(Epoch(1)),
139 ),
140 (
141 fake_data::single_signature(vec![4, 5, 6]),
142 SignedEntityType::MithrilStakeDistribution(Epoch(2)),
143 ),
144 ];
145 let single_signatures_length = single_signatures.len();
146 let network_origin = "test_network";
147 let mock_consumer = {
148 let mut mock_consumer = MockSignatureConsumer::new();
149 mock_consumer
150 .expect_get_signatures()
151 .returning(move || Ok(single_signatures.clone()))
152 .times(1);
153 mock_consumer
154 .expect_get_origin_tag()
155 .returning(|| network_origin.to_string())
156 .times(single_signatures_length);
157 mock_consumer
158 };
159 let mock_certifier = {
160 let mut mock_certifier = MockCertifierService::new();
161 mock_certifier
162 .expect_register_single_signature()
163 .with(
164 eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
165 eq(SingleSignature {
166 authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
167 ..fake_data::single_signature(vec![1, 2, 3])
168 }),
169 )
170 .returning(|_, single_signature| {
171 assert_eq!(
172 single_signature.authentication_status,
173 SingleSignatureAuthenticationStatus::Authenticated
174 );
175 Ok(SignatureRegistrationStatus::Registered)
176 })
177 .times(1);
178 mock_certifier
179 .expect_register_single_signature()
180 .with(
181 eq(SignedEntityType::MithrilStakeDistribution(Epoch(2))),
182 eq(SingleSignature {
183 authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
184 ..fake_data::single_signature(vec![4, 5, 6])
185 }),
186 )
187 .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
188 .times(1);
189 mock_certifier
190 };
191 let (_stop_tx, stop_rx) = channel(());
192 let metrics_service = MetricsService::new(TestLogger::stdout()).unwrap();
193 let initial_counter_value = metrics_service
194 .get_signature_registration_total_received_since_startup()
195 .get(&[network_origin]);
196 let metrics_service = Arc::new(metrics_service);
197 let processor = SequentialSignatureProcessor::new(
198 Arc::new(mock_consumer),
199 Arc::new(mock_certifier),
200 stop_rx,
201 logger,
202 metrics_service.clone(),
203 );
204
205 processor
206 .process_signatures()
207 .await
208 .expect("Failed to process signatures");
209
210 assert_eq!(
211 initial_counter_value + single_signatures_length as u32,
212 metrics_service
213 .get_signature_registration_total_received_since_startup()
214 .get(&[network_origin])
215 )
216 }
217
218 #[tokio::test]
219 async fn processor_run_succeeds() {
220 let logger = TestLogger::stdout();
221 let fake_consumer = FakeSignatureConsumer::new(vec![
222 Err(anyhow!("Error consuming signatures")),
223 Ok(vec![(
224 fake_data::single_signature(vec![1, 2, 3]),
225 SignedEntityType::MithrilStakeDistribution(Epoch(1)),
226 )]),
227 ]);
228 let mock_certifier = {
229 let mut mock_certifier = MockCertifierService::new();
230 mock_certifier
231 .expect_register_single_signature()
232 .with(
233 eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
234 eq(SingleSignature {
235 authentication_status: SingleSignatureAuthenticationStatus::Authenticated,
236 ..fake_data::single_signature(vec![1, 2, 3])
237 }),
238 )
239 .returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
240 .times(1);
241
242 mock_certifier
243 };
244 let (stop_tx, stop_rx) = channel(());
245 let metrics_service = MetricsService::new(TestLogger::stdout()).unwrap();
246 let processor = SequentialSignatureProcessor::new(
247 Arc::new(fake_consumer),
248 Arc::new(mock_certifier),
249 stop_rx,
250 logger,
251 Arc::new(metrics_service),
252 );
253
254 tokio::select!(
255 _res = processor.run() => {},
256 _res = sleep(Duration::from_millis(10)) => {
257 println!("Stopping signature processor...");
258 stop_tx.send(()).unwrap();
259 },
260 );
261 }
262}