mithril_signer/database/repository/
signed_beacon_repository.rs1use async_trait::async_trait;
2use std::sync::Arc;
3
4use mithril_common::StdResult;
5use mithril_common::entities::{Epoch, SignedEntityType};
6use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
7
8use crate::database::query::{
9 DeleteSignedBeaconRecordQuery, GetSignedBeaconQuery, InsertSignedBeaconRecordQuery,
10};
11use crate::database::record::SignedBeaconRecord;
12use crate::entities::BeaconToSign;
13use crate::services::{EpochPruningTask, SignedBeaconStore};
14
15pub struct SignedBeaconRepository {
17 connection: Arc<SqliteConnection>,
18 store_retention_limit: Option<u64>,
19}
20
21impl SignedBeaconRepository {
22 pub fn new(connection: Arc<SqliteConnection>, store_retention_limit: Option<u64>) -> Self {
24 Self {
25 connection,
26 store_retention_limit,
27 }
28 }
29
30 pub fn get_last(&self) -> StdResult<Option<SignedBeaconRecord>> {
32 self.connection.fetch_first(GetSignedBeaconQuery::all())
33 }
34
35 pub fn prune_below_epoch(&self, epoch: Epoch) -> StdResult<()> {
37 let _ = self
38 .connection
39 .fetch_first(DeleteSignedBeaconRecordQuery::below_epoch_threshold(epoch))?;
40 Ok(())
41 }
42}
43
44#[async_trait]
45impl SignedBeaconStore for SignedBeaconRepository {
46 async fn filter_out_already_signed_entities(
47 &self,
48 entities: Vec<SignedEntityType>,
49 ) -> StdResult<Vec<SignedEntityType>> {
50 let already_signed_entities: Vec<SignedEntityType> = self
51 .connection
52 .fetch(GetSignedBeaconQuery::by_signed_entities(&entities)?)?
53 .map(|record| record.signed_entity_type)
54 .collect();
55
56 Ok(entities
57 .into_iter()
58 .filter(|e| !already_signed_entities.contains(e))
59 .collect())
60 }
61
62 async fn mark_beacon_as_signed(&self, entity: &BeaconToSign) -> StdResult<()> {
63 let record = entity.clone().into();
64 let _ = self
65 .connection
66 .fetch_first(InsertSignedBeaconRecordQuery::one(record)?)?;
67
68 Ok(())
69 }
70}
71
72#[async_trait]
73impl EpochPruningTask for SignedBeaconRepository {
74 fn pruned_data(&self) -> &'static str {
75 "Signed Beacon"
76 }
77
78 async fn prune(&self, current_epoch: Epoch) -> StdResult<()> {
79 match self.store_retention_limit.map(|limit| current_epoch - limit) {
80 Some(threshold) if *threshold > 0 => self.prune_below_epoch(threshold),
81 _ => Ok(()),
82 }
83 }
84}
85
#[cfg(test)]
mod tests {
    use chrono::Utc;

    use mithril_common::entities::{
        BlockNumber, Epoch, SignedEntityConfig, SignedEntityTypeDiscriminants, TimePoint,
    };
    use mithril_common::test::double::Dummy;
    use mithril_persistence::sqlite::ConnectionExtensions;

    use crate::database::query::GetSignedBeaconQuery;
    use crate::database::record::SignedBeaconRecord;
    use crate::database::test_helper::{insert_signed_beacons, main_db_connection};

    use super::*;

    /// Opens the test database and builds a repository on top of it with the given retention
    /// limit. The connection is returned too so tests can seed and inspect the database.
    fn new_repository(
        store_retention_limit: Option<u64>,
    ) -> (Arc<SqliteConnection>, SignedBeaconRepository) {
        let connection = Arc::new(main_db_connection().unwrap());
        let repository = SignedBeaconRepository::new(connection.clone(), store_retention_limit);

        (connection, repository)
    }

    /// Fake record of a Mithril stake distribution signed at `epoch`.
    fn stake_distribution_record(epoch: Epoch) -> SignedBeaconRecord {
        SignedBeaconRecord::fake(epoch, SignedEntityType::MithrilStakeDistribution(epoch))
    }

