mithril_aggregator/store/
epoch_settings_storer.rs

1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use mithril_common::StdResult;
5use tokio::sync::RwLock;
6
7use mithril_common::entities::{Epoch, ProtocolParameters};
8
9use crate::{entities::AggregatorEpochSettings, services::EpochPruningTask};
10
11/// Retrieve the [ProtocolParameters] for the given epoch.
12#[async_trait]
13pub trait ProtocolParametersRetriever: Sync + Send {
14    /// Get the saved `ProtocolParameters` for the given [Epoch] if any.
15    async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>>;
16}
17
18/// Store and get [aggregator epoch settings][AggregatorEpochSettings] for given epoch.
19#[async_trait]
20pub trait EpochSettingsStorer:
21    ProtocolParametersRetriever + EpochPruningTask + Sync + Send
22{
23    /// Save the given `AggregatorEpochSettings` for the given [Epoch].
24    async fn save_epoch_settings(
25        &self,
26        epoch: Epoch,
27        epoch_settings: AggregatorEpochSettings,
28    ) -> StdResult<Option<AggregatorEpochSettings>>;
29
30    /// Get the saved `AggregatorEpochSettings` for the given [Epoch] if any.
31    async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>>;
32
33    /// Handle discrepancies at startup in the epoch settings store.
34    ///
35    /// In case an aggregator has been launched after some epochs of not being up or at initial startup,
36    /// the discrepancies in the epoch settings store needs to be fixed.
37    ///
38    /// The epoch settings needs to be recorded for the working epoch and the next 3 epochs.
39    /// We need data over four epochs because the epoch service use epoch settings over a window of
40    /// three epochs, and there may be an epoch change between this `handle_discrepancies_at_startup`
41    /// call and the epoch service call.
42    async fn handle_discrepancies_at_startup(
43        &self,
44        current_epoch: Epoch,
45        epoch_settings_configuration: &AggregatorEpochSettings,
46    ) -> StdResult<()> {
47        for epoch_offset in 0..=3 {
48            let epoch = current_epoch + epoch_offset;
49            if self.get_epoch_settings(epoch).await?.is_none() {
50                self.save_epoch_settings(epoch, epoch_settings_configuration.clone())
51                    .await?;
52            }
53        }
54
55        Ok(())
56    }
57}
58
59pub struct FakeEpochSettingsStorer {
60    pub epoch_settings: RwLock<HashMap<Epoch, AggregatorEpochSettings>>,
61}
62
63impl FakeEpochSettingsStorer {
64    #[cfg(test)]
65    pub fn new(data: Vec<(Epoch, AggregatorEpochSettings)>) -> Self {
66        let epoch_settings = RwLock::new(data.into_iter().collect());
67        Self { epoch_settings }
68    }
69}
70
71#[async_trait]
72impl ProtocolParametersRetriever for FakeEpochSettingsStorer {
73    async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>> {
74        Ok(self
75            .get_epoch_settings(epoch)
76            .await?
77            .map(|epoch_settings| epoch_settings.protocol_parameters.clone()))
78    }
79}
80
81#[async_trait]
82impl EpochSettingsStorer for FakeEpochSettingsStorer {
83    async fn save_epoch_settings(
84        &self,
85        epoch: Epoch,
86        epoch_settings: AggregatorEpochSettings,
87    ) -> StdResult<Option<AggregatorEpochSettings>> {
88        let mut epoch_settings_write = self.epoch_settings.write().await;
89
90        Ok(epoch_settings_write.insert(epoch, epoch_settings))
91    }
92
93    async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>> {
94        let epoch_settings = self.epoch_settings.read().await;
95
96        Ok(epoch_settings.get(&epoch).cloned())
97    }
98}
99
100#[async_trait]
101impl EpochPruningTask for FakeEpochSettingsStorer {
102    fn pruned_data(&self) -> &'static str {
103        "Fake epoch settings"
104    }
105
106    async fn prune(&self, _epoch: Epoch) -> StdResult<()> {
107        Ok(())
108    }
109}
110
#[cfg(test)]
mod tests {
    use mithril_common::entities::CardanoTransactionsSigningConfig;
    use mithril_common::test::double::Dummy;

    use super::*;

