1use std::ops::Range;
2use std::sync::Arc;
3
4use anyhow::Context;
5use async_trait::async_trait;
6
7use mithril_common::crypto_helper::{MKTreeNode, MKTreeStorer};
8use mithril_common::entities::{
9 BlockHash, BlockNumber, BlockRange, CardanoTransaction, ChainPoint, SlotNumber, TransactionHash,
10};
11use mithril_common::signable_builder::BlockRangeRootRetriever;
12use mithril_common::StdResult;
13
14use crate::database::query::{
15 DeleteBlockRangeRootQuery, DeleteCardanoTransactionQuery, GetBlockRangeRootQuery,
16 GetCardanoTransactionQuery, InsertBlockRangeRootQuery, InsertCardanoTransactionQuery,
17};
18use crate::database::record::{BlockRangeRootRecord, CardanoTransactionRecord};
19use crate::sqlite::{ConnectionExtensions, SqliteConnection, SqliteConnectionPool};
20
21pub struct CardanoTransactionRepository {
26 connection_pool: Arc<SqliteConnectionPool>,
27}
28
29impl CardanoTransactionRepository {
30 pub fn new(connection_pool: Arc<SqliteConnectionPool>) -> Self {
32 Self { connection_pool }
33 }
34
35 pub async fn get_all_transactions(&self) -> StdResult<Vec<CardanoTransactionRecord>> {
37 self.connection_pool
38 .connection()?
39 .fetch_collect(GetCardanoTransactionQuery::all())
40 }
41
42 pub async fn get_transactions_in_range_blocks(
45 &self,
46 range: Range<BlockNumber>,
47 ) -> StdResult<Vec<CardanoTransactionRecord>> {
48 self.connection_pool
49 .connection()?
50 .fetch_collect(GetCardanoTransactionQuery::between_blocks(range))
51 }
52
53 pub async fn get_transaction<T: Into<TransactionHash>>(
55 &self,
56 transaction_hash: T,
57 ) -> StdResult<Option<CardanoTransactionRecord>> {
58 self.connection_pool.connection()?.fetch_first(
59 GetCardanoTransactionQuery::by_transaction_hash(&transaction_hash.into()),
60 )
61 }
62
63 pub async fn create_transaction<T: Into<TransactionHash>, U: Into<BlockHash>>(
65 &self,
66 transaction_hash: T,
67 block_number: BlockNumber,
68 slot_number: SlotNumber,
69 block_hash: U,
70 ) -> StdResult<Option<CardanoTransactionRecord>> {
71 let query = InsertCardanoTransactionQuery::insert_one(&CardanoTransactionRecord {
72 transaction_hash: transaction_hash.into(),
73 block_number,
74 slot_number,
75 block_hash: block_hash.into(),
76 })?;
77
78 self.connection_pool.connection()?.fetch_first(query)
79 }
80
81 pub async fn create_transactions<T: Into<CardanoTransactionRecord>>(
83 &self,
84 transactions: Vec<T>,
85 ) -> StdResult<Vec<CardanoTransactionRecord>> {
86 let connection = self.connection_pool.connection()?;
87
88 self.create_transactions_with_connection(transactions, &connection)
89 .await
90 }
91
92 async fn create_transactions_with_connection<T: Into<CardanoTransactionRecord>>(
94 &self,
95 transactions: Vec<T>,
96 connection: &SqliteConnection,
97 ) -> StdResult<Vec<CardanoTransactionRecord>> {
98 let records: Vec<CardanoTransactionRecord> =
99 transactions.into_iter().map(|tx| tx.into()).collect();
100
101 connection.fetch_collect(InsertCardanoTransactionQuery::insert_many(records)?)
102 }
103
104 pub async fn create_block_range_roots<T: Into<BlockRangeRootRecord>>(
106 &self,
107 block_ranges: Vec<T>,
108 ) -> StdResult<Vec<BlockRangeRootRecord>> {
109 let records: Vec<BlockRangeRootRecord> =
110 block_ranges.into_iter().map(|tx| tx.into()).collect();
111 let connection = self.connection_pool.connection()?;
112
113 connection.fetch_collect(InsertBlockRangeRootQuery::insert_many(records)?)
114 }
115
116 pub async fn get_transaction_highest_chain_point(&self) -> StdResult<Option<ChainPoint>> {
118 let first_transaction_with_highest_block_number = self
119 .connection_pool
120 .connection()?
121 .fetch_first(GetCardanoTransactionQuery::with_highest_block_number())?;
122
123 Ok(first_transaction_with_highest_block_number.map(|record| {
124 ChainPoint::new(record.slot_number, record.block_number, record.block_hash)
125 }))
126 }
127
128 pub async fn get_highest_start_block_number_for_block_range_roots(
130 &self,
131 ) -> StdResult<Option<BlockNumber>> {
132 let highest: Option<i64> = self
133 .connection_pool
134 .connection()?
135 .query_single_cell("select max(start) as highest from block_range_root;", &[])?;
136 highest
137 .map(u64::try_from)
138 .transpose()
139 .map(|num| num.map(BlockNumber))
140 .with_context(||
141 format!("Integer field max(start) (value={highest:?}) is incompatible with u64 representation.")
142 )
143 }
144
145 pub async fn retrieve_block_range_roots_up_to(
148 &self,
149 block_number: BlockNumber,
150 ) -> StdResult<Box<dyn Iterator<Item = (BlockRange, MKTreeNode)> + '_>> {
151 let block_range_roots = self
152 .connection_pool
153 .connection()?
154 .fetch(GetBlockRangeRootQuery::contains_or_below_block_number(
155 block_number,
156 ))?
157 .map(|record| -> (BlockRange, MKTreeNode) { record.into() })
158 .collect::<Vec<_>>(); Ok(Box::new(block_range_roots.into_iter()))
161 }
162
163 pub async fn retrieve_highest_block_range_root(
165 &self,
166 ) -> StdResult<Option<BlockRangeRootRecord>> {
167 self.connection_pool
168 .connection()?
169 .fetch_first(GetBlockRangeRootQuery::highest())
170 }
171
172 pub async fn get_all(&self) -> StdResult<Vec<CardanoTransaction>> {
174 let records = self
175 .connection_pool
176 .connection()?
177 .fetch(GetCardanoTransactionQuery::all())?
178 .map(|record| record.into())
179 .collect();
180
181 Ok(records)
182 }
183
184 pub fn get_all_block_range_root(&self) -> StdResult<Vec<BlockRangeRootRecord>> {
186 self.connection_pool
187 .connection()?
188 .fetch_collect(GetBlockRangeRootQuery::all())
189 }
190
191 pub async fn store_transactions<T: Into<CardanoTransactionRecord> + Clone>(
195 &self,
196 transactions: Vec<T>,
197 ) -> StdResult<()> {
198 const DB_TRANSACTION_SIZE: usize = 100000;
199 for transactions_in_db_transaction_chunk in transactions.chunks(DB_TRANSACTION_SIZE) {
200 let connection = self.connection_pool.connection()?;
201 let transaction = connection.begin_transaction()?;
202
203 for transactions_in_chunk in transactions_in_db_transaction_chunk.chunks(100) {
205 self.create_transactions_with_connection(
206 transactions_in_chunk.to_vec(),
207 &connection,
208 )
209 .await
210 .with_context(|| "CardanoTransactionRepository can not store transactions")?;
211 }
212
213 transaction.commit()?;
214 }
215 Ok(())
216 }
217
218 pub async fn get_closest_block_number_above_slot_number(
220 &self,
221 slot_number: SlotNumber,
222 ) -> StdResult<Option<BlockNumber>> {
223 let query =
224 GetCardanoTransactionQuery::with_highest_block_number_below_slot_number(slot_number);
225 let record = self.connection_pool.connection()?.fetch_first(query)?;
226
227 Ok(record.map(|r| r.block_number))
228 }
229
230 pub async fn get_transaction_by_hashes<T: Into<TransactionHash>>(
232 &self,
233 hashes: Vec<T>,
234 up_to: BlockNumber,
235 ) -> StdResult<Vec<CardanoTransactionRecord>> {
236 let query = GetCardanoTransactionQuery::by_transaction_hashes(
237 hashes.into_iter().map(Into::into).collect(),
238 up_to,
239 );
240 self.connection_pool.connection()?.fetch_collect(query)
241 }
242
243 pub async fn get_transaction_by_block_ranges(
245 &self,
246 block_ranges: Vec<BlockRange>,
247 ) -> StdResult<Vec<CardanoTransactionRecord>> {
248 let mut transactions = vec![];
249 for block_range in block_ranges {
250 let block_range_transactions: Vec<CardanoTransactionRecord> =
251 self.connection_pool.connection()?.fetch_collect(
252 GetCardanoTransactionQuery::by_block_ranges(vec![block_range]),
253 )?;
254 transactions.extend(block_range_transactions);
255 }
256
257 Ok(transactions)
258 }
259
260 pub async fn prune_transaction(&self, number_of_blocks_to_keep: BlockNumber) -> StdResult<()> {
263 if let Some(highest_block_range_start) = self
264 .get_highest_start_block_number_for_block_range_roots()
265 .await?
266 {
267 let threshold = highest_block_range_start - number_of_blocks_to_keep;
268 let query = DeleteCardanoTransactionQuery::below_block_number_threshold(threshold)?;
269
270 let connection = self.connection_pool.connection()?;
271 connection.fetch_first(query)?;
272 }
273
274 Ok(())
275 }
276
277 pub async fn remove_rolled_back_transactions_and_block_range_by_block_number(
282 &self,
283 block_number: BlockNumber,
284 ) -> StdResult<()> {
285 let connection = self.connection_pool.connection()?;
286 let transaction = connection.begin_transaction()?;
287 let query = DeleteCardanoTransactionQuery::above_block_number_threshold(block_number)?;
288 connection.fetch_first(query)?;
289
290 let query =
291 DeleteBlockRangeRootQuery::contains_or_above_block_number_threshold(block_number)?;
292 connection.fetch_first(query)?;
293 transaction.commit()?;
294
295 Ok(())
296 }
297
298 pub async fn remove_rolled_back_transactions_and_block_range_by_slot_number(
303 &self,
304 slot_number: SlotNumber,
305 ) -> StdResult<()> {
306 if let Some(block_number) = self
307 .get_closest_block_number_above_slot_number(slot_number)
308 .await?
309 {
310 self.remove_rolled_back_transactions_and_block_range_by_block_number(block_number)
311 .await?;
312 }
313
314 Ok(())
315 }
316}
317
318#[async_trait]
319impl<S: MKTreeStorer> BlockRangeRootRetriever<S> for CardanoTransactionRepository {
320 async fn retrieve_block_range_roots<'a>(
321 &'a self,
322 up_to_beacon: BlockNumber,
323 ) -> StdResult<Box<dyn Iterator<Item = (BlockRange, MKTreeNode)> + 'a>> {
324 self.retrieve_block_range_roots_up_to(up_to_beacon).await
325 }
326}
327
328#[cfg(test)]
329mod tests {
330 use mithril_common::test_utils::CardanoTransactionsBuilder;
331
332 use crate::database::query::GetBlockRangeRootQuery;
333 use crate::database::test_helper::cardano_tx_db_connection;
334
335 use super::*;
336
337 #[tokio::test]
338 async fn repository_create_and_get_transaction() {
339 let repository = CardanoTransactionRepository::new(Arc::new(
340 SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap(),
341 ));
342
343 repository
344 .create_transactions(vec![
345 CardanoTransaction::new(
346 "tx_hash-123",
347 BlockNumber(10),
348 SlotNumber(50),
349 "block_hash-123",
350 ),
351 CardanoTransaction::new(
352 "tx_hash-456",
353 BlockNumber(11),
354 SlotNumber(51),
355 "block_hash-456",
356 ),
357 ])
358 .await
359 .unwrap();
360
361 {
362 let transaction_result = repository.get_transaction("tx_hash-123").await.unwrap();
363 assert_eq!(
364 Some(CardanoTransactionRecord {
365 transaction_hash: "tx_hash-123".to_string(),
366 block_number: BlockNumber(10),
367 slot_number: SlotNumber(50),
368 block_hash: "block_hash-123".to_string(),
369 }),
370 transaction_result
371 );
372 }
373 {
374 let transaction_result = repository.get_transaction("not-exist").await.unwrap();
375 assert_eq!(None, transaction_result);
376 }
377 }
378
379 #[tokio::test]
380 async fn repository_get_transaction_by_hashes() {
381 let repository = CardanoTransactionRepository::new(Arc::new(
382 SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap(),
383 ));
384
385 repository
386 .create_transactions(vec![
387 CardanoTransactionRecord::new(
388 "tx_hash-123",
389 BlockNumber(10),
390 SlotNumber(50),
391 "block_hash-123",
392 ),
393 CardanoTransactionRecord::new(
394 "tx_hash-456",
395 BlockNumber(11),
396 SlotNumber(51),
397 "block_hash-456",
398 ),
399 CardanoTransactionRecord::new(
400 "tx_hash-789",
401 BlockNumber(12),
402 SlotNumber(52),
403 "block_hash-789",
404 ),
405 CardanoTransactionRecord::new(
406 "tx_hash-000",
407 BlockNumber(101),
408 SlotNumber(100),
409 "block_hash-000",
410 ),
411 ])
412 .await
413 .unwrap();
414
415 {
416 let transactions = repository
417 .get_transaction_by_hashes(vec!["tx_hash-123", "tx_hash-789"], BlockNumber(100))
418 .await
419 .unwrap();
420
421 assert_eq!(
422 vec![
423 CardanoTransactionRecord::new(
424 "tx_hash-123",
425 BlockNumber(10),
426 SlotNumber(50),
427 "block_hash-123"
428 ),
429 CardanoTransactionRecord::new(
430 "tx_hash-789",
431 BlockNumber(12),
432 SlotNumber(52),
433 "block_hash-789"
434 ),
435 ],
436 transactions
437 );
438 }
439 {
440 let transactions = repository
441 .get_transaction_by_hashes(
442 vec!["tx_hash-123", "tx_hash-789", "tx_hash-000"],
443 BlockNumber(100),
444 )
445 .await
446 .unwrap();
447
448 assert_eq!(
449 vec![
450 CardanoTransactionRecord::new(
451 "tx_hash-123",
452 BlockNumber(10),
453 SlotNumber(50),
454 "block_hash-123"
455 ),
456 CardanoTransactionRecord::new(
457 "tx_hash-789",
458 BlockNumber(12),
459 SlotNumber(52),
460 "block_hash-789"
461 ),
462 ],
463 transactions
464 );
465 }
466 {
467 let transactions = repository
468 .get_transaction_by_hashes(
469 vec!["tx_hash-123", "tx_hash-789", "tx_hash-000"],
470 BlockNumber(101),
471 )
472 .await
473 .unwrap();
474
475 assert_eq!(
476 vec![
477 CardanoTransactionRecord::new(
478 "tx_hash-123",
479 BlockNumber(10),
480 SlotNumber(50),
481 "block_hash-123"
482 ),
483 CardanoTransactionRecord::new(
484 "tx_hash-789",
485 BlockNumber(12),
486 SlotNumber(52),
487 "block_hash-789"
488 ),
489 CardanoTransactionRecord::new(
490 "tx_hash-000",
491 BlockNumber(101),
492 SlotNumber(100),
493 "block_hash-000"
494 ),
495 ],
496 transactions
497 );
498 }
499 {
500 let transactions = repository
501 .get_transaction_by_hashes(vec!["not-exist".to_string()], BlockNumber(100))
502 .await
503 .unwrap();
504
505 assert_eq!(Vec::<CardanoTransactionRecord>::new(), transactions);
506 }
507 }
508
509 #[tokio::test]
510 async fn repository_create_ignore_further_transactions_when_exists() {
511 let connection = cardano_tx_db_connection().unwrap();
512 let repository = CardanoTransactionRepository::new(Arc::new(
513 SqliteConnectionPool::build_from_connection(connection),
514 ));
515
516 repository
517 .create_transaction(
518 "tx-hash-123",
519 BlockNumber(10),
520 SlotNumber(50),
521 "block_hash-123",
522 )
523 .await
524 .unwrap();
525 repository
526 .create_transaction(
527 "tx-hash-123",
528 BlockNumber(11),
529 SlotNumber(51),
530 "block_hash-123-bis",
531 )
532 .await
533 .unwrap();
534 let transaction_result = repository.get_transaction("tx-hash-123").await.unwrap();
535
536 assert_eq!(
537 Some(CardanoTransactionRecord {
538 transaction_hash: "tx-hash-123".to_string(),
539 block_number: BlockNumber(10),
540 slot_number: SlotNumber(50),
541 block_hash: "block_hash-123".to_string(),
542 }),
543 transaction_result
544 );
545 }
546
547 #[tokio::test]
548 async fn repository_store_transactions_and_get_stored_transactions() {
549 let connection = cardano_tx_db_connection().unwrap();
550 let repository = CardanoTransactionRepository::new(Arc::new(
551 SqliteConnectionPool::build_from_connection(connection),
552 ));
553
554 let cardano_transactions = vec![
555 CardanoTransaction::new(
556 "tx-hash-123",
557 BlockNumber(10),
558 SlotNumber(50),
559 "block-hash-123",
560 ),
561 CardanoTransaction::new(
562 "tx-hash-456",
563 BlockNumber(11),
564 SlotNumber(51),
565 "block-hash-456",
566 ),
567 ];
568 repository
569 .create_transactions(cardano_transactions)
570 .await
571 .unwrap();
572
573 let transaction_result = repository.get_transaction("tx-hash-123").await.unwrap();
574
575 assert_eq!(
576 Some(CardanoTransactionRecord {
577 transaction_hash: "tx-hash-123".to_string(),
578 block_number: BlockNumber(10),
579 slot_number: SlotNumber(50),
580 block_hash: "block-hash-123".to_string(),
581 }),
582 transaction_result
583 );
584
585 let transaction_result = repository.get_transaction("tx-hash-456").await.unwrap();
586
587 assert_eq!(
588 Some(CardanoTransactionRecord {
589 transaction_hash: "tx-hash-456".to_string(),
590 block_number: BlockNumber(11),
591 slot_number: SlotNumber(51),
592 block_hash: "block-hash-456".to_string(),
593 }),
594 transaction_result
595 );
596 }
597
598 #[tokio::test]
599 async fn repository_get_all_stored_transactions() {
600 let connection = cardano_tx_db_connection().unwrap();
601 let repository = CardanoTransactionRepository::new(Arc::new(
602 SqliteConnectionPool::build_from_connection(connection),
603 ));
604
605 let cardano_transactions = vec![
606 CardanoTransaction::new(
607 "tx-hash-123",
608 BlockNumber(10),
609 SlotNumber(50),
610 "block-hash-123",
611 ),
612 CardanoTransaction::new(
613 "tx-hash-456",
614 BlockNumber(11),
615 SlotNumber(51),
616 "block-hash-456",
617 ),
618 ];
619 repository
620 .create_transactions(cardano_transactions.clone())
621 .await
622 .unwrap();
623
624 let transactions_result = repository.get_all_transactions().await.unwrap();
625 let transactions_expected: Vec<CardanoTransactionRecord> = cardano_transactions
626 .iter()
627 .map(|tx| tx.clone().into())
628 .collect();
629
630 assert_eq!(transactions_expected, transactions_result);
631 }
632
633 #[tokio::test]
634 async fn repository_store_transactions_doesnt_erase_existing_data() {
635 let connection = cardano_tx_db_connection().unwrap();
636 let repository = CardanoTransactionRepository::new(Arc::new(
637 SqliteConnectionPool::build_from_connection(connection),
638 ));
639
640 repository
641 .create_transaction("tx-hash-000", BlockNumber(1), SlotNumber(5), "block-hash")
642 .await
643 .unwrap();
644
645 let cardano_transactions = vec![CardanoTransaction::new(
646 "tx-hash-123",
647 BlockNumber(10),
648 SlotNumber(50),
649 "block-hash-123",
650 )];
651 repository
652 .create_transactions(cardano_transactions)
653 .await
654 .unwrap();
655
656 let transaction_result = repository.get_transaction("tx-hash-000").await.unwrap();
657
658 assert_eq!(
659 Some(CardanoTransactionRecord {
660 transaction_hash: "tx-hash-000".to_string(),
661 block_number: BlockNumber(1),
662 slot_number: SlotNumber(5),
663 block_hash: "block-hash".to_string(),
664 }),
665 transaction_result
666 );
667 }
668
669 #[tokio::test]
670 async fn repository_get_transaction_highest_chain_point_without_transactions_in_db() {
671 let connection = cardano_tx_db_connection().unwrap();
672 let repository = CardanoTransactionRepository::new(Arc::new(
673 SqliteConnectionPool::build_from_connection(connection),
674 ));
675
676 let highest_beacon = repository
677 .get_transaction_highest_chain_point()
678 .await
679 .unwrap();
680 assert_eq!(None, highest_beacon);
681 }
682
683 #[tokio::test]
684 async fn repository_get_transaction_highest_chain_point_with_transactions_in_db() {
685 let connection = cardano_tx_db_connection().unwrap();
686 let repository = CardanoTransactionRepository::new(Arc::new(
687 SqliteConnectionPool::build_from_connection(connection),
688 ));
689
690 let cardano_transactions = vec![
691 CardanoTransaction::new(
692 "tx-hash-123",
693 BlockNumber(10),
694 SlotNumber(50),
695 "block-hash-10",
696 ),
697 CardanoTransaction::new(
698 "tx-hash-456",
699 BlockNumber(25),
700 SlotNumber(51),
701 "block-hash-25",
702 ),
703 ];
704 repository
705 .create_transactions(cardano_transactions)
706 .await
707 .unwrap();
708
709 let highest_beacon = repository
710 .get_transaction_highest_chain_point()
711 .await
712 .unwrap();
713 assert_eq!(
714 Some(ChainPoint {
715 slot_number: SlotNumber(51),
716 block_number: BlockNumber(25),
717 block_hash: "block-hash-25".to_string()
718 }),
719 highest_beacon
720 );
721 }
722
723 #[tokio::test]
724 async fn repository_get_transaction_highest_chain_point_with_transactions_with_same_block_number_in_db(
725 ) {
726 let connection = cardano_tx_db_connection().unwrap();
727 let repository = CardanoTransactionRepository::new(Arc::new(
728 SqliteConnectionPool::build_from_connection(connection),
729 ));
730
731 let cardano_transactions = vec![
732 CardanoTransaction::new(
733 "tx-hash-123",
734 BlockNumber(10),
735 SlotNumber(50),
736 "block-hash-10",
737 ),
738 CardanoTransaction::new(
739 "tx-hash-456",
740 BlockNumber(25),
741 SlotNumber(51),
742 "block-hash-25",
743 ),
744 CardanoTransaction::new(
745 "tx-hash-789",
746 BlockNumber(25),
747 SlotNumber(51),
748 "block-hash-25",
749 ),
750 ];
751 repository
752 .create_transactions(cardano_transactions)
753 .await
754 .unwrap();
755
756 let highest_beacon = repository
757 .get_transaction_highest_chain_point()
758 .await
759 .unwrap();
760 assert_eq!(
761 Some(ChainPoint {
762 slot_number: SlotNumber(51),
763 block_number: BlockNumber(25),
764 block_hash: "block-hash-25".to_string()
765 }),
766 highest_beacon
767 );
768 }
769
770 #[tokio::test]
771 async fn repository_get_transactions_in_range_blocks() {
772 let connection = cardano_tx_db_connection().unwrap();
773 let repository = CardanoTransactionRepository::new(Arc::new(
774 SqliteConnectionPool::build_from_connection(connection),
775 ));
776
777 let transactions = vec![
778 CardanoTransactionRecord::new(
779 "tx-hash-1",
780 BlockNumber(10),
781 SlotNumber(50),
782 "block-hash-1",
783 ),
784 CardanoTransactionRecord::new(
785 "tx-hash-2",
786 BlockNumber(11),
787 SlotNumber(51),
788 "block-hash-2",
789 ),
790 CardanoTransactionRecord::new(
791 "tx-hash-3",
792 BlockNumber(12),
793 SlotNumber(52),
794 "block-hash-3",
795 ),
796 ];
797 repository
798 .create_transactions(transactions.clone())
799 .await
800 .unwrap();
801
802 {
803 let transaction_result = repository
804 .get_transactions_in_range_blocks(BlockNumber(0)..BlockNumber(10))
805 .await
806 .unwrap();
807 assert_eq!(Vec::<CardanoTransactionRecord>::new(), transaction_result);
808 }
809 {
810 let transaction_result = repository
811 .get_transactions_in_range_blocks(BlockNumber(13)..BlockNumber(21))
812 .await
813 .unwrap();
814 assert_eq!(Vec::<CardanoTransactionRecord>::new(), transaction_result);
815 }
816 {
817 let transaction_result = repository
818 .get_transactions_in_range_blocks(BlockNumber(9)..BlockNumber(12))
819 .await
820 .unwrap();
821 assert_eq!(transactions[0..=1].to_vec(), transaction_result);
822 }
823 {
824 let transaction_result = repository
825 .get_transactions_in_range_blocks(BlockNumber(10)..BlockNumber(13))
826 .await
827 .unwrap();
828 assert_eq!(transactions.clone(), transaction_result);
829 }
830 {
831 let transaction_result = repository
832 .get_transactions_in_range_blocks(BlockNumber(11)..BlockNumber(14))
833 .await
834 .unwrap();
835 assert_eq!(transactions[1..=2].to_vec(), transaction_result);
836 }
837 }
838
839 #[tokio::test]
840 async fn repository_get_transactions_by_block_ranges() {
841 let connection = cardano_tx_db_connection().unwrap();
842 let repository = CardanoTransactionRepository::new(Arc::new(
843 SqliteConnectionPool::build_from_connection(connection),
844 ));
845
846 let transactions = vec![
847 CardanoTransactionRecord::new(
848 "tx-hash-1",
849 BlockNumber(10),
850 SlotNumber(50),
851 "block-hash-1",
852 ),
853 CardanoTransactionRecord::new(
854 "tx-hash-2",
855 BlockNumber(11),
856 SlotNumber(51),
857 "block-hash-2",
858 ),
859 CardanoTransactionRecord::new(
860 "tx-hash-3",
861 BlockNumber(20),
862 SlotNumber(52),
863 "block-hash-3",
864 ),
865 CardanoTransactionRecord::new(
866 "tx-hash-4",
867 BlockNumber(31),
868 SlotNumber(53),
869 "block-hash-4",
870 ),
871 CardanoTransactionRecord::new(
872 "tx-hash-5",
873 BlockNumber(35),
874 SlotNumber(54),
875 "block-hash-5",
876 ),
877 CardanoTransactionRecord::new(
878 "tx-hash-6",
879 BlockNumber(46),
880 SlotNumber(55),
881 "block-hash-6",
882 ),
883 ];
884 repository
885 .create_transactions(transactions.clone())
886 .await
887 .unwrap();
888
889 {
890 let transaction_result = repository
891 .get_transaction_by_block_ranges(vec![BlockRange::from_block_number(BlockNumber(
892 100,
893 ))])
894 .await
895 .unwrap();
896 assert_eq!(Vec::<CardanoTransactionRecord>::new(), transaction_result);
897 }
898 {
899 let transaction_result = repository
900 .get_transaction_by_block_ranges(vec![BlockRange::from_block_number(BlockNumber(
901 0,
902 ))])
903 .await
904 .unwrap();
905 assert_eq!(transactions[0..=1].to_vec(), transaction_result);
906 }
907 {
908 let transaction_result = repository
909 .get_transaction_by_block_ranges(vec![
910 BlockRange::from_block_number(BlockNumber(0)),
911 BlockRange::from_block_number(BlockNumber(15)),
912 ])
913 .await
914 .unwrap();
915 assert_eq!(transactions[0..=2].to_vec(), transaction_result);
916 }
917 {
918 let transaction_result = repository
919 .get_transaction_by_block_ranges(vec![
920 BlockRange::from_block_number(BlockNumber(0)),
921 BlockRange::from_block_number(BlockNumber(30)),
922 ])
923 .await
924 .unwrap();
925 assert_eq!(
926 [transactions[0..=1].to_vec(), transactions[3..=4].to_vec()].concat(),
927 transaction_result
928 );
929 }
930 }
931
932 #[tokio::test]
933 async fn repository_get_closest_block_number_by_slot_number() {
934 let connection = cardano_tx_db_connection().unwrap();
935 let repository = CardanoTransactionRepository::new(Arc::new(
936 SqliteConnectionPool::build_from_connection(connection),
937 ));
938
939 let transactions = vec![
940 CardanoTransactionRecord::new("tx-1", BlockNumber(100), SlotNumber(500), "block-1"),
941 CardanoTransactionRecord::new("tx-2", BlockNumber(100), SlotNumber(500), "block-1"),
942 CardanoTransactionRecord::new("tx-3", BlockNumber(101), SlotNumber(501), "block-1"),
943 ];
944 repository
945 .create_transactions(transactions.clone())
946 .await
947 .unwrap();
948
949 let transaction_block_number_retrieved = repository
950 .get_closest_block_number_above_slot_number(SlotNumber(500))
951 .await
952 .unwrap();
953
954 assert_eq!(transaction_block_number_retrieved, Some(BlockNumber(100)));
955 }
956
957 #[tokio::test]
958 async fn repository_store_block_range() {
959 let connection = cardano_tx_db_connection().unwrap();
960 let repository = CardanoTransactionRepository::new(Arc::new(
961 SqliteConnectionPool::build_from_connection(connection),
962 ));
963
964 repository
965 .create_block_range_roots(vec![
966 (
967 BlockRange::from_block_number(BlockNumber(0)),
968 MKTreeNode::from_hex("AAAA").unwrap(),
969 ),
970 (
971 BlockRange::from_block_number(BlockRange::LENGTH),
972 MKTreeNode::from_hex("BBBB").unwrap(),
973 ),
974 ])
975 .await
976 .unwrap();
977
978 let records: Vec<BlockRangeRootRecord> = repository
979 .connection_pool
980 .connection()
981 .unwrap()
982 .fetch_collect(GetBlockRangeRootQuery::all())
983 .unwrap();
984 assert_eq!(
985 vec![
986 BlockRangeRootRecord {
987 range: BlockRange::from_block_number(BlockNumber(0)),
988 merkle_root: MKTreeNode::from_hex("AAAA").unwrap(),
989 },
990 BlockRangeRootRecord {
991 range: BlockRange::from_block_number(BlockRange::LENGTH),
992 merkle_root: MKTreeNode::from_hex("BBBB").unwrap(),
993 }
994 ],
995 records
996 );
997 }
998
999 #[tokio::test]
1000 async fn repository_store_block_range_with_existing_hash_doesnt_erase_existing_data() {
1001 let connection = cardano_tx_db_connection().unwrap();
1002 let repository = CardanoTransactionRepository::new(Arc::new(
1003 SqliteConnectionPool::build_from_connection(connection),
1004 ));
1005 let range = BlockRange::from_block_number(BlockNumber(0));
1006
1007 repository
1008 .create_block_range_roots(vec![(range.clone(), MKTreeNode::from_hex("AAAA").unwrap())])
1009 .await
1010 .unwrap();
1011 repository
1012 .create_block_range_roots(vec![(range.clone(), MKTreeNode::from_hex("BBBB").unwrap())])
1013 .await
1014 .unwrap();
1015
1016 let record: Vec<BlockRangeRootRecord> = repository
1017 .connection_pool
1018 .connection()
1019 .unwrap()
1020 .fetch_collect(GetBlockRangeRootQuery::all())
1021 .unwrap();
1022 assert_eq!(
1023 vec![BlockRangeRootRecord {
1024 range,
1025 merkle_root: MKTreeNode::from_hex("AAAA").unwrap()
1026 }],
1027 record
1028 );
1029 }
1030
1031 #[tokio::test]
1032 async fn repository_retrieve_block_range_roots_up_to() {
1033 let connection = cardano_tx_db_connection().unwrap();
1034 let repository = CardanoTransactionRepository::new(Arc::new(
1035 SqliteConnectionPool::build_from_connection(connection),
1036 ));
1037 let block_range_roots = vec![
1038 (
1039 BlockRange::from_block_number(BlockNumber(15)),
1040 MKTreeNode::from_hex("AAAA").unwrap(),
1041 ),
1042 (
1043 BlockRange::from_block_number(BlockNumber(30)),
1044 MKTreeNode::from_hex("BBBB").unwrap(),
1045 ),
1046 (
1047 BlockRange::from_block_number(BlockNumber(45)),
1048 MKTreeNode::from_hex("CCCC").unwrap(),
1049 ),
1050 ];
1051 repository
1052 .create_block_range_roots(block_range_roots.clone())
1053 .await
1054 .unwrap();
1055
1056 let retrieved_block_ranges = repository
1057 .retrieve_block_range_roots_up_to(BlockNumber(45))
1058 .await
1059 .unwrap();
1060 assert_eq!(
1061 block_range_roots[0..2].to_vec(),
1062 retrieved_block_ranges.collect::<Vec<_>>()
1063 );
1064 }
1065
1066 #[tokio::test]
1067 async fn repository_retrieve_highest_block_range_roots() {
1068 let connection = cardano_tx_db_connection().unwrap();
1069 let repository = CardanoTransactionRepository::new(Arc::new(
1070 SqliteConnectionPool::build_from_connection(connection),
1071 ));
1072 let block_range_roots = vec![
1073 BlockRangeRootRecord {
1074 range: BlockRange::from_block_number(BlockNumber(15)),
1075 merkle_root: MKTreeNode::from_hex("AAAA").unwrap(),
1076 },
1077 BlockRangeRootRecord {
1078 range: BlockRange::from_block_number(BlockNumber(30)),
1079 merkle_root: MKTreeNode::from_hex("BBBB").unwrap(),
1080 },
1081 BlockRangeRootRecord {
1082 range: BlockRange::from_block_number(BlockNumber(45)),
1083 merkle_root: MKTreeNode::from_hex("CCCC").unwrap(),
1084 },
1085 ];
1086 repository
1087 .create_block_range_roots(block_range_roots.clone())
1088 .await
1089 .unwrap();
1090
1091 let retrieved_block_range = repository
1092 .retrieve_highest_block_range_root()
1093 .await
1094 .unwrap();
1095 assert_eq!(block_range_roots.last().cloned(), retrieved_block_range);
1096 }
1097
1098 #[tokio::test]
1099 async fn repository_prune_transactions() {
1100 let connection = cardano_tx_db_connection().unwrap();
1101 let repository = CardanoTransactionRepository::new(Arc::new(
1102 SqliteConnectionPool::build_from_connection(connection),
1103 ));
1104
1105 let cardano_transactions: Vec<CardanoTransactionRecord> = CardanoTransactionsBuilder::new()
1106 .blocks_per_block_range(15)
1107 .build_transactions(53)
1108 .into_iter()
1109 .map(CardanoTransactionRecord::from)
1110 .collect();
1111
1112 repository
1113 .create_transactions(cardano_transactions.clone())
1114 .await
1115 .unwrap();
1116 repository
1118 .create_block_range_roots(vec![(
1119 BlockRange::from_block_number(BlockNumber(45)),
1120 MKTreeNode::from_hex("BBBB").unwrap(),
1121 )])
1122 .await
1123 .unwrap();
1124
1125 let transaction_result = repository.get_all().await.unwrap();
1126 assert_eq!(cardano_transactions.len(), transaction_result.len());
1127
1128 repository
1131 .prune_transaction(BlockNumber(10_000_000))
1132 .await
1133 .unwrap();
1134 let transaction_result = repository.get_all_transactions().await.unwrap();
1135 assert_eq!(cardano_transactions, transaction_result);
1136
1137 repository.prune_transaction(BlockNumber(20)).await.unwrap();
1140 let transaction_result = repository
1141 .get_transactions_in_range_blocks(BlockNumber(0)..BlockNumber(25))
1142 .await
1143 .unwrap();
1144 assert_eq!(Vec::<CardanoTransactionRecord>::new(), transaction_result);
1145
1146 let transaction_result = repository
1147 .get_transactions_in_range_blocks(BlockNumber(25)..BlockNumber(1000))
1148 .await
1149 .unwrap();
1150 assert_eq!(28, transaction_result.len());
1151 }
1152
1153 #[tokio::test]
1154 async fn get_highest_start_block_number_for_block_range_roots() {
1155 let connection = cardano_tx_db_connection().unwrap();
1156 let repository = CardanoTransactionRepository::new(Arc::new(
1157 SqliteConnectionPool::build_from_connection(connection),
1158 ));
1159
1160 let highest = repository
1161 .get_highest_start_block_number_for_block_range_roots()
1162 .await
1163 .unwrap();
1164 assert_eq!(None, highest);
1165
1166 let block_range_roots = vec![
1167 (
1168 BlockRange::from_block_number(BlockNumber(15)),
1169 MKTreeNode::from_hex("AAAA").unwrap(),
1170 ),
1171 (
1172 BlockRange::from_block_number(BlockNumber(30)),
1173 MKTreeNode::from_hex("BBBB").unwrap(),
1174 ),
1175 ];
1176 repository
1177 .create_block_range_roots(block_range_roots.clone())
1178 .await
1179 .unwrap();
1180
1181 let highest = repository
1182 .get_highest_start_block_number_for_block_range_roots()
1183 .await
1184 .unwrap();
1185 assert_eq!(Some(BlockNumber(30)), highest);
1186 }
1187
1188 #[tokio::test]
1189 async fn remove_transactions_and_block_range_greater_than_given_block_number() {
1190 let connection = cardano_tx_db_connection().unwrap();
1191 let repository = CardanoTransactionRepository::new(Arc::new(
1192 SqliteConnectionPool::build_from_connection(connection),
1193 ));
1194
1195 let cardano_transactions = vec![
1196 CardanoTransaction::new(
1197 "tx-hash-123",
1198 BlockRange::LENGTH,
1199 SlotNumber(50),
1200 "block-hash-123",
1201 ),
1202 CardanoTransaction::new(
1203 "tx-hash-123",
1204 BlockRange::LENGTH * 3 - 1,
1205 SlotNumber(50),
1206 "block-hash-123",
1207 ),
1208 CardanoTransaction::new(
1209 "tx-hash-456",
1210 BlockRange::LENGTH * 3,
1211 SlotNumber(51),
1212 "block-hash-456",
1213 ),
1214 ];
1215 repository
1216 .create_transactions(cardano_transactions)
1217 .await
1218 .unwrap();
1219 repository
1220 .create_block_range_roots(vec![
1221 (
1222 BlockRange::from_block_number(BlockRange::LENGTH),
1223 MKTreeNode::from_hex("AAAA").unwrap(),
1224 ),
1225 (
1226 BlockRange::from_block_number(BlockRange::LENGTH * 2),
1227 MKTreeNode::from_hex("AAAA").unwrap(),
1228 ),
1229 (
1230 BlockRange::from_block_number(BlockRange::LENGTH * 3),
1231 MKTreeNode::from_hex("AAAA").unwrap(),
1232 ),
1233 ])
1234 .await
1235 .unwrap();
1236
1237 repository
1238 .remove_rolled_back_transactions_and_block_range_by_block_number(BlockRange::LENGTH * 3)
1239 .await
1240 .unwrap();
1241 assert_eq!(2, repository.get_all_transactions().await.unwrap().len());
1242 assert_eq!(2, repository.get_all_block_range_root().unwrap().len());
1243 }
1244
1245 #[tokio::test]
1246 async fn remove_rolled_back_transactions_and_block_range_by_slot_number() {
1247 fn transaction_record(
1248 block_number: BlockNumber,
1249 slot_number: SlotNumber,
1250 tx_hash: &str,
1251 ) -> CardanoTransactionRecord {
1252 CardanoTransactionRecord::new(
1253 tx_hash,
1254 block_number,
1255 slot_number,
1256 format!("block-hash-{}", block_number),
1257 )
1258 }
1259
1260 let repository = CardanoTransactionRepository::new(Arc::new(
1261 SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap(),
1262 ));
1263
1264 repository
1265 .create_transactions(vec![
1266 transaction_record(BlockNumber(10), SlotNumber(50), "tx-hash-1"),
1267 transaction_record(BlockNumber(11), SlotNumber(51), "tx-hash-2"),
1268 transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-3"),
1269 transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-4"),
1270 transaction_record(BlockNumber(101), SlotNumber(100), "tx-hash-5"),
1271 transaction_record(BlockNumber(202), SlotNumber(200), "tx-hash-56"),
1272 ])
1273 .await
1274 .unwrap();
1275
1276 {
1277 repository
1278 .remove_rolled_back_transactions_and_block_range_by_slot_number(SlotNumber(110))
1279 .await
1280 .expect("Failed to remove rolled back transactions");
1281
1282 let transactions = repository
1283 .get_all()
1284 .await
1285 .unwrap()
1286 .into_iter()
1287 .map(|v| v.into())
1288 .collect::<Vec<_>>();
1289 assert_eq!(
1290 vec![
1291 transaction_record(BlockNumber(10), SlotNumber(50), "tx-hash-1"),
1292 transaction_record(BlockNumber(11), SlotNumber(51), "tx-hash-2"),
1293 transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-3"),
1294 transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-4"),
1295 transaction_record(BlockNumber(101), SlotNumber(100), "tx-hash-5"),
1296 ],
1297 transactions
1298 );
1299 }
1300
1301 {
1302 repository
1303 .remove_rolled_back_transactions_and_block_range_by_slot_number(SlotNumber(53))
1304 .await
1305 .expect("Failed to remove rolled back transactions");
1306
1307 let transactions = repository
1308 .get_all()
1309 .await
1310 .unwrap()
1311 .into_iter()
1312 .map(|v| v.into())
1313 .collect::<Vec<_>>();
1314 assert_eq!(
1315 vec![
1316 transaction_record(BlockNumber(10), SlotNumber(50), "tx-hash-1"),
1317 transaction_record(BlockNumber(11), SlotNumber(51), "tx-hash-2"),
1318 transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-3"),
1319 transaction_record(BlockNumber(13), SlotNumber(52), "tx-hash-4"),
1320 ],
1321 transactions
1322 );
1323 }
1324
1325 {
1326 repository
1327 .remove_rolled_back_transactions_and_block_range_by_slot_number(SlotNumber(51))
1328 .await
1329 .expect("Failed to remove rolled back transactions");
1330
1331 let transactions = repository
1332 .get_all()
1333 .await
1334 .unwrap()
1335 .into_iter()
1336 .map(|v| v.into())
1337 .collect::<Vec<_>>();
1338 assert_eq!(
1339 vec![
1340 transaction_record(BlockNumber(10), SlotNumber(50), "tx-hash-1"),
1341 transaction_record(BlockNumber(11), SlotNumber(51), "tx-hash-2"),
1342 ],
1343 transactions
1344 );
1345 }
1346 }
1347}