mithril_aggregator/database/repository/
cardano_transaction_repository.rs1use std::collections::BTreeSet;
2use std::ops::{Deref, Range};
3use std::sync::Arc;
4
5use mithril_cardano_node_chain::chain_importer::ChainDataStore;
6use mithril_common::StdResult;
7use mithril_common::crypto_helper::{MKTreeNode, MKTreeStorer};
8use mithril_common::entities::{
9 BlockNumber, BlockRange, CardanoBlockTransactionMkTreeNode, CardanoBlockWithTransactions,
10 CardanoTransaction, ChainPoint, SlotNumber, TransactionHash,
11};
12use mithril_common::signable_builder::{BlockRangeRootRetriever, LegacyBlockRangeRootRetriever};
13use mithril_persistence::database::repository::CardanoTransactionRepository;
14use mithril_persistence::sqlite::SqliteConnectionPool;
15
16use crate::services::TransactionsRetriever;
17
18pub struct AggregatorCardanoChainDataRepository {
20 inner: CardanoTransactionRepository,
21}
22
23impl AggregatorCardanoChainDataRepository {
24 pub fn new(connection_pool: Arc<SqliteConnectionPool>) -> Self {
26 Self {
27 inner: CardanoTransactionRepository::new(connection_pool),
28 }
29 }
30}
31
32impl Deref for AggregatorCardanoChainDataRepository {
33 type Target = CardanoTransactionRepository;
34
35 fn deref(&self) -> &Self::Target {
36 &self.inner
37 }
38}
39
40#[async_trait::async_trait]
41impl ChainDataStore for AggregatorCardanoChainDataRepository {
42 async fn get_highest_beacon(&self) -> StdResult<Option<ChainPoint>> {
43 self.inner.get_transaction_highest_chain_point().await
44 }
45
46 async fn get_highest_block_range(&self) -> StdResult<Option<BlockRange>> {
47 let record = self.inner.retrieve_highest_block_range_root().await?;
48 Ok(record.map(|record| record.range))
49 }
50
51 async fn get_highest_legacy_block_range(&self) -> StdResult<Option<BlockRange>> {
52 let record = self.inner.retrieve_highest_legacy_block_range_root().await?;
53 Ok(record.map(|record| record.range))
54 }
55
56 async fn store_blocks_and_transactions(
57 &self,
58 block_with_transactions: Vec<CardanoBlockWithTransactions>,
59 ) -> StdResult<()> {
60 self.inner
61 .store_blocks_and_transactions(block_with_transactions)
62 .await
63 }
64
65 async fn get_blocks_and_transactions_in_range(
66 &self,
67 range: Range<BlockNumber>,
68 ) -> StdResult<BTreeSet<CardanoBlockTransactionMkTreeNode>> {
69 let records = self.inner.get_blocks_with_transactions_in_range_blocks(range).await?;
70 Ok(records.into_iter().flat_map(|b| b.into_mk_tree_nodes()).collect())
71 }
72
73 async fn get_transactions_in_range(
74 &self,
75 range: Range<BlockNumber>,
76 ) -> StdResult<Vec<CardanoTransaction>> {
77 self.get_transactions_in_range_blocks(range).await.map(|v| {
78 v.into_iter()
79 .map(|record| record.into())
80 .collect::<Vec<CardanoTransaction>>()
81 })
82 }
83
84 async fn store_block_range_roots(
85 &self,
86 block_ranges: Vec<(BlockRange, MKTreeNode)>,
87 ) -> StdResult<()> {
88 if !block_ranges.is_empty() {
89 self.inner.create_block_range_roots(block_ranges).await?;
90 }
91 Ok(())
92 }
93
94 async fn store_legacy_block_range_roots(
95 &self,
96 block_ranges: Vec<(BlockRange, MKTreeNode)>,
97 ) -> StdResult<()> {
98 if !block_ranges.is_empty() {
99 self.inner.create_legacy_block_range_roots(block_ranges).await?;
100 }
101 Ok(())
102 }
103
104 async fn remove_rolled_chain_data_and_block_range(
105 &self,
106 slot_number: SlotNumber,
107 ) -> StdResult<()> {
108 self.inner
109 .remove_rolled_back_blocks_transactions_and_block_range_by_slot_number(slot_number)
110 .await
111 }
112}
113
114#[async_trait::async_trait]
115impl<S: MKTreeStorer> LegacyBlockRangeRootRetriever<S> for AggregatorCardanoChainDataRepository {
116 async fn retrieve_block_range_roots<'a>(
117 &'a self,
118 up_to_beacon: BlockNumber,
119 ) -> StdResult<Box<dyn Iterator<Item = (BlockRange, MKTreeNode)> + 'a>> {
120 self.inner.retrieve_legacy_block_range_roots_up_to(up_to_beacon).await
121 }
122}
123
124#[async_trait::async_trait]
125impl<S: MKTreeStorer> BlockRangeRootRetriever<S> for AggregatorCardanoChainDataRepository {
126 async fn retrieve_block_range_roots<'a>(
127 &'a self,
128 up_to_beacon: BlockNumber,
129 ) -> StdResult<Box<dyn Iterator<Item = (BlockRange, MKTreeNode)> + 'a>> {
130 self.inner.retrieve_block_range_roots_up_to(up_to_beacon).await
131 }
132}
133
134#[async_trait::async_trait]
135impl TransactionsRetriever for AggregatorCardanoChainDataRepository {
136 async fn get_by_hashes(
137 &self,
138 hashes: Vec<TransactionHash>,
139 up_to: BlockNumber,
140 ) -> StdResult<Vec<CardanoTransaction>> {
141 self.inner.get_transaction_by_hashes(hashes, up_to).await.map(|v| {
142 v.into_iter()
143 .map(|record| record.into())
144 .collect::<Vec<CardanoTransaction>>()
145 })
146 }
147
148 async fn get_by_block_ranges(
149 &self,
150 block_ranges: Vec<BlockRange>,
151 ) -> StdResult<Vec<CardanoTransaction>> {
152 self.inner
153 .get_transaction_by_block_ranges(block_ranges)
154 .await
155 .map(|v| {
156 v.into_iter()
157 .map(|record| record.into())
158 .collect::<Vec<CardanoTransaction>>()
159 })
160 }
161}
162
163#[async_trait::async_trait]
164impl TransactionsRetriever for CardanoTransactionRepository {
165 async fn get_by_hashes(
166 &self,
167 hashes: Vec<TransactionHash>,
168 up_to: BlockNumber,
169 ) -> StdResult<Vec<CardanoTransaction>> {
170 self.get_transaction_by_hashes(hashes, up_to).await.map(|v| {
171 v.into_iter()
172 .map(|record| record.into())
173 .collect::<Vec<CardanoTransaction>>()
174 })
175 }
176
177 async fn get_by_block_ranges(
178 &self,
179 block_ranges: Vec<BlockRange>,
180 ) -> StdResult<Vec<CardanoTransaction>> {
181 self.get_transaction_by_block_ranges(block_ranges).await.map(|v| {
182 v.into_iter()
183 .map(|record| record.into())
184 .collect::<Vec<CardanoTransaction>>()
185 })
186 }
187}
188
#[cfg(test)]
mod tests {
    use mithril_cardano_node_chain::chain_importer::{CardanoChainDataImporter, ChainDataImporter};
    use mithril_cardano_node_chain::entities::ScannedBlock;
    use mithril_cardano_node_chain::test::double::DumbBlockScanner;

