mithril_persistence/database/repository/
cardano_transaction_repository.rs

1use std::ops::Range;
2use std::sync::Arc;
3
4use anyhow::Context;
5use async_trait::async_trait;
6
7use mithril_common::crypto_helper::{MKTreeNode, MKTreeStorer};
8use mithril_common::entities::{
9    BlockHash, BlockNumber, BlockRange, CardanoTransaction, ChainPoint, SlotNumber, TransactionHash,
10};
11use mithril_common::signable_builder::BlockRangeRootRetriever;
12use mithril_common::StdResult;
13
14use crate::database::query::{
15    DeleteBlockRangeRootQuery, DeleteCardanoTransactionQuery, GetBlockRangeRootQuery,
16    GetCardanoTransactionQuery, InsertBlockRangeRootQuery, InsertCardanoTransactionQuery,
17};
18use crate::database::record::{BlockRangeRootRecord, CardanoTransactionRecord};
19use crate::sqlite::{ConnectionExtensions, SqliteConnection, SqliteConnectionPool};
20
/// ## Cardano transaction repository
///
/// This is a business oriented layer to perform actions on the database through
/// queries.
///
/// Connections used by the public methods are obtained from the shared
/// [SqliteConnectionPool] held by the repository.
pub struct CardanoTransactionRepository {
    /// Pool from which database connections are obtained.
    connection_pool: Arc<SqliteConnectionPool>,
}
28
29impl CardanoTransactionRepository {
30    /// Instantiate service
31    pub fn new(connection_pool: Arc<SqliteConnectionPool>) -> Self {
32        Self { connection_pool }
33    }
34
35    /// Return all the [CardanoTransactionRecord]s in the database.
36    pub async fn get_all_transactions(&self) -> StdResult<Vec<CardanoTransactionRecord>> {
37        self.connection_pool
38            .connection()?
39            .fetch_collect(GetCardanoTransactionQuery::all())
40    }
41
42    /// Return all the [CardanoTransactionRecord]s in the database where block number is in the
43    /// given range.
44    pub async fn get_transactions_in_range_blocks(
45        &self,
46        range: Range<BlockNumber>,
47    ) -> StdResult<Vec<CardanoTransactionRecord>> {
48        self.connection_pool
49            .connection()?
50            .fetch_collect(GetCardanoTransactionQuery::between_blocks(range))
51    }
52
53    /// Return the [CardanoTransactionRecord] for the given transaction hash.
54    pub async fn get_transaction<T: Into<TransactionHash>>(
55        &self,
56        transaction_hash: T,
57    ) -> StdResult<Option<CardanoTransactionRecord>> {
58        self.connection_pool.connection()?.fetch_first(
59            GetCardanoTransactionQuery::by_transaction_hash(&transaction_hash.into()),
60        )
61    }
62
63    /// Create a new [CardanoTransactionRecord] in the database.
64    pub async fn create_transaction<T: Into<TransactionHash>, U: Into<BlockHash>>(
65        &self,
66        transaction_hash: T,
67        block_number: BlockNumber,
68        slot_number: SlotNumber,
69        block_hash: U,
70    ) -> StdResult<Option<CardanoTransactionRecord>> {
71        let query = InsertCardanoTransactionQuery::insert_one(&CardanoTransactionRecord {
72            transaction_hash: transaction_hash.into(),
73            block_number,
74            slot_number,
75            block_hash: block_hash.into(),
76        })?;
77
78        self.connection_pool.connection()?.fetch_first(query)
79    }
80
81    /// Create new [CardanoTransactionRecord]s in the database.
82    pub async fn create_transactions<T: Into<CardanoTransactionRecord>>(
83        &self,
84        transactions: Vec<T>,
85    ) -> StdResult<Vec<CardanoTransactionRecord>> {
86        let connection = self.connection_pool.connection()?;
87
88        self.create_transactions_with_connection(transactions, &connection)
89            .await
90    }
91
92    /// Create new [CardanoTransactionRecord]s in the database.
93    async fn create_transactions_with_connection<T: Into<CardanoTransactionRecord>>(
94        &self,
95        transactions: Vec<T>,
96        connection: &SqliteConnection,
97    ) -> StdResult<Vec<CardanoTransactionRecord>> {
98        let records: Vec<CardanoTransactionRecord> =
99            transactions.into_iter().map(|tx| tx.into()).collect();
100
101        connection.fetch_collect(InsertCardanoTransactionQuery::insert_many(records)?)
102    }
103
104    /// Create new [BlockRangeRootRecord]s in the database.
105    pub async fn create_block_range_roots<T: Into<BlockRangeRootRecord>>(
106        &self,
107        block_ranges: Vec<T>,
108    ) -> StdResult<Vec<BlockRangeRootRecord>> {
109        let records: Vec<BlockRangeRootRecord> =
110            block_ranges.into_iter().map(|tx| tx.into()).collect();
111        let connection = self.connection_pool.connection()?;
112
113        connection.fetch_collect(InsertBlockRangeRootQuery::insert_many(records)?)
114    }
115
116    /// Get the highest [ChainPoint] of the cardano transactions stored in the database.
117    pub async fn get_transaction_highest_chain_point(&self) -> StdResult<Option<ChainPoint>> {
118        let first_transaction_with_highest_block_number = self
119            .connection_pool
120            .connection()?
121            .fetch_first(GetCardanoTransactionQuery::with_highest_block_number())?;
122
123        Ok(first_transaction_with_highest_block_number.map(|record| {
124            ChainPoint::new(record.slot_number, record.block_number, record.block_hash)
125        }))
126    }
127
128    /// Get the highest start [BlockNumber] of the block range roots stored in the database.
129    pub async fn get_highest_start_block_number_for_block_range_roots(
130        &self,
131    ) -> StdResult<Option<BlockNumber>> {
132        let highest: Option<i64> = self
133            .connection_pool
134            .connection()?
135            .query_single_cell("select max(start) as highest from block_range_root;", &[])?;
136        highest
137            .map(u64::try_from)
138            .transpose()
139            .map(|num| num.map(BlockNumber))
140            .with_context(||
141                format!("Integer field max(start) (value={highest:?}) is incompatible with u64 representation.")
142            )
143    }
144
145    /// Retrieve all the Block Range Roots in database up to the block range that contains the given
146    /// block number.
147    pub async fn retrieve_block_range_roots_up_to(
148        &self,
149        block_number: BlockNumber,
150    ) -> StdResult<Box<dyn Iterator<Item = (BlockRange, MKTreeNode)> + '_>> {
151        let block_range_roots = self
152            .connection_pool
153            .connection()?
154            .fetch(GetBlockRangeRootQuery::contains_or_below_block_number(
155                block_number,
156            ))?
157            .map(|record| -> (BlockRange, MKTreeNode) { record.into() })
158            .collect::<Vec<_>>(); // TODO: remove this collect to return the iterator directly
159
160        Ok(Box::new(block_range_roots.into_iter()))
161    }
162
163    /// Retrieve the block range root with the highest bounds in the database.
164    pub async fn retrieve_highest_block_range_root(
165        &self,
166    ) -> StdResult<Option<BlockRangeRootRecord>> {
167        self.connection_pool
168            .connection()?
169            .fetch_first(GetBlockRangeRootQuery::highest())
170    }
171
172    /// Retrieve all the [CardanoTransaction] in database.
173    pub async fn get_all(&self) -> StdResult<Vec<CardanoTransaction>> {
174        let records = self
175            .connection_pool
176            .connection()?
177            .fetch(GetCardanoTransactionQuery::all())?
178            .map(|record| record.into())
179            .collect();
180
181        Ok(records)
182    }
183
184    /// Retrieve all the [BlockRangeRootRecord] in database.
185    pub fn get_all_block_range_root(&self) -> StdResult<Vec<BlockRangeRootRecord>> {
186        self.connection_pool
187            .connection()?
188            .fetch_collect(GetBlockRangeRootQuery::all())
189    }
190
191    /// Store the given transactions in the database.
192    ///
193    /// The storage is done in chunks to avoid exceeding sqlite binding limitations.
194    pub async fn store_transactions<T: Into<CardanoTransactionRecord> + Clone>(
195        &self,
196        transactions: Vec<T>,
197    ) -> StdResult<()> {
198        const DB_TRANSACTION_SIZE: usize = 100000;
199        for transactions_in_db_transaction_chunk in transactions.chunks(DB_TRANSACTION_SIZE) {
200            let connection = self.connection_pool.connection()?;
201            let transaction = connection.begin_transaction()?;
202
203            // Chunk transactions to avoid an error when we exceed sqlite binding limitations
204            for transactions_in_chunk in transactions_in_db_transaction_chunk.chunks(100) {
205                self.create_transactions_with_connection(
206                    transactions_in_chunk.to_vec(),
207                    &connection,
208                )
209                .await
210                .with_context(|| "CardanoTransactionRepository can not store transactions")?;
211            }
212
213            transaction.commit()?;
214        }
215        Ok(())
216    }
217
218    /// Get the closest block number above a given slot number
219    pub async fn get_closest_block_number_above_slot_number(
220        &self,
221        slot_number: SlotNumber,
222    ) -> StdResult<Option<BlockNumber>> {
223        let query =
224            GetCardanoTransactionQuery::with_highest_block_number_below_slot_number(slot_number);
225        let record = self.connection_pool.connection()?.fetch_first(query)?;
226
227        Ok(record.map(|r| r.block_number))
228    }
229
230    /// Get the [CardanoTransactionRecord] for the given transaction hashes, up to a block number
231    pub async fn get_transaction_by_hashes<T: Into<TransactionHash>>(
232        &self,
233        hashes: Vec<T>,
234        up_to: BlockNumber,
235    ) -> StdResult<Vec<CardanoTransactionRecord>> {
236        let query = GetCardanoTransactionQuery::by_transaction_hashes(
237            hashes.into_iter().map(Into::into).collect(),
238            up_to,
239        );
240        self.connection_pool.connection()?.fetch_collect(query)
241    }
242
243    /// Get the [CardanoTransactionRecord] for the given block ranges.
244    pub async fn get_transaction_by_block_ranges(
245        &self,
246        block_ranges: Vec<BlockRange>,
247    ) -> StdResult<Vec<CardanoTransactionRecord>> {
248        let mut transactions = vec![];
249        for block_range in block_ranges {
250            let block_range_transactions: Vec<CardanoTransactionRecord> =
251                self.connection_pool.connection()?.fetch_collect(
252                    GetCardanoTransactionQuery::by_block_ranges(vec![block_range]),
253                )?;
254            transactions.extend(block_range_transactions);
255        }
256
257        Ok(transactions)
258    }
259
260    /// Prune the transactions older than the given number of blocks (based on the block range root
261    /// stored).
262    pub async fn prune_transaction(&self, number_of_blocks_to_keep: BlockNumber) -> StdResult<()> {
263        if let Some(highest_block_range_start) = self
264            .get_highest_start_block_number_for_block_range_roots()
265            .await?
266        {
267            let threshold = highest_block_range_start - number_of_blocks_to_keep;
268            let query = DeleteCardanoTransactionQuery::below_block_number_threshold(threshold)?;
269
270            let connection = self.connection_pool.connection()?;
271            connection.fetch_first(query)?;
272        }
273
274        Ok(())
275    }
276
277    /// Remove transactions and block range roots that are in a rolled-back fork
278    ///
279    /// * Remove transactions with block number strictly greater than the given block number
280    /// * Remove block range roots that have lower bound range strictly above the given block number
281    pub async fn remove_rolled_back_transactions_and_block_range_by_block_number(
282        &self,
283        block_number: BlockNumber,
284    ) -> StdResult<()> {
285        let connection = self.connection_pool.connection()?;
286        let transaction = connection.begin_transaction()?;
287        let query = DeleteCardanoTransactionQuery::above_block_number_threshold(block_number)?;
288        connection.fetch_first(query)?;
289
290        let query =
291            DeleteBlockRangeRootQuery::contains_or_above_block_number_threshold(block_number)?;
292        connection.fetch_first(query)?;
293        transaction.commit()?;
294
295        Ok(())
296    }
297
298    /// Remove transactions and block range roots that are in a rolled-back fork
299    ///
300    /// * Remove transactions with closest block number strictly greater than the given slot number if exists
301    /// * Remove block range roots that have lower bound range strictly above the aforementioned block number
302    pub async fn remove_rolled_back_transactions_and_block_range_by_slot_number(
303        &self,
304        slot_number: SlotNumber,
305    ) -> StdResult<()> {
306        if let Some(block_number) = self
307            .get_closest_block_number_above_slot_number(slot_number)
308            .await?
309        {
310            self.remove_rolled_back_transactions_and_block_range_by_block_number(block_number)
311                .await?;
312        }
313
314        Ok(())
315    }
316}
317
318#[async_trait]
319impl<S: MKTreeStorer> BlockRangeRootRetriever<S> for CardanoTransactionRepository {
320    async fn retrieve_block_range_roots<'a>(
321        &'a self,
322        up_to_beacon: BlockNumber,
323    ) -> StdResult<Box<dyn Iterator<Item = (BlockRange, MKTreeNode)> + 'a>> {
324        self.retrieve_block_range_roots_up_to(up_to_beacon).await
325    }
326}
327
328#[cfg(test)]
329mod tests {
330    use mithril_common::test_utils::CardanoTransactionsBuilder;
331
332    use crate::database::query::GetBlockRangeRootQuery;
333    use crate::database::test_helper::cardano_tx_db_connection;
334
335    use super::*;
336
337    #[tokio::test]
338    async fn repository_create_and_get_transaction() {
339        let repository = CardanoTransactionRepository::new(Arc::new(
340            SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap(),
341        ));
342
343        repository
344            .create_transactions(vec![
345                CardanoTransaction::new(
346                    "tx_hash-123",
347                    BlockNumber(10),
348                    SlotNumber(50),
349                    "block_hash-123",
350                ),
351                CardanoTransaction::new(
352                    "tx_hash-456",
353                    BlockNumber(11),
354                    SlotNumber(51),
355                    "block_hash-456",
356                ),
357            ])
358            .await
359            .unwrap();
360
361        {
362            let transaction_result = repository.get_transaction("tx_hash-123").await.unwrap();
363            assert_eq!(
364                Some(CardanoTransactionRecord {
365                    transaction_hash: "tx_hash-123".to_string(),
366                    block_number: BlockNumber(10),
367                    slot_number: SlotNumber(50),
368                    block_hash: "block_hash-123".to_string(),
369                }),
370                transaction_result
371            );
372        }
373        {
374            let transaction_result = repository.get_transaction("not-exist").await.unwrap();
375            assert_eq!(None, transaction_result);
376        }
377    }
378
379    #[tokio::test]
380    async fn repository_get_transaction_by_hashes() {
381        let repository = CardanoTransactionRepository::new(Arc::new(
382            SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap(),
383        ));
384
385        repository
386            .create_transactions(vec![
387                CardanoTransactionRecord::new(
388                    "tx_hash-123",
389                    BlockNumber(10),
390                    SlotNumber(50),
391                    "block_hash-123",
392                ),
393                CardanoTransactionRecord::new(
394                    "tx_hash-456",
395                    BlockNumber(11),
396                    SlotNumber(51),
397                    "block_hash-456",
398                ),
399                CardanoTransactionRecord::new(
400                    "tx_hash-789",
401                    BlockNumber(12),
402                    SlotNumber(52),
403                    "block_hash-789",
404                ),
405                CardanoTransactionRecord::new(
406                    "tx_hash-000",
407                    BlockNumber(101),
408                    SlotNumber(100),
409                    "block_hash-000",
410                ),
411            ])
412            .await
413            .unwrap();
414
415        {
416            let transactions = repository
417                .get_transaction_by_hashes(vec!["tx_hash-123", "tx_hash-789"], BlockNumber(100))
418                .await
419                .unwrap();
420
421            assert_eq!(
422                vec![
423                    CardanoTransactionRecord::new(
424                        "tx_hash-123",
425                        BlockNumber(10),
426                        SlotNumber(50),
427                        "block_hash-123"
428                    ),
429                    CardanoTransactionRecord::new(
430                        "tx_hash-789",
431                        BlockNumber(12),
432                        SlotNumber(52),
433                        "block_hash-789"
434                    ),
435                ],
436                transactions
437            );
438        }
439        {
440            let transactions = repository
441                .get_transaction_by_hashes(
442                    vec!["tx_hash-123", "tx_hash-789", "tx_hash-000"],
443                    BlockNumber(100),
444                )
445                .await
446                .unwrap();
447
448            assert_eq!(
449                vec![
450                    CardanoTransactionRecord::new(
451                        "tx_hash-123",
452                        BlockNumber(10),
453                        SlotNumber(50),
454                        "block_hash-123"
455                    ),
456                    CardanoTransactionRecord::new(
457                        "tx_hash-789",
458                        BlockNumber(12),
459                        SlotNumber(52),
460                        "block_hash-789"
461                    ),
462                ],
463                transactions
464            );
465        }
466        {
467            let transactions = repository
468                .get_transaction_by_hashes(
469                    vec!["tx_hash-123", "tx_hash-789", "tx_hash-000"],
470                    BlockNumber(101),
471                )
472                .await
473                .unwrap();
474
475            assert_eq!(
476                vec![
477                    CardanoTransactionRecord::new(
478                        "tx_hash-123",
479                        BlockNumber(10),
480                        SlotNumber(50),
481                        "block_hash-123"
482                    ),
483                    CardanoTransactionRecord::new(
484                        "tx_hash-789",
485                        BlockNumber(12),
486                        SlotNumber(52),
487                        "block_hash-789"
488                    ),
489                    CardanoTransactionRecord::new(
490                        "tx_hash-000",
491                        BlockNumber(101),
492                        SlotNumber(100),
493                        "block_hash-000"
494                    ),
495                ],
496                transactions
497            );
498        }
499        {
500            let transactions = repository
501                .get_transaction_by_hashes(vec!["not-exist".to_string()], BlockNumber(100))
502                .await
503                .unwrap();
504
505            assert_eq!(Vec::<CardanoTransactionRecord>::new(), transactions);
506        }
507    }
508
509    #[tokio::test]
510    async fn repository_create_ignore_further_transactions_when_exists() {
511        let connection = cardano_tx_db_connection().unwrap();
512        let repository = CardanoTransactionRepository::new(Arc::new(
513            SqliteConnectionPool::build_from_connection(connection),
514        ));
515
516        repository
517            .create_transaction(
518                "tx-hash-123",
519                BlockNumber(10),
520                SlotNumber(50),
521                "block_hash-123",
522            )
523            .await
524            .unwrap();
525        repository
526            .create_transaction(
527                "tx-hash-123",
528                BlockNumber(11),
529                SlotNumber(51),
530                "block_hash-123-bis",
531            )
532            .await
533            .unwrap();
534        let transaction_result = repository.get_transaction("tx-hash-123").await.unwrap();
535
536        assert_eq!(
537            Some(CardanoTransactionRecord {
538                transaction_hash: "tx-hash-123".to_string(),
539                block_number: BlockNumber(10),
540                slot_number: SlotNumber(50),
541                block_hash: "block_hash-123".to_string(),
542            }),
543            transaction_result
544        );
545    }
546
547    #[tokio::test]
548    async fn repository_store_transactions_and_get_stored_transactions() {
549        let connection = cardano_tx_db_connection().unwrap();
550        let repository = CardanoTransactionRepository::new(Arc::new(
551            SqliteConnectionPool::build_from_connection(connection),
552        ));
553
554        let cardano_transactions = vec![
555            CardanoTransaction::new(
556                "tx-hash-123",
557                BlockNumber(10),
558                SlotNumber(50),
559                "block-hash-123",
560            ),
561            CardanoTransaction::new(
562                "tx-hash-456",
563                BlockNumber(11),
564                SlotNumber(51),
565                "block-hash-456",
566            ),
567        ];
568        repository
569            .create_transactions(cardano_transactions)
570            .await
571            .unwrap();
572
573        let transaction_result = repository.get_transaction("tx-hash-123").await.unwrap();
574
575        assert_eq!(
576            Some(CardanoTransactionRecord {
577                transaction_hash: "tx-hash-123".to_string(),
578                block_number: BlockNumber(10),
579                slot_number: SlotNumber(50),
580                block_hash: "block-hash-123".to_string(),
581            }),
582            transaction_result
583        );
584
585        let transaction_result = repository.get_transaction("tx-hash-456").await.unwrap();
586
587        assert_eq!(
588            Some(CardanoTransactionRecord {
589                transaction_hash: "tx-hash-456".to_string(),
590                block_number: BlockNumber(11),
591                slot_number: SlotNumber(51),
592                block_hash: "block-hash-456".to_string(),
593            }),
594            transaction_result
595        );
596    }
597
598    #[tokio::test]
599    async fn repository_get_all_stored_transactions() {
600        let connection = cardano_tx_db_connection().unwrap();
601        let repository = CardanoTransactionRepository::new(Arc::new(
602            SqliteConnectionPool::build_from_connection(connection),
603        ));
604
605        let cardano_transactions = vec![
606            CardanoTransaction::new(
607                "tx-hash-123",
608                BlockNumber(10),
609                SlotNumber(50),
610                "block-hash-123",
611            ),
612            CardanoTransaction::new(
613                "tx-hash-456",
614                BlockNumber(11),
615                SlotNumber(51),
616                "block-hash-456",
617            ),
618        ];
619        repository
620            .create_transactions(cardano_transactions.clone())
621            .await
622            .unwrap();
623
624        let transactions_result = repository.get_all_transactions().await.unwrap();
625        let transactions_expected: Vec<CardanoTransactionRecord> = cardano_transactions
626            .iter()
627            .map(|tx| tx.clone().into())
628            .collect();
629
630        assert_eq!(transactions_expected, transactions_result);
631    }
632
633    #[tokio::test]
634    async fn repository_store_transactions_doesnt_erase_existing_data() {
635        let connection = cardano_tx_db_connection().unwrap();
636        let repository = CardanoTransactionRepository::new(Arc::new(
637            SqliteConnectionPool::build_from_connection(connection),
638        ));
639
640        repository
641            .create_transaction("tx-hash-000", BlockNumber(1), SlotNumber(5), "block-hash")
642            .await
643            .unwrap();
644
645        let cardano_transactions = vec![CardanoTransaction::new(
646            "tx-hash-123",
647            BlockNumber(10),
648            SlotNumber(50),
649            "block-hash-123",
650        )];
651        repository
652            .create_transactions(cardano_transactions)
653            .await
654            .unwrap();
655
656        let transaction_result = repository.get_transaction("tx-hash-000").await.unwrap();
657
658        assert_eq!(
659            Some(CardanoTransactionRecord {
660                transaction_hash: "tx-hash-000".to_string(),
661                block_number: BlockNumber(1),
662                slot_number: SlotNumber(5),
663                block_hash: "block-hash".to_string(),
664            }),
665            transaction_result
666        );
667    }
668
669    #[tokio::test]
670    async fn repository_get_transaction_highest_chain_point_without_transactions_in_db() {
671        let connection = cardano_tx_db_connection().unwrap();
672        let repository = CardanoTransactionRepository::new(Arc::new(
673            SqliteConnectionPool::build_from_connection(connection),
674        ));
675
676        let highest_beacon = repository
677            .get_transaction_highest_chain_point()
678            .await
679            .unwrap();
680        assert_eq!(None, highest_beacon);
681    }
682
683    #[tokio::test]
684    async fn repository_get_transaction_highest_chain_point_with_transactions_in_db() {
685        let connection = cardano_tx_db_connection().unwrap();
686        let repository = CardanoTransactionRepository::new(Arc::new(
687            SqliteConnectionPool::build_from_connection(connection),
688        ));
689
690        let cardano_transactions = vec![
691            CardanoTransaction::new(
692                "tx-hash-123",
693                BlockNumber(10),
694                SlotNumber(50),
695                "block-hash-10",
696            ),
697            CardanoTransaction::new(
698                "tx-hash-456",
699                BlockNumber(25),
700                SlotNumber(51),
701                "block-hash-25",
702            ),
703        ];
704        repository
705            .create_transactions(cardano_transactions)
706            .await
707            .unwrap();
708
709        let highest_beacon = repository
710            .get_transaction_highest_chain_point()
711            .await
712            .unwrap();
713        assert_eq!(
714            Some(ChainPoint {
715                slot_number: SlotNumber(51),
716                block_number: BlockNumber(25),
717                block_hash: "block-hash-25".to_string()
718            }),
719            highest_beacon
720        );
721    }
722
723    #[tokio::test]
724    async fn repository_get_transaction_highest_chain_point_with_transactions_with_same_block_number_in_db(
725    ) {
726        let connection = cardano_tx_db_connection().unwrap();
727        let repository = CardanoTransactionRepository::new(Arc::new(
728            SqliteConnectionPool::build_from_connection(connection),
729        ));
730
731        let cardano_transactions = vec![
732            CardanoTransaction::new(
733                "tx-hash-123",
734                BlockNumber(10),
735                SlotNumber(50),
736                "block-hash-10",
737            ),
738            CardanoTransaction::new(
739                "tx-hash-456",
740                BlockNumber(25),
741                SlotNumber(51),
742                "block-hash-25",
743            ),
744            CardanoTransaction::new(
745                "tx-hash-789",
746                BlockNumber(25),
747                SlotNumber(51),
748                "block-hash-25",
749            ),
750        ];
751        repository
752            .create_transactions(cardano_transactions)
753            .await
754            .unwrap();
755
756        let highest_beacon = repository
757            .get_transaction_highest_chain_point()
758            .await
759            .unwrap();
760        assert_eq!(
761            Some(ChainPoint {
762                slot_number: SlotNumber(51),
763                block_number: BlockNumber(25),
764                block_hash: "block-hash-25".to_string()
765            }),
766            highest_beacon
767        );
768    }
769
770    #[tokio::test]
771    async fn repository_get_transactions_in_range_blocks() {
772        let connection = cardano_tx_db_connection().unwrap();
773        let repository = CardanoTransactionRepository::new(Arc::new(
774            SqliteConnectionPool::build_from_connection(connection),
775        ));
776
777        let transactions = vec![
778            CardanoTransactionRecord::new(
779                "tx-hash-1",
780                BlockNumber(10),
781                SlotNumber(50),
782                "block-hash-1",
783            ),
784            CardanoTransactionRecord::new(
785                "tx-hash-2",
786                BlockNumber(11),
787                SlotNumber(51),
788                "block-hash-2",
789            ),
790            CardanoTransactionRecord::new(
791                "tx-hash-3",
792                BlockNumber(12),
793                SlotNumber(52),
794                "block-hash-3",
795            ),
796        ];
797        repository
798            .create_transactions(transactions.clone())
799            .await
800            .unwrap();
801
802        {
803            let transaction_result = repository
804                .get_transactions_in_range_blocks(BlockNumber(0)..BlockNumber(10))
805                .await
806                .unwrap();
807            assert_eq!(Vec::<CardanoTransactionRecord>::new(), transaction_result);
808        }
809        {
810            let transaction_result = repository
811                .get_transactions_in_range_blocks(BlockNumber(13)..BlockNumber(21))
812                .await
813                .unwrap();
814            assert_eq!(Vec::<CardanoTransactionRecord>::new(), transaction_result);
815        }
816        {
817            let transaction_result = repository
818                .get_transactions_in_range_blocks(BlockNumber(9)..BlockNumber(12))
819                .await
820                .unwrap();
821            assert_eq!(transactions[0..=1].to_vec(), transaction_result);
822        }
823        {
824            let transaction_result = repository
825                .get_transactions_in_range_blocks(BlockNumber(10)..BlockNumber(13))
826                .await
827                .unwrap();
828            assert_eq!(transactions.clone(), transaction_result);
829        }
830        {
831            let transaction_result = repository
832                .get_transactions_in_range_blocks(BlockNumber(11)..BlockNumber(14))
833                .await
834                .unwrap();
835            assert_eq!(transactions[1..=2].to_vec(), transaction_result);
836        }
837    }
838
839    #[tokio::test]
840    async fn repository_get_transactions_by_block_ranges() {
841        let connection = cardano_tx_db_connection().unwrap();
842        let repository = CardanoTransactionRepository::new(Arc::new(
843            SqliteConnectionPool::build_from_connection(connection),
844        ));
845
846        let transactions = vec![
847            CardanoTransactionRecord::new(
848                "tx-hash-1",
849                BlockNumber(10),
850                SlotNumber(50),
851                "block-hash-1",
852            ),
853            CardanoTransactionRecord::new(
854                "tx-hash-2",
855                BlockNumber(11),
856                SlotNumber(51),
857                "block-hash-2",
858            ),
859            CardanoTransactionRecord::new(
860                "tx-hash-3",
861                BlockNumber(20),
862                SlotNumber(52),
863                "block-hash-3",
864            ),
865            CardanoTransactionRecord::new(
866                "tx-hash-4",
867                BlockNumber(31),
868                SlotNumber(53),
869                "block-hash-4",
870            ),
871            CardanoTransactionRecord::new(
872                "tx-hash-5",
873                BlockNumber(35),
874                SlotNumber(54),
875                "block-hash-5",
876            ),
877            CardanoTransactionRecord::new(
878                "tx-hash-6",
879                BlockNumber(46),
880                SlotNumber(55),
881                "block-hash-6",
882            ),
883        ];
884        repository
885            .create_transactions(transactions.clone())
886            .await
887            .unwrap();
888
889        {
890            let transaction_result = repository
891                .get_transaction_by_block_ranges(vec![BlockRange::from_block_number(BlockNumber(
892                    100,
893                ))])
894                .await
895                .unwrap();
896            assert_eq!(Vec::<CardanoTransactionRecord>::new(), transaction_result);
897        }
898        {
899            let transaction_result = repository
900                .get_transaction_by_block_ranges(vec![BlockRange::from_block_number(BlockNumber(
901                    0,
902                ))])
903                .await
904                .unwrap();
905            assert_eq!(transactions[0..=1].to_vec(), transaction_result);
906        }
907        {
908            let transaction_result = repository
909                .get_transaction_by_block_ranges(vec![
910                    BlockRange::from_block_number(BlockNumber(0)),
911                    BlockRange::from_block_number(BlockNumber(15)),
912                ])
913                .await
914                .unwrap();
915            assert_eq!(transactions[0..=2].to_vec(), transaction_result);
916        }
917        {
918            let transaction_result = repository
919                .get_transaction_by_block_ranges(vec![
920                    BlockRange::from_block_number(BlockNumber(0)),
921                    BlockRange::from_block_number(BlockNumber(30)),
922                ])
923                .await
924                .unwrap();
925            assert_eq!(
926                [transactions[0..=1].to_vec(), transactions[3..=4].to_vec()].concat(),
927                transaction_result
928            );
929        }
930    }
931
932    #[tokio::test]
933    async fn repository_get_closest_block_number_by_slot_number() {
934        let connection = cardano_tx_db_connection().unwrap();
935        let repository = CardanoTransactionRepository::new(Arc::new(
936            SqliteConnectionPool::build_from_connection(connection),
937        ));
938
939        let transactions = vec![
940            CardanoTransactionRecord::new("tx-1", BlockNumber(100), SlotNumber(500), "block-1"),
941            CardanoTransactionRecord::new("tx-2", BlockNumber(100), SlotNumber(500), "block-1"),
942            CardanoTransactionRecord::new("tx-3", BlockNumber(101), SlotNumber(501), "block-1"),
943        ];
944        repository
945            .create_transactions(transactions.clone())
946            .await
947            .unwrap();
948
949        let transaction_block_number_retrieved = repository
950            .get_closest_block_number_above_slot_number(SlotNumber(500))
951            .await
952            .unwrap();
953
954        assert_eq!(transaction_block_number_retrieved, Some(BlockNumber(100)));
955    }
956
957    #[tokio::test]
958    async fn repository_store_block_range() {
959        let connection = cardano_tx_db_connection().unwrap();
960        let repository = CardanoTransactionRepository::new(Arc::new(
961            SqliteConnectionPool::build_from_connection(connection),
962        ));
963
964        repository
965            .create_block_range_roots(vec![
966                (
967                    BlockRange::from_block_number(BlockNumber(0)),
968                    MKTreeNode::from_hex("AAAA").unwrap(),
969                ),
970                (
971                    BlockRange::from_block_number(BlockRange::LENGTH),
972                    MKTreeNode::from_hex("BBBB").unwrap(),
973                ),
974            ])
975            .await
976            .unwrap();
977
978        let records: Vec<BlockRangeRootRecord> = repository
979            .connection_pool
980            .connection()
981            .unwrap()
982            .fetch_collect(GetBlockRangeRootQuery::all())
983            .unwrap();
984        assert_eq!(
985            vec![
986                BlockRangeRootRecord {
987                    range: BlockRange::from_block_number(BlockNumber(0)),
988                    merkle_root: MKTreeNode::from_hex("AAAA").unwrap(),
989                },
990                BlockRangeRootRecord {
991                    range: BlockRange::from_block_number(BlockRange::LENGTH),
992                    merkle_root: MKTreeNode::from_hex("BBBB").unwrap(),
993                }
994            ],
995            records
996        );
997    }
998
999    #[tokio::test]
1000    async fn repository_store_block_range_with_existing_hash_doesnt_erase_existing_data() {
1001        let connection = cardano_tx_db_connection().unwrap();
1002        let repository = CardanoTransactionRepository::new(Arc::new(
1003            SqliteConnectionPool::build_from_connection(connection),
1004        ));
1005        let range = BlockRange::from_block_number(BlockNumber(0));
1006
1007        repository
1008            .create_block_range_roots(vec![(range.clone(), MKTreeNode::from_hex("AAAA").unwrap())])
1009            .await
1010            .unwrap();
1011        repository
1012            .create_block_range_roots(vec![(range.clone(), MKTreeNode::from_hex("BBBB").unwrap())])
1013            .await
1014            .unwrap();
1015
1016        let record: Vec<BlockRangeRootRecord> = repository
1017            .connection_pool
1018            .connection()
1019            .unwrap()
1020            .fetch_collect(GetBlockRangeRootQuery::all())
1021            .unwrap();
1022        assert_eq!(
1023            vec![BlockRangeRootRecord {
1024                range,
1025                merkle_root: MKTreeNode::from_hex("AAAA").unwrap()
1026            }],
1027            record
1028        );
1029    }
1030
1031    #[tokio::test]
1032    async fn repository_retrieve_block_range_roots_up_to() {
1033        let connection = cardano_tx_db_connection().unwrap();
1034        let repository = CardanoTransactionRepository::new(Arc::new(
1035            SqliteConnectionPool::build_from_connection(connection),
1036        ));
1037        let block_range_roots = vec![
1038            (
1039                BlockRange::from_block_number(BlockNumber(15)),
1040                MKTreeNode::from_hex("AAAA").unwrap(),
1041            ),
1042            (
1043                BlockRange::from_block_number(BlockNumber(30)),
1044                MKTreeNode::from_hex("BBBB").unwrap(),
1045            ),
1046            (
1047                BlockRange::from_block_number(BlockNumber(45)),
1048                MKTreeNode::from_hex("CCCC").unwrap(),
1049            ),
1050        ];
1051        repository
1052            .create_block_range_roots(block_range_roots.clone())
1053            .await
1054            .unwrap();
1055
1056        let retrieved_block_ranges = repository
1057            .retrieve_block_range_roots_up_to(BlockNumber(45))
1058            .await
1059            .unwrap();
1060        assert_eq!(
1061            block_range_roots[0..2].to_vec(),
1062            retrieved_block_ranges.collect::<Vec<_>>()
1063        );
1064    }
1065
1066    #[tokio::test]
1067    async fn repository_retrieve_highest_block_range_roots() {
1068        let connection = cardano_tx_db_connection().unwrap();
1069        let repository = CardanoTransactionRepository::new(Arc::new(
1070            SqliteConnectionPool::build_from_connection(connection),
1071        ));
1072        let block_range_roots = vec![
1073            BlockRangeRootRecord {
1074                range: BlockRange::from_block_number(BlockNumber(15)),
1075                merkle_root: MKTreeNode::from_hex("AAAA").unwrap(),
1076            },
1077            BlockRangeRootRecord {
1078                range: BlockRange::from_block_number(BlockNumber(30)),
1079                merkle_root: MKTreeNode::from_hex("BBBB").unwrap(),
1080            },
1081            BlockRangeRootRecord {
1082                range: BlockRange::from_block_number(BlockNumber(45)),
1083                merkle_root: MKTreeNode::from_hex("CCCC").unwrap(),
1084            },
1085        ];
1086        repository
1087            .create_block_range_roots(block_range_roots.clone())
1088            .await
1089            .unwrap();
1090
1091        let retrieved_block_range = repository
1092            .retrieve_highest_block_range_root()
1093            .await
1094            .unwrap();
1095        assert_eq!(block_range_roots.last().cloned(), retrieved_block_range);
1096    }
1097
1098    #[tokio::test]
1099    async fn repository_prune_transactions() {
1100        let connection = cardano_tx_db_connection().unwrap();
1101        let repository = CardanoTransactionRepository::new(Arc::new(
1102            SqliteConnectionPool::build_from_connection(connection),
1103        ));
1104
1105        let cardano_transactions: Vec<CardanoTransactionRecord> = CardanoTransactionsBuilder::new()
1106            .blocks_per_block_range(15)
1107            .build_transactions(53)
1108            .into_iter()
1109            .map(CardanoTransactionRecord::from)
1110            .collect();
1111
1112        repository
1113            .create_transactions(cardano_transactions.clone())
1114            .await
1115            .unwrap();
1116        // Use by 'prune_transaction' to get the block_range of the highest block number
1117        repository
1118            .create_block_range_roots(vec![(
1119                BlockRange::from_block_number(BlockNumber(45)),
1120                MKTreeNode::from_hex("BBBB").unwrap(),
1121            )])
1122            .await
1123            .unwrap();
1124
1125        let transaction_result = repository.get_all().await.unwrap();
1126        assert_eq!(cardano_transactions.len(), transaction_result.len());
1127
1128        // Pruning with a number of block to keep greater than the highest block range start should
1129        // do nothing.
1130        repository
1131            .prune_transaction(BlockNumber(10_000_000))
1132            .await
1133            .unwrap();
1134        let transaction_result = repository.get_all_transactions().await.unwrap();
1135        assert_eq!(cardano_transactions, transaction_result);
1136
1137        // Since the highest block range start is 45, pruning with 20 should remove transactions
1138        // with a block number strictly below 25.
1139        repository.prune_transaction(BlockNumber(20)).await.unwrap();
1140        let transaction_result = repository
1141            .get_transactions_in_range_blocks(BlockNumber(0)..BlockNumber(25))
1142            .await
1143            .unwrap();
1144        assert_eq!(Vec::<CardanoTransactionRecord>::new(), transaction_result);
1145
1146        let transaction_result = repository
1147            .get_transactions_in_range_blocks(BlockNumber(25)..BlockNumber(1000))
1148            .await
1149            .unwrap();
1150        assert_eq!(28, transaction_result.len());
1151    }
1152
1153    #[tokio::test]
1154    async fn get_highest_start_block_number_for_block_range_roots() {
1155        let connection = cardano_tx_db_connection().unwrap();
1156        let repository = CardanoTransactionRepository::new(Arc::new(
1157            SqliteConnectionPool::build_from_connection(connection),
1158        ));
1159
1160        let highest = repository
1161            .get_highest_start_block_number_for_block_range_roots()
1162            .await
1163            .unwrap();
1164        assert_eq!(None, highest);
1165
1166        let block_range_roots = vec![
1167            (
1168                BlockRange::from_block_number(BlockNumber(15)),
1169                MKTreeNode::from_hex("AAAA").unwrap(),
1170            ),
1171            (
1172                BlockRange::from_block_number(BlockNumber(30)),
1173                MKTreeNode::from_hex("BBBB").unwrap(),
1174            ),
1175        ];
1176        repository
1177            .create_block_range_roots(block_range_roots.clone())
1178            .await
1179            .unwrap();
1180
1181        let highest = repository
1182            .get_highest_start_block_number_for_block_range_roots()
1183            .await
1184            .unwrap();
1185        assert_eq!(Some(BlockNumber(30)), highest);
1186    }
1187
1188    #[tokio::test]
1189    async fn remove_transactions_and_block_range_greater_than_given_block_number() {
1190        let connection = cardano_tx_db_connection().unwrap();
1191        let repository = CardanoTransactionRepository::new(Arc::new(
1192            SqliteConnectionPool::build_from_connection(connection),
1193        ));
1194
1195        let cardano_transactions = vec![
1196            CardanoTransaction::new(
1197                "tx-hash-123",
1198                BlockRange::LENGTH,
1199                SlotNumber(50),
1200                "block-hash-123",
1201            ),
1202            CardanoTransaction::new(
1203                "tx-hash-123",
1204                BlockRange::LENGTH * 3 - 1,
1205                SlotNumber(50),
1206                "block-hash-123",
1207            ),
1208            CardanoTransaction::new(
1209                "tx-hash-456",
1210                BlockRange::LENGTH * 3,
1211                SlotNumber(51),
1212                "block-hash-456",
1213            ),
1214        ];
1215        repository
1216            .create_transactions(cardano_transactions)
1217            .await
1218            .unwrap();
1219        repository
1220            .create_block_range_roots(vec![
1221                (
1222                    BlockRange::from_block_number(BlockRange::LENGTH),
1223                    MKTreeNode::from_hex("AAAA").unwrap(),
1224                ),
1225                (
1226                    BlockRange::from_block_number(BlockRange::LENGTH * 2),
1227                    MKTreeNode::from_hex("AAAA").unwrap(),
1228                ),
1229                (
1230                    BlockRange::from_block_number(BlockRange::LENGTH * 3),
1231                    MKTreeNode::from_hex("AAAA").unwrap(),
1232                ),
1233            ])
1234            .await
1235            .unwrap();
1236
1237        repository
1238            .remove_rolled_back_transactions_and_block_range_by_block_number(BlockRange::LENGTH * 3)
1239            .await
1240            .unwrap();
1241        assert_eq!(2, repository.get_all_transactions().await.unwrap().len());
1242        assert_eq!(2, repository.get_all_block_range_root().unwrap().len());
1243    }
1244
1245    #[tokio::test]
1246    async fn remove_rolled_back_transactions_and_block_range_by_slot_number() {
1247        fn transaction_record(
1248            block_number: BlockNumber,
1249            slot_number: SlotNumber,
1250            tx_hash: &str,
1251        ) -> CardanoTransactionRecord {
1252            CardanoTransactionRecord::new(
1253                tx_hash,
1254                block_number,
1255                slot_number,
1256                format!("block-hash-{}", block_number),
1257            )
1258        }
1259
1260        let repository = CardanoTransactionRepository::new(Arc::new(
1261            SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap(),
1262        ));
1263
1264        repository
1265            .create_transactions(vec![
1266                transaction_record(BlockNumber(10), SlotNumber(50), "tx-hash-1"),
1267                transaction_record(BlockNumber(11), SlotNumber(51), "tx-hash-2"),
1268                transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-3"),
1269                transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-4"),
1270                transaction_record(BlockNumber(101), SlotNumber(100), "tx-hash-5"),
1271                transaction_record(BlockNumber(202), SlotNumber(200), "tx-hash-56"),
1272            ])
1273            .await
1274            .unwrap();
1275
1276        {
1277            repository
1278                .remove_rolled_back_transactions_and_block_range_by_slot_number(SlotNumber(110))
1279                .await
1280                .expect("Failed to remove rolled back transactions");
1281
1282            let transactions = repository
1283                .get_all()
1284                .await
1285                .unwrap()
1286                .into_iter()
1287                .map(|v| v.into())
1288                .collect::<Vec<_>>();
1289            assert_eq!(
1290                vec![
1291                    transaction_record(BlockNumber(10), SlotNumber(50), "tx-hash-1"),
1292                    transaction_record(BlockNumber(11), SlotNumber(51), "tx-hash-2"),
1293                    transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-3"),
1294                    transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-4"),
1295                    transaction_record(BlockNumber(101), SlotNumber(100), "tx-hash-5"),
1296                ],
1297                transactions
1298            );
1299        }
1300
1301        {
1302            repository
1303                .remove_rolled_back_transactions_and_block_range_by_slot_number(SlotNumber(53))
1304                .await
1305                .expect("Failed to remove rolled back transactions");
1306
1307            let transactions = repository
1308                .get_all()
1309                .await
1310                .unwrap()
1311                .into_iter()
1312                .map(|v| v.into())
1313                .collect::<Vec<_>>();
1314            assert_eq!(
1315                vec![
1316                    transaction_record(BlockNumber(10), SlotNumber(50), "tx-hash-1"),
1317                    transaction_record(BlockNumber(11), SlotNumber(51), "tx-hash-2"),
1318                    transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-3"),
1319                    transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-4"),
1320                ],
1321                transactions
1322            );
1323        }
1324
1325        {
1326            repository
1327                .remove_rolled_back_transactions_and_block_range_by_slot_number(SlotNumber(51))
1328                .await
1329                .expect("Failed to remove rolled back transactions");
1330
1331            let transactions = repository
1332                .get_all()
1333                .await
1334                .unwrap()
1335                .into_iter()
1336                .map(|v| v.into())
1337                .collect::<Vec<_>>();
1338            assert_eq!(
1339                vec![
1340                    transaction_record(BlockNumber(10), SlotNumber(50), "tx-hash-1"),
1341                    transaction_record(BlockNumber(11), SlotNumber(51), "tx-hash-2"),
1342                ],
1343                transactions
1344            );
1345        }
1346    }
1347}