mithril_aggregator/services/
signature_processor.rs1use std::sync::Arc;
2
3use slog::{Logger, error, warn};
4
5use mithril_common::{StdResult, logging::LoggerExtensions};
6use tokio::{select, sync::watch::Receiver};
7
8use crate::MetricsService;
9
10use super::{CertifierService, SignatureConsumer};
11
12#[cfg_attr(test, mockall::automock)]
14#[async_trait::async_trait]
15pub trait SignatureProcessor: Sync + Send {
16 async fn process_signatures(&self) -> StdResult<()>;
18
19 async fn run(&self) -> StdResult<()>;
21}
22
23pub struct SequentialSignatureProcessor {
25 consumer: Arc<dyn SignatureConsumer>,
26 certifier: Arc<dyn CertifierService>,
27 stop_rx: Receiver<()>,
28 logger: Logger,
29 metrics_service: Arc<MetricsService>,
30}
31
32impl SequentialSignatureProcessor {
33 pub fn new(
35 consumer: Arc<dyn SignatureConsumer>,
36 certifier: Arc<dyn CertifierService>,
37 stop_rx: Receiver<()>,
38 logger: Logger,
39 metrics_service: Arc<MetricsService>,
40 ) -> Self {
41 Self {
42 consumer,
43 certifier,
44 stop_rx,
45 logger: logger.new_with_component_name::<Self>(),
46 metrics_service,
47 }
48 }
49}
50
51#[async_trait::async_trait]
52impl SignatureProcessor for SequentialSignatureProcessor {
53 async fn process_signatures(&self) -> StdResult<()> {
54 match self.consumer.get_signatures().await {
55 Ok(signatures) => {
56 for (signature, signed_entity_type) in signatures {
57 match self
58 .certifier
59 .register_single_signature(&signed_entity_type, &signature)
60 .await
61 {
62 Err(e) => {
63 error!(self.logger, "Error dispatching single signature"; "error" => ?e);
64 }
65 _ => {
66 let origin_network = self.consumer.get_origin_tag();
67 self.metrics_service
68 .get_signature_registration_total_received_since_startup()
69 .increment(&[&origin_network]);
70 }
71 }
72 }
73 }
74 Err(e) => {
75 error!(self.logger, "Error consuming single signatures"; "error" => ?e);
76 }
77 }
78
79 Ok(())
80 }
81
82 async fn run(&self) -> StdResult<()> {
83 loop {
84 let mut stop_rx = self.stop_rx.clone();
85 select! {
86 _ = stop_rx.changed() => {
87 warn!(self.logger, "Stopping signature processor...");
88
89 return Ok(());
90 }
91 _ = self.process_signatures() => {}
92 }
93 }
94 }
95}
96
#[cfg(test)]
mod tests {
    use anyhow::anyhow;
    use mithril_common::{
        entities::{Epoch, SignedEntityType},
        test::double::fake_data,
    };
    use mockall::predicate::eq;
    use tokio::{
        sync::watch::channel,
        time::{Duration, sleep},
    };

    use crate::{
        services::{
            FakeSignatureConsumer, MockCertifierService, MockSignatureConsumer,
            SignatureRegistrationStatus,
        },
        test::TestLogger,
    };

    use super::*;

    /// One processing round must register every consumed signature exactly once and increment
    /// the received-registrations counter once per signature.
    #[tokio::test]
    async fn processor_process_signatures_succeeds() {
        let logger = TestLogger::stdout();
        let network_origin = "test_network";
        let single_signatures = vec![
            (
                fake_data::single_signature(vec![1, 2, 3]),
                SignedEntityType::MithrilStakeDistribution(Epoch(1)),
            ),
            (
                fake_data::single_signature(vec![4, 5, 6]),
                SignedEntityType::MithrilStakeDistribution(Epoch(2)),
            ),
        ];
        let expected_registrations = single_signatures.len();

        // The certifier must see each consumed signature exactly once, with its entity type.
        let mut mock_certifier = MockCertifierService::new();
        for (signature, signed_entity_type) in single_signatures.clone() {
            mock_certifier
                .expect_register_single_signature()
                .with(eq(signed_entity_type), eq(signature))
                .times(1)
                .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));
        }

        // The consumer hands out the batch once and is asked for its origin tag once per
        // registered signature.
        let mut mock_consumer = MockSignatureConsumer::new();
        mock_consumer
            .expect_get_signatures()
            .times(1)
            .returning(move || Ok(single_signatures.clone()));
        mock_consumer
            .expect_get_origin_tag()
            .times(expected_registrations)
            .returning(|| network_origin.to_string());

        let metrics_service = Arc::new(MetricsService::new(TestLogger::stdout()).unwrap());
        let counter_before = metrics_service
            .get_signature_registration_total_received_since_startup()
            .get(&[network_origin]);
        let (_stop_tx, stop_rx) = channel(());
        let processor = SequentialSignatureProcessor::new(
            Arc::new(mock_consumer),
            Arc::new(mock_certifier),
            stop_rx,
            logger,
            Arc::clone(&metrics_service),
        );

        processor
            .process_signatures()
            .await
            .expect("Failed to process signatures");

        let counter_after = metrics_service
            .get_signature_registration_total_received_since_startup()
            .get(&[network_origin]);
        assert_eq!(
            counter_before + expected_registrations as u32,
            counter_after
        );
    }

    /// `run` must keep processing after a consumer error: the first batch is an error, the
    /// second one holds a single signature that the certifier must receive exactly once.
    #[tokio::test]
    async fn processor_run_succeeds() {
        let logger = TestLogger::stdout();
        let fake_consumer = FakeSignatureConsumer::new(vec![
            Err(anyhow!("Error consuming signatures")),
            Ok(vec![(
                fake_data::single_signature(vec![1, 2, 3]),
                SignedEntityType::MithrilStakeDistribution(Epoch(1)),
            )]),
        ]);

        let mut mock_certifier = MockCertifierService::new();
        mock_certifier
            .expect_register_single_signature()
            .with(
                eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
                eq(fake_data::single_signature(vec![1, 2, 3])),
            )
            .times(1)
            .returning(|_, _| Ok(SignatureRegistrationStatus::Registered));

        let (stop_tx, stop_rx) = channel(());
        let metrics_service = MetricsService::new(TestLogger::stdout()).unwrap();
        let processor = SequentialSignatureProcessor::new(
            Arc::new(fake_consumer),
            Arc::new(mock_certifier),
            stop_rx,
            logger,
            Arc::new(metrics_service),
        );

        // Let the processor run for a short while, then send the stop signal.
        tokio::select! {
            _ = processor.run() => {},
            _ = sleep(Duration::from_millis(10)) => {
                println!("Stopping signature processor...");
                stop_tx.send(()).unwrap();
            },
        }
    }
}