    /// Saving into an empty store reports no previous value and persists the new settings.
    #[tokio::test]
    async fn test_save_epoch_settings_do_not_exist_yet_return_none() {
        let epoch_settings = AggregatorEpochSettings::dummy();
        let epoch = Epoch(1);
        let store = FakeEpochSettingsStorer::new(vec![]);

        let epoch_settings_previous = store
            .save_epoch_settings(epoch, epoch_settings.clone())
            .await
            .unwrap();
        assert!(epoch_settings_previous.is_none());

        // The value must actually have been written, not just acknowledged.
        let epoch_settings_stored = store.get_epoch_settings(epoch).await.unwrap();
        assert_eq!(Some(epoch_settings), epoch_settings_stored);
    }

    /// Saving over an existing entry returns the replaced settings and stores the new ones.
    #[tokio::test]
    async fn test_save_epoch_settings_already_exist_return_previous_epoch_settings_stored() {
        let epoch_settings = AggregatorEpochSettings::dummy();
        let epoch = Epoch(1);
        let store = FakeEpochSettingsStorer::new(vec![(epoch, epoch_settings.clone())]);
        let epoch_settings_new = AggregatorEpochSettings {
            protocol_parameters: ProtocolParameters {
                k: epoch_settings.protocol_parameters.k + 1,
                ..epoch_settings.protocol_parameters
            },
            ..epoch_settings.clone()
        };

        let epoch_settings_previous = store
            .save_epoch_settings(epoch, epoch_settings_new.clone())
            .await
            .unwrap();
        assert_eq!(Some(epoch_settings), epoch_settings_previous);

        // The entry must now hold the replacement settings.
        let epoch_settings_stored = store.get_epoch_settings(epoch).await.unwrap();
        assert_eq!(Some(epoch_settings_new), epoch_settings_stored);
    }

    /// Reading an epoch that has settings returns them.
    #[tokio::test]
    async fn test_get_epoch_settings_exist() {
        let epoch_settings = AggregatorEpochSettings::dummy();
        let epoch = Epoch(1);
        let store = FakeEpochSettingsStorer::new(vec![(epoch, epoch_settings.clone())]);

        let epoch_settings_stored = store.get_epoch_settings(epoch).await.unwrap();

        assert_eq!(Some(epoch_settings), epoch_settings_stored);
    }

    /// Reading an epoch without settings returns `None`.
    #[tokio::test]
    async fn test_get_epoch_settings_do_not_exist() {
        let epoch = Epoch(1);
        let store = FakeEpochSettingsStorer::new(vec![(epoch, AggregatorEpochSettings::dummy())]);

        let epoch_settings_stored = store.get_epoch_settings(epoch + 1).await.unwrap();

        assert!(epoch_settings_stored.is_none());
    }

    /// Startup handling keeps existing entries, fills the missing ones up to `epoch + 3`,
    /// and records nothing beyond that.
    #[tokio::test]
    async fn test_handle_discrepancies_at_startup_should_complete_at_least_four_epochs() {
        let epoch_settings = AggregatorEpochSettings::dummy();
        let epoch_settings_new = AggregatorEpochSettings {
            protocol_parameters: ProtocolParameters {
                k: epoch_settings.protocol_parameters.k + 1,
                ..epoch_settings.protocol_parameters
            },
            cardano_transactions_signing_config: CardanoTransactionsSigningConfig {
                step: epoch_settings.cardano_transactions_signing_config.step + 1,
                ..epoch_settings.cardano_transactions_signing_config
            },
        };
        let epoch = Epoch(1);
        let store = FakeEpochSettingsStorer::new(vec![
            (epoch, epoch_settings.clone()),
            (epoch + 1, epoch_settings.clone()),
        ]);

        store
            .handle_discrepancies_at_startup(epoch, &epoch_settings_new)
            .await
            .unwrap();

        // Pre-existing entries are preserved.
        let epoch_settings_stored = store.get_epoch_settings(epoch).await.unwrap();
        assert_eq!(Some(epoch_settings.clone()), epoch_settings_stored);

        let epoch_settings_stored = store.get_epoch_settings(epoch + 1).await.unwrap();
        assert_eq!(Some(epoch_settings), epoch_settings_stored);

        // Missing entries are filled with the configuration that was passed in.
        let epoch_settings_stored = store.get_epoch_settings(epoch + 2).await.unwrap();
        assert_eq!(Some(epoch_settings_new.clone()), epoch_settings_stored);

        let epoch_settings_stored = store.get_epoch_settings(epoch + 3).await.unwrap();
        assert_eq!(Some(epoch_settings_new), epoch_settings_stored);

        // Nothing is recorded past the four-epoch window.
        let epoch_settings_stored = store.get_epoch_settings(epoch + 4).await.unwrap();
        assert!(epoch_settings_stored.is_none());
    }
}