mithril_aggregator/database/repository/
epoch_settings_store.rs1use std::sync::Arc;
2
3use anyhow::Context;
4use async_trait::async_trait;
5
6use mithril_common::entities::{Epoch, ProtocolParameters};
7use mithril_common::StdResult;
8use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
9
10use crate::database::query::{
11 DeleteEpochSettingsQuery, GetEpochSettingsQuery, UpdateEpochSettingsQuery,
12};
13use crate::entities::AggregatorEpochSettings;
14use crate::services::EpochPruningTask;
15use crate::EpochSettingsStorer;
16
17pub struct EpochSettingsStore {
19 connection: Arc<SqliteConnection>,
20
21 retention_limit: Option<u64>,
24}
25
26impl EpochSettingsStore {
27 pub fn new(connection: Arc<SqliteConnection>, retention_limit: Option<u64>) -> Self {
29 Self {
30 connection,
31 retention_limit,
32 }
33 }
34}
35
36#[async_trait]
37impl EpochSettingsStorer for EpochSettingsStore {
38 async fn save_epoch_settings(
39 &self,
40 epoch: Epoch,
41 epoch_settings: AggregatorEpochSettings,
42 ) -> StdResult<Option<AggregatorEpochSettings>> {
43 let epoch_settings_record = self
44 .connection
45 .fetch_first(UpdateEpochSettingsQuery::one(epoch, epoch_settings))
46 .with_context(|| format!("persist epoch settings failure for epoch {epoch:?}"))?
47 .unwrap_or_else(|| panic!("No entity returned by the persister, epoch = {epoch:?}"));
48
49 Ok(Some(epoch_settings_record.into()))
50 }
51
52 async fn get_protocol_parameters(&self, epoch: Epoch) -> StdResult<Option<ProtocolParameters>> {
53 Ok(self
54 .get_epoch_settings(epoch)
55 .await?
56 .map(|epoch_settings| epoch_settings.protocol_parameters))
57 }
58
59 async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>> {
60 let mut cursor = self
61 .connection
62 .fetch(GetEpochSettingsQuery::by_epoch(epoch)?)
63 .with_context(|| format!("Could not get epoch settings: epoch = {epoch:?}"))?;
64
65 if let Some(epoch_settings_record) = cursor.next() {
66 return Ok(Some(epoch_settings_record.into()));
67 }
68 Ok(None)
69 }
70}
71
72#[async_trait]
73impl EpochPruningTask for EpochSettingsStore {
74 fn pruned_data(&self) -> &'static str {
75 "Epoch settings"
76 }
77
78 async fn prune(&self, epoch: Epoch) -> StdResult<()> {
80 if let Some(threshold) = self.retention_limit {
81 self.connection
82 .apply(DeleteEpochSettingsQuery::below_epoch_threshold(
83 epoch - threshold,
84 ))?;
85 }
86 Ok(())
87 }
88}
89
#[cfg(test)]
mod tests {
    use crate::database::test_helper::{insert_epoch_settings, main_db_connection};

    use super::*;

    /// With a retention limit, pruning removes the settings below the computed threshold
    /// and keeps the others.
    #[tokio::test]
    async fn prune_epoch_settings_older_than_threshold() {
        const EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD: u64 = 5;

        let connection = main_db_connection().unwrap();
        insert_epoch_settings(&connection, &[1, 2]).unwrap();
        let store = EpochSettingsStore::new(
            Arc::new(connection),
            Some(EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD),
        );

        // Pruning at `2 + threshold` gives a deletion threshold of epoch 2.
        store
            .prune(Epoch(2) + EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD)
            .await
            .unwrap();

        let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap();
        let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap();

        assert!(
            epoch1_params.is_none(),
            "Epoch settings at epoch 1 should have been pruned",
        );
        assert!(
            epoch2_params.is_some(),
            "Epoch settings at epoch 2 should still exist",
        );
    }

    /// Without a retention limit, pruning must leave every record untouched.
    #[tokio::test]
    async fn without_threshold_nothing_is_pruned() {
        let connection = main_db_connection().unwrap();
        insert_epoch_settings(&connection, &[1, 2]).unwrap();
        let store = EpochSettingsStore::new(Arc::new(connection), None);

        store.prune(Epoch(100)).await.unwrap();

        let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap();
        let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap();

        assert!(
            epoch1_params.is_some(),
            "Epoch settings at epoch 1 should still exist",
        );
        assert!(
            epoch2_params.is_some(),
            "Epoch settings at epoch 2 should still exist",
        );
    }

    /// Saved settings can be read back for their epoch only.
    #[tokio::test]
    async fn save_epoch_settings_stores_in_database() {
        let connection = main_db_connection().unwrap();

        let store = EpochSettingsStore::new(Arc::new(connection), None);

        store
            .save_epoch_settings(Epoch(2), AggregatorEpochSettings::dummy())
            .await
            .expect("saving epoch settings should not fail");
        {
            let epoch_settings = store.get_epoch_settings(Epoch(1)).await.unwrap();
            assert_eq!(None, epoch_settings);
        }
        {
            let epoch_settings = store.get_epoch_settings(Epoch(2)).await.unwrap().unwrap();
            assert_eq!(AggregatorEpochSettings::dummy(), epoch_settings);
        }
        {
            let epoch_settings = store.get_epoch_settings(Epoch(3)).await.unwrap();
            assert_eq!(None, epoch_settings);
        }
    }
}