mithril_signer/services/cardano_transactions/importer/
service.rs

1use std::mem;
2use std::ops::Range;
3use std::sync::Arc;
4
5use anyhow::Context;
6use async_trait::async_trait;
7use slog::{Logger, debug};
8use tokio::{runtime::Handle, sync::Mutex, task};
9
10use mithril_cardano_node_chain::chain_scanner::{BlockScanner, ChainScannedBlocks};
11use mithril_cardano_node_chain::entities::RawCardanoPoint;
12use mithril_common::StdResult;
13use mithril_common::crypto_helper::{MKTree, MKTreeNode, MKTreeStoreInMemory};
14use mithril_common::entities::{
15    BlockNumber, BlockRange, CardanoTransaction, ChainPoint, SlotNumber,
16};
17use mithril_common::logging::LoggerExtensions;
18use mithril_common::signable_builder::TransactionsImporter;
19
20/// Cardano transactions store
21#[cfg_attr(test, mockall::automock)]
22#[async_trait]
23pub trait TransactionStore: Send + Sync {
24    /// Get the highest known transaction beacon
25    async fn get_highest_beacon(&self) -> StdResult<Option<ChainPoint>>;
26
27    /// Get the highest stored block range root bounds
28    async fn get_highest_block_range(&self) -> StdResult<Option<BlockRange>>;
29
30    /// Store list of transactions
31    async fn store_transactions(&self, transactions: Vec<CardanoTransaction>) -> StdResult<()>;
32
33    /// Get transactions in an interval of blocks
34    async fn get_transactions_in_range(
35        &self,
36        range: Range<BlockNumber>,
37    ) -> StdResult<Vec<CardanoTransaction>>;
38
39    /// Store list of block ranges with their corresponding merkle root
40    async fn store_block_range_roots(
41        &self,
42        block_ranges: Vec<(BlockRange, MKTreeNode)>,
43    ) -> StdResult<()>;
44
45    /// Remove transactions and block range roots that are in a rolled-back fork
46    ///
47    /// * Remove transactions with slot number strictly greater than the given slot number
48    /// * Remove block range roots that have lower bound range strictly above the given slot number
49    async fn remove_rolled_back_transactions_and_block_range(
50        &self,
51        slot_number: SlotNumber,
52    ) -> StdResult<()>;
53}
54
55/// Import and store [CardanoTransaction].
56#[derive(Clone)]
57pub struct CardanoTransactionsImporter {
58    block_scanner: Arc<dyn BlockScanner>,
59    transaction_store: Arc<dyn TransactionStore>,
60    last_polled_point: Arc<Mutex<Option<RawCardanoPoint>>>,
61    logger: Logger,
62}
63
64impl CardanoTransactionsImporter {
65    /// Constructor
66    pub fn new(
67        block_scanner: Arc<dyn BlockScanner>,
68        transaction_store: Arc<dyn TransactionStore>,
69        logger: Logger,
70    ) -> Self {
71        Self {
72            block_scanner,
73            transaction_store,
74            last_polled_point: Arc::new(Mutex::new(None)),
75            logger: logger.new_with_component_name::<Self>(),
76        }
77    }
78
79    async fn start_point(
80        &self,
81        highest_stored_chain_point: &Option<ChainPoint>,
82    ) -> StdResult<Option<RawCardanoPoint>> {
83        let last_polled_point = self.last_polled_point.lock().await.clone();
84        if last_polled_point.is_none() {
85            debug!(
86                self.logger,
87                "No last polled point available, falling back to the highest stored chain point"
88            );
89        }
90
91        Ok(last_polled_point.or(highest_stored_chain_point.as_ref().map(RawCardanoPoint::from)))
92    }
93
94    async fn import_transactions(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
95        let highest_stored_beacon = self.transaction_store.get_highest_beacon().await?;
96        let from = self.start_point(&highest_stored_beacon).await?;
97
98        if highest_stored_beacon
99            .as_ref()
100            .is_some_and(|f| f.block_number >= up_to_beacon)
101        {
102            debug!(
103                self.logger,
104                "No need to retrieve Cardano transactions, the database is up to date for block_number '{up_to_beacon}'",
105            );
106
107            Ok(())
108        } else {
109            debug!(
110                self.logger, "Retrieving Cardano transactions until block numbered '{up_to_beacon}'";
111                "starting_slot_number" => ?from.as_ref().map(|c| c.slot_number),
112                "highest_stored_block_number" => ?highest_stored_beacon.as_ref().map(|c| c.block_number),
113            );
114
115            self.parse_and_store_transactions_not_imported_yet(from, up_to_beacon)
116                .await
117        }
118    }
119
120    async fn parse_and_store_transactions_not_imported_yet(
121        &self,
122        from: Option<RawCardanoPoint>,
123        until: BlockNumber,
124    ) -> StdResult<()> {
125        let mut streamer = self.block_scanner.scan(from, until).await?;
126
127        while let Some(blocks) = streamer.poll_next().await? {
128            match blocks {
129                ChainScannedBlocks::RollForwards(forward_blocks) => {
130                    let parsed_transactions: Vec<CardanoTransaction> = forward_blocks
131                        .into_iter()
132                        .flat_map(|b| b.into_transactions())
133                        .collect();
134
135                    self.transaction_store.store_transactions(parsed_transactions).await?;
136                }
137                ChainScannedBlocks::RollBackward(slot_number) => {
138                    self.transaction_store
139                        .remove_rolled_back_transactions_and_block_range(slot_number)
140                        .await?;
141                }
142            }
143        }
144
145        if let Some(point) = streamer.last_polled_point() {
146            *self.last_polled_point.lock().await = Some(point);
147        }
148
149        Ok(())
150    }
151
152    async fn import_block_ranges(&self, until: BlockNumber) -> StdResult<()> {
153        let block_ranges = match self.transaction_store.get_highest_block_range().await?.map(
154            |highest_stored_block_range| {
155                BlockRange::all_block_ranges_in(
156                    BlockRange::start(highest_stored_block_range.end)..=(until),
157                )
158            },
159        ) {
160            // No block range root stored yet, start from the beginning
161            None => BlockRange::all_block_ranges_in(BlockNumber(0)..=(until)),
162            // Not enough block to form at least one block range
163            Some(ranges) if ranges.is_empty() => return Ok(()),
164            Some(ranges) => ranges,
165        };
166
167        debug!(
168            self.logger, "Computing Block Range Roots";
169            "start_block" => *block_ranges.start(), "end_block" => *block_ranges.end(),
170        );
171
172        let mut block_ranges_with_merkle_root: Vec<(BlockRange, MKTreeNode)> = vec![];
173        for block_range in block_ranges {
174            let transactions = self
175                .transaction_store
176                .get_transactions_in_range(block_range.start..block_range.end)
177                .await?;
178
179            if transactions.is_empty() {
180                continue;
181            }
182
183            let merkle_root = MKTree::<MKTreeStoreInMemory>::new(&transactions)?.compute_root()?;
184            block_ranges_with_merkle_root.push((block_range, merkle_root));
185
186            if block_ranges_with_merkle_root.len() >= 100 {
187                let block_ranges_with_merkle_root_save =
188                    mem::take(&mut block_ranges_with_merkle_root);
189                self.transaction_store
190                    .store_block_range_roots(block_ranges_with_merkle_root_save)
191                    .await?;
192            }
193        }
194
195        self.transaction_store
196            .store_block_range_roots(block_ranges_with_merkle_root)
197            .await
198    }
199}
200
201#[async_trait]
202impl TransactionsImporter for CardanoTransactionsImporter {
203    async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
204        let importer = self.clone();
205        task::spawn_blocking(move || {
206            Handle::current().block_on(async move {
207                importer.import_transactions(up_to_beacon).await?;
208                importer.import_block_ranges(up_to_beacon).await?;
209                Ok(())
210            })
211        })
212        .await
213        .with_context(|| "TransactionsImporter - worker thread crashed")?
214    }
215}
216
217#[cfg(test)]
218mod tests {
219    use std::sync::atomic::AtomicUsize;
220    use std::time::Duration;
221
222    use mockall::mock;
223
224    use mithril_cardano_node_chain::chain_scanner::BlockStreamer;
225    use mithril_cardano_node_chain::entities::ScannedBlock;
226    use mithril_cardano_node_chain::test::double::{DumbBlockScanner, DumbBlockStreamer};
227    use mithril_common::crypto_helper::MKTree;
228    use mithril_common::entities::{BlockNumber, BlockRangesSequence};
229    use mithril_persistence::database::repository::CardanoTransactionRepository;
230    use mithril_persistence::sqlite::SqliteConnectionPool;
231
232    use crate::database::test_helper::cardano_tx_db_connection;
233    use crate::test_tools::TestLogger;
234
235    use super::*;
236
237    mock! {
238        pub BlockScannerImpl { }
239
240        #[async_trait]
241        impl BlockScanner for BlockScannerImpl {
242            async fn scan(
243              &self,
244              from: Option<RawCardanoPoint>,
245              until: BlockNumber,
246            ) -> StdResult<Box<dyn BlockStreamer>>;
247        }
248    }
249
250    impl CardanoTransactionsImporter {
251        pub(crate) fn new_for_test(
252            scanner: Arc<dyn BlockScanner>,
253            transaction_store: Arc<dyn TransactionStore>,
254        ) -> Self {
255            CardanoTransactionsImporter::new(scanner, transaction_store, TestLogger::stdout())
256        }
257    }
258
259    fn build_blocks(
260        start_block_number: BlockNumber,
261        number_of_consecutive_block: BlockNumber,
262    ) -> Vec<ScannedBlock> {
263        (*start_block_number..*(start_block_number + number_of_consecutive_block))
264            .map(|block_number| {
265                ScannedBlock::new(
266                    format!("block_hash-{block_number}"),
267                    BlockNumber(block_number),
268                    SlotNumber(block_number * 100),
269                    vec![format!("tx_hash-{}", block_number)],
270                )
271            })
272            .collect()
273    }
274
275    fn into_transactions(blocks: &[ScannedBlock]) -> Vec<CardanoTransaction> {
276        blocks.iter().flat_map(|b| b.clone().into_transactions()).collect()
277    }
278
279    fn merkle_root_for_blocks(block_ranges: &[ScannedBlock]) -> MKTreeNode {
280        let tx: Vec<_> = block_ranges
281            .iter()
282            .flat_map(|br| br.clone().into_transactions())
283            .collect();
284        MKTree::<MKTreeStoreInMemory>::new(&tx)
285            .unwrap()
286            .compute_root()
287            .unwrap()
288    }
289
290    #[tokio::test]
291    async fn if_nothing_stored_parse_and_store_all_transactions() {
292        let connection = cardano_tx_db_connection().unwrap();
293        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
294            SqliteConnectionPool::build_from_connection(connection),
295        )));
296
297        let blocks = vec![
298            ScannedBlock::new(
299                "block_hash-1",
300                BlockNumber(10),
301                SlotNumber(15),
302                vec!["tx_hash-1", "tx_hash-2"],
303            ),
304            ScannedBlock::new(
305                "block_hash-2",
306                BlockNumber(20),
307                SlotNumber(25),
308                vec!["tx_hash-3", "tx_hash-4"],
309            ),
310        ];
311        let expected_transactions = into_transactions(&blocks);
312        let up_to_block_number = BlockNumber(1000);
313
314        let importer = {
315            let mut scanner_mock = MockBlockScannerImpl::new();
316            scanner_mock
317                .expect_scan()
318                .withf(move |from, until| from.is_none() && until == up_to_block_number)
319                .return_once(move |_, _| {
320                    Ok(Box::new(DumbBlockStreamer::new().forwards(vec![blocks])))
321                });
322            CardanoTransactionsImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
323        };
324
325        importer
326            .import_transactions(up_to_block_number)
327            .await
328            .expect("Transactions Importer should succeed");
329
330        let stored_transactions = repository.get_all().await.unwrap();
331        assert_eq!(expected_transactions, stored_transactions);
332    }
333
334    #[tokio::test]
335    async fn if_nothing_stored_parse_and_store_all_block_ranges() {
336        let connection = cardano_tx_db_connection().unwrap();
337        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
338            SqliteConnectionPool::build_from_connection(connection),
339        )));
340
341        let up_to_block_number = BlockRange::LENGTH * 5;
342        let blocks = build_blocks(BlockNumber(0), up_to_block_number + 1);
343        let transactions = into_transactions(&blocks);
344        repository.store_transactions(transactions).await.unwrap();
345
346        let importer = CardanoTransactionsImporter::new_for_test(
347            Arc::new(MockBlockScannerImpl::new()),
348            repository.clone(),
349        );
350
351        importer
352            .import_block_ranges(up_to_block_number)
353            .await
354            .expect("Transactions Importer should succeed");
355
356        let block_range_roots = repository.get_all_block_range_root().unwrap();
357        assert_eq!(
358            vec![
359                BlockRange::from_block_number(BlockNumber(0)),
360                BlockRange::from_block_number(BlockRange::LENGTH),
361                BlockRange::from_block_number(BlockRange::LENGTH * 2),
362                BlockRange::from_block_number(BlockRange::LENGTH * 3),
363                BlockRange::from_block_number(BlockRange::LENGTH * 4),
364            ],
365            block_range_roots.into_iter().map(|r| r.range).collect::<Vec<_>>()
366        );
367    }
368
369    #[tokio::test]
370    async fn if_theres_gap_between_two_stored_block_ranges_it_can_still_compute_their_root() {
371        let connection = cardano_tx_db_connection().unwrap();
372        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
373            SqliteConnectionPool::build_from_connection(connection),
374        )));
375
376        let up_to_block_number = BlockRange::LENGTH * 4;
377        // Two block ranges with a gap
378        let blocks: Vec<ScannedBlock> = [
379            build_blocks(BlockNumber(0), BlockRange::LENGTH),
380            build_blocks(BlockRange::LENGTH * 3, BlockRange::LENGTH),
381        ]
382        .concat();
383        let transactions = into_transactions(&blocks);
384        repository.store_transactions(transactions).await.unwrap();
385
386        let importer = CardanoTransactionsImporter::new_for_test(
387            Arc::new(MockBlockScannerImpl::new()),
388            repository.clone(),
389        );
390
391        importer
392            .import_block_ranges(up_to_block_number)
393            .await
394            .expect("Transactions Importer should succeed");
395
396        let block_range_roots = repository.get_all_block_range_root().unwrap();
397        assert_eq!(
398            vec![
399                BlockRange::from_block_number(BlockNumber(0)),
400                BlockRange::from_block_number(BlockRange::LENGTH * 3),
401            ],
402            block_range_roots.into_iter().map(|r| r.range).collect::<Vec<_>>()
403        );
404    }
405
406    #[tokio::test]
407    async fn if_all_block_ranges_computed_nothing_computed_and_stored() {
408        let connection = cardano_tx_db_connection().unwrap();
409        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
410            SqliteConnectionPool::build_from_connection(connection),
411        )));
412
413        let importer = CardanoTransactionsImporter::new_for_test(
414            Arc::new(MockBlockScannerImpl::new()),
415            repository.clone(),
416        );
417
418        importer
419            .import_block_ranges(BlockNumber(10_000))
420            .await
421            .expect("Transactions Importer should succeed");
422
423        let block_range_roots = repository.get_all_block_range_root().unwrap();
424        assert!(
425            block_range_roots.is_empty(),
426            "No block range root should be stored, found: {block_range_roots:?}"
427        );
428    }
429
430    #[tokio::test]
431    async fn if_all_transactions_stored_nothing_is_parsed_and_stored() {
432        let up_to_block_number = BlockNumber(12);
433        let connection = cardano_tx_db_connection().unwrap();
434        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
435            SqliteConnectionPool::build_from_connection(connection),
436        )));
437        let scanner = DumbBlockScanner::new().forwards(vec![vec![
438            ScannedBlock::new(
439                "block_hash-1",
440                BlockNumber(10),
441                SlotNumber(15),
442                vec!["tx_hash-1", "tx_hash-2"],
443            ),
444            ScannedBlock::new(
445                "block_hash-2",
446                BlockNumber(20),
447                SlotNumber(25),
448                vec!["tx_hash-3", "tx_hash-4"],
449            ),
450        ]]);
451
452        let last_tx = CardanoTransaction::new(
453            "tx-20",
454            BlockNumber(30),
455            SlotNumber(35),
456            hex::encode("block_hash-3"),
457        );
458        repository.store_transactions(vec![last_tx.clone()]).await.unwrap();
459
460        let importer =
461            CardanoTransactionsImporter::new_for_test(Arc::new(scanner), repository.clone());
462
463        importer
464            .import_transactions(up_to_block_number)
465            .await
466            .expect("Transactions Importer should succeed");
467
468        let transactions = repository.get_all().await.unwrap();
469        assert_eq!(vec![last_tx], transactions);
470    }
471
472    #[tokio::test]
473    async fn if_half_transactions_are_already_stored_the_other_half_is_parsed_and_stored() {
474        let connection = cardano_tx_db_connection().unwrap();
475        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
476            SqliteConnectionPool::build_from_connection(connection),
477        )));
478
479        let highest_stored_chain_point = ChainPoint::new(
480            SlotNumber(134),
481            BlockNumber(10),
482            hex::encode("block_hash-1"),
483        );
484        let stored_block = ScannedBlock::new(
485            hex::decode(highest_stored_chain_point.block_hash.clone()).unwrap(),
486            highest_stored_chain_point.block_number,
487            highest_stored_chain_point.slot_number,
488            vec!["tx_hash-1", "tx_hash-2"],
489        );
490        let to_store_block = ScannedBlock::new(
491            "block_hash-2",
492            BlockNumber(20),
493            SlotNumber(229),
494            vec!["tx_hash-3", "tx_hash-4"],
495        );
496        let expected_transactions: Vec<CardanoTransaction> = [
497            stored_block.clone().into_transactions(),
498            to_store_block.clone().into_transactions(),
499        ]
500        .concat();
501        let up_to_block_number = BlockNumber(22);
502
503        repository
504            .store_transactions(stored_block.clone().into_transactions())
505            .await
506            .unwrap();
507
508        let importer = {
509            let scanned_blocks = vec![to_store_block.clone()];
510            let mut scanner_mock = MockBlockScannerImpl::new();
511            scanner_mock
512                .expect_scan()
513                .withf(move |from, until| {
514                    from == &Some(highest_stored_chain_point.clone().into())
515                        && *until == up_to_block_number
516                })
517                .return_once(move |_, _| {
518                    Ok(Box::new(
519                        DumbBlockStreamer::new().forwards(vec![scanned_blocks]),
520                    ))
521                })
522                .once();
523            CardanoTransactionsImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
524        };
525
526        let stored_transactions = repository.get_all().await.unwrap();
527        assert_eq!(stored_block.into_transactions(), stored_transactions);
528
529        importer
530            .import_transactions(up_to_block_number)
531            .await
532            .expect("Transactions Importer should succeed");
533
534        let stored_transactions = repository.get_all().await.unwrap();
535        assert_eq!(expected_transactions, stored_transactions);
536    }
537
538    #[tokio::test]
539    async fn if_half_block_ranges_are_stored_the_other_half_is_computed_and_stored() {
540        let connection = cardano_tx_db_connection().unwrap();
541        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
542            SqliteConnectionPool::build_from_connection(connection),
543        )));
544
545        let up_to_block_number = BlockRange::LENGTH * 4;
546        let blocks = build_blocks(BlockNumber(0), up_to_block_number + 1);
547        let transactions = into_transactions(&blocks);
548        repository.store_transactions(transactions).await.unwrap();
549        repository
550            .store_block_range_roots(
551                blocks[0..(*(BlockRange::LENGTH * 2) as usize)]
552                    .iter()
553                    .map(|b| {
554                        (
555                            BlockRange::from_block_number(b.block_number),
556                            MKTreeNode::from_hex("AAAA").unwrap(),
557                        )
558                    })
559                    .collect(),
560            )
561            .await
562            .unwrap();
563
564        let importer = CardanoTransactionsImporter::new_for_test(
565            Arc::new(MockBlockScannerImpl::new()),
566            repository.clone(),
567        );
568
569        let block_range_roots = repository.get_all_block_range_root().unwrap();
570        assert_eq!(
571            vec![
572                BlockRange::from_block_number(BlockNumber(0)),
573                BlockRange::from_block_number(BlockRange::LENGTH),
574            ],
575            block_range_roots.into_iter().map(|r| r.range).collect::<Vec<_>>()
576        );
577
578        importer
579            .import_block_ranges(up_to_block_number)
580            .await
581            .expect("Transactions Importer should succeed");
582
583        let block_range_roots = repository.get_all_block_range_root().unwrap();
584        assert_eq!(
585            vec![
586                BlockRange::from_block_number(BlockNumber(0)),
587                BlockRange::from_block_number(BlockRange::LENGTH),
588                BlockRange::from_block_number(BlockRange::LENGTH * 2),
589                BlockRange::from_block_number(BlockRange::LENGTH * 3),
590            ],
591            block_range_roots.into_iter().map(|r| r.range).collect::<Vec<_>>()
592        );
593    }
594
595    #[tokio::test]
596    async fn can_compute_block_ranges_up_to_the_strict_end_of_a_block_range() {
597        let connection = cardano_tx_db_connection().unwrap();
598        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
599            SqliteConnectionPool::build_from_connection(connection),
600        )));
601
602        // Transactions for all blocks in the (15..=29) interval
603        let blocks = build_blocks(BlockRange::LENGTH, BlockRange::LENGTH - 1);
604        let transactions = into_transactions(&blocks);
605        repository.store_transactions(transactions).await.unwrap();
606
607        let importer = CardanoTransactionsImporter::new_for_test(
608            Arc::new(MockBlockScannerImpl::new()),
609            repository.clone(),
610        );
611
612        importer
613            .import_block_ranges(BlockRange::LENGTH * 2 - 1)
614            .await
615            .expect("Transactions Importer should succeed");
616
617        let block_range_roots = repository.get_all_block_range_root().unwrap();
618        assert_eq!(
619            vec![BlockRange::from_block_number(BlockRange::LENGTH)],
620            block_range_roots.into_iter().map(|r| r.range).collect::<Vec<_>>()
621        );
622    }
623
624    #[tokio::test]
625    async fn can_compute_block_ranges_even_if_last_blocks_in_range_dont_have_transactions() {
626        let connection = cardano_tx_db_connection().unwrap();
627        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
628            SqliteConnectionPool::build_from_connection(connection),
629        )));
630
631        // For the block range (15..=29) we only have transactions in the 10 first blocks (15..=24)
632        let blocks = build_blocks(BlockRange::LENGTH, BlockNumber(10));
633        let transactions = into_transactions(&blocks);
634        repository.store_transactions(transactions).await.unwrap();
635
636        let importer = CardanoTransactionsImporter::new_for_test(
637            Arc::new(MockBlockScannerImpl::new()),
638            repository.clone(),
639        );
640
641        importer
642            .import_block_ranges(BlockRange::LENGTH * 2)
643            .await
644            .expect("Transactions Importer should succeed");
645
646        let block_range_roots = repository.get_all_block_range_root().unwrap();
647        assert_eq!(
648            vec![BlockRange::from_block_number(BlockRange::LENGTH)],
649            block_range_roots.into_iter().map(|r| r.range).collect::<Vec<_>>()
650        );
651    }
652
653    #[tokio::test]
654    async fn block_range_root_retrieves_only_strictly_required_transactions() {
655        fn transactions_for_block(range: Range<BlockNumber>) -> StdResult<Vec<CardanoTransaction>> {
656            Ok(build_blocks(range.start, range.end - range.start)
657                .into_iter()
658                .flat_map(|b| b.into_transactions())
659                .collect())
660        }
661        const HIGHEST_BLOCK_RANGE_START: BlockNumber = BlockRange::LENGTH;
662        let up_to_block_number = BlockRange::LENGTH * 5;
663
664        let importer = {
665            let mut store_mock = MockTransactionStore::new();
666            store_mock
667                .expect_get_highest_block_range()
668                .returning(|| {
669                    Ok(Some(BlockRange::from_block_number(
670                        HIGHEST_BLOCK_RANGE_START,
671                    )))
672                })
673                .once();
674            store_mock
675                .expect_get_transactions_in_range()
676                // Lower bound should be the end block number of the last known block range
677                // Upper bound should be the block number provided to `import_block_ranges`
678                .withf(move |range| {
679                    BlockRangesSequence::new(HIGHEST_BLOCK_RANGE_START..=up_to_block_number)
680                        .contains(range)
681                })
682                .returning(transactions_for_block);
683            store_mock.expect_store_block_range_roots().returning(|_| Ok(()));
684
685            CardanoTransactionsImporter::new_for_test(
686                Arc::new(MockBlockScannerImpl::new()),
687                Arc::new(store_mock),
688            )
689        };
690
691        importer
692            .import_block_ranges(up_to_block_number)
693            .await
694            .expect("Transactions Importer should succeed");
695    }
696
697    #[tokio::test]
698    async fn compute_block_range_merkle_root() {
699        let connection = cardano_tx_db_connection().unwrap();
700        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
701            SqliteConnectionPool::build_from_connection(connection),
702        )));
703
704        // 2 block ranges worth of blocks with one more block that should be ignored for merkle root computation
705        let up_to_block_number = BlockRange::LENGTH * 2;
706        let blocks = build_blocks(BlockNumber(0), up_to_block_number + 1);
707        let transactions = into_transactions(&blocks);
708        let expected_block_range_roots = vec![
709            (
710                BlockRange::from_block_number(BlockNumber(0)),
711                merkle_root_for_blocks(&blocks[0..(*BlockRange::LENGTH as usize)]),
712            ),
713            (
714                BlockRange::from_block_number(BlockRange::LENGTH),
715                merkle_root_for_blocks(
716                    &blocks[(*BlockRange::LENGTH as usize)..((*BlockRange::LENGTH * 2) as usize)],
717                ),
718            ),
719        ];
720
721        repository.store_transactions(transactions).await.unwrap();
722
723        let importer = CardanoTransactionsImporter::new_for_test(
724            Arc::new(MockBlockScannerImpl::new()),
725            repository.clone(),
726        );
727
728        importer
729            .import_block_ranges(up_to_block_number)
730            .await
731            .expect("Transactions Importer should succeed");
732
733        let block_range_roots = repository.get_all_block_range_root().unwrap();
734        assert_eq!(
735            expected_block_range_roots,
736            block_range_roots.into_iter().map(|br| br.into()).collect::<Vec<_>>()
737        );
738    }
739
740    #[tokio::test]
741    async fn importing_twice_starting_with_nothing_in_a_real_db_should_yield_transactions_in_same_order()
742     {
743        let blocks = vec![
744            ScannedBlock::new(
745                "block_hash-1",
746                BlockNumber(10),
747                SlotNumber(15),
748                vec!["tx_hash-1", "tx_hash-2"],
749            ),
750            ScannedBlock::new(
751                "block_hash-2",
752                BlockNumber(20),
753                SlotNumber(25),
754                vec!["tx_hash-3", "tx_hash-4"],
755            ),
756        ];
757        let up_to_block_number = BlockNumber(1000);
758        let transactions = into_transactions(&blocks);
759
760        let (importer, repository) = {
761            let connection = cardano_tx_db_connection().unwrap();
762            let connection_pool = Arc::new(SqliteConnectionPool::build_from_connection(connection));
763            let repository = Arc::new(CardanoTransactionRepository::new(connection_pool));
764            let importer = CardanoTransactionsImporter::new_for_test(
765                Arc::new(DumbBlockScanner::new().forwards(vec![blocks.clone()])),
766                repository.clone(),
767            );
768            (importer, repository)
769        };
770
771        importer
772            .import(up_to_block_number)
773            .await
774            .expect("Transactions Importer should succeed");
775        let cold_imported_transactions = repository.get_all().await.unwrap();
776
777        importer
778            .import(up_to_block_number)
779            .await
780            .expect("Transactions Importer should succeed");
781        let warm_imported_transactions = repository.get_all().await.unwrap();
782
783        assert_eq!(transactions, cold_imported_transactions);
784        assert_eq!(cold_imported_transactions, warm_imported_transactions);
785    }
786
787    mod transactions_import_start_point {
788        use super::*;
789
790        async fn importer_with_last_polled_point(
791            last_polled_point: Option<RawCardanoPoint>,
792        ) -> CardanoTransactionsImporter {
793            let connection = cardano_tx_db_connection().unwrap();
794            let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
795                SqliteConnectionPool::build_from_connection(connection),
796            )));
797
798            CardanoTransactionsImporter {
799                last_polled_point: Arc::new(Mutex::new(last_polled_point)),
800                ..CardanoTransactionsImporter::new_for_test(
801                    Arc::new(DumbBlockScanner::new()),
802                    repository,
803                )
804            }
805        }
806
807        #[tokio::test]
808        async fn cloning_keep_last_polled_point() {
809            let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new(
810                SlotNumber(15),
811                "block_hash-1",
812            )))
813            .await;
814
815            let cloned_importer = importer.clone();
816            let start_point = cloned_importer.start_point(&None).await.unwrap();
817            assert_eq!(
818                Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
819                start_point
820            );
821        }
822
823        #[tokio::test]
824        async fn none_if_nothing_stored_nor_scanned() {
825            let importer = importer_with_last_polled_point(None).await;
826            let highest_stored_block_number = None;
827
828            let start_point = importer.start_point(&highest_stored_block_number).await.unwrap();
829            assert_eq!(None, start_point);
830        }
831
832        #[tokio::test]
833        async fn start_at_last_stored_point_if_nothing_scanned() {
834            let importer = importer_with_last_polled_point(None).await;
835            let highest_stored_block_number = Some(ChainPoint::new(
836                SlotNumber(25),
837                BlockNumber(20),
838                hex::encode("block_hash-2"),
839            ));
840
841            let start_point = importer.start_point(&highest_stored_block_number).await.unwrap();
842            assert_eq!(
843                Some(RawCardanoPoint::new(SlotNumber(25), "block_hash-2")),
844                start_point
845            );
846        }
847
848        #[tokio::test]
849        async fn start_at_last_scanned_point_when_nothing_stored() {
850            let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new(
851                SlotNumber(15),
852                "block_hash-1",
853            )))
854            .await;
855            let highest_stored_block_number = None;
856
857            let start_point = importer.start_point(&highest_stored_block_number).await.unwrap();
858            assert_eq!(
859                Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
860                start_point
861            );
862        }
863
864        #[tokio::test]
865        async fn start_at_last_scanned_point_even_if_something_stored() {
866            let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new(
867                SlotNumber(15),
868                "block_hash-1",
869            )))
870            .await;
871            let highest_stored_block_number = Some(ChainPoint::new(
872                SlotNumber(25),
873                BlockNumber(20),
874                hex::encode("block_hash-2"),
875            ));
876
877            let start_point = importer.start_point(&highest_stored_block_number).await.unwrap();
878            assert_eq!(
879                Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
880                start_point
881            );
882        }
883
884        #[tokio::test]
885        async fn importing_transactions_update_start_point_even_if_no_transactions_are_found() {
886            let connection = cardano_tx_db_connection().unwrap();
887            let importer = CardanoTransactionsImporter {
888                last_polled_point: Arc::new(Mutex::new(None)),
889                ..CardanoTransactionsImporter::new_for_test(
890                    Arc::new(
891                        DumbBlockScanner::new()
892                            .forwards(vec![vec![ScannedBlock::new(
893                                "block_hash-1",
894                                BlockNumber(10),
895                                SlotNumber(15),
896                                Vec::<&str>::new(),
897                            )]])
898                            .last_polled_point(Some(RawCardanoPoint::new(
899                                SlotNumber(25),
900                                "block_hash-2",
901                            ))),
902                    ),
903                    Arc::new(CardanoTransactionRepository::new(Arc::new(
904                        SqliteConnectionPool::build_from_connection(connection),
905                    ))),
906                )
907            };
908            let highest_stored_block_number = None;
909
910            let start_point_before_import =
911                importer.start_point(&highest_stored_block_number).await.unwrap();
912            assert_eq!(None, start_point_before_import);
913
914            importer.import_transactions(BlockNumber(1000)).await.unwrap();
915
916            let start_point_after_import =
917                importer.start_point(&highest_stored_block_number).await.unwrap();
918            assert_eq!(
919                Some(RawCardanoPoint::new(SlotNumber(25), "block_hash-2")),
920                start_point_after_import
921            );
922        }
923
924        #[tokio::test]
925        async fn importing_transactions_dont_update_start_point_if_streamer_did_nothing() {
926            let connection = cardano_tx_db_connection().unwrap();
927            let importer = CardanoTransactionsImporter {
928                last_polled_point: Arc::new(Mutex::new(Some(RawCardanoPoint::new(
929                    SlotNumber(15),
930                    "block_hash-1",
931                )))),
932                ..CardanoTransactionsImporter::new_for_test(
933                    Arc::new(DumbBlockScanner::new()),
934                    Arc::new(CardanoTransactionRepository::new(Arc::new(
935                        SqliteConnectionPool::build_from_connection(connection),
936                    ))),
937                )
938            };
939            let highest_stored_block_number = None;
940
941            importer.import_transactions(BlockNumber(1000)).await.unwrap();
942
943            let start_point_after_import =
944                importer.start_point(&highest_stored_block_number).await.unwrap();
945            assert_eq!(
946                Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
947                start_point_after_import
948            );
949        }
950    }
951
952    #[tokio::test]
953    async fn when_rollbackward_should_remove_transactions() {
954        let connection = cardano_tx_db_connection().unwrap();
955        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
956            SqliteConnectionPool::build_from_connection(connection),
957        )));
958
959        let expected_remaining_transactions = ScannedBlock::new(
960            "block_hash-130",
961            BlockNumber(130),
962            SlotNumber(5),
963            vec!["tx_hash-6", "tx_hash-7"],
964        )
965        .into_transactions();
966        repository
967            .store_transactions(expected_remaining_transactions.clone())
968            .await
969            .unwrap();
970        repository
971            .store_transactions(
972                ScannedBlock::new(
973                    "block_hash-131",
974                    BlockNumber(131),
975                    SlotNumber(10),
976                    vec!["tx_hash-8", "tx_hash-9", "tx_hash-10"],
977                )
978                .into_transactions(),
979            )
980            .await
981            .unwrap();
982
983        let chain_point = ChainPoint::new(SlotNumber(5), BlockNumber(130), "block_hash-130");
984        let scanner = DumbBlockScanner::new().backward(chain_point);
985
986        let importer =
987            CardanoTransactionsImporter::new_for_test(Arc::new(scanner), repository.clone());
988
989        importer
990            .import_transactions(BlockNumber(3000))
991            .await
992            .expect("Transactions Importer should succeed");
993
994        let stored_transactions = repository.get_all().await.unwrap();
995        assert_eq!(expected_remaining_transactions, stored_transactions);
996    }
997
998    #[tokio::test]
999    async fn when_rollbackward_should_remove_block_ranges() {
1000        let connection = cardano_tx_db_connection().unwrap();
1001        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
1002            SqliteConnectionPool::build_from_connection(connection),
1003        )));
1004
1005        let expected_remaining_block_ranges = vec![
1006            BlockRange::from_block_number(BlockNumber(0)),
1007            BlockRange::from_block_number(BlockRange::LENGTH),
1008            BlockRange::from_block_number(BlockRange::LENGTH * 2),
1009        ];
1010
1011        repository
1012            .store_block_range_roots(
1013                expected_remaining_block_ranges
1014                    .iter()
1015                    .map(|b| (b.clone(), MKTreeNode::from_hex("AAAA").unwrap()))
1016                    .collect(),
1017            )
1018            .await
1019            .unwrap();
1020        repository
1021            .store_block_range_roots(
1022                [
1023                    BlockRange::from_block_number(BlockRange::LENGTH * 3),
1024                    BlockRange::from_block_number(BlockRange::LENGTH * 4),
1025                    BlockRange::from_block_number(BlockRange::LENGTH * 5),
1026                ]
1027                .iter()
1028                .map(|b| (b.clone(), MKTreeNode::from_hex("AAAA").unwrap()))
1029                .collect(),
1030            )
1031            .await
1032            .unwrap();
1033        repository
1034            .store_transactions(
1035                ScannedBlock::new(
1036                    "block_hash-131",
1037                    BlockRange::from_block_number(BlockRange::LENGTH * 3).start,
1038                    SlotNumber(1),
1039                    vec!["tx_hash-1", "tx_hash-2", "tx_hash-3"],
1040                )
1041                .into_transactions(),
1042            )
1043            .await
1044            .unwrap();
1045
1046        let block_range_roots = repository.get_all_block_range_root().unwrap();
1047        assert_eq!(6, block_range_roots.len());
1048
1049        let chain_point = ChainPoint::new(SlotNumber(1), BlockRange::LENGTH * 3, "block_hash-131");
1050        let scanner = DumbBlockScanner::new().backward(chain_point);
1051
1052        let importer =
1053            CardanoTransactionsImporter::new_for_test(Arc::new(scanner), repository.clone());
1054
1055        importer
1056            .import_transactions(BlockNumber(3000))
1057            .await
1058            .expect("Transactions Importer should succeed");
1059
1060        let block_range_roots = repository.get_all_block_range_root().unwrap();
1061        assert_eq!(
1062            expected_remaining_block_ranges,
1063            block_range_roots.into_iter().map(|r| r.range).collect::<Vec<_>>()
1064        );
1065    }
1066
1067    #[tokio::test]
1068    async fn test_import_is_non_blocking() {
1069        static COUNTER: AtomicUsize = AtomicUsize::new(0);
1070        static MAX_COUNTER: usize = 25;
1071        static WAIT_TIME: u64 = 50;
1072
1073        // Use a local set to ensure the counter task is not dispatched on a different thread
1074        let local = task::LocalSet::new();
1075        local
1076            .run_until(async {
1077                let importer = CardanoTransactionsImporter::new_for_test(
1078                    Arc::new(DumbBlockScanner::new()),
1079                    Arc::new(BlockingRepository {
1080                        wait_time: Duration::from_millis(WAIT_TIME),
1081                    }),
1082                );
1083
1084                let importer_future = importer.import(BlockNumber(100));
1085                let counter_task = task::spawn_local(async {
1086                    while COUNTER.load(std::sync::atomic::Ordering::SeqCst) < MAX_COUNTER {
1087                        tokio::time::sleep(Duration::from_millis(1)).await;
1088                        COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1089                    }
1090                });
1091                importer_future.await.unwrap();
1092
1093                counter_task.abort();
1094            })
1095            .await;
1096
1097        assert_eq!(
1098            MAX_COUNTER,
1099            COUNTER.load(std::sync::atomic::Ordering::SeqCst)
1100        );
1101
1102        struct BlockingRepository {
1103            wait_time: Duration,
1104        }
1105
1106        impl BlockingRepository {
1107            fn block_thread(&self) {
1108                std::thread::sleep(self.wait_time);
1109            }
1110        }
1111
1112        #[async_trait]
1113        impl TransactionStore for BlockingRepository {
1114            async fn get_highest_beacon(&self) -> StdResult<Option<ChainPoint>> {
1115                self.block_thread();
1116                Ok(None)
1117            }
1118
1119            async fn get_highest_block_range(&self) -> StdResult<Option<BlockRange>> {
1120                self.block_thread();
1121                Ok(None)
1122            }
1123
1124            async fn store_transactions(&self, _: Vec<CardanoTransaction>) -> StdResult<()> {
1125                self.block_thread();
1126                Ok(())
1127            }
1128
1129            async fn get_transactions_in_range(
1130                &self,
1131                _: Range<BlockNumber>,
1132            ) -> StdResult<Vec<CardanoTransaction>> {
1133                self.block_thread();
1134                Ok(vec![])
1135            }
1136
1137            async fn store_block_range_roots(
1138                &self,
1139                _: Vec<(BlockRange, MKTreeNode)>,
1140            ) -> StdResult<()> {
1141                self.block_thread();
1142                Ok(())
1143            }
1144
1145            async fn remove_rolled_back_transactions_and_block_range(
1146                &self,
1147                _: SlotNumber,
1148            ) -> StdResult<()> {
1149                self.block_thread();
1150                Ok(())
1151            }
1152        }
1153    }
1154}