mithril_persistence/database/repository/
cardano_transaction_repository.rs

1use std::ops::Range;
2use std::sync::Arc;
3
4use anyhow::Context;
5use async_trait::async_trait;
6
7use mithril_common::StdResult;
8use mithril_common::crypto_helper::{MKTreeNode, MKTreeStorer};
9use mithril_common::entities::{
10    BlockHash, BlockNumber, BlockRange, CardanoTransaction, ChainPoint, SlotNumber, TransactionHash,
11};
12use mithril_common::signable_builder::BlockRangeRootRetriever;
13
14use crate::database::query::{
15    DeleteBlockRangeRootQuery, DeleteCardanoTransactionQuery, GetBlockRangeRootQuery,
16    GetCardanoTransactionQuery, InsertBlockRangeRootQuery, InsertCardanoTransactionQuery,
17};
18use crate::database::record::{BlockRangeRootRecord, CardanoTransactionRecord};
19use crate::sqlite::{ConnectionExtensions, SqliteConnection, SqliteConnectionPool};
20
21/// ## Cardano transaction repository
22///
23/// This is a business oriented layer to perform actions on the database through
24/// queries.
25pub struct CardanoTransactionRepository {
26    connection_pool: Arc<SqliteConnectionPool>,
27}
28
29impl CardanoTransactionRepository {
30    /// Instantiate service
31    pub fn new(connection_pool: Arc<SqliteConnectionPool>) -> Self {
32        Self { connection_pool }
33    }
34
35    /// Return all the [CardanoTransactionRecord]s in the database.
36    pub async fn get_all_transactions(&self) -> StdResult<Vec<CardanoTransactionRecord>> {
37        self.connection_pool
38            .connection()?
39            .fetch_collect(GetCardanoTransactionQuery::all())
40    }
41
42    /// Return all the [CardanoTransactionRecord]s in the database where block number is in the
43    /// given range.
44    pub async fn get_transactions_in_range_blocks(
45        &self,
46        range: Range<BlockNumber>,
47    ) -> StdResult<Vec<CardanoTransactionRecord>> {
48        self.connection_pool
49            .connection()?
50            .fetch_collect(GetCardanoTransactionQuery::between_blocks(range))
51    }
52
53    /// Return the [CardanoTransactionRecord] for the given transaction hash.
54    pub async fn get_transaction<T: Into<TransactionHash>>(
55        &self,
56        transaction_hash: T,
57    ) -> StdResult<Option<CardanoTransactionRecord>> {
58        self.connection_pool.connection()?.fetch_first(
59            GetCardanoTransactionQuery::by_transaction_hash(&transaction_hash.into()),
60        )
61    }
62
63    /// Create a new [CardanoTransactionRecord] in the database.
64    pub async fn create_transaction<T: Into<TransactionHash>, U: Into<BlockHash>>(
65        &self,
66        transaction_hash: T,
67        block_number: BlockNumber,
68        slot_number: SlotNumber,
69        block_hash: U,
70    ) -> StdResult<Option<CardanoTransactionRecord>> {
71        let query = InsertCardanoTransactionQuery::insert_one(&CardanoTransactionRecord {
72            transaction_hash: transaction_hash.into(),
73            block_number,
74            slot_number,
75            block_hash: block_hash.into(),
76        })?;
77
78        self.connection_pool.connection()?.fetch_first(query)
79    }
80
81    /// Create new [CardanoTransactionRecord]s in the database.
82    pub async fn create_transactions<T: Into<CardanoTransactionRecord>>(
83        &self,
84        transactions: Vec<T>,
85    ) -> StdResult<Vec<CardanoTransactionRecord>> {
86        let connection = self.connection_pool.connection()?;
87
88        self.create_transactions_with_connection(transactions, &connection)
89            .await
90    }
91
92    /// Create new [CardanoTransactionRecord]s in the database.
93    async fn create_transactions_with_connection<T: Into<CardanoTransactionRecord>>(
94        &self,
95        transactions: Vec<T>,
96        connection: &SqliteConnection,
97    ) -> StdResult<Vec<CardanoTransactionRecord>> {
98        let records: Vec<CardanoTransactionRecord> =
99            transactions.into_iter().map(|tx| tx.into()).collect();
100
101        connection.fetch_collect(InsertCardanoTransactionQuery::insert_many(records)?)
102    }
103
104    /// Create new [BlockRangeRootRecord]s in the database.
105    pub async fn create_block_range_roots<T: Into<BlockRangeRootRecord>>(
106        &self,
107        block_ranges: Vec<T>,
108    ) -> StdResult<Vec<BlockRangeRootRecord>> {
109        let records: Vec<BlockRangeRootRecord> =
110            block_ranges.into_iter().map(|tx| tx.into()).collect();
111        let connection = self.connection_pool.connection()?;
112
113        connection.fetch_collect(InsertBlockRangeRootQuery::insert_many(records)?)
114    }
115
116    /// Get the highest [ChainPoint] of the cardano transactions stored in the database.
117    pub async fn get_transaction_highest_chain_point(&self) -> StdResult<Option<ChainPoint>> {
118        let first_transaction_with_highest_block_number = self
119            .connection_pool
120            .connection()?
121            .fetch_first(GetCardanoTransactionQuery::with_highest_block_number())?;
122
123        Ok(first_transaction_with_highest_block_number.map(|record| {
124            ChainPoint::new(record.slot_number, record.block_number, record.block_hash)
125        }))
126    }
127
128    /// Get the highest start [BlockNumber] of the block range roots stored in the database.
129    pub async fn get_highest_start_block_number_for_block_range_roots(
130        &self,
131    ) -> StdResult<Option<BlockNumber>> {
132        let highest: Option<i64> = self
133            .connection_pool
134            .connection()?
135            .query_single_cell("select max(start) as highest from block_range_root;", &[])?;
136        highest
137            .map(u64::try_from)
138            .transpose()
139            .map(|num| num.map(BlockNumber))
140            .with_context(||
141                format!("Integer field max(start) (value={highest:?}) is incompatible with u64 representation.")
142            )
143    }
144
145    /// Retrieve all the Block Range Roots in database up to the block range that contains the given
146    /// block number.
147    pub async fn retrieve_block_range_roots_up_to(
148        &self,
149        block_number: BlockNumber,
150    ) -> StdResult<Box<dyn Iterator<Item = (BlockRange, MKTreeNode)> + '_>> {
151        let block_range_roots = self
152            .connection_pool
153            .connection()?
154            .fetch(GetBlockRangeRootQuery::contains_or_below_block_number(
155                block_number,
156            ))?
157            .map(|record| -> (BlockRange, MKTreeNode) { record.into() })
158            .collect::<Vec<_>>(); // TODO: remove this collect to return the iterator directly
159
160        Ok(Box::new(block_range_roots.into_iter()))
161    }
162
163    /// Retrieve the block range root with the highest bounds in the database.
164    pub async fn retrieve_highest_block_range_root(
165        &self,
166    ) -> StdResult<Option<BlockRangeRootRecord>> {
167        self.connection_pool
168            .connection()?
169            .fetch_first(GetBlockRangeRootQuery::highest())
170    }
171
172    /// Retrieve all the [CardanoTransaction] in database.
173    pub async fn get_all(&self) -> StdResult<Vec<CardanoTransaction>> {
174        let records = self
175            .connection_pool
176            .connection()?
177            .fetch(GetCardanoTransactionQuery::all())?
178            .map(|record| record.into())
179            .collect();
180
181        Ok(records)
182    }
183
184    /// Retrieve all the [BlockRangeRootRecord] in database.
185    pub fn get_all_block_range_root(&self) -> StdResult<Vec<BlockRangeRootRecord>> {
186        self.connection_pool
187            .connection()?
188            .fetch_collect(GetBlockRangeRootQuery::all())
189    }
190
191    /// Store the given transactions in the database.
192    ///
193    /// The storage is done in chunks to avoid exceeding sqlite binding limitations.
194    pub async fn store_transactions<T: Into<CardanoTransactionRecord> + Clone>(
195        &self,
196        transactions: Vec<T>,
197    ) -> StdResult<()> {
198        const DB_TRANSACTION_SIZE: usize = 100000;
199        for transactions_in_db_transaction_chunk in transactions.chunks(DB_TRANSACTION_SIZE) {
200            let connection = self.connection_pool.connection()?;
201            let transaction = connection.begin_transaction()?;
202
203            // Chunk transactions to avoid an error when we exceed sqlite binding limitations
204            for transactions_in_chunk in transactions_in_db_transaction_chunk.chunks(100) {
205                self.create_transactions_with_connection(
206                    transactions_in_chunk.to_vec(),
207                    &connection,
208                )
209                .await
210                .with_context(|| "CardanoTransactionRepository can not store transactions")?;
211            }
212
213            transaction.commit()?;
214        }
215        Ok(())
216    }
217
218    /// Get the closest block number above a given slot number
219    pub async fn get_closest_block_number_above_slot_number(
220        &self,
221        slot_number: SlotNumber,
222    ) -> StdResult<Option<BlockNumber>> {
223        let query =
224            GetCardanoTransactionQuery::with_highest_block_number_below_slot_number(slot_number);
225        let record = self.connection_pool.connection()?.fetch_first(query)?;
226
227        Ok(record.map(|r| r.block_number))
228    }
229
230    /// Get the [CardanoTransactionRecord] for the given transaction hashes, up to a block number
231    pub async fn get_transaction_by_hashes<T: Into<TransactionHash>>(
232        &self,
233        hashes: Vec<T>,
234        up_to: BlockNumber,
235    ) -> StdResult<Vec<CardanoTransactionRecord>> {
236        let query = GetCardanoTransactionQuery::by_transaction_hashes(
237            hashes.into_iter().map(Into::into).collect(),
238            up_to,
239        );
240        self.connection_pool.connection()?.fetch_collect(query)
241    }
242
243    /// Get the [CardanoTransactionRecord] for the given block ranges.
244    pub async fn get_transaction_by_block_ranges(
245        &self,
246        block_ranges: Vec<BlockRange>,
247    ) -> StdResult<Vec<CardanoTransactionRecord>> {
248        let mut transactions = vec![];
249        for block_range in block_ranges {
250            let block_range_transactions: Vec<CardanoTransactionRecord> =
251                self.connection_pool.connection()?.fetch_collect(
252                    GetCardanoTransactionQuery::by_block_ranges(vec![block_range]),
253                )?;
254            transactions.extend(block_range_transactions);
255        }
256
257        Ok(transactions)
258    }
259
260    /// Prune the transactions older than the given number of blocks (based on the block range root
261    /// stored).
262    pub async fn prune_transaction(&self, number_of_blocks_to_keep: BlockNumber) -> StdResult<()> {
263        if let Some(highest_block_range_start) =
264            self.get_highest_start_block_number_for_block_range_roots().await?
265        {
266            let threshold = highest_block_range_start - number_of_blocks_to_keep;
267            let query = DeleteCardanoTransactionQuery::below_block_number_threshold(threshold)?;
268
269            let connection = self.connection_pool.connection()?;
270            connection.fetch_first(query)?;
271        }
272
273        Ok(())
274    }
275
276    /// Remove transactions and block range roots that are in a rolled-back fork
277    ///
278    /// * Remove transactions with block number strictly greater than the given block number
279    /// * Remove block range roots that have lower bound range strictly above the given block number
280    pub async fn remove_rolled_back_transactions_and_block_range_by_block_number(
281        &self,
282        block_number: BlockNumber,
283    ) -> StdResult<()> {
284        let connection = self.connection_pool.connection()?;
285        let transaction = connection.begin_transaction()?;
286        let query = DeleteCardanoTransactionQuery::above_block_number_threshold(block_number)?;
287        connection.fetch_first(query)?;
288
289        let query =
290            DeleteBlockRangeRootQuery::contains_or_above_block_number_threshold(block_number)?;
291        connection.fetch_first(query)?;
292        transaction.commit()?;
293
294        Ok(())
295    }
296
297    /// Remove transactions and block range roots that are in a rolled-back fork
298    ///
299    /// * Remove transactions with closest block number strictly greater than the given slot number if exists
300    /// * Remove block range roots that have lower bound range strictly above the aforementioned block number
301    pub async fn remove_rolled_back_transactions_and_block_range_by_slot_number(
302        &self,
303        slot_number: SlotNumber,
304    ) -> StdResult<()> {
305        if let Some(block_number) =
306            self.get_closest_block_number_above_slot_number(slot_number).await?
307        {
308            self.remove_rolled_back_transactions_and_block_range_by_block_number(block_number)
309                .await?;
310        }
311
312        Ok(())
313    }
314}
315
316#[async_trait]
317impl<S: MKTreeStorer> BlockRangeRootRetriever<S> for CardanoTransactionRepository {
318    async fn retrieve_block_range_roots<'a>(
319        &'a self,
320        up_to_beacon: BlockNumber,
321    ) -> StdResult<Box<dyn Iterator<Item = (BlockRange, MKTreeNode)> + 'a>> {
322        self.retrieve_block_range_roots_up_to(up_to_beacon).await
323    }
324}
325
326#[cfg(test)]
327mod tests {
328    use mithril_common::test_utils::CardanoTransactionsBuilder;
329
330    use crate::database::query::GetBlockRangeRootQuery;
331    use crate::database::test_helper::cardano_tx_db_connection;
332
333    use super::*;
334
335    #[tokio::test]
336    async fn repository_create_and_get_transaction() {
337        let repository = CardanoTransactionRepository::new(Arc::new(
338            SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap(),
339        ));
340
341        repository
342            .create_transactions(vec![
343                CardanoTransaction::new(
344                    "tx_hash-123",
345                    BlockNumber(10),
346                    SlotNumber(50),
347                    "block_hash-123",
348                ),
349                CardanoTransaction::new(
350                    "tx_hash-456",
351                    BlockNumber(11),
352                    SlotNumber(51),
353                    "block_hash-456",
354                ),
355            ])
356            .await
357            .unwrap();
358
359        {
360            let transaction_result = repository.get_transaction("tx_hash-123").await.unwrap();
361            assert_eq!(
362                Some(CardanoTransactionRecord {
363                    transaction_hash: "tx_hash-123".to_string(),
364                    block_number: BlockNumber(10),
365                    slot_number: SlotNumber(50),
366                    block_hash: "block_hash-123".to_string(),
367                }),
368                transaction_result
369            );
370        }
371        {
372            let transaction_result = repository.get_transaction("not-exist").await.unwrap();
373            assert_eq!(None, transaction_result);
374        }
375    }
376
377    #[tokio::test]
378    async fn repository_get_transaction_by_hashes() {
379        let repository = CardanoTransactionRepository::new(Arc::new(
380            SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap(),
381        ));
382
383        repository
384            .create_transactions(vec![
385                CardanoTransactionRecord::new(
386                    "tx_hash-123",
387                    BlockNumber(10),
388                    SlotNumber(50),
389                    "block_hash-123",
390                ),
391                CardanoTransactionRecord::new(
392                    "tx_hash-456",
393                    BlockNumber(11),
394                    SlotNumber(51),
395                    "block_hash-456",
396                ),
397                CardanoTransactionRecord::new(
398                    "tx_hash-789",
399                    BlockNumber(12),
400                    SlotNumber(52),
401                    "block_hash-789",
402                ),
403                CardanoTransactionRecord::new(
404                    "tx_hash-000",
405                    BlockNumber(101),
406                    SlotNumber(100),
407                    "block_hash-000",
408                ),
409            ])
410            .await
411            .unwrap();
412
413        {
414            let transactions = repository
415                .get_transaction_by_hashes(vec!["tx_hash-123", "tx_hash-789"], BlockNumber(100))
416                .await
417                .unwrap();
418
419            assert_eq!(
420                vec![
421                    CardanoTransactionRecord::new(
422                        "tx_hash-123",
423                        BlockNumber(10),
424                        SlotNumber(50),
425                        "block_hash-123"
426                    ),
427                    CardanoTransactionRecord::new(
428                        "tx_hash-789",
429                        BlockNumber(12),
430                        SlotNumber(52),
431                        "block_hash-789"
432                    ),
433                ],
434                transactions
435            );
436        }
437        {
438            let transactions = repository
439                .get_transaction_by_hashes(
440                    vec!["tx_hash-123", "tx_hash-789", "tx_hash-000"],
441                    BlockNumber(100),
442                )
443                .await
444                .unwrap();
445
446            assert_eq!(
447                vec![
448                    CardanoTransactionRecord::new(
449                        "tx_hash-123",
450                        BlockNumber(10),
451                        SlotNumber(50),
452                        "block_hash-123"
453                    ),
454                    CardanoTransactionRecord::new(
455                        "tx_hash-789",
456                        BlockNumber(12),
457                        SlotNumber(52),
458                        "block_hash-789"
459                    ),
460                ],
461                transactions
462            );
463        }
464        {
465            let transactions = repository
466                .get_transaction_by_hashes(
467                    vec!["tx_hash-123", "tx_hash-789", "tx_hash-000"],
468                    BlockNumber(101),
469                )
470                .await
471                .unwrap();
472
473            assert_eq!(
474                vec![
475                    CardanoTransactionRecord::new(
476                        "tx_hash-123",
477                        BlockNumber(10),
478                        SlotNumber(50),
479                        "block_hash-123"
480                    ),
481                    CardanoTransactionRecord::new(
482                        "tx_hash-789",
483                        BlockNumber(12),
484                        SlotNumber(52),
485                        "block_hash-789"
486                    ),
487                    CardanoTransactionRecord::new(
488                        "tx_hash-000",
489                        BlockNumber(101),
490                        SlotNumber(100),
491                        "block_hash-000"
492                    ),
493                ],
494                transactions
495            );
496        }
497        {
498            let transactions = repository
499                .get_transaction_by_hashes(vec!["not-exist".to_string()], BlockNumber(100))
500                .await
501                .unwrap();
502
503            assert_eq!(Vec::<CardanoTransactionRecord>::new(), transactions);
504        }
505    }
506
507    #[tokio::test]
508    async fn repository_create_ignore_further_transactions_when_exists() {
509        let connection = cardano_tx_db_connection().unwrap();
510        let repository = CardanoTransactionRepository::new(Arc::new(
511            SqliteConnectionPool::build_from_connection(connection),
512        ));
513
514        repository
515            .create_transaction(
516                "tx-hash-123",
517                BlockNumber(10),
518                SlotNumber(50),
519                "block_hash-123",
520            )
521            .await
522            .unwrap();
523        repository
524            .create_transaction(
525                "tx-hash-123",
526                BlockNumber(11),
527                SlotNumber(51),
528                "block_hash-123-bis",
529            )
530            .await
531            .unwrap();
532        let transaction_result = repository.get_transaction("tx-hash-123").await.unwrap();
533
534        assert_eq!(
535            Some(CardanoTransactionRecord {
536                transaction_hash: "tx-hash-123".to_string(),
537                block_number: BlockNumber(10),
538                slot_number: SlotNumber(50),
539                block_hash: "block_hash-123".to_string(),
540            }),
541            transaction_result
542        );
543    }
544
545    #[tokio::test]
546    async fn repository_store_transactions_and_get_stored_transactions() {
547        let connection = cardano_tx_db_connection().unwrap();
548        let repository = CardanoTransactionRepository::new(Arc::new(
549            SqliteConnectionPool::build_from_connection(connection),
550        ));
551
552        let cardano_transactions = vec![
553            CardanoTransaction::new(
554                "tx-hash-123",
555                BlockNumber(10),
556                SlotNumber(50),
557                "block-hash-123",
558            ),
559            CardanoTransaction::new(
560                "tx-hash-456",
561                BlockNumber(11),
562                SlotNumber(51),
563                "block-hash-456",
564            ),
565        ];
566        repository.create_transactions(cardano_transactions).await.unwrap();
567
568        let transaction_result = repository.get_transaction("tx-hash-123").await.unwrap();
569
570        assert_eq!(
571            Some(CardanoTransactionRecord {
572                transaction_hash: "tx-hash-123".to_string(),
573                block_number: BlockNumber(10),
574                slot_number: SlotNumber(50),
575                block_hash: "block-hash-123".to_string(),
576            }),
577            transaction_result
578        );
579
580        let transaction_result = repository.get_transaction("tx-hash-456").await.unwrap();
581
582        assert_eq!(
583            Some(CardanoTransactionRecord {
584                transaction_hash: "tx-hash-456".to_string(),
585                block_number: BlockNumber(11),
586                slot_number: SlotNumber(51),
587                block_hash: "block-hash-456".to_string(),
588            }),
589            transaction_result
590        );
591    }
592
593    #[tokio::test]
594    async fn repository_get_all_stored_transactions() {
595        let connection = cardano_tx_db_connection().unwrap();
596        let repository = CardanoTransactionRepository::new(Arc::new(
597            SqliteConnectionPool::build_from_connection(connection),
598        ));
599
600        let cardano_transactions = vec![
601            CardanoTransaction::new(
602                "tx-hash-123",
603                BlockNumber(10),
604                SlotNumber(50),
605                "block-hash-123",
606            ),
607            CardanoTransaction::new(
608                "tx-hash-456",
609                BlockNumber(11),
610                SlotNumber(51),
611                "block-hash-456",
612            ),
613        ];
614        repository
615            .create_transactions(cardano_transactions.clone())
616            .await
617            .unwrap();
618
619        let transactions_result = repository.get_all_transactions().await.unwrap();
620        let transactions_expected: Vec<CardanoTransactionRecord> =
621            cardano_transactions.iter().map(|tx| tx.clone().into()).collect();
622
623        assert_eq!(transactions_expected, transactions_result);
624    }
625
626    #[tokio::test]
627    async fn repository_store_transactions_doesnt_erase_existing_data() {
628        let connection = cardano_tx_db_connection().unwrap();
629        let repository = CardanoTransactionRepository::new(Arc::new(
630            SqliteConnectionPool::build_from_connection(connection),
631        ));
632
633        repository
634            .create_transaction("tx-hash-000", BlockNumber(1), SlotNumber(5), "block-hash")
635            .await
636            .unwrap();
637
638        let cardano_transactions = vec![CardanoTransaction::new(
639            "tx-hash-123",
640            BlockNumber(10),
641            SlotNumber(50),
642            "block-hash-123",
643        )];
644        repository.create_transactions(cardano_transactions).await.unwrap();
645
646        let transaction_result = repository.get_transaction("tx-hash-000").await.unwrap();
647
648        assert_eq!(
649            Some(CardanoTransactionRecord {
650                transaction_hash: "tx-hash-000".to_string(),
651                block_number: BlockNumber(1),
652                slot_number: SlotNumber(5),
653                block_hash: "block-hash".to_string(),
654            }),
655            transaction_result
656        );
657    }
658
659    #[tokio::test]
660    async fn repository_get_transaction_highest_chain_point_without_transactions_in_db() {
661        let connection = cardano_tx_db_connection().unwrap();
662        let repository = CardanoTransactionRepository::new(Arc::new(
663            SqliteConnectionPool::build_from_connection(connection),
664        ));
665
666        let highest_beacon = repository.get_transaction_highest_chain_point().await.unwrap();
667        assert_eq!(None, highest_beacon);
668    }
669
670    #[tokio::test]
671    async fn repository_get_transaction_highest_chain_point_with_transactions_in_db() {
672        let connection = cardano_tx_db_connection().unwrap();
673        let repository = CardanoTransactionRepository::new(Arc::new(
674            SqliteConnectionPool::build_from_connection(connection),
675        ));
676
677        let cardano_transactions = vec![
678            CardanoTransaction::new(
679                "tx-hash-123",
680                BlockNumber(10),
681                SlotNumber(50),
682                "block-hash-10",
683            ),
684            CardanoTransaction::new(
685                "tx-hash-456",
686                BlockNumber(25),
687                SlotNumber(51),
688                "block-hash-25",
689            ),
690        ];
691        repository.create_transactions(cardano_transactions).await.unwrap();
692
693        let highest_beacon = repository.get_transaction_highest_chain_point().await.unwrap();
694        assert_eq!(
695            Some(ChainPoint {
696                slot_number: SlotNumber(51),
697                block_number: BlockNumber(25),
698                block_hash: "block-hash-25".to_string()
699            }),
700            highest_beacon
701        );
702    }
703
704    #[tokio::test]
705    async fn repository_get_transaction_highest_chain_point_with_transactions_with_same_block_number_in_db()
706     {
707        let connection = cardano_tx_db_connection().unwrap();
708        let repository = CardanoTransactionRepository::new(Arc::new(
709            SqliteConnectionPool::build_from_connection(connection),
710        ));
711
712        let cardano_transactions = vec![
713            CardanoTransaction::new(
714                "tx-hash-123",
715                BlockNumber(10),
716                SlotNumber(50),
717                "block-hash-10",
718            ),
719            CardanoTransaction::new(
720                "tx-hash-456",
721                BlockNumber(25),
722                SlotNumber(51),
723                "block-hash-25",
724            ),
725            CardanoTransaction::new(
726                "tx-hash-789",
727                BlockNumber(25),
728                SlotNumber(51),
729                "block-hash-25",
730            ),
731        ];
732        repository.create_transactions(cardano_transactions).await.unwrap();
733
734        let highest_beacon = repository.get_transaction_highest_chain_point().await.unwrap();
735        assert_eq!(
736            Some(ChainPoint {
737                slot_number: SlotNumber(51),
738                block_number: BlockNumber(25),
739                block_hash: "block-hash-25".to_string()
740            }),
741            highest_beacon
742        );
743    }
744
745    #[tokio::test]
746    async fn repository_get_transactions_in_range_blocks() {
747        let connection = cardano_tx_db_connection().unwrap();
748        let repository = CardanoTransactionRepository::new(Arc::new(
749            SqliteConnectionPool::build_from_connection(connection),
750        ));
751
752        let transactions = vec![
753            CardanoTransactionRecord::new(
754                "tx-hash-1",
755                BlockNumber(10),
756                SlotNumber(50),
757                "block-hash-1",
758            ),
759            CardanoTransactionRecord::new(
760                "tx-hash-2",
761                BlockNumber(11),
762                SlotNumber(51),
763                "block-hash-2",
764            ),
765            CardanoTransactionRecord::new(
766                "tx-hash-3",
767                BlockNumber(12),
768                SlotNumber(52),
769                "block-hash-3",
770            ),
771        ];
772        repository.create_transactions(transactions.clone()).await.unwrap();
773
774        {
775            let transaction_result = repository
776                .get_transactions_in_range_blocks(BlockNumber(0)..BlockNumber(10))
777                .await
778                .unwrap();
779            assert_eq!(Vec::<CardanoTransactionRecord>::new(), transaction_result);
780        }
781        {
782            let transaction_result = repository
783                .get_transactions_in_range_blocks(BlockNumber(13)..BlockNumber(21))
784                .await
785                .unwrap();
786            assert_eq!(Vec::<CardanoTransactionRecord>::new(), transaction_result);
787        }
788        {
789            let transaction_result = repository
790                .get_transactions_in_range_blocks(BlockNumber(9)..BlockNumber(12))
791                .await
792                .unwrap();
793            assert_eq!(transactions[0..=1].to_vec(), transaction_result);
794        }
795        {
796            let transaction_result = repository
797                .get_transactions_in_range_blocks(BlockNumber(10)..BlockNumber(13))
798                .await
799                .unwrap();
800            assert_eq!(transactions.clone(), transaction_result);
801        }
802        {
803            let transaction_result = repository
804                .get_transactions_in_range_blocks(BlockNumber(11)..BlockNumber(14))
805                .await
806                .unwrap();
807            assert_eq!(transactions[1..=2].to_vec(), transaction_result);
808        }
809    }
810
811    #[tokio::test]
812    async fn repository_get_transactions_by_block_ranges() {
813        let connection = cardano_tx_db_connection().unwrap();
814        let repository = CardanoTransactionRepository::new(Arc::new(
815            SqliteConnectionPool::build_from_connection(connection),
816        ));
817
818        let transactions = vec![
819            CardanoTransactionRecord::new(
820                "tx-hash-1",
821                BlockNumber(10),
822                SlotNumber(50),
823                "block-hash-1",
824            ),
825            CardanoTransactionRecord::new(
826                "tx-hash-2",
827                BlockNumber(11),
828                SlotNumber(51),
829                "block-hash-2",
830            ),
831            CardanoTransactionRecord::new(
832                "tx-hash-3",
833                BlockNumber(20),
834                SlotNumber(52),
835                "block-hash-3",
836            ),
837            CardanoTransactionRecord::new(
838                "tx-hash-4",
839                BlockNumber(31),
840                SlotNumber(53),
841                "block-hash-4",
842            ),
843            CardanoTransactionRecord::new(
844                "tx-hash-5",
845                BlockNumber(35),
846                SlotNumber(54),
847                "block-hash-5",
848            ),
849            CardanoTransactionRecord::new(
850                "tx-hash-6",
851                BlockNumber(46),
852                SlotNumber(55),
853                "block-hash-6",
854            ),
855        ];
856        repository.create_transactions(transactions.clone()).await.unwrap();
857
858        {
859            let transaction_result = repository
860                .get_transaction_by_block_ranges(vec![BlockRange::from_block_number(BlockNumber(
861                    100,
862                ))])
863                .await
864                .unwrap();
865            assert_eq!(Vec::<CardanoTransactionRecord>::new(), transaction_result);
866        }
867        {
868            let transaction_result = repository
869                .get_transaction_by_block_ranges(vec![BlockRange::from_block_number(BlockNumber(
870                    0,
871                ))])
872                .await
873                .unwrap();
874            assert_eq!(transactions[0..=1].to_vec(), transaction_result);
875        }
876        {
877            let transaction_result = repository
878                .get_transaction_by_block_ranges(vec![
879                    BlockRange::from_block_number(BlockNumber(0)),
880                    BlockRange::from_block_number(BlockNumber(15)),
881                ])
882                .await
883                .unwrap();
884            assert_eq!(transactions[0..=2].to_vec(), transaction_result);
885        }
886        {
887            let transaction_result = repository
888                .get_transaction_by_block_ranges(vec![
889                    BlockRange::from_block_number(BlockNumber(0)),
890                    BlockRange::from_block_number(BlockNumber(30)),
891                ])
892                .await
893                .unwrap();
894            assert_eq!(
895                [transactions[0..=1].to_vec(), transactions[3..=4].to_vec()].concat(),
896                transaction_result
897            );
898        }
899    }
900
901    #[tokio::test]
902    async fn repository_get_closest_block_number_by_slot_number() {
903        let connection = cardano_tx_db_connection().unwrap();
904        let repository = CardanoTransactionRepository::new(Arc::new(
905            SqliteConnectionPool::build_from_connection(connection),
906        ));
907
908        let transactions = vec![
909            CardanoTransactionRecord::new("tx-1", BlockNumber(100), SlotNumber(500), "block-1"),
910            CardanoTransactionRecord::new("tx-2", BlockNumber(100), SlotNumber(500), "block-1"),
911            CardanoTransactionRecord::new("tx-3", BlockNumber(101), SlotNumber(501), "block-1"),
912        ];
913        repository.create_transactions(transactions.clone()).await.unwrap();
914
915        let transaction_block_number_retrieved = repository
916            .get_closest_block_number_above_slot_number(SlotNumber(500))
917            .await
918            .unwrap();
919
920        assert_eq!(transaction_block_number_retrieved, Some(BlockNumber(100)));
921    }
922
923    #[tokio::test]
924    async fn repository_store_block_range() {
925        let connection = cardano_tx_db_connection().unwrap();
926        let repository = CardanoTransactionRepository::new(Arc::new(
927            SqliteConnectionPool::build_from_connection(connection),
928        ));
929
930        repository
931            .create_block_range_roots(vec![
932                (
933                    BlockRange::from_block_number(BlockNumber(0)),
934                    MKTreeNode::from_hex("AAAA").unwrap(),
935                ),
936                (
937                    BlockRange::from_block_number(BlockRange::LENGTH),
938                    MKTreeNode::from_hex("BBBB").unwrap(),
939                ),
940            ])
941            .await
942            .unwrap();
943
944        let records: Vec<BlockRangeRootRecord> = repository
945            .connection_pool
946            .connection()
947            .unwrap()
948            .fetch_collect(GetBlockRangeRootQuery::all())
949            .unwrap();
950        assert_eq!(
951            vec![
952                BlockRangeRootRecord {
953                    range: BlockRange::from_block_number(BlockNumber(0)),
954                    merkle_root: MKTreeNode::from_hex("AAAA").unwrap(),
955                },
956                BlockRangeRootRecord {
957                    range: BlockRange::from_block_number(BlockRange::LENGTH),
958                    merkle_root: MKTreeNode::from_hex("BBBB").unwrap(),
959                }
960            ],
961            records
962        );
963    }
964
965    #[tokio::test]
966    async fn repository_store_block_range_with_existing_hash_doesnt_erase_existing_data() {
967        let connection = cardano_tx_db_connection().unwrap();
968        let repository = CardanoTransactionRepository::new(Arc::new(
969            SqliteConnectionPool::build_from_connection(connection),
970        ));
971        let range = BlockRange::from_block_number(BlockNumber(0));
972
973        repository
974            .create_block_range_roots(vec![(range.clone(), MKTreeNode::from_hex("AAAA").unwrap())])
975            .await
976            .unwrap();
977        repository
978            .create_block_range_roots(vec![(range.clone(), MKTreeNode::from_hex("BBBB").unwrap())])
979            .await
980            .unwrap();
981
982        let record: Vec<BlockRangeRootRecord> = repository
983            .connection_pool
984            .connection()
985            .unwrap()
986            .fetch_collect(GetBlockRangeRootQuery::all())
987            .unwrap();
988        assert_eq!(
989            vec![BlockRangeRootRecord {
990                range,
991                merkle_root: MKTreeNode::from_hex("AAAA").unwrap()
992            }],
993            record
994        );
995    }
996
997    #[tokio::test]
998    async fn repository_retrieve_block_range_roots_up_to() {
999        let connection = cardano_tx_db_connection().unwrap();
1000        let repository = CardanoTransactionRepository::new(Arc::new(
1001            SqliteConnectionPool::build_from_connection(connection),
1002        ));
1003        let block_range_roots = vec![
1004            (
1005                BlockRange::from_block_number(BlockNumber(15)),
1006                MKTreeNode::from_hex("AAAA").unwrap(),
1007            ),
1008            (
1009                BlockRange::from_block_number(BlockNumber(30)),
1010                MKTreeNode::from_hex("BBBB").unwrap(),
1011            ),
1012            (
1013                BlockRange::from_block_number(BlockNumber(45)),
1014                MKTreeNode::from_hex("CCCC").unwrap(),
1015            ),
1016        ];
1017        repository
1018            .create_block_range_roots(block_range_roots.clone())
1019            .await
1020            .unwrap();
1021
1022        let retrieved_block_ranges = repository
1023            .retrieve_block_range_roots_up_to(BlockNumber(45))
1024            .await
1025            .unwrap();
1026        assert_eq!(
1027            block_range_roots[0..2].to_vec(),
1028            retrieved_block_ranges.collect::<Vec<_>>()
1029        );
1030    }
1031
1032    #[tokio::test]
1033    async fn repository_retrieve_highest_block_range_roots() {
1034        let connection = cardano_tx_db_connection().unwrap();
1035        let repository = CardanoTransactionRepository::new(Arc::new(
1036            SqliteConnectionPool::build_from_connection(connection),
1037        ));
1038        let block_range_roots = vec![
1039            BlockRangeRootRecord {
1040                range: BlockRange::from_block_number(BlockNumber(15)),
1041                merkle_root: MKTreeNode::from_hex("AAAA").unwrap(),
1042            },
1043            BlockRangeRootRecord {
1044                range: BlockRange::from_block_number(BlockNumber(30)),
1045                merkle_root: MKTreeNode::from_hex("BBBB").unwrap(),
1046            },
1047            BlockRangeRootRecord {
1048                range: BlockRange::from_block_number(BlockNumber(45)),
1049                merkle_root: MKTreeNode::from_hex("CCCC").unwrap(),
1050            },
1051        ];
1052        repository
1053            .create_block_range_roots(block_range_roots.clone())
1054            .await
1055            .unwrap();
1056
1057        let retrieved_block_range = repository.retrieve_highest_block_range_root().await.unwrap();
1058        assert_eq!(block_range_roots.last().cloned(), retrieved_block_range);
1059    }
1060
1061    #[tokio::test]
1062    async fn repository_prune_transactions() {
1063        let connection = cardano_tx_db_connection().unwrap();
1064        let repository = CardanoTransactionRepository::new(Arc::new(
1065            SqliteConnectionPool::build_from_connection(connection),
1066        ));
1067
1068        let cardano_transactions: Vec<CardanoTransactionRecord> = CardanoTransactionsBuilder::new()
1069            .blocks_per_block_range(15)
1070            .build_transactions(53)
1071            .into_iter()
1072            .map(CardanoTransactionRecord::from)
1073            .collect();
1074
1075        repository
1076            .create_transactions(cardano_transactions.clone())
1077            .await
1078            .unwrap();
1079        // Use by 'prune_transaction' to get the block_range of the highest block number
1080        repository
1081            .create_block_range_roots(vec![(
1082                BlockRange::from_block_number(BlockNumber(45)),
1083                MKTreeNode::from_hex("BBBB").unwrap(),
1084            )])
1085            .await
1086            .unwrap();
1087
1088        let transaction_result = repository.get_all().await.unwrap();
1089        assert_eq!(cardano_transactions.len(), transaction_result.len());
1090
1091        // Pruning with a number of block to keep greater than the highest block range start should
1092        // do nothing.
1093        repository.prune_transaction(BlockNumber(10_000_000)).await.unwrap();
1094        let transaction_result = repository.get_all_transactions().await.unwrap();
1095        assert_eq!(cardano_transactions, transaction_result);
1096
1097        // Since the highest block range start is 45, pruning with 20 should remove transactions
1098        // with a block number strictly below 25.
1099        repository.prune_transaction(BlockNumber(20)).await.unwrap();
1100        let transaction_result = repository
1101            .get_transactions_in_range_blocks(BlockNumber(0)..BlockNumber(25))
1102            .await
1103            .unwrap();
1104        assert_eq!(Vec::<CardanoTransactionRecord>::new(), transaction_result);
1105
1106        let transaction_result = repository
1107            .get_transactions_in_range_blocks(BlockNumber(25)..BlockNumber(1000))
1108            .await
1109            .unwrap();
1110        assert_eq!(28, transaction_result.len());
1111    }
1112
1113    #[tokio::test]
1114    async fn get_highest_start_block_number_for_block_range_roots() {
1115        let connection = cardano_tx_db_connection().unwrap();
1116        let repository = CardanoTransactionRepository::new(Arc::new(
1117            SqliteConnectionPool::build_from_connection(connection),
1118        ));
1119
1120        let highest = repository
1121            .get_highest_start_block_number_for_block_range_roots()
1122            .await
1123            .unwrap();
1124        assert_eq!(None, highest);
1125
1126        let block_range_roots = vec![
1127            (
1128                BlockRange::from_block_number(BlockNumber(15)),
1129                MKTreeNode::from_hex("AAAA").unwrap(),
1130            ),
1131            (
1132                BlockRange::from_block_number(BlockNumber(30)),
1133                MKTreeNode::from_hex("BBBB").unwrap(),
1134            ),
1135        ];
1136        repository
1137            .create_block_range_roots(block_range_roots.clone())
1138            .await
1139            .unwrap();
1140
1141        let highest = repository
1142            .get_highest_start_block_number_for_block_range_roots()
1143            .await
1144            .unwrap();
1145        assert_eq!(Some(BlockNumber(30)), highest);
1146    }
1147
1148    #[tokio::test]
1149    async fn remove_transactions_and_block_range_greater_than_given_block_number() {
1150        let connection = cardano_tx_db_connection().unwrap();
1151        let repository = CardanoTransactionRepository::new(Arc::new(
1152            SqliteConnectionPool::build_from_connection(connection),
1153        ));
1154
1155        let cardano_transactions = vec![
1156            CardanoTransaction::new(
1157                "tx-hash-123",
1158                BlockRange::LENGTH,
1159                SlotNumber(50),
1160                "block-hash-123",
1161            ),
1162            CardanoTransaction::new(
1163                "tx-hash-123",
1164                BlockRange::LENGTH * 3 - 1,
1165                SlotNumber(50),
1166                "block-hash-123",
1167            ),
1168            CardanoTransaction::new(
1169                "tx-hash-456",
1170                BlockRange::LENGTH * 3,
1171                SlotNumber(51),
1172                "block-hash-456",
1173            ),
1174        ];
1175        repository.create_transactions(cardano_transactions).await.unwrap();
1176        repository
1177            .create_block_range_roots(vec![
1178                (
1179                    BlockRange::from_block_number(BlockRange::LENGTH),
1180                    MKTreeNode::from_hex("AAAA").unwrap(),
1181                ),
1182                (
1183                    BlockRange::from_block_number(BlockRange::LENGTH * 2),
1184                    MKTreeNode::from_hex("AAAA").unwrap(),
1185                ),
1186                (
1187                    BlockRange::from_block_number(BlockRange::LENGTH * 3),
1188                    MKTreeNode::from_hex("AAAA").unwrap(),
1189                ),
1190            ])
1191            .await
1192            .unwrap();
1193
1194        repository
1195            .remove_rolled_back_transactions_and_block_range_by_block_number(BlockRange::LENGTH * 3)
1196            .await
1197            .unwrap();
1198        assert_eq!(2, repository.get_all_transactions().await.unwrap().len());
1199        assert_eq!(2, repository.get_all_block_range_root().unwrap().len());
1200    }
1201
1202    #[tokio::test]
1203    async fn remove_rolled_back_transactions_and_block_range_by_slot_number() {
1204        fn transaction_record(
1205            block_number: BlockNumber,
1206            slot_number: SlotNumber,
1207            tx_hash: &str,
1208        ) -> CardanoTransactionRecord {
1209            CardanoTransactionRecord::new(
1210                tx_hash,
1211                block_number,
1212                slot_number,
1213                format!("block-hash-{block_number}"),
1214            )
1215        }
1216
1217        let repository = CardanoTransactionRepository::new(Arc::new(
1218            SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap(),
1219        ));
1220
1221        repository
1222            .create_transactions(vec![
1223                transaction_record(BlockNumber(10), SlotNumber(50), "tx-hash-1"),
1224                transaction_record(BlockNumber(11), SlotNumber(51), "tx-hash-2"),
1225                transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-3"),
1226                transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-4"),
1227                transaction_record(BlockNumber(101), SlotNumber(100), "tx-hash-5"),
1228                transaction_record(BlockNumber(202), SlotNumber(200), "tx-hash-56"),
1229            ])
1230            .await
1231            .unwrap();
1232
1233        {
1234            repository
1235                .remove_rolled_back_transactions_and_block_range_by_slot_number(SlotNumber(110))
1236                .await
1237                .expect("Failed to remove rolled back transactions");
1238
1239            let transactions = repository
1240                .get_all()
1241                .await
1242                .unwrap()
1243                .into_iter()
1244                .map(|v| v.into())
1245                .collect::<Vec<_>>();
1246            assert_eq!(
1247                vec![
1248                    transaction_record(BlockNumber(10), SlotNumber(50), "tx-hash-1"),
1249                    transaction_record(BlockNumber(11), SlotNumber(51), "tx-hash-2"),
1250                    transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-3"),
1251                    transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-4"),
1252                    transaction_record(BlockNumber(101), SlotNumber(100), "tx-hash-5"),
1253                ],
1254                transactions
1255            );
1256        }
1257
1258        {
1259            repository
1260                .remove_rolled_back_transactions_and_block_range_by_slot_number(SlotNumber(53))
1261                .await
1262                .expect("Failed to remove rolled back transactions");
1263
1264            let transactions = repository
1265                .get_all()
1266                .await
1267                .unwrap()
1268                .into_iter()
1269                .map(|v| v.into())
1270                .collect::<Vec<_>>();
1271            assert_eq!(
1272                vec![
1273                    transaction_record(BlockNumber(10), SlotNumber(50), "tx-hash-1"),
1274                    transaction_record(BlockNumber(11), SlotNumber(51), "tx-hash-2"),
1275                    transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-3"),
1276                    transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-4"),
1277                ],
1278                transactions
1279            );
1280        }
1281
1282        {
1283            repository
1284                .remove_rolled_back_transactions_and_block_range_by_slot_number(SlotNumber(51))
1285                .await
1286                .expect("Failed to remove rolled back transactions");
1287
1288            let transactions = repository
1289                .get_all()
1290                .await
1291                .unwrap()
1292                .into_iter()
1293                .map(|v| v.into())
1294                .collect::<Vec<_>>();
1295            assert_eq!(
1296                vec![
1297                    transaction_record(BlockNumber(10), SlotNumber(50), "tx-hash-1"),
1298                    transaction_record(BlockNumber(11), SlotNumber(51), "tx-hash-2"),
1299                ],
1300                transactions
1301            );
1302        }
1303    }
1304}