1use std::mem;
2use std::ops::Range;
3use std::sync::Arc;
4
5use anyhow::Context;
6use async_trait::async_trait;
7use slog::{debug, Logger};
8use tokio::{runtime::Handle, sync::Mutex, task};
9
10use mithril_common::cardano_block_scanner::{BlockScanner, ChainScannedBlocks, RawCardanoPoint};
11use mithril_common::crypto_helper::{MKTree, MKTreeNode, MKTreeStoreInMemory};
12use mithril_common::entities::{
13 BlockNumber, BlockRange, CardanoTransaction, ChainPoint, SlotNumber,
14};
15use mithril_common::logging::LoggerExtensions;
16use mithril_common::signable_builder::TransactionsImporter;
17use mithril_common::StdResult;
18
19#[cfg_attr(test, mockall::automock)]
21#[async_trait]
22pub trait TransactionStore: Send + Sync {
23 async fn get_highest_beacon(&self) -> StdResult<Option<ChainPoint>>;
25
26 async fn get_highest_block_range(&self) -> StdResult<Option<BlockRange>>;
28
29 async fn store_transactions(&self, transactions: Vec<CardanoTransaction>) -> StdResult<()>;
31
32 async fn get_transactions_in_range(
34 &self,
35 range: Range<BlockNumber>,
36 ) -> StdResult<Vec<CardanoTransaction>>;
37
38 async fn store_block_range_roots(
40 &self,
41 block_ranges: Vec<(BlockRange, MKTreeNode)>,
42 ) -> StdResult<()>;
43
44 async fn remove_rolled_back_transactions_and_block_range(
49 &self,
50 slot_number: SlotNumber,
51 ) -> StdResult<()>;
52}
53
54#[derive(Clone)]
56pub struct CardanoTransactionsImporter {
57 block_scanner: Arc<dyn BlockScanner>,
58 transaction_store: Arc<dyn TransactionStore>,
59 last_polled_point: Arc<Mutex<Option<RawCardanoPoint>>>,
60 logger: Logger,
61}
62
63impl CardanoTransactionsImporter {
64 pub fn new(
66 block_scanner: Arc<dyn BlockScanner>,
67 transaction_store: Arc<dyn TransactionStore>,
68 logger: Logger,
69 ) -> Self {
70 Self {
71 block_scanner,
72 transaction_store,
73 last_polled_point: Arc::new(Mutex::new(None)),
74 logger: logger.new_with_component_name::<Self>(),
75 }
76 }
77
78 async fn start_point(
79 &self,
80 highest_stored_chain_point: &Option<ChainPoint>,
81 ) -> StdResult<Option<RawCardanoPoint>> {
82 let last_polled_point = self.last_polled_point.lock().await.clone();
83 if last_polled_point.is_none() {
84 debug!(
85 self.logger,
86 "No last polled point available, falling back to the highest stored chain point"
87 );
88 }
89
90 Ok(last_polled_point.or(highest_stored_chain_point
91 .as_ref()
92 .map(RawCardanoPoint::from)))
93 }
94
95 async fn import_transactions(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
96 let highest_stored_beacon = self.transaction_store.get_highest_beacon().await?;
97 let from = self.start_point(&highest_stored_beacon).await?;
98
99 if highest_stored_beacon
100 .as_ref()
101 .is_some_and(|f| f.block_number >= up_to_beacon)
102 {
103 debug!(
104 self.logger,
105 "No need to retrieve Cardano transactions, the database is up to date for block_number '{up_to_beacon}'",
106 );
107
108 Ok(())
109 } else {
110 debug!(
111 self.logger, "Retrieving Cardano transactions until block numbered '{up_to_beacon}'";
112 "starting_slot_number" => ?from.as_ref().map(|c| c.slot_number),
113 "highest_stored_block_number" => ?highest_stored_beacon.as_ref().map(|c| c.block_number),
114 );
115
116 self.parse_and_store_transactions_not_imported_yet(from, up_to_beacon)
117 .await
118 }
119 }
120
121 async fn parse_and_store_transactions_not_imported_yet(
122 &self,
123 from: Option<RawCardanoPoint>,
124 until: BlockNumber,
125 ) -> StdResult<()> {
126 let mut streamer = self.block_scanner.scan(from, until).await?;
127
128 while let Some(blocks) = streamer.poll_next().await? {
129 match blocks {
130 ChainScannedBlocks::RollForwards(forward_blocks) => {
131 let parsed_transactions: Vec<CardanoTransaction> = forward_blocks
132 .into_iter()
133 .flat_map(|b| b.into_transactions())
134 .collect();
135
136 self.transaction_store
137 .store_transactions(parsed_transactions)
138 .await?;
139 }
140 ChainScannedBlocks::RollBackward(slot_number) => {
141 self.transaction_store
142 .remove_rolled_back_transactions_and_block_range(slot_number)
143 .await?;
144 }
145 }
146 }
147
148 if let Some(point) = streamer.last_polled_point() {
149 *self.last_polled_point.lock().await = Some(point);
150 }
151
152 Ok(())
153 }
154
155 async fn import_block_ranges(&self, until: BlockNumber) -> StdResult<()> {
156 let block_ranges = match self.transaction_store.get_highest_block_range().await?.map(
157 |highest_stored_block_range| {
158 BlockRange::all_block_ranges_in(
159 BlockRange::start(highest_stored_block_range.end)..=(until),
160 )
161 },
162 ) {
163 None => BlockRange::all_block_ranges_in(BlockNumber(0)..=(until)),
165 Some(ranges) if ranges.is_empty() => return Ok(()),
167 Some(ranges) => ranges,
168 };
169
170 debug!(
171 self.logger, "Computing Block Range Roots";
172 "start_block" => *block_ranges.start(), "end_block" => *block_ranges.end(),
173 );
174
175 let mut block_ranges_with_merkle_root: Vec<(BlockRange, MKTreeNode)> = vec![];
176 for block_range in block_ranges {
177 let transactions = self
178 .transaction_store
179 .get_transactions_in_range(block_range.start..block_range.end)
180 .await?;
181
182 if transactions.is_empty() {
183 continue;
184 }
185
186 let merkle_root = MKTree::<MKTreeStoreInMemory>::new(&transactions)?.compute_root()?;
187 block_ranges_with_merkle_root.push((block_range, merkle_root));
188
189 if block_ranges_with_merkle_root.len() >= 100 {
190 let block_ranges_with_merkle_root_save =
191 mem::take(&mut block_ranges_with_merkle_root);
192 self.transaction_store
193 .store_block_range_roots(block_ranges_with_merkle_root_save)
194 .await?;
195 }
196 }
197
198 self.transaction_store
199 .store_block_range_roots(block_ranges_with_merkle_root)
200 .await
201 }
202}
203
204#[async_trait]
205impl TransactionsImporter for CardanoTransactionsImporter {
206 async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
207 let importer = self.clone();
208 task::spawn_blocking(move || {
209 Handle::current().block_on(async move {
210 importer.import_transactions(up_to_beacon).await?;
211 importer.import_block_ranges(up_to_beacon).await?;
212 Ok(())
213 })
214 })
215 .await
216 .with_context(|| "TransactionsImporter - worker thread crashed")?
217 }
218}
219
220#[cfg(test)]
221mod tests {
222 use std::sync::atomic::AtomicUsize;
223 use std::time::Duration;
224
225 use mockall::mock;
226
227 use mithril_common::cardano_block_scanner::{
228 BlockStreamer, DumbBlockScanner, DumbBlockStreamer, ScannedBlock,
229 };
230 use mithril_common::crypto_helper::MKTree;
231 use mithril_common::entities::{BlockNumber, BlockRangesSequence};
232 use mithril_persistence::database::repository::CardanoTransactionRepository;
233 use mithril_persistence::sqlite::SqliteConnectionPool;
234
235 use crate::database::test_helper::cardano_tx_db_connection;
236 use crate::test_tools::TestLogger;
237
238 use super::*;
239
240 mock! {
241 pub BlockScannerImpl { }
242
243 #[async_trait]
244 impl BlockScanner for BlockScannerImpl {
245 async fn scan(
246 &self,
247 from: Option<RawCardanoPoint>,
248 until: BlockNumber,
249 ) -> StdResult<Box<dyn BlockStreamer>>;
250 }
251 }
252
253 impl CardanoTransactionsImporter {
254 pub(crate) fn new_for_test(
255 scanner: Arc<dyn BlockScanner>,
256 transaction_store: Arc<dyn TransactionStore>,
257 ) -> Self {
258 CardanoTransactionsImporter::new(scanner, transaction_store, TestLogger::stdout())
259 }
260 }
261
262 fn build_blocks(
263 start_block_number: BlockNumber,
264 number_of_consecutive_block: BlockNumber,
265 ) -> Vec<ScannedBlock> {
266 (*start_block_number..*(start_block_number + number_of_consecutive_block))
267 .map(|block_number| {
268 ScannedBlock::new(
269 format!("block_hash-{}", block_number),
270 BlockNumber(block_number),
271 SlotNumber(block_number * 100),
272 vec![format!("tx_hash-{}", block_number)],
273 )
274 })
275 .collect()
276 }
277
278 fn into_transactions(blocks: &[ScannedBlock]) -> Vec<CardanoTransaction> {
279 blocks
280 .iter()
281 .flat_map(|b| b.clone().into_transactions())
282 .collect()
283 }
284
285 fn merkle_root_for_blocks(block_ranges: &[ScannedBlock]) -> MKTreeNode {
286 let tx: Vec<_> = block_ranges
287 .iter()
288 .flat_map(|br| br.clone().into_transactions())
289 .collect();
290 MKTree::<MKTreeStoreInMemory>::new(&tx)
291 .unwrap()
292 .compute_root()
293 .unwrap()
294 }
295
296 #[tokio::test]
297 async fn if_nothing_stored_parse_and_store_all_transactions() {
298 let connection = cardano_tx_db_connection().unwrap();
299 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
300 SqliteConnectionPool::build_from_connection(connection),
301 )));
302
303 let blocks = vec![
304 ScannedBlock::new(
305 "block_hash-1",
306 BlockNumber(10),
307 SlotNumber(15),
308 vec!["tx_hash-1", "tx_hash-2"],
309 ),
310 ScannedBlock::new(
311 "block_hash-2",
312 BlockNumber(20),
313 SlotNumber(25),
314 vec!["tx_hash-3", "tx_hash-4"],
315 ),
316 ];
317 let expected_transactions = into_transactions(&blocks);
318 let up_to_block_number = BlockNumber(1000);
319
320 let importer = {
321 let mut scanner_mock = MockBlockScannerImpl::new();
322 scanner_mock
323 .expect_scan()
324 .withf(move |from, until| from.is_none() && until == up_to_block_number)
325 .return_once(move |_, _| {
326 Ok(Box::new(DumbBlockStreamer::new().forwards(vec![blocks])))
327 });
328 CardanoTransactionsImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
329 };
330
331 importer
332 .import_transactions(up_to_block_number)
333 .await
334 .expect("Transactions Importer should succeed");
335
336 let stored_transactions = repository.get_all().await.unwrap();
337 assert_eq!(expected_transactions, stored_transactions);
338 }
339
340 #[tokio::test]
341 async fn if_nothing_stored_parse_and_store_all_block_ranges() {
342 let connection = cardano_tx_db_connection().unwrap();
343 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
344 SqliteConnectionPool::build_from_connection(connection),
345 )));
346
347 let up_to_block_number = BlockRange::LENGTH * 5;
348 let blocks = build_blocks(BlockNumber(0), up_to_block_number + 1);
349 let transactions = into_transactions(&blocks);
350 repository.store_transactions(transactions).await.unwrap();
351
352 let importer = CardanoTransactionsImporter::new_for_test(
353 Arc::new(MockBlockScannerImpl::new()),
354 repository.clone(),
355 );
356
357 importer
358 .import_block_ranges(up_to_block_number)
359 .await
360 .expect("Transactions Importer should succeed");
361
362 let block_range_roots = repository.get_all_block_range_root().unwrap();
363 assert_eq!(
364 vec![
365 BlockRange::from_block_number(BlockNumber(0)),
366 BlockRange::from_block_number(BlockRange::LENGTH),
367 BlockRange::from_block_number(BlockRange::LENGTH * 2),
368 BlockRange::from_block_number(BlockRange::LENGTH * 3),
369 BlockRange::from_block_number(BlockRange::LENGTH * 4),
370 ],
371 block_range_roots
372 .into_iter()
373 .map(|r| r.range)
374 .collect::<Vec<_>>()
375 );
376 }
377
378 #[tokio::test]
379 async fn if_theres_gap_between_two_stored_block_ranges_it_can_still_compute_their_root() {
380 let connection = cardano_tx_db_connection().unwrap();
381 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
382 SqliteConnectionPool::build_from_connection(connection),
383 )));
384
385 let up_to_block_number = BlockRange::LENGTH * 4;
386 let blocks: Vec<ScannedBlock> = [
388 build_blocks(BlockNumber(0), BlockRange::LENGTH),
389 build_blocks(BlockRange::LENGTH * 3, BlockRange::LENGTH),
390 ]
391 .concat();
392 let transactions = into_transactions(&blocks);
393 repository.store_transactions(transactions).await.unwrap();
394
395 let importer = CardanoTransactionsImporter::new_for_test(
396 Arc::new(MockBlockScannerImpl::new()),
397 repository.clone(),
398 );
399
400 importer
401 .import_block_ranges(up_to_block_number)
402 .await
403 .expect("Transactions Importer should succeed");
404
405 let block_range_roots = repository.get_all_block_range_root().unwrap();
406 assert_eq!(
407 vec![
408 BlockRange::from_block_number(BlockNumber(0)),
409 BlockRange::from_block_number(BlockRange::LENGTH * 3),
410 ],
411 block_range_roots
412 .into_iter()
413 .map(|r| r.range)
414 .collect::<Vec<_>>()
415 );
416 }
417
418 #[tokio::test]
419 async fn if_all_block_ranges_computed_nothing_computed_and_stored() {
420 let connection = cardano_tx_db_connection().unwrap();
421 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
422 SqliteConnectionPool::build_from_connection(connection),
423 )));
424
425 let importer = CardanoTransactionsImporter::new_for_test(
426 Arc::new(MockBlockScannerImpl::new()),
427 repository.clone(),
428 );
429
430 importer
431 .import_block_ranges(BlockNumber(10_000))
432 .await
433 .expect("Transactions Importer should succeed");
434
435 let block_range_roots = repository.get_all_block_range_root().unwrap();
436 assert!(
437 block_range_roots.is_empty(),
438 "No block range root should be stored, found: {block_range_roots:?}"
439 );
440 }
441
442 #[tokio::test]
443 async fn if_all_transactions_stored_nothing_is_parsed_and_stored() {
444 let up_to_block_number = BlockNumber(12);
445 let connection = cardano_tx_db_connection().unwrap();
446 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
447 SqliteConnectionPool::build_from_connection(connection),
448 )));
449 let scanner = DumbBlockScanner::new().forwards(vec![vec![
450 ScannedBlock::new(
451 "block_hash-1",
452 BlockNumber(10),
453 SlotNumber(15),
454 vec!["tx_hash-1", "tx_hash-2"],
455 ),
456 ScannedBlock::new(
457 "block_hash-2",
458 BlockNumber(20),
459 SlotNumber(25),
460 vec!["tx_hash-3", "tx_hash-4"],
461 ),
462 ]]);
463
464 let last_tx = CardanoTransaction::new(
465 "tx-20",
466 BlockNumber(30),
467 SlotNumber(35),
468 hex::encode("block_hash-3"),
469 );
470 repository
471 .store_transactions(vec![last_tx.clone()])
472 .await
473 .unwrap();
474
475 let importer =
476 CardanoTransactionsImporter::new_for_test(Arc::new(scanner), repository.clone());
477
478 importer
479 .import_transactions(up_to_block_number)
480 .await
481 .expect("Transactions Importer should succeed");
482
483 let transactions = repository.get_all().await.unwrap();
484 assert_eq!(vec![last_tx], transactions);
485 }
486
487 #[tokio::test]
488 async fn if_half_transactions_are_already_stored_the_other_half_is_parsed_and_stored() {
489 let connection = cardano_tx_db_connection().unwrap();
490 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
491 SqliteConnectionPool::build_from_connection(connection),
492 )));
493
494 let highest_stored_chain_point = ChainPoint::new(
495 SlotNumber(134),
496 BlockNumber(10),
497 hex::encode("block_hash-1"),
498 );
499 let stored_block = ScannedBlock::new(
500 hex::decode(highest_stored_chain_point.block_hash.clone()).unwrap(),
501 highest_stored_chain_point.block_number,
502 highest_stored_chain_point.slot_number,
503 vec!["tx_hash-1", "tx_hash-2"],
504 );
505 let to_store_block = ScannedBlock::new(
506 "block_hash-2",
507 BlockNumber(20),
508 SlotNumber(229),
509 vec!["tx_hash-3", "tx_hash-4"],
510 );
511 let expected_transactions: Vec<CardanoTransaction> = [
512 stored_block.clone().into_transactions(),
513 to_store_block.clone().into_transactions(),
514 ]
515 .concat();
516 let up_to_block_number = BlockNumber(22);
517
518 repository
519 .store_transactions(stored_block.clone().into_transactions())
520 .await
521 .unwrap();
522
523 let importer = {
524 let scanned_blocks = vec![to_store_block.clone()];
525 let mut scanner_mock = MockBlockScannerImpl::new();
526 scanner_mock
527 .expect_scan()
528 .withf(move |from, until| {
529 from == &Some(highest_stored_chain_point.clone().into())
530 && *until == up_to_block_number
531 })
532 .return_once(move |_, _| {
533 Ok(Box::new(
534 DumbBlockStreamer::new().forwards(vec![scanned_blocks]),
535 ))
536 })
537 .once();
538 CardanoTransactionsImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
539 };
540
541 let stored_transactions = repository.get_all().await.unwrap();
542 assert_eq!(stored_block.into_transactions(), stored_transactions);
543
544 importer
545 .import_transactions(up_to_block_number)
546 .await
547 .expect("Transactions Importer should succeed");
548
549 let stored_transactions = repository.get_all().await.unwrap();
550 assert_eq!(expected_transactions, stored_transactions);
551 }
552
553 #[tokio::test]
554 async fn if_half_block_ranges_are_stored_the_other_half_is_computed_and_stored() {
555 let connection = cardano_tx_db_connection().unwrap();
556 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
557 SqliteConnectionPool::build_from_connection(connection),
558 )));
559
560 let up_to_block_number = BlockRange::LENGTH * 4;
561 let blocks = build_blocks(BlockNumber(0), up_to_block_number + 1);
562 let transactions = into_transactions(&blocks);
563 repository.store_transactions(transactions).await.unwrap();
564 repository
565 .store_block_range_roots(
566 blocks[0..(*(BlockRange::LENGTH * 2) as usize)]
567 .iter()
568 .map(|b| {
569 (
570 BlockRange::from_block_number(b.block_number),
571 MKTreeNode::from_hex("AAAA").unwrap(),
572 )
573 })
574 .collect(),
575 )
576 .await
577 .unwrap();
578
579 let importer = CardanoTransactionsImporter::new_for_test(
580 Arc::new(MockBlockScannerImpl::new()),
581 repository.clone(),
582 );
583
584 let block_range_roots = repository.get_all_block_range_root().unwrap();
585 assert_eq!(
586 vec![
587 BlockRange::from_block_number(BlockNumber(0)),
588 BlockRange::from_block_number(BlockRange::LENGTH),
589 ],
590 block_range_roots
591 .into_iter()
592 .map(|r| r.range)
593 .collect::<Vec<_>>()
594 );
595
596 importer
597 .import_block_ranges(up_to_block_number)
598 .await
599 .expect("Transactions Importer should succeed");
600
601 let block_range_roots = repository.get_all_block_range_root().unwrap();
602 assert_eq!(
603 vec![
604 BlockRange::from_block_number(BlockNumber(0)),
605 BlockRange::from_block_number(BlockRange::LENGTH),
606 BlockRange::from_block_number(BlockRange::LENGTH * 2),
607 BlockRange::from_block_number(BlockRange::LENGTH * 3),
608 ],
609 block_range_roots
610 .into_iter()
611 .map(|r| r.range)
612 .collect::<Vec<_>>()
613 );
614 }
615
616 #[tokio::test]
617 async fn can_compute_block_ranges_up_to_the_strict_end_of_a_block_range() {
618 let connection = cardano_tx_db_connection().unwrap();
619 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
620 SqliteConnectionPool::build_from_connection(connection),
621 )));
622
623 let blocks = build_blocks(BlockRange::LENGTH, BlockRange::LENGTH - 1);
625 let transactions = into_transactions(&blocks);
626 repository.store_transactions(transactions).await.unwrap();
627
628 let importer = CardanoTransactionsImporter::new_for_test(
629 Arc::new(MockBlockScannerImpl::new()),
630 repository.clone(),
631 );
632
633 importer
634 .import_block_ranges(BlockRange::LENGTH * 2 - 1)
635 .await
636 .expect("Transactions Importer should succeed");
637
638 let block_range_roots = repository.get_all_block_range_root().unwrap();
639 assert_eq!(
640 vec![BlockRange::from_block_number(BlockRange::LENGTH)],
641 block_range_roots
642 .into_iter()
643 .map(|r| r.range)
644 .collect::<Vec<_>>()
645 );
646 }
647
648 #[tokio::test]
649 async fn can_compute_block_ranges_even_if_last_blocks_in_range_dont_have_transactions() {
650 let connection = cardano_tx_db_connection().unwrap();
651 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
652 SqliteConnectionPool::build_from_connection(connection),
653 )));
654
655 let blocks = build_blocks(BlockRange::LENGTH, BlockNumber(10));
657 let transactions = into_transactions(&blocks);
658 repository.store_transactions(transactions).await.unwrap();
659
660 let importer = CardanoTransactionsImporter::new_for_test(
661 Arc::new(MockBlockScannerImpl::new()),
662 repository.clone(),
663 );
664
665 importer
666 .import_block_ranges(BlockRange::LENGTH * 2)
667 .await
668 .expect("Transactions Importer should succeed");
669
670 let block_range_roots = repository.get_all_block_range_root().unwrap();
671 assert_eq!(
672 vec![BlockRange::from_block_number(BlockRange::LENGTH)],
673 block_range_roots
674 .into_iter()
675 .map(|r| r.range)
676 .collect::<Vec<_>>()
677 );
678 }
679
680 #[tokio::test]
681 async fn block_range_root_retrieves_only_strictly_required_transactions() {
682 fn transactions_for_block(range: Range<BlockNumber>) -> StdResult<Vec<CardanoTransaction>> {
683 Ok(build_blocks(range.start, range.end - range.start)
684 .into_iter()
685 .flat_map(|b| b.into_transactions())
686 .collect())
687 }
688 const HIGHEST_BLOCK_RANGE_START: BlockNumber = BlockRange::LENGTH;
689 let up_to_block_number = BlockRange::LENGTH * 5;
690
691 let importer = {
692 let mut store_mock = MockTransactionStore::new();
693 store_mock
694 .expect_get_highest_block_range()
695 .returning(|| {
696 Ok(Some(BlockRange::from_block_number(
697 HIGHEST_BLOCK_RANGE_START,
698 )))
699 })
700 .once();
701 store_mock
702 .expect_get_transactions_in_range()
703 .withf(move |range| {
706 BlockRangesSequence::new(HIGHEST_BLOCK_RANGE_START..=up_to_block_number)
707 .contains(range)
708 })
709 .returning(transactions_for_block);
710 store_mock
711 .expect_store_block_range_roots()
712 .returning(|_| Ok(()));
713
714 CardanoTransactionsImporter::new_for_test(
715 Arc::new(MockBlockScannerImpl::new()),
716 Arc::new(store_mock),
717 )
718 };
719
720 importer
721 .import_block_ranges(up_to_block_number)
722 .await
723 .expect("Transactions Importer should succeed");
724 }
725
726 #[tokio::test]
727 async fn compute_block_range_merkle_root() {
728 let connection = cardano_tx_db_connection().unwrap();
729 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
730 SqliteConnectionPool::build_from_connection(connection),
731 )));
732
733 let up_to_block_number = BlockRange::LENGTH * 2;
735 let blocks = build_blocks(BlockNumber(0), up_to_block_number + 1);
736 let transactions = into_transactions(&blocks);
737 let expected_block_range_roots = vec![
738 (
739 BlockRange::from_block_number(BlockNumber(0)),
740 merkle_root_for_blocks(&blocks[0..(*BlockRange::LENGTH as usize)]),
741 ),
742 (
743 BlockRange::from_block_number(BlockRange::LENGTH),
744 merkle_root_for_blocks(
745 &blocks[(*BlockRange::LENGTH as usize)..((*BlockRange::LENGTH * 2) as usize)],
746 ),
747 ),
748 ];
749
750 repository.store_transactions(transactions).await.unwrap();
751
752 let importer = CardanoTransactionsImporter::new_for_test(
753 Arc::new(MockBlockScannerImpl::new()),
754 repository.clone(),
755 );
756
757 importer
758 .import_block_ranges(up_to_block_number)
759 .await
760 .expect("Transactions Importer should succeed");
761
762 let block_range_roots = repository.get_all_block_range_root().unwrap();
763 assert_eq!(
764 expected_block_range_roots,
765 block_range_roots
766 .into_iter()
767 .map(|br| br.into())
768 .collect::<Vec<_>>()
769 );
770 }
771
772 #[tokio::test]
773 async fn importing_twice_starting_with_nothing_in_a_real_db_should_yield_transactions_in_same_order(
774 ) {
775 let blocks = vec![
776 ScannedBlock::new(
777 "block_hash-1",
778 BlockNumber(10),
779 SlotNumber(15),
780 vec!["tx_hash-1", "tx_hash-2"],
781 ),
782 ScannedBlock::new(
783 "block_hash-2",
784 BlockNumber(20),
785 SlotNumber(25),
786 vec!["tx_hash-3", "tx_hash-4"],
787 ),
788 ];
789 let up_to_block_number = BlockNumber(1000);
790 let transactions = into_transactions(&blocks);
791
792 let (importer, repository) = {
793 let connection = cardano_tx_db_connection().unwrap();
794 let connection_pool = Arc::new(SqliteConnectionPool::build_from_connection(connection));
795 let repository = Arc::new(CardanoTransactionRepository::new(connection_pool));
796 let importer = CardanoTransactionsImporter::new_for_test(
797 Arc::new(DumbBlockScanner::new().forwards(vec![blocks.clone()])),
798 repository.clone(),
799 );
800 (importer, repository)
801 };
802
803 importer
804 .import(up_to_block_number)
805 .await
806 .expect("Transactions Importer should succeed");
807 let cold_imported_transactions = repository.get_all().await.unwrap();
808
809 importer
810 .import(up_to_block_number)
811 .await
812 .expect("Transactions Importer should succeed");
813 let warm_imported_transactions = repository.get_all().await.unwrap();
814
815 assert_eq!(transactions, cold_imported_transactions);
816 assert_eq!(cold_imported_transactions, warm_imported_transactions);
817 }
818
819 mod transactions_import_start_point {
820 use super::*;
821
822 async fn importer_with_last_polled_point(
823 last_polled_point: Option<RawCardanoPoint>,
824 ) -> CardanoTransactionsImporter {
825 let connection = cardano_tx_db_connection().unwrap();
826 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
827 SqliteConnectionPool::build_from_connection(connection),
828 )));
829
830 CardanoTransactionsImporter {
831 last_polled_point: Arc::new(Mutex::new(last_polled_point)),
832 ..CardanoTransactionsImporter::new_for_test(
833 Arc::new(DumbBlockScanner::new()),
834 repository,
835 )
836 }
837 }
838
839 #[tokio::test]
840 async fn cloning_keep_last_polled_point() {
841 let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new(
842 SlotNumber(15),
843 "block_hash-1",
844 )))
845 .await;
846
847 let cloned_importer = importer.clone();
848 let start_point = cloned_importer.start_point(&None).await.unwrap();
849 assert_eq!(
850 Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
851 start_point
852 );
853 }
854
855 #[tokio::test]
856 async fn none_if_nothing_stored_nor_scanned() {
857 let importer = importer_with_last_polled_point(None).await;
858 let highest_stored_block_number = None;
859
860 let start_point = importer
861 .start_point(&highest_stored_block_number)
862 .await
863 .unwrap();
864 assert_eq!(None, start_point);
865 }
866
867 #[tokio::test]
868 async fn start_at_last_stored_point_if_nothing_scanned() {
869 let importer = importer_with_last_polled_point(None).await;
870 let highest_stored_block_number = Some(ChainPoint::new(
871 SlotNumber(25),
872 BlockNumber(20),
873 hex::encode("block_hash-2"),
874 ));
875
876 let start_point = importer
877 .start_point(&highest_stored_block_number)
878 .await
879 .unwrap();
880 assert_eq!(
881 Some(RawCardanoPoint::new(SlotNumber(25), "block_hash-2")),
882 start_point
883 );
884 }
885
886 #[tokio::test]
887 async fn start_at_last_scanned_point_when_nothing_stored() {
888 let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new(
889 SlotNumber(15),
890 "block_hash-1",
891 )))
892 .await;
893 let highest_stored_block_number = None;
894
895 let start_point = importer
896 .start_point(&highest_stored_block_number)
897 .await
898 .unwrap();
899 assert_eq!(
900 Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
901 start_point
902 );
903 }
904
905 #[tokio::test]
906 async fn start_at_last_scanned_point_even_if_something_stored() {
907 let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new(
908 SlotNumber(15),
909 "block_hash-1",
910 )))
911 .await;
912 let highest_stored_block_number = Some(ChainPoint::new(
913 SlotNumber(25),
914 BlockNumber(20),
915 hex::encode("block_hash-2"),
916 ));
917
918 let start_point = importer
919 .start_point(&highest_stored_block_number)
920 .await
921 .unwrap();
922 assert_eq!(
923 Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
924 start_point
925 );
926 }
927
928 #[tokio::test]
929 async fn importing_transactions_update_start_point_even_if_no_transactions_are_found() {
930 let connection = cardano_tx_db_connection().unwrap();
931 let importer = CardanoTransactionsImporter {
932 last_polled_point: Arc::new(Mutex::new(None)),
933 ..CardanoTransactionsImporter::new_for_test(
934 Arc::new(
935 DumbBlockScanner::new()
936 .forwards(vec![vec![ScannedBlock::new(
937 "block_hash-1",
938 BlockNumber(10),
939 SlotNumber(15),
940 Vec::<&str>::new(),
941 )]])
942 .last_polled_point(Some(RawCardanoPoint::new(
943 SlotNumber(25),
944 "block_hash-2",
945 ))),
946 ),
947 Arc::new(CardanoTransactionRepository::new(Arc::new(
948 SqliteConnectionPool::build_from_connection(connection),
949 ))),
950 )
951 };
952 let highest_stored_block_number = None;
953
954 let start_point_before_import = importer
955 .start_point(&highest_stored_block_number)
956 .await
957 .unwrap();
958 assert_eq!(None, start_point_before_import);
959
960 importer
961 .import_transactions(BlockNumber(1000))
962 .await
963 .unwrap();
964
965 let start_point_after_import = importer
966 .start_point(&highest_stored_block_number)
967 .await
968 .unwrap();
969 assert_eq!(
970 Some(RawCardanoPoint::new(SlotNumber(25), "block_hash-2")),
971 start_point_after_import
972 );
973 }
974
975 #[tokio::test]
976 async fn importing_transactions_dont_update_start_point_if_streamer_did_nothing() {
977 let connection = cardano_tx_db_connection().unwrap();
978 let importer = CardanoTransactionsImporter {
979 last_polled_point: Arc::new(Mutex::new(Some(RawCardanoPoint::new(
980 SlotNumber(15),
981 "block_hash-1",
982 )))),
983 ..CardanoTransactionsImporter::new_for_test(
984 Arc::new(DumbBlockScanner::new()),
985 Arc::new(CardanoTransactionRepository::new(Arc::new(
986 SqliteConnectionPool::build_from_connection(connection),
987 ))),
988 )
989 };
990 let highest_stored_block_number = None;
991
992 importer
993 .import_transactions(BlockNumber(1000))
994 .await
995 .unwrap();
996
997 let start_point_after_import = importer
998 .start_point(&highest_stored_block_number)
999 .await
1000 .unwrap();
1001 assert_eq!(
1002 Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
1003 start_point_after_import
1004 );
1005 }
1006 }
1007
1008 #[tokio::test]
1009 async fn when_rollbackward_should_remove_transactions() {
1010 let connection = cardano_tx_db_connection().unwrap();
1011 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
1012 SqliteConnectionPool::build_from_connection(connection),
1013 )));
1014
1015 let expected_remaining_transactions = ScannedBlock::new(
1016 "block_hash-130",
1017 BlockNumber(130),
1018 SlotNumber(5),
1019 vec!["tx_hash-6", "tx_hash-7"],
1020 )
1021 .into_transactions();
1022 repository
1023 .store_transactions(expected_remaining_transactions.clone())
1024 .await
1025 .unwrap();
1026 repository
1027 .store_transactions(
1028 ScannedBlock::new(
1029 "block_hash-131",
1030 BlockNumber(131),
1031 SlotNumber(10),
1032 vec!["tx_hash-8", "tx_hash-9", "tx_hash-10"],
1033 )
1034 .into_transactions(),
1035 )
1036 .await
1037 .unwrap();
1038
1039 let chain_point = ChainPoint::new(SlotNumber(5), BlockNumber(130), "block_hash-130");
1040 let scanner = DumbBlockScanner::new().backward(chain_point);
1041
1042 let importer =
1043 CardanoTransactionsImporter::new_for_test(Arc::new(scanner), repository.clone());
1044
1045 importer
1046 .import_transactions(BlockNumber(3000))
1047 .await
1048 .expect("Transactions Importer should succeed");
1049
1050 let stored_transactions = repository.get_all().await.unwrap();
1051 assert_eq!(expected_remaining_transactions, stored_transactions);
1052 }
1053
1054 #[tokio::test]
1055 async fn when_rollbackward_should_remove_block_ranges() {
1056 let connection = cardano_tx_db_connection().unwrap();
1057 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
1058 SqliteConnectionPool::build_from_connection(connection),
1059 )));
1060
1061 let expected_remaining_block_ranges = vec![
1062 BlockRange::from_block_number(BlockNumber(0)),
1063 BlockRange::from_block_number(BlockRange::LENGTH),
1064 BlockRange::from_block_number(BlockRange::LENGTH * 2),
1065 ];
1066
1067 repository
1068 .store_block_range_roots(
1069 expected_remaining_block_ranges
1070 .iter()
1071 .map(|b| (b.clone(), MKTreeNode::from_hex("AAAA").unwrap()))
1072 .collect(),
1073 )
1074 .await
1075 .unwrap();
1076 repository
1077 .store_block_range_roots(
1078 [
1079 BlockRange::from_block_number(BlockRange::LENGTH * 3),
1080 BlockRange::from_block_number(BlockRange::LENGTH * 4),
1081 BlockRange::from_block_number(BlockRange::LENGTH * 5),
1082 ]
1083 .iter()
1084 .map(|b| (b.clone(), MKTreeNode::from_hex("AAAA").unwrap()))
1085 .collect(),
1086 )
1087 .await
1088 .unwrap();
1089 repository
1090 .store_transactions(
1091 ScannedBlock::new(
1092 "block_hash-131",
1093 BlockRange::from_block_number(BlockRange::LENGTH * 3).start,
1094 SlotNumber(1),
1095 vec!["tx_hash-1", "tx_hash-2", "tx_hash-3"],
1096 )
1097 .into_transactions(),
1098 )
1099 .await
1100 .unwrap();
1101
1102 let block_range_roots = repository.get_all_block_range_root().unwrap();
1103 assert_eq!(6, block_range_roots.len());
1104
1105 let chain_point = ChainPoint::new(SlotNumber(1), BlockRange::LENGTH * 3, "block_hash-131");
1106 let scanner = DumbBlockScanner::new().backward(chain_point);
1107
1108 let importer =
1109 CardanoTransactionsImporter::new_for_test(Arc::new(scanner), repository.clone());
1110
1111 importer
1112 .import_transactions(BlockNumber(3000))
1113 .await
1114 .expect("Transactions Importer should succeed");
1115
1116 let block_range_roots = repository.get_all_block_range_root().unwrap();
1117 assert_eq!(
1118 expected_remaining_block_ranges,
1119 block_range_roots
1120 .into_iter()
1121 .map(|r| r.range)
1122 .collect::<Vec<_>>()
1123 );
1124 }
1125
1126 #[tokio::test]
1127 async fn test_import_is_non_blocking() {
1128 static COUNTER: AtomicUsize = AtomicUsize::new(0);
1129 static MAX_COUNTER: usize = 25;
1130 static WAIT_TIME: u64 = 50;
1131
1132 let local = task::LocalSet::new();
1134 local
1135 .run_until(async {
1136 let importer = CardanoTransactionsImporter::new_for_test(
1137 Arc::new(DumbBlockScanner::new()),
1138 Arc::new(BlockingRepository {
1139 wait_time: Duration::from_millis(WAIT_TIME),
1140 }),
1141 );
1142
1143 let importer_future = importer.import(BlockNumber(100));
1144 let counter_task = task::spawn_local(async {
1145 while COUNTER.load(std::sync::atomic::Ordering::SeqCst) < MAX_COUNTER {
1146 tokio::time::sleep(Duration::from_millis(1)).await;
1147 COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1148 }
1149 });
1150 importer_future.await.unwrap();
1151
1152 counter_task.abort();
1153 })
1154 .await;
1155
1156 assert_eq!(
1157 MAX_COUNTER,
1158 COUNTER.load(std::sync::atomic::Ordering::SeqCst)
1159 );
1160
1161 struct BlockingRepository {
1162 wait_time: Duration,
1163 }
1164
1165 impl BlockingRepository {
1166 fn block_thread(&self) {
1167 std::thread::sleep(self.wait_time);
1168 }
1169 }
1170
1171 #[async_trait]
1172 impl TransactionStore for BlockingRepository {
1173 async fn get_highest_beacon(&self) -> StdResult<Option<ChainPoint>> {
1174 self.block_thread();
1175 Ok(None)
1176 }
1177
1178 async fn get_highest_block_range(&self) -> StdResult<Option<BlockRange>> {
1179 self.block_thread();
1180 Ok(None)
1181 }
1182
1183 async fn store_transactions(&self, _: Vec<CardanoTransaction>) -> StdResult<()> {
1184 self.block_thread();
1185 Ok(())
1186 }
1187
1188 async fn get_transactions_in_range(
1189 &self,
1190 _: Range<BlockNumber>,
1191 ) -> StdResult<Vec<CardanoTransaction>> {
1192 self.block_thread();
1193 Ok(vec![])
1194 }
1195
1196 async fn store_block_range_roots(
1197 &self,
1198 _: Vec<(BlockRange, MKTreeNode)>,
1199 ) -> StdResult<()> {
1200 self.block_thread();
1201 Ok(())
1202 }
1203
1204 async fn remove_rolled_back_transactions_and_block_range(
1205 &self,
1206 _: SlotNumber,
1207 ) -> StdResult<()> {
1208 self.block_thread();
1209 Ok(())
1210 }
1211 }
1212 }
1213}