mithril_aggregator/store/
epoch_settings_storer.rs

1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use mithril_common::StdResult;
5use tokio::sync::RwLock;
6
7use mithril_common::entities::{Epoch, ProtocolParameters};
8
9use crate::{entities::AggregatorEpochSettings, services::EpochPruningTask};
10
11/// Store and get [aggregator epoch settings][AggregatorEpochSettings] for given epoch.
12#[async_trait]
13pub trait EpochSettingsStorer: EpochPruningTask + Sync + Send {
14    /// Save the given `AggregatorEpochSettings` for the given [Epoch].
15    async fn save_epoch_settings(
16        &self,
17        epoch: Epoch,
18        epoch_settings: AggregatorEpochSettings,
19    ) -> StdResult<Option<AggregatorEpochSettings>>;
20
21    /// Get the saved `ProtocolParameter` for the given [Epoch] if any.
22    async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>>;
23
24    /// Get the saved `AggregatorEpochSettings` for the given [Epoch] if any.
25    async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>>;
26
27    /// Handle discrepancies at startup in the epoch settings store.
28    ///
29    /// In case an aggregator has been launched after some epochs of not being up or at initial startup,
30    /// the discrepancies in the epoch settings store needs to be fixed.
31    ///
32    /// The epoch settings needs to be recorded for the working epoch and the next 3 epochs.
33    /// We need data over four epochs because the epoch service use epoch settings over a window of
34    /// three epochs, and there may be an epoch change between this `handle_discrepancies_at_startup`
35    /// call and the epoch service call.
36    async fn handle_discrepancies_at_startup(
37        &self,
38        current_epoch: Epoch,
39        epoch_settings_configuration: &AggregatorEpochSettings,
40    ) -> StdResult<()> {
41        for epoch_offset in 0..=3 {
42            let epoch = current_epoch + epoch_offset;
43            if self.get_epoch_settings(epoch).await?.is_none() {
44                self.save_epoch_settings(epoch, epoch_settings_configuration.clone())
45                    .await?;
46            }
47        }
48
49        Ok(())
50    }
51}
52
/// In-memory [EpochSettingsStorer] keeping its data in a `HashMap` behind a tokio `RwLock`.
///
/// NOTE(review): judging by its name and its test-only constructor, this is presumably meant
/// to be used as a test double; confirm before relying on it elsewhere.
pub struct FakeEpochSettingsStorer {
    /// Epoch settings currently held by the store, indexed by their [Epoch].
    pub epoch_settings: RwLock<HashMap<Epoch, AggregatorEpochSettings>>,
}
56
57impl FakeEpochSettingsStorer {
58    #[cfg(test)]
59    pub fn new(data: Vec<(Epoch, AggregatorEpochSettings)>) -> Self {
60        let epoch_settings = RwLock::new(data.into_iter().collect());
61        Self { epoch_settings }
62    }
63}
64
65#[async_trait]
66impl EpochSettingsStorer for FakeEpochSettingsStorer {
67    async fn save_epoch_settings(
68        &self,
69        epoch: Epoch,
70        epoch_settings: AggregatorEpochSettings,
71    ) -> StdResult<Option<AggregatorEpochSettings>> {
72        let mut epoch_settings_write = self.epoch_settings.write().await;
73
74        Ok(epoch_settings_write.insert(epoch, epoch_settings))
75    }
76
77    async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>> {
78        Ok(self
79            .get_epoch_settings(epoch)
80            .await?
81            .map(|epoch_settings| epoch_settings.protocol_parameters.clone()))
82    }
83
84    async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>> {
85        let epoch_settings = self.epoch_settings.read().await;
86
87        Ok(epoch_settings.get(&epoch).cloned())
88    }
89}
90
#[async_trait]
impl EpochPruningTask for FakeEpochSettingsStorer {
    /// Label of the data handled by this pruning task.
    fn pruned_data(&self) -> &'static str {
        "Fake epoch settings"
    }

    /// Does nothing and always succeeds: the given epoch is ignored and no stored settings
    /// are removed.
    async fn prune(&self, _epoch: Epoch) -> StdResult<()> {
        Ok(())
    }
}
101
#[cfg(test)]
mod tests {
    use mithril_common::entities::CardanoTransactionsSigningConfig;

