mithril_aggregator/database/repository/
epoch_settings_store.rs

1use std::sync::Arc;
2
3use anyhow::Context;
4use async_trait::async_trait;
5
6use mithril_common::StdResult;
7use mithril_common::entities::{Epoch, ProtocolParameters};
8use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
9
10use crate::database::query::{
11    DeleteEpochSettingsQuery, GetEpochSettingsQuery, UpdateEpochSettingsQuery,
12};
13use crate::entities::AggregatorEpochSettings;
14use crate::services::EpochPruningTask;
15use crate::{EpochSettingsStorer, ProtocolParametersRetriever};
16
17/// Service to deal with epoch settings (read & write).
18pub struct EpochSettingsStore {
19    connection: Arc<SqliteConnection>,
20
21    /// Number of epochs before previous records will be pruned at the next call to
22    /// [save_protocol_parameters][EpochSettingStore::save_protocol_parameters].
23    retention_limit: Option<u64>,
24}
25
26impl EpochSettingsStore {
27    /// Create a new EpochSettings store
28    pub fn new(connection: Arc<SqliteConnection>, retention_limit: Option<u64>) -> Self {
29        Self {
30            connection,
31            retention_limit,
32        }
33    }
34}
35
36#[async_trait]
37impl ProtocolParametersRetriever for EpochSettingsStore {
38    async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>> {
39        Ok(self
40            .get_epoch_settings(epoch)
41            .await?
42            .map(|epoch_settings| epoch_settings.protocol_parameters))
43    }
44}
45
46#[async_trait]
47impl EpochSettingsStorer for EpochSettingsStore {
48    async fn save_epoch_settings(
49        &self,
50        epoch: Epoch,
51        epoch_settings: AggregatorEpochSettings,
52    ) -> StdResult<Option<AggregatorEpochSettings>> {
53        let epoch_settings_record = self
54            .connection
55            .fetch_first(UpdateEpochSettingsQuery::one(epoch, epoch_settings))
56            .with_context(|| format!("persist epoch settings failure for epoch {epoch:?}"))?
57            .unwrap_or_else(|| panic!("No entity returned by the persister, epoch = {epoch:?}"));
58
59        Ok(Some(epoch_settings_record.into()))
60    }
61
62    async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>> {
63        let mut cursor = self
64            .connection
65            .fetch(GetEpochSettingsQuery::by_epoch(epoch)?)
66            .with_context(|| format!("Could not get epoch settings: epoch = {epoch:?}"))?;
67
68        if let Some(epoch_settings_record) = cursor.next() {
69            return Ok(Some(epoch_settings_record.into()));
70        }
71        Ok(None)
72    }
73}
74
75#[async_trait]
76impl EpochPruningTask for EpochSettingsStore {
77    fn pruned_data(&self) -> &'static str {
78        "Epoch settings"
79    }
80
81    /// Prune useless old epoch settings.
82    async fn prune(&self, epoch: Epoch) -> StdResult<()> {
83        if let Some(threshold) = self.retention_limit {
84            self.connection
85                .apply(DeleteEpochSettingsQuery::below_epoch_threshold(
86                    epoch - threshold,
87                ))?;
88        }
89        Ok(())
90    }
91}
92
#[cfg(test)]
mod tests {
    use mithril_common::test::double::Dummy;

    use crate::database::test_helper::{insert_epoch_settings, main_db_connection};

    use super::*;

    /// With a retention limit, only the records strictly below `epoch - limit` are deleted.
    #[tokio::test]
    async fn prune_epoch_settings_older_than_threshold() {
        const EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD: u64 = 5;

        let connection = main_db_connection().unwrap();
        insert_epoch_settings(&connection, &[1, 2]).unwrap();
        let store = EpochSettingsStore::new(
            Arc::new(connection),
            Some(EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD),
        );

        // Pruning at epoch `2 + threshold` puts the deletion threshold at epoch 2.
        store
            .prune(Epoch(2) + EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD)
            .await
            .unwrap();

        let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap();
        let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap();

        assert!(
            epoch1_params.is_none(),
            "Epoch settings at epoch 1 should have been pruned",
        );
        assert!(
            epoch2_params.is_some(),
            "Epoch settings at epoch 2 should still exist",
        );
    }

    /// Without a retention limit, pruning leaves every record in place.
    #[tokio::test]
    async fn without_threshold_nothing_is_pruned() {
        let connection = main_db_connection().unwrap();
        insert_epoch_settings(&connection, &[1, 2]).unwrap();
        let store = EpochSettingsStore::new(Arc::new(connection), None);

        store.prune(Epoch(100)).await.unwrap();

        let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap();
        let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap();

        assert!(
            epoch1_params.is_some(),
            "Epoch settings at epoch 1 should still exist",
        );
        assert!(
            epoch2_params.is_some(),
            "Epoch settings at epoch 2 should still exist",
        );
    }

    /// Saved epoch settings can be read back for their epoch only.
    #[tokio::test]
    async fn save_epoch_settings_stores_in_database() {
        let connection = main_db_connection().unwrap();

        let store = EpochSettingsStore::new(Arc::new(connection), None);

        store
            .save_epoch_settings(Epoch(2), AggregatorEpochSettings::dummy())
            .await
            .expect("saving epoch settings should not fail");
        {
            let epoch_settings = store.get_epoch_settings(Epoch(1)).await.unwrap();
            assert_eq!(None, epoch_settings);
        }
        {
            let epoch_settings = store.get_epoch_settings(Epoch(2)).await.unwrap().unwrap();
            assert_eq!(AggregatorEpochSettings::dummy(), epoch_settings);
        }
        {
            let epoch_settings = store.get_epoch_settings(Epoch(3)).await.unwrap();
            assert_eq!(None, epoch_settings);
        }
    }
}