mithril_aggregator/store/
epoch_settings_storer.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
use std::collections::HashMap;

use async_trait::async_trait;
use mithril_common::StdResult;
use tokio::sync::RwLock;

use mithril_common::entities::{Epoch, ProtocolParameters};

use crate::{entities::AggregatorEpochSettings, services::EpochPruningTask};

/// Persistence contract for per-epoch [aggregator epoch settings][AggregatorEpochSettings].
///
/// Every implementor is also an [EpochPruningTask] and must be `Send + Sync`.
#[async_trait]
pub trait EpochSettingsStorer: EpochPruningTask + Sync + Send {
    /// Record `epoch_settings` as the settings of the given [Epoch].
    ///
    /// On success an optional [AggregatorEpochSettings] is returned; the
    /// [FakeEpochSettingsStorer] of this module uses it to hand back the settings
    /// that were previously stored for that epoch, if any.
    async fn save_epoch_settings(
        &self,
        epoch: Epoch,
        epoch_settings: AggregatorEpochSettings,
    ) -> StdResult<Option<AggregatorEpochSettings>>;

    /// Fetch the [ProtocolParameters] saved for the given [Epoch], if any.
    async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>>;

    /// Fetch the [AggregatorEpochSettings] saved for the given [Epoch], if any.
    async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>>;

    /// Repair gaps in the epoch settings store at startup.
    ///
    /// When an aggregator starts for the first time, or restarts after having been down
    /// for some epochs, settings may be missing. This makes sure settings are recorded for
    /// `current_epoch` and the 2 epochs that follow it: each of those epochs that has no
    /// settings yet receives a copy of `epoch_settings_configuration`, while epochs that
    /// already have settings are left untouched.
    ///
    /// The first error raised by a lookup or a save is returned as is.
    async fn handle_discrepancies_at_startup(
        &self,
        current_epoch: Epoch,
        epoch_settings_configuration: &AggregatorEpochSettings,
    ) -> StdResult<()> {
        for epoch_offset in 0..=2 {
            let target_epoch = current_epoch + epoch_offset;
            let already_recorded = self.get_epoch_settings(target_epoch).await?.is_some();
            if already_recorded {
                // Settings recorded earlier always win over the configuration.
                continue;
            }

            self.save_epoch_settings(target_epoch, epoch_settings_configuration.clone())
                .await?;
        }

        Ok(())
    }
}

/// In-memory epoch settings store: a `HashMap` keyed by [Epoch], guarded by an async `RwLock`.
pub struct FakeEpochSettingsStorer {
    /// Settings currently held by the store, indexed by their [Epoch].
    pub epoch_settings: RwLock<HashMap<Epoch, AggregatorEpochSettings>>,
}

impl FakeEpochSettingsStorer {
    /// Create a store pre-filled with the given `(epoch, settings)` pairs.
    ///
    /// The pairs are collected into a `HashMap`, so when an epoch is listed several
    /// times only its last pair is kept. Only available in test builds.
    #[cfg(test)]
    pub fn new(data: Vec<(Epoch, AggregatorEpochSettings)>) -> Self {
        let initial_settings: HashMap<_, _> = data.into_iter().collect();

        Self {
            epoch_settings: RwLock::new(initial_settings),
        }
    }
}

#[async_trait]
impl EpochSettingsStorer for FakeEpochSettingsStorer {
    /// Insert `epoch_settings` under `epoch` in the in-memory map.
    ///
    /// Returns the settings that were replaced, or `None` when the epoch had no
    /// settings yet. This implementation never returns an error.
    async fn save_epoch_settings(
        &self,
        epoch: Epoch,
        epoch_settings: AggregatorEpochSettings,
    ) -> StdResult<Option<AggregatorEpochSettings>> {
        // The write guard is a temporary: it is released at the end of this statement.
        let replaced = self.epoch_settings.write().await.insert(epoch, epoch_settings);

        Ok(replaced)
    }

    /// Return the protocol parameters of the settings stored for `epoch`, or `None`
    /// when nothing is stored for that epoch.
    async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>> {
        let stored_settings = self.get_epoch_settings(epoch).await?;

        // `get_epoch_settings` already hands back an owned copy, so the protocol
        // parameters can be moved out of it instead of being cloned a second time.
        Ok(stored_settings.map(|settings| settings.protocol_parameters))
    }

    /// Return a clone of the settings stored for `epoch`, or `None` when nothing is
    /// stored for that epoch.
    async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>> {
        // The read guard is a temporary: it is released once the entry has been cloned.
        let stored_settings = self.epoch_settings.read().await.get(&epoch).cloned();

        Ok(stored_settings)
    }
}

