mithril_aggregator/dependency_injection/builder/support/
stores.rs

1use anyhow::Context;
2use slog::debug;
3use std::sync::Arc;
4use std::time::Duration;
5
6use mithril_common::digesters::cache::ImmutableFileDigestCacheProvider;
7use mithril_persistence::database::repository::CardanoTransactionRepository;
8
9use crate::database::repository::{
10    CertificateRepository, EpochSettingsStore, ImmutableFileDigestRepository,
11    OpenMessageRepository, SignedEntityStore, SignedEntityStorer, SignerRegistrationStore,
12    SignerStore, StakePoolStore,
13};
14use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
15use crate::{
16    CExplorerSignerRetriever, EpochSettingsStorer, ImmutableFileDigestMapper, SignersImporter,
17    VerificationKeyStorer,
18};
19
20impl DependenciesBuilder {
21    async fn build_stake_store(&mut self) -> Result<Arc<StakePoolStore>> {
22        let stake_pool_store = Arc::new(StakePoolStore::new(
23            self.get_sqlite_connection().await?,
24            self.configuration.safe_epoch_retention_limit(),
25        ));
26
27        Ok(stake_pool_store)
28    }
29
30    /// Return a [StakePoolStore]
31    pub async fn get_stake_store(&mut self) -> Result<Arc<StakePoolStore>> {
32        if self.stake_store.is_none() {
33            self.stake_store = Some(self.build_stake_store().await?);
34        }
35
36        Ok(self.stake_store.as_ref().cloned().unwrap())
37    }
38
39    async fn build_certificate_repository(&mut self) -> Result<Arc<CertificateRepository>> {
40        Ok(Arc::new(CertificateRepository::new(
41            self.get_sqlite_connection().await?,
42        )))
43    }
44
45    /// Get a configured [CertificateRepository].
46    pub async fn get_certificate_repository(&mut self) -> Result<Arc<CertificateRepository>> {
47        if self.certificate_repository.is_none() {
48            self.certificate_repository = Some(self.build_certificate_repository().await?);
49        }
50
51        Ok(self.certificate_repository.as_ref().cloned().unwrap())
52    }
53
54    async fn build_open_message_repository(&mut self) -> Result<Arc<OpenMessageRepository>> {
55        Ok(Arc::new(OpenMessageRepository::new(
56            self.get_sqlite_connection().await?,
57        )))
58    }
59
60    /// Get a configured [OpenMessageRepository].
61    pub async fn get_open_message_repository(&mut self) -> Result<Arc<OpenMessageRepository>> {
62        if self.open_message_repository.is_none() {
63            self.open_message_repository = Some(self.build_open_message_repository().await?);
64        }
65
66        Ok(self.open_message_repository.as_ref().cloned().unwrap())
67    }
68
69    async fn build_verification_key_store(&mut self) -> Result<Arc<dyn VerificationKeyStorer>> {
70        Ok(Arc::new(SignerRegistrationStore::new(
71            self.get_sqlite_connection().await?,
72        )))
73    }
74
75    /// Get a configured [VerificationKeyStorer].
76    pub async fn get_verification_key_store(&mut self) -> Result<Arc<dyn VerificationKeyStorer>> {
77        if self.verification_key_store.is_none() {
78            self.verification_key_store = Some(self.build_verification_key_store().await?);
79        }
80
81        Ok(self.verification_key_store.as_ref().cloned().unwrap())
82    }
83
84    async fn build_epoch_settings_store(&mut self) -> Result<Arc<EpochSettingsStore>> {
85        let logger = self.root_logger();
86        let epoch_settings_store = EpochSettingsStore::new(
87            self.get_sqlite_connection().await?,
88            self.configuration.safe_epoch_retention_limit(),
89        );
90        let current_epoch = self
91            .get_chain_observer()
92            .await?
93            .get_current_epoch()
94            .await
95            .map_err(|e| DependenciesBuilderError::Initialization {
96                message: "cannot create aggregator runner: failed to retrieve current epoch."
97                    .to_string(),
98                error: Some(e.into()),
99            })?
100            .ok_or(DependenciesBuilderError::Initialization {
101                message: "cannot build aggregator runner: no epoch returned.".to_string(),
102                error: None,
103            })?;
104        let retrieval_epoch = current_epoch
105            .offset_to_signer_retrieval_epoch()
106            .map_err(|e| DependenciesBuilderError::Initialization {
107                message: format!("cannot create aggregator runner: failed to offset current epoch '{current_epoch}' to signer retrieval epoch."),
108                error: Some(e.into()),
109            })?;
110
111        let epoch_settings_configuration = self.configuration.get_epoch_settings_configuration();
112        debug!(
113            logger,
114            "Handle discrepancies at startup of epoch settings store, will record epoch settings from the configuration for epoch {retrieval_epoch}";
115            "epoch_settings_configuration" => ?epoch_settings_configuration,
116        );
117        epoch_settings_store
118            .handle_discrepancies_at_startup(retrieval_epoch, &epoch_settings_configuration)
119            .await
120            .map_err(|e| DependenciesBuilderError::Initialization {
121                message: "can not create aggregator runner".to_string(),
122                error: Some(e),
123            })?;
124
125        Ok(Arc::new(epoch_settings_store))
126    }
127
128    /// Get a configured [EpochSettingsStorer].
129    pub async fn get_epoch_settings_store(&mut self) -> Result<Arc<EpochSettingsStore>> {
130        if self.epoch_settings_store.is_none() {
131            self.epoch_settings_store = Some(self.build_epoch_settings_store().await?);
132        }
133
134        Ok(self.epoch_settings_store.as_ref().cloned().unwrap())
135    }
136
137    async fn build_immutable_cache_provider(
138        &mut self,
139    ) -> Result<Arc<dyn ImmutableFileDigestCacheProvider>> {
140        let cache_provider =
141            ImmutableFileDigestRepository::new(self.get_sqlite_connection().await?);
142        if self.configuration.reset_digests_cache {
143            cache_provider
144                .reset()
145                .await
146                .with_context(|| "Failure occurred when resetting immutable file digest cache")?;
147        }
148
149        Ok(Arc::new(cache_provider))
150    }
151
152    /// Get an [ImmutableFileDigestCacheProvider]
153    pub async fn get_immutable_cache_provider(
154        &mut self,
155    ) -> Result<Arc<dyn ImmutableFileDigestCacheProvider>> {
156        if self.immutable_cache_provider.is_none() {
157            self.immutable_cache_provider = Some(self.build_immutable_cache_provider().await?);
158        }
159
160        Ok(self.immutable_cache_provider.as_ref().cloned().unwrap())
161    }
162
163    async fn build_transaction_repository(&mut self) -> Result<Arc<CardanoTransactionRepository>> {
164        let transaction_store = CardanoTransactionRepository::new(
165            self.get_sqlite_connection_cardano_transaction_pool()
166                .await?,
167        );
168
169        Ok(Arc::new(transaction_store))
170    }
171
172    /// Transaction repository.
173    pub async fn get_transaction_repository(
174        &mut self,
175    ) -> Result<Arc<CardanoTransactionRepository>> {
176        if self.transaction_repository.is_none() {
177            self.transaction_repository = Some(self.build_transaction_repository().await?);
178        }
179
180        Ok(self.transaction_repository.as_ref().cloned().unwrap())
181    }
182
183    async fn build_immutable_file_digest_mapper(
184        &mut self,
185    ) -> Result<Arc<dyn ImmutableFileDigestMapper>> {
186        let mapper = ImmutableFileDigestRepository::new(self.get_sqlite_connection().await?);
187
188        Ok(Arc::new(mapper))
189    }
190
191    /// Immutable digest mapper.
192    pub async fn get_immutable_file_digest_mapper(
193        &mut self,
194    ) -> Result<Arc<dyn ImmutableFileDigestMapper>> {
195        if self.immutable_file_digest_mapper.is_none() {
196            self.immutable_file_digest_mapper =
197                Some(self.build_immutable_file_digest_mapper().await?);
198        }
199
200        Ok(self.immutable_file_digest_mapper.as_ref().cloned().unwrap())
201    }
202
203    async fn build_signer_store(&mut self) -> Result<Arc<SignerStore>> {
204        let signer_store = Arc::new(SignerStore::new(self.get_sqlite_connection().await?));
205
206        Ok(signer_store)
207    }
208
209    /// [SignerStore] service
210    pub async fn get_signer_store(&mut self) -> Result<Arc<SignerStore>> {
211        match self.signer_store.as_ref().cloned() {
212            None => {
213                let store = self.build_signer_store().await?;
214                self.signer_store = Some(store.clone());
215                Ok(store)
216            }
217            Some(store) => Ok(store),
218        }
219    }
220
221    async fn build_signed_entity_storer(&mut self) -> Result<Arc<dyn SignedEntityStorer>> {
222        let signed_entity_storer =
223            Arc::new(SignedEntityStore::new(self.get_sqlite_connection().await?));
224
225        Ok(signed_entity_storer)
226    }
227
228    /// [SignedEntityStorer] service
229    pub async fn get_signed_entity_storer(&mut self) -> Result<Arc<dyn SignedEntityStorer>> {
230        if self.signed_entity_storer.is_none() {
231            self.signed_entity_storer = Some(self.build_signed_entity_storer().await?);
232        }
233
234        Ok(self.signed_entity_storer.as_ref().cloned().unwrap())
235    }
236
237    /// Create a [SignersImporter] instance.
238    pub async fn create_signer_importer(
239        &mut self,
240        cexplorer_pools_url: &str,
241    ) -> Result<SignersImporter> {
242        let retriever = CExplorerSignerRetriever::new(
243            cexplorer_pools_url,
244            Some(Duration::from_secs(30)),
245            self.root_logger(),
246        )?;
247        let persister = self.get_signer_store().await?;
248
249        Ok(SignersImporter::new(
250            Arc::new(retriever),
251            persister,
252            self.root_logger(),
253        ))
254    }
255}