mithril_common/cardano_block_scanner/
dumb_block_scanner.rs

1use std::collections::VecDeque;
2use std::sync::RwLock;
3
4use async_trait::async_trait;
5
6use crate::cardano_block_scanner::{BlockScanner, BlockStreamer, ScannedBlock};
7use crate::cardano_block_scanner::{ChainScannedBlocks, RawCardanoPoint};
8use crate::entities::{BlockNumber, ChainPoint};
9use crate::StdResult;
10
11/// Dumb block scanner
12pub struct DumbBlockScanner {
13    streamer: RwLock<DumbBlockStreamer>,
14}
15
16impl DumbBlockScanner {
17    /// Factory
18    pub fn new() -> Self {
19        Self {
20            streamer: RwLock::new(DumbBlockStreamer::new()),
21        }
22    }
23
24    /// Add to the inner streamer several [ChainScannedBlocks::RollForwards] responses at the end of
25    /// its queue.
26    pub fn forwards(self, blocks: Vec<Vec<ScannedBlock>>) -> Self {
27        self.add_forwards(blocks);
28        self
29    }
30
31    /// Add to the inner streamer a [ChainScannedBlocks::RollBackward] response at the end of its queue.
32    pub fn backward(self, chain_point: ChainPoint) -> Self {
33        self.add_backward(chain_point);
34        self
35    }
36
37    /// Set the last polled point to return when [Self::last_polled_point] is called.
38    pub fn last_polled_point(self, raw_point: Option<RawCardanoPoint>) -> Self {
39        self.set_last_polled_point(raw_point);
40        self
41    }
42
43    /// Add to the inner streamer several [ChainScannedBlocks::RollForwards] responses at the end of
44    /// its queue.
45    pub fn add_forwards(&self, blocks: Vec<Vec<ScannedBlock>>) {
46        let mut streamer = self.streamer.write().unwrap();
47        *streamer = streamer.clone().forwards(blocks);
48    }
49
50    /// Add to the inner streamer a [ChainScannedBlocks::RollBackward] response at the end of its queue.
51    pub fn add_backward(&self, chain_point: ChainPoint) {
52        let mut streamer = self.streamer.write().unwrap();
53        *streamer = streamer.clone().rollback(chain_point);
54    }
55
56    /// Set the last polled point to return when [Self::last_polled_point] is called.
57    pub fn set_last_polled_point(&self, raw_point: Option<RawCardanoPoint>) {
58        let mut streamer = self.streamer.write().unwrap();
59        *streamer = streamer.clone().set_last_polled_point(raw_point);
60    }
61}
62
63impl Default for DumbBlockScanner {
64    fn default() -> Self {
65        Self::new()
66    }
67}
68
69#[async_trait]
70impl BlockScanner for DumbBlockScanner {
71    async fn scan(
72        &self,
73        _from: Option<RawCardanoPoint>,
74        _until: BlockNumber,
75    ) -> StdResult<Box<dyn BlockStreamer>> {
76        let streamer = self.streamer.read().unwrap();
77        Ok(Box::new(streamer.clone()))
78    }
79}
80
81/// Dumb block streamer
82#[derive(Clone)]
83pub struct DumbBlockStreamer {
84    streamer_responses: VecDeque<ChainScannedBlocks>,
85    last_polled_point: Option<RawCardanoPoint>,
86}
87
88impl DumbBlockStreamer {
89    /// Factory - the resulting streamer can be polled one time for each list of blocks given
90    pub fn new() -> Self {
91        Self {
92            streamer_responses: VecDeque::new(),
93            last_polled_point: None,
94        }
95    }
96
97    /// Set the last polled point to return when [Self::last_polled_point] is called
98    pub fn set_last_polled_point(mut self, raw_point: Option<RawCardanoPoint>) -> Self {
99        self.last_polled_point = raw_point;
100        self
101    }
102
103    /// Add to the streamer several [ChainScannedBlocks::RollForwards] responses at the end of
104    /// its queue.
105    pub fn forwards(mut self, blocks: Vec<Vec<ScannedBlock>>) -> Self {
106        let mut source: VecDeque<_> = blocks
107            .into_iter()
108            .map(ChainScannedBlocks::RollForwards)
109            .collect();
110        self.streamer_responses.append(&mut source);
111
112        self
113    }
114
115    /// Add to the streamer a [ChainScannedBlocks::RollBackward] response at the end of its queue.
116    pub fn rollback(mut self, chain_point: ChainPoint) -> Self {
117        self.streamer_responses
118            .push_back(ChainScannedBlocks::RollBackward(chain_point.slot_number));
119        self
120    }
121}
122
123impl Default for DumbBlockStreamer {
124    fn default() -> Self {
125        Self::new()
126    }
127}
128
129#[async_trait]
130impl BlockStreamer for DumbBlockStreamer {
131    async fn poll_next(&mut self) -> StdResult<Option<ChainScannedBlocks>> {
132        Ok(self.streamer_responses.pop_front())
133    }
134
135    fn last_polled_point(&self) -> Option<RawCardanoPoint> {
136        self.last_polled_point.clone()
137    }
138}
139
140#[cfg(test)]
141mod tests {
142    use crate::cardano_block_scanner::BlockStreamerTestExtensions;
143
144    use super::*;
145
146    use crate::entities::SlotNumber;
147
148    #[tokio::test]
149    async fn polling_without_set_of_block_return_none() {
150        let mut streamer = DumbBlockStreamer::new().forwards(vec![]);
151        let blocks = streamer.poll_next().await.unwrap();
152        assert_eq!(blocks, None);
153    }
154
155    #[tokio::test]
156    async fn polling_with_one_set_of_block_returns_some_once() {
157        let expected_blocks = vec![ScannedBlock::new(
158            "hash-1",
159            BlockNumber(1),
160            SlotNumber(10),
161            Vec::<&str>::new(),
162        )];
163        let mut streamer = DumbBlockStreamer::new().forwards(vec![expected_blocks.clone()]);
164
165        let blocks = streamer.poll_next().await.unwrap();
166        assert_eq!(
167            blocks,
168            Some(ChainScannedBlocks::RollForwards(expected_blocks))
169        );
170
171        let blocks = streamer.poll_next().await.unwrap();
172        assert_eq!(blocks, None);
173    }
174
175    #[tokio::test]
176    async fn polling_with_multiple_sets_of_blocks_returns_some_once() {
177        let expected_blocks = vec![
178            vec![ScannedBlock::new(
179                "hash-1",
180                BlockNumber(1),
181                SlotNumber(10),
182                Vec::<&str>::new(),
183            )],
184            vec![
185                ScannedBlock::new("hash-2", BlockNumber(2), SlotNumber(11), Vec::<&str>::new()),
186                ScannedBlock::new("hash-3", BlockNumber(3), SlotNumber(12), Vec::<&str>::new()),
187            ],
188            vec![ScannedBlock::new(
189                "hash-4",
190                BlockNumber(4),
191                SlotNumber(13),
192                Vec::<&str>::new(),
193            )],
194        ];
195        let mut streamer = DumbBlockStreamer::new().forwards(expected_blocks.clone());
196
197        let blocks = streamer.poll_next().await.unwrap();
198        assert_eq!(
199            blocks,
200            Some(ChainScannedBlocks::RollForwards(expected_blocks[0].clone()))
201        );
202
203        let blocks = streamer.poll_next().await.unwrap();
204        assert_eq!(
205            blocks,
206            Some(ChainScannedBlocks::RollForwards(expected_blocks[1].clone()))
207        );
208
209        let blocks = streamer.poll_next().await.unwrap();
210        assert_eq!(
211            blocks,
212            Some(ChainScannedBlocks::RollForwards(expected_blocks[2].clone()))
213        );
214
215        let blocks = streamer.poll_next().await.unwrap();
216        assert_eq!(blocks, None);
217    }
218
219    #[tokio::test]
220    async fn dumb_scanned_construct_a_streamer_based_on_its_stored_blocks() {
221        let expected_blocks = vec![ScannedBlock::new(
222            "hash-1",
223            BlockNumber(1),
224            SlotNumber(10),
225            Vec::<&str>::new(),
226        )];
227
228        let scanner = DumbBlockScanner::new().forwards(vec![expected_blocks.clone()]);
229        let mut streamer = scanner.scan(None, BlockNumber(5)).await.unwrap();
230
231        let blocks = streamer.poll_all().await.unwrap();
232        assert_eq!(blocks, expected_blocks);
233    }
234
235    #[tokio::test]
236    async fn dumb_scanned_construct_a_streamer_based_on_its_stored_chain_scanned_blocks() {
237        let expected_blocks = vec![ScannedBlock::new(
238            "hash-1",
239            BlockNumber(1),
240            SlotNumber(10),
241            Vec::<&str>::new(),
242        )];
243        let expected_chain_point = ChainPoint::new(SlotNumber(10), BlockNumber(2), "block-hash");
244
245        let scanner = DumbBlockScanner::new()
246            .forwards(vec![expected_blocks.clone()])
247            .backward(expected_chain_point.clone());
248        let mut streamer = scanner.scan(None, BlockNumber(5)).await.unwrap();
249
250        let blocks = streamer.poll_next().await.unwrap();
251        assert_eq!(
252            blocks,
253            Some(ChainScannedBlocks::RollForwards(expected_blocks.clone()))
254        );
255
256        let blocks = streamer.poll_next().await.unwrap();
257        assert_eq!(
258            blocks,
259            Some(ChainScannedBlocks::RollBackward(
260                expected_chain_point.slot_number
261            ))
262        );
263    }
264
265    #[tokio::test]
266    async fn polling_with_can_return_roll_backward() {
267        let expected_blocks = vec![
268            vec![ScannedBlock::new(
269                "hash-1",
270                BlockNumber(1),
271                SlotNumber(10),
272                Vec::<&str>::new(),
273            )],
274            vec![ScannedBlock::new(
275                "hash-4",
276                BlockNumber(4),
277                SlotNumber(13),
278                Vec::<&str>::new(),
279            )],
280        ];
281
282        let expected_chain_point = ChainPoint::new(SlotNumber(10), BlockNumber(2), "block-hash");
283
284        let mut streamer = DumbBlockStreamer::new()
285            .forwards(expected_blocks.clone())
286            .rollback(expected_chain_point.clone());
287
288        let blocks = streamer.poll_next().await.unwrap();
289        assert_eq!(
290            blocks,
291            Some(ChainScannedBlocks::RollForwards(expected_blocks[0].clone()))
292        );
293
294        let blocks = streamer.poll_next().await.unwrap();
295        assert_eq!(
296            blocks,
297            Some(ChainScannedBlocks::RollForwards(expected_blocks[1].clone()))
298        );
299
300        let blocks = streamer.poll_next().await.unwrap();
301        assert_eq!(
302            blocks,
303            Some(ChainScannedBlocks::RollBackward(
304                expected_chain_point.slot_number
305            ))
306        );
307
308        let blocks = streamer.poll_next().await.unwrap();
309        assert_eq!(blocks, None);
310    }
311
312    #[tokio::test]
313    async fn setting_last_polled_block() {
314        let mut streamer = DumbBlockStreamer::new().forwards(vec![]);
315        assert_eq!(streamer.last_polled_point(), None);
316
317        let raw_point = RawCardanoPoint::new(SlotNumber(10), "block-hash".as_bytes());
318        streamer = streamer.set_last_polled_point(Some(raw_point.clone()));
319        assert_eq!(streamer.last_polled_point(), Some(raw_point));
320
321        streamer = streamer.set_last_polled_point(None);
322        assert_eq!(streamer.last_polled_point(), None);
323    }
324}