    use crate::database::test_helper::cardano_tx_db_connection;
    use crate::test::TestLogger;

    use super::*;

    /// Collects the transactions of every scanned block, visiting the blocks in slice order.
    fn into_transactions(blocks: &[ScannedBlock]) -> Vec<CardanoTransaction> {
        let mut transactions = Vec::new();
        for block in blocks {
            transactions.extend(block.clone().into_transactions());
        }
        transactions
    }

    /// Runs the importer twice against the same database and checks that the repository content
    /// matches the scanned transactions after the first run and is unchanged after the second.
    #[tokio::test]
    async fn importing_twice_starting_with_nothing_in_a_real_db_should_yield_transactions_in_same_order()
     {
        let first_block = ScannedBlock::new(
            "block_hash-1",
            BlockNumber(10),
            SlotNumber(15),
            vec!["tx_hash-1", "tx_hash-2"],
        );
        let second_block = ScannedBlock::new(
            "block_hash-2",
            BlockNumber(20),
            SlotNumber(25),
            vec!["tx_hash-3", "tx_hash-4"],
        );
        let blocks = vec![first_block, second_block];
        let up_to_block_number = BlockNumber(1000);
        let transactions = into_transactions(&blocks);

        // Repository and importer share the same test database connection pool.
        let connection = cardano_tx_db_connection().unwrap();
        let connection_pool = Arc::new(SqliteConnectionPool::build_from_connection(connection));
        let repository = Arc::new(AggregatorCardanoChainDataRepository::new(connection_pool));
        let scanner = Arc::new(DumbBlockScanner::new().forwards(vec![blocks.clone()]));
        let importer =
            CardanoChainDataImporter::new(scanner, repository.clone(), TestLogger::stdout());

        // First import: the database starts empty.
        importer
            .import(up_to_block_number)
            .await
            .expect("Transactions Importer should succeed");
        let cold_imported_transactions = repository.get_all().await.unwrap();

        // Second import: the database already holds the data of the first run.
        importer
            .import(up_to_block_number)
            .await
            .expect("Transactions Importer should succeed");
        let warm_imported_transactions = repository.get_all().await.unwrap();

        assert_eq!(transactions, cold_imported_transactions);
        assert_eq!(cold_imported_transactions, warm_imported_transactions);
    }
}