    /// Number of records yielded by the "all signed beacons" query.
    fn count_signed_beacons(connection: &SqliteConnection) -> usize {
        connection.fetch(GetSignedBeaconQuery::all()).unwrap().count()
    }

    /// Every signed entity type allowed at `time_point` once all discriminants are enabled.
    fn all_signed_entity_type_for(time_point: &TimePoint) -> Vec<SignedEntityType> {
        let mut config = SignedEntityConfig::dummy();
        config.allowed_discriminants = SignedEntityTypeDiscriminants::all();

        config.list_allowed_signed_entity_types(time_point).unwrap()
    }

    #[test]
    fn get_last_stored_signed_beacon() {
        let (connection, repository) = new_repository(None);

        // Nothing has been stored yet.
        assert_eq!(None, repository.get_last().unwrap());

        // A single record is the last one.
        insert_signed_beacons(
            &connection,
            vec![stake_distribution_record(Epoch(1941))],
        );
        assert_eq!(
            Some(stake_distribution_record(Epoch(1941))),
            repository.get_last().unwrap()
        );

        // With several records the one of the highest epoch is expected.
        let later_records = SignedBeaconRecord::fakes(&[
            (
                Epoch(1942),
                vec![SignedEntityType::MithrilStakeDistribution(Epoch(1942))],
            ),
            (
                Epoch(1943),
                vec![SignedEntityType::MithrilStakeDistribution(Epoch(1943))],
            ),
        ]);
        insert_signed_beacons(&connection, later_records);
        assert_eq!(
            Some(stake_distribution_record(Epoch(1943))),
            repository.get_last().unwrap()
        );
    }

    #[tokio::test]
    async fn filter_out_nothing_if_nothing_was_previously_signed() {
        let (_connection, repository) = new_repository(None);
        let to_filter = all_signed_entity_type_for(&TimePoint::dummy());

        let available_entities = repository
            .filter_out_already_signed_entities(to_filter.clone())
            .await
            .unwrap();

        assert_eq!(to_filter, available_entities);
    }

    #[tokio::test]
    async fn filter_out_nothing_if_previously_signed_entities_doesnt_match_passed_entities() {
        let (connection, repository) = new_repository(None);
        let time_point = TimePoint::dummy();

        // Stored beacon targets an epoch that is not part of the entities to filter.
        let unrelated_records = SignedBeaconRecord::fakes(&[(
            Epoch(1941),
            vec![SignedEntityType::MithrilStakeDistribution(time_point.epoch - 2)],
        )]);
        insert_signed_beacons(&connection, unrelated_records);
        let to_filter = all_signed_entity_type_for(&time_point);

        let available_entities = repository
            .filter_out_already_signed_entities(to_filter.clone())
            .await
            .unwrap();

        assert_eq!(to_filter, available_entities);
    }

    #[tokio::test]
    async fn filter_out_everything_if_previously_signed_entities_match_all_passed_entities() {
        let (connection, repository) = new_repository(None);
        let to_filter = all_signed_entity_type_for(&TimePoint::dummy());

        let already_signed: Vec<SignedBeaconRecord> = to_filter
            .iter()
            .cloned()
            .map(|entity| SignedBeaconRecord::fake(Epoch(4872), entity))
            .collect();
        insert_signed_beacons(&connection, already_signed);

        let available_entities = repository
            .filter_out_already_signed_entities(to_filter.clone())
            .await
            .unwrap();

        assert_eq!(Vec::<SignedEntityType>::new(), available_entities);
    }

