mithril_signer/database/repository/
signed_beacon_repository.rs

1use async_trait::async_trait;
2use std::sync::Arc;
3
4use mithril_common::entities::{Epoch, SignedEntityType};
5use mithril_common::StdResult;
6use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
7
8use crate::database::query::{
9    DeleteSignedBeaconRecordQuery, GetSignedBeaconQuery, InsertSignedBeaconRecordQuery,
10};
11use crate::database::record::SignedBeaconRecord;
12use crate::entities::BeaconToSign;
13use crate::services::{EpochPruningTask, SignedBeaconStore};
14
/// A [SignedBeaconStore] implementation using SQLite.
///
/// The repository also implements [EpochPruningTask] so that old signed beacons can be
/// removed once they fall below a retention threshold.
pub struct SignedBeaconRepository {
    /// Shared SQLite connection through which every query of this repository is executed.
    connection: Arc<SqliteConnection>,
    /// Number of epochs subtracted from the current epoch to compute the pruning threshold.
    /// When `None`, the pruning task does nothing.
    store_retention_limit: Option<u64>,
}
20
21impl SignedBeaconRepository {
22    /// Create a new instance of the `SignedBeaconRepository`.
23    pub fn new(connection: Arc<SqliteConnection>, store_retention_limit: Option<u64>) -> Self {
24        Self {
25            connection,
26            store_retention_limit,
27        }
28    }
29
30    /// Get the last signed beacon.
31    pub fn get_last(&self) -> StdResult<Option<SignedBeaconRecord>> {
32        self.connection.fetch_first(GetSignedBeaconQuery::all())
33    }
34
35    /// Prune all signed beacons that have an epoch below the given threshold.
36    pub fn prune_below_epoch(&self, epoch: Epoch) -> StdResult<()> {
37        let _ = self
38            .connection
39            .fetch_first(DeleteSignedBeaconRecordQuery::below_epoch_threshold(epoch))?;
40        Ok(())
41    }
42}
43
44#[async_trait]
45impl SignedBeaconStore for SignedBeaconRepository {
46    async fn filter_out_already_signed_entities(
47        &self,
48        entities: Vec<SignedEntityType>,
49    ) -> StdResult<Vec<SignedEntityType>> {
50        let already_signed_entities: Vec<SignedEntityType> = self
51            .connection
52            .fetch(GetSignedBeaconQuery::by_signed_entities(&entities)?)?
53            .map(|record| record.signed_entity_type)
54            .collect();
55
56        Ok(entities
57            .into_iter()
58            .filter(|e| !already_signed_entities.contains(e))
59            .collect())
60    }
61
62    async fn mark_beacon_as_signed(&self, entity: &BeaconToSign) -> StdResult<()> {
63        let record = entity.clone().into();
64        let _ = self
65            .connection
66            .fetch_first(InsertSignedBeaconRecordQuery::one(record)?)?;
67
68        Ok(())
69    }
70}
71
72#[async_trait]
73impl EpochPruningTask for SignedBeaconRepository {
74    fn pruned_data(&self) -> &'static str {
75        "Signed Beacon"
76    }
77
78    async fn prune(&self, current_epoch: Epoch) -> StdResult<()> {
79        match self
80            .store_retention_limit
81            .map(|limit| current_epoch - limit)
82        {
83            Some(threshold) if *threshold > 0 => self.prune_below_epoch(threshold),
84            _ => Ok(()),
85        }
86    }
87}
88
89#[cfg(test)]
90mod tests {
91    use chrono::Utc;
92
93    use mithril_common::entities::{
94        BlockNumber, Epoch, SignedEntityConfig, SignedEntityTypeDiscriminants, TimePoint,
95    };
96    use mithril_persistence::sqlite::ConnectionExtensions;
97
98    use crate::database::query::GetSignedBeaconQuery;
99    use crate::database::record::SignedBeaconRecord;
100    use crate::database::test_helper::{insert_signed_beacons, main_db_connection};
101
102    use super::*;
103
104    fn all_signed_entity_type_for(time_point: &TimePoint) -> Vec<SignedEntityType> {
105        let config = SignedEntityConfig {
106            allowed_discriminants: SignedEntityTypeDiscriminants::all(),
107            ..SignedEntityConfig::dummy()
108        };
109        config.list_allowed_signed_entity_types(time_point).unwrap()
110    }
111
112    #[test]
113    fn get_last_stored_signed_beacon() {
114        let connection = Arc::new(main_db_connection().unwrap());
115        let repository = SignedBeaconRepository::new(connection.clone(), None);
116
117        let last_signed_beacon = repository.get_last().unwrap();
118        assert_eq!(None, last_signed_beacon);
119
120        insert_signed_beacons(
121            &connection,
122            vec![SignedBeaconRecord::fake(
123                Epoch(1941),
124                SignedEntityType::MithrilStakeDistribution(Epoch(1941)),
125            )],
126        );
127
128        let last_signed_beacon = repository.get_last().unwrap();
129        assert_eq!(
130            Some(SignedBeaconRecord::fake(
131                Epoch(1941),
132                SignedEntityType::MithrilStakeDistribution(Epoch(1941)),
133            )),
134            last_signed_beacon
135        );
136
137        insert_signed_beacons(
138            &connection,
139            SignedBeaconRecord::fakes(&[
140                (
141                    Epoch(1942),
142                    vec![SignedEntityType::MithrilStakeDistribution(Epoch(1942))],
143                ),
144                (
145                    Epoch(1943),
146                    vec![SignedEntityType::MithrilStakeDistribution(Epoch(1943))],
147                ),
148            ]),
149        );
150
151        let last_signed_beacon = repository.get_last().unwrap();
152        assert_eq!(
153            Some(SignedBeaconRecord::fake(
154                Epoch(1943),
155                SignedEntityType::MithrilStakeDistribution(Epoch(1943)),
156            )),
157            last_signed_beacon
158        );
159    }
160
161    #[tokio::test]
162    async fn filter_out_nothing_if_nothing_was_previously_signed() {
163        let connection = Arc::new(main_db_connection().unwrap());
164        let repository = SignedBeaconRepository::new(connection.clone(), None);
165
166        let to_filter = all_signed_entity_type_for(&TimePoint::dummy());
167        let available_entities = repository
168            .filter_out_already_signed_entities(to_filter.clone())
169            .await
170            .unwrap();
171
172        assert_eq!(to_filter, available_entities);
173    }
174
175    #[tokio::test]
176    async fn filter_out_nothing_if_previously_signed_entities_doesnt_match_passed_entities() {
177        let connection = Arc::new(main_db_connection().unwrap());
178        let repository = SignedBeaconRepository::new(connection.clone(), None);
179
180        let time_point = TimePoint::dummy();
181        insert_signed_beacons(
182            &connection,
183            SignedBeaconRecord::fakes(&[(
184                Epoch(1941),
185                vec![SignedEntityType::MithrilStakeDistribution(
186                    time_point.epoch - 2,
187                )],
188            )]),
189        );
190        let to_filter = all_signed_entity_type_for(&time_point);
191
192        let available_entities = repository
193            .filter_out_already_signed_entities(to_filter.clone())
194            .await
195            .unwrap();
196        assert_eq!(to_filter, available_entities);
197    }
198
199    #[tokio::test]
200    async fn filter_out_everything_if_previously_signed_entities_match_all_passed_entities() {
201        let connection = Arc::new(main_db_connection().unwrap());
202        let repository = SignedBeaconRepository::new(connection.clone(), None);
203
204        let to_filter = all_signed_entity_type_for(&TimePoint::dummy());
205        insert_signed_beacons(
206            &connection,
207            to_filter
208                .iter()
209                .map(|entity| SignedBeaconRecord::fake(Epoch(4872), entity.clone()))
210                .collect(),
211        );
212
213        let available_entities = repository
214            .filter_out_already_signed_entities(to_filter.clone())
215            .await
216            .unwrap();
217        assert_eq!(Vec::<SignedEntityType>::new(), available_entities);
218    }
219
220    #[tokio::test]
221    async fn filter_out_partially_if_some_previously_signed_entities_match_passed_entities() {
222        let connection = Arc::new(main_db_connection().unwrap());
223        let repository = SignedBeaconRepository::new(connection.clone(), None);
224
225        let time_point = TimePoint::dummy();
226        let signed_beacons = [
227            SignedEntityType::MithrilStakeDistribution(time_point.epoch),
228            SignedEntityType::CardanoTransactions(
229                time_point.epoch,
230                time_point.chain_point.block_number,
231            ),
232        ];
233        insert_signed_beacons(
234            &connection,
235            signed_beacons
236                .iter()
237                .map(|entity| SignedBeaconRecord::fake(Epoch(4872), entity.clone()))
238                .collect(),
239        );
240
241        let available_entities = repository
242            .filter_out_already_signed_entities(vec![
243                SignedEntityType::MithrilStakeDistribution(time_point.epoch),
244                SignedEntityType::CardanoStakeDistribution(time_point.epoch),
245                SignedEntityType::CardanoTransactions(
246                    time_point.epoch,
247                    time_point.chain_point.block_number,
248                ),
249                SignedEntityType::CardanoStakeDistribution(time_point.epoch + 10),
250            ])
251            .await
252            .unwrap();
253
254        assert_eq!(
255            vec![
256                SignedEntityType::CardanoStakeDistribution(time_point.epoch),
257                SignedEntityType::CardanoStakeDistribution(time_point.epoch + 10),
258            ],
259            available_entities
260        );
261    }
262
263    #[tokio::test]
264    async fn mark_beacon_as_signed() {
265        let connection = Arc::new(main_db_connection().unwrap());
266        let repository = SignedBeaconRepository::new(connection.clone(), None);
267
268        let beacon_to_sign = BeaconToSign {
269            epoch: Epoch(13),
270            signed_entity_type: SignedEntityType::MithrilStakeDistribution(Epoch(13)),
271            initiated_at: Utc::now(),
272        };
273
274        let signed_beacons: Vec<SignedBeaconRecord> = connection
275            .fetch_collect(GetSignedBeaconQuery::all())
276            .unwrap();
277        assert_eq!(Vec::<SignedBeaconRecord>::new(), signed_beacons);
278
279        repository
280            .mark_beacon_as_signed(&beacon_to_sign)
281            .await
282            .unwrap();
283
284        let signed_beacon = connection
285            .fetch_first(GetSignedBeaconQuery::all())
286            .unwrap()
287            .expect("A signed beacon should have been inserted");
288        assert_eq!(beacon_to_sign, signed_beacon);
289    }
290
291    #[tokio::test]
292    async fn test_dont_execute_pruning_tasks_if_no_retention_limit_set() {
293        let connection = Arc::new(main_db_connection().unwrap());
294        let repository = SignedBeaconRepository::new(connection.clone(), None);
295        insert_signed_beacons(
296            &connection,
297            SignedBeaconRecord::fakes(&[(
298                Epoch(8),
299                vec![SignedEntityType::MithrilStakeDistribution(Epoch(8))],
300            )]),
301        );
302
303        EpochPruningTask::prune(&repository, Epoch(1000))
304            .await
305            .unwrap();
306
307        let cursor = connection.fetch(GetSignedBeaconQuery::all()).unwrap();
308        assert_eq!(1, cursor.count(),);
309    }
310
311    #[tokio::test]
312    async fn test_dont_execute_pruning_tasks_if_current_epoch_minus_retention_limit_is_0() {
313        let connection = Arc::new(main_db_connection().unwrap());
314        let repository = SignedBeaconRepository::new(connection.clone(), Some(10));
315        insert_signed_beacons(
316            &connection,
317            SignedBeaconRecord::fakes(&[(
318                Epoch(8),
319                vec![SignedEntityType::MithrilStakeDistribution(Epoch(8))],
320            )]),
321        );
322
323        EpochPruningTask::prune(&repository, Epoch(9))
324            .await
325            .unwrap();
326
327        let cursor = connection.fetch(GetSignedBeaconQuery::all()).unwrap();
328        assert_eq!(1, cursor.count(),);
329    }
330
331    #[tokio::test]
332    async fn test_prune_task_substract_set_retention_limit_to_given_epoch() {
333        let connection = Arc::new(main_db_connection().unwrap());
334        let repository = SignedBeaconRepository::new(connection.clone(), Some(10));
335        insert_signed_beacons(
336            &connection,
337            SignedBeaconRecord::fakes(&[
338                (
339                    Epoch(7),
340                    vec![
341                        SignedEntityType::MithrilStakeDistribution(Epoch(7)),
342                        SignedEntityType::CardanoTransactions(Epoch(7), BlockNumber(12)),
343                    ],
344                ),
345                (
346                    Epoch(8),
347                    vec![SignedEntityType::MithrilStakeDistribution(Epoch(8))],
348                ),
349            ]),
350        );
351
352        EpochPruningTask::prune(&repository, Epoch(18))
353            .await
354            .unwrap();
355
356        let signed_beacons: Vec<SignedBeaconRecord> = connection
357            .fetch_collect(GetSignedBeaconQuery::all())
358            .unwrap();
359        assert_eq!(
360            vec![SignedBeaconRecord::fake(
361                Epoch(8),
362                SignedEntityType::MithrilStakeDistribution(Epoch(8))
363            )],
364            signed_beacons
365        );
366    }
367}