    use super::*;

    /// Saving settings for an epoch unknown to the store reports no previous value.
    #[tokio::test]
    async fn test_save_epoch_settings_do_not_exist_yet_return_none() {
        let store = FakeEpochSettingsStorer::new(vec![]);

        let replaced_settings = store
            .save_epoch_settings(Epoch(1), AggregatorEpochSettings::dummy())
            .await
            .unwrap();

        assert!(replaced_settings.is_none());
    }

    /// Saving settings for an epoch already in the store returns the settings it replaces.
    #[tokio::test]
    async fn test_save_epoch_settings_already_exist_return_previous_epoch_settings_stored() {
        let epoch = Epoch(1);
        let initial_settings = AggregatorEpochSettings::dummy();
        let store = FakeEpochSettingsStorer::new(vec![(epoch, initial_settings.clone())]);
        let updated_settings = AggregatorEpochSettings {
            protocol_parameters: ProtocolParameters {
                k: initial_settings.protocol_parameters.k + 1,
                ..initial_settings.protocol_parameters
            },
            ..initial_settings.clone()
        };

        let replaced_settings = store
            .save_epoch_settings(epoch, updated_settings)
            .await
            .unwrap();

        assert_eq!(Some(initial_settings), replaced_settings);
    }

    /// Settings stored for an epoch can be read back.
    #[tokio::test]
    async fn test_get_epoch_settings_exist() {
        let epoch = Epoch(1);
        let expected_settings = AggregatorEpochSettings::dummy();
        let store = FakeEpochSettingsStorer::new(vec![(epoch, expected_settings.clone())]);

        let stored_settings = store.get_epoch_settings(epoch).await.unwrap();

        assert_eq!(Some(expected_settings), stored_settings);
    }

    /// Reading an epoch without stored settings yields `None`.
    #[tokio::test]
    async fn test_get_epoch_settings_do_not_exist() {
        let epoch = Epoch(1);
        let store = FakeEpochSettingsStorer::new(vec![(epoch, AggregatorEpochSettings::dummy())]);

        let stored_settings = store.get_epoch_settings(epoch + 1).await.unwrap();

        assert!(stored_settings.is_none());
    }

    /// Starting with settings for only the first two epochs of the window, the two missing
    /// epochs are filled with the configuration while existing ones are kept as they are.
    #[tokio::test]
    async fn test_handle_discrepancies_at_startup_should_complete_at_least_four_epochs() {
        let existing_settings = AggregatorEpochSettings::dummy();
        let configured_settings = AggregatorEpochSettings {
            protocol_parameters: ProtocolParameters {
                k: existing_settings.protocol_parameters.k + 1,
                ..existing_settings.protocol_parameters
            },
            cardano_transactions_signing_config: CardanoTransactionsSigningConfig {
                step: existing_settings.cardano_transactions_signing_config.step + 1,
                ..existing_settings.cardano_transactions_signing_config
            },
        };
        let epoch = Epoch(1);
        let store = FakeEpochSettingsStorer::new(vec![
            (epoch, existing_settings.clone()),
            (epoch + 1, existing_settings.clone()),
        ]);

        store
            .handle_discrepancies_at_startup(epoch, &configured_settings)
            .await
            .unwrap();

        // Epochs that already had settings must not be overwritten.
        for untouched_epoch in [epoch, epoch + 1] {
            assert_eq!(
                Some(existing_settings.clone()),
                store.get_epoch_settings(untouched_epoch).await.unwrap()
            );
        }

        // Missing epochs of the window receive the configured settings.
        for filled_epoch in [epoch + 2, epoch + 3] {
            assert_eq!(
                Some(configured_settings.clone()),
                store.get_epoch_settings(filled_epoch).await.unwrap()
            );
        }

        // Nothing is recorded past the four-epoch window.
        let beyond_window = store.get_epoch_settings(epoch + 4).await.unwrap();
        assert!(beyond_window.is_none());
    }
}