mithril_signer/database/repository/
signed_beacon_repository.rs

1use async_trait::async_trait;
2use std::sync::Arc;
3
4use mithril_common::StdResult;
5use mithril_common::entities::{Epoch, SignedEntityType};
6use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
7
8use crate::database::query::{
9    DeleteSignedBeaconRecordQuery, GetSignedBeaconQuery, InsertSignedBeaconRecordQuery,
10};
11use crate::database::record::SignedBeaconRecord;
12use crate::entities::BeaconToSign;
13use crate::services::{EpochPruningTask, SignedBeaconStore};
14
15/// A [SignedBeaconStore] implementation using SQLite.
16pub struct SignedBeaconRepository {
17    connection: Arc<SqliteConnection>,
18    store_retention_limit: Option<u64>,
19}
20
21impl SignedBeaconRepository {
22    /// Create a new instance of the `SignedBeaconRepository`.
23    pub fn new(connection: Arc<SqliteConnection>, store_retention_limit: Option<u64>) -> Self {
24        Self {
25            connection,
26            store_retention_limit,
27        }
28    }
29
30    /// Get the last signed beacon.
31    pub fn get_last(&self) -> StdResult<Option<SignedBeaconRecord>> {
32        self.connection.fetch_first(GetSignedBeaconQuery::all())
33    }
34
35    /// Prune all signed beacons that have an epoch below the given threshold.
36    pub fn prune_below_epoch(&self, epoch: Epoch) -> StdResult<()> {
37        let _ = self
38            .connection
39            .fetch_first(DeleteSignedBeaconRecordQuery::below_epoch_threshold(epoch))?;
40        Ok(())
41    }
42}
43
44#[async_trait]
45impl SignedBeaconStore for SignedBeaconRepository {
46    async fn filter_out_already_signed_entities(
47        &self,
48        entities: Vec<SignedEntityType>,
49    ) -> StdResult<Vec<SignedEntityType>> {
50        let already_signed_entities: Vec<SignedEntityType> = self
51            .connection
52            .fetch(GetSignedBeaconQuery::by_signed_entities(&entities)?)?
53            .map(|record| record.signed_entity_type)
54            .collect();
55
56        Ok(entities
57            .into_iter()
58            .filter(|e| !already_signed_entities.contains(e))
59            .collect())
60    }
61
62    async fn mark_beacon_as_signed(&self, entity: &BeaconToSign) -> StdResult<()> {
63        let record = entity.clone().into();
64        let _ = self
65            .connection
66            .fetch_first(InsertSignedBeaconRecordQuery::one(record)?)?;
67
68        Ok(())
69    }
70}
71
72#[async_trait]
73impl EpochPruningTask for SignedBeaconRepository {
74    fn pruned_data(&self) -> &'static str {
75        "Signed Beacon"
76    }
77
78    async fn prune(&self, current_epoch: Epoch) -> StdResult<()> {
79        match self.store_retention_limit.map(|limit| current_epoch - limit) {
80            Some(threshold) if *threshold > 0 => self.prune_below_epoch(threshold),
81            _ => Ok(()),
82        }
83    }
84}
85
#[cfg(test)]
mod tests {
    use chrono::Utc;

    use mithril_common::entities::{
        BlockNumber, Epoch, SignedEntityConfig, SignedEntityTypeDiscriminants, TimePoint,
    };
    use mithril_common::test::double::Dummy;
    use mithril_persistence::sqlite::ConnectionExtensions;

    use crate::database::query::GetSignedBeaconQuery;
    use crate::database::record::SignedBeaconRecord;
    use crate::database::test_helper::{insert_signed_beacons, main_db_connection};

    use super::*;

    /// Open a fresh test database and build a repository on top of it.
    fn setup(
        store_retention_limit: Option<u64>,
    ) -> (Arc<SqliteConnection>, SignedBeaconRepository) {
        let connection = Arc::new(main_db_connection().unwrap());
        let repository = SignedBeaconRepository::new(connection.clone(), store_retention_limit);

        (connection, repository)
    }

