// mithril_aggregator/database/repository/cardano_transaction_repository.rs

1use std::collections::BTreeSet;
2use std::ops::{Deref, Range};
3use std::sync::Arc;
4
5use mithril_cardano_node_chain::chain_importer::ChainDataStore;
6use mithril_common::StdResult;
7use mithril_common::crypto_helper::{MKTreeNode, MKTreeStorer};
8use mithril_common::entities::{
9    BlockNumber, BlockRange, CardanoBlockTransactionMkTreeNode, CardanoBlockWithTransactions,
10    CardanoTransaction, ChainPoint, SlotNumber, TransactionHash,
11};
12use mithril_common::signable_builder::{BlockRangeRootRetriever, LegacyBlockRangeRootRetriever};
13use mithril_persistence::database::repository::CardanoTransactionRepository;
14use mithril_persistence::sqlite::SqliteConnectionPool;
15
16use crate::services::TransactionsRetriever;
17
18/// Wrapper around [CardanoTransactionRepository] to allow traits implementations
19pub struct AggregatorCardanoChainDataRepository {
20    inner: CardanoTransactionRepository,
21}
22
23impl AggregatorCardanoChainDataRepository {
24    /// Instantiate a new `AggregatorCardanoChainDataRepository`
25    pub fn new(connection_pool: Arc<SqliteConnectionPool>) -> Self {
26        Self {
27            inner: CardanoTransactionRepository::new(connection_pool),
28        }
29    }
30}
31
32impl Deref for AggregatorCardanoChainDataRepository {
33    type Target = CardanoTransactionRepository;
34
35    fn deref(&self) -> &Self::Target {
36        &self.inner
37    }
38}
39
40#[async_trait::async_trait]
41impl ChainDataStore for AggregatorCardanoChainDataRepository {
42    async fn get_highest_beacon(&self) -> StdResult<Option<ChainPoint>> {
43        self.inner.get_transaction_highest_chain_point().await
44    }
45
46    async fn get_highest_block_range(&self) -> StdResult<Option<BlockRange>> {
47        let record = self.inner.retrieve_highest_block_range_root().await?;
48        Ok(record.map(|record| record.range))
49    }
50
51    async fn get_highest_legacy_block_range(&self) -> StdResult<Option<BlockRange>> {
52        let record = self.inner.retrieve_highest_legacy_block_range_root().await?;
53        Ok(record.map(|record| record.range))
54    }
55
56    async fn store_blocks_and_transactions(
57        &self,
58        block_with_transactions: Vec<CardanoBlockWithTransactions>,
59    ) -> StdResult<()> {
60        self.inner
61            .store_blocks_and_transactions(block_with_transactions)
62            .await
63    }
64
65    async fn get_blocks_and_transactions_in_range(
66        &self,
67        range: Range<BlockNumber>,
68    ) -> StdResult<BTreeSet<CardanoBlockTransactionMkTreeNode>> {
69        let records = self.inner.get_blocks_with_transactions_in_range_blocks(range).await?;
70        Ok(records.into_iter().flat_map(|b| b.into_mk_tree_nodes()).collect())
71    }
72
73    async fn get_transactions_in_range(
74        &self,
75        range: Range<BlockNumber>,
76    ) -> StdResult<Vec<CardanoTransaction>> {
77        self.get_transactions_in_range_blocks(range).await.map(|v| {
78            v.into_iter()
79                .map(|record| record.into())
80                .collect::<Vec<CardanoTransaction>>()
81        })
82    }
83
84    async fn store_block_range_roots(
85        &self,
86        block_ranges: Vec<(BlockRange, MKTreeNode)>,
87    ) -> StdResult<()> {
88        if !block_ranges.is_empty() {
89            self.inner.create_block_range_roots(block_ranges).await?;
90        }
91        Ok(())
92    }
93
94    async fn store_legacy_block_range_roots(
95        &self,
96        block_ranges: Vec<(BlockRange, MKTreeNode)>,
97    ) -> StdResult<()> {
98        if !block_ranges.is_empty() {
99            self.inner.create_legacy_block_range_roots(block_ranges).await?;
100        }
101        Ok(())
102    }
103
104    async fn remove_rolled_chain_data_and_block_range(
105        &self,
106        slot_number: SlotNumber,
107    ) -> StdResult<()> {
108        self.inner
109            .remove_rolled_back_blocks_transactions_and_block_range_by_slot_number(slot_number)
110            .await
111    }
112}
113
114#[async_trait::async_trait]
115impl<S: MKTreeStorer> LegacyBlockRangeRootRetriever<S> for AggregatorCardanoChainDataRepository {
116    async fn retrieve_block_range_roots<'a>(
117        &'a self,
118        up_to_beacon: BlockNumber,
119    ) -> StdResult<Box<dyn Iterator<Item = (BlockRange, MKTreeNode)> + 'a>> {
120        self.inner.retrieve_legacy_block_range_roots_up_to(up_to_beacon).await
121    }
122}
123
124#[async_trait::async_trait]
125impl<S: MKTreeStorer> BlockRangeRootRetriever<S> for AggregatorCardanoChainDataRepository {
126    async fn retrieve_block_range_roots<'a>(
127        &'a self,
128        up_to_beacon: BlockNumber,
129    ) -> StdResult<Box<dyn Iterator<Item = (BlockRange, MKTreeNode)> + 'a>> {
130        self.inner.retrieve_block_range_roots_up_to(up_to_beacon).await
131    }
132}
133
134#[async_trait::async_trait]
135impl TransactionsRetriever for AggregatorCardanoChainDataRepository {
136    async fn get_by_hashes(
137        &self,
138        hashes: Vec<TransactionHash>,
139        up_to: BlockNumber,
140    ) -> StdResult<Vec<CardanoTransaction>> {
141        self.inner.get_transaction_by_hashes(hashes, up_to).await.map(|v| {
142            v.into_iter()
143                .map(|record| record.into())
144                .collect::<Vec<CardanoTransaction>>()
145        })
146    }
147
148    async fn get_by_block_ranges(
149        &self,
150        block_ranges: Vec<BlockRange>,
151    ) -> StdResult<Vec<CardanoTransaction>> {
152        self.inner
153            .get_transaction_by_block_ranges(block_ranges)
154            .await
155            .map(|v| {
156                v.into_iter()
157                    .map(|record| record.into())
158                    .collect::<Vec<CardanoTransaction>>()
159            })
160    }
161}
162
163#[async_trait::async_trait]
164impl TransactionsRetriever for CardanoTransactionRepository {
165    async fn get_by_hashes(
166        &self,
167        hashes: Vec<TransactionHash>,
168        up_to: BlockNumber,
169    ) -> StdResult<Vec<CardanoTransaction>> {
170        self.get_transaction_by_hashes(hashes, up_to).await.map(|v| {
171            v.into_iter()
172                .map(|record| record.into())
173                .collect::<Vec<CardanoTransaction>>()
174        })
175    }
176
177    async fn get_by_block_ranges(
178        &self,
179        block_ranges: Vec<BlockRange>,
180    ) -> StdResult<Vec<CardanoTransaction>> {
181        self.get_transaction_by_block_ranges(block_ranges).await.map(|v| {
182            v.into_iter()
183                .map(|record| record.into())
184                .collect::<Vec<CardanoTransaction>>()
185        })
186    }
187}
188
#[cfg(test)]
mod tests {
    use mithril_cardano_node_chain::chain_importer::{CardanoChainDataImporter, ChainDataImporter};
    use mithril_cardano_node_chain::entities::ScannedBlock;
    use mithril_cardano_node_chain::test::double::DumbBlockScanner;

