mithril_aggregator/database/repository/
epoch_settings_store.rs

1use std::sync::Arc;
2
3use anyhow::Context;
4use async_trait::async_trait;
5
6use mithril_common::StdResult;
7use mithril_common::entities::{Epoch, ProtocolParameters};
8use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
9
10use crate::database::query::{
11    DeleteEpochSettingsQuery, GetEpochSettingsQuery, InsertOrIgnoreEpochSettingsQuery,
12};
13use crate::database::record::EpochSettingsRecord;
14use crate::entities::AggregatorEpochSettings;
15use crate::services::EpochPruningTask;
16use crate::{EpochSettingsStorer, ProtocolParametersRetriever};
17
18/// Service to deal with epoch settings (read & write).
19pub struct EpochSettingsStore {
20    connection: Arc<SqliteConnection>,
21
22    /// Number of epochs before previous records will be pruned at the next call to
23    /// [save_protocol_parameters][EpochSettingStore::save_protocol_parameters].
24    retention_limit: Option<u64>,
25}
26
27impl EpochSettingsStore {
28    /// Create a new EpochSettings store
29    pub fn new(connection: Arc<SqliteConnection>, retention_limit: Option<u64>) -> Self {
30        Self {
31            connection,
32            retention_limit,
33        }
34    }
35}
36
37#[async_trait]
38impl ProtocolParametersRetriever for EpochSettingsStore {
39    async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>> {
40        Ok(self
41            .get_epoch_settings(epoch)
42            .await?
43            .map(|epoch_settings| epoch_settings.protocol_parameters))
44    }
45}
46
47#[async_trait]
48impl EpochSettingsStorer for EpochSettingsStore {
49    async fn save_epoch_settings(
50        &self,
51        epoch: Epoch,
52        epoch_settings: AggregatorEpochSettings,
53    ) -> StdResult<Option<AggregatorEpochSettings>> {
54        let record_to_insert = EpochSettingsRecord {
55            epoch_settings_id: epoch,
56            cardano_transactions_signing_config: epoch_settings.cardano_transactions_signing_config,
57            protocol_parameters: epoch_settings.protocol_parameters,
58        };
59        let epoch_settings_record = self
60            .connection
61            .fetch_first(InsertOrIgnoreEpochSettingsQuery::one(record_to_insert))
62            .with_context(|| format!("persist epoch settings failure for epoch {epoch:?}"))?;
63
64        Ok(epoch_settings_record.map(Into::into))
65    }
66
67    async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>> {
68        let mut cursor = self
69            .connection
70            .fetch(GetEpochSettingsQuery::by_epoch(epoch)?)
71            .with_context(|| format!("Could not get epoch settings: epoch = {epoch:?}"))?;
72
73        if let Some(epoch_settings_record) = cursor.next() {
74            return Ok(Some(epoch_settings_record.into()));
75        }
76        Ok(None)
77    }
78}
79
80#[async_trait]
81impl EpochPruningTask for EpochSettingsStore {
82    fn pruned_data(&self) -> &'static str {
83        "Epoch settings"
84    }
85
86    /// Prune useless old epoch settings.
87    async fn prune(&self, epoch: Epoch) -> StdResult<()> {
88        if let Some(threshold) = self.retention_limit {
89            self.connection
90                .apply(DeleteEpochSettingsQuery::below_epoch_threshold(
91                    epoch - threshold,
92                ))?;
93        }
94        Ok(())
95    }
96}
97
98#[cfg(test)]
99mod tests {
100    use mithril_common::test::double::Dummy;
101
102    use crate::database::test_helper::{insert_epoch_settings, main_db_connection};
103
104    use super::*;
105
106    #[tokio::test]
107    async fn prune_epoch_settings_older_than_threshold() {
108        const EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD: u64 = 5;
109
110        let connection = main_db_connection().unwrap();
111        insert_epoch_settings(&connection, &[1, 2]).unwrap();
112        let store = EpochSettingsStore::new(
113            Arc::new(connection),
114            Some(EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD),
115        );
116
117        store
118            .prune(Epoch(2) + EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD)
119            .await
120            .unwrap();
121
122        let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap();
123        let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap();
124
125        assert!(
126            epoch1_params.is_none(),
127            "Epoch settings at epoch 1 should have been pruned",
128        );
129        assert!(
130            epoch2_params.is_some(),
131            "Epoch settings at epoch 2 should still exist",
132        );
133    }
134
135    #[tokio::test]
136    async fn without_threshold_nothing_is_pruned() {
137        let connection = main_db_connection().unwrap();
138        insert_epoch_settings(&connection, &[1, 2]).unwrap();
139        let store = EpochSettingsStore::new(Arc::new(connection), None);
140
141        store.prune(Epoch(100)).await.unwrap();
142
143        let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap();
144        let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap();
145
146        assert!(
147            epoch1_params.is_some(),
148            "Epoch settings at epoch 1 should have been pruned",
149        );
150        assert!(
151            epoch2_params.is_some(),
152            "Epoch settings at epoch 2 should still exist",
153        );
154    }
155
156    #[tokio::test]
157    async fn save_epoch_settings_stores_in_database() {
158        let connection = main_db_connection().unwrap();
159
160        let store = EpochSettingsStore::new(Arc::new(connection), None);
161
162        store
163            .save_epoch_settings(Epoch(2), AggregatorEpochSettings::dummy())
164            .await
165            .expect("saving epoch settings should not fails");
166        {
167            let epoch_settings = store.get_epoch_settings(Epoch(1)).await.unwrap();
168            assert_eq!(None, epoch_settings);
169        }
170        {
171            let epoch_settings = store.get_epoch_settings(Epoch(2)).await.unwrap().unwrap();
172            assert_eq!(AggregatorEpochSettings::dummy(), epoch_settings);
173        }
174        {
175            let epoch_settings = store.get_epoch_settings(Epoch(3)).await.unwrap();
176            assert_eq!(None, epoch_settings);
177        }
178    }
179
180    #[tokio::test]
181    async fn save_epoch_settings_does_not_replace_existing_value_in_database() {
182        let connection = main_db_connection().unwrap();
183
184        let store = EpochSettingsStore::new(Arc::new(connection), None);
185        let expected_epoch_settings = AggregatorEpochSettings {
186            protocol_parameters: ProtocolParameters::new(1, 1, 0.5),
187            ..Dummy::dummy()
188        };
189
190        store
191            .save_epoch_settings(Epoch(2), expected_epoch_settings.clone())
192            .await
193            .expect("saving epoch settings should not fails");
194
195        store
196            .save_epoch_settings(
197                Epoch(2),
198                AggregatorEpochSettings {
199                    protocol_parameters: ProtocolParameters::new(2, 2, 1.5),
200                    ..Dummy::dummy()
201                },
202            )
203            .await
204            .expect("saving epoch settings should not fails");
205
206        let epoch_settings = store.get_epoch_settings(Epoch(2)).await.unwrap().unwrap();
207        assert_eq!(expected_epoch_settings, epoch_settings);
208    }
209}