mithril_aggregator/database/repository/
epoch_settings_store.rs

1use std::sync::Arc;
2
3use anyhow::Context;
4use async_trait::async_trait;
5
6use mithril_common::entities::{Epoch, ProtocolParameters};
7use mithril_common::StdResult;
8use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
9
10use crate::database::query::{
11    DeleteEpochSettingsQuery, GetEpochSettingsQuery, UpdateEpochSettingsQuery,
12};
13use crate::entities::AggregatorEpochSettings;
14use crate::services::EpochPruningTask;
15use crate::EpochSettingsStorer;
16
17/// Service to deal with epoch settings (read & write).
18pub struct EpochSettingsStore {
19    connection: Arc<SqliteConnection>,
20
21    /// Number of epochs before previous records will be pruned at the next call to
22    /// [save_protocol_parameters][EpochSettingStore::save_protocol_parameters].
23    retention_limit: Option<u64>,
24}
25
26impl EpochSettingsStore {
27    /// Create a new EpochSettings store
28    pub fn new(connection: Arc<SqliteConnection>, retention_limit: Option<u64>) -> Self {
29        Self {
30            connection,
31            retention_limit,
32        }
33    }
34}
35
36#[async_trait]
37impl EpochSettingsStorer for EpochSettingsStore {
38    async fn save_epoch_settings(
39        &self,
40        epoch: Epoch,
41        epoch_settings: AggregatorEpochSettings,
42    ) -> StdResult<Option<AggregatorEpochSettings>> {
43        let epoch_settings_record = self
44            .connection
45            .fetch_first(UpdateEpochSettingsQuery::one(epoch, epoch_settings))
46            .with_context(|| format!("persist epoch settings failure for epoch {epoch:?}"))?
47            .unwrap_or_else(|| panic!("No entity returned by the persister, epoch = {epoch:?}"));
48
49        Ok(Some(epoch_settings_record.into()))
50    }
51
52    async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>> {
53        Ok(self
54            .get_epoch_settings(epoch)
55            .await?
56            .map(|epoch_settings| epoch_settings.protocol_parameters))
57    }
58
59    async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>> {
60        let mut cursor = self
61            .connection
62            .fetch(GetEpochSettingsQuery::by_epoch(epoch)?)
63            .with_context(|| format!("Could not get epoch settings: epoch = {epoch:?}"))?;
64
65        if let Some(epoch_settings_record) = cursor.next() {
66            return Ok(Some(epoch_settings_record.into()));
67        }
68        Ok(None)
69    }
70}
71
72#[async_trait]
73impl EpochPruningTask for EpochSettingsStore {
74    fn pruned_data(&self) -> &'static str {
75        "Epoch settings"
76    }
77
78    /// Prune useless old epoch settings.
79    async fn prune(&self, epoch: Epoch) -> StdResult<()> {
80        if let Some(threshold) = self.retention_limit {
81            self.connection
82                .apply(DeleteEpochSettingsQuery::below_epoch_threshold(
83                    epoch - threshold,
84                ))?;
85        }
86        Ok(())
87    }
88}
89
#[cfg(test)]
mod tests {
    use crate::database::test_helper::{insert_epoch_settings, main_db_connection};

    use super::*;

    /// With a retention limit, pruning removes the settings below the threshold epoch and
    /// keeps the settings stored at the threshold epoch itself.
    #[tokio::test]
    async fn prune_epoch_settings_older_than_threshold() {
        const EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD: u64 = 5;

        let connection = main_db_connection().unwrap();
        insert_epoch_settings(&connection, &[1, 2]).unwrap();
        let store = EpochSettingsStore::new(
            Arc::new(connection),
            Some(EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD),
        );

        // Pruning at epoch `2 + threshold` puts the deletion threshold exactly at epoch 2.
        store
            .prune(Epoch(2) + EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD)
            .await
            .unwrap();

        let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap();
        let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap();

        assert!(
            epoch1_params.is_none(),
            "Epoch settings at epoch 1 should have been pruned",
        );
        assert!(
            epoch2_params.is_some(),
            "Epoch settings at epoch 2 should still exist",
        );
    }

    /// Without a retention limit, pruning must leave every stored epoch settings untouched.
    #[tokio::test]
    async fn without_threshold_nothing_is_pruned() {
        let connection = main_db_connection().unwrap();
        insert_epoch_settings(&connection, &[1, 2]).unwrap();
        let store = EpochSettingsStore::new(Arc::new(connection), None);

        store.prune(Epoch(100)).await.unwrap();

        let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap();
        let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap();

        assert!(
            epoch1_params.is_some(),
            "Epoch settings at epoch 1 should still exist",
        );
        assert!(
            epoch2_params.is_some(),
            "Epoch settings at epoch 2 should still exist",
        );
    }

    /// Saved settings can be read back for their epoch only, not for the neighbouring epochs.
    #[tokio::test]
    async fn save_epoch_settings_stores_in_database() {
        let connection = main_db_connection().unwrap();

        let store = EpochSettingsStore::new(Arc::new(connection), None);

        store
            .save_epoch_settings(Epoch(2), AggregatorEpochSettings::dummy())
            .await
            .expect("saving epoch settings should not fails");
        {
            let epoch_settings = store.get_epoch_settings(Epoch(1)).await.unwrap();
            assert_eq!(None, epoch_settings);
        }
        {
            let epoch_settings = store.get_epoch_settings(Epoch(2)).await.unwrap().unwrap();
            assert_eq!(AggregatorEpochSettings::dummy(), epoch_settings);
        }
        {
            let epoch_settings = store.get_epoch_settings(Epoch(3)).await.unwrap();
            assert_eq!(None, epoch_settings);
        }
    }
}