    #[tokio::test]
    async fn filter_out_partially_if_some_previously_signed_entities_match_passed_entities() {
        let (connection, repository) = new_repository(None);

        let time_point = TimePoint::dummy();
        let epoch = time_point.epoch;
        let block_number = time_point.chain_point.block_number;

        let mithril_stake_distribution = SignedEntityType::MithrilStakeDistribution(epoch);
        let cardano_transactions = SignedEntityType::CardanoTransactions(epoch, block_number);

        let already_signed: Vec<SignedBeaconRecord> =
            [mithril_stake_distribution.clone(), cardano_transactions.clone()]
                .into_iter()
                .map(|entity| SignedBeaconRecord::fake(Epoch(4872), entity))
                .collect();
        insert_signed_beacons(&connection, already_signed);

        let candidates = vec![
            mithril_stake_distribution,
            SignedEntityType::CardanoStakeDistribution(epoch),
            cardano_transactions,
            SignedEntityType::CardanoStakeDistribution(epoch + 10),
        ];
        let available_entities = repository
            .filter_out_already_signed_entities(candidates)
            .await
            .unwrap();

        // Only the entities without a stored beacon remain, in their incoming order.
        let expected = vec![
            SignedEntityType::CardanoStakeDistribution(epoch),
            SignedEntityType::CardanoStakeDistribution(epoch + 10),
        ];
        assert_eq!(expected, available_entities);
    }

    #[tokio::test]
    async fn mark_beacon_as_signed() {
        let (connection, repository) = new_repository(None);

        let beacon_to_sign = BeaconToSign {
            epoch: Epoch(13),
            signed_entity_type: SignedEntityType::MithrilStakeDistribution(Epoch(13)),
            initiated_at: Utc::now(),
        };

        // Precondition: the store starts empty.
        let stored_before: Vec<SignedBeaconRecord> =
            connection.fetch_collect(GetSignedBeaconQuery::all()).unwrap();
        assert_eq!(Vec::<SignedBeaconRecord>::new(), stored_before);

        repository.mark_beacon_as_signed(&beacon_to_sign).await.unwrap();

        let stored_after = connection
            .fetch_first(GetSignedBeaconQuery::all())
            .unwrap()
            .expect("A signed beacon should have been inserted");
        assert_eq!(beacon_to_sign, stored_after);
    }

    #[tokio::test]
    async fn test_dont_execute_pruning_tasks_if_no_retention_limit_set() {
        let (connection, repository) = new_repository(None);
        let records = SignedBeaconRecord::fakes(&[(
            Epoch(8),
            vec![SignedEntityType::MithrilStakeDistribution(Epoch(8))],
        )]);
        insert_signed_beacons(&connection, records);

        EpochPruningTask::prune(&repository, Epoch(1000)).await.unwrap();

        assert_eq!(1, count_signed_beacons(&connection));
    }

    #[tokio::test]
    async fn test_dont_execute_pruning_tasks_if_current_epoch_minus_retention_limit_is_0() {
        let (connection, repository) = new_repository(Some(10));
        let records = SignedBeaconRecord::fakes(&[(
            Epoch(8),
            vec![SignedEntityType::MithrilStakeDistribution(Epoch(8))],
        )]);
        insert_signed_beacons(&connection, records);

        EpochPruningTask::prune(&repository, Epoch(9)).await.unwrap();

        assert_eq!(1, count_signed_beacons(&connection));
    }

    #[tokio::test]
    async fn test_prune_task_substract_set_retention_limit_to_given_epoch() {
        let (connection, repository) = new_repository(Some(10));
        let records = SignedBeaconRecord::fakes(&[
            (
                Epoch(7),
                vec![
                    SignedEntityType::MithrilStakeDistribution(Epoch(7)),
                    SignedEntityType::CardanoTransactions(Epoch(7), BlockNumber(12)),
                ],
            ),
            (
                Epoch(8),
                vec![SignedEntityType::MithrilStakeDistribution(Epoch(8))],
            ),
        ]);
        insert_signed_beacons(&connection, records);

        // Threshold is 18 - 10 = 8: only the epoch 7 records are expected to be removed.
        EpochPruningTask::prune(&repository, Epoch(18)).await.unwrap();

        let remaining: Vec<SignedBeaconRecord> =
            connection.fetch_collect(GetSignedBeaconQuery::all()).unwrap();
        assert_eq!(vec![stake_distribution_record(Epoch(8))], remaining);
    }
}