    /// Fake record of a Mithril stake distribution signed at the given epoch.
    fn stake_distribution_record(epoch: u64) -> SignedBeaconRecord {
        SignedBeaconRecord::fake(
            Epoch(epoch),
            SignedEntityType::MithrilStakeDistribution(Epoch(epoch)),
        )
    }

    /// Run the repository filter over a copy of the given entities.
    async fn filter_with(
        repository: &SignedBeaconRepository,
        entities: &[SignedEntityType],
    ) -> Vec<SignedEntityType> {
        repository
            .filter_out_already_signed_entities(entities.to_vec())
            .await
            .unwrap()
    }

    /// List the signed entity types of every discriminant for the given time point.
    fn all_signed_entity_type_for(time_point: &TimePoint) -> Vec<SignedEntityType> {
        let allowed_discriminants = SignedEntityTypeDiscriminants::all();
        let config = SignedEntityConfig {
            allowed_discriminants,
            ..SignedEntityConfig::dummy()
        };

        config.list_allowed_signed_entity_types(time_point).unwrap()
    }

    #[test]
    fn get_last_stored_signed_beacon() {
        let (connection, repository) = setup(None);

        // Nothing stored yet.
        assert_eq!(None, repository.get_last().unwrap());

        // A single stored record is the last one.
        insert_signed_beacons(&connection, vec![stake_distribution_record(1941)]);
        assert_eq!(
            Some(stake_distribution_record(1941)),
            repository.get_last().unwrap()
        );

        // With several records, the most recently inserted one is returned.
        let newer_records = SignedBeaconRecord::fakes(&[
            (
                Epoch(1942),
                vec![SignedEntityType::MithrilStakeDistribution(Epoch(1942))],
            ),
            (
                Epoch(1943),
                vec![SignedEntityType::MithrilStakeDistribution(Epoch(1943))],
            ),
        ]);
        insert_signed_beacons(&connection, newer_records);
        assert_eq!(
            Some(stake_distribution_record(1943)),
            repository.get_last().unwrap()
        );
    }

    #[tokio::test]
    async fn filter_out_nothing_if_nothing_was_previously_signed() {
        let (_connection, repository) = setup(None);
        let candidates = all_signed_entity_type_for(&TimePoint::dummy());

        let remaining = filter_with(&repository, &candidates).await;

        assert_eq!(candidates, remaining);
    }

    #[tokio::test]
    async fn filter_out_nothing_if_previously_signed_entities_doesnt_match_passed_entities() {
        let (connection, repository) = setup(None);
        let time_point = TimePoint::dummy();

        // Stored entity targets an older epoch than any of the candidates.
        let unrelated_entity = SignedEntityType::MithrilStakeDistribution(time_point.epoch - 2);
        insert_signed_beacons(
            &connection,
            SignedBeaconRecord::fakes(&[(Epoch(1941), vec![unrelated_entity])]),
        );
        let candidates = all_signed_entity_type_for(&time_point);

        let remaining = filter_with(&repository, &candidates).await;

        assert_eq!(candidates, remaining);
    }

    #[tokio::test]
    async fn filter_out_everything_if_previously_signed_entities_match_all_passed_entities() {
        let (connection, repository) = setup(None);
        let candidates = all_signed_entity_type_for(&TimePoint::dummy());

        // Every candidate is already recorded as signed.
        let stored_records = candidates
            .iter()
            .cloned()
            .map(|entity| SignedBeaconRecord::fake(Epoch(4872), entity))
            .collect();
        insert_signed_beacons(&connection, stored_records);

        let remaining = filter_with(&repository, &candidates).await;

        assert_eq!(Vec::<SignedEntityType>::new(), remaining);
    }

