mithril_signer/database/repository/
signed_beacon_repository.rs1use async_trait::async_trait;
2use std::sync::Arc;
3
4use mithril_common::entities::{Epoch, SignedEntityType};
5use mithril_common::StdResult;
6use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
7
8use crate::database::query::{
9 DeleteSignedBeaconRecordQuery, GetSignedBeaconQuery, InsertSignedBeaconRecordQuery,
10};
11use crate::database::record::SignedBeaconRecord;
12use crate::entities::BeaconToSign;
13use crate::services::{EpochPruningTask, SignedBeaconStore};
14
15pub struct SignedBeaconRepository {
17 connection: Arc<SqliteConnection>,
18 store_retention_limit: Option<u64>,
19}
20
21impl SignedBeaconRepository {
22 pub fn new(connection: Arc<SqliteConnection>, store_retention_limit: Option<u64>) -> Self {
24 Self {
25 connection,
26 store_retention_limit,
27 }
28 }
29
30 pub fn get_last(&self) -> StdResult<Option<SignedBeaconRecord>> {
32 self.connection.fetch_first(GetSignedBeaconQuery::all())
33 }
34
35 pub fn prune_below_epoch(&self, epoch: Epoch) -> StdResult<()> {
37 let _ = self
38 .connection
39 .fetch_first(DeleteSignedBeaconRecordQuery::below_epoch_threshold(epoch))?;
40 Ok(())
41 }
42}
43
44#[async_trait]
45impl SignedBeaconStore for SignedBeaconRepository {
46 async fn filter_out_already_signed_entities(
47 &self,
48 entities: Vec<SignedEntityType>,
49 ) -> StdResult<Vec<SignedEntityType>> {
50 let already_signed_entities: Vec<SignedEntityType> = self
51 .connection
52 .fetch(GetSignedBeaconQuery::by_signed_entities(&entities)?)?
53 .map(|record| record.signed_entity_type)
54 .collect();
55
56 Ok(entities
57 .into_iter()
58 .filter(|e| !already_signed_entities.contains(e))
59 .collect())
60 }
61
62 async fn mark_beacon_as_signed(&self, entity: &BeaconToSign) -> StdResult<()> {
63 let record = entity.clone().into();
64 let _ = self
65 .connection
66 .fetch_first(InsertSignedBeaconRecordQuery::one(record)?)?;
67
68 Ok(())
69 }
70}
71
72#[async_trait]
73impl EpochPruningTask for SignedBeaconRepository {
74 fn pruned_data(&self) -> &'static str {
75 "Signed Beacon"
76 }
77
78 async fn prune(&self, current_epoch: Epoch) -> StdResult<()> {
79 match self
80 .store_retention_limit
81 .map(|limit| current_epoch - limit)
82 {
83 Some(threshold) if *threshold > 0 => self.prune_below_epoch(threshold),
84 _ => Ok(()),
85 }
86 }
87}
88
#[cfg(test)]
mod tests {
    use chrono::Utc;

    use mithril_common::entities::{
        BlockNumber, Epoch, SignedEntityConfig, SignedEntityTypeDiscriminants, TimePoint,
    };
    use mithril_persistence::sqlite::ConnectionExtensions;

    use crate::database::query::GetSignedBeaconQuery;
    use crate::database::record::SignedBeaconRecord;
    use crate::database::test_helper::{insert_signed_beacons, main_db_connection};

    use super::*;

    /// Opens a fresh test database and builds a repository on top of it with
    /// the given retention limit. The connection is returned as well so tests
    /// can seed and inspect the store directly.
    fn setup(
        store_retention_limit: Option<u64>,
    ) -> (Arc<SqliteConnection>, SignedBeaconRepository) {
        let connection = Arc::new(main_db_connection().unwrap());
        let repository = SignedBeaconRepository::new(connection.clone(), store_retention_limit);

        (connection, repository)
    }

    /// Lists the signed entity types allowed at `time_point` when every
    /// discriminant is enabled.
    fn all_signed_entity_type_for(time_point: &TimePoint) -> Vec<SignedEntityType> {
        let mut config = SignedEntityConfig::dummy();
        config.allowed_discriminants = SignedEntityTypeDiscriminants::all();

        config.list_allowed_signed_entity_types(time_point).unwrap()
    }

