mithril_cardano_node_chain/test/double/
block_scanner.rs

1use std::collections::VecDeque;
2use std::sync::RwLock;
3
4use async_trait::async_trait;
5
6use mithril_common::StdResult;
7use mithril_common::entities::{BlockNumber, ChainPoint};
8
9use crate::chain_scanner::{BlockScanner, BlockStreamer, ChainScannedBlocks};
10use crate::entities::{RawCardanoPoint, ScannedBlock};
11
12/// Dumb block scanner
13pub struct DumbBlockScanner {
14    streamer: RwLock<DumbBlockStreamer>,
15}
16
17impl DumbBlockScanner {
18    /// Factory
19    pub fn new() -> Self {
20        Self {
21            streamer: RwLock::new(DumbBlockStreamer::new()),
22        }
23    }
24
25    /// Add to the inner streamer several [ChainScannedBlocks::RollForwards] responses at the end of
26    /// its queue.
27    pub fn forwards(self, blocks: Vec<Vec<ScannedBlock>>) -> Self {
28        self.add_forwards(blocks);
29        self
30    }
31
32    /// Add to the inner streamer a [ChainScannedBlocks::RollBackward] response at the end of its queue.
33    pub fn backward(self, chain_point: ChainPoint) -> Self {
34        self.add_backward(chain_point);
35        self
36    }
37
38    /// Set the last polled point to return when [Self::last_polled_point] is called.
39    pub fn last_polled_point(self, raw_point: Option<RawCardanoPoint>) -> Self {
40        self.set_last_polled_point(raw_point);
41        self
42    }
43
44    /// Add to the inner streamer several [ChainScannedBlocks::RollForwards] responses at the end of
45    /// its queue.
46    pub fn add_forwards(&self, blocks: Vec<Vec<ScannedBlock>>) {
47        let mut streamer = self.streamer.write().unwrap();
48        *streamer = streamer.clone().forwards(blocks);
49    }
50
51    /// Add to the inner streamer a [ChainScannedBlocks::RollBackward] response at the end of its queue.
52    pub fn add_backward(&self, chain_point: ChainPoint) {
53        let mut streamer = self.streamer.write().unwrap();
54        *streamer = streamer.clone().rollback(chain_point);
55    }
56
57    /// Set the last polled point to return when [Self::last_polled_point] is called.
58    pub fn set_last_polled_point(&self, raw_point: Option<RawCardanoPoint>) {
59        let mut streamer = self.streamer.write().unwrap();
60        *streamer = streamer.clone().set_last_polled_point(raw_point);
61    }
62}
63
64impl Default for DumbBlockScanner {
65    fn default() -> Self {
66        Self::new()
67    }
68}
69
70#[async_trait]
71impl BlockScanner for DumbBlockScanner {
72    async fn scan(
73        &self,
74        _from: Option<RawCardanoPoint>,
75        _until: BlockNumber,
76    ) -> StdResult<Box<dyn BlockStreamer>> {
77        let streamer = self.streamer.read().unwrap();
78        Ok(Box::new(streamer.clone()))
79    }
80}
81
82/// Dumb block streamer
83#[derive(Clone)]
84pub struct DumbBlockStreamer {
85    streamer_responses: VecDeque<ChainScannedBlocks>,
86    last_polled_point: Option<RawCardanoPoint>,
87}
88
89impl DumbBlockStreamer {
90    /// Factory - the resulting streamer can be polled one time for each list of blocks given
91    pub fn new() -> Self {
92        Self {
93            streamer_responses: VecDeque::new(),
94            last_polled_point: None,
95        }
96    }
97
98    /// Set the last polled point to return when [Self::last_polled_point] is called
99    pub fn set_last_polled_point(mut self, raw_point: Option<RawCardanoPoint>) -> Self {
100        self.last_polled_point = raw_point;
101        self
102    }
103
104    /// Add to the streamer several [ChainScannedBlocks::RollForwards] responses at the end of
105    /// its queue.
106    pub fn forwards(mut self, blocks: Vec<Vec<ScannedBlock>>) -> Self {
107        let mut source: VecDeque<_> =
108            blocks.into_iter().map(ChainScannedBlocks::RollForwards).collect();
109        self.streamer_responses.append(&mut source);
110
111        self
112    }
113
114    /// Add to the streamer a [ChainScannedBlocks::RollBackward] response at the end of its queue.
115    pub fn rollback(mut self, chain_point: ChainPoint) -> Self {
116        self.streamer_responses
117            .push_back(ChainScannedBlocks::RollBackward(chain_point.slot_number));
118        self
119    }
120}
121
122impl Default for DumbBlockStreamer {
123    fn default() -> Self {
124        Self::new()
125    }
126}
127
128#[async_trait]
129impl BlockStreamer for DumbBlockStreamer {
130    async fn poll_next(&mut self) -> StdResult<Option<ChainScannedBlocks>> {
131        Ok(self.streamer_responses.pop_front())
132    }
133
134    fn last_polled_point(&self) -> Option<RawCardanoPoint> {
135        self.last_polled_point.clone()
136    }
137}
138
#[cfg(test)]
mod tests {
    use mithril_common::entities::SlotNumber;

    use super::*;

