mithril_aggregator/store/
epoch_settings_storer.rs

1#[cfg(test)]
2use std::collections::HashMap;
3
4use async_trait::async_trait;
5use mithril_common::StdResult;
6#[cfg(test)]
7use tokio::sync::RwLock;
8
9use mithril_common::entities::{Epoch, ProtocolParameters};
10
11use crate::{entities::AggregatorEpochSettings, services::EpochPruningTask};
12
13/// Retrieve the [ProtocolParameters] for the given epoch.
14#[async_trait]
15pub trait ProtocolParametersRetriever: Sync + Send {
16    /// Get the saved `ProtocolParameters` for the given [Epoch] if any.
17    async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>>;
18}
19
20/// Store and get [aggregator epoch settings][AggregatorEpochSettings] for given epoch.
21#[async_trait]
22pub trait EpochSettingsStorer:
23    ProtocolParametersRetriever + EpochPruningTask + Sync + Send
24{
25    /// Save the given `AggregatorEpochSettings` for the given [Epoch].
26    async fn save_epoch_settings(
27        &self,
28        epoch: Epoch,
29        epoch_settings: AggregatorEpochSettings,
30    ) -> StdResult<Option<AggregatorEpochSettings>>;
31
32    /// Get the saved `AggregatorEpochSettings` for the given [Epoch] if any.
33    async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>>;
34
35    /// Handle discrepancies at startup in the epoch settings store.
36    ///
37    /// In case an aggregator has been launched after some epochs of not being up or at initial startup,
38    /// the discrepancies in the epoch settings store needs to be fixed.
39    ///
40    /// The epoch settings needs to be recorded for the working epoch and the next 3 epochs.
41    /// We need data over four epochs because the epoch service use epoch settings over a window of
42    /// three epochs, and there may be an epoch change between this `handle_discrepancies_at_startup`
43    /// call and the epoch service call.
44    async fn handle_discrepancies_at_startup(
45        &self,
46        current_epoch: Epoch,
47        epoch_settings_configuration: &AggregatorEpochSettings,
48    ) -> StdResult<()> {
49        for epoch_offset in 0..=3 {
50            let epoch = current_epoch + epoch_offset;
51            if self.get_epoch_settings(epoch).await?.is_none() {
52                self.save_epoch_settings(epoch, epoch_settings_configuration.clone())
53                    .await?;
54            }
55        }
56
57        Ok(())
58    }
59}
60
61#[cfg(test)]
62pub struct FakeEpochSettingsStorer {
63    pub epoch_settings: RwLock<HashMap<Epoch, AggregatorEpochSettings>>,
64}
65
66#[cfg(test)]
67impl FakeEpochSettingsStorer {
68    pub fn new(data: Vec<(Epoch, AggregatorEpochSettings)>) -> Self {
69        let epoch_settings = RwLock::new(data.into_iter().collect());
70        Self { epoch_settings }
71    }
72}
73
74#[cfg(test)]
75#[async_trait]
76impl ProtocolParametersRetriever for FakeEpochSettingsStorer {
77    async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>> {
78        Ok(self
79            .get_epoch_settings(epoch)
80            .await?
81            .map(|epoch_settings| epoch_settings.protocol_parameters.clone()))
82    }
83}
84
85#[cfg(test)]
86#[async_trait]
87impl EpochSettingsStorer for FakeEpochSettingsStorer {
88    async fn save_epoch_settings(
89        &self,
90        epoch: Epoch,
91        epoch_settings: AggregatorEpochSettings,
92    ) -> StdResult<Option<AggregatorEpochSettings>> {
93        let mut epoch_settings_write = self.epoch_settings.write().await;
94
95        Ok(epoch_settings_write.insert(epoch, epoch_settings))
96    }
97
98    async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>> {
99        let epoch_settings = self.epoch_settings.read().await;
100
101        Ok(epoch_settings.get(&epoch).cloned())
102    }
103}
104
105#[cfg(test)]
106#[async_trait]
107impl EpochPruningTask for FakeEpochSettingsStorer {
108    fn pruned_data(&self) -> &'static str {
109        "Fake epoch settings"
110    }
111
112    async fn prune(&self, _epoch: Epoch) -> StdResult<()> {
113        Ok(())
114    }
115}
116
117#[cfg(test)]
118mod tests {
119    use mithril_common::entities::CardanoTransactionsSigningConfig;
120    use mithril_common::test::double::Dummy;
121
122    use super::*;
123
124    #[tokio::test]
125    async fn test_save_epoch_settings_do_not_exist_yet_return_none() {
126        let epoch_settings = AggregatorEpochSettings::dummy();
127        let epoch = Epoch(1);
128        let store = FakeEpochSettingsStorer::new(vec![]);
129        let epoch_settings_previous =
130            store.save_epoch_settings(epoch, epoch_settings).await.unwrap();
131
132        assert!(epoch_settings_previous.is_none());
133    }
134
135    #[tokio::test]
136    async fn test_save_epoch_settings_already_exist_return_previous_epoch_settings_stored() {
137        let epoch_settings = AggregatorEpochSettings::dummy();
138        let epoch = Epoch(1);
139        let store = FakeEpochSettingsStorer::new(vec![(epoch, epoch_settings.clone())]);
140        let protocol_parameters_new = ProtocolParameters {
141            k: epoch_settings.protocol_parameters.k + 1,
142            ..epoch_settings.protocol_parameters
143        };
144        let epoch_settings_previous = store
145            .save_epoch_settings(
146                epoch,
147                AggregatorEpochSettings {
148                    protocol_parameters: protocol_parameters_new.clone(),
149                    ..epoch_settings.clone()
150                },
151            )
152            .await
153            .unwrap();
154
155        assert_eq!(Some(epoch_settings), epoch_settings_previous);
156    }
157
158    #[tokio::test]
159    async fn test_get_epoch_settings_exist() {
160        let epoch_settings = AggregatorEpochSettings::dummy();
161        let epoch = Epoch(1);
162        let store = FakeEpochSettingsStorer::new(vec![(epoch, epoch_settings.clone())]);
163        let epoch_settings_stored = store.get_epoch_settings(epoch).await.unwrap();
164
165        assert_eq!(Some(epoch_settings), epoch_settings_stored);
166    }
167
168    #[tokio::test]
169    async fn test_get_epoch_settings_do_not_exist() {
170        let epoch_settings = AggregatorEpochSettings::dummy();
171        let epoch = Epoch(1);
172        let store = FakeEpochSettingsStorer::new(vec![(epoch, epoch_settings.clone())]);
173        let epoch_settings_stored = store.get_epoch_settings(epoch + 1).await.unwrap();
174
175        assert!(epoch_settings_stored.is_none());
176    }
177
178    #[tokio::test]
179    async fn test_handle_discrepancies_at_startup_should_complete_at_least_four_epochs() {
180        let epoch_settings = AggregatorEpochSettings::dummy();
181        let epoch_settings_new = AggregatorEpochSettings {
182            protocol_parameters: ProtocolParameters {
183                k: epoch_settings.protocol_parameters.k + 1,
184                ..epoch_settings.protocol_parameters
185            },
186            cardano_transactions_signing_config: CardanoTransactionsSigningConfig {
187                step: epoch_settings.cardano_transactions_signing_config.step + 1,
188                ..epoch_settings.cardano_transactions_signing_config
189            },
190        };
191        let epoch = Epoch(1);
192        let store = FakeEpochSettingsStorer::new(vec![
193            (epoch, epoch_settings.clone()),
194            (epoch + 1, epoch_settings.clone()),
195        ]);
196
197        store
198            .handle_discrepancies_at_startup(epoch, &epoch_settings_new)
199            .await
200            .unwrap();
201
202        let epoch_settings_stored = store.get_epoch_settings(epoch).await.unwrap();
203        assert_eq!(Some(epoch_settings.clone()), epoch_settings_stored);
204
205        let epoch_settings_stored = store.get_epoch_settings(epoch + 1).await.unwrap();
206        assert_eq!(Some(epoch_settings.clone()), epoch_settings_stored);
207
208        let epoch_settings_stored = store.get_epoch_settings(epoch + 2).await.unwrap();
209        assert_eq!(Some(epoch_settings_new.clone()), epoch_settings_stored);
210
211        let epoch_settings_stored = store.get_epoch_settings(epoch + 3).await.unwrap();
212        assert_eq!(Some(epoch_settings_new.clone()), epoch_settings_stored);
213
214        let epoch_settings_stored = store.get_epoch_settings(epoch + 4).await.unwrap();
215        assert!(epoch_settings_stored.is_none());
216    }
217}