mithril_cardano_node_chain/chain_importer/
service.rs

1use std::mem;
2use std::sync::Arc;
3
4use anyhow::Context;
5use async_trait::async_trait;
6use slog::{Logger, debug};
7use tokio::{runtime::Handle, sync::Mutex, task};
8
9use mithril_common::StdResult;
10use mithril_common::crypto_helper::{MKTree, MKTreeNode, MKTreeStoreInMemory};
11use mithril_common::entities::{BlockNumber, BlockRange, CardanoBlockWithTransactions, ChainPoint};
12use mithril_common::logging::LoggerExtensions;
13
14use crate::chain_importer::{ChainDataImporter, ChainDataStore};
15use crate::chain_scanner::{BlockScanner, ChainScannedBlocks};
16use crate::entities::RawCardanoPoint;
17
18/// Import and store Cardano chain data (transactions, blocks) and aggregate them into Merkle trees
19/// for Mithril certificate generation and proof verification.
20#[derive(Clone)]
21pub struct CardanoChainDataImporter {
22    block_scanner: Arc<dyn BlockScanner>,
23    transaction_store: Arc<dyn ChainDataStore>,
24    last_polled_point: Arc<Mutex<Option<RawCardanoPoint>>>,
25    logger: Logger,
26}
27
28impl CardanoChainDataImporter {
29    /// Constructor
30    pub fn new(
31        block_scanner: Arc<dyn BlockScanner>,
32        transaction_store: Arc<dyn ChainDataStore>,
33        logger: Logger,
34    ) -> Self {
35        Self {
36            block_scanner,
37            transaction_store,
38            last_polled_point: Arc::new(Mutex::new(None)),
39            logger: logger.new_with_component_name::<Self>(),
40        }
41    }
42
43    async fn start_point(
44        &self,
45        highest_stored_chain_point: &Option<ChainPoint>,
46    ) -> StdResult<Option<RawCardanoPoint>> {
47        let last_polled_point = self.last_polled_point.lock().await.clone();
48        if last_polled_point.is_none() {
49            debug!(
50                self.logger,
51                "No last polled point available, falling back to the highest stored chain point"
52            );
53        }
54
55        Ok(last_polled_point.or(highest_stored_chain_point.as_ref().map(RawCardanoPoint::from)))
56    }
57
58    async fn import_blocks_and_transactions(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
59        let highest_stored_beacon = self.transaction_store.get_highest_beacon().await?;
60        let from = self.start_point(&highest_stored_beacon).await?;
61
62        if highest_stored_beacon
63            .as_ref()
64            .is_some_and(|f| f.block_number >= up_to_beacon)
65        {
66            debug!(
67                self.logger,
68                "No need to retrieve Cardano blocks and transactions, the database is up to date for block_number '{up_to_beacon}'",
69            );
70
71            Ok(())
72        } else {
73            debug!(
74                self.logger, "Retrieving Cardano blocks and transactions until block numbered '{up_to_beacon}'";
75                "starting_slot_number" => ?from.as_ref().map(|c| c.slot_number),
76                "highest_stored_block_number" => ?highest_stored_beacon.as_ref().map(|c| c.block_number),
77            );
78
79            self.parse_and_store_block_and_transactions_not_imported_yet(from, up_to_beacon)
80                .await
81        }
82    }
83
84    async fn parse_and_store_block_and_transactions_not_imported_yet(
85        &self,
86        from: Option<RawCardanoPoint>,
87        until: BlockNumber,
88    ) -> StdResult<()> {
89        let mut streamer = self.block_scanner.scan(from, until).await?;
90
91        while let Some(blocks) = streamer.poll_next().await? {
92            match blocks {
93                ChainScannedBlocks::RollForwards(forward_blocks) => {
94                    let parsed_blocks_with_transactions: Vec<CardanoBlockWithTransactions> =
95                        forward_blocks.into_iter().map(Into::into).collect();
96
97                    self.transaction_store
98                        .store_blocks_and_transactions(parsed_blocks_with_transactions)
99                        .await?;
100                }
101                ChainScannedBlocks::RollBackward(slot_number) => {
102                    self.transaction_store
103                        .remove_rolled_chain_data_and_block_range(slot_number)
104                        .await?;
105                }
106            }
107        }
108
109        if let Some(point) = streamer.last_polled_point() {
110            *self.last_polled_point.lock().await = Some(point);
111        }
112
113        Ok(())
114    }
115
116    async fn import_block_ranges(&self, until: BlockNumber) -> StdResult<()> {
117        let block_ranges = match self.transaction_store.get_highest_block_range().await?.map(
118            |highest_stored_block_range| {
119                BlockRange::all_block_ranges_in(
120                    BlockRange::start(highest_stored_block_range.end)..=(until),
121                )
122            },
123        ) {
124            // No block range root stored yet, start from the beginning
125            None => BlockRange::all_block_ranges_in(BlockNumber(0)..=(until)),
126            // Not enough block to form at least one block range
127            Some(ranges) if ranges.is_empty() => return Ok(()),
128            Some(ranges) => ranges,
129        };
130
131        debug!(
132            self.logger, "Computing Block Range Roots";
133            "start_block" => *block_ranges.start(), "end_block" => *block_ranges.end(),
134        );
135
136        let mut block_ranges_with_merkle_root: Vec<(BlockRange, MKTreeNode)> = vec![];
137        for block_range in block_ranges {
138            let transactions = self
139                .transaction_store
140                .get_transactions_in_range(block_range.start..block_range.end)
141                .await?;
142
143            if transactions.is_empty() {
144                continue;
145            }
146
147            let merkle_root = MKTree::<MKTreeStoreInMemory>::new(&transactions)?.compute_root()?;
148            block_ranges_with_merkle_root.push((block_range, merkle_root));
149
150            if block_ranges_with_merkle_root.len() >= 100 {
151                let block_ranges_with_merkle_root_save =
152                    mem::take(&mut block_ranges_with_merkle_root);
153                self.transaction_store
154                    .store_block_range_roots(block_ranges_with_merkle_root_save)
155                    .await?;
156            }
157        }
158
159        self.transaction_store
160            .store_block_range_roots(block_ranges_with_merkle_root)
161            .await
162    }
163}
164
165#[async_trait]
166impl ChainDataImporter for CardanoChainDataImporter {
167    async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
168        let importer = self.clone();
169        task::spawn_blocking(move || {
170            Handle::current().block_on(async move {
171                importer.import_blocks_and_transactions(up_to_beacon).await?;
172                importer.import_block_ranges(up_to_beacon).await?;
173                Ok(())
174            })
175        })
176        .await
177        .with_context(|| "ChainDataImporter - worker thread crashed")?
178    }
179}
180
181#[cfg(test)]
182mod tests {
183    use std::ops::Range;
184    use std::sync::atomic::AtomicUsize;
185    use std::time::Duration;
186
187    use mockall::mock;
188
189    use mithril_common::crypto_helper::MKTree;
190    use mithril_common::entities::{BlockRangesSequence, CardanoTransaction, SlotNumber};
191
192    use crate::chain_importer::MockChainDataStore;
193    use crate::chain_scanner::BlockStreamer;
194    use crate::entities::ScannedBlock;
195    use crate::test::TestLogger;
196    use crate::test::double::{DumbBlockScanner, DumbBlockStreamer, InMemoryChainDataStore};
197
198    use super::*;
199
200    mock! {
201        pub BlockScannerImpl { }
202
203        #[async_trait]
204        impl BlockScanner for BlockScannerImpl {
205            async fn scan(
206              &self,
207              from: Option<RawCardanoPoint>,
208              until: BlockNumber,
209            ) -> StdResult<Box<dyn BlockStreamer>>;
210        }
211    }
212
213    impl CardanoChainDataImporter {
214        pub(crate) fn new_for_test(
215            scanner: Arc<dyn BlockScanner>,
216            transaction_store: Arc<dyn ChainDataStore>,
217        ) -> Self {
218            Self::new(scanner, transaction_store, TestLogger::stdout())
219        }
220    }
221
222    fn build_blocks(
223        start_block_number: BlockNumber,
224        number_of_consecutive_block: BlockNumber,
225    ) -> Vec<ScannedBlock> {
226        (*start_block_number..*(start_block_number + number_of_consecutive_block))
227            .map(|block_number| {
228                ScannedBlock::new(
229                    format!("block_hash-{block_number}"),
230                    BlockNumber(block_number),
231                    SlotNumber(block_number * 100),
232                    vec![format!("tx_hash-{}", block_number)],
233                )
234            })
235            .collect()
236    }
237
238    fn into_blocks_with_transactions(blocks: &[ScannedBlock]) -> Vec<CardanoBlockWithTransactions> {
239        blocks.iter().map(|b| b.clone().into()).collect()
240    }
241
242    fn merkle_root_for_blocks(block_ranges: &[ScannedBlock]) -> MKTreeNode {
243        let tx: Vec<_> = block_ranges
244            .iter()
245            .flat_map(|br| br.clone().into_transactions())
246            .collect();
247        MKTree::<MKTreeStoreInMemory>::new(&tx)
248            .unwrap()
249            .compute_root()
250            .unwrap()
251    }
252
253    mod store_blocks_and_transactions {
254        use super::*;
255
256        #[tokio::test]
257        async fn store_blocks_that_do_not_have_transactions() {
258            let repository = Arc::new(InMemoryChainDataStore::default());
259            let up_to_block_number = BlockNumber(1000);
260
261            let scanner = DumbBlockScanner::new().forwards(vec![vec![ScannedBlock::new(
262                "block_hash-1",
263                BlockNumber(10),
264                SlotNumber(15),
265                Vec::<String>::new(),
266            )]]);
267            let importer =
268                CardanoChainDataImporter::new_for_test(Arc::new(scanner), repository.clone());
269
270            importer
271                .import_blocks_and_transactions(up_to_block_number)
272                .await
273                .unwrap();
274
275            let stored_transactions = repository.get_all_block_with_txs().await;
276            assert_eq!(
277                vec![CardanoBlockWithTransactions::new(
278                    hex::encode("block_hash-1"),
279                    BlockNumber(10),
280                    SlotNumber(15),
281                    Vec::<String>::new()
282                )],
283                stored_transactions
284            );
285        }
286
287        #[tokio::test]
288        async fn if_nothing_stored_parse_and_store_all_blocks_and_transactions() {
289            let repository = Arc::new(InMemoryChainDataStore::default());
290
291            let blocks = vec![
292                ScannedBlock::new(
293                    "block_hash-1",
294                    BlockNumber(10),
295                    SlotNumber(15),
296                    vec!["tx_hash-1", "tx_hash-2"],
297                ),
298                ScannedBlock::new(
299                    "block_hash-2",
300                    BlockNumber(20),
301                    SlotNumber(25),
302                    vec!["tx_hash-3", "tx_hash-4"],
303                ),
304            ];
305            let expected_blocks_with_transactions = into_blocks_with_transactions(&blocks);
306            let up_to_block_number = BlockNumber(1000);
307
308            let importer = {
309                let mut scanner_mock = MockBlockScannerImpl::new();
310                scanner_mock
311                    .expect_scan()
312                    .withf(move |from, until| from.is_none() && until == up_to_block_number)
313                    .return_once(move |_, _| {
314                        Ok(Box::new(DumbBlockStreamer::new().forwards(vec![blocks])))
315                    });
316                CardanoChainDataImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
317            };
318
319            importer
320                .import_blocks_and_transactions(up_to_block_number)
321                .await
322                .unwrap();
323
324            let stored_transactions = repository.get_all_block_with_txs().await;
325            assert_eq!(expected_blocks_with_transactions, stored_transactions);
326        }
327
328        #[tokio::test]
329        async fn if_all_blocks_and_transactions_are_stored_nothing_is_parsed_and_stored() {
330            let up_to_block_number = BlockNumber(12);
331            let last_block = vec![CardanoBlockWithTransactions::new(
332                hex::encode("block_hash-3"),
333                BlockNumber(30),
334                SlotNumber(35),
335                vec!["tx-20"],
336            )];
337            let repository = Arc::new(
338                InMemoryChainDataStore::builder()
339                    .with_blocks_and_transactions(&last_block)
340                    .build(),
341            );
342            let scanner = DumbBlockScanner::new().forwards(vec![vec![
343                ScannedBlock::new(
344                    "block_hash-1",
345                    BlockNumber(10),
346                    SlotNumber(15),
347                    vec!["tx_hash-1", "tx_hash-2"],
348                ),
349                ScannedBlock::new(
350                    "block_hash-2",
351                    BlockNumber(20),
352                    SlotNumber(25),
353                    vec!["tx_hash-3", "tx_hash-4"],
354                ),
355            ]]);
356
357            let importer =
358                CardanoChainDataImporter::new_for_test(Arc::new(scanner), repository.clone());
359
360            importer
361                .import_blocks_and_transactions(up_to_block_number)
362                .await
363                .unwrap();
364
365            let blocks_with_txs = repository.get_all_block_with_txs().await;
366            assert_eq!(last_block, blocks_with_txs);
367        }
368
369        #[tokio::test]
370        async fn if_half_blocks_and_transactions_are_already_stored_the_other_half_is_parsed_and_stored()
371         {
372            let highest_stored_chain_point = ChainPoint::new(
373                SlotNumber(134),
374                BlockNumber(10),
375                hex::encode("block_hash-1"),
376            );
377            let stored_block = ScannedBlock::new(
378                hex::decode(&highest_stored_chain_point.block_hash).unwrap(),
379                highest_stored_chain_point.block_number,
380                highest_stored_chain_point.slot_number,
381                vec!["tx_hash-1", "tx_hash-2"],
382            );
383            let to_store_block = ScannedBlock::new(
384                "block_hash-2",
385                BlockNumber(20),
386                SlotNumber(229),
387                vec!["tx_hash-3", "tx_hash-4"],
388            );
389            let existing_blocks = vec![stored_block.clone()];
390            let expected_blocks_with_transactions =
391                into_blocks_with_transactions(&[stored_block.clone(), to_store_block.clone()]);
392            let up_to_block_number = BlockNumber(22);
393
394            let repository = Arc::new(
395                InMemoryChainDataStore::builder()
396                    .with_blocks_and_transactions(&existing_blocks)
397                    .build(),
398            );
399
400            let importer = {
401                let scanned_blocks = vec![to_store_block.clone()];
402                let mut scanner_mock = MockBlockScannerImpl::new();
403                scanner_mock
404                    .expect_scan()
405                    .withf(move |from, until| {
406                        from == &Some(highest_stored_chain_point.clone().into())
407                            && *until == up_to_block_number
408                    })
409                    .return_once(move |_, _| {
410                        Ok(Box::new(
411                            DumbBlockStreamer::new().forwards(vec![scanned_blocks]),
412                        ))
413                    })
414                    .once();
415                CardanoChainDataImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
416            };
417
418            importer
419                .import_blocks_and_transactions(up_to_block_number)
420                .await
421                .unwrap();
422
423            let stored_blocks_with_transactions = repository.get_all_block_with_txs().await;
424            assert_eq!(
425                expected_blocks_with_transactions,
426                stored_blocks_with_transactions
427            );
428        }
429    }
430
431    mod compute_legacy_transactions_block_ranges {
432        use super::*;
433
434        #[tokio::test]
435        async fn if_nothing_stored_parse_and_store_all_block_ranges() {
436            let up_to_block_number = BlockRange::LENGTH * 5;
437            let blocks = build_blocks(BlockNumber(0), up_to_block_number + 1);
438            let repository = Arc::new(
439                InMemoryChainDataStore::builder()
440                    .with_blocks_and_transactions(&blocks)
441                    .build(),
442            );
443
444            let importer = CardanoChainDataImporter::new_for_test(
445                Arc::new(MockBlockScannerImpl::new()),
446                repository.clone(),
447            );
448
449            importer.import_block_ranges(up_to_block_number).await.unwrap();
450
451            assert_eq!(
452                vec![
453                    BlockRange::from_block_number(BlockNumber(0)),
454                    BlockRange::from_block_number(BlockRange::LENGTH),
455                    BlockRange::from_block_number(BlockRange::LENGTH * 2),
456                    BlockRange::from_block_number(BlockRange::LENGTH * 3),
457                    BlockRange::from_block_number(BlockRange::LENGTH * 4),
458                ],
459                repository.get_all_block_range().await
460            );
461        }
462
463        #[tokio::test]
464        async fn if_theres_gap_between_two_stored_block_ranges_it_can_still_compute_their_root() {
465            let up_to_block_number = BlockRange::LENGTH * 4;
466            // Two block ranges with a gap
467            let blocks: Vec<ScannedBlock> = [
468                build_blocks(BlockNumber(0), BlockRange::LENGTH),
469                build_blocks(BlockRange::LENGTH * 3, BlockRange::LENGTH),
470            ]
471            .concat();
472            let repository = Arc::new(
473                InMemoryChainDataStore::builder()
474                    .with_blocks_and_transactions(&blocks)
475                    .build(),
476            );
477
478            let importer = CardanoChainDataImporter::new_for_test(
479                Arc::new(MockBlockScannerImpl::new()),
480                repository.clone(),
481            );
482
483            importer.import_block_ranges(up_to_block_number).await.unwrap();
484
485            assert_eq!(
486                vec![
487                    BlockRange::from_block_number(BlockNumber(0)),
488                    BlockRange::from_block_number(BlockRange::LENGTH * 3),
489                ],
490                repository.get_all_block_range().await
491            );
492        }
493
494        #[tokio::test]
495        async fn if_all_block_ranges_computed_nothing_computed_and_stored() {
496            let repository = Arc::new(InMemoryChainDataStore::default());
497
498            let importer = CardanoChainDataImporter::new_for_test(
499                Arc::new(MockBlockScannerImpl::new()),
500                repository.clone(),
501            );
502
503            importer.import_block_ranges(BlockNumber(10_000)).await.unwrap();
504
505            let block_range_roots = repository.get_all_block_range_root().await;
506            assert!(
507                block_range_roots.is_empty(),
508                "No block range root should be stored, found: {block_range_roots:?}"
509            );
510        }
511
512        #[tokio::test]
513        async fn if_half_block_ranges_are_stored_the_other_half_is_computed_and_stored() {
514            let up_to_block_number = BlockRange::LENGTH * 4;
515            let blocks = build_blocks(BlockNumber(0), up_to_block_number + 1);
516            let repository = Arc::new(
517                InMemoryChainDataStore::builder()
518                    .with_blocks_and_transactions(&blocks)
519                    .with_block_range_roots(&[
520                        (
521                            BlockRange::from_block_number(BlockNumber(0)),
522                            MKTreeNode::from_hex("AAAA").unwrap(),
523                        ),
524                        (
525                            BlockRange::from_block_number(BlockRange::LENGTH),
526                            MKTreeNode::from_hex("BBBB").unwrap(),
527                        ),
528                    ])
529                    .build(),
530            );
531
532            let importer = CardanoChainDataImporter::new_for_test(
533                Arc::new(MockBlockScannerImpl::new()),
534                repository.clone(),
535            );
536
537            importer.import_block_ranges(up_to_block_number).await.unwrap();
538
539            assert_eq!(
540                vec![
541                    BlockRange::from_block_number(BlockNumber(0)),
542                    BlockRange::from_block_number(BlockRange::LENGTH),
543                    BlockRange::from_block_number(BlockRange::LENGTH * 2),
544                    BlockRange::from_block_number(BlockRange::LENGTH * 3),
545                ],
546                repository.get_all_block_range().await
547            );
548        }
549
550        #[tokio::test]
551        async fn can_compute_block_ranges_up_to_the_strict_end_of_a_block_range() {
552            // Transactions for all blocks in the (15..=29) interval
553            let blocks = build_blocks(BlockRange::LENGTH, BlockRange::LENGTH - 1);
554            let repository = Arc::new(
555                InMemoryChainDataStore::builder()
556                    .with_blocks_and_transactions(&blocks)
557                    .build(),
558            );
559
560            let importer = CardanoChainDataImporter::new_for_test(
561                Arc::new(MockBlockScannerImpl::new()),
562                repository.clone(),
563            );
564
565            importer
566                .import_block_ranges(BlockRange::LENGTH * 2 - 1)
567                .await
568                .unwrap();
569
570            assert_eq!(
571                vec![BlockRange::from_block_number(BlockRange::LENGTH)],
572                repository.get_all_block_range().await
573            );
574        }
575
576        #[tokio::test]
577        async fn can_compute_block_ranges_even_if_last_blocks_in_range_dont_have_transactions() {
578            // For the block range (15..=29) we only have transactions in the 10 first blocks (15..=24)
579            let blocks = build_blocks(BlockRange::LENGTH, BlockNumber(10));
580            let repository = Arc::new(
581                InMemoryChainDataStore::builder()
582                    .with_blocks_and_transactions(&blocks)
583                    .build(),
584            );
585
586            let importer = CardanoChainDataImporter::new_for_test(
587                Arc::new(MockBlockScannerImpl::new()),
588                repository.clone(),
589            );
590
591            importer.import_block_ranges(BlockRange::LENGTH * 2).await.unwrap();
592
593            assert_eq!(
594                vec![BlockRange::from_block_number(BlockRange::LENGTH)],
595                repository.get_all_block_range().await
596            );
597        }
598
599        #[tokio::test]
600        async fn block_range_root_retrieves_only_strictly_required_transactions() {
601            fn transactions_for_block(
602                range: Range<BlockNumber>,
603            ) -> StdResult<Vec<CardanoTransaction>> {
604                Ok(build_blocks(range.start, range.end - range.start)
605                    .into_iter()
606                    .flat_map(|b| b.into_transactions())
607                    .collect())
608            }
609            const HIGHEST_BLOCK_RANGE_START: BlockNumber = BlockRange::LENGTH;
610            let up_to_block_number = BlockRange::LENGTH * 5;
611
612            let importer = {
613                let mut store_mock = MockChainDataStore::new();
614                store_mock
615                    .expect_get_highest_block_range()
616                    .returning(|| {
617                        Ok(Some(BlockRange::from_block_number(
618                            HIGHEST_BLOCK_RANGE_START,
619                        )))
620                    })
621                    .once();
622                store_mock
623                    .expect_get_transactions_in_range()
624                    // Lower bound should be the end block number of the last known block range
625                    // Upper bound should be the block number provided to `import_block_ranges`
626                    .withf(move |range| {
627                        BlockRangesSequence::new(HIGHEST_BLOCK_RANGE_START..=up_to_block_number)
628                            .contains(range)
629                    })
630                    .returning(transactions_for_block);
631                store_mock.expect_store_block_range_roots().returning(|_| Ok(()));
632
633                CardanoChainDataImporter::new_for_test(
634                    Arc::new(MockBlockScannerImpl::new()),
635                    Arc::new(store_mock),
636                )
637            };
638
639            importer.import_block_ranges(up_to_block_number).await.unwrap();
640        }
641
642        #[tokio::test]
643        async fn compute_block_range_merkle_root() {
644            // 2 block ranges worth of blocks with one more block that should be ignored for merkle root computation
645            let up_to_block_number = BlockRange::LENGTH * 2;
646            let blocks = build_blocks(BlockNumber(0), up_to_block_number + 1);
647            let expected_block_range_roots = vec![
648                (
649                    BlockRange::from_block_number(BlockNumber(0)),
650                    merkle_root_for_blocks(&blocks[0..(*BlockRange::LENGTH as usize)]),
651                ),
652                (
653                    BlockRange::from_block_number(BlockRange::LENGTH),
654                    merkle_root_for_blocks(
655                        &blocks
656                            [(*BlockRange::LENGTH as usize)..((*BlockRange::LENGTH * 2) as usize)],
657                    ),
658                ),
659            ];
660
661            let repository = Arc::new(
662                InMemoryChainDataStore::builder()
663                    .with_blocks_and_transactions(&blocks)
664                    .build(),
665            );
666
667            let importer = CardanoChainDataImporter::new_for_test(
668                Arc::new(MockBlockScannerImpl::new()),
669                repository.clone(),
670            );
671
672            importer.import_block_ranges(up_to_block_number).await.unwrap();
673
674            let block_range_roots = repository.get_all_block_range_root().await;
675            assert_eq!(
676                expected_block_range_roots,
677                block_range_roots
678                    .into_iter()
679                    .map(|r| (r.range, r.merkle_root))
680                    .collect::<Vec<_>>()
681            );
682        }
683    }
684
685    mod transactions_import_start_point {
686        use super::*;
687
688        async fn importer_with_last_polled_point(
689            last_polled_point: Option<RawCardanoPoint>,
690        ) -> CardanoChainDataImporter {
691            let repository = Arc::new(InMemoryChainDataStore::default());
692
693            CardanoChainDataImporter {
694                last_polled_point: Arc::new(Mutex::new(last_polled_point)),
695                ..CardanoChainDataImporter::new_for_test(
696                    Arc::new(DumbBlockScanner::new()),
697                    repository,
698                )
699            }
700        }
701
702        #[tokio::test]
703        async fn cloning_keep_last_polled_point() {
704            let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new(
705                SlotNumber(15),
706                "block_hash-1",
707            )))
708            .await;
709
710            let cloned_importer = importer.clone();
711            let start_point = cloned_importer.start_point(&None).await.unwrap();
712            assert_eq!(
713                Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
714                start_point
715            );
716        }
717
718        #[tokio::test]
719        async fn none_if_nothing_stored_nor_scanned() {
720            let importer = importer_with_last_polled_point(None).await;
721            let highest_stored_block_number = None;
722
723            let start_point = importer.start_point(&highest_stored_block_number).await.unwrap();
724            assert_eq!(None, start_point);
725        }
726
727        #[tokio::test]
728        async fn start_at_last_stored_point_if_nothing_scanned() {
729            let importer = importer_with_last_polled_point(None).await;
730            let highest_stored_block_number = Some(ChainPoint::new(
731                SlotNumber(25),
732                BlockNumber(20),
733                hex::encode("block_hash-2"),
734            ));
735
736            let start_point = importer.start_point(&highest_stored_block_number).await.unwrap();
737            assert_eq!(
738                Some(RawCardanoPoint::new(SlotNumber(25), "block_hash-2")),
739                start_point
740            );
741        }
742
743        #[tokio::test]
744        async fn start_at_last_scanned_point_when_nothing_stored() {
745            let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new(
746                SlotNumber(15),
747                "block_hash-1",
748            )))
749            .await;
750            let highest_stored_block_number = None;
751
752            let start_point = importer.start_point(&highest_stored_block_number).await.unwrap();
753            assert_eq!(
754                Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
755                start_point
756            );
757        }
758
759        #[tokio::test]
760        async fn start_at_last_scanned_point_even_if_something_stored() {
761            let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new(
762                SlotNumber(15),
763                "block_hash-1",
764            )))
765            .await;
766            let highest_stored_block_number = Some(ChainPoint::new(
767                SlotNumber(25),
768                BlockNumber(20),
769                hex::encode("block_hash-2"),
770            ));
771
772            let start_point = importer.start_point(&highest_stored_block_number).await.unwrap();
773            assert_eq!(
774                Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
775                start_point
776            );
777        }
778
779        #[tokio::test]
780        async fn importing_transactions_update_start_point_even_if_no_transactions_are_found() {
781            let importer = CardanoChainDataImporter {
782                last_polled_point: Arc::new(Mutex::new(None)),
783                ..CardanoChainDataImporter::new_for_test(
784                    Arc::new(
785                        DumbBlockScanner::new()
786                            .forwards(vec![vec![ScannedBlock::new(
787                                "block_hash-1",
788                                BlockNumber(10),
789                                SlotNumber(15),
790                                Vec::<&str>::new(),
791                            )]])
792                            .last_polled_point(Some(RawCardanoPoint::new(
793                                SlotNumber(25),
794                                "block_hash-2",
795                            ))),
796                    ),
797                    Arc::new(InMemoryChainDataStore::default()),
798                )
799            };
800            let highest_stored_block_number = None;
801
802            let start_point_before_import =
803                importer.start_point(&highest_stored_block_number).await.unwrap();
804            assert_eq!(None, start_point_before_import);
805
806            importer
807                .import_blocks_and_transactions(BlockNumber(1000))
808                .await
809                .unwrap();
810
811            let start_point_after_import =
812                importer.start_point(&highest_stored_block_number).await.unwrap();
813            assert_eq!(
814                Some(RawCardanoPoint::new(SlotNumber(25), "block_hash-2")),
815                start_point_after_import
816            );
817        }
818
819        #[tokio::test]
820        async fn importing_transactions_dont_update_start_point_if_streamer_did_nothing() {
821            let importer = CardanoChainDataImporter {
822                last_polled_point: Arc::new(Mutex::new(Some(RawCardanoPoint::new(
823                    SlotNumber(15),
824                    "block_hash-1",
825                )))),
826                ..CardanoChainDataImporter::new_for_test(
827                    Arc::new(DumbBlockScanner::new()),
828                    Arc::new(InMemoryChainDataStore::default()),
829                )
830            };
831            let highest_stored_block_number = None;
832
833            importer
834                .import_blocks_and_transactions(BlockNumber(1000))
835                .await
836                .unwrap();
837
838            let start_point_after_import =
839                importer.start_point(&highest_stored_block_number).await.unwrap();
840            assert_eq!(
841                Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
842                start_point_after_import
843            );
844        }
845    }
846
847    mod chain_data_rollback {
848        use super::*;
849
850        #[tokio::test]
851        async fn when_rollbackward_should_remove_transactions() {
852            let expected_remaining_block_with_transactions =
853                vec![CardanoBlockWithTransactions::new(
854                    "block_hash-130",
855                    BlockNumber(130),
856                    SlotNumber(5),
857                    vec!["tx_hash-6", "tx_hash-7"],
858                )];
859            let repository = Arc::new(
860                InMemoryChainDataStore::builder()
861                    .with_blocks_and_transactions(
862                        &[
863                            expected_remaining_block_with_transactions.clone(),
864                            vec![CardanoBlockWithTransactions::new(
865                                hex::encode("block_hash-131"),
866                                BlockNumber(131),
867                                SlotNumber(10),
868                                vec!["tx_hash-8", "tx_hash-9", "tx_hash-10"],
869                            )],
870                        ]
871                        .concat(),
872                    )
873                    .build(),
874            );
875
876            let chain_point = ChainPoint::new(SlotNumber(5), BlockNumber(130), "block_hash-130");
877            let scanner = DumbBlockScanner::new().backward(chain_point);
878
879            let importer =
880                CardanoChainDataImporter::new_for_test(Arc::new(scanner), repository.clone());
881
882            importer
883                .import_blocks_and_transactions(BlockNumber(3000))
884                .await
885                .unwrap();
886
887            let stored_blocks_with_transactions = repository.get_all_block_with_txs().await;
888            assert_eq!(
889                expected_remaining_block_with_transactions,
890                stored_blocks_with_transactions
891            );
892        }
893
894        #[tokio::test]
895        async fn when_rollbackward_should_remove_block_ranges() {
896            let expected_remaining_block_ranges = vec![
897                BlockRange::from_block_number(BlockNumber(0)),
898                BlockRange::from_block_number(BlockRange::LENGTH),
899                BlockRange::from_block_number(BlockRange::LENGTH * 2),
900            ];
901
902            let repository = Arc::new(
903                InMemoryChainDataStore::builder()
904                    .with_block_range_roots(
905                        &[
906                            expected_remaining_block_ranges.clone(),
907                            vec![
908                                BlockRange::from_block_number(BlockRange::LENGTH * 3),
909                                BlockRange::from_block_number(BlockRange::LENGTH * 4),
910                                BlockRange::from_block_number(BlockRange::LENGTH * 5),
911                            ],
912                        ]
913                        .concat()
914                        .into_iter()
915                        .map(|b| (b, MKTreeNode::from_hex("AAAA").unwrap()))
916                        .collect::<Vec<_>>(),
917                    )
918                    .with_blocks_and_transactions(&[CardanoBlockWithTransactions::new(
919                        hex::encode("block_hash-131"),
920                        BlockRange::from_block_number(BlockRange::LENGTH * 3).start,
921                        SlotNumber(1),
922                        vec!["tx_hash-1", "tx_hash-2", "tx_hash-3"],
923                    )])
924                    .build(),
925            );
926
927            let block_range_roots = repository.get_all_block_range_root().await;
928            assert_eq!(6, block_range_roots.len());
929
930            let chain_point =
931                ChainPoint::new(SlotNumber(1), BlockRange::LENGTH * 3, "block_hash-131");
932            let scanner = DumbBlockScanner::new().backward(chain_point);
933
934            let importer =
935                CardanoChainDataImporter::new_for_test(Arc::new(scanner), repository.clone());
936
937            importer
938                .import_blocks_and_transactions(BlockNumber(3000))
939                .await
940                .unwrap();
941
942            assert_eq!(
943                expected_remaining_block_ranges,
944                repository.get_all_block_range().await
945            );
946        }
947    }
948
949    #[tokio::test]
950    async fn test_import_is_non_blocking() {
951        static COUNTER: AtomicUsize = AtomicUsize::new(0);
952        static MAX_COUNTER: usize = 25;
953        static WAIT_TIME: u64 = 50;
954
955        // Use a local set to ensure the counter-task is not dispatched on a different thread
956        let local = task::LocalSet::new();
957        local
958            .run_until(async {
959                let importer = CardanoChainDataImporter::new_for_test(
960                    Arc::new(DumbBlockScanner::new()),
961                    Arc::new(BlockingRepository {
962                        wait_time: Duration::from_millis(WAIT_TIME),
963                    }),
964                );
965
966                let importer_future = importer.import(BlockNumber(100));
967                let counter_task = task::spawn_local(async {
968                    while COUNTER.load(std::sync::atomic::Ordering::SeqCst) < MAX_COUNTER {
969                        tokio::time::sleep(Duration::from_millis(1)).await;
970                        COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
971                    }
972                });
973                importer_future.await.unwrap();
974
975                counter_task.abort();
976            })
977            .await;
978
979        assert_eq!(
980            MAX_COUNTER,
981            COUNTER.load(std::sync::atomic::Ordering::SeqCst)
982        );
983
984        struct BlockingRepository {
985            wait_time: Duration,
986        }
987
988        impl BlockingRepository {
989            fn block_thread(&self) {
990                std::thread::sleep(self.wait_time);
991            }
992        }
993
994        #[async_trait]
995        impl ChainDataStore for BlockingRepository {
996            async fn get_highest_beacon(&self) -> StdResult<Option<ChainPoint>> {
997                self.block_thread();
998                Ok(None)
999            }
1000
1001            async fn get_highest_block_range(&self) -> StdResult<Option<BlockRange>> {
1002                self.block_thread();
1003                Ok(None)
1004            }
1005
1006            async fn store_blocks_and_transactions(
1007                &self,
1008                _: Vec<CardanoBlockWithTransactions>,
1009            ) -> StdResult<()> {
1010                self.block_thread();
1011                Ok(())
1012            }
1013
1014            async fn get_transactions_in_range(
1015                &self,
1016                _: Range<BlockNumber>,
1017            ) -> StdResult<Vec<CardanoTransaction>> {
1018                self.block_thread();
1019                Ok(vec![])
1020            }
1021
1022            async fn store_block_range_roots(
1023                &self,
1024                _: Vec<(BlockRange, MKTreeNode)>,
1025            ) -> StdResult<()> {
1026                self.block_thread();
1027                Ok(())
1028            }
1029
1030            async fn remove_rolled_chain_data_and_block_range(
1031                &self,
1032                _: SlotNumber,
1033            ) -> StdResult<()> {
1034                self.block_thread();
1035                Ok(())
1036            }
1037        }
1038    }
1039}