mithril_signer/database/repository/
stake_pool_store.rs

1use std::ops::Not;
2use std::sync::Arc;
3
4use anyhow::Context;
5use async_trait::async_trait;
6
7use mithril_common::StdResult;
8use mithril_common::entities::{Epoch, StakeDistribution};
9use mithril_common::signable_builder::StakeDistributionRetriever;
10use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
11use mithril_persistence::store::StakeStorer;
12
13use crate::database::query::{
14    DeleteStakePoolQuery, GetStakePoolQuery, InsertOrReplaceStakePoolQuery,
15};
16use crate::database::record::StakePool;
17use crate::services::EpochPruningTask;
18
19/// Service to deal with stake pools (read & write).
20pub struct StakePoolStore {
21    connection: Arc<SqliteConnection>,
22
23    /// Number of epochs before previous records will be pruned at the next call to
24    /// [save_protocol_parameters][StakePoolStore::save_stakes].
25    retention_limit: Option<u64>,
26}
27
28impl StakePoolStore {
29    /// Create a new StakePool service
30    pub fn new(connection: Arc<SqliteConnection>, retention_limit: Option<u64>) -> Self {
31        Self {
32            connection,
33            retention_limit,
34        }
35    }
36}
37
38#[async_trait]
39impl StakeStorer for StakePoolStore {
40    async fn save_stakes(
41        &self,
42        epoch: Epoch,
43        stakes: StakeDistribution,
44    ) -> StdResult<Option<StakeDistribution>> {
45        // We should create a transaction  including delete and insert but it's not possible
46        // with the current implementation because the connection is shared.
47        self.connection
48            .apply(DeleteStakePoolQuery::by_epoch(epoch)?)
49            .with_context(|| format!("delete stakes failure, epoch: {epoch}"))?;
50
51        let pools: Vec<StakePool> = self
52            .connection
53            .fetch_collect(InsertOrReplaceStakePoolQuery::many(
54                stakes
55                    .into_iter()
56                    .map(|(pool_id, stake)| (pool_id, epoch, stake))
57                    .collect(),
58            ))
59            .with_context(|| format!("persist stakes failure, epoch: {epoch}"))?;
60
61        Ok(Some(StakeDistribution::from_iter(
62            pools.into_iter().map(|p| (p.stake_pool_id, p.stake)),
63        )))
64    }
65
66    async fn get_stakes(&self, epoch: Epoch) -> StdResult<Option<StakeDistribution>> {
67        let cursor = self
68            .connection
69            .fetch(GetStakePoolQuery::by_epoch(epoch)?)
70            .with_context(|| format!("get stakes failure, epoch: {epoch}"))?;
71        let mut stake_distribution = StakeDistribution::new();
72
73        for stake_pool in cursor {
74            stake_distribution.insert(stake_pool.stake_pool_id, stake_pool.stake);
75        }
76
77        Ok(stake_distribution.is_empty().not().then_some(stake_distribution))
78    }
79}
80
81#[async_trait]
82impl StakeDistributionRetriever for StakePoolStore {
83    async fn retrieve(&self, epoch: Epoch) -> StdResult<Option<StakeDistribution>> {
84        self.get_stakes(epoch).await
85    }
86}
87
88#[async_trait]
89impl EpochPruningTask for StakePoolStore {
90    fn pruned_data(&self) -> &'static str {
91        "Stake pool"
92    }
93
94    async fn prune(&self, epoch: Epoch) -> StdResult<()> {
95        if let Some(threshold) = self.retention_limit {
96            self.connection.apply(DeleteStakePoolQuery::below_epoch_threshold(
97                epoch - threshold,
98            ))?;
99        }
100        Ok(())
101    }
102}