mithril_aggregator/store/
epoch_settings_storer.rs

1use async_trait::async_trait;
2#[cfg(test)]
3use std::collections::HashMap;
4#[cfg(test)]
5use tokio::sync::RwLock;
6
7use mithril_common::StdResult;
8use mithril_common::entities::{Epoch, ProtocolParameters};
9use mithril_protocol_config::model::MithrilNetworkConfiguration;
10
11use crate::{entities::AggregatorEpochSettings, services::EpochPruningTask};
12
13/// Retrieve the [ProtocolParameters] for the given epoch.
14#[async_trait]
15pub trait ProtocolParametersRetriever: Sync + Send {
16    /// Get the saved `ProtocolParameters` for the given [Epoch] if any.
17    async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>>;
18}
19
20/// Store and get [aggregator epoch settings][AggregatorEpochSettings] for given epoch.
21#[async_trait]
22pub trait EpochSettingsStorer:
23    ProtocolParametersRetriever + EpochPruningTask + Sync + Send
24{
25    /// Save the given `AggregatorEpochSettings` for the given [Epoch].
26    async fn save_epoch_settings(
27        &self,
28        epoch: Epoch,
29        epoch_settings: AggregatorEpochSettings,
30    ) -> StdResult<Option<AggregatorEpochSettings>>;
31
32    /// Get the saved `AggregatorEpochSettings` for the given [Epoch] if any.
33    async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>>;
34
35    /// Handle discrepancies at startup in the epoch settings store.
36    ///
37    /// In case an aggregator has been launched after some epochs of not being up or at initial startup,
38    /// the discrepancies in the epoch settings store needs to be fixed.
39    ///
40    /// The epoch settings needs to be recorded for the working epoch and the next 3 epochs.
41    /// We need data over four epochs because the epoch service use epoch settings over a window of
42    /// three epochs, and there may be an epoch change between this `handle_discrepancies_at_startup`
43    /// call and the epoch service call.
44    async fn handle_discrepancies_at_startup(
45        &self,
46        network_configuration: &MithrilNetworkConfiguration,
47    ) -> StdResult<()> {
48        for (epoch, epoch_configuration) in [
49            (
50                network_configuration.epoch.offset_to_signer_retrieval_epoch()?,
51                &network_configuration.configuration_for_aggregation,
52            ),
53            (
54                network_configuration.epoch,
55                &network_configuration.configuration_for_next_aggregation,
56            ),
57            (
58                network_configuration.epoch.offset_to_recording_epoch(),
59                &network_configuration.configuration_for_registration,
60            ),
61        ] {
62            if self.get_epoch_settings(epoch).await?.is_none() {
63                self.save_epoch_settings(
64                    epoch,
65                    AggregatorEpochSettings {
66                        protocol_parameters: epoch_configuration.protocol_parameters.clone(),
67                        cardano_transactions_signing_config: epoch_configuration
68                            .signed_entity_types_config
69                            .cardano_transactions
70                            .clone(),
71                    },
72                )
73                .await?;
74            }
75        }
76
77        Ok(())
78    }
79}
80
/// Test-only, in-memory epoch settings store.
///
/// Settings are kept in a [HashMap] keyed by [Epoch], guarded by an async [RwLock].
#[cfg(test)]
pub struct FakeEpochSettingsStorer {
    /// Settings currently held by the fake store, indexed by epoch.
    pub epoch_settings: RwLock<HashMap<Epoch, AggregatorEpochSettings>>,
}
85
#[cfg(test)]
impl FakeEpochSettingsStorer {
    /// Build a fake store pre-filled with the given `(epoch, settings)` pairs.
    ///
    /// When the same epoch appears several times, the last pair wins.
    pub fn new(data: Vec<(Epoch, AggregatorEpochSettings)>) -> Self {
        let initial_settings = data.into_iter().collect::<HashMap<_, _>>();

        Self {
            epoch_settings: RwLock::new(initial_settings),
        }
    }
}
93
#[cfg(test)]
#[async_trait]
impl ProtocolParametersRetriever for FakeEpochSettingsStorer {
    /// Return the protocol parameters of the settings stored for `epoch`, if any.
    async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>> {
        let stored_settings = self.get_epoch_settings(epoch).await?;

        // `get_epoch_settings` already hands out an owned copy, so the field can be moved out
        // directly instead of being cloned a second time.
        Ok(stored_settings.map(|settings| settings.protocol_parameters))
    }
}
104
#[cfg(test)]
#[async_trait]
impl EpochSettingsStorer for FakeEpochSettingsStorer {
    /// Insert `epoch_settings` for `epoch`, returning the settings it replaced, if any.
    async fn save_epoch_settings(
        &self,
        epoch: Epoch,
        epoch_settings: AggregatorEpochSettings,
    ) -> StdResult<Option<AggregatorEpochSettings>> {
        // The write guard only lives for the duration of this statement.
        let replaced = self.epoch_settings.write().await.insert(epoch, epoch_settings);

        Ok(replaced)
    }

