1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
use async_trait::async_trait;

use crate::cardano_block_scanner::{RawCardanoPoint, ScannedBlock};
use crate::entities::{BlockNumber, SlotNumber};
use crate::StdResult;

/// A scanner that can read cardano transactions in a cardano database
///
/// If you want to mock it using mockall:
/// ```
/// mod test {
///     use std::path::Path;
///
///     use anyhow::anyhow;
///     use async_trait::async_trait;
///     use mockall::mock;
///
///     use mithril_common::cardano_block_scanner::{BlockScanner, BlockStreamer, RawCardanoPoint};
///     use mithril_common::entities::{BlockNumber};
///     use mithril_common::StdResult;
///
///     mock! {
///         pub BlockScannerImpl { }
///
///         #[async_trait]
///         impl BlockScanner for BlockScannerImpl {
///             async fn scan(
///               &self,
///               from: Option<RawCardanoPoint>,
///               until: BlockNumber,
///             ) -> StdResult<Box<dyn BlockStreamer>>;
///         }
///     }
///
///     #[test]
///     fn test_mock() {
///         let mut mock = MockBlockScannerImpl::new();
///         mock.expect_scan().return_once(|_, _| {
///             Err(anyhow!("parse error"))
///         });
///     }
/// }
/// ```
#[async_trait]
pub trait BlockScanner: Sync + Send {
    /// Scan the transactions
    async fn scan(
        &self,
        from: Option<RawCardanoPoint>,
        until: BlockNumber,
    ) -> StdResult<Box<dyn BlockStreamer>>;
}

/// [ChainScannedBlocks] allows to scan new blocks and handle rollbacks
#[derive(Debug, Clone, PartialEq)]
pub enum ChainScannedBlocks {
    /// Roll forward on the chain to the next list of [ScannedBlock]
    RollForwards(Vec<ScannedBlock>),
    /// Roll backward on the chain to the previous [SlotNumber]
    RollBackward(SlotNumber),
}

/// Trait that define how blocks are streamed from a Cardano database
#[async_trait]
pub trait BlockStreamer: Sync + Send {
    /// Stream the next available blocks
    async fn poll_next(&mut self) -> StdResult<Option<ChainScannedBlocks>>;

    /// Get the last polled point of the chain
    fn last_polled_point(&self) -> Option<RawCardanoPoint>;
}

cfg_test_tools! {
    /// Tests extensions methods for the [BlockStreamer] trait.
    #[async_trait]
    pub trait BlockStreamerTestExtensions{
        /// Stream all the available blocks, may be very memory intensive
        async fn poll_all(&mut self) -> StdResult<Vec<ScannedBlock>>;
    }

    #[async_trait]
    impl <S: BlockStreamer + ?Sized> BlockStreamerTestExtensions for S {
        async fn poll_all(&mut self) -> StdResult<Vec<ScannedBlock>> {
            let mut all_blocks = Vec::new();
            while let Some(next_blocks) = self.poll_next().await? {
                match next_blocks {
                    ChainScannedBlocks::RollForwards(mut forward_blocks) => {
                        all_blocks.append(&mut forward_blocks);
                    }
                    ChainScannedBlocks::RollBackward(_) => {
                        return Err(anyhow::anyhow!("poll_all: RollBackward not supported"));
                    }
                };
            }
            Ok(all_blocks)
        }
    }

}