mithril_aggregator/dependency_injection/builder/support/
stores.rs

1use anyhow::Context;
2use slog::debug;
3use std::sync::Arc;
4use std::time::Duration;
5
6use mithril_common::digesters::cache::ImmutableFileDigestCacheProvider;
7use mithril_persistence::database::repository::CardanoTransactionRepository;
8
9use crate::database::repository::{
10    CertificateRepository, EpochSettingsStore, ImmutableFileDigestRepository,
11    OpenMessageRepository, SignedEntityStore, SignedEntityStorer, SignerRegistrationStore,
12    SignerStore, StakePoolStore,
13};
14use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
15use crate::get_dependency;
16use crate::{
17    CExplorerSignerRetriever, EpochSettingsStorer, ImmutableFileDigestMapper,
18    ProtocolParametersRetriever, SignersImporter, VerificationKeyStorer,
19};
20
21impl DependenciesBuilder {
22    async fn build_stake_store(&mut self) -> Result<Arc<StakePoolStore>> {
23        let stake_pool_store = Arc::new(StakePoolStore::new(
24            self.get_sqlite_connection().await?,
25            self.configuration.safe_epoch_retention_limit(),
26        ));
27
28        Ok(stake_pool_store)
29    }
30
31    /// Return a [StakePoolStore]
32    pub async fn get_stake_store(&mut self) -> Result<Arc<StakePoolStore>> {
33        get_dependency!(self.stake_store)
34    }
35
36    async fn build_certificate_repository(&mut self) -> Result<Arc<CertificateRepository>> {
37        Ok(Arc::new(CertificateRepository::new(
38            self.get_sqlite_connection().await?,
39        )))
40    }
41
42    /// Get a configured [CertificateRepository].
43    pub async fn get_certificate_repository(&mut self) -> Result<Arc<CertificateRepository>> {
44        get_dependency!(self.certificate_repository)
45    }
46
47    async fn build_open_message_repository(&mut self) -> Result<Arc<OpenMessageRepository>> {
48        Ok(Arc::new(OpenMessageRepository::new(
49            self.get_sqlite_connection().await?,
50        )))
51    }
52
53    /// Get a configured [OpenMessageRepository].
54    pub async fn get_open_message_repository(&mut self) -> Result<Arc<OpenMessageRepository>> {
55        get_dependency!(self.open_message_repository)
56    }
57
58    async fn build_verification_key_store(&mut self) -> Result<Arc<dyn VerificationKeyStorer>> {
59        Ok(Arc::new(SignerRegistrationStore::new(
60            self.get_sqlite_connection().await?,
61            self.configuration.safe_epoch_retention_limit(),
62        )))
63    }
64
65    /// Get a configured [VerificationKeyStorer].
66    pub async fn get_verification_key_store(&mut self) -> Result<Arc<dyn VerificationKeyStorer>> {
67        get_dependency!(self.verification_key_store)
68    }
69
70    async fn build_protocol_parameters_retriever(
71        &mut self,
72    ) -> Result<Arc<dyn ProtocolParametersRetriever>> {
73        let protocol_parameters_retriever =
74            EpochSettingsStore::new(self.get_sqlite_connection().await?, None);
75
76        Ok(Arc::new(protocol_parameters_retriever))
77    }
78
79    /// Get a configured [ProtocolParametersRetriever].
80    pub async fn get_protocol_parameters_retriever(
81        &mut self,
82    ) -> Result<Arc<dyn ProtocolParametersRetriever>> {
83        if self.protocol_parameters_retriever.is_none() {
84            self.protocol_parameters_retriever =
85                Some(self.build_protocol_parameters_retriever().await?);
86        }
87
88        Ok(self
89            .protocol_parameters_retriever
90            .as_ref()
91            .cloned()
92            .unwrap())
93    }
94
95    async fn build_epoch_settings_store(&mut self) -> Result<Arc<EpochSettingsStore>> {
96        let logger = self.root_logger();
97        let epoch_settings_store = EpochSettingsStore::new(
98            self.get_sqlite_connection().await?,
99            self.configuration.safe_epoch_retention_limit(),
100        );
101        let current_epoch = self
102            .get_chain_observer()
103            .await?
104            .get_current_epoch()
105            .await
106            .map_err(|e| DependenciesBuilderError::Initialization {
107                message: "cannot create aggregator runner: failed to retrieve current epoch."
108                    .to_string(),
109                error: Some(e.into()),
110            })?
111            .ok_or(DependenciesBuilderError::Initialization {
112                message: "cannot build aggregator runner: no epoch returned.".to_string(),
113                error: None,
114            })?;
115        let retrieval_epoch = current_epoch
116            .offset_to_signer_retrieval_epoch()
117            .map_err(|e| DependenciesBuilderError::Initialization {
118                message: format!("cannot create aggregator runner: failed to offset current epoch '{current_epoch}' to signer retrieval epoch."),
119                error: Some(e.into()),
120            })?;
121
122        let epoch_settings_configuration = self.configuration.get_epoch_settings_configuration();
123        debug!(
124            logger,
125            "Handle discrepancies at startup of epoch settings store, will record epoch settings from the configuration for epoch {retrieval_epoch}";
126            "epoch_settings_configuration" => ?epoch_settings_configuration,
127        );
128        epoch_settings_store
129            .handle_discrepancies_at_startup(retrieval_epoch, &epoch_settings_configuration)
130            .await
131            .map_err(|e| DependenciesBuilderError::Initialization {
132                message: "can not create aggregator runner".to_string(),
133                error: Some(e),
134            })?;
135
136        Ok(Arc::new(epoch_settings_store))
137    }
138
139    /// Get a configured [EpochSettingsStorer].
140    pub async fn get_epoch_settings_store(&mut self) -> Result<Arc<EpochSettingsStore>> {
141        get_dependency!(self.epoch_settings_store)
142    }
143
144    async fn build_immutable_cache_provider(
145        &mut self,
146    ) -> Result<Arc<dyn ImmutableFileDigestCacheProvider>> {
147        let cache_provider =
148            ImmutableFileDigestRepository::new(self.get_sqlite_connection().await?);
149        if self.configuration.reset_digests_cache() {
150            cache_provider
151                .reset()
152                .await
153                .with_context(|| "Failure occurred when resetting immutable file digest cache")?;
154        }
155
156        Ok(Arc::new(cache_provider))
157    }
158
159    /// Get an [ImmutableFileDigestCacheProvider]
160    pub async fn get_immutable_cache_provider(
161        &mut self,
162    ) -> Result<Arc<dyn ImmutableFileDigestCacheProvider>> {
163        get_dependency!(self.immutable_cache_provider)
164    }
165
166    async fn build_transaction_repository(&mut self) -> Result<Arc<CardanoTransactionRepository>> {
167        let transaction_store = CardanoTransactionRepository::new(
168            self.get_sqlite_connection_cardano_transaction_pool()
169                .await?,
170        );
171
172        Ok(Arc::new(transaction_store))
173    }
174
175    /// Transaction repository.
176    pub async fn get_transaction_repository(
177        &mut self,
178    ) -> Result<Arc<CardanoTransactionRepository>> {
179        get_dependency!(self.transaction_repository)
180    }
181
182    async fn build_immutable_file_digest_mapper(
183        &mut self,
184    ) -> Result<Arc<dyn ImmutableFileDigestMapper>> {
185        let mapper = ImmutableFileDigestRepository::new(self.get_sqlite_connection().await?);
186
187        Ok(Arc::new(mapper))
188    }
189
190    /// Immutable digest mapper.
191    pub async fn get_immutable_file_digest_mapper(
192        &mut self,
193    ) -> Result<Arc<dyn ImmutableFileDigestMapper>> {
194        get_dependency!(self.immutable_file_digest_mapper)
195    }
196
197    async fn build_signer_store(&mut self) -> Result<Arc<SignerStore>> {
198        let signer_store = Arc::new(SignerStore::new(self.get_sqlite_connection().await?));
199
200        Ok(signer_store)
201    }
202
203    /// [SignerStore] service
204    pub async fn get_signer_store(&mut self) -> Result<Arc<SignerStore>> {
205        get_dependency!(self.signer_store)
206    }
207
208    async fn build_signed_entity_storer(&mut self) -> Result<Arc<dyn SignedEntityStorer>> {
209        let signed_entity_storer =
210            Arc::new(SignedEntityStore::new(self.get_sqlite_connection().await?));
211
212        Ok(signed_entity_storer)
213    }
214
215    /// [SignedEntityStorer] service
216    pub async fn get_signed_entity_storer(&mut self) -> Result<Arc<dyn SignedEntityStorer>> {
217        get_dependency!(self.signed_entity_storer)
218    }
219
220    /// Create a [SignersImporter] instance.
221    pub async fn create_signer_importer(
222        &mut self,
223        cexplorer_pools_url: &str,
224    ) -> Result<SignersImporter> {
225        let retriever = CExplorerSignerRetriever::new(
226            cexplorer_pools_url,
227            Some(Duration::from_secs(30)),
228            self.root_logger(),
229        )?;
230        let persister = self.get_signer_store().await?;
231
232        Ok(SignersImporter::new(
233            Arc::new(retriever),
234            persister,
235            self.root_logger(),
236        ))
237    }
238}