mithril_cardano_node_chain/chain_scanner/
block_streamer_with_throttling.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use tokio::sync::Mutex;
5use tokio::time::Interval;
6
7use mithril_common::StdResult;
8
9use crate::chain_scanner::{BlockStreamer, ChainScannedBlocks};
10use crate::entities::RawCardanoPoint;
11
/// A decorator of [BlockStreamer] that throttles the block polling
///
/// Every call to `poll_next` first waits for the next tick of the internal interval and only
/// then forwards the call to the wrapped streamer.
pub struct BlockStreamerWithThrottling {
    /// Streamer that the calls are delegated to.
    wrapped_streamer: Box<dyn BlockStreamer>,
    /// Timer awaited before each poll, built from the `min_interval` given to `new`.
    interval: Mutex<Interval>,
}
17
18impl BlockStreamerWithThrottling {
19    /// Create a new instance of `BlockStreamerWithThrottling`.
20    pub fn new(wrapped_streamer: Box<dyn BlockStreamer>, min_interval: Duration) -> Self {
21        Self {
22            wrapped_streamer,
23            interval: Mutex::new(tokio::time::interval(min_interval)),
24        }
25    }
26}
27
28#[async_trait]
29impl BlockStreamer for BlockStreamerWithThrottling {
30    async fn poll_next(&mut self) -> StdResult<Option<ChainScannedBlocks>> {
31        let mut interval = self.interval.lock().await;
32        interval.tick().await;
33
34        self.wrapped_streamer.poll_next().await
35    }
36
37    fn last_polled_point(&self) -> Option<RawCardanoPoint> {
38        self.wrapped_streamer.last_polled_point()
39    }
40}
41
#[cfg(test)]
mod tests {
    use tokio::time::Instant;

    use mithril_common::entities::{BlockNumber, SlotNumber};

    use crate::entities::ScannedBlock;
    use crate::test::double::DumbBlockStreamer;

    use super::*;

    /// Minimum interval configured on every throttled streamer built by these tests.
    const MIN_INTERVAL: Duration = Duration::from_millis(100);

    /// Fixed set of blocks forwarded by the wrapped test double.
    fn blocks_set() -> [ScannedBlock; 4] {
        [
            ScannedBlock::new(
                "block_hash-1",
                BlockNumber(10),
                SlotNumber(15),
                vec!["tx_hash-1"],
            ),
            ScannedBlock::new(
                "block_hash-2",
                BlockNumber(20),
                SlotNumber(25),
                vec!["tx_hash-2"],
            ),
            ScannedBlock::new(
                "block_hash-3",
                BlockNumber(30),
                SlotNumber(35),
                vec!["tx_hash-3"],
            ),
            ScannedBlock::new(
                "block_hash-4",
                BlockNumber(40),
                SlotNumber(45),
                vec!["tx_hash-4"],
            ),
        ]
    }

    /// Build a throttled streamer wrapping a [DumbBlockStreamer] that forwards `blocks_set`.
    fn throttled_streamer() -> BlockStreamerWithThrottling {
        let wrapped = DumbBlockStreamer::new().forwards(vec![blocks_set().to_vec()]);

        BlockStreamerWithThrottling::new(Box::new(wrapped), MIN_INTERVAL)
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn first_poll_succeeds_immediately_without_throttling() {
        let mut streamer = throttled_streamer();

        let started_at = Instant::now();
        streamer.poll_next().await.unwrap();

        assert_eq!(started_at.elapsed(), Duration::ZERO);
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn consecutive_polls_are_throttled_by_min_interval() {
        let mut streamer = throttled_streamer();

        let started_at = Instant::now();

        // The very first poll goes through without waiting
        streamer.poll_next().await.unwrap();
        assert_eq!(started_at.elapsed(), Duration::ZERO);

        // The next one has to wait for the whole interval
        streamer.poll_next().await.unwrap();
        assert_eq!(started_at.elapsed(), MIN_INTERVAL);
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn throttling_waits_only_for_remaining_time() {
        let mut streamer = throttled_streamer();

        // The very first poll goes through without waiting
        streamer.poll_next().await.unwrap();

        // Let 60ms pass, which is less than the configured 100ms interval
        let already_elapsed = Duration::from_millis(60);
        tokio::time::advance(already_elapsed).await;
        let second_poll_started_at = Instant::now();

        // Only the 40ms left in the interval should be waited for
        streamer.poll_next().await.unwrap();
        assert_eq!(
            second_poll_started_at.elapsed(),
            MIN_INTERVAL - already_elapsed
        );
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn no_throttling_if_enough_time_elapsed_between_polls() {
        let mut streamer = throttled_streamer();

        // The very first poll goes through without waiting
        streamer.poll_next().await.unwrap();

        // Let more time pass than the throttle interval
        tokio::time::advance(Duration::from_millis(150)).await;
        let second_poll_started_at = Instant::now();

        // Nothing is left to wait for, the poll goes through right away
        streamer.poll_next().await.unwrap();
        assert_eq!(second_poll_started_at.elapsed(), Duration::ZERO);
    }

    #[tokio::test]
    async fn delegates_to_wrapped_streamer() {
        let expected_point = RawCardanoPoint::new(SlotNumber(100), "block_hash-last");
        let wrapped = DumbBlockStreamer::new().set_last_polled_point(Some(expected_point.clone()));

        let streamer = BlockStreamerWithThrottling::new(Box::new(wrapped), MIN_INTERVAL);

        assert_eq!(Some(expected_point), streamer.last_polled_point());
    }
}