mithril_cardano_node_chain/chain_importer/
importer_by_chunk.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use slog::{Logger, debug};
5
6use mithril_common::StdResult;
7use mithril_common::entities::BlockNumber;
8use mithril_common::logging::LoggerExtensions;
9
10use crate::chain_importer::ChainDataImporter;
11
12#[cfg_attr(test, mockall::automock)]
14#[async_trait]
15pub trait HighestStoredBlockNumberGetter: Send + Sync {
16 async fn get(&self) -> StdResult<Option<BlockNumber>>;
18}
19
20pub struct ChainDataImporterByChunk {
22 highest_stored_block_number_getter: Arc<dyn HighestStoredBlockNumberGetter>,
23 wrapped_importer: Arc<dyn ChainDataImporter>,
24 chunk_size: BlockNumber,
25 logger: Logger,
26}
27
28impl ChainDataImporterByChunk {
29 pub fn new(
31 highest_transaction_block_number_getter: Arc<dyn HighestStoredBlockNumberGetter>,
32 wrapped_importer: Arc<dyn ChainDataImporter>,
33 chunk_size: BlockNumber,
34 logger: Logger,
35 ) -> Self {
36 Self {
37 highest_stored_block_number_getter: highest_transaction_block_number_getter,
38 wrapped_importer,
39 chunk_size,
40 logger: logger.new_with_component_name::<Self>(),
41 }
42 }
43}
44
45#[async_trait]
46impl ChainDataImporter for ChainDataImporterByChunk {
47 async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
48 let mut intermediate_up_to = self
49 .highest_stored_block_number_getter
50 .get()
51 .await?
52 .unwrap_or(BlockNumber(0));
53
54 while intermediate_up_to < up_to_beacon {
55 let next_up_to = (intermediate_up_to + self.chunk_size).min(up_to_beacon);
56 debug!(
57 self.logger,
58 "Running Transactions import between block '{intermediate_up_to}' and '{next_up_to}'";
59 );
60 self.wrapped_importer.import(next_up_to).await?;
61 intermediate_up_to = next_up_to;
62 }
63
64 Ok(())
65 }
66}
67
68#[cfg(test)]
69mod tests {
70 use mockall::Sequence;
71 use mockall::predicate::eq;
72
73 use crate::chain_importer::MockChainDataImporter;
74 use crate::test::TestLogger;
75
76 use super::*;
77
78 fn create_highest_stored_block_number_getter_mock(
79 highest_block_number: BlockNumber,
80 ) -> Arc<dyn HighestStoredBlockNumberGetter> {
81 Arc::new({
82 let mut mock = MockHighestStoredBlockNumberGetter::new();
83 mock.expect_get().returning(move || Ok(Some(highest_block_number)));
84 mock
85 })
86 }
87
88 fn create_chain_data_importer_mock(expected_values: Vec<BlockNumber>) -> MockChainDataImporter {
89 let mut seq = Sequence::new();
90 let mut wrapped_importer = MockChainDataImporter::new();
91 for expected_value in expected_values {
92 wrapped_importer
93 .expect_import()
94 .once()
95 .in_sequence(&mut seq)
96 .with(eq(expected_value))
97 .returning(|_| Ok(()));
98 }
99 wrapped_importer
100 }
101
102 #[tokio::test]
103 async fn test_import_nothing_to_do_when_highest_block_number_lower_or_equal_up_to_beacon() {
104 let highest_block_number = BlockNumber(10);
105 let chunk_size = BlockNumber(5);
106
107 let highest_transaction_block_number_getter =
108 create_highest_stored_block_number_getter_mock(highest_block_number);
109 let mut wrapped_importer = MockChainDataImporter::new();
110 wrapped_importer.expect_import().never();
111
112 let importer = ChainDataImporterByChunk::new(
113 highest_transaction_block_number_getter,
114 Arc::new(wrapped_importer),
115 chunk_size,
116 TestLogger::stdout(),
117 );
118
119 let up_to_beacon = highest_block_number;
120 importer.import(up_to_beacon).await.unwrap();
121
122 let up_to_beacon = highest_block_number - 1;
123 importer.import(up_to_beacon).await.unwrap();
124 }
125
126 #[tokio::test]
127 async fn test_import_even_when_highest_block_number_is_none() {
128 let highest_block_number = None;
129 let chunk_size = BlockNumber(5);
130 let up_to_beacon = chunk_size - 1;
131
132 let highest_transaction_block_number_getter = Arc::new({
133 let mut mock = MockHighestStoredBlockNumberGetter::new();
134 mock.expect_get().returning(move || Ok(highest_block_number));
135 mock
136 });
137 let wrapped_importer = create_chain_data_importer_mock(vec![up_to_beacon]);
138
139 let importer = ChainDataImporterByChunk::new(
140 highest_transaction_block_number_getter,
141 Arc::new(wrapped_importer),
142 chunk_size,
143 TestLogger::stdout(),
144 );
145
146 importer.import(up_to_beacon).await.unwrap();
147 }
148
149 #[tokio::test]
150 async fn test_import_only_once_when_block_delta_less_than_chunk_size() {
151 let highest_block_number = BlockNumber(10);
152 let chunk_size = BlockNumber(5);
153 let up_to_beacon = highest_block_number + chunk_size - 1;
154
155 let highest_transaction_block_number_getter =
156 create_highest_stored_block_number_getter_mock(highest_block_number);
157 let wrapped_importer = create_chain_data_importer_mock(vec![up_to_beacon]);
158
159 let importer = ChainDataImporterByChunk::new(
160 highest_transaction_block_number_getter,
161 Arc::new(wrapped_importer),
162 chunk_size,
163 TestLogger::stdout(),
164 );
165
166 importer.import(up_to_beacon).await.unwrap();
167 }
168
169 #[tokio::test]
170 async fn test_import_multiple_times_when_block_delta_is_not_a_multiple_of_chunk_size() {
171 let highest_block_number = BlockNumber(10);
172 let chunk_size = BlockNumber(5);
173 let up_to_beacon = highest_block_number + chunk_size * 2 + 1;
174
175 let highest_transaction_block_number_getter =
176 create_highest_stored_block_number_getter_mock(highest_block_number);
177 let wrapped_importer = create_chain_data_importer_mock(vec![
178 highest_block_number + chunk_size,
179 highest_block_number + chunk_size * 2,
180 up_to_beacon,
181 ]);
182
183 let importer = ChainDataImporterByChunk::new(
184 highest_transaction_block_number_getter,
185 Arc::new(wrapped_importer),
186 chunk_size,
187 TestLogger::stdout(),
188 );
189
190 importer.import(up_to_beacon).await.unwrap();
191 }
192
193 #[tokio::test]
194 async fn test_import_multiple_times_when_block_delta_is_a_multiple_of_chunk_size() {
195 let highest_block_number = BlockNumber(10);
196 let chunk_size = BlockNumber(5);
197 let up_to_beacon = highest_block_number + chunk_size * 2;
198
199 let highest_transaction_block_number_getter =
200 create_highest_stored_block_number_getter_mock(highest_block_number);
201 let wrapped_importer = create_chain_data_importer_mock(vec![
202 highest_block_number + chunk_size,
203 highest_block_number + chunk_size * 2,
204 ]);
205
206 let importer = ChainDataImporterByChunk::new(
207 highest_transaction_block_number_getter,
208 Arc::new(wrapped_importer),
209 chunk_size,
210 TestLogger::stdout(),
211 );
212
213 importer.import(up_to_beacon).await.unwrap();
214 }
215}