    #[test]
    fn get_last_stored_signed_beacon() {
        let (connection, repository) = setup(None);

        // Empty store: there is no last beacon.
        let last_signed_beacon = repository.get_last().unwrap();
        assert_eq!(None, last_signed_beacon);

        // A single record is necessarily the last one.
        let single_record = vec![SignedBeaconRecord::fake(
            Epoch(1941),
            SignedEntityType::MithrilStakeDistribution(Epoch(1941)),
        )];
        insert_signed_beacons(&connection, single_record);

        let expected = Some(SignedBeaconRecord::fake(
            Epoch(1941),
            SignedEntityType::MithrilStakeDistribution(Epoch(1941)),
        ));
        let last_signed_beacon = repository.get_last().unwrap();
        assert_eq!(expected, last_signed_beacon);

        // With several records, the one with the highest epoch is expected.
        let newer_records = SignedBeaconRecord::fakes(&[
            (
                Epoch(1942),
                vec![SignedEntityType::MithrilStakeDistribution(Epoch(1942))],
            ),
            (
                Epoch(1943),
                vec![SignedEntityType::MithrilStakeDistribution(Epoch(1943))],
            ),
        ]);
        insert_signed_beacons(&connection, newer_records);

        let expected = Some(SignedBeaconRecord::fake(
            Epoch(1943),
            SignedEntityType::MithrilStakeDistribution(Epoch(1943)),
        ));
        let last_signed_beacon = repository.get_last().unwrap();
        assert_eq!(expected, last_signed_beacon);
    }

    #[tokio::test]
    async fn filter_out_nothing_if_nothing_was_previously_signed() {
        let (_connection, repository) = setup(None);

        let candidates = all_signed_entity_type_for(&TimePoint::dummy());
        let available_entities = repository
            .filter_out_already_signed_entities(candidates.clone())
            .await
            .unwrap();

        assert_eq!(candidates, available_entities);
    }

    #[tokio::test]
    async fn filter_out_nothing_if_previously_signed_entities_doesnt_match_passed_entities() {
        let (connection, repository) = setup(None);

        let time_point = TimePoint::dummy();
        // Stored beacon targets an older epoch than any candidate.
        let unrelated_records = SignedBeaconRecord::fakes(&[(
            Epoch(1941),
            vec![SignedEntityType::MithrilStakeDistribution(
                time_point.epoch - 2,
            )],
        )]);
        insert_signed_beacons(&connection, unrelated_records);

        let candidates = all_signed_entity_type_for(&time_point);
        let available_entities = repository
            .filter_out_already_signed_entities(candidates.clone())
            .await
            .unwrap();

        assert_eq!(candidates, available_entities);
    }

    #[tokio::test]
    async fn filter_out_everything_if_previously_signed_entities_match_all_passed_entities() {
        let (connection, repository) = setup(None);

        let candidates = all_signed_entity_type_for(&TimePoint::dummy());
        // Every candidate is already recorded as signed.
        let matching_records = candidates
            .iter()
            .cloned()
            .map(|entity| SignedBeaconRecord::fake(Epoch(4872), entity))
            .collect();
        insert_signed_beacons(&connection, matching_records);

        let available_entities = repository
            .filter_out_already_signed_entities(candidates.clone())
            .await
            .unwrap();

        assert_eq!(Vec::<SignedEntityType>::new(), available_entities);
    }

