mithril_aggregator/store/
epoch_settings_storer.rs

1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use mithril_common::StdResult;
5use tokio::sync::RwLock;
6
7use mithril_common::entities::{Epoch, ProtocolParameters};
8
9use crate::{entities::AggregatorEpochSettings, services::EpochPruningTask};
10
11/// Retrieve the [ProtocolParameters] for the given epoch.
12#[async_trait]
13pub trait ProtocolParametersRetriever: Sync + Send {
14    /// Get the saved `ProtocolParameters` for the given [Epoch] if any.
15    async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>>;
16}
17
18/// Store and get [aggregator epoch settings][AggregatorEpochSettings] for given epoch.
19#[async_trait]
20pub trait EpochSettingsStorer:
21    ProtocolParametersRetriever + EpochPruningTask + Sync + Send
22{
23    /// Save the given `AggregatorEpochSettings` for the given [Epoch].
24    async fn save_epoch_settings(
25        &self,
26        epoch: Epoch,
27        epoch_settings: AggregatorEpochSettings,
28    ) -> StdResult<Option<AggregatorEpochSettings>>;
29
30    /// Get the saved `AggregatorEpochSettings` for the given [Epoch] if any.
31    async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>>;
32
33    /// Handle discrepancies at startup in the epoch settings store.
34    ///
35    /// In case an aggregator has been launched after some epochs of not being up or at initial startup,
36    /// the discrepancies in the epoch settings store needs to be fixed.
37    ///
38    /// The epoch settings needs to be recorded for the working epoch and the next 3 epochs.
39    /// We need data over four epochs because the epoch service use epoch settings over a window of
40    /// three epochs, and there may be an epoch change between this `handle_discrepancies_at_startup`
41    /// call and the epoch service call.
42    async fn handle_discrepancies_at_startup(
43        &self,
44        current_epoch: Epoch,
45        epoch_settings_configuration: &AggregatorEpochSettings,
46    ) -> StdResult<()> {
47        for epoch_offset in 0..=3 {
48            let epoch = current_epoch + epoch_offset;
49            if self.get_epoch_settings(epoch).await?.is_none() {
50                self.save_epoch_settings(epoch, epoch_settings_configuration.clone())
51                    .await?;
52            }
53        }
54
55        Ok(())
56    }
57}
58
59pub struct FakeEpochSettingsStorer {
60    pub epoch_settings: RwLock<HashMap<Epoch, AggregatorEpochSettings>>,
61}
62
63impl FakeEpochSettingsStorer {
64    #[cfg(test)]
65    pub fn new(data: Vec<(Epoch, AggregatorEpochSettings)>) -> Self {
66        let epoch_settings = RwLock::new(data.into_iter().collect());
67        Self { epoch_settings }
68    }
69}
70
71#[async_trait]
72impl ProtocolParametersRetriever for FakeEpochSettingsStorer {
73    async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>> {
74        Ok(self
75            .get_epoch_settings(epoch)
76            .await?
77            .map(|epoch_settings| epoch_settings.protocol_parameters.clone()))
78    }
79}
80
81#[async_trait]
82impl EpochSettingsStorer for FakeEpochSettingsStorer {
83    async fn save_epoch_settings(
84        &self,
85        epoch: Epoch,
86        epoch_settings: AggregatorEpochSettings,
87    ) -> StdResult<Option<AggregatorEpochSettings>> {
88        let mut epoch_settings_write = self.epoch_settings.write().await;
89
90        Ok(epoch_settings_write.insert(epoch, epoch_settings))
91    }
92
93    async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>> {
94        let epoch_settings = self.epoch_settings.read().await;
95
96        Ok(epoch_settings.get(&epoch).cloned())
97    }
98}
99
100#[async_trait]
101impl EpochPruningTask for FakeEpochSettingsStorer {
102    fn pruned_data(&self) -> &'static str {
103        "Fake epoch settings"
104    }
105
106    async fn prune(&self, _epoch: Epoch) -> StdResult<()> {
107        Ok(())
108    }
109}
110
111#[cfg(test)]
112mod tests {
113
114    use mithril_common::entities::CardanoTransactionsSigningConfig;
115
116    use super::*;
117
118    #[tokio::test]
119    async fn test_save_epoch_settings_do_not_exist_yet_return_none() {
120        let epoch_settings = AggregatorEpochSettings::dummy();
121        let epoch = Epoch(1);
122        let store = FakeEpochSettingsStorer::new(vec![]);
123        let epoch_settings_previous = store
124            .save_epoch_settings(epoch, epoch_settings)
125            .await
126            .unwrap();
127
128        assert!(epoch_settings_previous.is_none());
129    }
130
131    #[tokio::test]
132    async fn test_save_epoch_settings_already_exist_return_previous_epoch_settings_stored() {
133        let epoch_settings = AggregatorEpochSettings::dummy();
134        let epoch = Epoch(1);
135        let store = FakeEpochSettingsStorer::new(vec![(epoch, epoch_settings.clone())]);
136        let protocol_parameters_new = ProtocolParameters {
137            k: epoch_settings.protocol_parameters.k + 1,
138            ..epoch_settings.protocol_parameters
139        };
140        let epoch_settings_previous = store
141            .save_epoch_settings(
142                epoch,
143                AggregatorEpochSettings {
144                    protocol_parameters: protocol_parameters_new.clone(),
145                    ..epoch_settings.clone()
146                },
147            )
148            .await
149            .unwrap();
150
151        assert_eq!(Some(epoch_settings), epoch_settings_previous);
152    }
153
154    #[tokio::test]
155    async fn test_get_epoch_settings_exist() {
156        let epoch_settings = AggregatorEpochSettings::dummy();
157        let epoch = Epoch(1);
158        let store = FakeEpochSettingsStorer::new(vec![(epoch, epoch_settings.clone())]);
159        let epoch_settings_stored = store.get_epoch_settings(epoch).await.unwrap();
160
161        assert_eq!(Some(epoch_settings), epoch_settings_stored);
162    }
163
164    #[tokio::test]
165    async fn test_get_epoch_settings_do_not_exist() {
166        let epoch_settings = AggregatorEpochSettings::dummy();
167        let epoch = Epoch(1);
168        let store = FakeEpochSettingsStorer::new(vec![(epoch, epoch_settings.clone())]);
169        let epoch_settings_stored = store.get_epoch_settings(epoch + 1).await.unwrap();
170
171        assert!(epoch_settings_stored.is_none());
172    }
173
174    #[tokio::test]
175    async fn test_handle_discrepancies_at_startup_should_complete_at_least_four_epochs() {
176        let epoch_settings = AggregatorEpochSettings::dummy();
177        let epoch_settings_new = AggregatorEpochSettings {
178            protocol_parameters: ProtocolParameters {
179                k: epoch_settings.protocol_parameters.k + 1,
180                ..epoch_settings.protocol_parameters
181            },
182            cardano_transactions_signing_config: CardanoTransactionsSigningConfig {
183                step: epoch_settings.cardano_transactions_signing_config.step + 1,
184                ..epoch_settings.cardano_transactions_signing_config
185            },
186        };
187        let epoch = Epoch(1);
188        let store = FakeEpochSettingsStorer::new(vec![
189            (epoch, epoch_settings.clone()),
190            (epoch + 1, epoch_settings.clone()),
191        ]);
192
193        store
194            .handle_discrepancies_at_startup(epoch, &epoch_settings_new)
195            .await
196            .unwrap();
197
198        let epoch_settings_stored = store.get_epoch_settings(epoch).await.unwrap();
199        assert_eq!(Some(epoch_settings.clone()), epoch_settings_stored);
200
201        let epoch_settings_stored = store.get_epoch_settings(epoch + 1).await.unwrap();
202        assert_eq!(Some(epoch_settings.clone()), epoch_settings_stored);
203
204        let epoch_settings_stored = store.get_epoch_settings(epoch + 2).await.unwrap();
205        assert_eq!(Some(epoch_settings_new.clone()), epoch_settings_stored);
206
207        let epoch_settings_stored = store.get_epoch_settings(epoch + 3).await.unwrap();
208        assert_eq!(Some(epoch_settings_new.clone()), epoch_settings_stored);
209
210        let epoch_settings_stored = store.get_epoch_settings(epoch + 4).await.unwrap();
211        assert!(epoch_settings_stored.is_none());
212    }
213}