#[async_trait]
impl EpochPruningTask for FakeEpochSettingsStorer {
    /// Label naming the kind of data this pruning task is responsible for.
    fn pruned_data(&self) -> &'static str {
        "Fake epoch settings"
    }

    /// No-op: the given epoch is ignored, nothing is removed from the in-memory map
    /// and `Ok(())` is always returned.
    async fn prune(&self, _epoch: Epoch) -> StdResult<()> {
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use mithril_common::entities::CardanoTransactionsSigningConfig;

    use super::*;

    /// Saving settings for an epoch that has none yet reports no previous value.
    #[tokio::test]
    async fn test_save_epoch_settings_do_not_exist_yet_return_none() {
        let store = FakeEpochSettingsStorer::new(Vec::new());

        let previous_settings = store
            .save_epoch_settings(Epoch(1), AggregatorEpochSettings::dummy())
            .await
            .unwrap();

        assert!(previous_settings.is_none());
    }

    /// Overwriting the settings of an epoch hands back the settings that were replaced.
    #[tokio::test]
    async fn test_save_epoch_settings_already_exist_return_previous_epoch_settings_stored() {
        let epoch = Epoch(1);
        let initial_settings = AggregatorEpochSettings::dummy();
        let store = FakeEpochSettingsStorer::new(vec![(epoch, initial_settings.clone())]);
        // Same settings as the stored ones, except for a bumped `k` protocol parameter.
        let updated_settings = AggregatorEpochSettings {
            protocol_parameters: ProtocolParameters {
                k: initial_settings.protocol_parameters.k + 1,
                ..initial_settings.protocol_parameters
            },
            ..initial_settings.clone()
        };

        let previous_settings = store
            .save_epoch_settings(epoch, updated_settings)
            .await
            .unwrap();

        assert_eq!(Some(initial_settings), previous_settings);
    }

    /// Settings stored for an epoch can be read back unchanged.
    #[tokio::test]
    async fn test_get_epoch_settings_exist() {
        let epoch = Epoch(1);
        let expected_settings = AggregatorEpochSettings::dummy();
        let store = FakeEpochSettingsStorer::new(vec![(epoch, expected_settings.clone())]);

        let stored_settings = store.get_epoch_settings(epoch).await.unwrap();

        assert_eq!(Some(expected_settings), stored_settings);
    }

    /// Reading an epoch without stored settings yields `None`.
    #[tokio::test]
    async fn test_get_epoch_settings_do_not_exist() {
        let epoch = Epoch(1);
        let store = FakeEpochSettingsStorer::new(vec![(epoch, AggregatorEpochSettings::dummy())]);

        let stored_settings = store.get_epoch_settings(epoch + 1).await.unwrap();

        assert!(stored_settings.is_none());
    }

    /// Startup reconciliation fills only the missing epoch out of the three it covers
    /// and leaves the already stored epochs untouched.
    #[tokio::test]
    async fn test_handle_discrepancies_at_startup_should_complete_at_least_two_epochs() {
        let epoch = Epoch(1);
        let stored_settings = AggregatorEpochSettings::dummy();
        // Configuration that differs from the stored settings on both of its fields.
        let configured_settings = AggregatorEpochSettings {
            protocol_parameters: ProtocolParameters {
                k: stored_settings.protocol_parameters.k + 1,
                ..stored_settings.protocol_parameters
            },
            cardano_transactions_signing_config: CardanoTransactionsSigningConfig {
                step: stored_settings.cardano_transactions_signing_config.step + 1,
                ..stored_settings.cardano_transactions_signing_config
            },
        };
        let store = FakeEpochSettingsStorer::new(vec![
            (epoch, stored_settings.clone()),
            (epoch + 1, stored_settings.clone()),
        ]);

        store
            .handle_discrepancies_at_startup(epoch, &configured_settings)
            .await
            .unwrap();

        let expectations = [
            (
                epoch,
                Some(stored_settings.clone()),
                "current epoch keeps its stored settings",
            ),
            (
                epoch + 1,
                Some(stored_settings),
                "next epoch keeps its stored settings",
            ),
            (
                epoch + 2,
                Some(configured_settings),
                "missing epoch is filled with the configuration",
            ),
            (epoch + 3, None, "epochs beyond the window stay empty"),
        ];
        for (checked_epoch, expected_settings, scenario) in expectations {
            let actual_settings = store.get_epoch_settings(checked_epoch).await.unwrap();

            assert_eq!(expected_settings, actual_settings, "{}", scenario);
        }
    }
}