mithril_signer/services/cardano_transactions/importer/
importer_by_chunk.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use slog::{debug, Logger};
5
6use mithril_common::entities::BlockNumber;
7use mithril_common::logging::LoggerExtensions;
8use mithril_common::signable_builder::TransactionsImporter;
9use mithril_common::StdResult;
10
11#[cfg_attr(test, mockall::automock)]
13#[async_trait]
14pub trait HighestTransactionBlockNumberGetter: Send + Sync {
15 async fn get(&self) -> StdResult<Option<BlockNumber>>;
17}
18
19pub struct TransactionsImporterByChunk {
21 highest_transaction_block_number_getter: Arc<dyn HighestTransactionBlockNumberGetter>,
22 wrapped_importer: Arc<dyn TransactionsImporter>,
23 chunk_size: BlockNumber,
24 logger: Logger,
25}
26
27impl TransactionsImporterByChunk {
28 pub fn new(
30 highest_transaction_block_number_getter: Arc<dyn HighestTransactionBlockNumberGetter>,
31 wrapped_importer: Arc<dyn TransactionsImporter>,
32 chunk_size: BlockNumber,
33 logger: Logger,
34 ) -> Self {
35 Self {
36 highest_transaction_block_number_getter,
37 wrapped_importer,
38 chunk_size,
39 logger: logger.new_with_component_name::<Self>(),
40 }
41 }
42}
43
44#[async_trait]
45impl TransactionsImporter for TransactionsImporterByChunk {
46 async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
47 let mut intermediate_up_to = self
48 .highest_transaction_block_number_getter
49 .get()
50 .await?
51 .unwrap_or(BlockNumber(0));
52
53 while intermediate_up_to < up_to_beacon {
54 let next_up_to = (intermediate_up_to + self.chunk_size).min(up_to_beacon);
55 debug!(
56 self.logger,
57 "Running Transactions import between block '{intermediate_up_to}' and '{next_up_to}'";
58 );
59 self.wrapped_importer.import(next_up_to).await?;
60 intermediate_up_to = next_up_to;
61 }
62
63 Ok(())
64 }
65}
66
#[cfg(test)]
mod tests {
    use mockall::predicate::eq;
    use mockall::{mock, Sequence};

    use crate::test_tools::TestLogger;

    use super::*;

    mock! {
        pub TransactionImporterImpl {}

        #[async_trait]
        impl TransactionsImporter for TransactionImporterImpl {
            async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()>;
        }
    }

    /// Builds a getter mock that answers `Some(highest_block_number)` on every call.
    fn create_highest_transaction_block_number_getter_mock(
        highest_block_number: BlockNumber,
    ) -> Arc<dyn HighestTransactionBlockNumberGetter> {
        let mut getter = MockHighestTransactionBlockNumberGetter::new();
        getter
            .expect_get()
            .returning(move || Ok(Some(highest_block_number)));
        Arc::new(getter)
    }

    /// Builds a wrapped-importer mock that must be called exactly once per entry of
    /// `expected_values`, with those arguments and in that order.
    fn create_transaction_importer_mock(
        expected_values: Vec<BlockNumber>,
    ) -> MockTransactionImporterImpl {
        let mut call_order = Sequence::new();
        let mut importer_mock = MockTransactionImporterImpl::new();
        expected_values.into_iter().for_each(|block_number| {
            importer_mock
                .expect_import()
                .with(eq(block_number))
                .once()
                .in_sequence(&mut call_order)
                .returning(|_| Ok(()));
        });
        importer_mock
    }

    /// Assembles the importer under test around the given mocks, logging to stdout.
    fn build_importer(
        getter: Arc<dyn HighestTransactionBlockNumberGetter>,
        wrapped_importer: MockTransactionImporterImpl,
        chunk_size: BlockNumber,
    ) -> TransactionsImporterByChunk {
        TransactionsImporterByChunk::new(
            getter,
            Arc::new(wrapped_importer),
            chunk_size,
            TestLogger::stdout(),
        )
    }

    #[tokio::test]
    async fn test_import_nothing_to_do_when_highest_block_number_lower_or_equal_up_to_beacon() {
        // Despite the name, the cases covered are `up_to_beacon` equal to, then below,
        // the highest block number: the wrapped importer must never be called.
        let highest_block_number = BlockNumber(10);

        let mut wrapped_importer = MockTransactionImporterImpl::new();
        wrapped_importer.expect_import().never();

        let importer = build_importer(
            create_highest_transaction_block_number_getter_mock(highest_block_number),
            wrapped_importer,
            BlockNumber(5),
        );

        importer.import(highest_block_number).await.unwrap();
        importer.import(highest_block_number - 1).await.unwrap();
    }

    #[tokio::test]
    async fn test_import_even_when_highest_block_number_is_none() {
        let chunk_size = BlockNumber(5);
        let up_to_beacon = chunk_size - 1;

        let mut getter = MockHighestTransactionBlockNumberGetter::new();
        getter.expect_get().returning(|| Ok(None));

        let importer = build_importer(
            Arc::new(getter),
            create_transaction_importer_mock(vec![up_to_beacon]),
            chunk_size,
        );

        importer.import(up_to_beacon).await.unwrap();
    }

    #[tokio::test]
    async fn test_import_only_once_when_block_delta_less_than_chunk_size() {
        let highest_block_number = BlockNumber(10);
        let chunk_size = BlockNumber(5);
        let up_to_beacon = highest_block_number + chunk_size - 1;

        let importer = build_importer(
            create_highest_transaction_block_number_getter_mock(highest_block_number),
            create_transaction_importer_mock(vec![up_to_beacon]),
            chunk_size,
        );

        importer.import(up_to_beacon).await.unwrap();
    }

    #[tokio::test]
    async fn test_import_multiple_times_when_block_delta_is_not_a_multiple_of_chunk_size() {
        let highest_block_number = BlockNumber(10);
        let chunk_size = BlockNumber(5);
        let up_to_beacon = highest_block_number + chunk_size * 2 + 1;

        // Two full chunks, then a final shorter one that stops at the target.
        let expected_calls = vec![
            highest_block_number + chunk_size,
            highest_block_number + chunk_size * 2,
            up_to_beacon,
        ];

        let importer = build_importer(
            create_highest_transaction_block_number_getter_mock(highest_block_number),
            create_transaction_importer_mock(expected_calls),
            chunk_size,
        );

        importer.import(up_to_beacon).await.unwrap();
    }

    #[tokio::test]
    async fn test_import_multiple_times_when_block_delta_is_a_multiple_of_chunk_size() {
        let highest_block_number = BlockNumber(10);
        let chunk_size = BlockNumber(5);
        let up_to_beacon = highest_block_number + chunk_size * 2;

        // Exactly two full chunks; no trailing partial import is expected.
        let expected_calls = vec![
            highest_block_number + chunk_size,
            highest_block_number + chunk_size * 2,
        ];

        let importer = build_importer(
            create_highest_transaction_block_number_getter_mock(highest_block_number),
            create_transaction_importer_mock(expected_calls),
            chunk_size,
        );

        importer.import(up_to_beacon).await.unwrap();
    }
}