    /// Build a [ScannedBlock] that carries no transaction hash.
    fn empty_block(
        hash: &'static str,
        block_number: BlockNumber,
        slot_number: SlotNumber,
    ) -> ScannedBlock {
        ScannedBlock::new(hash, block_number, slot_number, Vec::<&str>::new())
    }

    /// Poll result expected when the next queued response is a roll forward of `blocks`.
    fn roll_forwards(blocks: &[ScannedBlock]) -> Option<ChainScannedBlocks> {
        Some(ChainScannedBlocks::RollForwards(blocks.to_vec()))
    }

    #[tokio::test]
    async fn polling_without_set_of_block_return_none() {
        let mut streamer = DumbBlockStreamer::new().forwards(Vec::new());

        assert_eq!(streamer.poll_next().await.unwrap(), None);
    }

    #[tokio::test]
    async fn polling_with_one_set_of_block_returns_some_once() {
        let block_set = vec![empty_block("hash-1", BlockNumber(1), SlotNumber(10))];
        let mut streamer = DumbBlockStreamer::new().forwards(vec![block_set.clone()]);

        let first_poll = streamer.poll_next().await.unwrap();
        assert_eq!(first_poll, roll_forwards(&block_set));

        let second_poll = streamer.poll_next().await.unwrap();
        assert_eq!(second_poll, None);
    }

    #[tokio::test]
    async fn polling_with_multiple_sets_of_blocks_returns_some_once() {
        let block_sets = vec![
            vec![empty_block("hash-1", BlockNumber(1), SlotNumber(10))],
            vec![
                empty_block("hash-2", BlockNumber(2), SlotNumber(11)),
                empty_block("hash-3", BlockNumber(3), SlotNumber(12)),
            ],
            vec![empty_block("hash-4", BlockNumber(4), SlotNumber(13))],
        ];
        let mut streamer = DumbBlockStreamer::new().forwards(block_sets.clone());

        // Every set must come back exactly once, in the order it was queued.
        for block_set in &block_sets {
            let polled = streamer.poll_next().await.unwrap();
            assert_eq!(polled, roll_forwards(block_set));
        }

        let exhausted = streamer.poll_next().await.unwrap();
        assert_eq!(exhausted, None);
    }

    #[tokio::test]
    async fn dumb_scanned_construct_a_streamer_based_on_its_stored_blocks() {
        let block_set = vec![empty_block("hash-1", BlockNumber(1), SlotNumber(10))];
        let scanner = DumbBlockScanner::new().forwards(vec![block_set.clone()]);

        let mut streamer = scanner.scan(None, BlockNumber(5)).await.unwrap();

        let first_poll = streamer.poll_next().await.unwrap();
        assert_eq!(first_poll, roll_forwards(&block_set));

        let second_poll = streamer.poll_next().await.unwrap();
        assert_eq!(second_poll, None);
    }

    #[tokio::test]
    async fn dumb_scanned_construct_a_streamer_based_on_its_stored_chain_scanned_blocks() {
        let block_set = vec![empty_block("hash-1", BlockNumber(1), SlotNumber(10))];
        let rollback_point = ChainPoint::new(SlotNumber(10), BlockNumber(2), "block-hash");
        let scanner = DumbBlockScanner::new()
            .forwards(vec![block_set.clone()])
            .backward(rollback_point.clone());

        let mut streamer = scanner.scan(None, BlockNumber(5)).await.unwrap();

        let first_poll = streamer.poll_next().await.unwrap();
        assert_eq!(first_poll, roll_forwards(&block_set));

        let second_poll = streamer.poll_next().await.unwrap();
        assert_eq!(
            second_poll,
            Some(ChainScannedBlocks::RollBackward(rollback_point.slot_number))
        );
    }

    #[tokio::test]
    async fn polling_with_can_return_roll_backward() {
        let block_sets = vec![
            vec![empty_block("hash-1", BlockNumber(1), SlotNumber(10))],
            vec![empty_block("hash-4", BlockNumber(4), SlotNumber(13))],
        ];
        let rollback_point = ChainPoint::new(SlotNumber(10), BlockNumber(2), "block-hash");
        let mut streamer = DumbBlockStreamer::new()
            .forwards(block_sets.clone())
            .rollback(rollback_point.clone());

        // The roll forwards come first, in the order they were queued...
        for block_set in &block_sets {
            let polled = streamer.poll_next().await.unwrap();
            assert_eq!(polled, roll_forwards(block_set));
        }

        // ...followed by the roll backward, after which the queue is exhausted.
        let rollback_poll = streamer.poll_next().await.unwrap();
        assert_eq!(
            rollback_poll,
            Some(ChainScannedBlocks::RollBackward(rollback_point.slot_number))
        );

        let exhausted = streamer.poll_next().await.unwrap();
        assert_eq!(exhausted, None);
    }

    #[tokio::test]
    async fn setting_last_polled_block() {
        let streamer = DumbBlockStreamer::new().forwards(vec![]);
        assert_eq!(streamer.last_polled_point(), None);

        let raw_point = RawCardanoPoint::new(SlotNumber(10), "block-hash".as_bytes());
        let streamer = streamer.set_last_polled_point(Some(raw_point.clone()));
        assert_eq!(streamer.last_polled_point(), Some(raw_point));

        let streamer = streamer.set_last_polled_point(None);
        assert_eq!(streamer.last_polled_point(), None);
    }
}