mithril_aggregator/database/repository/
epoch_settings_store.rs

1use std::sync::Arc;
2
3use anyhow::Context;
4use async_trait::async_trait;
5
6use mithril_common::StdResult;
7use mithril_common::entities::{Epoch, ProtocolParameters};
8use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
9
10use crate::database::query::{
11    DeleteEpochSettingsQuery, GetEpochSettingsQuery, InsertOrIgnoreEpochSettingsQuery,
12};
13use crate::database::record::EpochSettingsRecord;
14use crate::entities::AggregatorEpochSettings;
15use crate::services::EpochPruningTask;
16use crate::{EpochSettingsStorer, ProtocolParametersRetriever};
17
18/// Service to deal with epoch settings (read & write).
19pub struct EpochSettingsStore {
20    connection: Arc<SqliteConnection>,
21
22    /// Number of epochs before previous records will be pruned at the next call to
23    /// [save_protocol_parameters][EpochSettingStore::save_protocol_parameters].
24    retention_limit: Option<u64>,
25}
26
27impl EpochSettingsStore {
28    /// Create a new EpochSettings store
29    pub fn new(connection: Arc<SqliteConnection>, retention_limit: Option<u64>) -> Self {
30        Self {
31            connection,
32            retention_limit,
33        }
34    }
35}
36
37#[async_trait]
38impl ProtocolParametersRetriever for EpochSettingsStore {
39    async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>> {
40        Ok(self
41            .get_epoch_settings(epoch)
42            .await?
43            .map(|epoch_settings| epoch_settings.protocol_parameters))
44    }
45}
46
47#[async_trait]
48impl EpochSettingsStorer for EpochSettingsStore {
49    async fn save_epoch_settings(
50        &self,
51        epoch: Epoch,
52        epoch_settings: AggregatorEpochSettings,
53    ) -> StdResult<Option<AggregatorEpochSettings>> {
54        let record_to_insert = EpochSettingsRecord {
55            epoch_settings_id: epoch,
56            cardano_transactions_signing_config: epoch_settings.cardano_transactions_signing_config,
57            cardano_blocks_transactions_signing_config: epoch_settings
58                .cardano_blocks_transactions_signing_config,
59            protocol_parameters: epoch_settings.protocol_parameters,
60        };
61        let epoch_settings_record = self
62            .connection
63            .fetch_first(InsertOrIgnoreEpochSettingsQuery::one(record_to_insert))
64            .with_context(|| format!("persist epoch settings failure for epoch {epoch:?}"))?;
65
66        Ok(epoch_settings_record.map(Into::into))
67    }
68
69    async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>> {
70        let mut cursor = self
71            .connection
72            .fetch(GetEpochSettingsQuery::by_epoch(epoch)?)
73            .with_context(|| format!("Could not get epoch settings: epoch = {epoch:?}"))?;
74
75        if let Some(epoch_settings_record) = cursor.next() {
76            return Ok(Some(epoch_settings_record.into()));
77        }
78        Ok(None)
79    }
80}
81
82#[async_trait]
83impl EpochPruningTask for EpochSettingsStore {
84    fn pruned_data(&self) -> &'static str {
85        "Epoch settings"
86    }
87
88    /// Prune useless old epoch settings.
89    async fn prune(&self, epoch: Epoch) -> StdResult<()> {
90        if let Some(threshold) = self.retention_limit {
91            self.connection
92                .apply(DeleteEpochSettingsQuery::below_epoch_threshold(
93                    epoch - threshold,
94                ))?;
95        }
96        Ok(())
97    }
98}
99
#[cfg(test)]
mod tests {
    use mithril_common::test::double::Dummy;

    use crate::database::test_helper::{insert_epoch_settings, main_db_connection};

    use super::*;

    /// With a retention limit, pruning removes the records below `epoch - limit` and keeps
    /// the record located exactly at that threshold.
    #[tokio::test]
    async fn prune_epoch_settings_older_than_threshold() {
        const EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD: u64 = 5;

        let connection = main_db_connection().unwrap();
        insert_epoch_settings(&connection, &[1, 2]).unwrap();
        let store = EpochSettingsStore::new(
            Arc::new(connection),
            Some(EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD),
        );

        // Pruning at epoch `2 + threshold` puts the deletion threshold at epoch 2.
        store
            .prune(Epoch(2) + EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD)
            .await
            .unwrap();

        let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap();
        let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap();

        assert!(
            epoch1_params.is_none(),
            "Epoch settings at epoch 1 should have been pruned",
        );
        assert!(
            epoch2_params.is_some(),
            "Epoch settings at epoch 2 should still exist",
        );
    }

    /// Without a retention limit, pruning must leave every record in place.
    #[tokio::test]
    async fn without_threshold_nothing_is_pruned() {
        let connection = main_db_connection().unwrap();
        insert_epoch_settings(&connection, &[1, 2]).unwrap();
        let store = EpochSettingsStore::new(Arc::new(connection), None);

        store.prune(Epoch(100)).await.unwrap();

        let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap();
        let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap();

        assert!(
            epoch1_params.is_some(),
            "Epoch settings at epoch 1 should still exist",
        );
        assert!(
            epoch2_params.is_some(),
            "Epoch settings at epoch 2 should still exist",
        );
    }

    /// Saved settings can be read back for their epoch only.
    #[tokio::test]
    async fn save_epoch_settings_stores_in_database() {
        let connection = main_db_connection().unwrap();

        let store = EpochSettingsStore::new(Arc::new(connection), None);

        store
            .save_epoch_settings(Epoch(2), AggregatorEpochSettings::dummy())
            .await
            .expect("saving epoch settings should not fails");
        {
            let epoch_settings = store.get_epoch_settings(Epoch(1)).await.unwrap();
            assert_eq!(None, epoch_settings);
        }
        {
            let epoch_settings = store.get_epoch_settings(Epoch(2)).await.unwrap().unwrap();
            assert_eq!(AggregatorEpochSettings::dummy(), epoch_settings);
        }
        {
            let epoch_settings = store.get_epoch_settings(Epoch(3)).await.unwrap();
            assert_eq!(None, epoch_settings);
        }
    }

    /// Saving twice for the same epoch keeps the settings of the first save.
    #[tokio::test]
    async fn save_epoch_settings_does_not_replace_existing_value_in_database() {
        let connection = main_db_connection().unwrap();

        let store = EpochSettingsStore::new(Arc::new(connection), None);
        let expected_epoch_settings = AggregatorEpochSettings {
            protocol_parameters: ProtocolParameters::new(1, 1, 0.5),
            ..Dummy::dummy()
        };

        store
            .save_epoch_settings(Epoch(2), expected_epoch_settings.clone())
            .await
            .expect("saving epoch settings should not fails");

        // Second save on the same epoch with different protocol parameters.
        store
            .save_epoch_settings(
                Epoch(2),
                AggregatorEpochSettings {
                    protocol_parameters: ProtocolParameters::new(2, 2, 1.5),
                    ..Dummy::dummy()
                },
            )
            .await
            .expect("saving epoch settings should not fails");

        let epoch_settings = store.get_epoch_settings(Epoch(2)).await.unwrap().unwrap();
        assert_eq!(expected_epoch_settings, epoch_settings);
    }
}