mithril_common/cardano_block_scanner/
dumb_block_scanner.rs1use std::collections::VecDeque;
2use std::sync::RwLock;
3
4use async_trait::async_trait;
5
6use crate::cardano_block_scanner::{BlockScanner, BlockStreamer, ScannedBlock};
7use crate::cardano_block_scanner::{ChainScannedBlocks, RawCardanoPoint};
8use crate::entities::{BlockNumber, ChainPoint};
9use crate::StdResult;
10
11pub struct DumbBlockScanner {
13 streamer: RwLock<DumbBlockStreamer>,
14}
15
16impl DumbBlockScanner {
17 pub fn new() -> Self {
19 Self {
20 streamer: RwLock::new(DumbBlockStreamer::new()),
21 }
22 }
23
24 pub fn forwards(self, blocks: Vec<Vec<ScannedBlock>>) -> Self {
27 self.add_forwards(blocks);
28 self
29 }
30
31 pub fn backward(self, chain_point: ChainPoint) -> Self {
33 self.add_backward(chain_point);
34 self
35 }
36
37 pub fn last_polled_point(self, raw_point: Option<RawCardanoPoint>) -> Self {
39 self.set_last_polled_point(raw_point);
40 self
41 }
42
43 pub fn add_forwards(&self, blocks: Vec<Vec<ScannedBlock>>) {
46 let mut streamer = self.streamer.write().unwrap();
47 *streamer = streamer.clone().forwards(blocks);
48 }
49
50 pub fn add_backward(&self, chain_point: ChainPoint) {
52 let mut streamer = self.streamer.write().unwrap();
53 *streamer = streamer.clone().rollback(chain_point);
54 }
55
56 pub fn set_last_polled_point(&self, raw_point: Option<RawCardanoPoint>) {
58 let mut streamer = self.streamer.write().unwrap();
59 *streamer = streamer.clone().set_last_polled_point(raw_point);
60 }
61}
62
63impl Default for DumbBlockScanner {
64 fn default() -> Self {
65 Self::new()
66 }
67}
68
69#[async_trait]
70impl BlockScanner for DumbBlockScanner {
71 async fn scan(
72 &self,
73 _from: Option<RawCardanoPoint>,
74 _until: BlockNumber,
75 ) -> StdResult<Box<dyn BlockStreamer>> {
76 let streamer = self.streamer.read().unwrap();
77 Ok(Box::new(streamer.clone()))
78 }
79}
80
81#[derive(Clone)]
83pub struct DumbBlockStreamer {
84 streamer_responses: VecDeque<ChainScannedBlocks>,
85 last_polled_point: Option<RawCardanoPoint>,
86}
87
88impl DumbBlockStreamer {
89 pub fn new() -> Self {
91 Self {
92 streamer_responses: VecDeque::new(),
93 last_polled_point: None,
94 }
95 }
96
97 pub fn set_last_polled_point(mut self, raw_point: Option<RawCardanoPoint>) -> Self {
99 self.last_polled_point = raw_point;
100 self
101 }
102
103 pub fn forwards(mut self, blocks: Vec<Vec<ScannedBlock>>) -> Self {
106 let mut source: VecDeque<_> = blocks
107 .into_iter()
108 .map(ChainScannedBlocks::RollForwards)
109 .collect();
110 self.streamer_responses.append(&mut source);
111
112 self
113 }
114
115 pub fn rollback(mut self, chain_point: ChainPoint) -> Self {
117 self.streamer_responses
118 .push_back(ChainScannedBlocks::RollBackward(chain_point.slot_number));
119 self
120 }
121}
122
123impl Default for DumbBlockStreamer {
124 fn default() -> Self {
125 Self::new()
126 }
127}
128
129#[async_trait]
130impl BlockStreamer for DumbBlockStreamer {
131 async fn poll_next(&mut self) -> StdResult<Option<ChainScannedBlocks>> {
132 Ok(self.streamer_responses.pop_front())
133 }
134
135 fn last_polled_point(&self) -> Option<RawCardanoPoint> {
136 self.last_polled_point.clone()
137 }
138}
139
140#[cfg(test)]
141mod tests {
142 use crate::cardano_block_scanner::BlockStreamerTestExtensions;
143
144 use super::*;
145
146 use crate::entities::SlotNumber;
147
148 #[tokio::test]
149 async fn polling_without_set_of_block_return_none() {
150 let mut streamer = DumbBlockStreamer::new().forwards(vec![]);
151 let blocks = streamer.poll_next().await.unwrap();
152 assert_eq!(blocks, None);
153 }
154
155 #[tokio::test]
156 async fn polling_with_one_set_of_block_returns_some_once() {
157 let expected_blocks = vec![ScannedBlock::new(
158 "hash-1",
159 BlockNumber(1),
160 SlotNumber(10),
161 Vec::<&str>::new(),
162 )];
163 let mut streamer = DumbBlockStreamer::new().forwards(vec![expected_blocks.clone()]);
164
165 let blocks = streamer.poll_next().await.unwrap();
166 assert_eq!(
167 blocks,
168 Some(ChainScannedBlocks::RollForwards(expected_blocks))
169 );
170
171 let blocks = streamer.poll_next().await.unwrap();
172 assert_eq!(blocks, None);
173 }
174
175 #[tokio::test]
176 async fn polling_with_multiple_sets_of_blocks_returns_some_once() {
177 let expected_blocks = vec![
178 vec![ScannedBlock::new(
179 "hash-1",
180 BlockNumber(1),
181 SlotNumber(10),
182 Vec::<&str>::new(),
183 )],
184 vec![
185 ScannedBlock::new("hash-2", BlockNumber(2), SlotNumber(11), Vec::<&str>::new()),
186 ScannedBlock::new("hash-3", BlockNumber(3), SlotNumber(12), Vec::<&str>::new()),
187 ],
188 vec![ScannedBlock::new(
189 "hash-4",
190 BlockNumber(4),
191 SlotNumber(13),
192 Vec::<&str>::new(),
193 )],
194 ];
195 let mut streamer = DumbBlockStreamer::new().forwards(expected_blocks.clone());
196
197 let blocks = streamer.poll_next().await.unwrap();
198 assert_eq!(
199 blocks,
200 Some(ChainScannedBlocks::RollForwards(expected_blocks[0].clone()))
201 );
202
203 let blocks = streamer.poll_next().await.unwrap();
204 assert_eq!(
205 blocks,
206 Some(ChainScannedBlocks::RollForwards(expected_blocks[1].clone()))
207 );
208
209 let blocks = streamer.poll_next().await.unwrap();
210 assert_eq!(
211 blocks,
212 Some(ChainScannedBlocks::RollForwards(expected_blocks[2].clone()))
213 );
214
215 let blocks = streamer.poll_next().await.unwrap();
216 assert_eq!(blocks, None);
217 }
218
219 #[tokio::test]
220 async fn dumb_scanned_construct_a_streamer_based_on_its_stored_blocks() {
221 let expected_blocks = vec![ScannedBlock::new(
222 "hash-1",
223 BlockNumber(1),
224 SlotNumber(10),
225 Vec::<&str>::new(),
226 )];
227
228 let scanner = DumbBlockScanner::new().forwards(vec![expected_blocks.clone()]);
229 let mut streamer = scanner.scan(None, BlockNumber(5)).await.unwrap();
230
231 let blocks = streamer.poll_all().await.unwrap();
232 assert_eq!(blocks, expected_blocks);
233 }
234
235 #[tokio::test]
236 async fn dumb_scanned_construct_a_streamer_based_on_its_stored_chain_scanned_blocks() {
237 let expected_blocks = vec![ScannedBlock::new(
238 "hash-1",
239 BlockNumber(1),
240 SlotNumber(10),
241 Vec::<&str>::new(),
242 )];
243 let expected_chain_point = ChainPoint::new(SlotNumber(10), BlockNumber(2), "block-hash");
244
245 let scanner = DumbBlockScanner::new()
246 .forwards(vec![expected_blocks.clone()])
247 .backward(expected_chain_point.clone());
248 let mut streamer = scanner.scan(None, BlockNumber(5)).await.unwrap();
249
250 let blocks = streamer.poll_next().await.unwrap();
251 assert_eq!(
252 blocks,
253 Some(ChainScannedBlocks::RollForwards(expected_blocks.clone()))
254 );
255
256 let blocks = streamer.poll_next().await.unwrap();
257 assert_eq!(
258 blocks,
259 Some(ChainScannedBlocks::RollBackward(
260 expected_chain_point.slot_number
261 ))
262 );
263 }
264
265 #[tokio::test]
266 async fn polling_with_can_return_roll_backward() {
267 let expected_blocks = vec![
268 vec![ScannedBlock::new(
269 "hash-1",
270 BlockNumber(1),
271 SlotNumber(10),
272 Vec::<&str>::new(),
273 )],
274 vec![ScannedBlock::new(
275 "hash-4",
276 BlockNumber(4),
277 SlotNumber(13),
278 Vec::<&str>::new(),
279 )],
280 ];
281
282 let expected_chain_point = ChainPoint::new(SlotNumber(10), BlockNumber(2), "block-hash");
283
284 let mut streamer = DumbBlockStreamer::new()
285 .forwards(expected_blocks.clone())
286 .rollback(expected_chain_point.clone());
287
288 let blocks = streamer.poll_next().await.unwrap();
289 assert_eq!(
290 blocks,
291 Some(ChainScannedBlocks::RollForwards(expected_blocks[0].clone()))
292 );
293
294 let blocks = streamer.poll_next().await.unwrap();
295 assert_eq!(
296 blocks,
297 Some(ChainScannedBlocks::RollForwards(expected_blocks[1].clone()))
298 );
299
300 let blocks = streamer.poll_next().await.unwrap();
301 assert_eq!(
302 blocks,
303 Some(ChainScannedBlocks::RollBackward(
304 expected_chain_point.slot_number
305 ))
306 );
307
308 let blocks = streamer.poll_next().await.unwrap();
309 assert_eq!(blocks, None);
310 }
311
312 #[tokio::test]
313 async fn setting_last_polled_block() {
314 let mut streamer = DumbBlockStreamer::new().forwards(vec![]);
315 assert_eq!(streamer.last_polled_point(), None);
316
317 let raw_point = RawCardanoPoint::new(SlotNumber(10), "block-hash".as_bytes());
318 streamer = streamer.set_last_polled_point(Some(raw_point.clone()));
319 assert_eq!(streamer.last_polled_point(), Some(raw_point));
320
321 streamer = streamer.set_last_polled_point(None);
322 assert_eq!(streamer.last_polled_point(), None);
323 }
324}