mithril_signer/database/repository/
stake_pool_store.rs1use std::ops::Not;
2use std::sync::Arc;
3
4use anyhow::Context;
5use async_trait::async_trait;
6
7use mithril_common::entities::{Epoch, StakeDistribution};
8use mithril_common::signable_builder::StakeDistributionRetriever;
9use mithril_common::StdResult;
10use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
11use mithril_persistence::store::StakeStorer;
12
13use crate::database::query::{
14 DeleteStakePoolQuery, GetStakePoolQuery, InsertOrReplaceStakePoolQuery,
15};
16use crate::database::record::StakePool;
17use crate::services::EpochPruningTask;
18
19pub struct StakePoolStore {
21 connection: Arc<SqliteConnection>,
22
23 retention_limit: Option<u64>,
26}
27
28impl StakePoolStore {
29 pub fn new(connection: Arc<SqliteConnection>, retention_limit: Option<u64>) -> Self {
31 Self {
32 connection,
33 retention_limit,
34 }
35 }
36}
37
38#[async_trait]
39impl StakeStorer for StakePoolStore {
40 async fn save_stakes(
41 &self,
42 epoch: Epoch,
43 stakes: StakeDistribution,
44 ) -> StdResult<Option<StakeDistribution>> {
45 self.connection
48 .apply(DeleteStakePoolQuery::by_epoch(epoch)?)
49 .with_context(|| format!("delete stakes failure, epoch: {epoch}"))?;
50
51 let pools: Vec<StakePool> = self
52 .connection
53 .fetch_collect(InsertOrReplaceStakePoolQuery::many(
54 stakes
55 .into_iter()
56 .map(|(pool_id, stake)| (pool_id, epoch, stake))
57 .collect(),
58 ))
59 .with_context(|| format!("persist stakes failure, epoch: {epoch}"))?;
60
61 Ok(Some(StakeDistribution::from_iter(
62 pools.into_iter().map(|p| (p.stake_pool_id, p.stake)),
63 )))
64 }
65
66 async fn get_stakes(&self, epoch: Epoch) -> StdResult<Option<StakeDistribution>> {
67 let cursor = self
68 .connection
69 .fetch(GetStakePoolQuery::by_epoch(epoch)?)
70 .with_context(|| format!("get stakes failure, epoch: {epoch}"))?;
71 let mut stake_distribution = StakeDistribution::new();
72
73 for stake_pool in cursor {
74 stake_distribution.insert(stake_pool.stake_pool_id, stake_pool.stake);
75 }
76
77 Ok(stake_distribution
78 .is_empty()
79 .not()
80 .then_some(stake_distribution))
81 }
82}
83
84#[async_trait]
85impl StakeDistributionRetriever for StakePoolStore {
86 async fn retrieve(&self, epoch: Epoch) -> StdResult<Option<StakeDistribution>> {
87 self.get_stakes(epoch).await
88 }
89}
90
91#[async_trait]
92impl EpochPruningTask for StakePoolStore {
93 fn pruned_data(&self) -> &'static str {
94 "Stake pool"
95 }
96
97 async fn prune(&self, epoch: Epoch) -> StdResult<()> {
98 if let Some(threshold) = self.retention_limit {
99 self.connection
100 .apply(DeleteStakePoolQuery::below_epoch_threshold(
101 epoch - threshold,
102 ))?;
103 }
104 Ok(())
105 }
106}