1use std::mem;
2use std::sync::Arc;
3
4use anyhow::Context;
5use async_trait::async_trait;
6use slog::{Logger, debug};
7use tokio::{runtime::Handle, sync::Mutex, task};
8
9use mithril_common::StdResult;
10use mithril_common::crypto_helper::{MKTree, MKTreeNode, MKTreeStoreInMemory};
11use mithril_common::entities::{BlockNumber, BlockRange, CardanoBlockWithTransactions, ChainPoint};
12use mithril_common::logging::LoggerExtensions;
13
14use crate::chain_importer::{ChainDataImporter, ChainDataStore};
15use crate::chain_scanner::{BlockScanner, ChainScannedBlocks};
16use crate::entities::RawCardanoPoint;
17
18#[derive(Clone)]
21pub struct CardanoChainDataImporter {
22 block_scanner: Arc<dyn BlockScanner>,
23 transaction_store: Arc<dyn ChainDataStore>,
24 last_polled_point: Arc<Mutex<Option<RawCardanoPoint>>>,
25 logger: Logger,
26}
27
28impl CardanoChainDataImporter {
29 pub fn new(
31 block_scanner: Arc<dyn BlockScanner>,
32 transaction_store: Arc<dyn ChainDataStore>,
33 logger: Logger,
34 ) -> Self {
35 Self {
36 block_scanner,
37 transaction_store,
38 last_polled_point: Arc::new(Mutex::new(None)),
39 logger: logger.new_with_component_name::<Self>(),
40 }
41 }
42
43 async fn start_point(
44 &self,
45 highest_stored_chain_point: &Option<ChainPoint>,
46 ) -> StdResult<Option<RawCardanoPoint>> {
47 let last_polled_point = self.last_polled_point.lock().await.clone();
48 if last_polled_point.is_none() {
49 debug!(
50 self.logger,
51 "No last polled point available, falling back to the highest stored chain point"
52 );
53 }
54
55 Ok(last_polled_point.or(highest_stored_chain_point.as_ref().map(RawCardanoPoint::from)))
56 }
57
58 async fn import_blocks_and_transactions(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
59 let highest_stored_beacon = self.transaction_store.get_highest_beacon().await?;
60 let from = self.start_point(&highest_stored_beacon).await?;
61
62 if highest_stored_beacon
63 .as_ref()
64 .is_some_and(|f| f.block_number >= up_to_beacon)
65 {
66 debug!(
67 self.logger,
68 "No need to retrieve Cardano blocks and transactions, the database is up to date for block_number '{up_to_beacon}'",
69 );
70
71 Ok(())
72 } else {
73 debug!(
74 self.logger, "Retrieving Cardano blocks and transactions until block numbered '{up_to_beacon}'";
75 "starting_slot_number" => ?from.as_ref().map(|c| c.slot_number),
76 "highest_stored_block_number" => ?highest_stored_beacon.as_ref().map(|c| c.block_number),
77 );
78
79 self.parse_and_store_block_and_transactions_not_imported_yet(from, up_to_beacon)
80 .await
81 }
82 }
83
84 async fn parse_and_store_block_and_transactions_not_imported_yet(
85 &self,
86 from: Option<RawCardanoPoint>,
87 until: BlockNumber,
88 ) -> StdResult<()> {
89 let mut streamer = self.block_scanner.scan(from, until).await?;
90
91 while let Some(blocks) = streamer.poll_next().await? {
92 match blocks {
93 ChainScannedBlocks::RollForwards(forward_blocks) => {
94 let parsed_blocks_with_transactions: Vec<CardanoBlockWithTransactions> =
95 forward_blocks.into_iter().map(Into::into).collect();
96
97 self.transaction_store
98 .store_blocks_and_transactions(parsed_blocks_with_transactions)
99 .await?;
100 }
101 ChainScannedBlocks::RollBackward(slot_number) => {
102 self.transaction_store
103 .remove_rolled_chain_data_and_block_range(slot_number)
104 .await?;
105 }
106 }
107 }
108
109 if let Some(point) = streamer.last_polled_point() {
110 *self.last_polled_point.lock().await = Some(point);
111 }
112
113 Ok(())
114 }
115
116 async fn import_block_ranges(&self, until: BlockNumber) -> StdResult<()> {
117 let block_ranges = match self.transaction_store.get_highest_block_range().await?.map(
118 |highest_stored_block_range| {
119 BlockRange::all_block_ranges_in(
120 BlockRange::start(highest_stored_block_range.end)..=(until),
121 )
122 },
123 ) {
124 None => BlockRange::all_block_ranges_in(BlockNumber(0)..=(until)),
126 Some(ranges) if ranges.is_empty() => return Ok(()),
128 Some(ranges) => ranges,
129 };
130
131 debug!(
132 self.logger, "Computing Block Range Roots";
133 "start_block" => *block_ranges.start(), "end_block" => *block_ranges.end(),
134 );
135
136 let mut block_ranges_with_merkle_root: Vec<(BlockRange, MKTreeNode)> = vec![];
137 for block_range in block_ranges {
138 let transactions = self
139 .transaction_store
140 .get_transactions_in_range(block_range.start..block_range.end)
141 .await?;
142
143 if transactions.is_empty() {
144 continue;
145 }
146
147 let merkle_root = MKTree::<MKTreeStoreInMemory>::new(&transactions)?.compute_root()?;
148 block_ranges_with_merkle_root.push((block_range, merkle_root));
149
150 if block_ranges_with_merkle_root.len() >= 100 {
151 let block_ranges_with_merkle_root_save =
152 mem::take(&mut block_ranges_with_merkle_root);
153 self.transaction_store
154 .store_block_range_roots(block_ranges_with_merkle_root_save)
155 .await?;
156 }
157 }
158
159 self.transaction_store
160 .store_block_range_roots(block_ranges_with_merkle_root)
161 .await
162 }
163}
164
165#[async_trait]
166impl ChainDataImporter for CardanoChainDataImporter {
167 async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
168 let importer = self.clone();
169 task::spawn_blocking(move || {
170 Handle::current().block_on(async move {
171 importer.import_blocks_and_transactions(up_to_beacon).await?;
172 importer.import_block_ranges(up_to_beacon).await?;
173 Ok(())
174 })
175 })
176 .await
177 .with_context(|| "ChainDataImporter - worker thread crashed")?
178 }
179}
180
181#[cfg(test)]
182mod tests {
183 use std::ops::Range;
184 use std::sync::atomic::AtomicUsize;
185 use std::time::Duration;
186
187 use mockall::mock;
188
189 use mithril_common::crypto_helper::MKTree;
190 use mithril_common::entities::{BlockRangesSequence, CardanoTransaction, SlotNumber};
191
192 use crate::chain_importer::MockChainDataStore;
193 use crate::chain_scanner::BlockStreamer;
194 use crate::entities::ScannedBlock;
195 use crate::test::TestLogger;
196 use crate::test::double::{DumbBlockScanner, DumbBlockStreamer, InMemoryChainDataStore};
197
198 use super::*;
199
200 mock! {
201 pub BlockScannerImpl { }
202
203 #[async_trait]
204 impl BlockScanner for BlockScannerImpl {
205 async fn scan(
206 &self,
207 from: Option<RawCardanoPoint>,
208 until: BlockNumber,
209 ) -> StdResult<Box<dyn BlockStreamer>>;
210 }
211 }
212
213 impl CardanoChainDataImporter {
214 pub(crate) fn new_for_test(
215 scanner: Arc<dyn BlockScanner>,
216 transaction_store: Arc<dyn ChainDataStore>,
217 ) -> Self {
218 Self::new(scanner, transaction_store, TestLogger::stdout())
219 }
220 }
221
222 fn build_blocks(
223 start_block_number: BlockNumber,
224 number_of_consecutive_block: BlockNumber,
225 ) -> Vec<ScannedBlock> {
226 (*start_block_number..*(start_block_number + number_of_consecutive_block))
227 .map(|block_number| {
228 ScannedBlock::new(
229 format!("block_hash-{block_number}"),
230 BlockNumber(block_number),
231 SlotNumber(block_number * 100),
232 vec![format!("tx_hash-{}", block_number)],
233 )
234 })
235 .collect()
236 }
237
238 fn into_blocks_with_transactions(blocks: &[ScannedBlock]) -> Vec<CardanoBlockWithTransactions> {
239 blocks.iter().map(|b| b.clone().into()).collect()
240 }
241
242 fn merkle_root_for_blocks(block_ranges: &[ScannedBlock]) -> MKTreeNode {
243 let tx: Vec<_> = block_ranges
244 .iter()
245 .flat_map(|br| br.clone().into_transactions())
246 .collect();
247 MKTree::<MKTreeStoreInMemory>::new(&tx)
248 .unwrap()
249 .compute_root()
250 .unwrap()
251 }
252
253 mod store_blocks_and_transactions {
254 use super::*;
255
256 #[tokio::test]
257 async fn store_blocks_that_do_not_have_transactions() {
258 let repository = Arc::new(InMemoryChainDataStore::default());
259 let up_to_block_number = BlockNumber(1000);
260
261 let scanner = DumbBlockScanner::new().forwards(vec![vec![ScannedBlock::new(
262 "block_hash-1",
263 BlockNumber(10),
264 SlotNumber(15),
265 Vec::<String>::new(),
266 )]]);
267 let importer =
268 CardanoChainDataImporter::new_for_test(Arc::new(scanner), repository.clone());
269
270 importer
271 .import_blocks_and_transactions(up_to_block_number)
272 .await
273 .unwrap();
274
275 let stored_transactions = repository.get_all_block_with_txs().await;
276 assert_eq!(
277 vec![CardanoBlockWithTransactions::new(
278 hex::encode("block_hash-1"),
279 BlockNumber(10),
280 SlotNumber(15),
281 Vec::<String>::new()
282 )],
283 stored_transactions
284 );
285 }
286
287 #[tokio::test]
288 async fn if_nothing_stored_parse_and_store_all_blocks_and_transactions() {
289 let repository = Arc::new(InMemoryChainDataStore::default());
290
291 let blocks = vec![
292 ScannedBlock::new(
293 "block_hash-1",
294 BlockNumber(10),
295 SlotNumber(15),
296 vec!["tx_hash-1", "tx_hash-2"],
297 ),
298 ScannedBlock::new(
299 "block_hash-2",
300 BlockNumber(20),
301 SlotNumber(25),
302 vec!["tx_hash-3", "tx_hash-4"],
303 ),
304 ];
305 let expected_blocks_with_transactions = into_blocks_with_transactions(&blocks);
306 let up_to_block_number = BlockNumber(1000);
307
308 let importer = {
309 let mut scanner_mock = MockBlockScannerImpl::new();
310 scanner_mock
311 .expect_scan()
312 .withf(move |from, until| from.is_none() && until == up_to_block_number)
313 .return_once(move |_, _| {
314 Ok(Box::new(DumbBlockStreamer::new().forwards(vec![blocks])))
315 });
316 CardanoChainDataImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
317 };
318
319 importer
320 .import_blocks_and_transactions(up_to_block_number)
321 .await
322 .unwrap();
323
324 let stored_transactions = repository.get_all_block_with_txs().await;
325 assert_eq!(expected_blocks_with_transactions, stored_transactions);
326 }
327
328 #[tokio::test]
329 async fn if_all_blocks_and_transactions_are_stored_nothing_is_parsed_and_stored() {
330 let up_to_block_number = BlockNumber(12);
331 let last_block = vec![CardanoBlockWithTransactions::new(
332 hex::encode("block_hash-3"),
333 BlockNumber(30),
334 SlotNumber(35),
335 vec!["tx-20"],
336 )];
337 let repository = Arc::new(
338 InMemoryChainDataStore::builder()
339 .with_blocks_and_transactions(&last_block)
340 .build(),
341 );
342 let scanner = DumbBlockScanner::new().forwards(vec![vec![
343 ScannedBlock::new(
344 "block_hash-1",
345 BlockNumber(10),
346 SlotNumber(15),
347 vec!["tx_hash-1", "tx_hash-2"],
348 ),
349 ScannedBlock::new(
350 "block_hash-2",
351 BlockNumber(20),
352 SlotNumber(25),
353 vec!["tx_hash-3", "tx_hash-4"],
354 ),
355 ]]);
356
357 let importer =
358 CardanoChainDataImporter::new_for_test(Arc::new(scanner), repository.clone());
359
360 importer
361 .import_blocks_and_transactions(up_to_block_number)
362 .await
363 .unwrap();
364
365 let blocks_with_txs = repository.get_all_block_with_txs().await;
366 assert_eq!(last_block, blocks_with_txs);
367 }
368
369 #[tokio::test]
370 async fn if_half_blocks_and_transactions_are_already_stored_the_other_half_is_parsed_and_stored()
371 {
372 let highest_stored_chain_point = ChainPoint::new(
373 SlotNumber(134),
374 BlockNumber(10),
375 hex::encode("block_hash-1"),
376 );
377 let stored_block = ScannedBlock::new(
378 hex::decode(&highest_stored_chain_point.block_hash).unwrap(),
379 highest_stored_chain_point.block_number,
380 highest_stored_chain_point.slot_number,
381 vec!["tx_hash-1", "tx_hash-2"],
382 );
383 let to_store_block = ScannedBlock::new(
384 "block_hash-2",
385 BlockNumber(20),
386 SlotNumber(229),
387 vec!["tx_hash-3", "tx_hash-4"],
388 );
389 let existing_blocks = vec![stored_block.clone()];
390 let expected_blocks_with_transactions =
391 into_blocks_with_transactions(&[stored_block.clone(), to_store_block.clone()]);
392 let up_to_block_number = BlockNumber(22);
393
394 let repository = Arc::new(
395 InMemoryChainDataStore::builder()
396 .with_blocks_and_transactions(&existing_blocks)
397 .build(),
398 );
399
400 let importer = {
401 let scanned_blocks = vec![to_store_block.clone()];
402 let mut scanner_mock = MockBlockScannerImpl::new();
403 scanner_mock
404 .expect_scan()
405 .withf(move |from, until| {
406 from == &Some(highest_stored_chain_point.clone().into())
407 && *until == up_to_block_number
408 })
409 .return_once(move |_, _| {
410 Ok(Box::new(
411 DumbBlockStreamer::new().forwards(vec![scanned_blocks]),
412 ))
413 })
414 .once();
415 CardanoChainDataImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
416 };
417
418 importer
419 .import_blocks_and_transactions(up_to_block_number)
420 .await
421 .unwrap();
422
423 let stored_blocks_with_transactions = repository.get_all_block_with_txs().await;
424 assert_eq!(
425 expected_blocks_with_transactions,
426 stored_blocks_with_transactions
427 );
428 }
429 }
430
431 mod compute_legacy_transactions_block_ranges {
432 use super::*;
433
434 #[tokio::test]
435 async fn if_nothing_stored_parse_and_store_all_block_ranges() {
436 let up_to_block_number = BlockRange::LENGTH * 5;
437 let blocks = build_blocks(BlockNumber(0), up_to_block_number + 1);
438 let repository = Arc::new(
439 InMemoryChainDataStore::builder()
440 .with_blocks_and_transactions(&blocks)
441 .build(),
442 );
443
444 let importer = CardanoChainDataImporter::new_for_test(
445 Arc::new(MockBlockScannerImpl::new()),
446 repository.clone(),
447 );
448
449 importer.import_block_ranges(up_to_block_number).await.unwrap();
450
451 assert_eq!(
452 vec![
453 BlockRange::from_block_number(BlockNumber(0)),
454 BlockRange::from_block_number(BlockRange::LENGTH),
455 BlockRange::from_block_number(BlockRange::LENGTH * 2),
456 BlockRange::from_block_number(BlockRange::LENGTH * 3),
457 BlockRange::from_block_number(BlockRange::LENGTH * 4),
458 ],
459 repository.get_all_block_range().await
460 );
461 }
462
463 #[tokio::test]
464 async fn if_theres_gap_between_two_stored_block_ranges_it_can_still_compute_their_root() {
465 let up_to_block_number = BlockRange::LENGTH * 4;
466 let blocks: Vec<ScannedBlock> = [
468 build_blocks(BlockNumber(0), BlockRange::LENGTH),
469 build_blocks(BlockRange::LENGTH * 3, BlockRange::LENGTH),
470 ]
471 .concat();
472 let repository = Arc::new(
473 InMemoryChainDataStore::builder()
474 .with_blocks_and_transactions(&blocks)
475 .build(),
476 );
477
478 let importer = CardanoChainDataImporter::new_for_test(
479 Arc::new(MockBlockScannerImpl::new()),
480 repository.clone(),
481 );
482
483 importer.import_block_ranges(up_to_block_number).await.unwrap();
484
485 assert_eq!(
486 vec![
487 BlockRange::from_block_number(BlockNumber(0)),
488 BlockRange::from_block_number(BlockRange::LENGTH * 3),
489 ],
490 repository.get_all_block_range().await
491 );
492 }
493
494 #[tokio::test]
495 async fn if_all_block_ranges_computed_nothing_computed_and_stored() {
496 let repository = Arc::new(InMemoryChainDataStore::default());
497
498 let importer = CardanoChainDataImporter::new_for_test(
499 Arc::new(MockBlockScannerImpl::new()),
500 repository.clone(),
501 );
502
503 importer.import_block_ranges(BlockNumber(10_000)).await.unwrap();
504
505 let block_range_roots = repository.get_all_block_range_root().await;
506 assert!(
507 block_range_roots.is_empty(),
508 "No block range root should be stored, found: {block_range_roots:?}"
509 );
510 }
511
512 #[tokio::test]
513 async fn if_half_block_ranges_are_stored_the_other_half_is_computed_and_stored() {
514 let up_to_block_number = BlockRange::LENGTH * 4;
515 let blocks = build_blocks(BlockNumber(0), up_to_block_number + 1);
516 let repository = Arc::new(
517 InMemoryChainDataStore::builder()
518 .with_blocks_and_transactions(&blocks)
519 .with_block_range_roots(&[
520 (
521 BlockRange::from_block_number(BlockNumber(0)),
522 MKTreeNode::from_hex("AAAA").unwrap(),
523 ),
524 (
525 BlockRange::from_block_number(BlockRange::LENGTH),
526 MKTreeNode::from_hex("BBBB").unwrap(),
527 ),
528 ])
529 .build(),
530 );
531
532 let importer = CardanoChainDataImporter::new_for_test(
533 Arc::new(MockBlockScannerImpl::new()),
534 repository.clone(),
535 );
536
537 importer.import_block_ranges(up_to_block_number).await.unwrap();
538
539 assert_eq!(
540 vec![
541 BlockRange::from_block_number(BlockNumber(0)),
542 BlockRange::from_block_number(BlockRange::LENGTH),
543 BlockRange::from_block_number(BlockRange::LENGTH * 2),
544 BlockRange::from_block_number(BlockRange::LENGTH * 3),
545 ],
546 repository.get_all_block_range().await
547 );
548 }
549
550 #[tokio::test]
551 async fn can_compute_block_ranges_up_to_the_strict_end_of_a_block_range() {
552 let blocks = build_blocks(BlockRange::LENGTH, BlockRange::LENGTH - 1);
554 let repository = Arc::new(
555 InMemoryChainDataStore::builder()
556 .with_blocks_and_transactions(&blocks)
557 .build(),
558 );
559
560 let importer = CardanoChainDataImporter::new_for_test(
561 Arc::new(MockBlockScannerImpl::new()),
562 repository.clone(),
563 );
564
565 importer
566 .import_block_ranges(BlockRange::LENGTH * 2 - 1)
567 .await
568 .unwrap();
569
570 assert_eq!(
571 vec![BlockRange::from_block_number(BlockRange::LENGTH)],
572 repository.get_all_block_range().await
573 );
574 }
575
576 #[tokio::test]
577 async fn can_compute_block_ranges_even_if_last_blocks_in_range_dont_have_transactions() {
578 let blocks = build_blocks(BlockRange::LENGTH, BlockNumber(10));
580 let repository = Arc::new(
581 InMemoryChainDataStore::builder()
582 .with_blocks_and_transactions(&blocks)
583 .build(),
584 );
585
586 let importer = CardanoChainDataImporter::new_for_test(
587 Arc::new(MockBlockScannerImpl::new()),
588 repository.clone(),
589 );
590
591 importer.import_block_ranges(BlockRange::LENGTH * 2).await.unwrap();
592
593 assert_eq!(
594 vec![BlockRange::from_block_number(BlockRange::LENGTH)],
595 repository.get_all_block_range().await
596 );
597 }
598
599 #[tokio::test]
600 async fn block_range_root_retrieves_only_strictly_required_transactions() {
601 fn transactions_for_block(
602 range: Range<BlockNumber>,
603 ) -> StdResult<Vec<CardanoTransaction>> {
604 Ok(build_blocks(range.start, range.end - range.start)
605 .into_iter()
606 .flat_map(|b| b.into_transactions())
607 .collect())
608 }
609 const HIGHEST_BLOCK_RANGE_START: BlockNumber = BlockRange::LENGTH;
610 let up_to_block_number = BlockRange::LENGTH * 5;
611
612 let importer = {
613 let mut store_mock = MockChainDataStore::new();
614 store_mock
615 .expect_get_highest_block_range()
616 .returning(|| {
617 Ok(Some(BlockRange::from_block_number(
618 HIGHEST_BLOCK_RANGE_START,
619 )))
620 })
621 .once();
622 store_mock
623 .expect_get_transactions_in_range()
624 .withf(move |range| {
627 BlockRangesSequence::new(HIGHEST_BLOCK_RANGE_START..=up_to_block_number)
628 .contains(range)
629 })
630 .returning(transactions_for_block);
631 store_mock.expect_store_block_range_roots().returning(|_| Ok(()));
632
633 CardanoChainDataImporter::new_for_test(
634 Arc::new(MockBlockScannerImpl::new()),
635 Arc::new(store_mock),
636 )
637 };
638
639 importer.import_block_ranges(up_to_block_number).await.unwrap();
640 }
641
642 #[tokio::test]
643 async fn compute_block_range_merkle_root() {
644 let up_to_block_number = BlockRange::LENGTH * 2;
646 let blocks = build_blocks(BlockNumber(0), up_to_block_number + 1);
647 let expected_block_range_roots = vec![
648 (
649 BlockRange::from_block_number(BlockNumber(0)),
650 merkle_root_for_blocks(&blocks[0..(*BlockRange::LENGTH as usize)]),
651 ),
652 (
653 BlockRange::from_block_number(BlockRange::LENGTH),
654 merkle_root_for_blocks(
655 &blocks
656 [(*BlockRange::LENGTH as usize)..((*BlockRange::LENGTH * 2) as usize)],
657 ),
658 ),
659 ];
660
661 let repository = Arc::new(
662 InMemoryChainDataStore::builder()
663 .with_blocks_and_transactions(&blocks)
664 .build(),
665 );
666
667 let importer = CardanoChainDataImporter::new_for_test(
668 Arc::new(MockBlockScannerImpl::new()),
669 repository.clone(),
670 );
671
672 importer.import_block_ranges(up_to_block_number).await.unwrap();
673
674 let block_range_roots = repository.get_all_block_range_root().await;
675 assert_eq!(
676 expected_block_range_roots,
677 block_range_roots
678 .into_iter()
679 .map(|r| (r.range, r.merkle_root))
680 .collect::<Vec<_>>()
681 );
682 }
683 }
684
685 mod transactions_import_start_point {
686 use super::*;
687
688 async fn importer_with_last_polled_point(
689 last_polled_point: Option<RawCardanoPoint>,
690 ) -> CardanoChainDataImporter {
691 let repository = Arc::new(InMemoryChainDataStore::default());
692
693 CardanoChainDataImporter {
694 last_polled_point: Arc::new(Mutex::new(last_polled_point)),
695 ..CardanoChainDataImporter::new_for_test(
696 Arc::new(DumbBlockScanner::new()),
697 repository,
698 )
699 }
700 }
701
702 #[tokio::test]
703 async fn cloning_keep_last_polled_point() {
704 let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new(
705 SlotNumber(15),
706 "block_hash-1",
707 )))
708 .await;
709
710 let cloned_importer = importer.clone();
711 let start_point = cloned_importer.start_point(&None).await.unwrap();
712 assert_eq!(
713 Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
714 start_point
715 );
716 }
717
718 #[tokio::test]
719 async fn none_if_nothing_stored_nor_scanned() {
720 let importer = importer_with_last_polled_point(None).await;
721 let highest_stored_block_number = None;
722
723 let start_point = importer.start_point(&highest_stored_block_number).await.unwrap();
724 assert_eq!(None, start_point);
725 }
726
727 #[tokio::test]
728 async fn start_at_last_stored_point_if_nothing_scanned() {
729 let importer = importer_with_last_polled_point(None).await;
730 let highest_stored_block_number = Some(ChainPoint::new(
731 SlotNumber(25),
732 BlockNumber(20),
733 hex::encode("block_hash-2"),
734 ));
735
736 let start_point = importer.start_point(&highest_stored_block_number).await.unwrap();
737 assert_eq!(
738 Some(RawCardanoPoint::new(SlotNumber(25), "block_hash-2")),
739 start_point
740 );
741 }
742
743 #[tokio::test]
744 async fn start_at_last_scanned_point_when_nothing_stored() {
745 let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new(
746 SlotNumber(15),
747 "block_hash-1",
748 )))
749 .await;
750 let highest_stored_block_number = None;
751
752 let start_point = importer.start_point(&highest_stored_block_number).await.unwrap();
753 assert_eq!(
754 Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
755 start_point
756 );
757 }
758
759 #[tokio::test]
760 async fn start_at_last_scanned_point_even_if_something_stored() {
761 let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new(
762 SlotNumber(15),
763 "block_hash-1",
764 )))
765 .await;
766 let highest_stored_block_number = Some(ChainPoint::new(
767 SlotNumber(25),
768 BlockNumber(20),
769 hex::encode("block_hash-2"),
770 ));
771
772 let start_point = importer.start_point(&highest_stored_block_number).await.unwrap();
773 assert_eq!(
774 Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
775 start_point
776 );
777 }
778
779 #[tokio::test]
780 async fn importing_transactions_update_start_point_even_if_no_transactions_are_found() {
781 let importer = CardanoChainDataImporter {
782 last_polled_point: Arc::new(Mutex::new(None)),
783 ..CardanoChainDataImporter::new_for_test(
784 Arc::new(
785 DumbBlockScanner::new()
786 .forwards(vec![vec![ScannedBlock::new(
787 "block_hash-1",
788 BlockNumber(10),
789 SlotNumber(15),
790 Vec::<&str>::new(),
791 )]])
792 .last_polled_point(Some(RawCardanoPoint::new(
793 SlotNumber(25),
794 "block_hash-2",
795 ))),
796 ),
797 Arc::new(InMemoryChainDataStore::default()),
798 )
799 };
800 let highest_stored_block_number = None;
801
802 let start_point_before_import =
803 importer.start_point(&highest_stored_block_number).await.unwrap();
804 assert_eq!(None, start_point_before_import);
805
806 importer
807 .import_blocks_and_transactions(BlockNumber(1000))
808 .await
809 .unwrap();
810
811 let start_point_after_import =
812 importer.start_point(&highest_stored_block_number).await.unwrap();
813 assert_eq!(
814 Some(RawCardanoPoint::new(SlotNumber(25), "block_hash-2")),
815 start_point_after_import
816 );
817 }
818
819 #[tokio::test]
820 async fn importing_transactions_dont_update_start_point_if_streamer_did_nothing() {
821 let importer = CardanoChainDataImporter {
822 last_polled_point: Arc::new(Mutex::new(Some(RawCardanoPoint::new(
823 SlotNumber(15),
824 "block_hash-1",
825 )))),
826 ..CardanoChainDataImporter::new_for_test(
827 Arc::new(DumbBlockScanner::new()),
828 Arc::new(InMemoryChainDataStore::default()),
829 )
830 };
831 let highest_stored_block_number = None;
832
833 importer
834 .import_blocks_and_transactions(BlockNumber(1000))
835 .await
836 .unwrap();
837
838 let start_point_after_import =
839 importer.start_point(&highest_stored_block_number).await.unwrap();
840 assert_eq!(
841 Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
842 start_point_after_import
843 );
844 }
845 }
846
847 mod chain_data_rollback {
848 use super::*;
849
850 #[tokio::test]
851 async fn when_rollbackward_should_remove_transactions() {
852 let expected_remaining_block_with_transactions =
853 vec![CardanoBlockWithTransactions::new(
854 "block_hash-130",
855 BlockNumber(130),
856 SlotNumber(5),
857 vec!["tx_hash-6", "tx_hash-7"],
858 )];
859 let repository = Arc::new(
860 InMemoryChainDataStore::builder()
861 .with_blocks_and_transactions(
862 &[
863 expected_remaining_block_with_transactions.clone(),
864 vec![CardanoBlockWithTransactions::new(
865 hex::encode("block_hash-131"),
866 BlockNumber(131),
867 SlotNumber(10),
868 vec!["tx_hash-8", "tx_hash-9", "tx_hash-10"],
869 )],
870 ]
871 .concat(),
872 )
873 .build(),
874 );
875
876 let chain_point = ChainPoint::new(SlotNumber(5), BlockNumber(130), "block_hash-130");
877 let scanner = DumbBlockScanner::new().backward(chain_point);
878
879 let importer =
880 CardanoChainDataImporter::new_for_test(Arc::new(scanner), repository.clone());
881
882 importer
883 .import_blocks_and_transactions(BlockNumber(3000))
884 .await
885 .unwrap();
886
887 let stored_blocks_with_transactions = repository.get_all_block_with_txs().await;
888 assert_eq!(
889 expected_remaining_block_with_transactions,
890 stored_blocks_with_transactions
891 );
892 }
893
894 #[tokio::test]
895 async fn when_rollbackward_should_remove_block_ranges() {
896 let expected_remaining_block_ranges = vec![
897 BlockRange::from_block_number(BlockNumber(0)),
898 BlockRange::from_block_number(BlockRange::LENGTH),
899 BlockRange::from_block_number(BlockRange::LENGTH * 2),
900 ];
901
902 let repository = Arc::new(
903 InMemoryChainDataStore::builder()
904 .with_block_range_roots(
905 &[
906 expected_remaining_block_ranges.clone(),
907 vec![
908 BlockRange::from_block_number(BlockRange::LENGTH * 3),
909 BlockRange::from_block_number(BlockRange::LENGTH * 4),
910 BlockRange::from_block_number(BlockRange::LENGTH * 5),
911 ],
912 ]
913 .concat()
914 .into_iter()
915 .map(|b| (b, MKTreeNode::from_hex("AAAA").unwrap()))
916 .collect::<Vec<_>>(),
917 )
918 .with_blocks_and_transactions(&[CardanoBlockWithTransactions::new(
919 hex::encode("block_hash-131"),
920 BlockRange::from_block_number(BlockRange::LENGTH * 3).start,
921 SlotNumber(1),
922 vec!["tx_hash-1", "tx_hash-2", "tx_hash-3"],
923 )])
924 .build(),
925 );
926
927 let block_range_roots = repository.get_all_block_range_root().await;
928 assert_eq!(6, block_range_roots.len());
929
930 let chain_point =
931 ChainPoint::new(SlotNumber(1), BlockRange::LENGTH * 3, "block_hash-131");
932 let scanner = DumbBlockScanner::new().backward(chain_point);
933
934 let importer =
935 CardanoChainDataImporter::new_for_test(Arc::new(scanner), repository.clone());
936
937 importer
938 .import_blocks_and_transactions(BlockNumber(3000))
939 .await
940 .unwrap();
941
942 assert_eq!(
943 expected_remaining_block_ranges,
944 repository.get_all_block_range().await
945 );
946 }
947 }
948
949 #[tokio::test]
950 async fn test_import_is_non_blocking() {
951 static COUNTER: AtomicUsize = AtomicUsize::new(0);
952 static MAX_COUNTER: usize = 25;
953 static WAIT_TIME: u64 = 50;
954
955 let local = task::LocalSet::new();
957 local
958 .run_until(async {
959 let importer = CardanoChainDataImporter::new_for_test(
960 Arc::new(DumbBlockScanner::new()),
961 Arc::new(BlockingRepository {
962 wait_time: Duration::from_millis(WAIT_TIME),
963 }),
964 );
965
966 let importer_future = importer.import(BlockNumber(100));
967 let counter_task = task::spawn_local(async {
968 while COUNTER.load(std::sync::atomic::Ordering::SeqCst) < MAX_COUNTER {
969 tokio::time::sleep(Duration::from_millis(1)).await;
970 COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
971 }
972 });
973 importer_future.await.unwrap();
974
975 counter_task.abort();
976 })
977 .await;
978
979 assert_eq!(
980 MAX_COUNTER,
981 COUNTER.load(std::sync::atomic::Ordering::SeqCst)
982 );
983
984 struct BlockingRepository {
985 wait_time: Duration,
986 }
987
988 impl BlockingRepository {
989 fn block_thread(&self) {
990 std::thread::sleep(self.wait_time);
991 }
992 }
993
994 #[async_trait]
995 impl ChainDataStore for BlockingRepository {
996 async fn get_highest_beacon(&self) -> StdResult<Option<ChainPoint>> {
997 self.block_thread();
998 Ok(None)
999 }
1000
1001 async fn get_highest_block_range(&self) -> StdResult<Option<BlockRange>> {
1002 self.block_thread();
1003 Ok(None)
1004 }
1005
1006 async fn store_blocks_and_transactions(
1007 &self,
1008 _: Vec<CardanoBlockWithTransactions>,
1009 ) -> StdResult<()> {
1010 self.block_thread();
1011 Ok(())
1012 }
1013
1014 async fn get_transactions_in_range(
1015 &self,
1016 _: Range<BlockNumber>,
1017 ) -> StdResult<Vec<CardanoTransaction>> {
1018 self.block_thread();
1019 Ok(vec![])
1020 }
1021
1022 async fn store_block_range_roots(
1023 &self,
1024 _: Vec<(BlockRange, MKTreeNode)>,
1025 ) -> StdResult<()> {
1026 self.block_thread();
1027 Ok(())
1028 }
1029
1030 async fn remove_rolled_chain_data_and_block_range(
1031 &self,
1032 _: SlotNumber,
1033 ) -> StdResult<()> {
1034 self.block_thread();
1035 Ok(())
1036 }
1037 }
1038 }
1039}