mithril_aggregator/services/
stake_distribution.rs1use async_trait::async_trait;
5use std::{
6 fmt::Display,
7 sync::{Arc, RwLock},
8};
9use tokio::sync::{Mutex, MutexGuard};
10
11use mithril_common::{
12 chain_observer::ChainObserver,
13 entities::{Epoch, StakeDistribution},
14 StdError, StdResult,
15};
16use mithril_persistence::store::StakeStorer;
17
18use crate::database::repository::StakePoolStore;
19
20#[derive(Debug)]
22pub enum StakePoolDistributionServiceError {
23 Technical {
25 message: String,
27 error: Option<StdError>,
29 },
30 Unavailable(Epoch),
32 Busy(Epoch),
34}
35
36impl StakePoolDistributionServiceError {
37 pub fn technical_subsystem(error: StdError) -> Box<Self> {
39 Box::new(Self::Technical {
40 message: "Stake pool service subsystem error occurred.".to_string(),
41 error: Some(error),
42 })
43 }
44}
45
46impl TryFrom<StdError> for StakePoolDistributionServiceError {
47 type Error = Box<Self>;
48
49 fn try_from(value: StdError) -> Result<Self, Self::Error> {
50 Err(Box::new(Self::Technical {
51 message: "subsystem error".to_string(),
52 error: Some(value),
53 }))
54 }
55}
56
57impl Display for StakePoolDistributionServiceError {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 match self {
60 Self::Technical { message, error } => {
61 if let Some(nested_error) = error {
62 write!(
63 f,
64 "Critical error: {message} (nested error: '{nested_error}')"
65 )
66 } else {
67 write!(f, "Critical error: {message}")
68 }
69 }
70 Self::Unavailable(epoch) => {
71 write!(
72 f,
73 "The stake distribution for epoch {epoch:?} is not available."
74 )
75 }
76 Self::Busy(epoch) => {
77 write!(
78 f,
79 "The stake distribution for epoch {epoch:?} is actually processed."
80 )
81 }
82 }
83 }
84}
85
86impl std::error::Error for StakePoolDistributionServiceError {}
87
88#[cfg_attr(test, mockall::automock)]
90#[async_trait]
91pub trait StakeDistributionService: Sync + Send {
92 async fn get_stake_distribution(
94 &self,
95 epoch: Epoch,
96 ) -> Result<StakeDistribution, Box<StakePoolDistributionServiceError>>;
97
98 async fn update_stake_distribution(&self)
100 -> Result<(), Box<StakePoolDistributionServiceError>>;
101}
102
103struct UpdateToken {
105 is_busy: Mutex<()>,
107 busy_on_epoch: RwLock<Epoch>,
109}
110
111impl Default for UpdateToken {
112 fn default() -> Self {
113 Self {
114 is_busy: Mutex::new(()),
115 busy_on_epoch: RwLock::new(Epoch(0)),
116 }
117 }
118}
119
120impl UpdateToken {
121 pub fn update(&self, epoch: Epoch) -> StdResult<MutexGuard<()>> {
122 let update_semaphore = self.is_busy.try_lock().map_err(|_| {
123 let last_updated_epoch = self.busy_on_epoch.read().unwrap();
124
125 StakePoolDistributionServiceError::Busy(*last_updated_epoch)
126 })?;
127 let mut last_updated_epoch = self.busy_on_epoch.write().unwrap();
128 *last_updated_epoch = epoch;
129
130 Ok(update_semaphore)
131 }
132
133 pub fn is_busy(&self) -> Option<Epoch> {
134 if self.is_busy.try_lock().is_err() {
135 Some(*self.busy_on_epoch.read().unwrap())
136 } else {
137 None
138 }
139 }
140}
141pub struct MithrilStakeDistributionService {
143 stake_store: Arc<StakePoolStore>,
145 chain_observer: Arc<dyn ChainObserver>,
147 update_token: UpdateToken,
149}
150
151impl MithrilStakeDistributionService {
152 pub fn new(stake_store: Arc<StakePoolStore>, chain_observer: Arc<dyn ChainObserver>) -> Self {
154 Self {
155 stake_store,
156 chain_observer,
157 update_token: UpdateToken::default(),
158 }
159 }
160}
161
162#[async_trait]
163impl StakeDistributionService for MithrilStakeDistributionService {
164 async fn get_stake_distribution(
165 &self,
166 epoch: Epoch,
167 ) -> Result<StakeDistribution, Box<StakePoolDistributionServiceError>> {
168 let stake_distribution = self
169 .stake_store
170 .get_stakes(epoch)
171 .await
172 .map_err(StakePoolDistributionServiceError::technical_subsystem)?
173 .unwrap_or_default();
174
175 if !stake_distribution.is_empty() {
176 Ok(stake_distribution)
177 } else if let Some(last_epoch) = self.update_token.is_busy() {
178 if last_epoch == epoch {
179 Err(StakePoolDistributionServiceError::Busy(epoch).into())
180 } else {
181 Err(StakePoolDistributionServiceError::Unavailable(epoch).into())
182 }
183 } else {
184 Err(StakePoolDistributionServiceError::Unavailable(epoch).into())
185 }
186 }
187
188 async fn update_stake_distribution(
189 &self,
190 ) -> Result<(), Box<StakePoolDistributionServiceError>> {
191 let current_epoch = self
192 .chain_observer
193 .get_current_epoch()
194 .await
195 .map_err(|e| StakePoolDistributionServiceError::technical_subsystem(e.into()))?
196 .expect("Chain observer get_current_epoch should never return None.")
197 .offset_to_recording_epoch();
198
199 match self.get_stake_distribution(current_epoch).await {
200 Ok(_) => return Ok(()),
201 Err(e) if matches!(*e, StakePoolDistributionServiceError::Unavailable(_)) => (),
202 Err(e) => return Err(e),
203 };
204 let _mutex = self
205 .update_token
206 .update(current_epoch)
207 .map_err(StakePoolDistributionServiceError::technical_subsystem)?;
208 let stake_distribution = self
209 .chain_observer
210 .get_current_stake_distribution()
211 .await
212 .map_err(|e| StakePoolDistributionServiceError::technical_subsystem(e.into()))?
213 .expect("ChainObserver get_current_stake_distribution should never return None.");
214
215 let _ = self
216 .stake_store
217 .save_stakes(current_epoch, stake_distribution)
218 .await
219 .map_err(StakePoolDistributionServiceError::technical_subsystem)?;
220
221 Ok(())
222 }
223}
224
225#[cfg(test)]
226mod tests {
227 use mithril_common::temp_dir;
228
229 use crate::dependency_injection::DependenciesBuilder;
230 use crate::tools::mocks::MockChainObserver;
231
232 use super::*;
233
234 async fn get_service(chain_observer: MockChainObserver) -> MithrilStakeDistributionService {
235 let mut builder = DependenciesBuilder::new_with_stdout_logger(
236 crate::Configuration::new_sample(temp_dir!()),
237 );
238 let stake_service = MithrilStakeDistributionService::new(
239 builder.get_stake_store().await.unwrap(),
240 Arc::new(chain_observer),
241 );
242
243 let store = builder.get_stake_store().await.unwrap();
244 for (epoch, stake_distribution) in [
245 (
246 Epoch(1),
247 [
248 ("pool1".to_string(), 1000),
249 ("pool2".to_string(), 1100),
250 ("pool3".to_string(), 1300),
251 ],
252 ),
253 (
254 Epoch(2),
255 [
256 ("pool1".to_string(), 1230),
257 ("pool2".to_string(), 1090),
258 ("pool3".to_string(), 1300),
259 ],
260 ),
261 (
262 Epoch(3),
263 [
264 ("pool1".to_string(), 1250),
265 ("pool2".to_string(), 1370),
266 ("pool3".to_string(), 1300),
267 ],
268 ),
269 ] {
270 store
271 .save_stakes(epoch, StakeDistribution::from(stake_distribution))
272 .await
273 .unwrap();
274 }
275
276 stake_service
277 }
278
279 #[tokio::test]
280 async fn get_current_stake_distribution() {
281 let chain_observer = MockChainObserver::new();
282 let service = get_service(chain_observer).await;
283 let expected_stake_distribution: StakeDistribution =
284 [("pool2", 1370), ("pool3", 1300), ("pool1", 1250)]
285 .into_iter()
286 .map(|(pool_id, stake)| (pool_id.to_string(), stake as u64))
287 .collect();
288
289 assert_eq!(
290 expected_stake_distribution,
291 service.get_stake_distribution(Epoch(3)).await.unwrap()
292 );
293 }
294
295 #[tokio::test]
296 async fn get_unavailable_stake_distribution() {
297 let chain_observer = MockChainObserver::new();
298 let service = get_service(chain_observer).await;
299 let result = service.get_stake_distribution(Epoch(5)).await.unwrap_err();
300
301 assert!(matches!(
302 *result,
303 StakePoolDistributionServiceError::Unavailable(Epoch(x)) if x == 5
304 ));
305 }
306
307 #[tokio::test]
308 async fn update_stake_distribution_ok() {
309 let expected_stake_distribution = StakeDistribution::from_iter(
310 [("pool1", 2000), ("pool2", 2000), ("pool3", 2000)]
311 .into_iter()
312 .map(|(p, s)| (p.to_string(), s as u64)),
313 );
314 let returned_stake_distribution = expected_stake_distribution.clone();
315 let mut chain_observer = MockChainObserver::new();
316 chain_observer
317 .expect_get_current_epoch()
318 .returning(|| Ok(Some(Epoch(3))));
319 chain_observer
320 .expect_get_current_stake_distribution()
321 .return_once(|| Ok(Some(returned_stake_distribution)));
322 let service = get_service(chain_observer).await;
323 service.update_stake_distribution().await.unwrap();
324 let sd = service.get_stake_distribution(Epoch(4)).await.unwrap();
325
326 assert_eq!(expected_stake_distribution, sd);
327 }
328
329 #[tokio::test]
330 async fn update_stake_distribution_already() {
331 let mut chain_observer = MockChainObserver::new();
332 chain_observer
333 .expect_get_current_epoch()
334 .returning(|| Ok(Some(Epoch(2))))
335 .times(1);
336 let service = get_service(chain_observer).await;
337 service.update_stake_distribution().await.unwrap();
338 }
339
340 #[tokio::test]
341 async fn get_not_ready_yet() {
342 let mut chain_observer = MockChainObserver::new();
343 chain_observer
344 .expect_get_current_epoch()
345 .returning(|| Ok(Some(Epoch(3))));
346 let service = get_service(chain_observer).await;
347 let _mutex = service.update_token.update(Epoch(4)).unwrap();
348 let result = service.get_stake_distribution(Epoch(4)).await.unwrap_err();
349
350 assert!(matches!(
351 *result,
352 StakePoolDistributionServiceError::Busy(Epoch(x)) if x == 4
353 ));
354 }
355
356 #[tokio::test]
357 async fn get_not_ready_but_unavailable() {
358 let mut chain_observer = MockChainObserver::new();
359 chain_observer
360 .expect_get_current_epoch()
361 .returning(|| Ok(Some(Epoch(3))));
362 let service = get_service(chain_observer).await;
363 let _mutex = service.update_token.update(Epoch(4)).unwrap();
364 let result = service.get_stake_distribution(Epoch(0)).await.unwrap_err();
365
366 assert!(matches!(
367 *result,
368 StakePoolDistributionServiceError::Unavailable(Epoch(x)) if x == 0
369 ));
370 }
371
372 #[tokio::test]
373 async fn update_but_busy() {
374 let mut chain_observer = MockChainObserver::new();
375 chain_observer
376 .expect_get_current_epoch()
377 .returning(|| Ok(Some(Epoch(3))));
378 let service = get_service(chain_observer).await;
379 let _mutex = service.update_token.update(Epoch(4)).unwrap();
380 let result = service.update_stake_distribution().await.unwrap_err();
381
382 assert!(matches!(
383 *result,
384 StakePoolDistributionServiceError::Busy(Epoch(x)) if x == 4
385 ));
386 }
387}