// mithril_aggregator/database/repository/cardano_transaction_repository.rs
use std::ops::{Deref, Range};
2use std::sync::Arc;
3
4use mithril_cardano_node_chain::chain_importer::ChainDataStore;
5use mithril_common::StdResult;
6use mithril_common::crypto_helper::{MKTreeNode, MKTreeStorer};
7use mithril_common::entities::{
8 BlockNumber, BlockRange, CardanoTransaction, ChainPoint, SlotNumber, TransactionHash,
9};
10use mithril_common::signable_builder::BlockRangeRootRetriever;
11use mithril_persistence::database::repository::CardanoTransactionRepository;
12use mithril_persistence::sqlite::SqliteConnectionPool;
13
14use crate::services::TransactionsRetriever;
15
16pub struct AggregatorCardanoChainDataRepository {
18 inner: CardanoTransactionRepository,
19}
20
21impl AggregatorCardanoChainDataRepository {
22 pub fn new(connection_pool: Arc<SqliteConnectionPool>) -> Self {
24 Self {
25 inner: CardanoTransactionRepository::new(connection_pool),
26 }
27 }
28}
29
30impl Deref for AggregatorCardanoChainDataRepository {
31 type Target = CardanoTransactionRepository;
32
33 fn deref(&self) -> &Self::Target {
34 &self.inner
35 }
36}
37
38#[async_trait::async_trait]
39impl ChainDataStore for AggregatorCardanoChainDataRepository {
40 async fn get_highest_beacon(&self) -> StdResult<Option<ChainPoint>> {
41 self.inner.get_transaction_highest_chain_point().await
42 }
43
44 async fn get_highest_block_range(&self) -> StdResult<Option<BlockRange>> {
45 let record = self.inner.retrieve_highest_block_range_root().await?;
46 Ok(record.map(|record| record.range))
47 }
48
49 async fn store_transactions(&self, transactions: Vec<CardanoTransaction>) -> StdResult<()> {
50 self.inner.store_transactions(transactions).await
51 }
52
53 async fn get_transactions_in_range(
54 &self,
55 range: Range<BlockNumber>,
56 ) -> StdResult<Vec<CardanoTransaction>> {
57 self.get_transactions_in_range_blocks(range).await.map(|v| {
58 v.into_iter()
59 .map(|record| record.into())
60 .collect::<Vec<CardanoTransaction>>()
61 })
62 }
63
64 async fn store_block_range_roots(
65 &self,
66 block_ranges: Vec<(BlockRange, MKTreeNode)>,
67 ) -> StdResult<()> {
68 if !block_ranges.is_empty() {
69 self.inner.create_block_range_roots(block_ranges).await?;
70 }
71 Ok(())
72 }
73
74 async fn remove_rolled_back_transactions_and_block_range(
75 &self,
76 slot_number: SlotNumber,
77 ) -> StdResult<()> {
78 self.inner
79 .remove_rolled_back_transactions_and_block_range_by_slot_number(slot_number)
80 .await
81 }
82}
83
84#[async_trait::async_trait]
85impl<S: MKTreeStorer> BlockRangeRootRetriever<S> for AggregatorCardanoChainDataRepository {
86 async fn retrieve_block_range_roots<'a>(
87 &'a self,
88 up_to_beacon: BlockNumber,
89 ) -> StdResult<Box<dyn Iterator<Item = (BlockRange, MKTreeNode)> + 'a>> {
90 self.inner.retrieve_block_range_roots_up_to(up_to_beacon).await
91 }
92}
93
94#[async_trait::async_trait]
95impl TransactionsRetriever for AggregatorCardanoChainDataRepository {
96 async fn get_by_hashes(
97 &self,
98 hashes: Vec<TransactionHash>,
99 up_to: BlockNumber,
100 ) -> StdResult<Vec<CardanoTransaction>> {
101 self.inner.get_transaction_by_hashes(hashes, up_to).await.map(|v| {
102 v.into_iter()
103 .map(|record| record.into())
104 .collect::<Vec<CardanoTransaction>>()
105 })
106 }
107
108 async fn get_by_block_ranges(
109 &self,
110 block_ranges: Vec<BlockRange>,
111 ) -> StdResult<Vec<CardanoTransaction>> {
112 self.inner
113 .get_transaction_by_block_ranges(block_ranges)
114 .await
115 .map(|v| {
116 v.into_iter()
117 .map(|record| record.into())
118 .collect::<Vec<CardanoTransaction>>()
119 })
120 }
121}
122
123#[async_trait::async_trait]
124impl TransactionsRetriever for CardanoTransactionRepository {
125 async fn get_by_hashes(
126 &self,
127 hashes: Vec<TransactionHash>,
128 up_to: BlockNumber,
129 ) -> StdResult<Vec<CardanoTransaction>> {
130 self.get_transaction_by_hashes(hashes, up_to).await.map(|v| {
131 v.into_iter()
132 .map(|record| record.into())
133 .collect::<Vec<CardanoTransaction>>()
134 })
135 }
136
137 async fn get_by_block_ranges(
138 &self,
139 block_ranges: Vec<BlockRange>,
140 ) -> StdResult<Vec<CardanoTransaction>> {
141 self.get_transaction_by_block_ranges(block_ranges).await.map(|v| {
142 v.into_iter()
143 .map(|record| record.into())
144 .collect::<Vec<CardanoTransaction>>()
145 })
146 }
147}
148
#[cfg(test)]
mod tests {
    use mithril_cardano_node_chain::chain_importer::{CardanoChainDataImporter, ChainDataImporter};
    use mithril_cardano_node_chain::entities::ScannedBlock;
    use mithril_cardano_node_chain::test::double::DumbBlockScanner;

