mithril_cardano_node_chain/test/double/
block_scanner.rs1use std::collections::VecDeque;
2use std::sync::RwLock;
3
4use async_trait::async_trait;
5
6use mithril_common::StdResult;
7use mithril_common::entities::{BlockNumber, ChainPoint};
8
9use crate::chain_scanner::{BlockScanner, BlockStreamer, ChainScannedBlocks};
10use crate::entities::{RawCardanoPoint, ScannedBlock};
11
12pub struct DumbBlockScanner {
14 streamer: RwLock<DumbBlockStreamer>,
15}
16
17impl DumbBlockScanner {
18 pub fn new() -> Self {
20 Self {
21 streamer: RwLock::new(DumbBlockStreamer::new()),
22 }
23 }
24
25 pub fn forwards(self, blocks: Vec<Vec<ScannedBlock>>) -> Self {
28 self.add_forwards(blocks);
29 self
30 }
31
32 pub fn backward(self, chain_point: ChainPoint) -> Self {
34 self.add_backward(chain_point);
35 self
36 }
37
38 pub fn last_polled_point(self, raw_point: Option<RawCardanoPoint>) -> Self {
40 self.set_last_polled_point(raw_point);
41 self
42 }
43
44 pub fn add_forwards(&self, blocks: Vec<Vec<ScannedBlock>>) {
47 let mut streamer = self.streamer.write().unwrap();
48 *streamer = streamer.clone().forwards(blocks);
49 }
50
51 pub fn add_backward(&self, chain_point: ChainPoint) {
53 let mut streamer = self.streamer.write().unwrap();
54 *streamer = streamer.clone().rollback(chain_point);
55 }
56
57 pub fn set_last_polled_point(&self, raw_point: Option<RawCardanoPoint>) {
59 let mut streamer = self.streamer.write().unwrap();
60 *streamer = streamer.clone().set_last_polled_point(raw_point);
61 }
62}
63
64impl Default for DumbBlockScanner {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70#[async_trait]
71impl BlockScanner for DumbBlockScanner {
72 async fn scan(
73 &self,
74 _from: Option<RawCardanoPoint>,
75 _until: BlockNumber,
76 ) -> StdResult<Box<dyn BlockStreamer>> {
77 let streamer = self.streamer.read().unwrap();
78 Ok(Box::new(streamer.clone()))
79 }
80}
81
82#[derive(Clone)]
84pub struct DumbBlockStreamer {
85 streamer_responses: VecDeque<ChainScannedBlocks>,
86 last_polled_point: Option<RawCardanoPoint>,
87}
88
89impl DumbBlockStreamer {
90 pub fn new() -> Self {
92 Self {
93 streamer_responses: VecDeque::new(),
94 last_polled_point: None,
95 }
96 }
97
98 pub fn set_last_polled_point(mut self, raw_point: Option<RawCardanoPoint>) -> Self {
100 self.last_polled_point = raw_point;
101 self
102 }
103
104 pub fn forwards(mut self, blocks: Vec<Vec<ScannedBlock>>) -> Self {
107 let mut source: VecDeque<_> =
108 blocks.into_iter().map(ChainScannedBlocks::RollForwards).collect();
109 self.streamer_responses.append(&mut source);
110
111 self
112 }
113
114 pub fn rollback(mut self, chain_point: ChainPoint) -> Self {
116 self.streamer_responses
117 .push_back(ChainScannedBlocks::RollBackward(chain_point.slot_number));
118 self
119 }
120}
121
122impl Default for DumbBlockStreamer {
123 fn default() -> Self {
124 Self::new()
125 }
126}
127
128#[async_trait]
129impl BlockStreamer for DumbBlockStreamer {
130 async fn poll_next(&mut self) -> StdResult<Option<ChainScannedBlocks>> {
131 Ok(self.streamer_responses.pop_front())
132 }
133
134 fn last_polled_point(&self) -> Option<RawCardanoPoint> {
135 self.last_polled_point.clone()
136 }
137}
138
#[cfg(test)]
mod tests {
    use mithril_common::entities::SlotNumber;

    use super::*;

    /// Builds a scanned block that carries no transaction.
    fn block_without_transactions(
        hash: &'static str,
        block_number: BlockNumber,
        slot_number: SlotNumber,
    ) -> ScannedBlock {
        ScannedBlock::new(hash, block_number, slot_number, Vec::<&str>::new())
    }

    #[tokio::test]
    async fn polling_without_set_of_block_return_none() {
        let mut streamer = DumbBlockStreamer::new().forwards(vec![]);

        let polled = streamer.poll_next().await.unwrap();

        assert_eq!(polled, None);
    }

    #[tokio::test]
    async fn polling_with_one_set_of_block_returns_some_once() {
        let expected_blocks = vec![block_without_transactions(
            "hash-1",
            BlockNumber(1),
            SlotNumber(10),
        )];
        let mut streamer = DumbBlockStreamer::new().forwards(vec![expected_blocks.clone()]);

        let first_poll = streamer.poll_next().await.unwrap();
        assert_eq!(
            first_poll,
            Some(ChainScannedBlocks::RollForwards(expected_blocks))
        );

        let second_poll = streamer.poll_next().await.unwrap();
        assert_eq!(second_poll, None);
    }

    #[tokio::test]
    async fn polling_with_multiple_sets_of_blocks_returns_some_once() {
        let expected_blocks = vec![
            vec![block_without_transactions(
                "hash-1",
                BlockNumber(1),
                SlotNumber(10),
            )],
            vec![
                block_without_transactions("hash-2", BlockNumber(2), SlotNumber(11)),
                block_without_transactions("hash-3", BlockNumber(3), SlotNumber(12)),
            ],
            vec![block_without_transactions(
                "hash-4",
                BlockNumber(4),
                SlotNumber(13),
            )],
        ];
        let mut streamer = DumbBlockStreamer::new().forwards(expected_blocks.clone());

        // Each set must come back as its own roll forward, in insertion order.
        for expected_set in &expected_blocks {
            let polled = streamer.poll_next().await.unwrap();
            assert_eq!(
                polled,
                Some(ChainScannedBlocks::RollForwards(expected_set.clone()))
            );
        }

        let last_poll = streamer.poll_next().await.unwrap();
        assert_eq!(last_poll, None);
    }

    #[tokio::test]
    async fn dumb_scanned_construct_a_streamer_based_on_its_stored_blocks() {
        let expected_blocks = vec![block_without_transactions(
            "hash-1",
            BlockNumber(1),
            SlotNumber(10),
        )];
        let scanner = DumbBlockScanner::new().forwards(vec![expected_blocks.clone()]);

        let mut streamer = scanner.scan(None, BlockNumber(5)).await.unwrap();

        let first_poll = streamer.poll_next().await.unwrap();
        assert_eq!(
            first_poll,
            Some(ChainScannedBlocks::RollForwards(expected_blocks))
        );

        let second_poll = streamer.poll_next().await.unwrap();
        assert_eq!(second_poll, None);
    }

    #[tokio::test]
    async fn dumb_scanned_construct_a_streamer_based_on_its_stored_chain_scanned_blocks() {
        let expected_blocks = vec![block_without_transactions(
            "hash-1",
            BlockNumber(1),
            SlotNumber(10),
        )];
        let expected_chain_point = ChainPoint::new(SlotNumber(10), BlockNumber(2), "block-hash");
        let scanner = DumbBlockScanner::new()
            .forwards(vec![expected_blocks.clone()])
            .backward(expected_chain_point.clone());

        let mut streamer = scanner.scan(None, BlockNumber(5)).await.unwrap();

        let first_poll = streamer.poll_next().await.unwrap();
        assert_eq!(
            first_poll,
            Some(ChainScannedBlocks::RollForwards(expected_blocks.clone()))
        );

        let second_poll = streamer.poll_next().await.unwrap();
        assert_eq!(
            second_poll,
            Some(ChainScannedBlocks::RollBackward(
                expected_chain_point.slot_number
            ))
        );
    }

    #[tokio::test]
    async fn polling_with_can_return_roll_backward() {
        let expected_blocks = vec![
            vec![block_without_transactions(
                "hash-1",
                BlockNumber(1),
                SlotNumber(10),
            )],
            vec![block_without_transactions(
                "hash-4",
                BlockNumber(4),
                SlotNumber(13),
            )],
        ];
        let expected_chain_point = ChainPoint::new(SlotNumber(10), BlockNumber(2), "block-hash");

        let mut streamer = DumbBlockStreamer::new()
            .forwards(expected_blocks.clone())
            .rollback(expected_chain_point.clone());

        // The roll forwards come first, in insertion order...
        for expected_set in &expected_blocks {
            let polled = streamer.poll_next().await.unwrap();
            assert_eq!(
                polled,
                Some(ChainScannedBlocks::RollForwards(expected_set.clone()))
            );
        }

        // ...followed by the rollback, then nothing.
        let rollback_poll = streamer.poll_next().await.unwrap();
        assert_eq!(
            rollback_poll,
            Some(ChainScannedBlocks::RollBackward(
                expected_chain_point.slot_number
            ))
        );

        let last_poll = streamer.poll_next().await.unwrap();
        assert_eq!(last_poll, None);
    }

    #[tokio::test]
    async fn setting_last_polled_block() {
        let streamer = DumbBlockStreamer::new().forwards(vec![]);
        assert_eq!(streamer.last_polled_point(), None);

        let raw_point = RawCardanoPoint::new(SlotNumber(10), "block-hash".as_bytes());
        let streamer = streamer.set_last_polled_point(Some(raw_point.clone()));
        assert_eq!(streamer.last_polled_point(), Some(raw_point));

        let streamer = streamer.set_last_polled_point(None);
        assert_eq!(streamer.last_polled_point(), None);
    }
}