1use std::ops::Range;
2
3use tokio::sync::Mutex;
4
5use mithril_common::StdResult;
6use mithril_common::crypto_helper::MKTreeNode;
7use mithril_common::entities::{
8 BlockNumber, BlockRange, CardanoBlockWithTransactions, CardanoTransaction, ChainPoint,
9 SlotNumber,
10};
11
12use crate::chain_importer::ChainDataStore;
13
14#[derive(Debug, PartialEq, Clone)]
16pub struct InMemoryBlockRangeRoot {
17 pub range: BlockRange,
19 pub merkle_root: MKTreeNode,
21}
22
23impl From<(BlockRange, MKTreeNode)> for InMemoryBlockRangeRoot {
24 fn from(value: (BlockRange, MKTreeNode)) -> Self {
25 Self {
26 range: value.0,
27 merkle_root: value.1,
28 }
29 }
30}
31
32impl From<InMemoryBlockRangeRoot> for (BlockRange, MKTreeNode) {
33 fn from(value: InMemoryBlockRangeRoot) -> Self {
34 (value.range, value.merkle_root)
35 }
36}
37
38pub struct InMemoryChainDataStore {
40 blocks_with_txs: Mutex<Vec<CardanoBlockWithTransactions>>,
41 block_range_roots: Mutex<Vec<InMemoryBlockRangeRoot>>,
42}
43
44impl Default for InMemoryChainDataStore {
45 fn default() -> Self {
46 Self::builder().build()
47 }
48}
49
50pub struct InMemoryChainDataStoreBuilder {
52 blocks_with_txs: Vec<CardanoBlockWithTransactions>,
53 block_range_roots: Vec<InMemoryBlockRangeRoot>,
54}
55
56impl InMemoryChainDataStoreBuilder {
57 pub fn with_blocks_and_transactions<T: Into<CardanoBlockWithTransactions> + Clone>(
59 mut self,
60 transactions: &[T],
61 ) -> Self {
62 self.blocks_with_txs = transactions.iter().map(|b| b.clone().into()).collect();
63 self
64 }
65
66 pub fn with_block_range_roots<T: Into<InMemoryBlockRangeRoot> + Clone>(
68 mut self,
69 block_range_roots: &[T],
70 ) -> Self {
71 self.block_range_roots = block_range_roots.iter().map(|brr| brr.clone().into()).collect();
72 self
73 }
74
75 pub fn build(self) -> InMemoryChainDataStore {
77 InMemoryChainDataStore {
78 blocks_with_txs: Mutex::new(self.blocks_with_txs),
79 block_range_roots: Mutex::new(self.block_range_roots),
80 }
81 }
82}
83
84impl InMemoryChainDataStore {
85 pub fn builder() -> InMemoryChainDataStoreBuilder {
87 InMemoryChainDataStoreBuilder {
88 blocks_with_txs: vec![],
89 block_range_roots: vec![],
90 }
91 }
92
93 pub async fn get_all_transactions(&self) -> Vec<CardanoTransaction> {
95 let blocks = self.blocks_with_txs.lock().await.clone();
96 blocks.into_iter().flat_map(|b| b.into_transactions()).collect()
97 }
98
99 pub async fn get_all_block_with_txs(&self) -> Vec<CardanoBlockWithTransactions> {
101 self.blocks_with_txs.lock().await.clone()
102 }
103
104 pub async fn get_all_block_range_root(&self) -> Vec<InMemoryBlockRangeRoot> {
106 self.block_range_roots.lock().await.clone()
107 }
108
109 pub async fn get_all_block_range(&self) -> Vec<BlockRange> {
111 self.block_range_roots
112 .lock()
113 .await
114 .iter()
115 .map(|r| r.range.clone())
116 .collect()
117 }
118}
119
120#[async_trait::async_trait]
121impl ChainDataStore for InMemoryChainDataStore {
122 async fn get_highest_beacon(&self) -> StdResult<Option<ChainPoint>> {
123 let txs = self.blocks_with_txs.lock().await;
124 Ok(txs
125 .iter()
126 .max_by_key(|tx| tx.block_number)
127 .map(|tx| ChainPoint::new(tx.slot_number, tx.block_number, tx.block_hash.clone())))
128 }
129
130 async fn get_highest_block_range(&self) -> StdResult<Option<BlockRange>> {
131 let roots = self.block_range_roots.lock().await;
132 Ok(roots.iter().map(|record| record.range.clone()).max_by_key(|r| r.end))
133 }
134
135 async fn store_blocks_and_transactions(
136 &self,
137 blocks_and_transactions: Vec<CardanoBlockWithTransactions>,
138 ) -> StdResult<()> {
139 self.blocks_with_txs.lock().await.extend(blocks_and_transactions);
140 Ok(())
141 }
142
143 async fn get_transactions_in_range(
144 &self,
145 range: Range<BlockNumber>,
146 ) -> StdResult<Vec<CardanoTransaction>> {
147 let txs = self.blocks_with_txs.lock().await;
148 Ok(txs
149 .iter()
150 .filter(|tx| range.contains(&tx.block_number))
151 .cloned()
152 .flat_map(|tx| tx.into_transactions())
153 .collect())
154 }
155
156 async fn store_block_range_roots(
157 &self,
158 block_ranges: Vec<(BlockRange, MKTreeNode)>,
159 ) -> StdResult<()> {
160 self.block_range_roots
161 .lock()
162 .await
163 .extend(block_ranges.into_iter().map(Into::into));
164 Ok(())
165 }
166
167 async fn remove_rolled_chain_data_and_block_range(
168 &self,
169 slot_number: SlotNumber,
170 ) -> StdResult<()> {
171 self.blocks_with_txs
172 .lock()
173 .await
174 .retain(|b| b.slot_number <= slot_number);
175
176 if let Some(highest_remaining_block_number) =
177 self.blocks_with_txs.lock().await.last().map(|tx| tx.block_number)
178 {
179 self.block_range_roots
180 .lock()
181 .await
182 .retain(|record| record.range.start < highest_remaining_block_number);
183 } else {
184 self.block_range_roots.lock().await.clear();
185 }
186
187 Ok(())
188 }
189}
190
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a block fixture carrying the given transaction hashes.
    fn block(
        block_hash: &str,
        block_number: BlockNumber,
        slot_number: SlotNumber,
        transaction_hashes: &[&str],
    ) -> CardanoBlockWithTransactions {
        CardanoBlockWithTransactions::new(
            block_hash,
            block_number,
            slot_number,
            transaction_hashes.to_vec(),
        )
    }

    /// Shorthand for a `(range, merkle_root)` pair: the range is derived from `block_number`
    /// and the root is parsed from `hex`.
    fn range_root(block_number: BlockNumber, hex: &str) -> (BlockRange, MKTreeNode) {
        (
            BlockRange::from_block_number(block_number),
            MKTreeNode::from_hex(hex).unwrap(),
        )
    }

    #[tokio::test]
    async fn default_store_is_empty() {
        let store = InMemoryChainDataStore::default();

        assert!(store.get_all_transactions().await.is_empty());
        assert!(store.get_all_block_range_root().await.is_empty());
        assert!(store.get_all_block_range().await.is_empty());
    }

    #[tokio::test]
    async fn store_and_get_blocks_and_transactions() {
        let store = InMemoryChainDataStore::default();
        let blocks = vec![
            block("block_hash-123", BlockNumber(10), SlotNumber(50), &["tx_hash-123"]),
            block(
                "block_hash-456",
                BlockNumber(11),
                SlotNumber(51),
                &["tx_hash-456", "tx_hash-789"],
            ),
        ];

        store.store_blocks_and_transactions(blocks.clone()).await.unwrap();

        assert_eq!(blocks, store.get_all_block_with_txs().await);

        let expected_transactions = vec![
            CardanoTransaction::new(
                "tx_hash-123",
                BlockNumber(10),
                SlotNumber(50),
                "block_hash-123",
            ),
            CardanoTransaction::new(
                "tx_hash-456",
                BlockNumber(11),
                SlotNumber(51),
                "block_hash-456",
            ),
            CardanoTransaction::new(
                "tx_hash-789",
                BlockNumber(11),
                SlotNumber(51),
                "block_hash-456",
            ),
        ];
        assert_eq!(expected_transactions, store.get_all_transactions().await);
    }

    #[tokio::test]
    async fn store_transactions_appends_to_existing() {
        let first_batch = vec![block(
            "block_hash-123",
            BlockNumber(10),
            SlotNumber(50),
            &["tx_hash-123"],
        )];
        let second_batch = vec![block(
            "block_hash-456",
            BlockNumber(11),
            SlotNumber(51),
            &["tx_hash-456"],
        )];
        let store = InMemoryChainDataStore::builder()
            .with_blocks_and_transactions(&first_batch)
            .build();

        store
            .store_blocks_and_transactions(second_batch.clone())
            .await
            .unwrap();

        let stored = store.get_all_block_with_txs().await;
        assert_eq!(2, stored.len());
        assert_eq!(first_batch[0], stored[0]);
        assert_eq!(second_batch[0], stored[1]);
    }

    #[tokio::test]
    async fn get_highest_beacon_returns_none_when_empty() {
        let store = InMemoryChainDataStore::default();

        assert_eq!(None, store.get_highest_beacon().await.unwrap());
    }

    #[tokio::test]
    async fn get_highest_beacon_returns_transaction_with_highest_block_number() {
        // The block with the highest number is deliberately neither first nor last.
        let store = InMemoryChainDataStore::builder()
            .with_blocks_and_transactions(&[
                block("block_hash-10", BlockNumber(10), SlotNumber(50), &["tx_hash-123"]),
                block("block_hash-25", BlockNumber(25), SlotNumber(51), &["tx_hash-456"]),
                block("block_hash-15", BlockNumber(15), SlotNumber(52), &["tx_hash-789"]),
            ])
            .build();

        let expected = ChainPoint::new(SlotNumber(51), BlockNumber(25), "block_hash-25");
        assert_eq!(Some(expected), store.get_highest_beacon().await.unwrap());
    }

    #[tokio::test]
    async fn get_highest_beacon_with_multiple_blocks_with_same_block_number() {
        let store = InMemoryChainDataStore::builder()
            .with_blocks_and_transactions(&[
                block("block_hash-10", BlockNumber(10), SlotNumber(50), &["tx_hash-123"]),
                block("block_hash-25", BlockNumber(25), SlotNumber(51), &["tx_hash-456"]),
                block("block_hash-25", BlockNumber(25), SlotNumber(51), &["tx_hash-789"]),
            ])
            .build();

        let expected = ChainPoint::new(SlotNumber(51), BlockNumber(25), "block_hash-25");
        assert_eq!(Some(expected), store.get_highest_beacon().await.unwrap());
    }

    #[tokio::test]
    async fn store_and_get_block_range_roots() {
        let store = InMemoryChainDataStore::default();

        store
            .store_block_range_roots(vec![
                range_root(BlockNumber(0), "AAAA"),
                range_root(BlockRange::LENGTH, "BBBB"),
            ])
            .await
            .unwrap();

        let expected_roots = vec![
            InMemoryBlockRangeRoot {
                range: BlockRange::from_block_number(BlockNumber(0)),
                merkle_root: MKTreeNode::from_hex("AAAA").unwrap(),
            },
            InMemoryBlockRangeRoot {
                range: BlockRange::from_block_number(BlockRange::LENGTH),
                merkle_root: MKTreeNode::from_hex("BBBB").unwrap(),
            },
        ];
        assert_eq!(expected_roots, store.get_all_block_range_root().await);
    }

    #[tokio::test]
    async fn store_block_range_roots_appends_to_existing() {
        let store = InMemoryChainDataStore::builder()
            .with_block_range_roots(&[range_root(BlockNumber(0), "AAAA")])
            .build();

        store
            .store_block_range_roots(vec![range_root(BlockRange::LENGTH, "BBBB")])
            .await
            .unwrap();

        let expected_roots = vec![
            InMemoryBlockRangeRoot {
                range: BlockRange::from_block_number(BlockNumber(0)),
                merkle_root: MKTreeNode::from_hex("AAAA").unwrap(),
            },
            InMemoryBlockRangeRoot {
                range: BlockRange::from_block_number(BlockRange::LENGTH),
                merkle_root: MKTreeNode::from_hex("BBBB").unwrap(),
            },
        ];
        assert_eq!(expected_roots, store.get_all_block_range_root().await);

        let expected_ranges = vec![
            BlockRange::from_block_number(BlockNumber(0)),
            BlockRange::from_block_number(BlockRange::LENGTH),
        ];
        assert_eq!(expected_ranges, store.get_all_block_range().await);
    }

    #[tokio::test]
    async fn get_highest_block_range_returns_none_when_empty() {
        let store = InMemoryChainDataStore::default();

        assert_eq!(None, store.get_highest_block_range().await.unwrap());
    }

    #[tokio::test]
    async fn get_highest_block_range_returns_range_with_highest_end() {
        // The range with the highest end is deliberately stored in the middle.
        let store = InMemoryChainDataStore::builder()
            .with_block_range_roots(&[
                range_root(BlockNumber(0), "AAAA"),
                range_root(BlockRange::LENGTH * 2, "CCCC"),
                range_root(BlockRange::LENGTH, "BBBB"),
            ])
            .build();

        assert_eq!(
            Some(BlockRange::from_block_number(BlockRange::LENGTH * 2)),
            store.get_highest_block_range().await.unwrap()
        );
    }

    #[tokio::test]
    async fn get_transactions_in_range_returns_empty_when_no_transactions() {
        let store = InMemoryChainDataStore::default();

        let found = store
            .get_transactions_in_range(BlockNumber(0)..BlockNumber(100))
            .await
            .unwrap();

        assert!(found.is_empty());
    }

    #[tokio::test]
    async fn get_transactions_in_range_filters_correctly() {
        let blocks = vec![
            block("block-hash-1", BlockNumber(10), SlotNumber(50), &["tx-hash-1"]),
            block("block-hash-2", BlockNumber(11), SlotNumber(51), &["tx-hash-2"]),
            block("block-hash-3", BlockNumber(12), SlotNumber(52), &["tx-hash-3"]),
        ];
        let store = InMemoryChainDataStore::builder()
            .with_blocks_and_transactions(&blocks)
            .build();
        let transactions: Vec<CardanoTransaction> = blocks
            .into_iter()
            .flat_map(|block| block.into_transactions())
            .collect();

        let cases = vec![
            // The range ends (exclusively) on the first stored block: nothing matches.
            (BlockNumber(0)..BlockNumber(10), vec![]),
            // The range starts after the last stored block: nothing matches.
            (BlockNumber(13)..BlockNumber(21), vec![]),
            // The range covers the first two blocks only.
            (BlockNumber(9)..BlockNumber(12), transactions[0..=1].to_vec()),
            // The range covers every stored block.
            (BlockNumber(10)..BlockNumber(13), transactions.clone()),
            // The range covers the last two blocks only.
            (BlockNumber(11)..BlockNumber(14), transactions[1..=2].to_vec()),
        ];

        for (range, expected) in cases.into_iter() {
            let found = store.get_transactions_in_range(range).await.unwrap();
            assert_eq!(expected, found);
        }
    }

    #[tokio::test]
    async fn remove_rolled_back_chain_data_and_block_range_removes_transactions_above_slot_number()
    {
        let blocks = vec![
            block("block-hash-1", BlockNumber(10), SlotNumber(50), &["tx-hash-1"]),
            block("block-hash-2", BlockNumber(11), SlotNumber(51), &["tx-hash-2"]),
            block("block-hash-3", BlockNumber(12), SlotNumber(52), &["tx-hash-3"]),
        ];
        let store = InMemoryChainDataStore::builder()
            .with_blocks_and_transactions(&blocks)
            .build();

        store
            .remove_rolled_chain_data_and_block_range(SlotNumber(51))
            .await
            .unwrap();

        assert_eq!(blocks[0..=1].to_vec(), store.get_all_block_with_txs().await);
    }

    #[tokio::test]
    async fn remove_rolled_back_chain_data_and_block_range_removes_block_ranges_above_highest_remaining_block()
     {
        let blocks = vec![
            block("block-hash-1", BlockNumber(10), SlotNumber(50), &["tx-hash-1"]),
            block("block-hash-2", BlockRange::LENGTH * 2, SlotNumber(100), &["tx-hash-2"]),
            block("block-hash-3", BlockRange::LENGTH * 4, SlotNumber(200), &["tx-hash-3"]),
        ];
        let roots = vec![
            range_root(BlockNumber(0), "AAAA"),
            range_root(BlockRange::LENGTH, "BBBB"),
            range_root(BlockRange::LENGTH * 2, "CCCC"),
            range_root(BlockRange::LENGTH * 3, "DDDD"),
        ];
        let store = InMemoryChainDataStore::builder()
            .with_blocks_and_transactions(&blocks)
            .with_block_range_roots(&roots)
            .build();

        // Rolling back to slot 100 drops the third block, so the highest remaining block
        // number is `BlockRange::LENGTH * 2`.
        store
            .remove_rolled_chain_data_and_block_range(SlotNumber(100))
            .await
            .unwrap();

        assert_eq!(2, store.get_all_transactions().await.len());

        // Only the ranges starting strictly before the highest remaining block are kept.
        let expected_ranges = vec![
            BlockRange::from_block_number(BlockNumber(0)),
            BlockRange::from_block_number(BlockRange::LENGTH),
        ];
        assert_eq!(expected_ranges, store.get_all_block_range().await);
    }

    #[tokio::test]
    async fn remove_rolled_back_chain_data_and_block_range_with_no_remaining_transactions() {
        let store = InMemoryChainDataStore::builder()
            .with_blocks_and_transactions(&[block(
                "block-hash-1",
                BlockNumber(10),
                SlotNumber(50),
                &["tx-hash-1"],
            )])
            .with_block_range_roots(&[range_root(BlockNumber(0), "AAAA")])
            .build();

        // The rollback point is before the only stored block, so nothing should remain.
        store
            .remove_rolled_chain_data_and_block_range(SlotNumber(40))
            .await
            .unwrap();

        assert!(store.get_all_block_with_txs().await.is_empty());
        assert!(store.get_all_transactions().await.is_empty());
        assert!(store.get_all_block_range_root().await.is_empty());
    }

    #[tokio::test]
    async fn remove_rolled_back_chain_data_and_block_range_keeps_transactions_with_equal_slot_number()
     {
        let blocks = vec![
            block("block-hash-1", BlockNumber(10), SlotNumber(50), &["tx-hash-1"]),
            block("block-hash-2", BlockNumber(11), SlotNumber(51), &["tx-hash-2"]),
            block("block-hash-3", BlockNumber(12), SlotNumber(52), &["tx-hash-3"]),
        ];
        let store = InMemoryChainDataStore::builder()
            .with_blocks_and_transactions(&blocks)
            .build();

        store
            .remove_rolled_chain_data_and_block_range(SlotNumber(50))
            .await
            .unwrap();

        assert_eq!(blocks[0..1].to_vec(), store.get_all_block_with_txs().await);
    }
}