    use crate::database::test_helper::cardano_tx_db_connection;
    use crate::test::TestLogger;

    use super::*;

    /// Flattens the transactions of every scanned block, in block order, into one list.
    fn into_transactions(blocks: &[ScannedBlock]) -> Vec<CardanoTransaction> {
        blocks
            .iter()
            .cloned()
            .flat_map(ScannedBlock::into_transactions)
            .collect()
    }

    /// Runs the importer twice against the same database and checks that the stored
    /// transactions match the scanned ones after the first run and are unchanged
    /// after the second.
    #[tokio::test]
    async fn importing_twice_starting_with_nothing_in_a_real_db_should_yield_transactions_in_same_order()
     {
        let first_block = ScannedBlock::new(
            "block_hash-1",
            BlockNumber(10),
            SlotNumber(15),
            vec!["tx_hash-1", "tx_hash-2"],
        );
        let second_block = ScannedBlock::new(
            "block_hash-2",
            BlockNumber(20),
            SlotNumber(25),
            vec!["tx_hash-3", "tx_hash-4"],
        );
        let blocks = vec![first_block, second_block];
        let up_to_block_number = BlockNumber(1000);
        let transactions = into_transactions(&blocks);

        // Wire a repository and an importer that is fed the blocks above by a dumb scanner.
        let connection = cardano_tx_db_connection().unwrap();
        let connection_pool = Arc::new(SqliteConnectionPool::build_from_connection(connection));
        let repository = Arc::new(AggregatorCardanoChainDataRepository::new(connection_pool));
        let block_scanner = Arc::new(DumbBlockScanner::new().forwards(vec![blocks.clone()]));
        let importer =
            CardanoChainDataImporter::new(block_scanner, repository.clone(), TestLogger::stdout());

        // First import: the database starts empty.
        importer
            .import(up_to_block_number)
            .await
            .expect("Transactions Importer should succeed");
        let cold_imported_transactions = repository.get_all().await.unwrap();

        // Second import: the database already holds the transactions.
        importer
            .import(up_to_block_number)
            .await
            .expect("Transactions Importer should succeed");
        let warm_imported_transactions = repository.get_all().await.unwrap();

        assert_eq!(transactions, cold_imported_transactions);
        assert_eq!(cold_imported_transactions, warm_imported_transactions);
    }
}