mithril_aggregator/services/
cardano_transactions_importer.rs

1use std::mem;
2use std::ops::Range;
3use std::sync::Arc;
4
5use anyhow::Context;
6use async_trait::async_trait;
7use slog::{debug, Logger};
8use tokio::{runtime::Handle, sync::Mutex, task};
9
10use mithril_common::cardano_block_scanner::{BlockScanner, ChainScannedBlocks, RawCardanoPoint};
11use mithril_common::crypto_helper::{MKTree, MKTreeNode, MKTreeStoreInMemory};
12use mithril_common::entities::{
13    BlockNumber, BlockRange, CardanoTransaction, ChainPoint, SlotNumber,
14};
15use mithril_common::logging::LoggerExtensions;
16use mithril_common::signable_builder::TransactionsImporter;
17use mithril_common::StdResult;
18
19/// Cardano transactions store
20#[cfg_attr(test, mockall::automock)]
21#[async_trait]
22pub trait TransactionStore: Send + Sync {
23    /// Get the highest known transaction beacon
24    async fn get_highest_beacon(&self) -> StdResult<Option<ChainPoint>>;
25
26    /// Get the highest stored block range root bounds
27    async fn get_highest_block_range(&self) -> StdResult<Option<BlockRange>>;
28
29    /// Store list of transactions
30    async fn store_transactions(&self, transactions: Vec<CardanoTransaction>) -> StdResult<()>;
31
32    /// Get transactions in an interval of blocks
33    async fn get_transactions_in_range(
34        &self,
35        range: Range<BlockNumber>,
36    ) -> StdResult<Vec<CardanoTransaction>>;
37
38    /// Store list of block ranges with their corresponding merkle root
39    async fn store_block_range_roots(
40        &self,
41        block_ranges: Vec<(BlockRange, MKTreeNode)>,
42    ) -> StdResult<()>;
43
44    /// Remove transactions and block range roots that are in a rolled-back fork
45    ///
46    /// * Remove transactions with slot number strictly greater than the given slot number
47    /// * Remove block range roots that have lower bound range strictly above the given slot number
48    async fn remove_rolled_back_transactions_and_block_range(
49        &self,
50        slot_number: SlotNumber,
51    ) -> StdResult<()>;
52}
53
54/// Import and store [CardanoTransaction].
55#[derive(Clone)]
56pub struct CardanoTransactionsImporter {
57    block_scanner: Arc<dyn BlockScanner>,
58    transaction_store: Arc<dyn TransactionStore>,
59    last_polled_point: Arc<Mutex<Option<RawCardanoPoint>>>,
60    logger: Logger,
61}
62
63impl CardanoTransactionsImporter {
64    /// Constructor
65    pub fn new(
66        block_scanner: Arc<dyn BlockScanner>,
67        transaction_store: Arc<dyn TransactionStore>,
68        logger: Logger,
69    ) -> Self {
70        Self {
71            block_scanner,
72            transaction_store,
73            last_polled_point: Arc::new(Mutex::new(None)),
74            logger: logger.new_with_component_name::<Self>(),
75        }
76    }
77
78    async fn start_point(
79        &self,
80        highest_stored_chain_point: &Option<ChainPoint>,
81    ) -> StdResult<Option<RawCardanoPoint>> {
82        let last_polled_point = self.last_polled_point.lock().await.clone();
83        if last_polled_point.is_none() {
84            debug!(
85                self.logger,
86                "No last polled point available, falling back to the highest stored chain point"
87            );
88        }
89
90        Ok(last_polled_point.or(highest_stored_chain_point
91            .as_ref()
92            .map(RawCardanoPoint::from)))
93    }
94
95    async fn import_transactions(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
96        let highest_stored_beacon = self.transaction_store.get_highest_beacon().await?;
97        let from = self.start_point(&highest_stored_beacon).await?;
98
99        if highest_stored_beacon
100            .as_ref()
101            .is_some_and(|f| f.block_number >= up_to_beacon)
102        {
103            debug!(
104                self.logger,
105                "No need to retrieve Cardano transactions, the database is up to date for block_number '{up_to_beacon}'",
106            );
107
108            Ok(())
109        } else {
110            debug!(
111                self.logger, "Retrieving Cardano transactions until block numbered '{up_to_beacon}'";
112                "starting_slot_number" => ?from.as_ref().map(|c| c.slot_number),
113                "highest_stored_block_number" => ?highest_stored_beacon.as_ref().map(|c| c.block_number),
114            );
115
116            self.parse_and_store_transactions_not_imported_yet(from, up_to_beacon)
117                .await
118        }
119    }
120
121    async fn parse_and_store_transactions_not_imported_yet(
122        &self,
123        from: Option<RawCardanoPoint>,
124        until: BlockNumber,
125    ) -> StdResult<()> {
126        let mut streamer = self.block_scanner.scan(from, until).await?;
127
128        while let Some(blocks) = streamer.poll_next().await? {
129            match blocks {
130                ChainScannedBlocks::RollForwards(forward_blocks) => {
131                    let parsed_transactions: Vec<CardanoTransaction> = forward_blocks
132                        .into_iter()
133                        .flat_map(|b| b.into_transactions())
134                        .collect();
135
136                    self.transaction_store
137                        .store_transactions(parsed_transactions)
138                        .await?;
139                }
140                ChainScannedBlocks::RollBackward(slot_number) => {
141                    self.transaction_store
142                        .remove_rolled_back_transactions_and_block_range(slot_number)
143                        .await?;
144                }
145            }
146        }
147
148        if let Some(point) = streamer.last_polled_point() {
149            *self.last_polled_point.lock().await = Some(point);
150        }
151
152        Ok(())
153    }
154
155    async fn import_block_ranges(&self, until: BlockNumber) -> StdResult<()> {
156        let block_ranges = match self.transaction_store.get_highest_block_range().await?.map(
157            |highest_stored_block_range| {
158                BlockRange::all_block_ranges_in(
159                    BlockRange::start(highest_stored_block_range.end)..=(until),
160                )
161            },
162        ) {
163            // No block range root stored yet, start from the beginning
164            None => BlockRange::all_block_ranges_in(BlockNumber(0)..=(until)),
165            // Not enough block to form at least one block range
166            Some(ranges) if ranges.is_empty() => return Ok(()),
167            Some(ranges) => ranges,
168        };
169
170        debug!(
171            self.logger, "Computing Block Range Roots";
172            "start_block" => *block_ranges.start(), "end_block" => *block_ranges.end(),
173        );
174
175        let mut block_ranges_with_merkle_root: Vec<(BlockRange, MKTreeNode)> = vec![];
176        for block_range in block_ranges {
177            let transactions = self
178                .transaction_store
179                .get_transactions_in_range(block_range.start..block_range.end)
180                .await?;
181
182            if transactions.is_empty() {
183                continue;
184            }
185
186            let merkle_root = MKTree::<MKTreeStoreInMemory>::new(&transactions)?.compute_root()?;
187            block_ranges_with_merkle_root.push((block_range, merkle_root));
188
189            if block_ranges_with_merkle_root.len() >= 100 {
190                let block_ranges_with_merkle_root_save =
191                    mem::take(&mut block_ranges_with_merkle_root);
192                self.transaction_store
193                    .store_block_range_roots(block_ranges_with_merkle_root_save)
194                    .await?;
195            }
196        }
197
198        self.transaction_store
199            .store_block_range_roots(block_ranges_with_merkle_root)
200            .await
201    }
202}
203
204#[async_trait]
205impl TransactionsImporter for CardanoTransactionsImporter {
206    async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
207        let importer = self.clone();
208        task::spawn_blocking(move || {
209            Handle::current().block_on(async move {
210                importer.import_transactions(up_to_beacon).await?;
211                importer.import_block_ranges(up_to_beacon).await?;
212                Ok(())
213            })
214        })
215        .await
216        .with_context(|| "TransactionsImporter - worker thread crashed")?
217    }
218}
219
220#[cfg(test)]
221mod tests {
222    use std::sync::atomic::AtomicUsize;
223    use std::time::Duration;
224
225    use mockall::mock;
226
227    use mithril_common::cardano_block_scanner::{
228        BlockStreamer, DumbBlockScanner, DumbBlockStreamer, ScannedBlock,
229    };
230    use mithril_common::crypto_helper::MKTree;
231    use mithril_common::entities::{BlockNumber, BlockRangesSequence};
232    use mithril_persistence::database::repository::CardanoTransactionRepository;
233    use mithril_persistence::sqlite::SqliteConnectionPool;
234
235    use crate::database::test_helper::cardano_tx_db_connection;
236    use crate::test_tools::TestLogger;
237
238    use super::*;
239
240    mock! {
241        pub BlockScannerImpl { }
242
243        #[async_trait]
244        impl BlockScanner for BlockScannerImpl {
245            async fn scan(
246              &self,
247              from: Option<RawCardanoPoint>,
248              until: BlockNumber,
249            ) -> StdResult<Box<dyn BlockStreamer>>;
250        }
251    }
252
253    impl CardanoTransactionsImporter {
254        pub(crate) fn new_for_test(
255            scanner: Arc<dyn BlockScanner>,
256            transaction_store: Arc<dyn TransactionStore>,
257        ) -> Self {
258            CardanoTransactionsImporter::new(scanner, transaction_store, TestLogger::stdout())
259        }
260    }
261
262    fn build_blocks(
263        start_block_number: BlockNumber,
264        number_of_consecutive_block: BlockNumber,
265    ) -> Vec<ScannedBlock> {
266        (*start_block_number..*(start_block_number + number_of_consecutive_block))
267            .map(|block_number| {
268                ScannedBlock::new(
269                    format!("block_hash-{}", block_number),
270                    BlockNumber(block_number),
271                    SlotNumber(block_number * 100),
272                    vec![format!("tx_hash-{}", block_number)],
273                )
274            })
275            .collect()
276    }
277
278    fn into_transactions(blocks: &[ScannedBlock]) -> Vec<CardanoTransaction> {
279        blocks
280            .iter()
281            .flat_map(|b| b.clone().into_transactions())
282            .collect()
283    }
284
285    fn merkle_root_for_blocks(block_ranges: &[ScannedBlock]) -> MKTreeNode {
286        let tx: Vec<_> = block_ranges
287            .iter()
288            .flat_map(|br| br.clone().into_transactions())
289            .collect();
290        MKTree::<MKTreeStoreInMemory>::new(&tx)
291            .unwrap()
292            .compute_root()
293            .unwrap()
294    }
295
296    #[tokio::test]
297    async fn if_nothing_stored_parse_and_store_all_transactions() {
298        let connection = cardano_tx_db_connection().unwrap();
299        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
300            SqliteConnectionPool::build_from_connection(connection),
301        )));
302
303        let blocks = vec![
304            ScannedBlock::new(
305                "block_hash-1",
306                BlockNumber(10),
307                SlotNumber(15),
308                vec!["tx_hash-1", "tx_hash-2"],
309            ),
310            ScannedBlock::new(
311                "block_hash-2",
312                BlockNumber(20),
313                SlotNumber(25),
314                vec!["tx_hash-3", "tx_hash-4"],
315            ),
316        ];
317        let expected_transactions = into_transactions(&blocks);
318        let up_to_block_number = BlockNumber(1000);
319
320        let importer = {
321            let mut scanner_mock = MockBlockScannerImpl::new();
322            scanner_mock
323                .expect_scan()
324                .withf(move |from, until| from.is_none() && until == up_to_block_number)
325                .return_once(move |_, _| {
326                    Ok(Box::new(DumbBlockStreamer::new().forwards(vec![blocks])))
327                });
328            CardanoTransactionsImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
329        };
330
331        importer
332            .import_transactions(up_to_block_number)
333            .await
334            .expect("Transactions Importer should succeed");
335
336        let stored_transactions = repository.get_all().await.unwrap();
337        assert_eq!(expected_transactions, stored_transactions);
338    }
339
340    #[tokio::test]
341    async fn if_nothing_stored_parse_and_store_all_block_ranges() {
342        let connection = cardano_tx_db_connection().unwrap();
343        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
344            SqliteConnectionPool::build_from_connection(connection),
345        )));
346
347        let up_to_block_number = BlockRange::LENGTH * 5;
348        let blocks = build_blocks(BlockNumber(0), up_to_block_number + 1);
349        let transactions = into_transactions(&blocks);
350        repository.store_transactions(transactions).await.unwrap();
351
352        let importer = CardanoTransactionsImporter::new_for_test(
353            Arc::new(MockBlockScannerImpl::new()),
354            repository.clone(),
355        );
356
357        importer
358            .import_block_ranges(up_to_block_number)
359            .await
360            .expect("Transactions Importer should succeed");
361
362        let block_range_roots = repository.get_all_block_range_root().unwrap();
363        assert_eq!(
364            vec![
365                BlockRange::from_block_number(BlockNumber(0)),
366                BlockRange::from_block_number(BlockRange::LENGTH),
367                BlockRange::from_block_number(BlockRange::LENGTH * 2),
368                BlockRange::from_block_number(BlockRange::LENGTH * 3),
369                BlockRange::from_block_number(BlockRange::LENGTH * 4),
370            ],
371            block_range_roots
372                .into_iter()
373                .map(|r| r.range)
374                .collect::<Vec<_>>()
375        );
376    }
377
378    #[tokio::test]
379    async fn if_theres_gap_between_two_stored_block_ranges_it_can_still_compute_their_root() {
380        let connection = cardano_tx_db_connection().unwrap();
381        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
382            SqliteConnectionPool::build_from_connection(connection),
383        )));
384
385        let up_to_block_number = BlockRange::LENGTH * 4;
386        // Two block ranges with a gap
387        let blocks: Vec<ScannedBlock> = [
388            build_blocks(BlockNumber(0), BlockRange::LENGTH),
389            build_blocks(BlockRange::LENGTH * 3, BlockRange::LENGTH),
390        ]
391        .concat();
392        let transactions = into_transactions(&blocks);
393        repository.store_transactions(transactions).await.unwrap();
394
395        let importer = CardanoTransactionsImporter::new_for_test(
396            Arc::new(MockBlockScannerImpl::new()),
397            repository.clone(),
398        );
399
400        importer
401            .import_block_ranges(up_to_block_number)
402            .await
403            .expect("Transactions Importer should succeed");
404
405        let block_range_roots = repository.get_all_block_range_root().unwrap();
406        assert_eq!(
407            vec![
408                BlockRange::from_block_number(BlockNumber(0)),
409                BlockRange::from_block_number(BlockRange::LENGTH * 3),
410            ],
411            block_range_roots
412                .into_iter()
413                .map(|r| r.range)
414                .collect::<Vec<_>>()
415        );
416    }
417
418    #[tokio::test]
419    async fn if_all_block_ranges_computed_nothing_computed_and_stored() {
420        let connection = cardano_tx_db_connection().unwrap();
421        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
422            SqliteConnectionPool::build_from_connection(connection),
423        )));
424
425        let importer = CardanoTransactionsImporter::new_for_test(
426            Arc::new(MockBlockScannerImpl::new()),
427            repository.clone(),
428        );
429
430        importer
431            .import_block_ranges(BlockNumber(10_000))
432            .await
433            .expect("Transactions Importer should succeed");
434
435        let block_range_roots = repository.get_all_block_range_root().unwrap();
436        assert!(
437            block_range_roots.is_empty(),
438            "No block range root should be stored, found: {block_range_roots:?}"
439        );
440    }
441
442    #[tokio::test]
443    async fn if_all_transactions_stored_nothing_is_parsed_and_stored() {
444        let up_to_block_number = BlockNumber(12);
445        let connection = cardano_tx_db_connection().unwrap();
446        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
447            SqliteConnectionPool::build_from_connection(connection),
448        )));
449        let scanner = DumbBlockScanner::new().forwards(vec![vec![
450            ScannedBlock::new(
451                "block_hash-1",
452                BlockNumber(10),
453                SlotNumber(15),
454                vec!["tx_hash-1", "tx_hash-2"],
455            ),
456            ScannedBlock::new(
457                "block_hash-2",
458                BlockNumber(20),
459                SlotNumber(25),
460                vec!["tx_hash-3", "tx_hash-4"],
461            ),
462        ]]);
463
464        let last_tx = CardanoTransaction::new(
465            "tx-20",
466            BlockNumber(30),
467            SlotNumber(35),
468            hex::encode("block_hash-3"),
469        );
470        repository
471            .store_transactions(vec![last_tx.clone()])
472            .await
473            .unwrap();
474
475        let importer =
476            CardanoTransactionsImporter::new_for_test(Arc::new(scanner), repository.clone());
477
478        importer
479            .import_transactions(up_to_block_number)
480            .await
481            .expect("Transactions Importer should succeed");
482
483        let transactions = repository.get_all().await.unwrap();
484        assert_eq!(vec![last_tx], transactions);
485    }
486
487    #[tokio::test]
488    async fn if_half_transactions_are_already_stored_the_other_half_is_parsed_and_stored() {
489        let connection = cardano_tx_db_connection().unwrap();
490        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
491            SqliteConnectionPool::build_from_connection(connection),
492        )));
493
494        let highest_stored_chain_point = ChainPoint::new(
495            SlotNumber(134),
496            BlockNumber(10),
497            hex::encode("block_hash-1"),
498        );
499        let stored_block = ScannedBlock::new(
500            hex::decode(highest_stored_chain_point.block_hash.clone()).unwrap(),
501            highest_stored_chain_point.block_number,
502            highest_stored_chain_point.slot_number,
503            vec!["tx_hash-1", "tx_hash-2"],
504        );
505        let to_store_block = ScannedBlock::new(
506            "block_hash-2",
507            BlockNumber(20),
508            SlotNumber(229),
509            vec!["tx_hash-3", "tx_hash-4"],
510        );
511        let expected_transactions: Vec<CardanoTransaction> = [
512            stored_block.clone().into_transactions(),
513            to_store_block.clone().into_transactions(),
514        ]
515        .concat();
516        let up_to_block_number = BlockNumber(22);
517
518        repository
519            .store_transactions(stored_block.clone().into_transactions())
520            .await
521            .unwrap();
522
523        let importer = {
524            let scanned_blocks = vec![to_store_block.clone()];
525            let mut scanner_mock = MockBlockScannerImpl::new();
526            scanner_mock
527                .expect_scan()
528                .withf(move |from, until| {
529                    from == &Some(highest_stored_chain_point.clone().into())
530                        && *until == up_to_block_number
531                })
532                .return_once(move |_, _| {
533                    Ok(Box::new(
534                        DumbBlockStreamer::new().forwards(vec![scanned_blocks]),
535                    ))
536                })
537                .once();
538            CardanoTransactionsImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
539        };
540
541        let stored_transactions = repository.get_all().await.unwrap();
542        assert_eq!(stored_block.into_transactions(), stored_transactions);
543
544        importer
545            .import_transactions(up_to_block_number)
546            .await
547            .expect("Transactions Importer should succeed");
548
549        let stored_transactions = repository.get_all().await.unwrap();
550        assert_eq!(expected_transactions, stored_transactions);
551    }
552
553    #[tokio::test]
554    async fn if_half_block_ranges_are_stored_the_other_half_is_computed_and_stored() {
555        let connection = cardano_tx_db_connection().unwrap();
556        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
557            SqliteConnectionPool::build_from_connection(connection),
558        )));
559
560        let up_to_block_number = BlockRange::LENGTH * 4;
561        let blocks = build_blocks(BlockNumber(0), up_to_block_number + 1);
562        let transactions = into_transactions(&blocks);
563        repository.store_transactions(transactions).await.unwrap();
564        repository
565            .store_block_range_roots(
566                blocks[0..(*(BlockRange::LENGTH * 2) as usize)]
567                    .iter()
568                    .map(|b| {
569                        (
570                            BlockRange::from_block_number(b.block_number),
571                            MKTreeNode::from_hex("AAAA").unwrap(),
572                        )
573                    })
574                    .collect(),
575            )
576            .await
577            .unwrap();
578
579        let importer = CardanoTransactionsImporter::new_for_test(
580            Arc::new(MockBlockScannerImpl::new()),
581            repository.clone(),
582        );
583
584        let block_range_roots = repository.get_all_block_range_root().unwrap();
585        assert_eq!(
586            vec![
587                BlockRange::from_block_number(BlockNumber(0)),
588                BlockRange::from_block_number(BlockRange::LENGTH),
589            ],
590            block_range_roots
591                .into_iter()
592                .map(|r| r.range)
593                .collect::<Vec<_>>()
594        );
595
596        importer
597            .import_block_ranges(up_to_block_number)
598            .await
599            .expect("Transactions Importer should succeed");
600
601        let block_range_roots = repository.get_all_block_range_root().unwrap();
602        assert_eq!(
603            vec![
604                BlockRange::from_block_number(BlockNumber(0)),
605                BlockRange::from_block_number(BlockRange::LENGTH),
606                BlockRange::from_block_number(BlockRange::LENGTH * 2),
607                BlockRange::from_block_number(BlockRange::LENGTH * 3),
608            ],
609            block_range_roots
610                .into_iter()
611                .map(|r| r.range)
612                .collect::<Vec<_>>()
613        );
614    }
615
616    #[tokio::test]
617    async fn can_compute_block_ranges_up_to_the_strict_end_of_a_block_range() {
618        let connection = cardano_tx_db_connection().unwrap();
619        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
620            SqliteConnectionPool::build_from_connection(connection),
621        )));
622
623        // Transactions for all blocks in the (15..=29) interval
624        let blocks = build_blocks(BlockRange::LENGTH, BlockRange::LENGTH - 1);
625        let transactions = into_transactions(&blocks);
626        repository.store_transactions(transactions).await.unwrap();
627
628        let importer = CardanoTransactionsImporter::new_for_test(
629            Arc::new(MockBlockScannerImpl::new()),
630            repository.clone(),
631        );
632
633        importer
634            .import_block_ranges(BlockRange::LENGTH * 2 - 1)
635            .await
636            .expect("Transactions Importer should succeed");
637
638        let block_range_roots = repository.get_all_block_range_root().unwrap();
639        assert_eq!(
640            vec![BlockRange::from_block_number(BlockRange::LENGTH)],
641            block_range_roots
642                .into_iter()
643                .map(|r| r.range)
644                .collect::<Vec<_>>()
645        );
646    }
647
648    #[tokio::test]
649    async fn can_compute_block_ranges_even_if_last_blocks_in_range_dont_have_transactions() {
650        let connection = cardano_tx_db_connection().unwrap();
651        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
652            SqliteConnectionPool::build_from_connection(connection),
653        )));
654
655        // For the block range (15..=29) we only have transactions in the 10 first blocks (15..=24)
656        let blocks = build_blocks(BlockRange::LENGTH, BlockNumber(10));
657        let transactions = into_transactions(&blocks);
658        repository.store_transactions(transactions).await.unwrap();
659
660        let importer = CardanoTransactionsImporter::new_for_test(
661            Arc::new(MockBlockScannerImpl::new()),
662            repository.clone(),
663        );
664
665        importer
666            .import_block_ranges(BlockRange::LENGTH * 2)
667            .await
668            .expect("Transactions Importer should succeed");
669
670        let block_range_roots = repository.get_all_block_range_root().unwrap();
671        assert_eq!(
672            vec![BlockRange::from_block_number(BlockRange::LENGTH)],
673            block_range_roots
674                .into_iter()
675                .map(|r| r.range)
676                .collect::<Vec<_>>()
677        );
678    }
679
680    #[tokio::test]
681    async fn block_range_root_retrieves_only_strictly_required_transactions() {
682        fn transactions_for_block(range: Range<BlockNumber>) -> StdResult<Vec<CardanoTransaction>> {
683            Ok(build_blocks(range.start, range.end - range.start)
684                .into_iter()
685                .flat_map(|b| b.into_transactions())
686                .collect())
687        }
688        const HIGHEST_BLOCK_RANGE_START: BlockNumber = BlockRange::LENGTH;
689        let up_to_block_number = BlockRange::LENGTH * 5;
690
691        let importer = {
692            let mut store_mock = MockTransactionStore::new();
693            store_mock
694                .expect_get_highest_block_range()
695                .returning(|| {
696                    Ok(Some(BlockRange::from_block_number(
697                        HIGHEST_BLOCK_RANGE_START,
698                    )))
699                })
700                .once();
701            store_mock
702                .expect_get_transactions_in_range()
703                // Lower bound should be the end block number of the last known block range
704                // Upper bound should be the block number provided to `import_block_ranges`
705                .withf(move |range| {
706                    BlockRangesSequence::new(HIGHEST_BLOCK_RANGE_START..=up_to_block_number)
707                        .contains(range)
708                })
709                .returning(transactions_for_block);
710            store_mock
711                .expect_store_block_range_roots()
712                .returning(|_| Ok(()));
713
714            CardanoTransactionsImporter::new_for_test(
715                Arc::new(MockBlockScannerImpl::new()),
716                Arc::new(store_mock),
717            )
718        };
719
720        importer
721            .import_block_ranges(up_to_block_number)
722            .await
723            .expect("Transactions Importer should succeed");
724    }
725
726    #[tokio::test]
727    async fn compute_block_range_merkle_root() {
728        let connection = cardano_tx_db_connection().unwrap();
729        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
730            SqliteConnectionPool::build_from_connection(connection),
731        )));
732
733        // 2 block ranges worth of blocks with one more block that should be ignored for merkle root computation
734        let up_to_block_number = BlockRange::LENGTH * 2;
735        let blocks = build_blocks(BlockNumber(0), up_to_block_number + 1);
736        let transactions = into_transactions(&blocks);
737        let expected_block_range_roots = vec![
738            (
739                BlockRange::from_block_number(BlockNumber(0)),
740                merkle_root_for_blocks(&blocks[0..(*BlockRange::LENGTH as usize)]),
741            ),
742            (
743                BlockRange::from_block_number(BlockRange::LENGTH),
744                merkle_root_for_blocks(
745                    &blocks[(*BlockRange::LENGTH as usize)..((*BlockRange::LENGTH * 2) as usize)],
746                ),
747            ),
748        ];
749
750        repository.store_transactions(transactions).await.unwrap();
751
752        let importer = CardanoTransactionsImporter::new_for_test(
753            Arc::new(MockBlockScannerImpl::new()),
754            repository.clone(),
755        );
756
757        importer
758            .import_block_ranges(up_to_block_number)
759            .await
760            .expect("Transactions Importer should succeed");
761
762        let block_range_roots = repository.get_all_block_range_root().unwrap();
763        assert_eq!(
764            expected_block_range_roots,
765            block_range_roots
766                .into_iter()
767                .map(|br| br.into())
768                .collect::<Vec<_>>()
769        );
770    }
771
772    #[tokio::test]
773    async fn importing_twice_starting_with_nothing_in_a_real_db_should_yield_transactions_in_same_order(
774    ) {
775        let blocks = vec![
776            ScannedBlock::new(
777                "block_hash-1",
778                BlockNumber(10),
779                SlotNumber(15),
780                vec!["tx_hash-1", "tx_hash-2"],
781            ),
782            ScannedBlock::new(
783                "block_hash-2",
784                BlockNumber(20),
785                SlotNumber(25),
786                vec!["tx_hash-3", "tx_hash-4"],
787            ),
788        ];
789        let up_to_block_number = BlockNumber(1000);
790        let transactions = into_transactions(&blocks);
791
792        let (importer, repository) = {
793            let connection = cardano_tx_db_connection().unwrap();
794            let connection_pool = Arc::new(SqliteConnectionPool::build_from_connection(connection));
795            let repository = Arc::new(CardanoTransactionRepository::new(connection_pool));
796            let importer = CardanoTransactionsImporter::new_for_test(
797                Arc::new(DumbBlockScanner::new().forwards(vec![blocks.clone()])),
798                repository.clone(),
799            );
800            (importer, repository)
801        };
802
803        importer
804            .import(up_to_block_number)
805            .await
806            .expect("Transactions Importer should succeed");
807        let cold_imported_transactions = repository.get_all().await.unwrap();
808
809        importer
810            .import(up_to_block_number)
811            .await
812            .expect("Transactions Importer should succeed");
813        let warm_imported_transactions = repository.get_all().await.unwrap();
814
815        assert_eq!(transactions, cold_imported_transactions);
816        assert_eq!(cold_imported_transactions, warm_imported_transactions);
817    }
818
819    mod transactions_import_start_point {
820        use super::*;
821
822        async fn importer_with_last_polled_point(
823            last_polled_point: Option<RawCardanoPoint>,
824        ) -> CardanoTransactionsImporter {
825            let connection = cardano_tx_db_connection().unwrap();
826            let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
827                SqliteConnectionPool::build_from_connection(connection),
828            )));
829
830            CardanoTransactionsImporter {
831                last_polled_point: Arc::new(Mutex::new(last_polled_point)),
832                ..CardanoTransactionsImporter::new_for_test(
833                    Arc::new(DumbBlockScanner::new()),
834                    repository,
835                )
836            }
837        }
838
839        #[tokio::test]
840        async fn cloning_keep_last_polled_point() {
841            let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new(
842                SlotNumber(15),
843                "block_hash-1",
844            )))
845            .await;
846
847            let cloned_importer = importer.clone();
848            let start_point = cloned_importer.start_point(&None).await.unwrap();
849            assert_eq!(
850                Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
851                start_point
852            );
853        }
854
855        #[tokio::test]
856        async fn none_if_nothing_stored_nor_scanned() {
857            let importer = importer_with_last_polled_point(None).await;
858            let highest_stored_block_number = None;
859
860            let start_point = importer
861                .start_point(&highest_stored_block_number)
862                .await
863                .unwrap();
864            assert_eq!(None, start_point);
865        }
866
867        #[tokio::test]
868        async fn start_at_last_stored_point_if_nothing_scanned() {
869            let importer = importer_with_last_polled_point(None).await;
870            let highest_stored_block_number = Some(ChainPoint::new(
871                SlotNumber(25),
872                BlockNumber(20),
873                hex::encode("block_hash-2"),
874            ));
875
876            let start_point = importer
877                .start_point(&highest_stored_block_number)
878                .await
879                .unwrap();
880            assert_eq!(
881                Some(RawCardanoPoint::new(SlotNumber(25), "block_hash-2")),
882                start_point
883            );
884        }
885
886        #[tokio::test]
887        async fn start_at_last_scanned_point_when_nothing_stored() {
888            let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new(
889                SlotNumber(15),
890                "block_hash-1",
891            )))
892            .await;
893            let highest_stored_block_number = None;
894
895            let start_point = importer
896                .start_point(&highest_stored_block_number)
897                .await
898                .unwrap();
899            assert_eq!(
900                Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
901                start_point
902            );
903        }
904
905        #[tokio::test]
906        async fn start_at_last_scanned_point_even_if_something_stored() {
907            let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new(
908                SlotNumber(15),
909                "block_hash-1",
910            )))
911            .await;
912            let highest_stored_block_number = Some(ChainPoint::new(
913                SlotNumber(25),
914                BlockNumber(20),
915                hex::encode("block_hash-2"),
916            ));
917
918            let start_point = importer
919                .start_point(&highest_stored_block_number)
920                .await
921                .unwrap();
922            assert_eq!(
923                Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
924                start_point
925            );
926        }
927
928        #[tokio::test]
929        async fn importing_transactions_update_start_point_even_if_no_transactions_are_found() {
930            let connection = cardano_tx_db_connection().unwrap();
931            let importer = CardanoTransactionsImporter {
932                last_polled_point: Arc::new(Mutex::new(None)),
933                ..CardanoTransactionsImporter::new_for_test(
934                    Arc::new(
935                        DumbBlockScanner::new()
936                            .forwards(vec![vec![ScannedBlock::new(
937                                "block_hash-1",
938                                BlockNumber(10),
939                                SlotNumber(15),
940                                Vec::<&str>::new(),
941                            )]])
942                            .last_polled_point(Some(RawCardanoPoint::new(
943                                SlotNumber(25),
944                                "block_hash-2",
945                            ))),
946                    ),
947                    Arc::new(CardanoTransactionRepository::new(Arc::new(
948                        SqliteConnectionPool::build_from_connection(connection),
949                    ))),
950                )
951            };
952            let highest_stored_block_number = None;
953
954            let start_point_before_import = importer
955                .start_point(&highest_stored_block_number)
956                .await
957                .unwrap();
958            assert_eq!(None, start_point_before_import);
959
960            importer
961                .import_transactions(BlockNumber(1000))
962                .await
963                .unwrap();
964
965            let start_point_after_import = importer
966                .start_point(&highest_stored_block_number)
967                .await
968                .unwrap();
969            assert_eq!(
970                Some(RawCardanoPoint::new(SlotNumber(25), "block_hash-2")),
971                start_point_after_import
972            );
973        }
974
975        #[tokio::test]
976        async fn importing_transactions_dont_update_start_point_if_streamer_did_nothing() {
977            let connection = cardano_tx_db_connection().unwrap();
978            let importer = CardanoTransactionsImporter {
979                last_polled_point: Arc::new(Mutex::new(Some(RawCardanoPoint::new(
980                    SlotNumber(15),
981                    "block_hash-1",
982                )))),
983                ..CardanoTransactionsImporter::new_for_test(
984                    Arc::new(DumbBlockScanner::new()),
985                    Arc::new(CardanoTransactionRepository::new(Arc::new(
986                        SqliteConnectionPool::build_from_connection(connection),
987                    ))),
988                )
989            };
990            let highest_stored_block_number = None;
991
992            importer
993                .import_transactions(BlockNumber(1000))
994                .await
995                .unwrap();
996
997            let start_point_after_import = importer
998                .start_point(&highest_stored_block_number)
999                .await
1000                .unwrap();
1001            assert_eq!(
1002                Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
1003                start_point_after_import
1004            );
1005        }
1006    }
1007
1008    #[tokio::test]
1009    async fn when_rollbackward_should_remove_transactions() {
1010        let connection = cardano_tx_db_connection().unwrap();
1011        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
1012            SqliteConnectionPool::build_from_connection(connection),
1013        )));
1014
1015        let expected_remaining_transactions = ScannedBlock::new(
1016            "block_hash-130",
1017            BlockNumber(130),
1018            SlotNumber(5),
1019            vec!["tx_hash-6", "tx_hash-7"],
1020        )
1021        .into_transactions();
1022        repository
1023            .store_transactions(expected_remaining_transactions.clone())
1024            .await
1025            .unwrap();
1026        repository
1027            .store_transactions(
1028                ScannedBlock::new(
1029                    "block_hash-131",
1030                    BlockNumber(131),
1031                    SlotNumber(10),
1032                    vec!["tx_hash-8", "tx_hash-9", "tx_hash-10"],
1033                )
1034                .into_transactions(),
1035            )
1036            .await
1037            .unwrap();
1038
1039        let chain_point = ChainPoint::new(SlotNumber(5), BlockNumber(130), "block_hash-130");
1040        let scanner = DumbBlockScanner::new().backward(chain_point);
1041
1042        let importer =
1043            CardanoTransactionsImporter::new_for_test(Arc::new(scanner), repository.clone());
1044
1045        importer
1046            .import_transactions(BlockNumber(3000))
1047            .await
1048            .expect("Transactions Importer should succeed");
1049
1050        let stored_transactions = repository.get_all().await.unwrap();
1051        assert_eq!(expected_remaining_transactions, stored_transactions);
1052    }
1053
1054    #[tokio::test]
1055    async fn when_rollbackward_should_remove_block_ranges() {
1056        let connection = cardano_tx_db_connection().unwrap();
1057        let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
1058            SqliteConnectionPool::build_from_connection(connection),
1059        )));
1060
1061        let expected_remaining_block_ranges = vec![
1062            BlockRange::from_block_number(BlockNumber(0)),
1063            BlockRange::from_block_number(BlockRange::LENGTH),
1064            BlockRange::from_block_number(BlockRange::LENGTH * 2),
1065        ];
1066
1067        repository
1068            .store_block_range_roots(
1069                expected_remaining_block_ranges
1070                    .iter()
1071                    .map(|b| (b.clone(), MKTreeNode::from_hex("AAAA").unwrap()))
1072                    .collect(),
1073            )
1074            .await
1075            .unwrap();
1076        repository
1077            .store_block_range_roots(
1078                [
1079                    BlockRange::from_block_number(BlockRange::LENGTH * 3),
1080                    BlockRange::from_block_number(BlockRange::LENGTH * 4),
1081                    BlockRange::from_block_number(BlockRange::LENGTH * 5),
1082                ]
1083                .iter()
1084                .map(|b| (b.clone(), MKTreeNode::from_hex("AAAA").unwrap()))
1085                .collect(),
1086            )
1087            .await
1088            .unwrap();
1089        repository
1090            .store_transactions(
1091                ScannedBlock::new(
1092                    "block_hash-131",
1093                    BlockRange::from_block_number(BlockRange::LENGTH * 3).start,
1094                    SlotNumber(1),
1095                    vec!["tx_hash-1", "tx_hash-2", "tx_hash-3"],
1096                )
1097                .into_transactions(),
1098            )
1099            .await
1100            .unwrap();
1101
1102        let block_range_roots = repository.get_all_block_range_root().unwrap();
1103        assert_eq!(6, block_range_roots.len());
1104
1105        let chain_point = ChainPoint::new(SlotNumber(1), BlockRange::LENGTH * 3, "block_hash-131");
1106        let scanner = DumbBlockScanner::new().backward(chain_point);
1107
1108        let importer =
1109            CardanoTransactionsImporter::new_for_test(Arc::new(scanner), repository.clone());
1110
1111        importer
1112            .import_transactions(BlockNumber(3000))
1113            .await
1114            .expect("Transactions Importer should succeed");
1115
1116        let block_range_roots = repository.get_all_block_range_root().unwrap();
1117        assert_eq!(
1118            expected_remaining_block_ranges,
1119            block_range_roots
1120                .into_iter()
1121                .map(|r| r.range)
1122                .collect::<Vec<_>>()
1123        );
1124    }
1125
1126    #[tokio::test]
1127    async fn test_import_is_non_blocking() {
1128        static COUNTER: AtomicUsize = AtomicUsize::new(0);
1129        static MAX_COUNTER: usize = 25;
1130        static WAIT_TIME: u64 = 50;
1131
1132        // Use a local set to ensure the counter task is not dispatched on a different thread
1133        let local = task::LocalSet::new();
1134        local
1135            .run_until(async {
1136                let importer = CardanoTransactionsImporter::new_for_test(
1137                    Arc::new(DumbBlockScanner::new()),
1138                    Arc::new(BlockingRepository {
1139                        wait_time: Duration::from_millis(WAIT_TIME),
1140                    }),
1141                );
1142
1143                let importer_future = importer.import(BlockNumber(100));
1144                let counter_task = task::spawn_local(async {
1145                    while COUNTER.load(std::sync::atomic::Ordering::SeqCst) < MAX_COUNTER {
1146                        tokio::time::sleep(Duration::from_millis(1)).await;
1147                        COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1148                    }
1149                });
1150                importer_future.await.unwrap();
1151
1152                counter_task.abort();
1153            })
1154            .await;
1155
1156        assert_eq!(
1157            MAX_COUNTER,
1158            COUNTER.load(std::sync::atomic::Ordering::SeqCst)
1159        );
1160
1161        struct BlockingRepository {
1162            wait_time: Duration,
1163        }
1164
1165        impl BlockingRepository {
1166            fn block_thread(&self) {
1167                std::thread::sleep(self.wait_time);
1168            }
1169        }
1170
1171        #[async_trait]
1172        impl TransactionStore for BlockingRepository {
1173            async fn get_highest_beacon(&self) -> StdResult<Option<ChainPoint>> {
1174                self.block_thread();
1175                Ok(None)
1176            }
1177
1178            async fn get_highest_block_range(&self) -> StdResult<Option<BlockRange>> {
1179                self.block_thread();
1180                Ok(None)
1181            }
1182
1183            async fn store_transactions(&self, _: Vec<CardanoTransaction>) -> StdResult<()> {
1184                self.block_thread();
1185                Ok(())
1186            }
1187
1188            async fn get_transactions_in_range(
1189                &self,
1190                _: Range<BlockNumber>,
1191            ) -> StdResult<Vec<CardanoTransaction>> {
1192                self.block_thread();
1193                Ok(vec![])
1194            }
1195
1196            async fn store_block_range_roots(
1197                &self,
1198                _: Vec<(BlockRange, MKTreeNode)>,
1199            ) -> StdResult<()> {
1200                self.block_thread();
1201                Ok(())
1202            }
1203
1204            async fn remove_rolled_back_transactions_and_block_range(
1205                &self,
1206                _: SlotNumber,
1207            ) -> StdResult<()> {
1208                self.block_thread();
1209                Ok(())
1210            }
1211        }
1212    }
1213}