1use std::mem;
2use std::ops::Range;
3use std::sync::Arc;
4
5use anyhow::Context;
6use async_trait::async_trait;
7use slog::{Logger, debug};
8use tokio::{runtime::Handle, sync::Mutex, task};
9
10use mithril_cardano_node_chain::chain_scanner::{BlockScanner, ChainScannedBlocks};
11use mithril_cardano_node_chain::entities::RawCardanoPoint;
12use mithril_common::StdResult;
13use mithril_common::crypto_helper::{MKTree, MKTreeNode, MKTreeStoreInMemory};
14use mithril_common::entities::{
15 BlockNumber, BlockRange, CardanoTransaction, ChainPoint, SlotNumber,
16};
17use mithril_common::logging::LoggerExtensions;
18use mithril_common::signable_builder::TransactionsImporter;
19
/// Persistence operations the [`CardanoTransactionsImporter`] relies on to store
/// Cardano transactions and the Merkle roots computed per block range.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait TransactionStore: Send + Sync {
    /// Fetches the highest chain point held by the store, if any.
    ///
    /// The importer skips scanning when the block number of that point already
    /// reaches the requested one, and may otherwise use it as the scan start point.
    async fn get_highest_beacon(&self) -> StdResult<Option<ChainPoint>>;

    /// Fetches the highest block range held by the store, if any.
    ///
    /// The importer resumes the block range roots computation from the end of
    /// that range.
    async fn get_highest_block_range(&self) -> StdResult<Option<BlockRange>>;

    /// Persists the given transactions.
    async fn store_transactions(&self, transactions: Vec<CardanoTransaction>) -> StdResult<()>;

    /// Fetches the transactions located in the given range of block numbers.
    ///
    /// The importer calls it with the `start..end` bounds of a [`BlockRange`].
    async fn get_transactions_in_range(
        &self,
        range: Range<BlockNumber>,
    ) -> StdResult<Vec<CardanoTransaction>>;

    /// Persists the given block ranges together with their Merkle root.
    async fn store_block_range_roots(
        &self,
        block_ranges: Vec<(BlockRange, MKTreeNode)>,
    ) -> StdResult<()>;

    /// Removes the transactions and block range roots invalidated by a rollback.
    ///
    /// The importer calls it with the slot number carried by a roll backward event
    /// of the block scanner.
    // NOTE(review): whether data located exactly at `slot_number` is kept or
    // removed is decided by the implementation - confirm against the repository.
    async fn remove_rolled_back_transactions_and_block_range(
        &self,
        slot_number: SlotNumber,
    ) -> StdResult<()>;
}
54
/// Imports Cardano transactions by scanning blocks and persisting them, then
/// computes and stores the Merkle root of each block range.
///
/// Cloning is cheap: every clone shares the same scanner, store and last polled
/// point since they are all held behind an [`Arc`].
#[derive(Clone)]
pub struct CardanoTransactionsImporter {
    // Source of the blocks to import.
    block_scanner: Arc<dyn BlockScanner>,
    // Destination of the parsed transactions and computed block range roots.
    transaction_store: Arc<dyn TransactionStore>,
    // Last point reported by a block streamer, reused as start point of the next scan.
    last_polled_point: Arc<Mutex<Option<RawCardanoPoint>>>,
    logger: Logger,
}
63
64impl CardanoTransactionsImporter {
65 pub fn new(
67 block_scanner: Arc<dyn BlockScanner>,
68 transaction_store: Arc<dyn TransactionStore>,
69 logger: Logger,
70 ) -> Self {
71 Self {
72 block_scanner,
73 transaction_store,
74 last_polled_point: Arc::new(Mutex::new(None)),
75 logger: logger.new_with_component_name::<Self>(),
76 }
77 }
78
79 async fn start_point(
80 &self,
81 highest_stored_chain_point: &Option<ChainPoint>,
82 ) -> StdResult<Option<RawCardanoPoint>> {
83 let last_polled_point = self.last_polled_point.lock().await.clone();
84 if last_polled_point.is_none() {
85 debug!(
86 self.logger,
87 "No last polled point available, falling back to the highest stored chain point"
88 );
89 }
90
91 Ok(last_polled_point.or(highest_stored_chain_point.as_ref().map(RawCardanoPoint::from)))
92 }
93
94 async fn import_transactions(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
95 let highest_stored_beacon = self.transaction_store.get_highest_beacon().await?;
96 let from = self.start_point(&highest_stored_beacon).await?;
97
98 if highest_stored_beacon
99 .as_ref()
100 .is_some_and(|f| f.block_number >= up_to_beacon)
101 {
102 debug!(
103 self.logger,
104 "No need to retrieve Cardano transactions, the database is up to date for block_number '{up_to_beacon}'",
105 );
106
107 Ok(())
108 } else {
109 debug!(
110 self.logger, "Retrieving Cardano transactions until block numbered '{up_to_beacon}'";
111 "starting_slot_number" => ?from.as_ref().map(|c| c.slot_number),
112 "highest_stored_block_number" => ?highest_stored_beacon.as_ref().map(|c| c.block_number),
113 );
114
115 self.parse_and_store_transactions_not_imported_yet(from, up_to_beacon)
116 .await
117 }
118 }
119
120 async fn parse_and_store_transactions_not_imported_yet(
121 &self,
122 from: Option<RawCardanoPoint>,
123 until: BlockNumber,
124 ) -> StdResult<()> {
125 let mut streamer = self.block_scanner.scan(from, until).await?;
126
127 while let Some(blocks) = streamer.poll_next().await? {
128 match blocks {
129 ChainScannedBlocks::RollForwards(forward_blocks) => {
130 let parsed_transactions: Vec<CardanoTransaction> = forward_blocks
131 .into_iter()
132 .flat_map(|b| b.into_transactions())
133 .collect();
134
135 self.transaction_store.store_transactions(parsed_transactions).await?;
136 }
137 ChainScannedBlocks::RollBackward(slot_number) => {
138 self.transaction_store
139 .remove_rolled_back_transactions_and_block_range(slot_number)
140 .await?;
141 }
142 }
143 }
144
145 if let Some(point) = streamer.last_polled_point() {
146 *self.last_polled_point.lock().await = Some(point);
147 }
148
149 Ok(())
150 }
151
152 async fn import_block_ranges(&self, until: BlockNumber) -> StdResult<()> {
153 let block_ranges = match self.transaction_store.get_highest_block_range().await?.map(
154 |highest_stored_block_range| {
155 BlockRange::all_block_ranges_in(
156 BlockRange::start(highest_stored_block_range.end)..=(until),
157 )
158 },
159 ) {
160 None => BlockRange::all_block_ranges_in(BlockNumber(0)..=(until)),
162 Some(ranges) if ranges.is_empty() => return Ok(()),
164 Some(ranges) => ranges,
165 };
166
167 debug!(
168 self.logger, "Computing Block Range Roots";
169 "start_block" => *block_ranges.start(), "end_block" => *block_ranges.end(),
170 );
171
172 let mut block_ranges_with_merkle_root: Vec<(BlockRange, MKTreeNode)> = vec![];
173 for block_range in block_ranges {
174 let transactions = self
175 .transaction_store
176 .get_transactions_in_range(block_range.start..block_range.end)
177 .await?;
178
179 if transactions.is_empty() {
180 continue;
181 }
182
183 let merkle_root = MKTree::<MKTreeStoreInMemory>::new(&transactions)?.compute_root()?;
184 block_ranges_with_merkle_root.push((block_range, merkle_root));
185
186 if block_ranges_with_merkle_root.len() >= 100 {
187 let block_ranges_with_merkle_root_save =
188 mem::take(&mut block_ranges_with_merkle_root);
189 self.transaction_store
190 .store_block_range_roots(block_ranges_with_merkle_root_save)
191 .await?;
192 }
193 }
194
195 self.transaction_store
196 .store_block_range_roots(block_ranges_with_merkle_root)
197 .await
198 }
199}
200
201#[async_trait]
202impl TransactionsImporter for CardanoTransactionsImporter {
203 async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
204 let importer = self.clone();
205 task::spawn_blocking(move || {
206 Handle::current().block_on(async move {
207 importer.import_transactions(up_to_beacon).await?;
208 importer.import_block_ranges(up_to_beacon).await?;
209 Ok(())
210 })
211 })
212 .await
213 .with_context(|| "TransactionsImporter - worker thread crashed")?
214 }
215}
216
217#[cfg(test)]
218mod tests {
219 use std::sync::atomic::AtomicUsize;
220 use std::time::Duration;
221
222 use mockall::mock;
223
224 use mithril_cardano_node_chain::chain_scanner::BlockStreamer;
225 use mithril_cardano_node_chain::entities::ScannedBlock;
226 use mithril_cardano_node_chain::test::double::{DumbBlockScanner, DumbBlockStreamer};
227 use mithril_common::crypto_helper::MKTree;
228 use mithril_common::entities::{BlockNumber, BlockRangesSequence};
229 use mithril_persistence::database::repository::CardanoTransactionRepository;
230 use mithril_persistence::sqlite::SqliteConnectionPool;
231
232 use crate::database::test_helper::cardano_tx_db_connection;
233 use crate::test_tools::TestLogger;
234
235 use super::*;
236
// Generates `MockBlockScannerImpl`, a mockall mock implementing `BlockScanner`,
// so tests can set expectations on the arguments given to `scan` and choose the
// streamer it returns.
mock! {
    pub BlockScannerImpl { }

    #[async_trait]
    impl BlockScanner for BlockScannerImpl {
        async fn scan(
            &self,
            from: Option<RawCardanoPoint>,
            until: BlockNumber,
        ) -> StdResult<Box<dyn BlockStreamer>>;
    }
}
249
250 impl CardanoTransactionsImporter {
251 pub(crate) fn new_for_test(
252 scanner: Arc<dyn BlockScanner>,
253 transaction_store: Arc<dyn TransactionStore>,
254 ) -> Self {
255 CardanoTransactionsImporter::new(scanner, transaction_store, TestLogger::stdout())
256 }
257 }
258
259 fn build_blocks(
260 start_block_number: BlockNumber,
261 number_of_consecutive_block: BlockNumber,
262 ) -> Vec<ScannedBlock> {
263 (*start_block_number..*(start_block_number + number_of_consecutive_block))
264 .map(|block_number| {
265 ScannedBlock::new(
266 format!("block_hash-{block_number}"),
267 BlockNumber(block_number),
268 SlotNumber(block_number * 100),
269 vec![format!("tx_hash-{}", block_number)],
270 )
271 })
272 .collect()
273 }
274
275 fn into_transactions(blocks: &[ScannedBlock]) -> Vec<CardanoTransaction> {
276 blocks.iter().flat_map(|b| b.clone().into_transactions()).collect()
277 }
278
279 fn merkle_root_for_blocks(block_ranges: &[ScannedBlock]) -> MKTreeNode {
280 let tx: Vec<_> = block_ranges
281 .iter()
282 .flat_map(|br| br.clone().into_transactions())
283 .collect();
284 MKTree::<MKTreeStoreInMemory>::new(&tx)
285 .unwrap()
286 .compute_root()
287 .unwrap()
288 }
289
290 #[tokio::test]
291 async fn if_nothing_stored_parse_and_store_all_transactions() {
292 let connection = cardano_tx_db_connection().unwrap();
293 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
294 SqliteConnectionPool::build_from_connection(connection),
295 )));
296
297 let blocks = vec![
298 ScannedBlock::new(
299 "block_hash-1",
300 BlockNumber(10),
301 SlotNumber(15),
302 vec!["tx_hash-1", "tx_hash-2"],
303 ),
304 ScannedBlock::new(
305 "block_hash-2",
306 BlockNumber(20),
307 SlotNumber(25),
308 vec!["tx_hash-3", "tx_hash-4"],
309 ),
310 ];
311 let expected_transactions = into_transactions(&blocks);
312 let up_to_block_number = BlockNumber(1000);
313
314 let importer = {
315 let mut scanner_mock = MockBlockScannerImpl::new();
316 scanner_mock
317 .expect_scan()
318 .withf(move |from, until| from.is_none() && until == up_to_block_number)
319 .return_once(move |_, _| {
320 Ok(Box::new(DumbBlockStreamer::new().forwards(vec![blocks])))
321 });
322 CardanoTransactionsImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
323 };
324
325 importer
326 .import_transactions(up_to_block_number)
327 .await
328 .expect("Transactions Importer should succeed");
329
330 let stored_transactions = repository.get_all().await.unwrap();
331 assert_eq!(expected_transactions, stored_transactions);
332 }
333
334 #[tokio::test]
335 async fn if_nothing_stored_parse_and_store_all_block_ranges() {
336 let connection = cardano_tx_db_connection().unwrap();
337 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
338 SqliteConnectionPool::build_from_connection(connection),
339 )));
340
341 let up_to_block_number = BlockRange::LENGTH * 5;
342 let blocks = build_blocks(BlockNumber(0), up_to_block_number + 1);
343 let transactions = into_transactions(&blocks);
344 repository.store_transactions(transactions).await.unwrap();
345
346 let importer = CardanoTransactionsImporter::new_for_test(
347 Arc::new(MockBlockScannerImpl::new()),
348 repository.clone(),
349 );
350
351 importer
352 .import_block_ranges(up_to_block_number)
353 .await
354 .expect("Transactions Importer should succeed");
355
356 let block_range_roots = repository.get_all_block_range_root().unwrap();
357 assert_eq!(
358 vec![
359 BlockRange::from_block_number(BlockNumber(0)),
360 BlockRange::from_block_number(BlockRange::LENGTH),
361 BlockRange::from_block_number(BlockRange::LENGTH * 2),
362 BlockRange::from_block_number(BlockRange::LENGTH * 3),
363 BlockRange::from_block_number(BlockRange::LENGTH * 4),
364 ],
365 block_range_roots.into_iter().map(|r| r.range).collect::<Vec<_>>()
366 );
367 }
368
369 #[tokio::test]
370 async fn if_theres_gap_between_two_stored_block_ranges_it_can_still_compute_their_root() {
371 let connection = cardano_tx_db_connection().unwrap();
372 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
373 SqliteConnectionPool::build_from_connection(connection),
374 )));
375
376 let up_to_block_number = BlockRange::LENGTH * 4;
377 let blocks: Vec<ScannedBlock> = [
379 build_blocks(BlockNumber(0), BlockRange::LENGTH),
380 build_blocks(BlockRange::LENGTH * 3, BlockRange::LENGTH),
381 ]
382 .concat();
383 let transactions = into_transactions(&blocks);
384 repository.store_transactions(transactions).await.unwrap();
385
386 let importer = CardanoTransactionsImporter::new_for_test(
387 Arc::new(MockBlockScannerImpl::new()),
388 repository.clone(),
389 );
390
391 importer
392 .import_block_ranges(up_to_block_number)
393 .await
394 .expect("Transactions Importer should succeed");
395
396 let block_range_roots = repository.get_all_block_range_root().unwrap();
397 assert_eq!(
398 vec![
399 BlockRange::from_block_number(BlockNumber(0)),
400 BlockRange::from_block_number(BlockRange::LENGTH * 3),
401 ],
402 block_range_roots.into_iter().map(|r| r.range).collect::<Vec<_>>()
403 );
404 }
405
406 #[tokio::test]
407 async fn if_all_block_ranges_computed_nothing_computed_and_stored() {
408 let connection = cardano_tx_db_connection().unwrap();
409 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
410 SqliteConnectionPool::build_from_connection(connection),
411 )));
412
413 let importer = CardanoTransactionsImporter::new_for_test(
414 Arc::new(MockBlockScannerImpl::new()),
415 repository.clone(),
416 );
417
418 importer
419 .import_block_ranges(BlockNumber(10_000))
420 .await
421 .expect("Transactions Importer should succeed");
422
423 let block_range_roots = repository.get_all_block_range_root().unwrap();
424 assert!(
425 block_range_roots.is_empty(),
426 "No block range root should be stored, found: {block_range_roots:?}"
427 );
428 }
429
430 #[tokio::test]
431 async fn if_all_transactions_stored_nothing_is_parsed_and_stored() {
432 let up_to_block_number = BlockNumber(12);
433 let connection = cardano_tx_db_connection().unwrap();
434 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
435 SqliteConnectionPool::build_from_connection(connection),
436 )));
437 let scanner = DumbBlockScanner::new().forwards(vec![vec![
438 ScannedBlock::new(
439 "block_hash-1",
440 BlockNumber(10),
441 SlotNumber(15),
442 vec!["tx_hash-1", "tx_hash-2"],
443 ),
444 ScannedBlock::new(
445 "block_hash-2",
446 BlockNumber(20),
447 SlotNumber(25),
448 vec!["tx_hash-3", "tx_hash-4"],
449 ),
450 ]]);
451
452 let last_tx = CardanoTransaction::new(
453 "tx-20",
454 BlockNumber(30),
455 SlotNumber(35),
456 hex::encode("block_hash-3"),
457 );
458 repository.store_transactions(vec![last_tx.clone()]).await.unwrap();
459
460 let importer =
461 CardanoTransactionsImporter::new_for_test(Arc::new(scanner), repository.clone());
462
463 importer
464 .import_transactions(up_to_block_number)
465 .await
466 .expect("Transactions Importer should succeed");
467
468 let transactions = repository.get_all().await.unwrap();
469 assert_eq!(vec![last_tx], transactions);
470 }
471
472 #[tokio::test]
473 async fn if_half_transactions_are_already_stored_the_other_half_is_parsed_and_stored() {
474 let connection = cardano_tx_db_connection().unwrap();
475 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
476 SqliteConnectionPool::build_from_connection(connection),
477 )));
478
479 let highest_stored_chain_point = ChainPoint::new(
480 SlotNumber(134),
481 BlockNumber(10),
482 hex::encode("block_hash-1"),
483 );
484 let stored_block = ScannedBlock::new(
485 hex::decode(highest_stored_chain_point.block_hash.clone()).unwrap(),
486 highest_stored_chain_point.block_number,
487 highest_stored_chain_point.slot_number,
488 vec!["tx_hash-1", "tx_hash-2"],
489 );
490 let to_store_block = ScannedBlock::new(
491 "block_hash-2",
492 BlockNumber(20),
493 SlotNumber(229),
494 vec!["tx_hash-3", "tx_hash-4"],
495 );
496 let expected_transactions: Vec<CardanoTransaction> = [
497 stored_block.clone().into_transactions(),
498 to_store_block.clone().into_transactions(),
499 ]
500 .concat();
501 let up_to_block_number = BlockNumber(22);
502
503 repository
504 .store_transactions(stored_block.clone().into_transactions())
505 .await
506 .unwrap();
507
508 let importer = {
509 let scanned_blocks = vec![to_store_block.clone()];
510 let mut scanner_mock = MockBlockScannerImpl::new();
511 scanner_mock
512 .expect_scan()
513 .withf(move |from, until| {
514 from == &Some(highest_stored_chain_point.clone().into())
515 && *until == up_to_block_number
516 })
517 .return_once(move |_, _| {
518 Ok(Box::new(
519 DumbBlockStreamer::new().forwards(vec![scanned_blocks]),
520 ))
521 })
522 .once();
523 CardanoTransactionsImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
524 };
525
526 let stored_transactions = repository.get_all().await.unwrap();
527 assert_eq!(stored_block.into_transactions(), stored_transactions);
528
529 importer
530 .import_transactions(up_to_block_number)
531 .await
532 .expect("Transactions Importer should succeed");
533
534 let stored_transactions = repository.get_all().await.unwrap();
535 assert_eq!(expected_transactions, stored_transactions);
536 }
537
538 #[tokio::test]
539 async fn if_half_block_ranges_are_stored_the_other_half_is_computed_and_stored() {
540 let connection = cardano_tx_db_connection().unwrap();
541 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
542 SqliteConnectionPool::build_from_connection(connection),
543 )));
544
545 let up_to_block_number = BlockRange::LENGTH * 4;
546 let blocks = build_blocks(BlockNumber(0), up_to_block_number + 1);
547 let transactions = into_transactions(&blocks);
548 repository.store_transactions(transactions).await.unwrap();
549 repository
550 .store_block_range_roots(
551 blocks[0..(*(BlockRange::LENGTH * 2) as usize)]
552 .iter()
553 .map(|b| {
554 (
555 BlockRange::from_block_number(b.block_number),
556 MKTreeNode::from_hex("AAAA").unwrap(),
557 )
558 })
559 .collect(),
560 )
561 .await
562 .unwrap();
563
564 let importer = CardanoTransactionsImporter::new_for_test(
565 Arc::new(MockBlockScannerImpl::new()),
566 repository.clone(),
567 );
568
569 let block_range_roots = repository.get_all_block_range_root().unwrap();
570 assert_eq!(
571 vec![
572 BlockRange::from_block_number(BlockNumber(0)),
573 BlockRange::from_block_number(BlockRange::LENGTH),
574 ],
575 block_range_roots.into_iter().map(|r| r.range).collect::<Vec<_>>()
576 );
577
578 importer
579 .import_block_ranges(up_to_block_number)
580 .await
581 .expect("Transactions Importer should succeed");
582
583 let block_range_roots = repository.get_all_block_range_root().unwrap();
584 assert_eq!(
585 vec![
586 BlockRange::from_block_number(BlockNumber(0)),
587 BlockRange::from_block_number(BlockRange::LENGTH),
588 BlockRange::from_block_number(BlockRange::LENGTH * 2),
589 BlockRange::from_block_number(BlockRange::LENGTH * 3),
590 ],
591 block_range_roots.into_iter().map(|r| r.range).collect::<Vec<_>>()
592 );
593 }
594
595 #[tokio::test]
596 async fn can_compute_block_ranges_up_to_the_strict_end_of_a_block_range() {
597 let connection = cardano_tx_db_connection().unwrap();
598 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
599 SqliteConnectionPool::build_from_connection(connection),
600 )));
601
602 let blocks = build_blocks(BlockRange::LENGTH, BlockRange::LENGTH - 1);
604 let transactions = into_transactions(&blocks);
605 repository.store_transactions(transactions).await.unwrap();
606
607 let importer = CardanoTransactionsImporter::new_for_test(
608 Arc::new(MockBlockScannerImpl::new()),
609 repository.clone(),
610 );
611
612 importer
613 .import_block_ranges(BlockRange::LENGTH * 2 - 1)
614 .await
615 .expect("Transactions Importer should succeed");
616
617 let block_range_roots = repository.get_all_block_range_root().unwrap();
618 assert_eq!(
619 vec![BlockRange::from_block_number(BlockRange::LENGTH)],
620 block_range_roots.into_iter().map(|r| r.range).collect::<Vec<_>>()
621 );
622 }
623
624 #[tokio::test]
625 async fn can_compute_block_ranges_even_if_last_blocks_in_range_dont_have_transactions() {
626 let connection = cardano_tx_db_connection().unwrap();
627 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
628 SqliteConnectionPool::build_from_connection(connection),
629 )));
630
631 let blocks = build_blocks(BlockRange::LENGTH, BlockNumber(10));
633 let transactions = into_transactions(&blocks);
634 repository.store_transactions(transactions).await.unwrap();
635
636 let importer = CardanoTransactionsImporter::new_for_test(
637 Arc::new(MockBlockScannerImpl::new()),
638 repository.clone(),
639 );
640
641 importer
642 .import_block_ranges(BlockRange::LENGTH * 2)
643 .await
644 .expect("Transactions Importer should succeed");
645
646 let block_range_roots = repository.get_all_block_range_root().unwrap();
647 assert_eq!(
648 vec![BlockRange::from_block_number(BlockRange::LENGTH)],
649 block_range_roots.into_iter().map(|r| r.range).collect::<Vec<_>>()
650 );
651 }
652
653 #[tokio::test]
654 async fn block_range_root_retrieves_only_strictly_required_transactions() {
655 fn transactions_for_block(range: Range<BlockNumber>) -> StdResult<Vec<CardanoTransaction>> {
656 Ok(build_blocks(range.start, range.end - range.start)
657 .into_iter()
658 .flat_map(|b| b.into_transactions())
659 .collect())
660 }
661 const HIGHEST_BLOCK_RANGE_START: BlockNumber = BlockRange::LENGTH;
662 let up_to_block_number = BlockRange::LENGTH * 5;
663
664 let importer = {
665 let mut store_mock = MockTransactionStore::new();
666 store_mock
667 .expect_get_highest_block_range()
668 .returning(|| {
669 Ok(Some(BlockRange::from_block_number(
670 HIGHEST_BLOCK_RANGE_START,
671 )))
672 })
673 .once();
674 store_mock
675 .expect_get_transactions_in_range()
676 .withf(move |range| {
679 BlockRangesSequence::new(HIGHEST_BLOCK_RANGE_START..=up_to_block_number)
680 .contains(range)
681 })
682 .returning(transactions_for_block);
683 store_mock.expect_store_block_range_roots().returning(|_| Ok(()));
684
685 CardanoTransactionsImporter::new_for_test(
686 Arc::new(MockBlockScannerImpl::new()),
687 Arc::new(store_mock),
688 )
689 };
690
691 importer
692 .import_block_ranges(up_to_block_number)
693 .await
694 .expect("Transactions Importer should succeed");
695 }
696
697 #[tokio::test]
698 async fn compute_block_range_merkle_root() {
699 let connection = cardano_tx_db_connection().unwrap();
700 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
701 SqliteConnectionPool::build_from_connection(connection),
702 )));
703
704 let up_to_block_number = BlockRange::LENGTH * 2;
706 let blocks = build_blocks(BlockNumber(0), up_to_block_number + 1);
707 let transactions = into_transactions(&blocks);
708 let expected_block_range_roots = vec![
709 (
710 BlockRange::from_block_number(BlockNumber(0)),
711 merkle_root_for_blocks(&blocks[0..(*BlockRange::LENGTH as usize)]),
712 ),
713 (
714 BlockRange::from_block_number(BlockRange::LENGTH),
715 merkle_root_for_blocks(
716 &blocks[(*BlockRange::LENGTH as usize)..((*BlockRange::LENGTH * 2) as usize)],
717 ),
718 ),
719 ];
720
721 repository.store_transactions(transactions).await.unwrap();
722
723 let importer = CardanoTransactionsImporter::new_for_test(
724 Arc::new(MockBlockScannerImpl::new()),
725 repository.clone(),
726 );
727
728 importer
729 .import_block_ranges(up_to_block_number)
730 .await
731 .expect("Transactions Importer should succeed");
732
733 let block_range_roots = repository.get_all_block_range_root().unwrap();
734 assert_eq!(
735 expected_block_range_roots,
736 block_range_roots.into_iter().map(|br| br.into()).collect::<Vec<_>>()
737 );
738 }
739
740 #[tokio::test]
741 async fn importing_twice_starting_with_nothing_in_a_real_db_should_yield_transactions_in_same_order()
742 {
743 let blocks = vec![
744 ScannedBlock::new(
745 "block_hash-1",
746 BlockNumber(10),
747 SlotNumber(15),
748 vec!["tx_hash-1", "tx_hash-2"],
749 ),
750 ScannedBlock::new(
751 "block_hash-2",
752 BlockNumber(20),
753 SlotNumber(25),
754 vec!["tx_hash-3", "tx_hash-4"],
755 ),
756 ];
757 let up_to_block_number = BlockNumber(1000);
758 let transactions = into_transactions(&blocks);
759
760 let (importer, repository) = {
761 let connection = cardano_tx_db_connection().unwrap();
762 let connection_pool = Arc::new(SqliteConnectionPool::build_from_connection(connection));
763 let repository = Arc::new(CardanoTransactionRepository::new(connection_pool));
764 let importer = CardanoTransactionsImporter::new_for_test(
765 Arc::new(DumbBlockScanner::new().forwards(vec![blocks.clone()])),
766 repository.clone(),
767 );
768 (importer, repository)
769 };
770
771 importer
772 .import(up_to_block_number)
773 .await
774 .expect("Transactions Importer should succeed");
775 let cold_imported_transactions = repository.get_all().await.unwrap();
776
777 importer
778 .import(up_to_block_number)
779 .await
780 .expect("Transactions Importer should succeed");
781 let warm_imported_transactions = repository.get_all().await.unwrap();
782
783 assert_eq!(transactions, cold_imported_transactions);
784 assert_eq!(cold_imported_transactions, warm_imported_transactions);
785 }
786
787 mod transactions_import_start_point {
788 use super::*;
789
790 async fn importer_with_last_polled_point(
791 last_polled_point: Option<RawCardanoPoint>,
792 ) -> CardanoTransactionsImporter {
793 let connection = cardano_tx_db_connection().unwrap();
794 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
795 SqliteConnectionPool::build_from_connection(connection),
796 )));
797
798 CardanoTransactionsImporter {
799 last_polled_point: Arc::new(Mutex::new(last_polled_point)),
800 ..CardanoTransactionsImporter::new_for_test(
801 Arc::new(DumbBlockScanner::new()),
802 repository,
803 )
804 }
805 }
806
807 #[tokio::test]
808 async fn cloning_keep_last_polled_point() {
809 let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new(
810 SlotNumber(15),
811 "block_hash-1",
812 )))
813 .await;
814
815 let cloned_importer = importer.clone();
816 let start_point = cloned_importer.start_point(&None).await.unwrap();
817 assert_eq!(
818 Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
819 start_point
820 );
821 }
822
823 #[tokio::test]
824 async fn none_if_nothing_stored_nor_scanned() {
825 let importer = importer_with_last_polled_point(None).await;
826 let highest_stored_block_number = None;
827
828 let start_point = importer.start_point(&highest_stored_block_number).await.unwrap();
829 assert_eq!(None, start_point);
830 }
831
832 #[tokio::test]
833 async fn start_at_last_stored_point_if_nothing_scanned() {
834 let importer = importer_with_last_polled_point(None).await;
835 let highest_stored_block_number = Some(ChainPoint::new(
836 SlotNumber(25),
837 BlockNumber(20),
838 hex::encode("block_hash-2"),
839 ));
840
841 let start_point = importer.start_point(&highest_stored_block_number).await.unwrap();
842 assert_eq!(
843 Some(RawCardanoPoint::new(SlotNumber(25), "block_hash-2")),
844 start_point
845 );
846 }
847
848 #[tokio::test]
849 async fn start_at_last_scanned_point_when_nothing_stored() {
850 let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new(
851 SlotNumber(15),
852 "block_hash-1",
853 )))
854 .await;
855 let highest_stored_block_number = None;
856
857 let start_point = importer.start_point(&highest_stored_block_number).await.unwrap();
858 assert_eq!(
859 Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
860 start_point
861 );
862 }
863
864 #[tokio::test]
865 async fn start_at_last_scanned_point_even_if_something_stored() {
866 let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new(
867 SlotNumber(15),
868 "block_hash-1",
869 )))
870 .await;
871 let highest_stored_block_number = Some(ChainPoint::new(
872 SlotNumber(25),
873 BlockNumber(20),
874 hex::encode("block_hash-2"),
875 ));
876
877 let start_point = importer.start_point(&highest_stored_block_number).await.unwrap();
878 assert_eq!(
879 Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
880 start_point
881 );
882 }
883
884 #[tokio::test]
885 async fn importing_transactions_update_start_point_even_if_no_transactions_are_found() {
886 let connection = cardano_tx_db_connection().unwrap();
887 let importer = CardanoTransactionsImporter {
888 last_polled_point: Arc::new(Mutex::new(None)),
889 ..CardanoTransactionsImporter::new_for_test(
890 Arc::new(
891 DumbBlockScanner::new()
892 .forwards(vec![vec![ScannedBlock::new(
893 "block_hash-1",
894 BlockNumber(10),
895 SlotNumber(15),
896 Vec::<&str>::new(),
897 )]])
898 .last_polled_point(Some(RawCardanoPoint::new(
899 SlotNumber(25),
900 "block_hash-2",
901 ))),
902 ),
903 Arc::new(CardanoTransactionRepository::new(Arc::new(
904 SqliteConnectionPool::build_from_connection(connection),
905 ))),
906 )
907 };
908 let highest_stored_block_number = None;
909
910 let start_point_before_import =
911 importer.start_point(&highest_stored_block_number).await.unwrap();
912 assert_eq!(None, start_point_before_import);
913
914 importer.import_transactions(BlockNumber(1000)).await.unwrap();
915
916 let start_point_after_import =
917 importer.start_point(&highest_stored_block_number).await.unwrap();
918 assert_eq!(
919 Some(RawCardanoPoint::new(SlotNumber(25), "block_hash-2")),
920 start_point_after_import
921 );
922 }
923
924 #[tokio::test]
925 async fn importing_transactions_dont_update_start_point_if_streamer_did_nothing() {
926 let connection = cardano_tx_db_connection().unwrap();
927 let importer = CardanoTransactionsImporter {
928 last_polled_point: Arc::new(Mutex::new(Some(RawCardanoPoint::new(
929 SlotNumber(15),
930 "block_hash-1",
931 )))),
932 ..CardanoTransactionsImporter::new_for_test(
933 Arc::new(DumbBlockScanner::new()),
934 Arc::new(CardanoTransactionRepository::new(Arc::new(
935 SqliteConnectionPool::build_from_connection(connection),
936 ))),
937 )
938 };
939 let highest_stored_block_number = None;
940
941 importer.import_transactions(BlockNumber(1000)).await.unwrap();
942
943 let start_point_after_import =
944 importer.start_point(&highest_stored_block_number).await.unwrap();
945 assert_eq!(
946 Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")),
947 start_point_after_import
948 );
949 }
950 }
951
952 #[tokio::test]
953 async fn when_rollbackward_should_remove_transactions() {
954 let connection = cardano_tx_db_connection().unwrap();
955 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
956 SqliteConnectionPool::build_from_connection(connection),
957 )));
958
959 let expected_remaining_transactions = ScannedBlock::new(
960 "block_hash-130",
961 BlockNumber(130),
962 SlotNumber(5),
963 vec!["tx_hash-6", "tx_hash-7"],
964 )
965 .into_transactions();
966 repository
967 .store_transactions(expected_remaining_transactions.clone())
968 .await
969 .unwrap();
970 repository
971 .store_transactions(
972 ScannedBlock::new(
973 "block_hash-131",
974 BlockNumber(131),
975 SlotNumber(10),
976 vec!["tx_hash-8", "tx_hash-9", "tx_hash-10"],
977 )
978 .into_transactions(),
979 )
980 .await
981 .unwrap();
982
983 let chain_point = ChainPoint::new(SlotNumber(5), BlockNumber(130), "block_hash-130");
984 let scanner = DumbBlockScanner::new().backward(chain_point);
985
986 let importer =
987 CardanoTransactionsImporter::new_for_test(Arc::new(scanner), repository.clone());
988
989 importer
990 .import_transactions(BlockNumber(3000))
991 .await
992 .expect("Transactions Importer should succeed");
993
994 let stored_transactions = repository.get_all().await.unwrap();
995 assert_eq!(expected_remaining_transactions, stored_transactions);
996 }
997
998 #[tokio::test]
999 async fn when_rollbackward_should_remove_block_ranges() {
1000 let connection = cardano_tx_db_connection().unwrap();
1001 let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
1002 SqliteConnectionPool::build_from_connection(connection),
1003 )));
1004
1005 let expected_remaining_block_ranges = vec![
1006 BlockRange::from_block_number(BlockNumber(0)),
1007 BlockRange::from_block_number(BlockRange::LENGTH),
1008 BlockRange::from_block_number(BlockRange::LENGTH * 2),
1009 ];
1010
1011 repository
1012 .store_block_range_roots(
1013 expected_remaining_block_ranges
1014 .iter()
1015 .map(|b| (b.clone(), MKTreeNode::from_hex("AAAA").unwrap()))
1016 .collect(),
1017 )
1018 .await
1019 .unwrap();
1020 repository
1021 .store_block_range_roots(
1022 [
1023 BlockRange::from_block_number(BlockRange::LENGTH * 3),
1024 BlockRange::from_block_number(BlockRange::LENGTH * 4),
1025 BlockRange::from_block_number(BlockRange::LENGTH * 5),
1026 ]
1027 .iter()
1028 .map(|b| (b.clone(), MKTreeNode::from_hex("AAAA").unwrap()))
1029 .collect(),
1030 )
1031 .await
1032 .unwrap();
1033 repository
1034 .store_transactions(
1035 ScannedBlock::new(
1036 "block_hash-131",
1037 BlockRange::from_block_number(BlockRange::LENGTH * 3).start,
1038 SlotNumber(1),
1039 vec!["tx_hash-1", "tx_hash-2", "tx_hash-3"],
1040 )
1041 .into_transactions(),
1042 )
1043 .await
1044 .unwrap();
1045
1046 let block_range_roots = repository.get_all_block_range_root().unwrap();
1047 assert_eq!(6, block_range_roots.len());
1048
1049 let chain_point = ChainPoint::new(SlotNumber(1), BlockRange::LENGTH * 3, "block_hash-131");
1050 let scanner = DumbBlockScanner::new().backward(chain_point);
1051
1052 let importer =
1053 CardanoTransactionsImporter::new_for_test(Arc::new(scanner), repository.clone());
1054
1055 importer
1056 .import_transactions(BlockNumber(3000))
1057 .await
1058 .expect("Transactions Importer should succeed");
1059
1060 let block_range_roots = repository.get_all_block_range_root().unwrap();
1061 assert_eq!(
1062 expected_remaining_block_ranges,
1063 block_range_roots.into_iter().map(|r| r.range).collect::<Vec<_>>()
1064 );
1065 }
1066
1067 #[tokio::test]
1068 async fn test_import_is_non_blocking() {
1069 static COUNTER: AtomicUsize = AtomicUsize::new(0);
1070 static MAX_COUNTER: usize = 25;
1071 static WAIT_TIME: u64 = 50;
1072
1073 let local = task::LocalSet::new();
1075 local
1076 .run_until(async {
1077 let importer = CardanoTransactionsImporter::new_for_test(
1078 Arc::new(DumbBlockScanner::new()),
1079 Arc::new(BlockingRepository {
1080 wait_time: Duration::from_millis(WAIT_TIME),
1081 }),
1082 );
1083
1084 let importer_future = importer.import(BlockNumber(100));
1085 let counter_task = task::spawn_local(async {
1086 while COUNTER.load(std::sync::atomic::Ordering::SeqCst) < MAX_COUNTER {
1087 tokio::time::sleep(Duration::from_millis(1)).await;
1088 COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1089 }
1090 });
1091 importer_future.await.unwrap();
1092
1093 counter_task.abort();
1094 })
1095 .await;
1096
1097 assert_eq!(
1098 MAX_COUNTER,
1099 COUNTER.load(std::sync::atomic::Ordering::SeqCst)
1100 );
1101
1102 struct BlockingRepository {
1103 wait_time: Duration,
1104 }
1105
1106 impl BlockingRepository {
1107 fn block_thread(&self) {
1108 std::thread::sleep(self.wait_time);
1109 }
1110 }
1111
1112 #[async_trait]
1113 impl TransactionStore for BlockingRepository {
1114 async fn get_highest_beacon(&self) -> StdResult<Option<ChainPoint>> {
1115 self.block_thread();
1116 Ok(None)
1117 }
1118
1119 async fn get_highest_block_range(&self) -> StdResult<Option<BlockRange>> {
1120 self.block_thread();
1121 Ok(None)
1122 }
1123
1124 async fn store_transactions(&self, _: Vec<CardanoTransaction>) -> StdResult<()> {
1125 self.block_thread();
1126 Ok(())
1127 }
1128
1129 async fn get_transactions_in_range(
1130 &self,
1131 _: Range<BlockNumber>,
1132 ) -> StdResult<Vec<CardanoTransaction>> {
1133 self.block_thread();
1134 Ok(vec![])
1135 }
1136
1137 async fn store_block_range_roots(
1138 &self,
1139 _: Vec<(BlockRange, MKTreeNode)>,
1140 ) -> StdResult<()> {
1141 self.block_thread();
1142 Ok(())
1143 }
1144
1145 async fn remove_rolled_back_transactions_and_block_range(
1146 &self,
1147 _: SlotNumber,
1148 ) -> StdResult<()> {
1149 self.block_thread();
1150 Ok(())
1151 }
1152 }
1153 }
1154}