mithril_signer/services/cardano_transactions/importer/
importer_by_chunk.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use slog::{Logger, debug};
5
6use mithril_common::StdResult;
7use mithril_common::entities::BlockNumber;
8use mithril_common::logging::LoggerExtensions;
9use mithril_common::signable_builder::TransactionsImporter;
10
11/// Trait to get the highest transaction block number
12#[cfg_attr(test, mockall::automock)]
13#[async_trait]
14pub trait HighestTransactionBlockNumberGetter: Send + Sync {
15    /// Get the highest known transaction block number
16    async fn get(&self) -> StdResult<Option<BlockNumber>>;
17}
18
19/// A decorator of [TransactionsImporter] that does the import by chunks
20pub struct TransactionsImporterByChunk {
21    highest_transaction_block_number_getter: Arc<dyn HighestTransactionBlockNumberGetter>,
22    wrapped_importer: Arc<dyn TransactionsImporter>,
23    chunk_size: BlockNumber,
24    logger: Logger,
25}
26
27impl TransactionsImporterByChunk {
28    /// Create a new instance of `TransactionsImporterByChunk`.
29    pub fn new(
30        highest_transaction_block_number_getter: Arc<dyn HighestTransactionBlockNumberGetter>,
31        wrapped_importer: Arc<dyn TransactionsImporter>,
32        chunk_size: BlockNumber,
33        logger: Logger,
34    ) -> Self {
35        Self {
36            highest_transaction_block_number_getter,
37            wrapped_importer,
38            chunk_size,
39            logger: logger.new_with_component_name::<Self>(),
40        }
41    }
42}
43
44#[async_trait]
45impl TransactionsImporter for TransactionsImporterByChunk {
46    async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
47        let mut intermediate_up_to = self
48            .highest_transaction_block_number_getter
49            .get()
50            .await?
51            .unwrap_or(BlockNumber(0));
52
53        while intermediate_up_to < up_to_beacon {
54            let next_up_to = (intermediate_up_to + self.chunk_size).min(up_to_beacon);
55            debug!(
56                self.logger,
57                "Running Transactions import between block '{intermediate_up_to}' and '{next_up_to}'";
58            );
59            self.wrapped_importer.import(next_up_to).await?;
60            intermediate_up_to = next_up_to;
61        }
62
63        Ok(())
64    }
65}
66
67#[cfg(test)]
68mod tests {
69    use mockall::predicate::eq;
70    use mockall::{Sequence, mock};
71
72    use crate::test_tools::TestLogger;
73
74    use super::*;
75
76    mock! {
77        pub TransactionImporterImpl {}
78
79        #[async_trait]
80        impl TransactionsImporter for TransactionImporterImpl {
81            async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()>;
82        }
83    }
84
85    fn create_highest_transaction_block_number_getter_mock(
86        highest_block_number: BlockNumber,
87    ) -> Arc<dyn HighestTransactionBlockNumberGetter> {
88        Arc::new({
89            let mut mock = MockHighestTransactionBlockNumberGetter::new();
90            mock.expect_get().returning(move || Ok(Some(highest_block_number)));
91            mock
92        })
93    }
94
95    fn create_transaction_importer_mock(
96        expected_values: Vec<BlockNumber>,
97    ) -> MockTransactionImporterImpl {
98        let mut seq = Sequence::new();
99        let mut wrapped_importer = MockTransactionImporterImpl::new();
100        for expected_value in expected_values {
101            wrapped_importer
102                .expect_import()
103                .once()
104                .in_sequence(&mut seq)
105                .with(eq(expected_value))
106                .returning(|_| Ok(()));
107        }
108        wrapped_importer
109    }
110
111    #[tokio::test]
112    async fn test_import_nothing_to_do_when_highest_block_number_lower_or_equal_up_to_beacon() {
113        let highest_block_number = BlockNumber(10);
114        let chunk_size = BlockNumber(5);
115
116        let highest_transaction_block_number_getter =
117            create_highest_transaction_block_number_getter_mock(highest_block_number);
118        let mut wrapped_importer = MockTransactionImporterImpl::new();
119        wrapped_importer.expect_import().never();
120
121        let importer = TransactionsImporterByChunk::new(
122            highest_transaction_block_number_getter,
123            Arc::new(wrapped_importer),
124            chunk_size,
125            TestLogger::stdout(),
126        );
127
128        let up_to_beacon = highest_block_number;
129        importer.import(up_to_beacon).await.unwrap();
130
131        let up_to_beacon = highest_block_number - 1;
132        importer.import(up_to_beacon).await.unwrap();
133    }
134
135    #[tokio::test]
136    async fn test_import_even_when_highest_block_number_is_none() {
137        let highest_block_number = None;
138        let chunk_size = BlockNumber(5);
139        let up_to_beacon = chunk_size - 1;
140
141        let highest_transaction_block_number_getter = Arc::new({
142            let mut mock = MockHighestTransactionBlockNumberGetter::new();
143            mock.expect_get().returning(move || Ok(highest_block_number));
144            mock
145        });
146        let wrapped_importer = create_transaction_importer_mock(vec![up_to_beacon]);
147
148        let importer = TransactionsImporterByChunk::new(
149            highest_transaction_block_number_getter,
150            Arc::new(wrapped_importer),
151            chunk_size,
152            TestLogger::stdout(),
153        );
154
155        importer.import(up_to_beacon).await.unwrap();
156    }
157
158    #[tokio::test]
159    async fn test_import_only_once_when_block_delta_less_than_chunk_size() {
160        let highest_block_number = BlockNumber(10);
161        let chunk_size = BlockNumber(5);
162        let up_to_beacon = highest_block_number + chunk_size - 1;
163
164        let highest_transaction_block_number_getter =
165            create_highest_transaction_block_number_getter_mock(highest_block_number);
166        let wrapped_importer = create_transaction_importer_mock(vec![up_to_beacon]);
167
168        let importer = TransactionsImporterByChunk::new(
169            highest_transaction_block_number_getter,
170            Arc::new(wrapped_importer),
171            chunk_size,
172            TestLogger::stdout(),
173        );
174
175        importer.import(up_to_beacon).await.unwrap();
176    }
177
178    #[tokio::test]
179    async fn test_import_multiple_times_when_block_delta_is_not_a_multiple_of_chunk_size() {
180        let highest_block_number = BlockNumber(10);
181        let chunk_size = BlockNumber(5);
182        let up_to_beacon = highest_block_number + chunk_size * 2 + 1;
183
184        let highest_transaction_block_number_getter =
185            create_highest_transaction_block_number_getter_mock(highest_block_number);
186        let wrapped_importer = create_transaction_importer_mock(vec![
187            highest_block_number + chunk_size,
188            highest_block_number + chunk_size * 2,
189            up_to_beacon,
190        ]);
191
192        let importer = TransactionsImporterByChunk::new(
193            highest_transaction_block_number_getter,
194            Arc::new(wrapped_importer),
195            chunk_size,
196            TestLogger::stdout(),
197        );
198
199        importer.import(up_to_beacon).await.unwrap();
200    }
201
202    #[tokio::test]
203    async fn test_import_multiple_times_when_block_delta_is_a_multiple_of_chunk_size() {
204        let highest_block_number = BlockNumber(10);
205        let chunk_size = BlockNumber(5);
206        let up_to_beacon = highest_block_number + chunk_size * 2;
207
208        let highest_transaction_block_number_getter =
209            create_highest_transaction_block_number_getter_mock(highest_block_number);
210        let wrapped_importer = create_transaction_importer_mock(vec![
211            highest_block_number + chunk_size,
212            highest_block_number + chunk_size * 2,
213        ]);
214
215        let importer = TransactionsImporterByChunk::new(
216            highest_transaction_block_number_getter,
217            Arc::new(wrapped_importer),
218            chunk_size,
219            TestLogger::stdout(),
220        );
221
222        importer.import(up_to_beacon).await.unwrap();
223    }
224}