mithril_aggregator/database/repository/
epoch_settings_store.rs1use std::sync::Arc;
2
3use anyhow::Context;
4use async_trait::async_trait;
5
6use mithril_common::StdResult;
7use mithril_common::entities::{Epoch, ProtocolParameters};
8use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
9
10use crate::database::query::{
11 DeleteEpochSettingsQuery, GetEpochSettingsQuery, InsertOrIgnoreEpochSettingsQuery,
12};
13use crate::database::record::EpochSettingsRecord;
14use crate::entities::AggregatorEpochSettings;
15use crate::services::EpochPruningTask;
16use crate::{EpochSettingsStorer, ProtocolParametersRetriever};
17
18pub struct EpochSettingsStore {
20 connection: Arc<SqliteConnection>,
21
22 retention_limit: Option<u64>,
25}
26
27impl EpochSettingsStore {
28 pub fn new(connection: Arc<SqliteConnection>, retention_limit: Option<u64>) -> Self {
30 Self {
31 connection,
32 retention_limit,
33 }
34 }
35}
36
37#[async_trait]
38impl ProtocolParametersRetriever for EpochSettingsStore {
39 async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>> {
40 Ok(self
41 .get_epoch_settings(epoch)
42 .await?
43 .map(|epoch_settings| epoch_settings.protocol_parameters))
44 }
45}
46
47#[async_trait]
48impl EpochSettingsStorer for EpochSettingsStore {
49 async fn save_epoch_settings(
50 &self,
51 epoch: Epoch,
52 epoch_settings: AggregatorEpochSettings,
53 ) -> StdResult<Option<AggregatorEpochSettings>> {
54 let record_to_insert = EpochSettingsRecord {
55 epoch_settings_id: epoch,
56 cardano_transactions_signing_config: epoch_settings.cardano_transactions_signing_config,
57 cardano_blocks_transactions_signing_config: epoch_settings
58 .cardano_blocks_transactions_signing_config,
59 protocol_parameters: epoch_settings.protocol_parameters,
60 };
61 let epoch_settings_record = self
62 .connection
63 .fetch_first(InsertOrIgnoreEpochSettingsQuery::one(record_to_insert))
64 .with_context(|| format!("persist epoch settings failure for epoch {epoch:?}"))?;
65
66 Ok(epoch_settings_record.map(Into::into))
67 }
68
69 async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>> {
70 let mut cursor = self
71 .connection
72 .fetch(GetEpochSettingsQuery::by_epoch(epoch)?)
73 .with_context(|| format!("Could not get epoch settings: epoch = {epoch:?}"))?;
74
75 if let Some(epoch_settings_record) = cursor.next() {
76 return Ok(Some(epoch_settings_record.into()));
77 }
78 Ok(None)
79 }
80}
81
82#[async_trait]
83impl EpochPruningTask for EpochSettingsStore {
84 fn pruned_data(&self) -> &'static str {
85 "Epoch settings"
86 }
87
88 async fn prune(&self, epoch: Epoch) -> StdResult<()> {
90 if let Some(threshold) = self.retention_limit {
91 self.connection
92 .apply(DeleteEpochSettingsQuery::below_epoch_threshold(
93 epoch - threshold,
94 ))?;
95 }
96 Ok(())
97 }
98}
99
#[cfg(test)]
mod tests {
    use mithril_common::test::double::Dummy;

    use crate::database::test_helper::{insert_epoch_settings, main_db_connection};

    use super::*;

    /// With a retention limit, pruning removes the settings located below
    /// `epoch - limit` and keeps the ones at that threshold.
    #[tokio::test]
    async fn prune_epoch_settings_older_than_threshold() {
        const EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD: u64 = 5;

        let connection = main_db_connection().unwrap();
        insert_epoch_settings(&connection, &[1, 2]).unwrap();
        let store = EpochSettingsStore::new(
            Arc::new(connection),
            Some(EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD),
        );

        // Pruning at epoch `2 + limit` puts the deletion threshold at epoch 2.
        store
            .prune(Epoch(2) + EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD)
            .await
            .unwrap();

        let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap();
        let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap();

        assert!(
            epoch1_params.is_none(),
            "Epoch settings at epoch 1 should have been pruned",
        );
        assert!(
            epoch2_params.is_some(),
            "Epoch settings at epoch 2 should still exist",
        );
    }

    /// Without a retention limit, pruning must not delete anything.
    #[tokio::test]
    async fn without_threshold_nothing_is_pruned() {
        let connection = main_db_connection().unwrap();
        insert_epoch_settings(&connection, &[1, 2]).unwrap();
        let store = EpochSettingsStore::new(Arc::new(connection), None);

        store.prune(Epoch(100)).await.unwrap();

        let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap();
        let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap();

        // The failure message must describe what is asserted: the record is
        // expected to be kept, not pruned.
        assert!(
            epoch1_params.is_some(),
            "Epoch settings at epoch 1 should still exist",
        );
        assert!(
            epoch2_params.is_some(),
            "Epoch settings at epoch 2 should still exist",
        );
    }

    /// Saved settings can be read back for their epoch, and only for that epoch.
    #[tokio::test]
    async fn save_epoch_settings_stores_in_database() {
        let connection = main_db_connection().unwrap();

        let store = EpochSettingsStore::new(Arc::new(connection), None);

        store
            .save_epoch_settings(Epoch(2), AggregatorEpochSettings::dummy())
            .await
            .expect("saving epoch settings should not fails");
        {
            let epoch_settings = store.get_epoch_settings(Epoch(1)).await.unwrap();
            assert_eq!(None, epoch_settings);
        }
        {
            let epoch_settings = store.get_epoch_settings(Epoch(2)).await.unwrap().unwrap();
            assert_eq!(AggregatorEpochSettings::dummy(), epoch_settings);
        }
        {
            let epoch_settings = store.get_epoch_settings(Epoch(3)).await.unwrap();
            assert_eq!(None, epoch_settings);
        }
    }

    /// Saving twice for the same epoch keeps the settings saved first.
    #[tokio::test]
    async fn save_epoch_settings_does_not_replace_existing_value_in_database() {
        let connection = main_db_connection().unwrap();

        let store = EpochSettingsStore::new(Arc::new(connection), None);
        let expected_epoch_settings = AggregatorEpochSettings {
            protocol_parameters: ProtocolParameters::new(1, 1, 0.5),
            ..Dummy::dummy()
        };

        store
            .save_epoch_settings(Epoch(2), expected_epoch_settings.clone())
            .await
            .expect("saving epoch settings should not fails");

        // Second save on the same epoch with different protocol parameters.
        store
            .save_epoch_settings(
                Epoch(2),
                AggregatorEpochSettings {
                    protocol_parameters: ProtocolParameters::new(2, 2, 1.5),
                    ..Dummy::dummy()
                },
            )
            .await
            .expect("saving epoch settings should not fails");

        let epoch_settings = store.get_epoch_settings(Epoch(2)).await.unwrap().unwrap();
        assert_eq!(expected_epoch_settings, epoch_settings);
    }
}