mithril_aggregator/database/repository/
signed_entity_store.rs

1use std::sync::Arc;
2
3use anyhow::Context;
4use async_trait::async_trait;
5
6use mithril_common::StdResult;
7use mithril_common::entities::{Epoch, SignedEntityTypeDiscriminants};
8use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
9
10use crate::database::query::{
11    GetSignedEntityRecordQuery, InsertSignedEntityRecordQuery, UpdateSignedEntityQuery,
12};
13use crate::database::record::SignedEntityRecord;
14
15/// Signed entity storer trait
16#[cfg_attr(test, mockall::automock)]
17#[async_trait]
18pub trait SignedEntityStorer: Sync + Send {
19    /// Store a signed entity
20    async fn store_signed_entity(&self, signed_entity: &SignedEntityRecord) -> StdResult<()>;
21
22    /// Get signed entity type
23    async fn get_signed_entity(
24        &self,
25        signed_entity_id: &str,
26    ) -> StdResult<Option<SignedEntityRecord>>;
27
28    /// Get signed entity type by certificate id
29    async fn get_signed_entity_by_certificate_id(
30        &self,
31        certificate_hash: &str,
32    ) -> StdResult<Option<SignedEntityRecord>>;
33
34    /// Get signed entities type by certificates ids
35    async fn get_signed_entities_by_certificates_ids<'a>(
36        &self,
37        certificates_ids: &[&'a str],
38    ) -> StdResult<Vec<SignedEntityRecord>>;
39
40    /// Get last signed entities by signed entity type
41    async fn get_last_signed_entities_by_type(
42        &self,
43        signed_entity_type_id: &SignedEntityTypeDiscriminants,
44        total: usize,
45    ) -> StdResult<Vec<SignedEntityRecord>>;
46
47    /// Get signed entities by signed entity type and epoch
48    async fn get_last_signed_entities_by_type_and_epoch(
49        &self,
50        signed_entity_type_id: &SignedEntityTypeDiscriminants,
51        epoch: Epoch,
52        total: usize,
53    ) -> StdResult<Vec<SignedEntityRecord>>;
54
55    /// Get Cardano stake distribution signed entity by epoch
56    async fn get_cardano_stake_distribution_signed_entity_by_epoch(
57        &self,
58        epoch: Epoch,
59    ) -> StdResult<Option<SignedEntityRecord>>;
60
61    /// Perform an update for all the given signed entities.
62    async fn update_signed_entities(
63        &self,
64        signed_entities: Vec<SignedEntityRecord>,
65    ) -> StdResult<Vec<SignedEntityRecord>>;
66}
67
68/// Service to deal with signed_entity (read & write).
69pub struct SignedEntityStore {
70    connection: Arc<SqliteConnection>,
71}
72
73impl SignedEntityStore {
74    /// Create a new SignedEntityStoreAdapter service
75    pub fn new(connection: Arc<SqliteConnection>) -> Self {
76        Self { connection }
77    }
78}
79
80#[async_trait]
81impl SignedEntityStorer for SignedEntityStore {
82    async fn store_signed_entity(&self, signed_entity: &SignedEntityRecord) -> StdResult<()> {
83        let _ = self
84            .connection
85            .fetch_first(InsertSignedEntityRecordQuery::one(signed_entity.clone()));
86
87        Ok(())
88    }
89
90    async fn get_signed_entity(
91        &self,
92        signed_entity_id: &str,
93    ) -> StdResult<Option<SignedEntityRecord>> {
94        self.connection
95            .fetch_first(GetSignedEntityRecordQuery::by_signed_entity_id(
96                signed_entity_id,
97            ))
98            .with_context(|| format!("get signed entity by id failure, id: {signed_entity_id}"))
99    }
100
101    async fn get_signed_entity_by_certificate_id(
102        &self,
103        certificate_id: &str,
104    ) -> StdResult<Option<SignedEntityRecord>> {
105        self.connection
106            .fetch_first(GetSignedEntityRecordQuery::by_certificate_id(
107                certificate_id,
108            ))
109            .with_context(|| {
110                format!(
111                    "get signed entity by certificate id failure, certificate_id: {certificate_id}"
112                )
113            })
114    }
115
116    async fn get_signed_entities_by_certificates_ids<'a>(
117        &self,
118        certificates_ids: &[&'a str],
119    ) -> StdResult<Vec<SignedEntityRecord>> {
120        self.connection
121            .fetch_collect(GetSignedEntityRecordQuery::by_certificates_ids(
122                certificates_ids,
123            ))
124    }
125
126    async fn get_last_signed_entities_by_type(
127        &self,
128        signed_entity_type_id: &SignedEntityTypeDiscriminants,
129        total: usize,
130    ) -> StdResult<Vec<SignedEntityRecord>> {
131        let cursor = self
132            .connection
133            .fetch(GetSignedEntityRecordQuery::by_signed_entity_type(
134                signed_entity_type_id,
135            )?)
136            .with_context(|| {
137                format!("get last signed entity by type failure, type: {signed_entity_type_id:?}")
138            })?;
139        let signed_entities: Vec<SignedEntityRecord> = cursor.take(total).collect();
140
141        Ok(signed_entities)
142    }
143
144    async fn get_last_signed_entities_by_type_and_epoch(
145        &self,
146        signed_entity_type_id: &SignedEntityTypeDiscriminants,
147        epoch: Epoch,
148        total: usize,
149    ) -> StdResult<Vec<SignedEntityRecord>> {
150        let cursor = self
151            .connection
152            .fetch(GetSignedEntityRecordQuery::by_signed_entity_type_and_epoch(
153                signed_entity_type_id,
154                epoch,
155            ))
156            .with_context(|| {
157                format!("get last signed entity by type and epoch failure, type: {signed_entity_type_id:?}, epoch: {epoch}")
158            })?;
159        let signed_entities: Vec<SignedEntityRecord> = cursor.take(total).collect();
160
161        Ok(signed_entities)
162    }
163
164    async fn get_cardano_stake_distribution_signed_entity_by_epoch(
165        &self,
166        epoch: Epoch,
167    ) -> StdResult<Option<SignedEntityRecord>> {
168        self.connection
169            .fetch_first(GetSignedEntityRecordQuery::by_signed_entity_type_and_epoch(
170                &SignedEntityTypeDiscriminants::CardanoStakeDistribution,
171                epoch,
172            ))
173    }
174
175    async fn update_signed_entities(
176        &self,
177        signed_entities: Vec<SignedEntityRecord>,
178    ) -> StdResult<Vec<SignedEntityRecord>> {
179        let mut updated_records = vec![];
180
181        for record in signed_entities {
182            let id = record.signed_entity_id.clone();
183            let updated_record =
184                self.connection.fetch_first(UpdateSignedEntityQuery::one(record)?)?;
185
186            updated_records.push(updated_record.unwrap_or_else(|| {
187                panic!("Updating a signed_entity should not return nothing, id = {id:?}",)
188            }));
189        }
190
191        Ok(updated_records)
192    }
193}
194
195#[cfg(test)]
196mod tests {
197    use mithril_common::{
198        entities::{CardanoDbBeacon, Epoch, MithrilStakeDistribution, Snapshot},
199        signable_builder::SignedEntity,
200        test::double::fake_data,
201    };
202
203    use crate::database::test_helper::{insert_signed_entities, main_db_connection};
204
205    use super::*;
206
207    fn insert_golden_signed_entities(connection: &SqliteConnection) {
208        connection
209            .execute(r#"
210            -- Cardano immutable file full
211            insert into signed_entity values(
212                'bfcd77e372a25e13353bb77697d0d08785ba98b703e22640a317c5054dc05fb1',
213                2,
214                '258edf0f1238c60985d0229869a6d4c4c635c118915b4d524d2686515be99946',
215                '{"epoch":142,"immutable_file_number":2847}',
216                '2023-05-09T13:11:15Z',
217                '{
218                    "digest":"bfcd77e372a25e13353bb77697d0d08785ba98b703e22640a317c5054dc05fb1",
219                    "network":"preview",
220                    "beacon":{"epoch":142,"immutable_file_number":2847},
221                    "size":1689696245,
222                    "locations":["https://storage.googleapis.com/mithril-testing-preview-cs/preview-e142-i2847.bfcd77e372a25e13353bb77697d0d08785ba98b703e22640a317c5054dc05fb1.tar.gz"],
223                    "compression_algorithm":"gzip",
224                    "cardano_node_version": "0.0.1"
225                }'
226            );
227
228            -- Mithril stake distribution
229            insert into signed_entity
230            values(
231                '2da62e3ffee5e284ffd1e29ee52ee5547c5ff5ef34bee0a49dc54ea5e375f77e',
232                0,
233                'ad2d3705693dfaae8baac099b6976a5cc3e0f708245d0fa79d149a3fcbc79f00',
234                203,
235                '2023-05-16T02:17:16.203859116Z',
236                '{
237                    "type":"MithrilStakeDistribution",
238                    "epoch":203,
239                    "signers_with_stake":[{
240                        "party_id":"pool1r0tln8nct3mpyvehgy6uu3cdlmjnmtr2fxjcqnfl6v0qg0we42e",
241                        "verification_key":"7b22766b223a5b3138352c3132342c3231382c31362c3133312c3137382c3136302c37312c35382c3235312c31382c36382c37372c3135342c35382c3131352c3133392c3139392c38392c3230382c3139312c3235332c3138362c3232302c3133372c3135382c34312c3230332c382c3136352c3232362c3139342c3133382c3135322c35382c3131352c35342c3136322c3230332c32322c32332c3232382c3139342c34382c3137322c3139342c3130352c36382c302c3138302c3131332c3230312c3130392c3234372c39362c39342c3232372c3135372c36322c3139352c3134382c33352c3230352c3133372c3132312c3135322c3130302c3138342c3136372c3230362c3133322c34352c3133382c3131312c38392c3138322c3230352c3138372c3135382c32322c32332c3231382c36342c3137332c35392c3134312c3133332c3138302c3131392c36302c3134392c3134382c38332c3234312c3230312c33375d2c22706f70223a5b3137352c3135382c3130322c34352c3133322c38352c33312c3130342c36352c3230342c38352c312c34332c3137382c3138372c3233382c3135372c32372c39312c3230332c35342c37332c36322c35352c3131322c3131352c302c32312c3130302c3230382c37392c3135382c3233332c3132372c38332c3234352c3134362c3231382c3131382c3139332c38322c3139352c3137302c32312c36382c3231342c3138352c38372c3136382c3135372c3230322c3136382c3132392c3233332c38372c3230332c37372c36332c3232312c31362c3130392c33302c3235312c35312c38342c36392c3233372c39382c3133372c36302c39312c37362c38302c3232322c302c3130342c3231332c3132352c31332c3135312c3133312c3130312c3230312c33322c3138392c3137362c3139392c3131342c3234302c31352c33312c3136302c31332c3136352c32372c3134335d7d",
242                        "verification_key_signature":"7b227369676d61223a7b227369676d61223a7b227369676d61223a7b227369676d61223a7b227369676d61223a7b227369676d61223a5b36322c33302c32362c3131302c3134382c3137342c36302c32392c3139302c34362c37342c3135392c3137312c3134362c31342c3231362c37352c32302c38352c3134342c32332c3134352c3132352c39382c36362c3132342c3139332c32352c3233352c3234372c3130342c382c352c3133342c302c3134302c3131352c33362c31342c33382c31322c34392c39372c33392c3232312c3234352c39322c362c35302c3134362c3135372c3136362c32382c38352c33312c33352c3232392c3233312c34332c3230322c34372c3235352c3138322c375d2c226c68735f706b223a5b3234322c3132322c34322c39332c39352c31322c36352c3131332c31342c35322c3135352c3133372c3130312c3137382c3232362c3133332c31372c39302c3138332c3132312c3136362c35322c342c39312c3135332c3232332c32352c3133372c37332c3137332c3235332c3233315d2c227268735f706b223a5b322c3231312c3230372c3234392c3234322c362c3131322c362c3235322c31322c3135362c3139332c38362c3133362c3138352c36342c3132342c35302c3230392c38382c3138322c3133352c32392c3138372c3133302c3138392c34312c3134302c34322c33342c3135392c3234365d7d2c226c68735f706b223a5b37352c3232312c3235302c3235322c3135382c3134362c35362c34312c39382c3137362c3139382c3231392c33352c3130392c3136332c36312c3139362c3139342c3137382c3130392c3132382c3131352c3130302c3135322c33392c3231392c35382c34392c3235302c33312c3138342c395d2c227268735f706b223a5b3135372c3232312c38372c3139342c3235322c3234382c3132372c33312c3136362c3235322c3233342c3232362c33362c3139352c3230312c33312c34372c3232302c3233372c3137342c3130372c3134342c33382c3234372c3135352c3135382c34372c3139302c3235322c3134302c3235342c3131375d7d2c226c68735f706b223a5b34322c37372c37392c36302c3137312c3234372c31392c3230332c3232302c36332c3231352c3135372c3132392c3230382c3135382c35352c3131302c3232312c3139372c3233322c38372c33312c31312c3235342c3133352c32372c3234352c3137352c3135342c3231382c3232312c3138345d2c227268735f706b223a5b36382c3230332c38362c31352c37352c33352c3232332c342c3130392c3234392c3231372c3132352c34372c3231372c3130342c36352c3131332c3234312c3235332c3138332c3138362c36362c37352c3135302c3233342c3138362c3137332c3233302c3130332c3139342c3135322c3132375d7d2c226c68735f706b223a5b3133312c38372c3135382c3233352c34312c3233372c33332c34362c3235342c38302c3235322c3132392c37332c3234382c3135332c33382c3138332c33342c3231362c3135362c3131302c36392c37322c39302c32382c36382c3131342c33352c33352c3134332c3234312c3231305d2c227268735f706b223a5b3232382c3135352c3133332c34312c3137392c39342c3233362c3133392c3231362c3136302c3130382c3137362c3134362c3232352c3134302c3231352c35392c3130372c32302c3133372c3139372c33392c3135332c3132362c3233372c3135382c3132332c392c3133322c3139342c3132312c3232355d7d2c226c68735f706b223a5b3232322c39362c35332c34312c32342c38382c3136342c39382c3133312c33342c3132362c3133392c32382c37342c34362c3137332c35302c3133362c39372c3137312c3130312c3136322c33312c3137352c32332c3130352c3231352c36332c37362c3132342c31322c3131365d2c227268735f706b223a5b3133382c31382c31302c352c3231382c3134372c35332c35322c33362c3234342c33362c3131302c31302c33382c3134382c3132332c3235302c3131352c36342c36372c3137332c3130352c3137392c3235342c3130352c332c3132372c32302c31322c3230352c37372c3230365d7d2c226c68735f706b223a5b3132342c382c37312c3135302c34352c3130362c3232322c3234372c3130302c3137342c34352c3135322c3136312c3130382c3135382c32372c3234342c35362c3131352c3233322c3136332c3234342c38372c3138332c3232372c3235302c3232372c3234382c3137352c3136332c3230392c37345d2c227268735f706b223a5b3134302c33372c3131392c36332c39302c3132302c3131332c3135372c3130352c34362c31342c33332c3230372c3131322c3131332c3235342c38342c37332c3131302c33392c392c3230372c3133312c342c3232352c39302c3135312c32302c31352c36342c39372c39385d7d",
243                        "operational_certificate":"5b5b5b3138332c33342c3231362c34362c3232372c3235312c37342c3130312c31352c3233332c3234392c34322c312c37372c37322c3234382c3137392c32312c3137332c3131332c3131382c3139382c36322c3133352c34352c38382c3138372c3233332c34302c37322c31362c36365d2c312c3132332c5b31362c3136392c3134312c3138332c32322c3137342c3131312c33322c36342c35322c3234392c36382c3230322c33352c3130362c332c38362c3230352c37382c3230302c3138362c39342c3139372c3232382c37392c3137352c32392c31342c3132382c36332c35392c3139382c36322c3233302c34362c34312c38342c39382c3131392c3134352c32392c3132312c33352c3139372c3132382c3137322c302c3135342c392c31332c32362c3138332c3138362c3138362c33312c3234392c3133322c3232392c3235332c3134332c3130322c3235342c3231322c315d5d2c5b3234312c32372c31332c34342c3131342c37382c3138392c3234392c3135302c3135302c35332c3134342c3233362c3135312c38382c3134302c3132382c3136322c36302c3232382c38382c3131312c392c3134342c3233322c38332c39342c3231302c3135362c3136382c33352c3234325d5d",
244                        "kes_period":12,
245                        "stake":9497629046
246                    }],
247                    "hash":"2da62e3ffee5e284ffd1e29ee52ee5547c5ff5ef34bee0a49dc54ea5e375f77e",
248                    "protocol_parameters":{"k":2422,"m":20973,"phi_f":0.2}}'
249            );
250
251            "#,
252            )
253            .unwrap();
254    }
255
256    #[tokio::test]
257    async fn test_golden_master() {
258        let connection = main_db_connection().unwrap();
259        insert_golden_signed_entities(&connection);
260
261        let store = SignedEntityStore::new(Arc::new(connection));
262        let cardano_immutable_files_fulls: Vec<SignedEntity<Snapshot>> = store
263            .get_last_signed_entities_by_type(
264                &SignedEntityTypeDiscriminants::CardanoImmutableFilesFull,
265                usize::MAX,
266            )
267            .await
268            .expect("Getting Golden snapshot signed entities should not fail")
269            .into_iter()
270            .map(|r| r.try_into().unwrap())
271            .collect();
272        let mithril_stake_distributions: Vec<SignedEntity<MithrilStakeDistribution>> = store
273            .get_last_signed_entities_by_type(
274                &SignedEntityTypeDiscriminants::MithrilStakeDistribution,
275                usize::MAX,
276            )
277            .await
278            .expect("Getting Golden mithril stake distribution signed entities should not fail")
279            .into_iter()
280            .map(|r| r.try_into().unwrap())
281            .collect();
282
283        assert_eq!(cardano_immutable_files_fulls.len(), 1);
284        assert_eq!(mithril_stake_distributions.len(), 1);
285    }
286
287    #[tokio::test]
288    async fn test_get_signed_entity_record_by_certificate_id() {
289        let expected_record = SignedEntityRecord::fake_records(1).remove(0);
290        let connection = main_db_connection().unwrap();
291        insert_signed_entities(&connection, vec![expected_record.clone()]).unwrap();
292        let store = SignedEntityStore::new(Arc::new(connection));
293
294        let record = store
295            .get_signed_entity_by_certificate_id(&expected_record.certificate_id)
296            .await
297            .expect("querying signed entity record by certificate id should not fail");
298
299        assert_eq!(Some(expected_record), record);
300    }
301
302    #[tokio::test]
303    async fn test_get_signed_entity_record_by_certificates_ids() {
304        let expected_records = SignedEntityRecord::fake_records(3);
305        let connection = main_db_connection().unwrap();
306        insert_signed_entities(&connection, expected_records.clone()).unwrap();
307        let store = SignedEntityStore::new(Arc::new(connection));
308        let certificates_ids: Vec<&str> =
309            expected_records.iter().map(|r| r.certificate_id.as_str()).collect();
310
311        let queried_records = store
312            .get_signed_entities_by_certificates_ids(&certificates_ids)
313            .await
314            .expect("querying signed entity record by certificates ids should not fail");
315
316        assert_eq!(
317            // Records are inserted older to earlier and queried the other way round
318            expected_records.into_iter().rev().collect::<Vec<_>>(),
319            queried_records
320        );
321    }
322
323    #[tokio::test]
324    async fn update_only_given_entities() {
325        let mut signed_entity_records = SignedEntityRecord::fake_records(5);
326
327        let connection = main_db_connection().unwrap();
328        insert_signed_entities(&connection, signed_entity_records.clone()).unwrap();
329        let store = SignedEntityStore::new(Arc::new(connection));
330
331        let records_to_update: Vec<SignedEntityRecord> = signed_entity_records
332            .drain(2..)
333            .map(|mut r| {
334                r.certificate_id = format!("updated-{}", r.certificate_id);
335                r
336            })
337            .collect();
338        let expected_records: Vec<SignedEntityRecord> = signed_entity_records
339            .into_iter()
340            .chain(records_to_update.clone())
341            .rev() // Records are returned from latest to oldest
342            .collect();
343
344        let updated_records = store
345            .update_signed_entities(records_to_update.clone())
346            .await
347            .expect("updating signed entities should not fail");
348
349        let stored_records = store
350            .get_last_signed_entities_by_type(
351                &SignedEntityTypeDiscriminants::CardanoImmutableFilesFull,
352                usize::MAX,
353            )
354            .await
355            .expect("getting signed entities should not fail");
356
357        assert_eq!(records_to_update, updated_records);
358        assert_eq!(expected_records, stored_records);
359    }
360
361    #[tokio::test]
362    async fn get_cardano_stake_distribution_signed_entity_by_epoch_when_nothing_found() {
363        let epoch_to_retrieve = Epoch(4);
364        let connection = main_db_connection().unwrap();
365        let store = SignedEntityStore::new(Arc::new(connection));
366
367        let record = store
368            .get_cardano_stake_distribution_signed_entity_by_epoch(epoch_to_retrieve)
369            .await
370            .unwrap();
371
372        assert_eq!(None, record);
373    }
374
375    #[tokio::test]
376    async fn get_cardano_stake_distribution_signed_entity_by_epoch_when_signed_entity_found_for_epoch()
377     {
378        let cardano_stake_distribution = fake_data::cardano_stake_distribution(Epoch(4));
379
380        let expected_record: SignedEntityRecord = cardano_stake_distribution.into();
381
382        let connection = main_db_connection().unwrap();
383        insert_signed_entities(&connection, vec![expected_record.clone()]).unwrap();
384        let store = SignedEntityStore::new(Arc::new(connection));
385
386        let record = store
387            .get_cardano_stake_distribution_signed_entity_by_epoch(Epoch(4))
388            .await
389            .unwrap();
390
391        assert_eq!(Some(expected_record), record);
392    }
393
394    #[tokio::test]
395    async fn get_last_signed_entities_by_type_and_epoch_when_nothing_found() {
396        let epoch_to_retrieve = Epoch(4);
397        let connection = main_db_connection().unwrap();
398        let store = SignedEntityStore::new(Arc::new(connection));
399
400        let record = store
401            .get_last_signed_entities_by_type_and_epoch(
402                &SignedEntityTypeDiscriminants::CardanoDatabase,
403                epoch_to_retrieve,
404                usize::MAX,
405            )
406            .await
407            .unwrap();
408
409        assert_eq!(Vec::<SignedEntityRecord>::new(), record);
410    }
411
412    #[tokio::test]
413    async fn get_last_signed_entities_by_type_and_epoch_when_signed_entity_found_for_epoch() {
414        let mut cardano_database_snapshot = fake_data::cardano_database_snapshot(50);
415        cardano_database_snapshot.beacon = CardanoDbBeacon::new(3, 50);
416        let cardano_stake_distribution = fake_data::cardano_stake_distribution(Epoch(3));
417
418        let expected_cdb_record: SignedEntityRecord = cardano_database_snapshot.into();
419        let expected_csd_record: SignedEntityRecord = cardano_stake_distribution.into();
420
421        let connection = main_db_connection().unwrap();
422        insert_signed_entities(
423            &connection,
424            vec![expected_cdb_record.clone(), expected_csd_record.clone()],
425        )
426        .unwrap();
427        let store = SignedEntityStore::new(Arc::new(connection));
428
429        // With limit = 0 no records should be returned even if some exists
430        let record = store
431            .get_last_signed_entities_by_type_and_epoch(
432                &SignedEntityTypeDiscriminants::CardanoDatabase,
433                Epoch(3),
434                0,
435            )
436            .await
437            .unwrap();
438        assert_eq!(Vec::<SignedEntityRecord>::new(), record);
439
440        let record = store
441            .get_last_signed_entities_by_type_and_epoch(
442                &SignedEntityTypeDiscriminants::CardanoDatabase,
443                Epoch(3),
444                usize::MAX,
445            )
446            .await
447            .unwrap();
448
449        assert_eq!(vec![expected_cdb_record], record);
450
451        let record = store
452            .get_last_signed_entities_by_type_and_epoch(
453                &SignedEntityTypeDiscriminants::CardanoStakeDistribution,
454                Epoch(3),
455                usize::MAX,
456            )
457            .await
458            .unwrap();
459
460        assert_eq!(vec![expected_csd_record], record);
461    }
462}