mithril_cardano_node_chain/chain_importer/
service.rs1use std::sync::Arc;
2
3use anyhow::Context;
4use async_trait::async_trait;
5use slog::Logger;
6use tokio::{runtime::Handle, task};
7
8use mithril_common::StdResult;
9use mithril_common::entities::BlockNumber;
10use mithril_common::logging::LoggerExtensions;
11
12use crate::chain_importer::block_ranges_importer::BlockRangeImporter;
13use crate::chain_importer::blocks_and_transactions_importer::BlocksTransactionsImporter;
14use crate::chain_importer::{ChainDataImporter, ChainDataStore};
15use crate::chain_scanner::BlockScanner;
16
17#[derive(Clone)]
20pub struct CardanoChainDataImporter {
21 blocks_transactions_importer: BlocksTransactionsImporter,
22 block_ranges_importer: BlockRangeImporter,
23 transaction_store: Arc<dyn ChainDataStore>,
24}
25
26impl CardanoChainDataImporter {
27 pub fn new(
29 block_scanner: Arc<dyn BlockScanner>,
30 transaction_store: Arc<dyn ChainDataStore>,
31 logger: Logger,
32 ) -> Self {
33 let logger = logger.new_with_component_name::<Self>();
34 Self {
35 blocks_transactions_importer: BlocksTransactionsImporter::new(
36 block_scanner,
37 transaction_store.clone(),
38 logger.clone(),
39 ),
40 block_ranges_importer: BlockRangeImporter::new(
41 transaction_store.clone(),
42 logger.clone(),
43 ),
44 transaction_store,
45 }
46 }
47}
48
49#[async_trait]
50impl ChainDataImporter for CardanoChainDataImporter {
51 async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
52 let importer = self.clone();
53 task::spawn_blocking(move || {
54 Handle::current().block_on(async move {
55 importer.blocks_transactions_importer.run(up_to_beacon).await?;
56 importer.transaction_store.optimize().await?;
57 importer.block_ranges_importer.run(up_to_beacon).await?;
58 importer.block_ranges_importer.run_legacy(up_to_beacon).await?;
59 importer.transaction_store.optimize().await?;
60 Ok(())
61 })
62 })
63 .await
64 .with_context(|| "ChainDataImporter - worker thread crashed")?
65 }
66}
67
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;
    use std::ops::Range;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    use mithril_common::crypto_helper::MKTreeNode;
    use mithril_common::entities::{
        BlockRange, CardanoBlockTransactionMkTreeNode, CardanoBlockWithTransactions,
        CardanoTransaction, ChainPoint, SlotNumber,
    };

    use crate::test::TestLogger;
    use crate::test::double::DumbBlockScanner;

    use super::*;

    /// Checks that `import` leaves room for other tasks to make progress.
    ///
    /// The importer is wired to a store whose every operation sleeps the
    /// calling thread. While the import is awaited, a task spawned on the same
    /// `LocalSet` bumps a counter after each 1ms async sleep; the test passes
    /// only if that counter reached `MAX_COUNTER` by the time the import ends.
    #[tokio::test]
    async fn test_import_is_non_blocking() {
        /// Store double: each method blocks the current thread for
        /// `wait_time`, then returns an empty / successful value.
        struct BlockingRepository {
            wait_time: Duration,
        }

        impl BlockingRepository {
            /// Puts the current thread to sleep (a blocking, non-async sleep).
            fn stall(&self) {
                std::thread::sleep(self.wait_time);
            }
        }

        #[async_trait]
        impl ChainDataStore for BlockingRepository {
            async fn get_highest_beacon(&self) -> StdResult<Option<ChainPoint>> {
                self.stall();
                Ok(None)
            }

            async fn get_highest_block_range(&self) -> StdResult<Option<BlockRange>> {
                self.stall();
                Ok(None)
            }

            async fn get_highest_legacy_block_range(&self) -> StdResult<Option<BlockRange>> {
                self.stall();
                Ok(None)
            }

            async fn store_blocks_and_transactions(
                &self,
                _: Vec<CardanoBlockWithTransactions>,
            ) -> StdResult<()> {
                self.stall();
                Ok(())
            }

            async fn get_blocks_and_transactions_in_range(
                &self,
                _range: Range<BlockNumber>,
            ) -> StdResult<BTreeSet<CardanoBlockTransactionMkTreeNode>> {
                self.stall();
                Ok(BTreeSet::new())
            }

            async fn get_transactions_in_range(
                &self,
                _: Range<BlockNumber>,
            ) -> StdResult<Vec<CardanoTransaction>> {
                self.stall();
                Ok(vec![])
            }

            async fn store_block_range_roots(
                &self,
                _block_ranges: Vec<(BlockRange, MKTreeNode)>,
            ) -> StdResult<()> {
                self.stall();
                Ok(())
            }

            async fn store_legacy_block_range_roots(
                &self,
                _: Vec<(BlockRange, MKTreeNode)>,
            ) -> StdResult<()> {
                self.stall();
                Ok(())
            }

            async fn remove_rolled_chain_data_and_block_range(
                &self,
                _: SlotNumber,
            ) -> StdResult<()> {
                self.stall();
                Ok(())
            }

            async fn optimize(&self) -> StdResult<()> {
                self.stall();
                Ok(())
            }
        }

        // Number of ticks recorded by the concurrent task.
        static COUNTER: AtomicUsize = AtomicUsize::new(0);
        // Tick count the concurrent task must reach before the import ends.
        const MAX_COUNTER: usize = 25;
        // Milliseconds each store operation blocks its thread.
        const WAIT_TIME: u64 = 50;

        let local_set = task::LocalSet::new();
        local_set
            .run_until(async {
                let blocking_store = Arc::new(BlockingRepository {
                    wait_time: Duration::from_millis(WAIT_TIME),
                });
                let importer = CardanoChainDataImporter::new(
                    Arc::new(DumbBlockScanner::new()),
                    blocking_store,
                    TestLogger::stdout(),
                );

                // Create the import future first, then start the ticking task,
                // and only then await the import.
                let import = importer.import(BlockNumber(100));
                let ticker = task::spawn_local(async {
                    while COUNTER.load(Ordering::SeqCst) < MAX_COUNTER {
                        tokio::time::sleep(Duration::from_millis(1)).await;
                        COUNTER.fetch_add(1, Ordering::SeqCst);
                    }
                });
                import.await.unwrap();

                ticker.abort();
            })
            .await;

        assert_eq!(MAX_COUNTER, COUNTER.load(Ordering::SeqCst));
    }
}