mithril_aggregator/store/
epoch_settings_storer.rs

1use async_trait::async_trait;
2#[cfg(test)]
3use std::collections::HashMap;
4#[cfg(test)]
5use tokio::sync::RwLock;
6
7use mithril_common::StdResult;
8use mithril_common::entities::{Epoch, ProtocolParameters};
9use mithril_protocol_config::model::MithrilNetworkConfiguration;
10
11use crate::{entities::AggregatorEpochSettings, services::EpochPruningTask};
12
/// Retrieve the [ProtocolParameters] for the given epoch.
///
/// Implementors must be `Sync + Send`.
#[async_trait]
pub trait ProtocolParametersRetriever: Sync + Send {
    /// Get the saved `ProtocolParameters` for the given [Epoch] if any.
    ///
    /// Returns `Ok(None)` when no protocol parameters are saved for `epoch`; the error cases
    /// are defined by each implementation.
    async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>>;
}
19
20/// Store and get [aggregator epoch settings][AggregatorEpochSettings] for given epoch.
21#[async_trait]
22pub trait EpochSettingsStorer:
23    ProtocolParametersRetriever + EpochPruningTask + Sync + Send
24{
25    /// Save the given `AggregatorEpochSettings` for the given [Epoch].
26    async fn save_epoch_settings(
27        &self,
28        epoch: Epoch,
29        epoch_settings: AggregatorEpochSettings,
30    ) -> StdResult<Option<AggregatorEpochSettings>>;
31
32    /// Get the saved `AggregatorEpochSettings` for the given [Epoch] if any.
33    async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>>;
34
35    /// Handle discrepancies at startup in the epoch settings store.
36    ///
37    /// In case an aggregator has been launched after some epochs of not being up or at initial startup,
38    /// the discrepancies in the epoch settings store needs to be fixed.
39    ///
40    /// The epoch settings needs to be recorded for the working epoch and the next 3 epochs.
41    /// We need data over four epochs because the epoch service use epoch settings over a window of
42    /// three epochs, and there may be an epoch change between this `handle_discrepancies_at_startup`
43    /// call and the epoch service call.
44    async fn handle_discrepancies_at_startup(
45        &self,
46        network_configuration: &MithrilNetworkConfiguration,
47    ) -> StdResult<()> {
48        for (epoch, epoch_configuration) in [
49            (
50                network_configuration.epoch.offset_to_signer_retrieval_epoch()?,
51                &network_configuration.configuration_for_aggregation,
52            ),
53            (
54                network_configuration.epoch,
55                &network_configuration.configuration_for_next_aggregation,
56            ),
57            (
58                network_configuration.epoch.offset_to_recording_epoch(),
59                &network_configuration.configuration_for_registration,
60            ),
61        ] {
62            if self.get_epoch_settings(epoch).await?.is_none() {
63                self.save_epoch_settings(
64                    epoch,
65                    AggregatorEpochSettings {
66                        protocol_parameters: epoch_configuration.protocol_parameters.clone(),
67                        cardano_transactions_signing_config: epoch_configuration
68                            .signed_entity_types_config
69                            .cardano_transactions
70                            .clone(),
71                        cardano_blocks_transactions_signing_config: epoch_configuration
72                            .signed_entity_types_config
73                            .cardano_blocks_transactions
74                            .clone(),
75                    },
76                )
77                .await?;
78            }
79        }
80
81        Ok(())
82    }
83}
84
/// In-memory test double implementing [EpochSettingsStorer], only compiled for tests.
#[cfg(test)]
pub struct FakeEpochSettingsStorer {
    /// Epoch settings kept in memory, indexed by [Epoch], behind an async `RwLock`.
    pub epoch_settings: RwLock<HashMap<Epoch, AggregatorEpochSettings>>,
}
89
#[cfg(test)]
impl FakeEpochSettingsStorer {
    /// Build a fake store pre-populated with the given `(epoch, settings)` pairs.
    pub fn new(data: Vec<(Epoch, AggregatorEpochSettings)>) -> Self {
        let initial_settings: HashMap<Epoch, AggregatorEpochSettings> =
            data.into_iter().collect();

        Self {
            epoch_settings: RwLock::new(initial_settings),
        }
    }
}
97
#[cfg(test)]
#[async_trait]
impl ProtocolParametersRetriever for FakeEpochSettingsStorer {
    /// Return the protocol parameters of the settings stored for `epoch`, or `None` when the
    /// fake holds no settings for that epoch.
    async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>> {
        let stored_settings = self.get_epoch_settings(epoch).await?;

        // `stored_settings` is already an owned copy, so the field is moved out instead of
        // being cloned a second time.
        Ok(stored_settings.map(|settings| settings.protocol_parameters))
    }
}
108
#[cfg(test)]
#[async_trait]
impl EpochSettingsStorer for FakeEpochSettingsStorer {
    /// Insert the settings in the in-memory map and return the entry they replaced, if any.
    async fn save_epoch_settings(
        &self,
        epoch: Epoch,
        epoch_settings: AggregatorEpochSettings,
    ) -> StdResult<Option<AggregatorEpochSettings>> {
        // The write guard is a temporary, released as soon as the insertion is done.
        let replaced_settings = self.epoch_settings.write().await.insert(epoch, epoch_settings);

        Ok(replaced_settings)
    }

