mithril_cardano_node_chain/chain_importer/
importer_by_chunk.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use slog::{Logger, debug};
5
6use mithril_common::StdResult;
7use mithril_common::entities::BlockNumber;
8use mithril_common::logging::LoggerExtensions;
9
10use crate::chain_importer::ChainDataImporter;
11
/// Trait to get the highest stored chain data block number
///
/// [ChainDataImporterByChunk] queries it once at the start of each import to find the block
/// number from which the chunked import starts.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait HighestStoredBlockNumberGetter: Send + Sync {
    /// Get the highest stored block number.
    ///
    /// When the result is `Ok(None)`, [ChainDataImporterByChunk] starts its import from
    /// block `0`.
    async fn get(&self) -> StdResult<Option<BlockNumber>>;
}
19
/// A decorator of [ChainDataImporter] that does the import by chunks
///
/// Instead of asking the wrapped importer to import everything up to the requested block
/// in one call, it calls it repeatedly with intermediate targets spaced by `chunk_size`.
pub struct ChainDataImporterByChunk {
    /// Source of the starting point: queried once at the beginning of every import.
    highest_stored_block_number_getter: Arc<dyn HighestStoredBlockNumberGetter>,
    /// The decorated importer that each chunk import is delegated to.
    wrapped_importer: Arc<dyn ChainDataImporter>,
    /// Maximum distance, in blocks, between two consecutive intermediate import targets.
    chunk_size: BlockNumber,
    /// Logger derived from the one given to `new`, tagged with this component's name.
    logger: Logger,
}
27
28impl ChainDataImporterByChunk {
29    /// Create a new instance of `ChainDataImporterByChunk`.
30    pub fn new(
31        highest_transaction_block_number_getter: Arc<dyn HighestStoredBlockNumberGetter>,
32        wrapped_importer: Arc<dyn ChainDataImporter>,
33        chunk_size: BlockNumber,
34        logger: Logger,
35    ) -> Self {
36        Self {
37            highest_stored_block_number_getter: highest_transaction_block_number_getter,
38            wrapped_importer,
39            chunk_size,
40            logger: logger.new_with_component_name::<Self>(),
41        }
42    }
43}
44
45#[async_trait]
46impl ChainDataImporter for ChainDataImporterByChunk {
47    async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
48        let mut intermediate_up_to = self
49            .highest_stored_block_number_getter
50            .get()
51            .await?
52            .unwrap_or(BlockNumber(0));
53
54        while intermediate_up_to < up_to_beacon {
55            let next_up_to = (intermediate_up_to + self.chunk_size).min(up_to_beacon);
56            debug!(
57                self.logger,
58                "Running Transactions import between block '{intermediate_up_to}' and '{next_up_to}'";
59            );
60            self.wrapped_importer.import(next_up_to).await?;
61            intermediate_up_to = next_up_to;
62        }
63
64        Ok(())
65    }
66}
67
#[cfg(test)]
mod tests {
    use mockall::Sequence;
    use mockall::predicate::eq;

    use crate::chain_importer::MockChainDataImporter;
    use crate::test::TestLogger;

    use super::*;

    /// Build a getter mock that always answers `highest_block_number`.
    fn create_highest_stored_block_number_getter_mock(
        highest_block_number: Option<BlockNumber>,
    ) -> Arc<dyn HighestStoredBlockNumberGetter> {
        let mut mock = MockHighestStoredBlockNumberGetter::new();
        mock.expect_get().returning(move || Ok(highest_block_number));
        Arc::new(mock)
    }

    /// Build an importer mock that expects exactly one `import` call per item of
    /// `expected_values`, in the given order.
    fn create_chain_data_importer_mock(expected_values: Vec<BlockNumber>) -> MockChainDataImporter {
        let mut seq = Sequence::new();
        let mut wrapped_importer = MockChainDataImporter::new();
        for expected_value in expected_values {
            wrapped_importer
                .expect_import()
                .once()
                .in_sequence(&mut seq)
                .with(eq(expected_value))
                .returning(|_| Ok(()));
        }
        wrapped_importer
    }

    /// Assemble the importer under test from its mocked collaborators.
    fn create_importer(
        highest_block_number: Option<BlockNumber>,
        wrapped_importer: MockChainDataImporter,
        chunk_size: BlockNumber,
    ) -> ChainDataImporterByChunk {
        ChainDataImporterByChunk::new(
            create_highest_stored_block_number_getter_mock(highest_block_number),
            Arc::new(wrapped_importer),
            chunk_size,
            TestLogger::stdout(),
        )
    }

    #[tokio::test]
    async fn test_import_nothing_to_do_when_up_to_beacon_lower_or_equal_to_highest_block_number() {
        let highest_block_number = BlockNumber(10);
        let chunk_size = BlockNumber(5);

        let mut wrapped_importer = MockChainDataImporter::new();
        wrapped_importer.expect_import().never();

        let importer = create_importer(Some(highest_block_number), wrapped_importer, chunk_size);

        // Target equal to what is already stored
        let up_to_beacon = highest_block_number;
        importer.import(up_to_beacon).await.unwrap();

        // Target below what is already stored
        let up_to_beacon = highest_block_number - 1;
        importer.import(up_to_beacon).await.unwrap();
    }

    #[tokio::test]
    async fn test_import_even_when_highest_block_number_is_none() {
        let chunk_size = BlockNumber(5);
        let up_to_beacon = chunk_size - 1;

        let wrapped_importer = create_chain_data_importer_mock(vec![up_to_beacon]);
        let importer = create_importer(None, wrapped_importer, chunk_size);

        importer.import(up_to_beacon).await.unwrap();
    }

    #[tokio::test]
    async fn test_import_only_once_when_block_delta_less_than_chunk_size() {
        let highest_block_number = BlockNumber(10);
        let chunk_size = BlockNumber(5);
        let up_to_beacon = highest_block_number + chunk_size - 1;

        let wrapped_importer = create_chain_data_importer_mock(vec![up_to_beacon]);
        let importer = create_importer(Some(highest_block_number), wrapped_importer, chunk_size);

        importer.import(up_to_beacon).await.unwrap();
    }

    #[tokio::test]
    async fn test_import_multiple_times_when_block_delta_is_not_a_multiple_of_chunk_size() {
        let highest_block_number = BlockNumber(10);
        let chunk_size = BlockNumber(5);
        let up_to_beacon = highest_block_number + chunk_size * 2 + 1;

        let wrapped_importer = create_chain_data_importer_mock(vec![
            highest_block_number + chunk_size,
            highest_block_number + chunk_size * 2,
            up_to_beacon,
        ]);
        let importer = create_importer(Some(highest_block_number), wrapped_importer, chunk_size);

        importer.import(up_to_beacon).await.unwrap();
    }

    #[tokio::test]
    async fn test_import_multiple_times_when_block_delta_is_a_multiple_of_chunk_size() {
        let highest_block_number = BlockNumber(10);
        let chunk_size = BlockNumber(5);
        let up_to_beacon = highest_block_number + chunk_size * 2;

        let wrapped_importer = create_chain_data_importer_mock(vec![
            highest_block_number + chunk_size,
            highest_block_number + chunk_size * 2,
        ]);
        let importer = create_importer(Some(highest_block_number), wrapped_importer, chunk_size);

        importer.import(up_to_beacon).await.unwrap();
    }
}