    /// Return a copy of the settings stored for `epoch`, or `None` if there are none.
    async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>> {
        // The read guard only lives for the duration of this statement.
        let found = self.epoch_settings.read().await.get(&epoch).cloned();

        Ok(found)
    }
}
124
#[cfg(test)]
#[async_trait]
impl EpochPruningTask for FakeEpochSettingsStorer {
    /// Label describing the data handled by this pruning task.
    fn pruned_data(&self) -> &'static str {
        "Fake epoch settings"
    }

    /// Pruning is a no-op for the fake store: nothing is removed and it always succeeds.
    async fn prune(&self, _epoch: Epoch) -> StdResult<()> {
        Ok(())
    }
}
136
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use mithril_common::test::double::Dummy;

    use super::*;

    /// Saving settings for an epoch that has none yet reports no previous value.
    #[tokio::test]
    async fn test_save_epoch_settings_do_not_exist_yet_return_none() {
        let epoch_settings = AggregatorEpochSettings::dummy();
        let epoch = Epoch(1);
        let store = FakeEpochSettingsStorer::new(vec![]);
        let epoch_settings_previous =
            store.save_epoch_settings(epoch, epoch_settings).await.unwrap();

        assert!(epoch_settings_previous.is_none());
    }

    /// Saving settings over existing ones hands back the settings that were replaced.
    #[tokio::test]
    async fn test_save_epoch_settings_already_exist_return_previous_epoch_settings_stored() {
        let epoch_settings = AggregatorEpochSettings::dummy();
        let epoch = Epoch(1);
        let store = FakeEpochSettingsStorer::new(vec![(epoch, epoch_settings.clone())]);
        let protocol_parameters_new = ProtocolParameters {
            k: epoch_settings.protocol_parameters.k + 1,
            ..epoch_settings.protocol_parameters
        };
        let epoch_settings_previous = store
            .save_epoch_settings(
                epoch,
                AggregatorEpochSettings {
                    protocol_parameters: protocol_parameters_new,
                    ..epoch_settings.clone()
                },
            )
            .await
            .unwrap();

        assert_eq!(Some(epoch_settings), epoch_settings_previous);
    }

    /// Settings stored for an epoch can be read back.
    #[tokio::test]
    async fn test_get_epoch_settings_exist() {
        let epoch_settings = AggregatorEpochSettings::dummy();
        let epoch = Epoch(1);
        let store = FakeEpochSettingsStorer::new(vec![(epoch, epoch_settings.clone())]);
        let epoch_settings_stored = store.get_epoch_settings(epoch).await.unwrap();

        assert_eq!(Some(epoch_settings), epoch_settings_stored);
    }

    /// Reading an epoch without stored settings yields `None`.
    #[tokio::test]
    async fn test_get_epoch_settings_do_not_exist() {
        let epoch = Epoch(1);
        let store = FakeEpochSettingsStorer::new(vec![(epoch, AggregatorEpochSettings::dummy())]);
        let epoch_settings_stored = store.get_epoch_settings(epoch + 1).await.unwrap();

        assert!(epoch_settings_stored.is_none());
    }

    /// Starting from an empty store, settings are recorded for the previous, current and next
    /// epochs only, each one coming from its dedicated part of the network configuration.
    #[tokio::test]
    async fn test_handle_discrepancies_at_startup_should_complete_three_epochs() {
        // Distinct `k` values make sure each epoch receives the right configuration.
        let epoch_settings = AggregatorEpochSettings::dummy();
        let mut aggregation_epoch_settings = epoch_settings.clone();
        aggregation_epoch_settings.protocol_parameters.k += 15;

        let mut next_aggregation_epoch_settings = epoch_settings.clone();
        next_aggregation_epoch_settings.protocol_parameters.k += 26;

        let mut registration_epoch_settings = epoch_settings;
        registration_epoch_settings.protocol_parameters.k += 37;

        let epoch = Epoch(5);
        let store = FakeEpochSettingsStorer::new(vec![]);
        store
            .handle_discrepancies_at_startup(&MithrilNetworkConfiguration {
                epoch,
                configuration_for_aggregation: aggregation_epoch_settings
                    .clone()
                    .into_network_configuration_for_epoch(BTreeSet::new()),
                configuration_for_next_aggregation: next_aggregation_epoch_settings
                    .clone()
                    .into_network_configuration_for_epoch(BTreeSet::new()),
                configuration_for_registration: registration_epoch_settings
                    .clone()
                    .into_network_configuration_for_epoch(BTreeSet::new()),
            })
            .await
            .unwrap();

        let epoch_settings_stored = store.get_epoch_settings(epoch - 1).await.unwrap();
        assert_eq!(Some(aggregation_epoch_settings), epoch_settings_stored);

        let epoch_settings_stored = store.get_epoch_settings(epoch).await.unwrap();
        assert_eq!(Some(next_aggregation_epoch_settings), epoch_settings_stored);

        let epoch_settings_stored = store.get_epoch_settings(epoch + 1).await.unwrap();
        assert_eq!(Some(registration_epoch_settings), epoch_settings_stored);

        // Nothing is recorded past the registration epoch.
        let epoch_settings_stored = store.get_epoch_settings(epoch + 2).await.unwrap();
        assert!(epoch_settings_stored.is_none());
    }
}
241}