    /// Return a copy of the settings held in memory for `epoch`, if any.
    async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>> {
        // The read guard is a temporary, released as soon as the lookup result is cloned.
        let found_settings = self.epoch_settings.read().await.get(&epoch).cloned();

        Ok(found_settings)
    }
}
128
#[cfg(test)]
#[async_trait]
impl EpochPruningTask for FakeEpochSettingsStorer {
    /// Label of the data handled by this pruning task.
    fn pruned_data(&self) -> &'static str {
        "Fake epoch settings"
    }

    /// No-op: the fake ignores the given epoch, removes nothing and always succeeds.
    async fn prune(&self, _epoch: Epoch) -> StdResult<()> {
        Ok(())
    }
}
140
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use mithril_common::test::double::Dummy;

    use super::*;

    /// Saving settings for an epoch unknown to the store reports no previous value.
    #[tokio::test]
    async fn test_save_epoch_settings_do_not_exist_yet_return_none() {
        let store = FakeEpochSettingsStorer::new(Vec::new());

        let previous_settings = store
            .save_epoch_settings(Epoch(1), AggregatorEpochSettings::dummy())
            .await
            .unwrap();

        assert!(previous_settings.is_none());
    }

    /// Saving settings over an existing entry returns the settings that were replaced.
    #[tokio::test]
    async fn test_save_epoch_settings_already_exist_return_previous_epoch_settings_stored() {
        let epoch = Epoch(1);
        let initial_settings = AggregatorEpochSettings::dummy();
        let store = FakeEpochSettingsStorer::new(vec![(epoch, initial_settings.clone())]);

        // Same settings, except for a different `k` protocol parameter.
        let mut updated_settings = initial_settings.clone();
        updated_settings.protocol_parameters.k += 1;

        let previous_settings = store.save_epoch_settings(epoch, updated_settings).await.unwrap();

        assert_eq!(Some(initial_settings), previous_settings);
    }

    /// Settings stored for an epoch can be read back.
    #[tokio::test]
    async fn test_get_epoch_settings_exist() {
        let epoch = Epoch(1);
        let expected_settings = AggregatorEpochSettings::dummy();
        let store = FakeEpochSettingsStorer::new(vec![(epoch, expected_settings.clone())]);

        let stored_settings = store.get_epoch_settings(epoch).await.unwrap();

        assert_eq!(Some(expected_settings), stored_settings);
    }

    /// Reading an epoch that has no settings yields `None`.
    #[tokio::test]
    async fn test_get_epoch_settings_do_not_exist() {
        let epoch = Epoch(1);
        let store = FakeEpochSettingsStorer::new(vec![(epoch, AggregatorEpochSettings::dummy())]);

        let stored_settings = store.get_epoch_settings(epoch + 1).await.unwrap();

        assert!(stored_settings.is_none());
    }

    /// Starting from an empty store, the startup handler fills the previous, current and next
    /// epochs with their dedicated configuration and nothing beyond.
    ///
    /// Note: despite the "four epochs" in its name, this test checks that exactly three epochs
    /// are recorded.
    #[tokio::test]
    async fn test_handle_discrepancies_at_startup_should_complete_at_least_four_epochs() {
        let base_settings = AggregatorEpochSettings::dummy();

        // Give each configuration a distinct `k` so a mix-up between epochs is detected.
        let mut aggregation_settings = base_settings.clone();
        aggregation_settings.protocol_parameters.k += 15;

        let mut next_aggregation_settings = base_settings.clone();
        next_aggregation_settings.protocol_parameters.k += 26;

        let mut registration_settings = base_settings.clone();
        registration_settings.protocol_parameters.k += 37;

        let epoch = Epoch(5);
        let network_configuration = MithrilNetworkConfiguration {
            epoch,
            configuration_for_aggregation: aggregation_settings
                .clone()
                .into_network_configuration_for_epoch(BTreeSet::new()),
            configuration_for_next_aggregation: next_aggregation_settings
                .clone()
                .into_network_configuration_for_epoch(BTreeSet::new()),
            configuration_for_registration: registration_settings
                .clone()
                .into_network_configuration_for_epoch(BTreeSet::new()),
        };
        let store = FakeEpochSettingsStorer::new(Vec::new());

        store
            .handle_discrepancies_at_startup(&network_configuration)
            .await
            .unwrap();

        let expected_entries = [
            (epoch - 1, aggregation_settings),
            (epoch, next_aggregation_settings),
            (epoch + 1, registration_settings),
        ];
        for (stored_epoch, expected_settings) in expected_entries {
            let stored_settings = store.get_epoch_settings(stored_epoch).await.unwrap();
            assert_eq!(Some(expected_settings), stored_settings);
        }

        // Nothing must be recorded past the registration epoch.
        let beyond_registration = store.get_epoch_settings(epoch + 2).await.unwrap();
        assert!(beyond_registration.is_none());
    }
}