mithril_signer/services/cardano_transactions/importer/
importer_by_chunk.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use slog::{Logger, debug};
5
6use mithril_common::StdResult;
7use mithril_common::entities::BlockNumber;
8use mithril_common::logging::LoggerExtensions;
9use mithril_common::signable_builder::TransactionsImporter;
10
11#[cfg_attr(test, mockall::automock)]
13#[async_trait]
14pub trait HighestTransactionBlockNumberGetter: Send + Sync {
15 async fn get(&self) -> StdResult<Option<BlockNumber>>;
17}
18
19pub struct TransactionsImporterByChunk {
21 highest_transaction_block_number_getter: Arc<dyn HighestTransactionBlockNumberGetter>,
22 wrapped_importer: Arc<dyn TransactionsImporter>,
23 chunk_size: BlockNumber,
24 logger: Logger,
25}
26
27impl TransactionsImporterByChunk {
28 pub fn new(
30 highest_transaction_block_number_getter: Arc<dyn HighestTransactionBlockNumberGetter>,
31 wrapped_importer: Arc<dyn TransactionsImporter>,
32 chunk_size: BlockNumber,
33 logger: Logger,
34 ) -> Self {
35 Self {
36 highest_transaction_block_number_getter,
37 wrapped_importer,
38 chunk_size,
39 logger: logger.new_with_component_name::<Self>(),
40 }
41 }
42}
43
44#[async_trait]
45impl TransactionsImporter for TransactionsImporterByChunk {
46 async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
47 let mut intermediate_up_to = self
48 .highest_transaction_block_number_getter
49 .get()
50 .await?
51 .unwrap_or(BlockNumber(0));
52
53 while intermediate_up_to < up_to_beacon {
54 let next_up_to = (intermediate_up_to + self.chunk_size).min(up_to_beacon);
55 debug!(
56 self.logger,
57 "Running Transactions import between block '{intermediate_up_to}' and '{next_up_to}'";
58 );
59 self.wrapped_importer.import(next_up_to).await?;
60 intermediate_up_to = next_up_to;
61 }
62
63 Ok(())
64 }
65}
66
67#[cfg(test)]
68mod tests {
69 use mockall::predicate::eq;
70 use mockall::{Sequence, mock};
71
72 use crate::test_tools::TestLogger;
73
74 use super::*;
75
76 mock! {
77 pub TransactionImporterImpl {}
78
79 #[async_trait]
80 impl TransactionsImporter for TransactionImporterImpl {
81 async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()>;
82 }
83 }
84
85 fn create_highest_transaction_block_number_getter_mock(
86 highest_block_number: BlockNumber,
87 ) -> Arc<dyn HighestTransactionBlockNumberGetter> {
88 Arc::new({
89 let mut mock = MockHighestTransactionBlockNumberGetter::new();
90 mock.expect_get().returning(move || Ok(Some(highest_block_number)));
91 mock
92 })
93 }
94
95 fn create_transaction_importer_mock(
96 expected_values: Vec<BlockNumber>,
97 ) -> MockTransactionImporterImpl {
98 let mut seq = Sequence::new();
99 let mut wrapped_importer = MockTransactionImporterImpl::new();
100 for expected_value in expected_values {
101 wrapped_importer
102 .expect_import()
103 .once()
104 .in_sequence(&mut seq)
105 .with(eq(expected_value))
106 .returning(|_| Ok(()));
107 }
108 wrapped_importer
109 }
110
111 #[tokio::test]
112 async fn test_import_nothing_to_do_when_highest_block_number_lower_or_equal_up_to_beacon() {
113 let highest_block_number = BlockNumber(10);
114 let chunk_size = BlockNumber(5);
115
116 let highest_transaction_block_number_getter =
117 create_highest_transaction_block_number_getter_mock(highest_block_number);
118 let mut wrapped_importer = MockTransactionImporterImpl::new();
119 wrapped_importer.expect_import().never();
120
121 let importer = TransactionsImporterByChunk::new(
122 highest_transaction_block_number_getter,
123 Arc::new(wrapped_importer),
124 chunk_size,
125 TestLogger::stdout(),
126 );
127
128 let up_to_beacon = highest_block_number;
129 importer.import(up_to_beacon).await.unwrap();
130
131 let up_to_beacon = highest_block_number - 1;
132 importer.import(up_to_beacon).await.unwrap();
133 }
134
135 #[tokio::test]
136 async fn test_import_even_when_highest_block_number_is_none() {
137 let highest_block_number = None;
138 let chunk_size = BlockNumber(5);
139 let up_to_beacon = chunk_size - 1;
140
141 let highest_transaction_block_number_getter = Arc::new({
142 let mut mock = MockHighestTransactionBlockNumberGetter::new();
143 mock.expect_get().returning(move || Ok(highest_block_number));
144 mock
145 });
146 let wrapped_importer = create_transaction_importer_mock(vec![up_to_beacon]);
147
148 let importer = TransactionsImporterByChunk::new(
149 highest_transaction_block_number_getter,
150 Arc::new(wrapped_importer),
151 chunk_size,
152 TestLogger::stdout(),
153 );
154
155 importer.import(up_to_beacon).await.unwrap();
156 }
157
158 #[tokio::test]
159 async fn test_import_only_once_when_block_delta_less_than_chunk_size() {
160 let highest_block_number = BlockNumber(10);
161 let chunk_size = BlockNumber(5);
162 let up_to_beacon = highest_block_number + chunk_size - 1;
163
164 let highest_transaction_block_number_getter =
165 create_highest_transaction_block_number_getter_mock(highest_block_number);
166 let wrapped_importer = create_transaction_importer_mock(vec![up_to_beacon]);
167
168 let importer = TransactionsImporterByChunk::new(
169 highest_transaction_block_number_getter,
170 Arc::new(wrapped_importer),
171 chunk_size,
172 TestLogger::stdout(),
173 );
174
175 importer.import(up_to_beacon).await.unwrap();
176 }
177
178 #[tokio::test]
179 async fn test_import_multiple_times_when_block_delta_is_not_a_multiple_of_chunk_size() {
180 let highest_block_number = BlockNumber(10);
181 let chunk_size = BlockNumber(5);
182 let up_to_beacon = highest_block_number + chunk_size * 2 + 1;
183
184 let highest_transaction_block_number_getter =
185 create_highest_transaction_block_number_getter_mock(highest_block_number);
186 let wrapped_importer = create_transaction_importer_mock(vec![
187 highest_block_number + chunk_size,
188 highest_block_number + chunk_size * 2,
189 up_to_beacon,
190 ]);
191
192 let importer = TransactionsImporterByChunk::new(
193 highest_transaction_block_number_getter,
194 Arc::new(wrapped_importer),
195 chunk_size,
196 TestLogger::stdout(),
197 );
198
199 importer.import(up_to_beacon).await.unwrap();
200 }
201
202 #[tokio::test]
203 async fn test_import_multiple_times_when_block_delta_is_a_multiple_of_chunk_size() {
204 let highest_block_number = BlockNumber(10);
205 let chunk_size = BlockNumber(5);
206 let up_to_beacon = highest_block_number + chunk_size * 2;
207
208 let highest_transaction_block_number_getter =
209 create_highest_transaction_block_number_getter_mock(highest_block_number);
210 let wrapped_importer = create_transaction_importer_mock(vec![
211 highest_block_number + chunk_size,
212 highest_block_number + chunk_size * 2,
213 ]);
214
215 let importer = TransactionsImporterByChunk::new(
216 highest_transaction_block_number_getter,
217 Arc::new(wrapped_importer),
218 chunk_size,
219 TestLogger::stdout(),
220 );
221
222 importer.import(up_to_beacon).await.unwrap();
223 }
224}