    #[tokio::test]
    async fn filter_out_partially_if_some_previously_signed_entities_match_passed_entities() {
        let (connection, repository) = setup(None);
        let time_point = TimePoint::dummy();
        let epoch = time_point.epoch;
        let block_number = time_point.chain_point.block_number;

        let already_signed = [
            SignedEntityType::MithrilStakeDistribution(epoch),
            SignedEntityType::CardanoTransactions(epoch, block_number),
        ];
        let stored_records = already_signed
            .iter()
            .cloned()
            .map(|entity| SignedBeaconRecord::fake(Epoch(4872), entity))
            .collect();
        insert_signed_beacons(&connection, stored_records);

        let candidates = vec![
            SignedEntityType::MithrilStakeDistribution(epoch),
            SignedEntityType::CardanoStakeDistribution(epoch),
            SignedEntityType::CardanoTransactions(epoch, block_number),
            SignedEntityType::CardanoStakeDistribution(epoch + 10),
        ];
        let remaining = repository
            .filter_out_already_signed_entities(candidates)
            .await
            .unwrap();

        // Only the entities without a stored record remain, in their input order.
        let expected = vec![
            SignedEntityType::CardanoStakeDistribution(epoch),
            SignedEntityType::CardanoStakeDistribution(epoch + 10),
        ];
        assert_eq!(expected, remaining);
    }

    #[tokio::test]
    async fn mark_beacon_as_signed() {
        let (connection, repository) = setup(None);
        let beacon_to_sign = BeaconToSign {
            epoch: Epoch(13),
            signed_entity_type: SignedEntityType::MithrilStakeDistribution(Epoch(13)),
            initiated_at: Utc::now(),
        };

        // Sanity check: the store starts empty.
        let stored_before: Vec<SignedBeaconRecord> =
            connection.fetch_collect(GetSignedBeaconQuery::all()).unwrap();
        assert_eq!(Vec::<SignedBeaconRecord>::new(), stored_before);

        repository.mark_beacon_as_signed(&beacon_to_sign).await.unwrap();

        let stored_after = connection
            .fetch_first(GetSignedBeaconQuery::all())
            .unwrap()
            .expect("A signed beacon should have been inserted");
        assert_eq!(beacon_to_sign, stored_after);
    }

    #[tokio::test]
    async fn test_dont_execute_pruning_tasks_if_no_retention_limit_set() {
        let (connection, repository) = setup(None);
        let stored_records = SignedBeaconRecord::fakes(&[(
            Epoch(8),
            vec![SignedEntityType::MithrilStakeDistribution(Epoch(8))],
        )]);
        insert_signed_beacons(&connection, stored_records);

        EpochPruningTask::prune(&repository, Epoch(1000)).await.unwrap();

        let remaining_count = connection.fetch(GetSignedBeaconQuery::all()).unwrap().count();
        assert_eq!(1, remaining_count);
    }

    #[tokio::test]
    async fn test_dont_execute_pruning_tasks_if_current_epoch_minus_retention_limit_is_0() {
        let (connection, repository) = setup(Some(10));
        let stored_records = SignedBeaconRecord::fakes(&[(
            Epoch(8),
            vec![SignedEntityType::MithrilStakeDistribution(Epoch(8))],
        )]);
        insert_signed_beacons(&connection, stored_records);

        // The retention limit (10) is greater than the current epoch (9).
        EpochPruningTask::prune(&repository, Epoch(9)).await.unwrap();

        let remaining_count = connection.fetch(GetSignedBeaconQuery::all()).unwrap().count();
        assert_eq!(1, remaining_count);
    }

    #[tokio::test]
    async fn test_prune_task_substract_set_retention_limit_to_given_epoch() {
        let (connection, repository) = setup(Some(10));
        let stored_records = SignedBeaconRecord::fakes(&[
            (
                Epoch(7),
                vec![
                    SignedEntityType::MithrilStakeDistribution(Epoch(7)),
                    SignedEntityType::CardanoTransactions(Epoch(7), BlockNumber(12)),
                ],
            ),
            (
                Epoch(8),
                vec![SignedEntityType::MithrilStakeDistribution(Epoch(8))],
            ),
        ]);
        insert_signed_beacons(&connection, stored_records);

        // Threshold is 18 - 10 = 8: records of epoch 7 go away, epoch 8 is kept.
        EpochPruningTask::prune(&repository, Epoch(18)).await.unwrap();

        let remaining: Vec<SignedBeaconRecord> =
            connection.fetch_collect(GetSignedBeaconQuery::all()).unwrap();
        let expected = vec![SignedBeaconRecord::fake(
            Epoch(8),
            SignedEntityType::MithrilStakeDistribution(Epoch(8)),
        )];
        assert_eq!(expected, remaining);
    }
}
350        );
351    }
352}