mithril_aggregator/database/repository/
epoch_settings_store.rs1use std::sync::Arc;
2
3use anyhow::Context;
4use async_trait::async_trait;
5
6use mithril_common::StdResult;
7use mithril_common::entities::{Epoch, ProtocolParameters};
8use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
9
10use crate::database::query::{
11 DeleteEpochSettingsQuery, GetEpochSettingsQuery, UpdateEpochSettingsQuery,
12};
13use crate::entities::AggregatorEpochSettings;
14use crate::services::EpochPruningTask;
15use crate::{EpochSettingsStorer, ProtocolParametersRetriever};
16
17pub struct EpochSettingsStore {
19 connection: Arc<SqliteConnection>,
20
21 retention_limit: Option<u64>,
24}
25
26impl EpochSettingsStore {
27 pub fn new(connection: Arc<SqliteConnection>, retention_limit: Option<u64>) -> Self {
29 Self {
30 connection,
31 retention_limit,
32 }
33 }
34}
35
36#[async_trait]
37impl ProtocolParametersRetriever for EpochSettingsStore {
38 async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>> {
39 Ok(self
40 .get_epoch_settings(epoch)
41 .await?
42 .map(|epoch_settings| epoch_settings.protocol_parameters))
43 }
44}
45
46#[async_trait]
47impl EpochSettingsStorer for EpochSettingsStore {
48 async fn save_epoch_settings(
49 &self,
50 epoch: Epoch,
51 epoch_settings: AggregatorEpochSettings,
52 ) -> StdResult<Option<AggregatorEpochSettings>> {
53 let epoch_settings_record = self
54 .connection
55 .fetch_first(UpdateEpochSettingsQuery::one(epoch, epoch_settings))
56 .with_context(|| format!("persist epoch settings failure for epoch {epoch:?}"))?
57 .unwrap_or_else(|| panic!("No entity returned by the persister, epoch = {epoch:?}"));
58
59 Ok(Some(epoch_settings_record.into()))
60 }
61
62 async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>> {
63 let mut cursor = self
64 .connection
65 .fetch(GetEpochSettingsQuery::by_epoch(epoch)?)
66 .with_context(|| format!("Could not get epoch settings: epoch = {epoch:?}"))?;
67
68 if let Some(epoch_settings_record) = cursor.next() {
69 return Ok(Some(epoch_settings_record.into()));
70 }
71 Ok(None)
72 }
73}
74
75#[async_trait]
76impl EpochPruningTask for EpochSettingsStore {
77 fn pruned_data(&self) -> &'static str {
78 "Epoch settings"
79 }
80
81 async fn prune(&self, epoch: Epoch) -> StdResult<()> {
83 if let Some(threshold) = self.retention_limit {
84 self.connection
85 .apply(DeleteEpochSettingsQuery::below_epoch_threshold(
86 epoch - threshold,
87 ))?;
88 }
89 Ok(())
90 }
91}
92
93#[cfg(test)]
94mod tests {
95 use mithril_common::test::double::Dummy;
96
97 use crate::database::test_helper::{insert_epoch_settings, main_db_connection};
98
99 use super::*;
100
101 #[tokio::test]
102 async fn prune_epoch_settings_older_than_threshold() {
103 const EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD: u64 = 5;
104
105 let connection = main_db_connection().unwrap();
106 insert_epoch_settings(&connection, &[1, 2]).unwrap();
107 let store = EpochSettingsStore::new(
108 Arc::new(connection),
109 Some(EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD),
110 );
111
112 store
113 .prune(Epoch(2) + EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD)
114 .await
115 .unwrap();
116
117 let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap();
118 let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap();
119
120 assert!(
121 epoch1_params.is_none(),
122 "Epoch settings at epoch 1 should have been pruned",
123 );
124 assert!(
125 epoch2_params.is_some(),
126 "Epoch settings at epoch 2 should still exist",
127 );
128 }
129
130 #[tokio::test]
131 async fn without_threshold_nothing_is_pruned() {
132 let connection = main_db_connection().unwrap();
133 insert_epoch_settings(&connection, &[1, 2]).unwrap();
134 let store = EpochSettingsStore::new(Arc::new(connection), None);
135
136 store.prune(Epoch(100)).await.unwrap();
137
138 let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap();
139 let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap();
140
141 assert!(
142 epoch1_params.is_some(),
143 "Epoch settings at epoch 1 should have been pruned",
144 );
145 assert!(
146 epoch2_params.is_some(),
147 "Epoch settings at epoch 2 should still exist",
148 );
149 }
150
151 #[tokio::test]
152 async fn save_epoch_settings_stores_in_database() {
153 let connection = main_db_connection().unwrap();
154
155 let store = EpochSettingsStore::new(Arc::new(connection), None);
156
157 store
158 .save_epoch_settings(Epoch(2), AggregatorEpochSettings::dummy())
159 .await
160 .expect("saving epoch settings should not fails");
161 {
162 let epoch_settings = store.get_epoch_settings(Epoch(1)).await.unwrap();
163 assert_eq!(None, epoch_settings);
164 }
165 {
166 let epoch_settings = store.get_epoch_settings(Epoch(2)).await.unwrap().unwrap();
167 assert_eq!(AggregatorEpochSettings::dummy(), epoch_settings);
168 }
169 {
170 let epoch_settings = store.get_epoch_settings(Epoch(3)).await.unwrap();
171 assert_eq!(None, epoch_settings);
172 }
173 }
174}