mithril_aggregator/services/
stake_distribution.rs

1//! Stake Pool manager for the Runners
2//!
3
4use async_trait::async_trait;
5use std::{
6    fmt::Display,
7    sync::{Arc, RwLock},
8};
9use tokio::sync::{Mutex, MutexGuard};
10
11use mithril_common::{
12    chain_observer::ChainObserver,
13    entities::{Epoch, StakeDistribution},
14    StdError, StdResult,
15};
16use mithril_persistence::store::StakeStorer;
17
18use crate::database::repository::StakePoolStore;
19
20/// Errors related to the [StakeDistributionService].
21#[derive(Debug)]
22pub enum StakePoolDistributionServiceError {
23    /// Critical errors cannot be recovered.
24    Technical {
25        /// Error message
26        message: String,
27        /// Eventual nested error
28        error: Option<StdError>,
29    },
30    /// The stake distribution for the given Epoch is not available.
31    Unavailable(Epoch),
32    /// The stake distribution compute is in progress for this Epoch.
33    Busy(Epoch),
34}
35
36impl StakePoolDistributionServiceError {
37    /// Simple way to nest technical errors
38    pub fn technical_subsystem(error: StdError) -> Box<Self> {
39        Box::new(Self::Technical {
40            message: "Stake pool service subsystem error occurred.".to_string(),
41            error: Some(error),
42        })
43    }
44}
45
46impl TryFrom<StdError> for StakePoolDistributionServiceError {
47    type Error = Box<Self>;
48
49    fn try_from(value: StdError) -> Result<Self, Self::Error> {
50        Err(Box::new(Self::Technical {
51            message: "subsystem error".to_string(),
52            error: Some(value),
53        }))
54    }
55}
56
57impl Display for StakePoolDistributionServiceError {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        match self {
60            Self::Technical { message, error } => {
61                if let Some(nested_error) = error {
62                    write!(
63                        f,
64                        "Critical error: {message} (nested error: '{nested_error}')"
65                    )
66                } else {
67                    write!(f, "Critical error: {message}")
68                }
69            }
70            Self::Unavailable(epoch) => {
71                write!(
72                    f,
73                    "The stake distribution for epoch {epoch:?} is not available."
74                )
75            }
76            Self::Busy(epoch) => {
77                write!(
78                    f,
79                    "The stake distribution for epoch {epoch:?} is actually processed."
80                )
81            }
82        }
83    }
84}
85
86impl std::error::Error for StakePoolDistributionServiceError {}
87
88/// Responsible of synchronizing with Cardano stake distribution.
89#[cfg_attr(test, mockall::automock)]
90#[async_trait]
91pub trait StakeDistributionService: Sync + Send {
92    /// Return the stake distribution fot the given epoch.
93    async fn get_stake_distribution(
94        &self,
95        epoch: Epoch,
96    ) -> Result<StakeDistribution, Box<StakePoolDistributionServiceError>>;
97
98    /// This launches the stake distribution computation if not already started.
99    async fn update_stake_distribution(&self)
100        -> Result<(), Box<StakePoolDistributionServiceError>>;
101}
102
103/// Token to manage stake distribution update
104struct UpdateToken {
105    /// Stake distribution update semaphore
106    is_busy: Mutex<()>,
107    /// Last computed stake distribution
108    busy_on_epoch: RwLock<Epoch>,
109}
110
111impl Default for UpdateToken {
112    fn default() -> Self {
113        Self {
114            is_busy: Mutex::new(()),
115            busy_on_epoch: RwLock::new(Epoch(0)),
116        }
117    }
118}
119
120impl UpdateToken {
121    pub fn update(&self, epoch: Epoch) -> StdResult<MutexGuard<()>> {
122        let update_semaphore = self.is_busy.try_lock().map_err(|_| {
123            let last_updated_epoch = self.busy_on_epoch.read().unwrap();
124
125            StakePoolDistributionServiceError::Busy(*last_updated_epoch)
126        })?;
127        let mut last_updated_epoch = self.busy_on_epoch.write().unwrap();
128        *last_updated_epoch = epoch;
129
130        Ok(update_semaphore)
131    }
132
133    pub fn is_busy(&self) -> Option<Epoch> {
134        if self.is_busy.try_lock().is_err() {
135            Some(*self.busy_on_epoch.read().unwrap())
136        } else {
137            None
138        }
139    }
140}
141/// Implementation of the stake distribution service.
142pub struct MithrilStakeDistributionService {
143    /// internal stake persistent layer
144    stake_store: Arc<StakePoolStore>,
145    /// Chain interaction subsystem
146    chain_observer: Arc<dyn ChainObserver>,
147    /// Lock management for updates
148    update_token: UpdateToken,
149}
150
151impl MithrilStakeDistributionService {
152    /// Create a new service instance
153    pub fn new(stake_store: Arc<StakePoolStore>, chain_observer: Arc<dyn ChainObserver>) -> Self {
154        Self {
155            stake_store,
156            chain_observer,
157            update_token: UpdateToken::default(),
158        }
159    }
160}
161
162#[async_trait]
163impl StakeDistributionService for MithrilStakeDistributionService {
164    async fn get_stake_distribution(
165        &self,
166        epoch: Epoch,
167    ) -> Result<StakeDistribution, Box<StakePoolDistributionServiceError>> {
168        let stake_distribution = self
169            .stake_store
170            .get_stakes(epoch)
171            .await
172            .map_err(StakePoolDistributionServiceError::technical_subsystem)?
173            .unwrap_or_default();
174
175        if !stake_distribution.is_empty() {
176            Ok(stake_distribution)
177        } else if let Some(last_epoch) = self.update_token.is_busy() {
178            if last_epoch == epoch {
179                Err(StakePoolDistributionServiceError::Busy(epoch).into())
180            } else {
181                Err(StakePoolDistributionServiceError::Unavailable(epoch).into())
182            }
183        } else {
184            Err(StakePoolDistributionServiceError::Unavailable(epoch).into())
185        }
186    }
187
188    async fn update_stake_distribution(
189        &self,
190    ) -> Result<(), Box<StakePoolDistributionServiceError>> {
191        let current_epoch = self
192            .chain_observer
193            .get_current_epoch()
194            .await
195            .map_err(|e| StakePoolDistributionServiceError::technical_subsystem(e.into()))?
196            .expect("Chain observer get_current_epoch should never return None.")
197            .offset_to_recording_epoch();
198
199        match self.get_stake_distribution(current_epoch).await {
200            Ok(_) => return Ok(()),
201            Err(e) if matches!(*e, StakePoolDistributionServiceError::Unavailable(_)) => (),
202            Err(e) => return Err(e),
203        };
204        let _mutex = self
205            .update_token
206            .update(current_epoch)
207            .map_err(StakePoolDistributionServiceError::technical_subsystem)?;
208        let stake_distribution = self
209            .chain_observer
210            .get_current_stake_distribution()
211            .await
212            .map_err(|e| StakePoolDistributionServiceError::technical_subsystem(e.into()))?
213            .expect("ChainObserver get_current_stake_distribution should never return None.");
214
215        let _ = self
216            .stake_store
217            .save_stakes(current_epoch, stake_distribution)
218            .await
219            .map_err(StakePoolDistributionServiceError::technical_subsystem)?;
220
221        Ok(())
222    }
223}
224
225#[cfg(test)]
226mod tests {
227    use mithril_common::temp_dir;
228
229    use crate::dependency_injection::DependenciesBuilder;
230    use crate::tools::mocks::MockChainObserver;
231
232    use super::*;
233
234    async fn get_service(chain_observer: MockChainObserver) -> MithrilStakeDistributionService {
235        let mut builder = DependenciesBuilder::new_with_stdout_logger(
236            crate::Configuration::new_sample(temp_dir!()),
237        );
238        let stake_service = MithrilStakeDistributionService::new(
239            builder.get_stake_store().await.unwrap(),
240            Arc::new(chain_observer),
241        );
242
243        let store = builder.get_stake_store().await.unwrap();
244        for (epoch, stake_distribution) in [
245            (
246                Epoch(1),
247                [
248                    ("pool1".to_string(), 1000),
249                    ("pool2".to_string(), 1100),
250                    ("pool3".to_string(), 1300),
251                ],
252            ),
253            (
254                Epoch(2),
255                [
256                    ("pool1".to_string(), 1230),
257                    ("pool2".to_string(), 1090),
258                    ("pool3".to_string(), 1300),
259                ],
260            ),
261            (
262                Epoch(3),
263                [
264                    ("pool1".to_string(), 1250),
265                    ("pool2".to_string(), 1370),
266                    ("pool3".to_string(), 1300),
267                ],
268            ),
269        ] {
270            store
271                .save_stakes(epoch, StakeDistribution::from(stake_distribution))
272                .await
273                .unwrap();
274        }
275
276        stake_service
277    }
278
279    #[tokio::test]
280    async fn get_current_stake_distribution() {
281        let chain_observer = MockChainObserver::new();
282        let service = get_service(chain_observer).await;
283        let expected_stake_distribution: StakeDistribution =
284            [("pool2", 1370), ("pool3", 1300), ("pool1", 1250)]
285                .into_iter()
286                .map(|(pool_id, stake)| (pool_id.to_string(), stake as u64))
287                .collect();
288
289        assert_eq!(
290            expected_stake_distribution,
291            service.get_stake_distribution(Epoch(3)).await.unwrap()
292        );
293    }
294
295    #[tokio::test]
296    async fn get_unavailable_stake_distribution() {
297        let chain_observer = MockChainObserver::new();
298        let service = get_service(chain_observer).await;
299        let result = service.get_stake_distribution(Epoch(5)).await.unwrap_err();
300
301        assert!(matches!(
302            *result,
303            StakePoolDistributionServiceError::Unavailable(Epoch(x)) if x == 5
304        ));
305    }
306
307    #[tokio::test]
308    async fn update_stake_distribution_ok() {
309        let expected_stake_distribution = StakeDistribution::from_iter(
310            [("pool1", 2000), ("pool2", 2000), ("pool3", 2000)]
311                .into_iter()
312                .map(|(p, s)| (p.to_string(), s as u64)),
313        );
314        let returned_stake_distribution = expected_stake_distribution.clone();
315        let mut chain_observer = MockChainObserver::new();
316        chain_observer
317            .expect_get_current_epoch()
318            .returning(|| Ok(Some(Epoch(3))));
319        chain_observer
320            .expect_get_current_stake_distribution()
321            .return_once(|| Ok(Some(returned_stake_distribution)));
322        let service = get_service(chain_observer).await;
323        service.update_stake_distribution().await.unwrap();
324        let sd = service.get_stake_distribution(Epoch(4)).await.unwrap();
325
326        assert_eq!(expected_stake_distribution, sd);
327    }
328
329    #[tokio::test]
330    async fn update_stake_distribution_already() {
331        let mut chain_observer = MockChainObserver::new();
332        chain_observer
333            .expect_get_current_epoch()
334            .returning(|| Ok(Some(Epoch(2))))
335            .times(1);
336        let service = get_service(chain_observer).await;
337        service.update_stake_distribution().await.unwrap();
338    }
339
340    #[tokio::test]
341    async fn get_not_ready_yet() {
342        let mut chain_observer = MockChainObserver::new();
343        chain_observer
344            .expect_get_current_epoch()
345            .returning(|| Ok(Some(Epoch(3))));
346        let service = get_service(chain_observer).await;
347        let _mutex = service.update_token.update(Epoch(4)).unwrap();
348        let result = service.get_stake_distribution(Epoch(4)).await.unwrap_err();
349
350        assert!(matches!(
351            *result,
352            StakePoolDistributionServiceError::Busy(Epoch(x)) if x == 4
353        ));
354    }
355
356    #[tokio::test]
357    async fn get_not_ready_but_unavailable() {
358        let mut chain_observer = MockChainObserver::new();
359        chain_observer
360            .expect_get_current_epoch()
361            .returning(|| Ok(Some(Epoch(3))));
362        let service = get_service(chain_observer).await;
363        let _mutex = service.update_token.update(Epoch(4)).unwrap();
364        let result = service.get_stake_distribution(Epoch(0)).await.unwrap_err();
365
366        assert!(matches!(
367            *result,
368            StakePoolDistributionServiceError::Unavailable(Epoch(x)) if x == 0
369        ));
370    }
371
372    #[tokio::test]
373    async fn update_but_busy() {
374        let mut chain_observer = MockChainObserver::new();
375        chain_observer
376            .expect_get_current_epoch()
377            .returning(|| Ok(Some(Epoch(3))));
378        let service = get_service(chain_observer).await;
379        let _mutex = service.update_token.update(Epoch(4)).unwrap();
380        let result = service.update_stake_distribution().await.unwrap_err();
381
382        assert!(matches!(
383            *result,
384            StakePoolDistributionServiceError::Busy(Epoch(x)) if x == 4
385        ));
386    }
387}