1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
use async_trait::async_trait;
use crate::cardano_block_scanner::{RawCardanoPoint, ScannedBlock};
use crate::entities::{BlockNumber, SlotNumber};
use crate::StdResult;
/// A scanner that reads a Cardano database and exposes what it finds through a
/// [BlockStreamer].
///
/// If you want to mock it using mockall:
/// ```
/// mod test {
///     use std::path::Path;
///
///     use anyhow::anyhow;
///     use async_trait::async_trait;
///     use mockall::mock;
///
///     use mithril_common::cardano_block_scanner::{BlockScanner, BlockStreamer, RawCardanoPoint};
///     use mithril_common::entities::{BlockNumber};
///     use mithril_common::StdResult;
///
///     mock! {
///         pub BlockScannerImpl { }
///
///         #[async_trait]
///         impl BlockScanner for BlockScannerImpl {
///             async fn scan(
///               &self,
///               from: Option<RawCardanoPoint>,
///               until: BlockNumber,
///             ) -> StdResult<Box<dyn BlockStreamer>>;
///         }
///     }
///
///     #[test]
///     fn test_mock() {
///         let mut mock = MockBlockScannerImpl::new();
///         mock.expect_scan().return_once(|_, _| {
///             Err(anyhow!("parse error"))
///         });
///     }
/// }
/// ```
#[async_trait]
pub trait BlockScanner: Sync + Send {
/// Start a scan and return a [BlockStreamer] from which the scanned blocks can be polled.
///
/// NOTE(review): `from` looks like an optional starting point on the chain and `until`
/// like the upper block number bound of the scan; whether the bounds are inclusive is
/// decided by the implementation and is not visible from this trait. Confirm against
/// the implementors.
async fn scan(
&self,
from: Option<RawCardanoPoint>,
until: BlockNumber,
) -> StdResult<Box<dyn BlockStreamer>>;
}
/// [ChainScannedBlocks] describes one step reported while scanning the chain: either a
/// list of new blocks to roll forward with, or a rollback.
#[derive(Debug, Clone, PartialEq)]
pub enum ChainScannedBlocks {
/// Roll forward on the chain with the next list of [ScannedBlock]
RollForwards(Vec<ScannedBlock>),
/// Roll backward on the chain to the given [SlotNumber]
RollBackward(SlotNumber),
}
/// Trait that defines how blocks are streamed from a Cardano database
#[async_trait]
pub trait BlockStreamer: Sync + Send {
/// Stream the next available blocks
///
/// Returns `Ok(Some(_))` with the next [ChainScannedBlocks] step. Consumers in this
/// module (see `poll_all`) treat `Ok(None)` as the end of the stream.
async fn poll_next(&mut self) -> StdResult<Option<ChainScannedBlocks>>;
/// Get the last polled point of the chain
///
/// NOTE(review): presumably `None` until something has been polled; confirm against
/// the implementors.
fn last_polled_point(&self) -> Option<RawCardanoPoint>;
}
cfg_test_tools! {
    /// Test-only helpers layered on top of the [BlockStreamer] trait.
    #[async_trait]
    pub trait BlockStreamerTestExtensions {
        /// Drain the streamer and return every block it yields.
        ///
        /// All blocks are accumulated in memory, so this may be very memory intensive.
        ///
        /// # Errors
        ///
        /// Propagates any error returned by `poll_next`, and fails as soon as the streamer
        /// yields a [ChainScannedBlocks::RollBackward], since rollbacks are not supported here.
        async fn poll_all(&mut self) -> StdResult<Vec<ScannedBlock>>;
    }

    #[async_trait]
    impl<S> BlockStreamerTestExtensions for S
    where
        S: BlockStreamer + ?Sized,
    {
        async fn poll_all(&mut self) -> StdResult<Vec<ScannedBlock>> {
            let mut collected = Vec::new();

            loop {
                match self.poll_next().await? {
                    Some(ChainScannedBlocks::RollForwards(batch)) => collected.extend(batch),
                    Some(ChainScannedBlocks::RollBackward(_)) => {
                        return Err(anyhow::anyhow!("poll_all: RollBackward not supported"));
                    }
                    // `None` marks the end of the stream: hand back everything gathered so far.
                    None => return Ok(collected),
                }
            }
        }
    }
}