1use std::ops::Range;
2use std::sync::Arc;
3
4use anyhow::Context;
5use async_trait::async_trait;
6
7use mithril_common::StdResult;
8use mithril_common::crypto_helper::{MKTreeNode, MKTreeStorer};
9use mithril_common::entities::{
10 BlockHash, BlockNumber, BlockRange, CardanoTransaction, ChainPoint, SlotNumber, TransactionHash,
11};
12use mithril_common::signable_builder::BlockRangeRootRetriever;
13
14use crate::database::query::{
15 DeleteBlockRangeRootQuery, DeleteCardanoTransactionQuery, GetBlockRangeRootQuery,
16 GetCardanoTransactionQuery, InsertBlockRangeRootQuery, InsertCardanoTransactionQuery,
17};
18use crate::database::record::{BlockRangeRootRecord, CardanoTransactionRecord};
19use crate::sqlite::{ConnectionExtensions, SqliteConnection, SqliteConnectionPool};
20
21pub struct CardanoTransactionRepository {
26 connection_pool: Arc<SqliteConnectionPool>,
27}
28
29impl CardanoTransactionRepository {
30 pub fn new(connection_pool: Arc<SqliteConnectionPool>) -> Self {
32 Self { connection_pool }
33 }
34
35 pub async fn get_all_transactions(&self) -> StdResult<Vec<CardanoTransactionRecord>> {
37 self.connection_pool
38 .connection()?
39 .fetch_collect(GetCardanoTransactionQuery::all())
40 }
41
42 pub async fn get_transactions_in_range_blocks(
45 &self,
46 range: Range<BlockNumber>,
47 ) -> StdResult<Vec<CardanoTransactionRecord>> {
48 self.connection_pool
49 .connection()?
50 .fetch_collect(GetCardanoTransactionQuery::between_blocks(range))
51 }
52
53 pub async fn get_transaction<T: Into<TransactionHash>>(
55 &self,
56 transaction_hash: T,
57 ) -> StdResult<Option<CardanoTransactionRecord>> {
58 self.connection_pool.connection()?.fetch_first(
59 GetCardanoTransactionQuery::by_transaction_hash(&transaction_hash.into()),
60 )
61 }
62
63 pub async fn create_transaction<T: Into<TransactionHash>, U: Into<BlockHash>>(
65 &self,
66 transaction_hash: T,
67 block_number: BlockNumber,
68 slot_number: SlotNumber,
69 block_hash: U,
70 ) -> StdResult<Option<CardanoTransactionRecord>> {
71 let query = InsertCardanoTransactionQuery::insert_one(&CardanoTransactionRecord {
72 transaction_hash: transaction_hash.into(),
73 block_number,
74 slot_number,
75 block_hash: block_hash.into(),
76 })?;
77
78 self.connection_pool.connection()?.fetch_first(query)
79 }
80
81 pub async fn create_transactions<T: Into<CardanoTransactionRecord>>(
83 &self,
84 transactions: Vec<T>,
85 ) -> StdResult<Vec<CardanoTransactionRecord>> {
86 let connection = self.connection_pool.connection()?;
87
88 self.create_transactions_with_connection(transactions, &connection)
89 .await
90 }
91
92 async fn create_transactions_with_connection<T: Into<CardanoTransactionRecord>>(
94 &self,
95 transactions: Vec<T>,
96 connection: &SqliteConnection,
97 ) -> StdResult<Vec<CardanoTransactionRecord>> {
98 let records: Vec<CardanoTransactionRecord> =
99 transactions.into_iter().map(|tx| tx.into()).collect();
100
101 connection.fetch_collect(InsertCardanoTransactionQuery::insert_many(records)?)
102 }
103
104 pub async fn create_block_range_roots<T: Into<BlockRangeRootRecord>>(
106 &self,
107 block_ranges: Vec<T>,
108 ) -> StdResult<Vec<BlockRangeRootRecord>> {
109 let records: Vec<BlockRangeRootRecord> =
110 block_ranges.into_iter().map(|tx| tx.into()).collect();
111 let connection = self.connection_pool.connection()?;
112
113 connection.fetch_collect(InsertBlockRangeRootQuery::insert_many(records)?)
114 }
115
116 pub async fn get_transaction_highest_chain_point(&self) -> StdResult<Option<ChainPoint>> {
118 let first_transaction_with_highest_block_number = self
119 .connection_pool
120 .connection()?
121 .fetch_first(GetCardanoTransactionQuery::with_highest_block_number())?;
122
123 Ok(first_transaction_with_highest_block_number.map(|record| {
124 ChainPoint::new(record.slot_number, record.block_number, record.block_hash)
125 }))
126 }
127
128 pub async fn get_highest_start_block_number_for_block_range_roots(
130 &self,
131 ) -> StdResult<Option<BlockNumber>> {
132 let highest: Option<i64> = self
133 .connection_pool
134 .connection()?
135 .query_single_cell("select max(start) as highest from block_range_root;", &[])?;
136 highest
137 .map(u64::try_from)
138 .transpose()
139 .map(|num| num.map(BlockNumber))
140 .with_context(||
141 format!("Integer field max(start) (value={highest:?}) is incompatible with u64 representation.")
142 )
143 }
144
145 pub async fn retrieve_block_range_roots_up_to(
148 &self,
149 block_number: BlockNumber,
150 ) -> StdResult<Box<dyn Iterator<Item = (BlockRange, MKTreeNode)> + '_>> {
151 let block_range_roots = self
152 .connection_pool
153 .connection()?
154 .fetch(GetBlockRangeRootQuery::contains_or_below_block_number(
155 block_number,
156 ))?
157 .map(|record| -> (BlockRange, MKTreeNode) { record.into() })
158 .collect::<Vec<_>>(); Ok(Box::new(block_range_roots.into_iter()))
161 }
162
163 pub async fn retrieve_highest_block_range_root(
165 &self,
166 ) -> StdResult<Option<BlockRangeRootRecord>> {
167 self.connection_pool
168 .connection()?
169 .fetch_first(GetBlockRangeRootQuery::highest())
170 }
171
172 pub async fn get_all(&self) -> StdResult<Vec<CardanoTransaction>> {
174 let records = self
175 .connection_pool
176 .connection()?
177 .fetch(GetCardanoTransactionQuery::all())?
178 .map(|record| record.into())
179 .collect();
180
181 Ok(records)
182 }
183
184 pub fn get_all_block_range_root(&self) -> StdResult<Vec<BlockRangeRootRecord>> {
186 self.connection_pool
187 .connection()?
188 .fetch_collect(GetBlockRangeRootQuery::all())
189 }
190
191 pub async fn store_transactions<T: Into<CardanoTransactionRecord> + Clone>(
195 &self,
196 transactions: Vec<T>,
197 ) -> StdResult<()> {
198 const DB_TRANSACTION_SIZE: usize = 100000;
199 for transactions_in_db_transaction_chunk in transactions.chunks(DB_TRANSACTION_SIZE) {
200 let connection = self.connection_pool.connection()?;
201 let transaction = connection.begin_transaction()?;
202
203 for transactions_in_chunk in transactions_in_db_transaction_chunk.chunks(100) {
205 self.create_transactions_with_connection(
206 transactions_in_chunk.to_vec(),
207 &connection,
208 )
209 .await
210 .with_context(|| "CardanoTransactionRepository can not store transactions")?;
211 }
212
213 transaction.commit()?;
214 }
215 Ok(())
216 }
217
218 pub async fn get_closest_block_number_above_slot_number(
220 &self,
221 slot_number: SlotNumber,
222 ) -> StdResult<Option<BlockNumber>> {
223 let query =
224 GetCardanoTransactionQuery::with_highest_block_number_below_slot_number(slot_number);
225 let record = self.connection_pool.connection()?.fetch_first(query)?;
226
227 Ok(record.map(|r| r.block_number))
228 }
229
230 pub async fn get_transaction_by_hashes<T: Into<TransactionHash>>(
232 &self,
233 hashes: Vec<T>,
234 up_to: BlockNumber,
235 ) -> StdResult<Vec<CardanoTransactionRecord>> {
236 let query = GetCardanoTransactionQuery::by_transaction_hashes(
237 hashes.into_iter().map(Into::into).collect(),
238 up_to,
239 );
240 self.connection_pool.connection()?.fetch_collect(query)
241 }
242
243 pub async fn get_transaction_by_block_ranges(
245 &self,
246 block_ranges: Vec<BlockRange>,
247 ) -> StdResult<Vec<CardanoTransactionRecord>> {
248 let mut transactions = vec![];
249 for block_range in block_ranges {
250 let block_range_transactions: Vec<CardanoTransactionRecord> =
251 self.connection_pool.connection()?.fetch_collect(
252 GetCardanoTransactionQuery::by_block_ranges(vec![block_range]),
253 )?;
254 transactions.extend(block_range_transactions);
255 }
256
257 Ok(transactions)
258 }
259
260 pub async fn prune_transaction(&self, number_of_blocks_to_keep: BlockNumber) -> StdResult<()> {
263 if let Some(highest_block_range_start) =
264 self.get_highest_start_block_number_for_block_range_roots().await?
265 {
266 let threshold = highest_block_range_start - number_of_blocks_to_keep;
267 let query = DeleteCardanoTransactionQuery::below_block_number_threshold(threshold)?;
268
269 let connection = self.connection_pool.connection()?;
270 connection.fetch_first(query)?;
271 }
272
273 Ok(())
274 }
275
276 pub async fn remove_rolled_back_transactions_and_block_range_by_block_number(
281 &self,
282 block_number: BlockNumber,
283 ) -> StdResult<()> {
284 let connection = self.connection_pool.connection()?;
285 let transaction = connection.begin_transaction()?;
286 let query = DeleteCardanoTransactionQuery::above_block_number_threshold(block_number)?;
287 connection.fetch_first(query)?;
288
289 let query =
290 DeleteBlockRangeRootQuery::contains_or_above_block_number_threshold(block_number)?;
291 connection.fetch_first(query)?;
292 transaction.commit()?;
293
294 Ok(())
295 }
296
297 pub async fn remove_rolled_back_transactions_and_block_range_by_slot_number(
302 &self,
303 slot_number: SlotNumber,
304 ) -> StdResult<()> {
305 if let Some(block_number) =
306 self.get_closest_block_number_above_slot_number(slot_number).await?
307 {
308 self.remove_rolled_back_transactions_and_block_range_by_block_number(block_number)
309 .await?;
310 }
311
312 Ok(())
313 }
314}
315
316#[async_trait]
317impl<S: MKTreeStorer> BlockRangeRootRetriever<S> for CardanoTransactionRepository {
318 async fn retrieve_block_range_roots<'a>(
319 &'a self,
320 up_to_beacon: BlockNumber,
321 ) -> StdResult<Box<dyn Iterator<Item = (BlockRange, MKTreeNode)> + 'a>> {
322 self.retrieve_block_range_roots_up_to(up_to_beacon).await
323 }
324}
325
326#[cfg(test)]
327mod tests {
328 use mithril_common::test_utils::CardanoTransactionsBuilder;
329
330 use crate::database::query::GetBlockRangeRootQuery;
331 use crate::database::test_helper::cardano_tx_db_connection;
332
333 use super::*;
334
335 #[tokio::test]
336 async fn repository_create_and_get_transaction() {
337 let repository = CardanoTransactionRepository::new(Arc::new(
338 SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap(),
339 ));
340
341 repository
342 .create_transactions(vec![
343 CardanoTransaction::new(
344 "tx_hash-123",
345 BlockNumber(10),
346 SlotNumber(50),
347 "block_hash-123",
348 ),
349 CardanoTransaction::new(
350 "tx_hash-456",
351 BlockNumber(11),
352 SlotNumber(51),
353 "block_hash-456",
354 ),
355 ])
356 .await
357 .unwrap();
358
359 {
360 let transaction_result = repository.get_transaction("tx_hash-123").await.unwrap();
361 assert_eq!(
362 Some(CardanoTransactionRecord {
363 transaction_hash: "tx_hash-123".to_string(),
364 block_number: BlockNumber(10),
365 slot_number: SlotNumber(50),
366 block_hash: "block_hash-123".to_string(),
367 }),
368 transaction_result
369 );
370 }
371 {
372 let transaction_result = repository.get_transaction("not-exist").await.unwrap();
373 assert_eq!(None, transaction_result);
374 }
375 }
376
377 #[tokio::test]
378 async fn repository_get_transaction_by_hashes() {
379 let repository = CardanoTransactionRepository::new(Arc::new(
380 SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap(),
381 ));
382
383 repository
384 .create_transactions(vec![
385 CardanoTransactionRecord::new(
386 "tx_hash-123",
387 BlockNumber(10),
388 SlotNumber(50),
389 "block_hash-123",
390 ),
391 CardanoTransactionRecord::new(
392 "tx_hash-456",
393 BlockNumber(11),
394 SlotNumber(51),
395 "block_hash-456",
396 ),
397 CardanoTransactionRecord::new(
398 "tx_hash-789",
399 BlockNumber(12),
400 SlotNumber(52),
401 "block_hash-789",
402 ),
403 CardanoTransactionRecord::new(
404 "tx_hash-000",
405 BlockNumber(101),
406 SlotNumber(100),
407 "block_hash-000",
408 ),
409 ])
410 .await
411 .unwrap();
412
413 {
414 let transactions = repository
415 .get_transaction_by_hashes(vec!["tx_hash-123", "tx_hash-789"], BlockNumber(100))
416 .await
417 .unwrap();
418
419 assert_eq!(
420 vec![
421 CardanoTransactionRecord::new(
422 "tx_hash-123",
423 BlockNumber(10),
424 SlotNumber(50),
425 "block_hash-123"
426 ),
427 CardanoTransactionRecord::new(
428 "tx_hash-789",
429 BlockNumber(12),
430 SlotNumber(52),
431 "block_hash-789"
432 ),
433 ],
434 transactions
435 );
436 }
437 {
438 let transactions = repository
439 .get_transaction_by_hashes(
440 vec!["tx_hash-123", "tx_hash-789", "tx_hash-000"],
441 BlockNumber(100),
442 )
443 .await
444 .unwrap();
445
446 assert_eq!(
447 vec![
448 CardanoTransactionRecord::new(
449 "tx_hash-123",
450 BlockNumber(10),
451 SlotNumber(50),
452 "block_hash-123"
453 ),
454 CardanoTransactionRecord::new(
455 "tx_hash-789",
456 BlockNumber(12),
457 SlotNumber(52),
458 "block_hash-789"
459 ),
460 ],
461 transactions
462 );
463 }
464 {
465 let transactions = repository
466 .get_transaction_by_hashes(
467 vec!["tx_hash-123", "tx_hash-789", "tx_hash-000"],
468 BlockNumber(101),
469 )
470 .await
471 .unwrap();
472
473 assert_eq!(
474 vec![
475 CardanoTransactionRecord::new(
476 "tx_hash-123",
477 BlockNumber(10),
478 SlotNumber(50),
479 "block_hash-123"
480 ),
481 CardanoTransactionRecord::new(
482 "tx_hash-789",
483 BlockNumber(12),
484 SlotNumber(52),
485 "block_hash-789"
486 ),
487 CardanoTransactionRecord::new(
488 "tx_hash-000",
489 BlockNumber(101),
490 SlotNumber(100),
491 "block_hash-000"
492 ),
493 ],
494 transactions
495 );
496 }
497 {
498 let transactions = repository
499 .get_transaction_by_hashes(vec!["not-exist".to_string()], BlockNumber(100))
500 .await
501 .unwrap();
502
503 assert_eq!(Vec::<CardanoTransactionRecord>::new(), transactions);
504 }
505 }
506
507 #[tokio::test]
508 async fn repository_create_ignore_further_transactions_when_exists() {
509 let connection = cardano_tx_db_connection().unwrap();
510 let repository = CardanoTransactionRepository::new(Arc::new(
511 SqliteConnectionPool::build_from_connection(connection),
512 ));
513
514 repository
515 .create_transaction(
516 "tx-hash-123",
517 BlockNumber(10),
518 SlotNumber(50),
519 "block_hash-123",
520 )
521 .await
522 .unwrap();
523 repository
524 .create_transaction(
525 "tx-hash-123",
526 BlockNumber(11),
527 SlotNumber(51),
528 "block_hash-123-bis",
529 )
530 .await
531 .unwrap();
532 let transaction_result = repository.get_transaction("tx-hash-123").await.unwrap();
533
534 assert_eq!(
535 Some(CardanoTransactionRecord {
536 transaction_hash: "tx-hash-123".to_string(),
537 block_number: BlockNumber(10),
538 slot_number: SlotNumber(50),
539 block_hash: "block_hash-123".to_string(),
540 }),
541 transaction_result
542 );
543 }
544
545 #[tokio::test]
546 async fn repository_store_transactions_and_get_stored_transactions() {
547 let connection = cardano_tx_db_connection().unwrap();
548 let repository = CardanoTransactionRepository::new(Arc::new(
549 SqliteConnectionPool::build_from_connection(connection),
550 ));
551
552 let cardano_transactions = vec![
553 CardanoTransaction::new(
554 "tx-hash-123",
555 BlockNumber(10),
556 SlotNumber(50),
557 "block-hash-123",
558 ),
559 CardanoTransaction::new(
560 "tx-hash-456",
561 BlockNumber(11),
562 SlotNumber(51),
563 "block-hash-456",
564 ),
565 ];
566 repository.create_transactions(cardano_transactions).await.unwrap();
567
568 let transaction_result = repository.get_transaction("tx-hash-123").await.unwrap();
569
570 assert_eq!(
571 Some(CardanoTransactionRecord {
572 transaction_hash: "tx-hash-123".to_string(),
573 block_number: BlockNumber(10),
574 slot_number: SlotNumber(50),
575 block_hash: "block-hash-123".to_string(),
576 }),
577 transaction_result
578 );
579
580 let transaction_result = repository.get_transaction("tx-hash-456").await.unwrap();
581
582 assert_eq!(
583 Some(CardanoTransactionRecord {
584 transaction_hash: "tx-hash-456".to_string(),
585 block_number: BlockNumber(11),
586 slot_number: SlotNumber(51),
587 block_hash: "block-hash-456".to_string(),
588 }),
589 transaction_result
590 );
591 }
592
593 #[tokio::test]
594 async fn repository_get_all_stored_transactions() {
595 let connection = cardano_tx_db_connection().unwrap();
596 let repository = CardanoTransactionRepository::new(Arc::new(
597 SqliteConnectionPool::build_from_connection(connection),
598 ));
599
600 let cardano_transactions = vec![
601 CardanoTransaction::new(
602 "tx-hash-123",
603 BlockNumber(10),
604 SlotNumber(50),
605 "block-hash-123",
606 ),
607 CardanoTransaction::new(
608 "tx-hash-456",
609 BlockNumber(11),
610 SlotNumber(51),
611 "block-hash-456",
612 ),
613 ];
614 repository
615 .create_transactions(cardano_transactions.clone())
616 .await
617 .unwrap();
618
619 let transactions_result = repository.get_all_transactions().await.unwrap();
620 let transactions_expected: Vec<CardanoTransactionRecord> =
621 cardano_transactions.iter().map(|tx| tx.clone().into()).collect();
622
623 assert_eq!(transactions_expected, transactions_result);
624 }
625
626 #[tokio::test]
627 async fn repository_store_transactions_doesnt_erase_existing_data() {
628 let connection = cardano_tx_db_connection().unwrap();
629 let repository = CardanoTransactionRepository::new(Arc::new(
630 SqliteConnectionPool::build_from_connection(connection),
631 ));
632
633 repository
634 .create_transaction("tx-hash-000", BlockNumber(1), SlotNumber(5), "block-hash")
635 .await
636 .unwrap();
637
638 let cardano_transactions = vec![CardanoTransaction::new(
639 "tx-hash-123",
640 BlockNumber(10),
641 SlotNumber(50),
642 "block-hash-123",
643 )];
644 repository.create_transactions(cardano_transactions).await.unwrap();
645
646 let transaction_result = repository.get_transaction("tx-hash-000").await.unwrap();
647
648 assert_eq!(
649 Some(CardanoTransactionRecord {
650 transaction_hash: "tx-hash-000".to_string(),
651 block_number: BlockNumber(1),
652 slot_number: SlotNumber(5),
653 block_hash: "block-hash".to_string(),
654 }),
655 transaction_result
656 );
657 }
658
659 #[tokio::test]
660 async fn repository_get_transaction_highest_chain_point_without_transactions_in_db() {
661 let connection = cardano_tx_db_connection().unwrap();
662 let repository = CardanoTransactionRepository::new(Arc::new(
663 SqliteConnectionPool::build_from_connection(connection),
664 ));
665
666 let highest_beacon = repository.get_transaction_highest_chain_point().await.unwrap();
667 assert_eq!(None, highest_beacon);
668 }
669
670 #[tokio::test]
671 async fn repository_get_transaction_highest_chain_point_with_transactions_in_db() {
672 let connection = cardano_tx_db_connection().unwrap();
673 let repository = CardanoTransactionRepository::new(Arc::new(
674 SqliteConnectionPool::build_from_connection(connection),
675 ));
676
677 let cardano_transactions = vec![
678 CardanoTransaction::new(
679 "tx-hash-123",
680 BlockNumber(10),
681 SlotNumber(50),
682 "block-hash-10",
683 ),
684 CardanoTransaction::new(
685 "tx-hash-456",
686 BlockNumber(25),
687 SlotNumber(51),
688 "block-hash-25",
689 ),
690 ];
691 repository.create_transactions(cardano_transactions).await.unwrap();
692
693 let highest_beacon = repository.get_transaction_highest_chain_point().await.unwrap();
694 assert_eq!(
695 Some(ChainPoint {
696 slot_number: SlotNumber(51),
697 block_number: BlockNumber(25),
698 block_hash: "block-hash-25".to_string()
699 }),
700 highest_beacon
701 );
702 }
703
704 #[tokio::test]
705 async fn repository_get_transaction_highest_chain_point_with_transactions_with_same_block_number_in_db()
706 {
707 let connection = cardano_tx_db_connection().unwrap();
708 let repository = CardanoTransactionRepository::new(Arc::new(
709 SqliteConnectionPool::build_from_connection(connection),
710 ));
711
712 let cardano_transactions = vec![
713 CardanoTransaction::new(
714 "tx-hash-123",
715 BlockNumber(10),
716 SlotNumber(50),
717 "block-hash-10",
718 ),
719 CardanoTransaction::new(
720 "tx-hash-456",
721 BlockNumber(25),
722 SlotNumber(51),
723 "block-hash-25",
724 ),
725 CardanoTransaction::new(
726 "tx-hash-789",
727 BlockNumber(25),
728 SlotNumber(51),
729 "block-hash-25",
730 ),
731 ];
732 repository.create_transactions(cardano_transactions).await.unwrap();
733
734 let highest_beacon = repository.get_transaction_highest_chain_point().await.unwrap();
735 assert_eq!(
736 Some(ChainPoint {
737 slot_number: SlotNumber(51),
738 block_number: BlockNumber(25),
739 block_hash: "block-hash-25".to_string()
740 }),
741 highest_beacon
742 );
743 }
744
745 #[tokio::test]
746 async fn repository_get_transactions_in_range_blocks() {
747 let connection = cardano_tx_db_connection().unwrap();
748 let repository = CardanoTransactionRepository::new(Arc::new(
749 SqliteConnectionPool::build_from_connection(connection),
750 ));
751
752 let transactions = vec![
753 CardanoTransactionRecord::new(
754 "tx-hash-1",
755 BlockNumber(10),
756 SlotNumber(50),
757 "block-hash-1",
758 ),
759 CardanoTransactionRecord::new(
760 "tx-hash-2",
761 BlockNumber(11),
762 SlotNumber(51),
763 "block-hash-2",
764 ),
765 CardanoTransactionRecord::new(
766 "tx-hash-3",
767 BlockNumber(12),
768 SlotNumber(52),
769 "block-hash-3",
770 ),
771 ];
772 repository.create_transactions(transactions.clone()).await.unwrap();
773
774 {
775 let transaction_result = repository
776 .get_transactions_in_range_blocks(BlockNumber(0)..BlockNumber(10))
777 .await
778 .unwrap();
779 assert_eq!(Vec::<CardanoTransactionRecord>::new(), transaction_result);
780 }
781 {
782 let transaction_result = repository
783 .get_transactions_in_range_blocks(BlockNumber(13)..BlockNumber(21))
784 .await
785 .unwrap();
786 assert_eq!(Vec::<CardanoTransactionRecord>::new(), transaction_result);
787 }
788 {
789 let transaction_result = repository
790 .get_transactions_in_range_blocks(BlockNumber(9)..BlockNumber(12))
791 .await
792 .unwrap();
793 assert_eq!(transactions[0..=1].to_vec(), transaction_result);
794 }
795 {
796 let transaction_result = repository
797 .get_transactions_in_range_blocks(BlockNumber(10)..BlockNumber(13))
798 .await
799 .unwrap();
800 assert_eq!(transactions.clone(), transaction_result);
801 }
802 {
803 let transaction_result = repository
804 .get_transactions_in_range_blocks(BlockNumber(11)..BlockNumber(14))
805 .await
806 .unwrap();
807 assert_eq!(transactions[1..=2].to_vec(), transaction_result);
808 }
809 }
810
811 #[tokio::test]
812 async fn repository_get_transactions_by_block_ranges() {
813 let connection = cardano_tx_db_connection().unwrap();
814 let repository = CardanoTransactionRepository::new(Arc::new(
815 SqliteConnectionPool::build_from_connection(connection),
816 ));
817
818 let transactions = vec![
819 CardanoTransactionRecord::new(
820 "tx-hash-1",
821 BlockNumber(10),
822 SlotNumber(50),
823 "block-hash-1",
824 ),
825 CardanoTransactionRecord::new(
826 "tx-hash-2",
827 BlockNumber(11),
828 SlotNumber(51),
829 "block-hash-2",
830 ),
831 CardanoTransactionRecord::new(
832 "tx-hash-3",
833 BlockNumber(20),
834 SlotNumber(52),
835 "block-hash-3",
836 ),
837 CardanoTransactionRecord::new(
838 "tx-hash-4",
839 BlockNumber(31),
840 SlotNumber(53),
841 "block-hash-4",
842 ),
843 CardanoTransactionRecord::new(
844 "tx-hash-5",
845 BlockNumber(35),
846 SlotNumber(54),
847 "block-hash-5",
848 ),
849 CardanoTransactionRecord::new(
850 "tx-hash-6",
851 BlockNumber(46),
852 SlotNumber(55),
853 "block-hash-6",
854 ),
855 ];
856 repository.create_transactions(transactions.clone()).await.unwrap();
857
858 {
859 let transaction_result = repository
860 .get_transaction_by_block_ranges(vec![BlockRange::from_block_number(BlockNumber(
861 100,
862 ))])
863 .await
864 .unwrap();
865 assert_eq!(Vec::<CardanoTransactionRecord>::new(), transaction_result);
866 }
867 {
868 let transaction_result = repository
869 .get_transaction_by_block_ranges(vec![BlockRange::from_block_number(BlockNumber(
870 0,
871 ))])
872 .await
873 .unwrap();
874 assert_eq!(transactions[0..=1].to_vec(), transaction_result);
875 }
876 {
877 let transaction_result = repository
878 .get_transaction_by_block_ranges(vec![
879 BlockRange::from_block_number(BlockNumber(0)),
880 BlockRange::from_block_number(BlockNumber(15)),
881 ])
882 .await
883 .unwrap();
884 assert_eq!(transactions[0..=2].to_vec(), transaction_result);
885 }
886 {
887 let transaction_result = repository
888 .get_transaction_by_block_ranges(vec![
889 BlockRange::from_block_number(BlockNumber(0)),
890 BlockRange::from_block_number(BlockNumber(30)),
891 ])
892 .await
893 .unwrap();
894 assert_eq!(
895 [transactions[0..=1].to_vec(), transactions[3..=4].to_vec()].concat(),
896 transaction_result
897 );
898 }
899 }
900
901 #[tokio::test]
902 async fn repository_get_closest_block_number_by_slot_number() {
903 let connection = cardano_tx_db_connection().unwrap();
904 let repository = CardanoTransactionRepository::new(Arc::new(
905 SqliteConnectionPool::build_from_connection(connection),
906 ));
907
908 let transactions = vec![
909 CardanoTransactionRecord::new("tx-1", BlockNumber(100), SlotNumber(500), "block-1"),
910 CardanoTransactionRecord::new("tx-2", BlockNumber(100), SlotNumber(500), "block-1"),
911 CardanoTransactionRecord::new("tx-3", BlockNumber(101), SlotNumber(501), "block-1"),
912 ];
913 repository.create_transactions(transactions.clone()).await.unwrap();
914
915 let transaction_block_number_retrieved = repository
916 .get_closest_block_number_above_slot_number(SlotNumber(500))
917 .await
918 .unwrap();
919
920 assert_eq!(transaction_block_number_retrieved, Some(BlockNumber(100)));
921 }
922
923 #[tokio::test]
924 async fn repository_store_block_range() {
925 let connection = cardano_tx_db_connection().unwrap();
926 let repository = CardanoTransactionRepository::new(Arc::new(
927 SqliteConnectionPool::build_from_connection(connection),
928 ));
929
930 repository
931 .create_block_range_roots(vec![
932 (
933 BlockRange::from_block_number(BlockNumber(0)),
934 MKTreeNode::from_hex("AAAA").unwrap(),
935 ),
936 (
937 BlockRange::from_block_number(BlockRange::LENGTH),
938 MKTreeNode::from_hex("BBBB").unwrap(),
939 ),
940 ])
941 .await
942 .unwrap();
943
944 let records: Vec<BlockRangeRootRecord> = repository
945 .connection_pool
946 .connection()
947 .unwrap()
948 .fetch_collect(GetBlockRangeRootQuery::all())
949 .unwrap();
950 assert_eq!(
951 vec![
952 BlockRangeRootRecord {
953 range: BlockRange::from_block_number(BlockNumber(0)),
954 merkle_root: MKTreeNode::from_hex("AAAA").unwrap(),
955 },
956 BlockRangeRootRecord {
957 range: BlockRange::from_block_number(BlockRange::LENGTH),
958 merkle_root: MKTreeNode::from_hex("BBBB").unwrap(),
959 }
960 ],
961 records
962 );
963 }
964
965 #[tokio::test]
966 async fn repository_store_block_range_with_existing_hash_doesnt_erase_existing_data() {
967 let connection = cardano_tx_db_connection().unwrap();
968 let repository = CardanoTransactionRepository::new(Arc::new(
969 SqliteConnectionPool::build_from_connection(connection),
970 ));
971 let range = BlockRange::from_block_number(BlockNumber(0));
972
973 repository
974 .create_block_range_roots(vec![(range.clone(), MKTreeNode::from_hex("AAAA").unwrap())])
975 .await
976 .unwrap();
977 repository
978 .create_block_range_roots(vec![(range.clone(), MKTreeNode::from_hex("BBBB").unwrap())])
979 .await
980 .unwrap();
981
982 let record: Vec<BlockRangeRootRecord> = repository
983 .connection_pool
984 .connection()
985 .unwrap()
986 .fetch_collect(GetBlockRangeRootQuery::all())
987 .unwrap();
988 assert_eq!(
989 vec![BlockRangeRootRecord {
990 range,
991 merkle_root: MKTreeNode::from_hex("AAAA").unwrap()
992 }],
993 record
994 );
995 }
996
997 #[tokio::test]
998 async fn repository_retrieve_block_range_roots_up_to() {
999 let connection = cardano_tx_db_connection().unwrap();
1000 let repository = CardanoTransactionRepository::new(Arc::new(
1001 SqliteConnectionPool::build_from_connection(connection),
1002 ));
1003 let block_range_roots = vec![
1004 (
1005 BlockRange::from_block_number(BlockNumber(15)),
1006 MKTreeNode::from_hex("AAAA").unwrap(),
1007 ),
1008 (
1009 BlockRange::from_block_number(BlockNumber(30)),
1010 MKTreeNode::from_hex("BBBB").unwrap(),
1011 ),
1012 (
1013 BlockRange::from_block_number(BlockNumber(45)),
1014 MKTreeNode::from_hex("CCCC").unwrap(),
1015 ),
1016 ];
1017 repository
1018 .create_block_range_roots(block_range_roots.clone())
1019 .await
1020 .unwrap();
1021
1022 let retrieved_block_ranges = repository
1023 .retrieve_block_range_roots_up_to(BlockNumber(45))
1024 .await
1025 .unwrap();
1026 assert_eq!(
1027 block_range_roots[0..2].to_vec(),
1028 retrieved_block_ranges.collect::<Vec<_>>()
1029 );
1030 }
1031
1032 #[tokio::test]
1033 async fn repository_retrieve_highest_block_range_roots() {
1034 let connection = cardano_tx_db_connection().unwrap();
1035 let repository = CardanoTransactionRepository::new(Arc::new(
1036 SqliteConnectionPool::build_from_connection(connection),
1037 ));
1038 let block_range_roots = vec![
1039 BlockRangeRootRecord {
1040 range: BlockRange::from_block_number(BlockNumber(15)),
1041 merkle_root: MKTreeNode::from_hex("AAAA").unwrap(),
1042 },
1043 BlockRangeRootRecord {
1044 range: BlockRange::from_block_number(BlockNumber(30)),
1045 merkle_root: MKTreeNode::from_hex("BBBB").unwrap(),
1046 },
1047 BlockRangeRootRecord {
1048 range: BlockRange::from_block_number(BlockNumber(45)),
1049 merkle_root: MKTreeNode::from_hex("CCCC").unwrap(),
1050 },
1051 ];
1052 repository
1053 .create_block_range_roots(block_range_roots.clone())
1054 .await
1055 .unwrap();
1056
1057 let retrieved_block_range = repository.retrieve_highest_block_range_root().await.unwrap();
1058 assert_eq!(block_range_roots.last().cloned(), retrieved_block_range);
1059 }
1060
1061 #[tokio::test]
1062 async fn repository_prune_transactions() {
1063 let connection = cardano_tx_db_connection().unwrap();
1064 let repository = CardanoTransactionRepository::new(Arc::new(
1065 SqliteConnectionPool::build_from_connection(connection),
1066 ));
1067
1068 let cardano_transactions: Vec<CardanoTransactionRecord> = CardanoTransactionsBuilder::new()
1069 .blocks_per_block_range(15)
1070 .build_transactions(53)
1071 .into_iter()
1072 .map(CardanoTransactionRecord::from)
1073 .collect();
1074
1075 repository
1076 .create_transactions(cardano_transactions.clone())
1077 .await
1078 .unwrap();
1079 repository
1081 .create_block_range_roots(vec![(
1082 BlockRange::from_block_number(BlockNumber(45)),
1083 MKTreeNode::from_hex("BBBB").unwrap(),
1084 )])
1085 .await
1086 .unwrap();
1087
1088 let transaction_result = repository.get_all().await.unwrap();
1089 assert_eq!(cardano_transactions.len(), transaction_result.len());
1090
1091 repository.prune_transaction(BlockNumber(10_000_000)).await.unwrap();
1094 let transaction_result = repository.get_all_transactions().await.unwrap();
1095 assert_eq!(cardano_transactions, transaction_result);
1096
1097 repository.prune_transaction(BlockNumber(20)).await.unwrap();
1100 let transaction_result = repository
1101 .get_transactions_in_range_blocks(BlockNumber(0)..BlockNumber(25))
1102 .await
1103 .unwrap();
1104 assert_eq!(Vec::<CardanoTransactionRecord>::new(), transaction_result);
1105
1106 let transaction_result = repository
1107 .get_transactions_in_range_blocks(BlockNumber(25)..BlockNumber(1000))
1108 .await
1109 .unwrap();
1110 assert_eq!(28, transaction_result.len());
1111 }
1112
1113 #[tokio::test]
1114 async fn get_highest_start_block_number_for_block_range_roots() {
1115 let connection = cardano_tx_db_connection().unwrap();
1116 let repository = CardanoTransactionRepository::new(Arc::new(
1117 SqliteConnectionPool::build_from_connection(connection),
1118 ));
1119
1120 let highest = repository
1121 .get_highest_start_block_number_for_block_range_roots()
1122 .await
1123 .unwrap();
1124 assert_eq!(None, highest);
1125
1126 let block_range_roots = vec![
1127 (
1128 BlockRange::from_block_number(BlockNumber(15)),
1129 MKTreeNode::from_hex("AAAA").unwrap(),
1130 ),
1131 (
1132 BlockRange::from_block_number(BlockNumber(30)),
1133 MKTreeNode::from_hex("BBBB").unwrap(),
1134 ),
1135 ];
1136 repository
1137 .create_block_range_roots(block_range_roots.clone())
1138 .await
1139 .unwrap();
1140
1141 let highest = repository
1142 .get_highest_start_block_number_for_block_range_roots()
1143 .await
1144 .unwrap();
1145 assert_eq!(Some(BlockNumber(30)), highest);
1146 }
1147
1148 #[tokio::test]
1149 async fn remove_transactions_and_block_range_greater_than_given_block_number() {
1150 let connection = cardano_tx_db_connection().unwrap();
1151 let repository = CardanoTransactionRepository::new(Arc::new(
1152 SqliteConnectionPool::build_from_connection(connection),
1153 ));
1154
1155 let cardano_transactions = vec![
1156 CardanoTransaction::new(
1157 "tx-hash-123",
1158 BlockRange::LENGTH,
1159 SlotNumber(50),
1160 "block-hash-123",
1161 ),
1162 CardanoTransaction::new(
1163 "tx-hash-123",
1164 BlockRange::LENGTH * 3 - 1,
1165 SlotNumber(50),
1166 "block-hash-123",
1167 ),
1168 CardanoTransaction::new(
1169 "tx-hash-456",
1170 BlockRange::LENGTH * 3,
1171 SlotNumber(51),
1172 "block-hash-456",
1173 ),
1174 ];
1175 repository.create_transactions(cardano_transactions).await.unwrap();
1176 repository
1177 .create_block_range_roots(vec![
1178 (
1179 BlockRange::from_block_number(BlockRange::LENGTH),
1180 MKTreeNode::from_hex("AAAA").unwrap(),
1181 ),
1182 (
1183 BlockRange::from_block_number(BlockRange::LENGTH * 2),
1184 MKTreeNode::from_hex("AAAA").unwrap(),
1185 ),
1186 (
1187 BlockRange::from_block_number(BlockRange::LENGTH * 3),
1188 MKTreeNode::from_hex("AAAA").unwrap(),
1189 ),
1190 ])
1191 .await
1192 .unwrap();
1193
1194 repository
1195 .remove_rolled_back_transactions_and_block_range_by_block_number(BlockRange::LENGTH * 3)
1196 .await
1197 .unwrap();
1198 assert_eq!(2, repository.get_all_transactions().await.unwrap().len());
1199 assert_eq!(2, repository.get_all_block_range_root().unwrap().len());
1200 }
1201
1202 #[tokio::test]
1203 async fn remove_rolled_back_transactions_and_block_range_by_slot_number() {
1204 fn transaction_record(
1205 block_number: BlockNumber,
1206 slot_number: SlotNumber,
1207 tx_hash: &str,
1208 ) -> CardanoTransactionRecord {
1209 CardanoTransactionRecord::new(
1210 tx_hash,
1211 block_number,
1212 slot_number,
1213 format!("block-hash-{block_number}"),
1214 )
1215 }
1216
1217 let repository = CardanoTransactionRepository::new(Arc::new(
1218 SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap(),
1219 ));
1220
1221 repository
1222 .create_transactions(vec![
1223 transaction_record(BlockNumber(10), SlotNumber(50), "tx-hash-1"),
1224 transaction_record(BlockNumber(11), SlotNumber(51), "tx-hash-2"),
1225 transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-3"),
1226 transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-4"),
1227 transaction_record(BlockNumber(101), SlotNumber(100), "tx-hash-5"),
1228 transaction_record(BlockNumber(202), SlotNumber(200), "tx-hash-56"),
1229 ])
1230 .await
1231 .unwrap();
1232
1233 {
1234 repository
1235 .remove_rolled_back_transactions_and_block_range_by_slot_number(SlotNumber(110))
1236 .await
1237 .expect("Failed to remove rolled back transactions");
1238
1239 let transactions = repository
1240 .get_all()
1241 .await
1242 .unwrap()
1243 .into_iter()
1244 .map(|v| v.into())
1245 .collect::<Vec<_>>();
1246 assert_eq!(
1247 vec![
1248 transaction_record(BlockNumber(10), SlotNumber(50), "tx-hash-1"),
1249 transaction_record(BlockNumber(11), SlotNumber(51), "tx-hash-2"),
1250 transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-3"),
1251 transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-4"),
1252 transaction_record(BlockNumber(101), SlotNumber(100), "tx-hash-5"),
1253 ],
1254 transactions
1255 );
1256 }
1257
1258 {
1259 repository
1260 .remove_rolled_back_transactions_and_block_range_by_slot_number(SlotNumber(53))
1261 .await
1262 .expect("Failed to remove rolled back transactions");
1263
1264 let transactions = repository
1265 .get_all()
1266 .await
1267 .unwrap()
1268 .into_iter()
1269 .map(|v| v.into())
1270 .collect::<Vec<_>>();
1271 assert_eq!(
1272 vec![
1273 transaction_record(BlockNumber(10), SlotNumber(50), "tx-hash-1"),
1274 transaction_record(BlockNumber(11), SlotNumber(51), "tx-hash-2"),
1275 transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-3"),
1276 transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-4"),
1277 ],
1278 transactions
1279 );
1280 }
1281
1282 {
1283 repository
1284 .remove_rolled_back_transactions_and_block_range_by_slot_number(SlotNumber(51))
1285 .await
1286 .expect("Failed to remove rolled back transactions");
1287
1288 let transactions = repository
1289 .get_all()
1290 .await
1291 .unwrap()
1292 .into_iter()
1293 .map(|v| v.into())
1294 .collect::<Vec<_>>();
1295 assert_eq!(
1296 vec![
1297 transaction_record(BlockNumber(10), SlotNumber(50), "tx-hash-1"),
1298 transaction_record(BlockNumber(11), SlotNumber(51), "tx-hash-2"),
1299 ],
1300 transactions
1301 );
1302 }
1303 }
1304}