mithril_aggregator/database/repository/
cardano_transaction_repository.rs

1use std::ops::{Deref, Range};
2use std::sync::Arc;
3
4use mithril_cardano_node_chain::chain_importer::ChainDataStore;
5use mithril_common::StdResult;
6use mithril_common::crypto_helper::{MKTreeNode, MKTreeStorer};
7use mithril_common::entities::{
8    BlockNumber, BlockRange, CardanoTransaction, ChainPoint, SlotNumber, TransactionHash,
9};
10use mithril_common::signable_builder::BlockRangeRootRetriever;
11use mithril_persistence::database::repository::CardanoTransactionRepository;
12use mithril_persistence::sqlite::SqliteConnectionPool;
13
14use crate::services::TransactionsRetriever;
15
16/// Wrapper around [CardanoTransactionRepository] to allow traits implementations
17pub struct AggregatorCardanoChainDataRepository {
18    inner: CardanoTransactionRepository,
19}
20
21impl AggregatorCardanoChainDataRepository {
22    /// Instantiate a new `AggregatorCardanoChainDataRepository`
23    pub fn new(connection_pool: Arc<SqliteConnectionPool>) -> Self {
24        Self {
25            inner: CardanoTransactionRepository::new(connection_pool),
26        }
27    }
28}
29
30impl Deref for AggregatorCardanoChainDataRepository {
31    type Target = CardanoTransactionRepository;
32
33    fn deref(&self) -> &Self::Target {
34        &self.inner
35    }
36}
37
38#[async_trait::async_trait]
39impl ChainDataStore for AggregatorCardanoChainDataRepository {
40    async fn get_highest_beacon(&self) -> StdResult<Option<ChainPoint>> {
41        self.inner.get_transaction_highest_chain_point().await
42    }
43
44    async fn get_highest_block_range(&self) -> StdResult<Option<BlockRange>> {
45        let record = self.inner.retrieve_highest_block_range_root().await?;
46        Ok(record.map(|record| record.range))
47    }
48
49    async fn store_transactions(&self, transactions: Vec<CardanoTransaction>) -> StdResult<()> {
50        self.inner.store_transactions(transactions).await
51    }
52
53    async fn get_transactions_in_range(
54        &self,
55        range: Range<BlockNumber>,
56    ) -> StdResult<Vec<CardanoTransaction>> {
57        self.get_transactions_in_range_blocks(range).await.map(|v| {
58            v.into_iter()
59                .map(|record| record.into())
60                .collect::<Vec<CardanoTransaction>>()
61        })
62    }
63
64    async fn store_block_range_roots(
65        &self,
66        block_ranges: Vec<(BlockRange, MKTreeNode)>,
67    ) -> StdResult<()> {
68        if !block_ranges.is_empty() {
69            self.inner.create_block_range_roots(block_ranges).await?;
70        }
71        Ok(())
72    }
73
74    async fn remove_rolled_back_transactions_and_block_range(
75        &self,
76        slot_number: SlotNumber,
77    ) -> StdResult<()> {
78        self.inner
79            .remove_rolled_back_transactions_and_block_range_by_slot_number(slot_number)
80            .await
81    }
82}
83
84#[async_trait::async_trait]
85impl<S: MKTreeStorer> BlockRangeRootRetriever<S> for AggregatorCardanoChainDataRepository {
86    async fn retrieve_block_range_roots<'a>(
87        &'a self,
88        up_to_beacon: BlockNumber,
89    ) -> StdResult<Box<dyn Iterator<Item = (BlockRange, MKTreeNode)> + 'a>> {
90        self.inner.retrieve_block_range_roots_up_to(up_to_beacon).await
91    }
92}
93
94#[async_trait::async_trait]
95impl TransactionsRetriever for AggregatorCardanoChainDataRepository {
96    async fn get_by_hashes(
97        &self,
98        hashes: Vec<TransactionHash>,
99        up_to: BlockNumber,
100    ) -> StdResult<Vec<CardanoTransaction>> {
101        self.inner.get_transaction_by_hashes(hashes, up_to).await.map(|v| {
102            v.into_iter()
103                .map(|record| record.into())
104                .collect::<Vec<CardanoTransaction>>()
105        })
106    }
107
108    async fn get_by_block_ranges(
109        &self,
110        block_ranges: Vec<BlockRange>,
111    ) -> StdResult<Vec<CardanoTransaction>> {
112        self.inner
113            .get_transaction_by_block_ranges(block_ranges)
114            .await
115            .map(|v| {
116                v.into_iter()
117                    .map(|record| record.into())
118                    .collect::<Vec<CardanoTransaction>>()
119            })
120    }
121}
122
123#[async_trait::async_trait]
124impl TransactionsRetriever for CardanoTransactionRepository {
125    async fn get_by_hashes(
126        &self,
127        hashes: Vec<TransactionHash>,
128        up_to: BlockNumber,
129    ) -> StdResult<Vec<CardanoTransaction>> {
130        self.get_transaction_by_hashes(hashes, up_to).await.map(|v| {
131            v.into_iter()
132                .map(|record| record.into())
133                .collect::<Vec<CardanoTransaction>>()
134        })
135    }
136
137    async fn get_by_block_ranges(
138        &self,
139        block_ranges: Vec<BlockRange>,
140    ) -> StdResult<Vec<CardanoTransaction>> {
141        self.get_transaction_by_block_ranges(block_ranges).await.map(|v| {
142            v.into_iter()
143                .map(|record| record.into())
144                .collect::<Vec<CardanoTransaction>>()
145        })
146    }
147}
148
149#[cfg(test)]
150mod tests {
151    use mithril_cardano_node_chain::chain_importer::{CardanoChainDataImporter, ChainDataImporter};
152    use mithril_cardano_node_chain::entities::ScannedBlock;
153    use mithril_cardano_node_chain::test::double::DumbBlockScanner;
154
155    use crate::database::test_helper::cardano_tx_db_connection;
156    use crate::test::TestLogger;
157
158    use super::*;
159
160    fn into_transactions(blocks: &[ScannedBlock]) -> Vec<CardanoTransaction> {
161        blocks.iter().flat_map(|b| b.clone().into_transactions()).collect()
162    }
163
164    #[tokio::test]
165    async fn importing_twice_starting_with_nothing_in_a_real_db_should_yield_transactions_in_same_order()
166     {
167        let blocks = vec![
168            ScannedBlock::new(
169                "block_hash-1",
170                BlockNumber(10),
171                SlotNumber(15),
172                vec!["tx_hash-1", "tx_hash-2"],
173            ),
174            ScannedBlock::new(
175                "block_hash-2",
176                BlockNumber(20),
177                SlotNumber(25),
178                vec!["tx_hash-3", "tx_hash-4"],
179            ),
180        ];
181        let up_to_block_number = BlockNumber(1000);
182        let transactions = into_transactions(&blocks);
183
184        let (importer, repository) = {
185            let connection = cardano_tx_db_connection().unwrap();
186            let connection_pool = Arc::new(SqliteConnectionPool::build_from_connection(connection));
187            let repository = Arc::new(AggregatorCardanoChainDataRepository::new(connection_pool));
188            let importer = CardanoChainDataImporter::new(
189                Arc::new(DumbBlockScanner::new().forwards(vec![blocks.clone()])),
190                repository.clone(),
191                TestLogger::stdout(),
192            );
193            (importer, repository)
194        };
195
196        importer
197            .import(up_to_block_number)
198            .await
199            .expect("Transactions Importer should succeed");
200        let cold_imported_transactions = repository.get_all().await.unwrap();
201
202        importer
203            .import(up_to_block_number)
204            .await
205            .expect("Transactions Importer should succeed");
206        let warm_imported_transactions = repository.get_all().await.unwrap();
207
208        assert_eq!(transactions, cold_imported_transactions);
209        assert_eq!(cold_imported_transactions, warm_imported_transactions);
210    }
211}