use std::mem;
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use async_trait::async_trait;
use slog::{debug, Logger};
use mithril_common::cardano_block_scanner::BlockScanner;
use mithril_common::crypto_helper::{MKTree, MKTreeNode};
use mithril_common::entities::{BlockNumber, BlockRange, CardanoTransaction, ImmutableFileNumber};
use mithril_common::signable_builder::TransactionsImporter;
use mithril_common::StdResult;
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait TransactionStore: Send + Sync {
async fn get_highest_beacon(&self) -> StdResult<Option<ImmutableFileNumber>>;
async fn store_transactions(&self, transactions: Vec<CardanoTransaction>) -> StdResult<()>;
async fn get_block_interval_without_block_range_root(
&self,
) -> StdResult<Option<Range<BlockNumber>>>;
async fn get_transactions_in_range(
&self,
range: Range<BlockNumber>,
) -> StdResult<Vec<CardanoTransaction>>;
async fn store_block_range_roots(
&self,
block_ranges: Vec<(BlockRange, MKTreeNode)>,
) -> StdResult<()>;
}
pub struct CardanoTransactionsImporter {
block_scanner: Arc<dyn BlockScanner>,
transaction_store: Arc<dyn TransactionStore>,
logger: Logger,
rescan_offset: Option<usize>,
dirpath: PathBuf,
}
impl CardanoTransactionsImporter {
pub fn new(
block_scanner: Arc<dyn BlockScanner>,
transaction_store: Arc<dyn TransactionStore>,
dirpath: &Path,
rescan_offset: Option<usize>,
logger: Logger,
) -> Self {
Self {
block_scanner,
transaction_store,
logger,
rescan_offset,
dirpath: dirpath.to_owned(),
}
}
async fn import_transactions(&self, up_to_beacon: ImmutableFileNumber) -> StdResult<()> {
let from = self.get_starting_beacon().await?;
self.parse_and_store_transactions_not_imported_yet(from, up_to_beacon)
.await
}
async fn get_starting_beacon(&self) -> StdResult<Option<u64>> {
let highest = self.transaction_store.get_highest_beacon().await?;
let rescan_offset = self.rescan_offset.unwrap_or(0);
let highest = highest.map(|h| (h + 1).saturating_sub(rescan_offset as u64));
Ok(highest)
}
async fn parse_and_store_transactions_not_imported_yet(
&self,
from: Option<ImmutableFileNumber>,
until: ImmutableFileNumber,
) -> StdResult<()> {
if from.is_some_and(|f| f >= until) {
debug!(
self.logger,
"TransactionsImporter does not need to retrieve Cardano transactions, the database is up to date for immutable '{until}'",
);
return Ok(());
}
debug!(
self.logger,
"TransactionsImporter will retrieve Cardano transactions between immutables '{}' and '{until}'",
from.unwrap_or(0)
);
let mut streamer = self.block_scanner.scan(&self.dirpath, from, until).await?;
while let Some(blocks) = streamer.poll_next().await? {
let parsed_transactions: Vec<CardanoTransaction> = blocks
.into_iter()
.flat_map(|b| b.into_transactions())
.collect();
self.transaction_store
.store_transactions(parsed_transactions)
.await?;
}
Ok(())
}
async fn import_block_ranges(&self) -> StdResult<()> {
let block_ranges = match self
.transaction_store
.get_block_interval_without_block_range_root()
.await?
.map(|range| BlockRange::all_block_ranges_in(BlockRange::start(range.start)..range.end))
{
None => return Ok(()),
Some(ranges) if ranges.is_empty() => return Ok(()),
Some(ranges) => ranges,
};
debug!(
self.logger, "TransactionsImporter - computing Block Range Roots";
"start_block" => block_ranges.start(), "end_block" => block_ranges.end(),
);
let mut block_ranges_with_merkle_root: Vec<(BlockRange, MKTreeNode)> = vec![];
for block_range in block_ranges {
let transactions = self
.transaction_store
.get_transactions_in_range(block_range.start..block_range.end)
.await?;
if transactions.is_empty() {
continue;
}
let merkle_root = MKTree::new(&transactions)?.compute_root()?;
block_ranges_with_merkle_root.push((block_range, merkle_root));
if block_ranges_with_merkle_root.len() >= 100 {
let block_ranges_with_merkle_root_save =
mem::take(&mut block_ranges_with_merkle_root);
self.transaction_store
.store_block_range_roots(block_ranges_with_merkle_root_save)
.await?;
}
}
self.transaction_store
.store_block_range_roots(block_ranges_with_merkle_root)
.await
}
}
#[async_trait]
impl TransactionsImporter for CardanoTransactionsImporter {
async fn import(&self, up_to_beacon: ImmutableFileNumber) -> StdResult<()> {
self.import_transactions(up_to_beacon).await?;
self.import_block_ranges().await
}
}
#[cfg(test)]
mod tests {
use mockall::mock;
use mithril_common::cardano_block_scanner::{
BlockStreamer, DumbBlockScanner, DumbBlockStreamer, ScannedBlock,
};
use mithril_common::crypto_helper::MKTree;
use mithril_common::entities::{BlockNumber, BlockRangesSequence};
use mithril_persistence::database::repository::CardanoTransactionRepository;
use crate::database::test_utils::cardano_tx_db_connection;
use super::*;
mock! {
pub BlockScannerImpl { }
#[async_trait]
impl BlockScanner for BlockScannerImpl {
async fn scan(
&self,
dirpath: &Path,
from_immutable: Option<ImmutableFileNumber>,
until_immutable: ImmutableFileNumber,
) -> StdResult<Box<dyn BlockStreamer>>;
}
}
impl CardanoTransactionsImporter {
pub fn new_for_test(
scanner: Arc<dyn BlockScanner>,
transaction_store: Arc<dyn TransactionStore>,
) -> Self {
CardanoTransactionsImporter::new(
scanner,
transaction_store,
Path::new(""),
None,
crate::test_tools::logger_for_tests(),
)
}
}
fn build_blocks(
start_block_number: BlockNumber,
number_of_consecutive_block: BlockNumber,
) -> Vec<ScannedBlock> {
(start_block_number..(start_block_number + number_of_consecutive_block))
.map(|block_number| {
ScannedBlock::new(
format!("block_hash-{}", block_number),
block_number,
block_number * 100,
block_number * 10,
vec![format!("tx_hash-{}", block_number)],
)
})
.collect()
}
fn into_transactions(blocks: &[ScannedBlock]) -> Vec<CardanoTransaction> {
blocks
.iter()
.flat_map(|b| b.clone().into_transactions())
.collect()
}
fn merkle_root_for_blocks(block_ranges: &[ScannedBlock]) -> MKTreeNode {
let tx: Vec<_> = block_ranges
.iter()
.flat_map(|br| br.clone().into_transactions())
.collect();
MKTree::new(&tx).unwrap().compute_root().unwrap()
}
#[tokio::test]
async fn if_nothing_stored_parse_and_store_all_transactions() {
let connection = cardano_tx_db_connection().unwrap();
let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection)));
let blocks = vec![
ScannedBlock::new("block_hash-1", 10, 15, 11, vec!["tx_hash-1", "tx_hash-2"]),
ScannedBlock::new("block_hash-2", 20, 25, 12, vec!["tx_hash-3", "tx_hash-4"]),
];
let expected_transactions = into_transactions(&blocks);
let up_to_beacon = 12;
let importer = {
let mut scanner_mock = MockBlockScannerImpl::new();
scanner_mock
.expect_scan()
.withf(move |_, from, until| from.is_none() && until == &up_to_beacon)
.return_once(move |_, _, _| Ok(Box::new(DumbBlockStreamer::new(vec![blocks]))));
CardanoTransactionsImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
};
importer
.import_transactions(up_to_beacon)
.await
.expect("Transactions Importer should succeed");
let stored_transactions = repository.get_all().await.unwrap();
assert_eq!(expected_transactions, stored_transactions);
}
#[tokio::test]
async fn if_nothing_stored_parse_and_store_all_block_ranges() {
let connection = cardano_tx_db_connection().unwrap();
let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection)));
let blocks = build_blocks(0, BlockRange::LENGTH * 5 + 1);
let transactions = into_transactions(&blocks);
repository.store_transactions(transactions).await.unwrap();
let importer = CardanoTransactionsImporter::new_for_test(
Arc::new(MockBlockScannerImpl::new()),
repository.clone(),
);
importer
.import_block_ranges()
.await
.expect("Transactions Importer should succeed");
let block_range_roots = repository.get_all_block_range_root().unwrap();
assert_eq!(
vec![
BlockRange::from_block_number(0),
BlockRange::from_block_number(BlockRange::LENGTH),
BlockRange::from_block_number(BlockRange::LENGTH * 2),
BlockRange::from_block_number(BlockRange::LENGTH * 3),
BlockRange::from_block_number(BlockRange::LENGTH * 4),
],
block_range_roots
.into_iter()
.map(|r| r.range)
.collect::<Vec<_>>()
);
}
#[tokio::test]
async fn if_theres_gap_between_two_stored_block_ranges_it_can_still_compute_their_root() {
let connection = cardano_tx_db_connection().unwrap();
let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection)));
let blocks: Vec<ScannedBlock> = [
build_blocks(0, BlockRange::LENGTH),
build_blocks(BlockRange::LENGTH * 3, BlockRange::LENGTH),
]
.concat();
let transactions = into_transactions(&blocks);
repository.store_transactions(transactions).await.unwrap();
let importer = CardanoTransactionsImporter::new_for_test(
Arc::new(MockBlockScannerImpl::new()),
repository.clone(),
);
importer
.import_block_ranges()
.await
.expect("Transactions Importer should succeed");
let block_range_roots = repository.get_all_block_range_root().unwrap();
assert_eq!(
vec![
BlockRange::from_block_number(0),
BlockRange::from_block_number(BlockRange::LENGTH * 3),
],
block_range_roots
.into_iter()
.map(|r| r.range)
.collect::<Vec<_>>()
);
}
#[tokio::test]
async fn if_all_block_ranges_computed_nothing_computed_and_stored() {
let connection = cardano_tx_db_connection().unwrap();
let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection)));
let importer = CardanoTransactionsImporter::new_for_test(
Arc::new(MockBlockScannerImpl::new()),
repository.clone(),
);
importer
.import_block_ranges()
.await
.expect("Transactions Importer should succeed");
let block_range_roots = repository.get_all_block_range_root().unwrap();
assert!(
block_range_roots.is_empty(),
"No block range root should be stored, found: {block_range_roots:?}"
);
}
#[tokio::test]
async fn if_all_transactions_stored_nothing_is_parsed_and_stored() {
let up_to_beacon = 12;
let connection = cardano_tx_db_connection().unwrap();
let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection)));
let scanner = DumbBlockScanner::new(vec![
ScannedBlock::new("block_hash-1", 10, 15, 10, vec!["tx_hash-1", "tx_hash-2"]),
ScannedBlock::new("block_hash-2", 20, 25, 11, vec!["tx_hash-3", "tx_hash-4"]),
]);
let last_tx = CardanoTransaction::new("tx-20", 30, 35, "block_hash-3", up_to_beacon);
repository
.store_transactions(vec![last_tx.clone()])
.await
.unwrap();
let importer =
CardanoTransactionsImporter::new_for_test(Arc::new(scanner), repository.clone());
importer
.import_transactions(up_to_beacon)
.await
.expect("Transactions Importer should succeed");
let transactions = repository.get_all().await.unwrap();
assert_eq!(vec![last_tx], transactions);
}
#[tokio::test]
async fn if_half_transactions_are_already_stored_the_other_half_is_parsed_and_stored() {
let connection = cardano_tx_db_connection().unwrap();
let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection)));
let stored_block =
ScannedBlock::new("block_hash-1", 10, 15, 11, vec!["tx_hash-1", "tx_hash-2"]);
let to_store_block =
ScannedBlock::new("block_hash-2", 20, 25, 12, vec!["tx_hash-3", "tx_hash-4"]);
let expected_transactions: Vec<CardanoTransaction> = [
stored_block.clone().into_transactions(),
to_store_block.clone().into_transactions(),
]
.concat();
let up_to_beacon = 14;
repository
.store_transactions(stored_block.clone().into_transactions())
.await
.unwrap();
let importer = {
let scanned_blocks = vec![to_store_block.clone()];
let mut scanner_mock = MockBlockScannerImpl::new();
scanner_mock
.expect_scan()
.withf(move |_, from, until| from == &Some(12) && until == &up_to_beacon)
.return_once(move |_, _, _| {
Ok(Box::new(DumbBlockStreamer::new(vec![scanned_blocks])))
})
.once();
CardanoTransactionsImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
};
let stored_transactions = repository.get_all().await.unwrap();
assert_eq!(stored_block.into_transactions(), stored_transactions);
importer
.import_transactions(up_to_beacon)
.await
.expect("Transactions Importer should succeed");
let stored_transactions = repository.get_all().await.unwrap();
assert_eq!(expected_transactions, stored_transactions);
}
#[tokio::test]
async fn if_half_block_ranges_are_stored_the_other_half_is_computed_and_stored() {
let connection = cardano_tx_db_connection().unwrap();
let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection)));
let blocks = build_blocks(0, BlockRange::LENGTH * 4 + 1);
let transactions = into_transactions(&blocks);
repository.store_transactions(transactions).await.unwrap();
repository
.store_block_range_roots(
blocks[0..((BlockRange::LENGTH * 2) as usize)]
.iter()
.map(|b| {
(
BlockRange::from_block_number(b.block_number),
MKTreeNode::from_hex("AAAA").unwrap(),
)
})
.collect(),
)
.await
.unwrap();
let importer = CardanoTransactionsImporter::new_for_test(
Arc::new(MockBlockScannerImpl::new()),
repository.clone(),
);
let block_range_roots = repository.get_all_block_range_root().unwrap();
assert_eq!(
vec![
BlockRange::from_block_number(0),
BlockRange::from_block_number(BlockRange::LENGTH),
],
block_range_roots
.into_iter()
.map(|r| r.range)
.collect::<Vec<_>>()
);
importer
.import_block_ranges()
.await
.expect("Transactions Importer should succeed");
let block_range_roots = repository.get_all_block_range_root().unwrap();
assert_eq!(
vec![
BlockRange::from_block_number(0),
BlockRange::from_block_number(BlockRange::LENGTH),
BlockRange::from_block_number(BlockRange::LENGTH * 2),
BlockRange::from_block_number(BlockRange::LENGTH * 3),
],
block_range_roots
.into_iter()
.map(|r| r.range)
.collect::<Vec<_>>()
);
}
#[tokio::test]
async fn block_range_root_retrieves_only_strictly_required_transactions() {
fn transactions_for_block(range: Range<BlockNumber>) -> StdResult<Vec<CardanoTransaction>> {
Ok(build_blocks(range.start, range.count() as BlockNumber)
.iter()
.flat_map(|b| b.clone().into_transactions())
.collect())
}
let importer = {
let mut store_mock = MockTransactionStore::new();
store_mock
.expect_get_block_interval_without_block_range_root()
.returning(|| Ok(Some((BlockRange::LENGTH + 2)..(BlockRange::LENGTH * 5))))
.once();
store_mock
.expect_get_transactions_in_range()
.withf(|range| {
BlockRangesSequence::new(BlockRange::LENGTH..(BlockRange::LENGTH * 5))
.contains(range)
})
.returning(transactions_for_block);
store_mock
.expect_store_block_range_roots()
.returning(|_| Ok(()));
CardanoTransactionsImporter::new_for_test(
Arc::new(MockBlockScannerImpl::new()),
Arc::new(store_mock),
)
};
importer
.import_block_ranges()
.await
.expect("Transactions Importer should succeed");
}
#[tokio::test]
async fn compute_block_range_merkle_root() {
let connection = cardano_tx_db_connection().unwrap();
let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(connection)));
let blocks = build_blocks(0, BlockRange::LENGTH * 2 + 1);
let transactions = into_transactions(&blocks);
let expected_block_range_roots = vec![
(
BlockRange::from_block_number(0),
merkle_root_for_blocks(&blocks[0..(BlockRange::LENGTH as usize)]),
),
(
BlockRange::from_block_number(BlockRange::LENGTH),
merkle_root_for_blocks(
&blocks[(BlockRange::LENGTH as usize)..((BlockRange::LENGTH * 2) as usize)],
),
),
];
repository.store_transactions(transactions).await.unwrap();
let importer = CardanoTransactionsImporter::new_for_test(
Arc::new(MockBlockScannerImpl::new()),
repository.clone(),
);
importer
.import_block_ranges()
.await
.expect("Transactions Importer should succeed");
let block_range_roots = repository.get_all_block_range_root().unwrap();
assert_eq!(
expected_block_range_roots,
block_range_roots
.into_iter()
.map(|br| br.into())
.collect::<Vec<_>>()
);
}
#[tokio::test]
async fn importing_twice_starting_with_nothing_in_a_real_db_should_yield_transactions_in_same_order(
) {
let blocks = vec![
ScannedBlock::new("block_hash-1", 10, 15, 11, vec!["tx_hash-1", "tx_hash-2"]),
ScannedBlock::new("block_hash-2", 20, 25, 12, vec!["tx_hash-3", "tx_hash-4"]),
];
let transactions = into_transactions(&blocks);
let (importer, repository) = {
let connection = Arc::new(cardano_tx_db_connection().unwrap());
let repository = Arc::new(CardanoTransactionRepository::new(connection.clone()));
let importer = CardanoTransactionsImporter::new_for_test(
Arc::new(DumbBlockScanner::new(blocks.clone())),
Arc::new(CardanoTransactionRepository::new(connection.clone())),
);
(importer, repository)
};
importer
.import(12)
.await
.expect("Transactions Importer should succeed");
let cold_imported_transactions = repository.get_all().await.unwrap();
importer
.import(12)
.await
.expect("Transactions Importer should succeed");
let warm_imported_transactions = repository.get_all().await.unwrap();
assert_eq!(transactions, cold_imported_transactions);
assert_eq!(cold_imported_transactions, warm_imported_transactions);
}
#[tokio::test]
async fn change_parsed_lower_bound_when_rescan_limit_is_set() {
fn importer_with_offset(
highest_stored_beacon: ImmutableFileNumber,
rescan_offset: ImmutableFileNumber,
) -> CardanoTransactionsImporter {
let mut store = MockTransactionStore::new();
store
.expect_get_highest_beacon()
.returning(move || Ok(Some(highest_stored_beacon)));
CardanoTransactionsImporter::new(
Arc::new(MockBlockScannerImpl::new()),
Arc::new(store),
Path::new(""),
Some(rescan_offset as usize),
crate::test_tools::logger_for_tests(),
)
}
let importer = importer_with_offset(8, 3);
let from = importer.get_starting_beacon().await.unwrap();
assert_eq!(Some(6), from);
let importer = importer_with_offset(5, 10);
let from = importer.get_starting_beacon().await.unwrap();
assert_eq!(Some(0), from);
}
}