mithril_signer/database/repository/
stake_pool_store.rs

1use std::ops::Not;
2use std::sync::Arc;
3
4use anyhow::Context;
5use async_trait::async_trait;
6
7use mithril_common::entities::{Epoch, StakeDistribution};
8use mithril_common::signable_builder::StakeDistributionRetriever;
9use mithril_common::StdResult;
10use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
11use mithril_persistence::store::StakeStorer;
12
13use crate::database::query::{
14    DeleteStakePoolQuery, GetStakePoolQuery, InsertOrReplaceStakePoolQuery,
15};
16use crate::database::record::StakePool;
17use crate::services::EpochPruningTask;
18
19/// Service to deal with stake pools (read & write).
20pub struct StakePoolStore {
21    connection: Arc<SqliteConnection>,
22
23    /// Number of epochs before previous records will be pruned at the next call to
24    /// [save_protocol_parameters][StakePoolStore::save_stakes].
25    retention_limit: Option<u64>,
26}
27
28impl StakePoolStore {
29    /// Create a new StakePool service
30    pub fn new(connection: Arc<SqliteConnection>, retention_limit: Option<u64>) -> Self {
31        Self {
32            connection,
33            retention_limit,
34        }
35    }
36}
37
38#[async_trait]
39impl StakeStorer for StakePoolStore {
40    async fn save_stakes(
41        &self,
42        epoch: Epoch,
43        stakes: StakeDistribution,
44    ) -> StdResult<Option<StakeDistribution>> {
45        // We should create a transaction  including delete and insert but it's not possible
46        // with the current implementation because the connection is shared.
47        self.connection
48            .apply(DeleteStakePoolQuery::by_epoch(epoch)?)
49            .with_context(|| format!("delete stakes failure, epoch: {epoch}"))?;
50
51        let pools: Vec<StakePool> = self
52            .connection
53            .fetch_collect(InsertOrReplaceStakePoolQuery::many(
54                stakes
55                    .into_iter()
56                    .map(|(pool_id, stake)| (pool_id, epoch, stake))
57                    .collect(),
58            ))
59            .with_context(|| format!("persist stakes failure, epoch: {epoch}"))?;
60
61        Ok(Some(StakeDistribution::from_iter(
62            pools.into_iter().map(|p| (p.stake_pool_id, p.stake)),
63        )))
64    }
65
66    async fn get_stakes(&self, epoch: Epoch) -> StdResult<Option<StakeDistribution>> {
67        let cursor = self
68            .connection
69            .fetch(GetStakePoolQuery::by_epoch(epoch)?)
70            .with_context(|| format!("get stakes failure, epoch: {epoch}"))?;
71        let mut stake_distribution = StakeDistribution::new();
72
73        for stake_pool in cursor {
74            stake_distribution.insert(stake_pool.stake_pool_id, stake_pool.stake);
75        }
76
77        Ok(stake_distribution
78            .is_empty()
79            .not()
80            .then_some(stake_distribution))
81    }
82}
83
84#[async_trait]
85impl StakeDistributionRetriever for StakePoolStore {
86    async fn retrieve(&self, epoch: Epoch) -> StdResult<Option<StakeDistribution>> {
87        self.get_stakes(epoch).await
88    }
89}
90
91#[async_trait]
92impl EpochPruningTask for StakePoolStore {
93    fn pruned_data(&self) -> &'static str {
94        "Stake pool"
95    }
96
97    async fn prune(&self, epoch: Epoch) -> StdResult<()> {
98        if let Some(threshold) = self.retention_limit {
99            self.connection
100                .apply(DeleteStakePoolQuery::below_epoch_threshold(
101                    epoch - threshold,
102                ))?;
103        }
104        Ok(())
105    }
106}