    use crate::database::test_helper::cardano_tx_db_connection;
    use crate::test::TestLogger;

    use super::*;

    /// Collect the transactions of the given scanned blocks, one block after the other.
    fn into_transactions(blocks: &[ScannedBlock]) -> Vec<CardanoTransaction> {
        blocks
            .iter()
            .cloned()
            .flat_map(|block| block.into_transactions())
            .collect()
    }

    #[tokio::test]
    async fn importing_twice_starting_with_nothing_in_a_real_db_should_yield_transactions_in_same_order()
     {
        let scanned_blocks = vec![
            ScannedBlock::new(
                "block_hash-1",
                BlockNumber(10),
                SlotNumber(15),
                vec!["tx_hash-1", "tx_hash-2"],
            ),
            ScannedBlock::new(
                "block_hash-2",
                BlockNumber(20),
                SlotNumber(25),
                vec!["tx_hash-3", "tx_hash-4"],
            ),
        ];
        let expected_transactions = into_transactions(&scanned_blocks);
        let up_to_block_number = BlockNumber(1000);

        // The repository is shared between the importer (which writes to it) and the test
        // (which reads back what was stored).
        let connection = cardano_tx_db_connection().unwrap();
        let connection_pool = Arc::new(SqliteConnectionPool::build_from_connection(connection));
        let repository = Arc::new(AggregatorCardanoChainDataRepository::new(connection_pool));
        let importer = CardanoChainDataImporter::new(
            Arc::new(DumbBlockScanner::new().forwards(vec![scanned_blocks])),
            repository.clone(),
            TestLogger::stdout(),
        );

        // First import: the database starts empty.
        importer
            .import(up_to_block_number)
            .await
            .expect("Transactions Importer should succeed");
        let first_import_transactions = repository.get_all().await.unwrap();

        // Second import: the database already holds the result of the first import.
        importer
            .import(up_to_block_number)
            .await
            .expect("Transactions Importer should succeed");
        let second_import_transactions = repository.get_all().await.unwrap();

        assert_eq!(expected_transactions, first_import_transactions);
        assert_eq!(first_import_transactions, second_import_transactions);
    }
}