mithril_cardano_node_chain/chain_importer/
service.rs

1use std::sync::Arc;
2
3use anyhow::Context;
4use async_trait::async_trait;
5use slog::Logger;
6use tokio::{runtime::Handle, task};
7
8use mithril_common::StdResult;
9use mithril_common::entities::BlockNumber;
10use mithril_common::logging::LoggerExtensions;
11
12use crate::chain_importer::block_ranges_importer::BlockRangeImporter;
13use crate::chain_importer::blocks_and_transactions_importer::BlocksTransactionsImporter;
14use crate::chain_importer::{ChainDataImporter, ChainDataStore};
15use crate::chain_scanner::BlockScanner;
16
17/// Import and store Cardano chain data (transactions, blocks) and aggregate them into Merkle trees
18/// for Mithril certificate generation and proof verification.
19#[derive(Clone)]
20pub struct CardanoChainDataImporter {
21    blocks_transactions_importer: BlocksTransactionsImporter,
22    block_ranges_importer: BlockRangeImporter,
23    transaction_store: Arc<dyn ChainDataStore>,
24}
25
26impl CardanoChainDataImporter {
27    /// Constructor
28    pub fn new(
29        block_scanner: Arc<dyn BlockScanner>,
30        transaction_store: Arc<dyn ChainDataStore>,
31        logger: Logger,
32    ) -> Self {
33        let logger = logger.new_with_component_name::<Self>();
34        Self {
35            blocks_transactions_importer: BlocksTransactionsImporter::new(
36                block_scanner,
37                transaction_store.clone(),
38                logger.clone(),
39            ),
40            block_ranges_importer: BlockRangeImporter::new(
41                transaction_store.clone(),
42                logger.clone(),
43            ),
44            transaction_store,
45        }
46    }
47}
48
49#[async_trait]
50impl ChainDataImporter for CardanoChainDataImporter {
51    async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
52        let importer = self.clone();
53        task::spawn_blocking(move || {
54            Handle::current().block_on(async move {
55                importer.blocks_transactions_importer.run(up_to_beacon).await?;
56                importer.transaction_store.optimize().await?;
57                importer.block_ranges_importer.run(up_to_beacon).await?;
58                importer.block_ranges_importer.run_legacy(up_to_beacon).await?;
59                importer.transaction_store.optimize().await?;
60                Ok(())
61            })
62        })
63        .await
64        .with_context(|| "ChainDataImporter - worker thread crashed")?
65    }
66}
67
68#[cfg(test)]
69mod tests {
70    use std::collections::BTreeSet;
71    use std::ops::Range;
72    use std::sync::atomic::AtomicUsize;
73    use std::time::Duration;
74
75    use mithril_common::crypto_helper::MKTreeNode;
76    use mithril_common::entities::{
77        BlockRange, CardanoBlockTransactionMkTreeNode, CardanoBlockWithTransactions,
78        CardanoTransaction, ChainPoint, SlotNumber,
79    };
80
81    use crate::test::TestLogger;
82    use crate::test::double::DumbBlockScanner;
83
84    use super::*;
85
86    #[tokio::test]
87    async fn test_import_is_non_blocking() {
88        static COUNTER: AtomicUsize = AtomicUsize::new(0);
89        static MAX_COUNTER: usize = 25;
90        static WAIT_TIME: u64 = 50;
91
92        // Use a local set to ensure the counter-task is not dispatched on a different thread
93        let local = task::LocalSet::new();
94        local
95            .run_until(async {
96                let importer = CardanoChainDataImporter::new(
97                    Arc::new(DumbBlockScanner::new()),
98                    Arc::new(BlockingRepository {
99                        wait_time: Duration::from_millis(WAIT_TIME),
100                    }),
101                    TestLogger::stdout(),
102                );
103
104                let importer_future = importer.import(BlockNumber(100));
105                let counter_task = task::spawn_local(async {
106                    while COUNTER.load(std::sync::atomic::Ordering::SeqCst) < MAX_COUNTER {
107                        tokio::time::sleep(Duration::from_millis(1)).await;
108                        COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
109                    }
110                });
111                importer_future.await.unwrap();
112
113                counter_task.abort();
114            })
115            .await;
116
117        assert_eq!(
118            MAX_COUNTER,
119            COUNTER.load(std::sync::atomic::Ordering::SeqCst)
120        );
121
122        struct BlockingRepository {
123            wait_time: Duration,
124        }
125
126        impl BlockingRepository {
127            fn block_thread(&self) {
128                std::thread::sleep(self.wait_time);
129            }
130        }
131
132        #[async_trait]
133        impl ChainDataStore for BlockingRepository {
134            async fn get_highest_beacon(&self) -> StdResult<Option<ChainPoint>> {
135                self.block_thread();
136                Ok(None)
137            }
138
139            async fn get_highest_block_range(&self) -> StdResult<Option<BlockRange>> {
140                self.block_thread();
141                Ok(None)
142            }
143
144            async fn get_highest_legacy_block_range(&self) -> StdResult<Option<BlockRange>> {
145                self.block_thread();
146                Ok(None)
147            }
148
149            async fn store_blocks_and_transactions(
150                &self,
151                _: Vec<CardanoBlockWithTransactions>,
152            ) -> StdResult<()> {
153                self.block_thread();
154                Ok(())
155            }
156
157            async fn get_blocks_and_transactions_in_range(
158                &self,
159                _range: Range<BlockNumber>,
160            ) -> StdResult<BTreeSet<CardanoBlockTransactionMkTreeNode>> {
161                self.block_thread();
162                Ok(BTreeSet::new())
163            }
164
165            async fn get_transactions_in_range(
166                &self,
167                _: Range<BlockNumber>,
168            ) -> StdResult<Vec<CardanoTransaction>> {
169                self.block_thread();
170                Ok(vec![])
171            }
172
173            async fn store_block_range_roots(
174                &self,
175                _block_ranges: Vec<(BlockRange, MKTreeNode)>,
176            ) -> StdResult<()> {
177                self.block_thread();
178                Ok(())
179            }
180
181            async fn store_legacy_block_range_roots(
182                &self,
183                _: Vec<(BlockRange, MKTreeNode)>,
184            ) -> StdResult<()> {
185                self.block_thread();
186                Ok(())
187            }
188
189            async fn remove_rolled_chain_data_and_block_range(
190                &self,
191                _: SlotNumber,
192            ) -> StdResult<()> {
193                self.block_thread();
194                Ok(())
195            }
196
197            async fn optimize(&self) -> StdResult<()> {
198                self.block_thread();
199                Ok(())
200            }
201        }
202    }
203}