    #[tokio::test]
    async fn filter_out_partially_if_some_previously_signed_entities_match_passed_entities() {
        let (connection, repository) = setup(None);

        let time_point = TimePoint::dummy();
        let stake_distribution = SignedEntityType::MithrilStakeDistribution(time_point.epoch);
        let transactions = SignedEntityType::CardanoTransactions(
            time_point.epoch,
            time_point.chain_point.block_number,
        );

        // Only these two entities are already signed.
        let already_signed = [stake_distribution.clone(), transactions.clone()];
        let signed_records = already_signed
            .iter()
            .cloned()
            .map(|entity| SignedBeaconRecord::fake(Epoch(4872), entity))
            .collect();
        insert_signed_beacons(&connection, signed_records);

        let candidates = vec![
            stake_distribution,
            SignedEntityType::CardanoStakeDistribution(time_point.epoch),
            transactions,
            SignedEntityType::CardanoStakeDistribution(time_point.epoch + 10),
        ];
        let available_entities = repository
            .filter_out_already_signed_entities(candidates)
            .await
            .unwrap();

        let expected = vec![
            SignedEntityType::CardanoStakeDistribution(time_point.epoch),
            SignedEntityType::CardanoStakeDistribution(time_point.epoch + 10),
        ];
        assert_eq!(expected, available_entities);
    }

    #[tokio::test]
    async fn mark_beacon_as_signed() {
        let (connection, repository) = setup(None);

        let beacon_to_sign = BeaconToSign {
            epoch: Epoch(13),
            signed_entity_type: SignedEntityType::MithrilStakeDistribution(Epoch(13)),
            initiated_at: Utc::now(),
        };

        // The store starts empty.
        let stored_before: Vec<SignedBeaconRecord> = connection
            .fetch_collect(GetSignedBeaconQuery::all())
            .unwrap();
        assert_eq!(Vec::<SignedBeaconRecord>::new(), stored_before);

        repository
            .mark_beacon_as_signed(&beacon_to_sign)
            .await
            .unwrap();

        let stored_after = connection
            .fetch_first(GetSignedBeaconQuery::all())
            .unwrap()
            .expect("A signed beacon should have been inserted");
        assert_eq!(beacon_to_sign, stored_after);
    }

    #[tokio::test]
    async fn test_dont_execute_pruning_tasks_if_no_retention_limit_set() {
        let (connection, repository) = setup(None);
        let seeded_records = SignedBeaconRecord::fakes(&[(
            Epoch(8),
            vec![SignedEntityType::MithrilStakeDistribution(Epoch(8))],
        )]);
        insert_signed_beacons(&connection, seeded_records);

        EpochPruningTask::prune(&repository, Epoch(1000))
            .await
            .unwrap();

        let remaining = connection
            .fetch(GetSignedBeaconQuery::all())
            .unwrap()
            .count();
        assert_eq!(1, remaining);
    }

    #[tokio::test]
    async fn test_dont_execute_pruning_tasks_if_current_epoch_minus_retention_limit_is_0() {
        let (connection, repository) = setup(Some(10));
        let seeded_records = SignedBeaconRecord::fakes(&[(
            Epoch(8),
            vec![SignedEntityType::MithrilStakeDistribution(Epoch(8))],
        )]);
        insert_signed_beacons(&connection, seeded_records);

        // The retention limit (10) exceeds the current epoch (9).
        EpochPruningTask::prune(&repository, Epoch(9))
            .await
            .unwrap();

        let remaining = connection
            .fetch(GetSignedBeaconQuery::all())
            .unwrap()
            .count();
        assert_eq!(1, remaining);
    }

    #[tokio::test]
    async fn test_prune_task_substract_set_retention_limit_to_given_epoch() {
        let (connection, repository) = setup(Some(10));
        let seeded_records = SignedBeaconRecord::fakes(&[
            (
                Epoch(7),
                vec![
                    SignedEntityType::MithrilStakeDistribution(Epoch(7)),
                    SignedEntityType::CardanoTransactions(Epoch(7), BlockNumber(12)),
                ],
            ),
            (
                Epoch(8),
                vec![SignedEntityType::MithrilStakeDistribution(Epoch(8))],
            ),
        ]);
        insert_signed_beacons(&connection, seeded_records);

        // Threshold is 18 - 10 = 8: only the epoch 7 records must go.
        EpochPruningTask::prune(&repository, Epoch(18))
            .await
            .unwrap();

        let remaining: Vec<SignedBeaconRecord> = connection
            .fetch_collect(GetSignedBeaconQuery::all())
            .unwrap();
        let expected = vec![SignedBeaconRecord::fake(
            Epoch(8),
            SignedEntityType::MithrilStakeDistribution(Epoch(8)),
        )];
        assert_eq!(expected, remaining);
    }
}