mithril_cardano_node_chain/chain_scanner/
block_streamer_with_throttling.rs1use std::time::Duration;
2
3use async_trait::async_trait;
4use tokio::sync::Mutex;
5use tokio::time::Interval;
6
7use mithril_common::StdResult;
8
9use crate::chain_scanner::{BlockStreamer, ChainScannedBlocks};
10use crate::entities::RawCardanoPoint;
11
12pub struct BlockStreamerWithThrottling {
14 wrapped_streamer: Box<dyn BlockStreamer>,
15 interval: Mutex<Interval>,
16}
17
18impl BlockStreamerWithThrottling {
19 pub fn new(wrapped_streamer: Box<dyn BlockStreamer>, min_interval: Duration) -> Self {
21 Self {
22 wrapped_streamer,
23 interval: Mutex::new(tokio::time::interval(min_interval)),
24 }
25 }
26}
27
28#[async_trait]
29impl BlockStreamer for BlockStreamerWithThrottling {
30 async fn poll_next(&mut self) -> StdResult<Option<ChainScannedBlocks>> {
31 let mut interval = self.interval.lock().await;
32 interval.tick().await;
33
34 self.wrapped_streamer.poll_next().await
35 }
36
37 fn last_polled_point(&self) -> Option<RawCardanoPoint> {
38 self.wrapped_streamer.last_polled_point()
39 }
40}
41
42#[cfg(test)]
43mod tests {
44 use tokio::time::Instant;
45
46 use mithril_common::entities::{BlockNumber, SlotNumber};
47
48 use crate::entities::ScannedBlock;
49 use crate::test::double::DumbBlockStreamer;
50
51 use super::*;
52
53 fn blocks_set() -> [ScannedBlock; 4] {
54 [
55 ScannedBlock::new(
56 "block_hash-1",
57 BlockNumber(10),
58 SlotNumber(15),
59 vec!["tx_hash-1"],
60 ),
61 ScannedBlock::new(
62 "block_hash-2",
63 BlockNumber(20),
64 SlotNumber(25),
65 vec!["tx_hash-2"],
66 ),
67 ScannedBlock::new(
68 "block_hash-3",
69 BlockNumber(30),
70 SlotNumber(35),
71 vec!["tx_hash-3"],
72 ),
73 ScannedBlock::new(
74 "block_hash-4",
75 BlockNumber(40),
76 SlotNumber(45),
77 vec!["tx_hash-4"],
78 ),
79 ]
80 }
81
82 #[tokio::test(flavor = "current_thread", start_paused = true)]
83 async fn first_poll_succeeds_immediately_without_throttling() {
84 let mut streamer = BlockStreamerWithThrottling::new(
85 Box::new(DumbBlockStreamer::new().forwards(vec![blocks_set().to_vec()])),
86 Duration::from_millis(100),
87 );
88
89 let start = Instant::now();
90 streamer.poll_next().await.unwrap();
91
92 assert_eq!(start.elapsed(), Duration::from_millis(0));
93 }
94
95 #[tokio::test(flavor = "current_thread", start_paused = true)]
96 async fn consecutive_polls_are_throttled_by_min_interval() {
97 let mut streamer = BlockStreamerWithThrottling::new(
98 Box::new(DumbBlockStreamer::new().forwards(vec![blocks_set().to_vec()])),
99 Duration::from_millis(100),
100 );
101
102 let start = Instant::now();
103
104 streamer.poll_next().await.unwrap();
106 assert_eq!(start.elapsed(), Duration::from_millis(0));
107
108 streamer.poll_next().await.unwrap();
109 assert_eq!(start.elapsed(), Duration::from_millis(100));
110 }
111
112 #[tokio::test(flavor = "current_thread", start_paused = true)]
113 async fn throttling_waits_only_for_remaining_time() {
114 let mut streamer = BlockStreamerWithThrottling::new(
115 Box::new(DumbBlockStreamer::new().forwards(vec![blocks_set().to_vec()])),
116 Duration::from_millis(100),
117 );
118
119 streamer.poll_next().await.unwrap();
121
122 tokio::time::advance(Duration::from_millis(60)).await;
124 let before_second_poll = Instant::now();
125
126 streamer.poll_next().await.unwrap();
128 assert_eq!(before_second_poll.elapsed(), Duration::from_millis(40));
129 }
130
131 #[tokio::test(flavor = "current_thread", start_paused = true)]
132 async fn no_throttling_if_enough_time_elapsed_between_polls() {
133 let mut streamer = BlockStreamerWithThrottling::new(
134 Box::new(DumbBlockStreamer::new().forwards(vec![blocks_set().to_vec()])),
135 Duration::from_millis(100),
136 );
137
138 streamer.poll_next().await.unwrap();
140
141 tokio::time::advance(Duration::from_millis(150)).await;
143 let before_second_poll = Instant::now();
144
145 streamer.poll_next().await.unwrap();
147 assert_eq!(before_second_poll.elapsed(), Duration::from_millis(0));
148 }
149
150 #[tokio::test]
151 async fn delegates_to_wrapped_streamer() {
152 let expected_point = RawCardanoPoint::new(SlotNumber(100), "block_hash-last");
153
154 let streamer = BlockStreamerWithThrottling::new(
155 Box::new(DumbBlockStreamer::new().set_last_polled_point(Some(expected_point.clone()))),
156 Duration::from_millis(100),
157 );
158
159 assert_eq!(Some(expected_point), streamer.last_polled_point());
160 }
161}