mithril_signer/services/cardano_transactions/importer/
importer_by_chunk.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use slog::{debug, Logger};
5
6use mithril_common::entities::BlockNumber;
7use mithril_common::logging::LoggerExtensions;
8use mithril_common::signable_builder::TransactionsImporter;
9use mithril_common::StdResult;
10
11/// Trait to get the highest transaction block number
12#[cfg_attr(test, mockall::automock)]
13#[async_trait]
14pub trait HighestTransactionBlockNumberGetter: Send + Sync {
15    /// Get the highest known transaction block number
16    async fn get(&self) -> StdResult<Option<BlockNumber>>;
17}
18
19/// A decorator of [TransactionsImporter] that does the import by chunks
20pub struct TransactionsImporterByChunk {
21    highest_transaction_block_number_getter: Arc<dyn HighestTransactionBlockNumberGetter>,
22    wrapped_importer: Arc<dyn TransactionsImporter>,
23    chunk_size: BlockNumber,
24    logger: Logger,
25}
26
27impl TransactionsImporterByChunk {
28    /// Create a new instance of `TransactionsImporterByChunk`.
29    pub fn new(
30        highest_transaction_block_number_getter: Arc<dyn HighestTransactionBlockNumberGetter>,
31        wrapped_importer: Arc<dyn TransactionsImporter>,
32        chunk_size: BlockNumber,
33        logger: Logger,
34    ) -> Self {
35        Self {
36            highest_transaction_block_number_getter,
37            wrapped_importer,
38            chunk_size,
39            logger: logger.new_with_component_name::<Self>(),
40        }
41    }
42}
43
44#[async_trait]
45impl TransactionsImporter for TransactionsImporterByChunk {
46    async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
47        let mut intermediate_up_to = self
48            .highest_transaction_block_number_getter
49            .get()
50            .await?
51            .unwrap_or(BlockNumber(0));
52
53        while intermediate_up_to < up_to_beacon {
54            let next_up_to = (intermediate_up_to + self.chunk_size).min(up_to_beacon);
55            debug!(
56                self.logger,
57                "Running Transactions import between block '{intermediate_up_to}' and '{next_up_to}'";
58            );
59            self.wrapped_importer.import(next_up_to).await?;
60            intermediate_up_to = next_up_to;
61        }
62
63        Ok(())
64    }
65}
66
#[cfg(test)]
mod tests {
    use mockall::predicate::eq;
    use mockall::{mock, Sequence};

    use crate::test_tools::TestLogger;

    use super::*;

    mock! {
        pub TransactionImporterImpl {}

        #[async_trait]
        impl TransactionsImporter for TransactionImporterImpl {
            async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()>;
        }
    }

    /// Getter mock answering every `get` call with `stored_highest`.
    fn getter_returning(
        stored_highest: Option<BlockNumber>,
    ) -> Arc<dyn HighestTransactionBlockNumberGetter> {
        let mut getter = MockHighestTransactionBlockNumberGetter::new();
        getter.expect_get().returning(move || Ok(stored_highest));
        Arc::new(getter)
    }

    /// Wrapped importer mock that must be called exactly once per entry of `expected_values`,
    /// in the given order, with that entry as argument.
    fn importer_expecting_calls(
        expected_values: Vec<BlockNumber>,
    ) -> MockTransactionImporterImpl {
        let mut call_order = Sequence::new();
        let mut wrapped_importer = MockTransactionImporterImpl::new();
        expected_values.into_iter().for_each(|upper_bound| {
            wrapped_importer
                .expect_import()
                .with(eq(upper_bound))
                .times(1)
                .in_sequence(&mut call_order)
                .returning(|_| Ok(()));
        });
        wrapped_importer
    }

    /// Assemble the importer under test around the given mocks.
    fn build_importer(
        getter: Arc<dyn HighestTransactionBlockNumberGetter>,
        wrapped_importer: MockTransactionImporterImpl,
        chunk_size: BlockNumber,
    ) -> TransactionsImporterByChunk {
        TransactionsImporterByChunk::new(
            getter,
            Arc::new(wrapped_importer),
            chunk_size,
            TestLogger::stdout(),
        )
    }

    #[tokio::test]
    async fn test_import_nothing_to_do_when_highest_block_number_lower_or_equal_up_to_beacon() {
        let highest_block_number = BlockNumber(10);
        let chunk_size = BlockNumber(5);

        let mut wrapped_importer = MockTransactionImporterImpl::new();
        wrapped_importer.expect_import().never();

        let importer = build_importer(
            getter_returning(Some(highest_block_number)),
            wrapped_importer,
            chunk_size,
        );

        // Same block as the stored one first, then the block right before it.
        for up_to_beacon in [highest_block_number, highest_block_number - 1] {
            importer.import(up_to_beacon).await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_import_even_when_highest_block_number_is_none() {
        let chunk_size = BlockNumber(5);
        let up_to_beacon = chunk_size - 1;

        let importer = build_importer(
            getter_returning(None),
            importer_expecting_calls(vec![up_to_beacon]),
            chunk_size,
        );

        importer.import(up_to_beacon).await.unwrap();
    }

    #[tokio::test]
    async fn test_import_only_once_when_block_delta_less_than_chunk_size() {
        let highest_block_number = BlockNumber(10);
        let chunk_size = BlockNumber(5);
        let up_to_beacon = highest_block_number + chunk_size - 1;

        let importer = build_importer(
            getter_returning(Some(highest_block_number)),
            importer_expecting_calls(vec![up_to_beacon]),
            chunk_size,
        );

        importer.import(up_to_beacon).await.unwrap();
    }

    #[tokio::test]
    async fn test_import_multiple_times_when_block_delta_is_not_a_multiple_of_chunk_size() {
        let highest_block_number = BlockNumber(10);
        let chunk_size = BlockNumber(5);
        let up_to_beacon = highest_block_number + chunk_size * 2 + 1;

        let first_stop = highest_block_number + chunk_size;
        let second_stop = highest_block_number + chunk_size * 2;

        let importer = build_importer(
            getter_returning(Some(highest_block_number)),
            importer_expecting_calls(vec![first_stop, second_stop, up_to_beacon]),
            chunk_size,
        );

        importer.import(up_to_beacon).await.unwrap();
    }

    #[tokio::test]
    async fn test_import_multiple_times_when_block_delta_is_a_multiple_of_chunk_size() {
        let highest_block_number = BlockNumber(10);
        let chunk_size = BlockNumber(5);
        let up_to_beacon = highest_block_number + chunk_size * 2;

        let first_stop = highest_block_number + chunk_size;
        let second_stop = highest_block_number + chunk_size * 2;

        let importer = build_importer(
            getter_returning(Some(highest_block_number)),
            importer_expecting_calls(vec![first_stop, second_stop]),
            chunk_size,
        );

        importer.import(up_to_beacon).await.unwrap();
    }
}