mithril_aggregator/dependency_injection/builder/support/
stores.rs

1use anyhow::Context;
2use slog::debug;
3use std::sync::Arc;
4use std::time::Duration;
5
6use mithril_cardano_node_internal_database::digesters::cache::ImmutableFileDigestCacheProvider;
7use mithril_persistence::database::repository::CardanoTransactionRepository;
8
9use crate::database::repository::{
10    CertificateRepository, EpochSettingsStore, ImmutableFileDigestRepository,
11    OpenMessageRepository, SignedEntityStore, SignedEntityStorer, SignerRegistrationStore,
12    SignerStore, StakePoolStore,
13};
14use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
15use crate::get_dependency;
16use crate::{
17    CExplorerSignerRetriever, EpochSettingsStorer, ImmutableFileDigestMapper,
18    ProtocolParametersRetriever, SignersImporter, VerificationKeyStorer,
19};
20
21impl DependenciesBuilder {
22    async fn build_stake_store(&mut self) -> Result<Arc<StakePoolStore>> {
23        let stake_pool_store = Arc::new(StakePoolStore::new(
24            self.get_sqlite_connection().await?,
25            self.configuration.safe_epoch_retention_limit(),
26        ));
27
28        Ok(stake_pool_store)
29    }
30
31    /// Return a [StakePoolStore]
32    pub async fn get_stake_store(&mut self) -> Result<Arc<StakePoolStore>> {
33        get_dependency!(self.stake_store)
34    }
35
36    async fn build_certificate_repository(&mut self) -> Result<Arc<CertificateRepository>> {
37        Ok(Arc::new(CertificateRepository::new(
38            self.get_sqlite_connection().await?,
39        )))
40    }
41
42    /// Get a configured [CertificateRepository].
43    pub async fn get_certificate_repository(&mut self) -> Result<Arc<CertificateRepository>> {
44        get_dependency!(self.certificate_repository)
45    }
46
47    async fn build_open_message_repository(&mut self) -> Result<Arc<OpenMessageRepository>> {
48        Ok(Arc::new(OpenMessageRepository::new(
49            self.get_sqlite_connection().await?,
50        )))
51    }
52
53    /// Get a configured [OpenMessageRepository].
54    pub async fn get_open_message_repository(&mut self) -> Result<Arc<OpenMessageRepository>> {
55        get_dependency!(self.open_message_repository)
56    }
57
58    async fn build_verification_key_store(&mut self) -> Result<Arc<dyn VerificationKeyStorer>> {
59        Ok(Arc::new(SignerRegistrationStore::new(
60            self.get_sqlite_connection().await?,
61            self.configuration.safe_epoch_retention_limit(),
62        )))
63    }
64
65    /// Get a configured [VerificationKeyStorer].
66    pub async fn get_verification_key_store(&mut self) -> Result<Arc<dyn VerificationKeyStorer>> {
67        get_dependency!(self.verification_key_store)
68    }
69
70    async fn build_protocol_parameters_retriever(
71        &mut self,
72    ) -> Result<Arc<dyn ProtocolParametersRetriever>> {
73        let protocol_parameters_retriever =
74            EpochSettingsStore::new(self.get_sqlite_connection().await?, None);
75
76        Ok(Arc::new(protocol_parameters_retriever))
77    }
78
79    /// Get a configured [ProtocolParametersRetriever].
80    pub async fn get_protocol_parameters_retriever(
81        &mut self,
82    ) -> Result<Arc<dyn ProtocolParametersRetriever>> {
83        if self.protocol_parameters_retriever.is_none() {
84            self.protocol_parameters_retriever =
85                Some(self.build_protocol_parameters_retriever().await?);
86        }
87
88        Ok(self.protocol_parameters_retriever.as_ref().cloned().unwrap())
89    }
90
91    async fn build_epoch_settings_store(&mut self) -> Result<Arc<EpochSettingsStore>> {
92        let logger = self.root_logger();
93        let epoch_settings_store = EpochSettingsStore::new(
94            self.get_sqlite_connection().await?,
95            self.configuration.safe_epoch_retention_limit(),
96        );
97        let current_epoch = self
98            .get_chain_observer()
99            .await?
100            .get_current_epoch()
101            .await
102            .map_err(|e| DependenciesBuilderError::Initialization {
103                message: "cannot create aggregator runner: failed to retrieve current epoch."
104                    .to_string(),
105                error: Some(e.into()),
106            })?
107            .ok_or(DependenciesBuilderError::Initialization {
108                message: "cannot build aggregator runner: no epoch returned.".to_string(),
109                error: None,
110            })?;
111        let retrieval_epoch = current_epoch
112            .offset_to_signer_retrieval_epoch()
113            .map_err(|e| DependenciesBuilderError::Initialization {
114                message: format!("cannot create aggregator runner: failed to offset current epoch '{current_epoch}' to signer retrieval epoch."),
115                error: Some(e.into()),
116            })?;
117
118        let epoch_settings_configuration = self.configuration.get_epoch_settings_configuration();
119        debug!(
120            logger,
121            "Handle discrepancies at startup of epoch settings store, will record epoch settings from the configuration for epoch {retrieval_epoch}";
122            "epoch_settings_configuration" => ?epoch_settings_configuration,
123        );
124        epoch_settings_store
125            .handle_discrepancies_at_startup(retrieval_epoch, &epoch_settings_configuration)
126            .await
127            .map_err(|e| DependenciesBuilderError::Initialization {
128                message: "can not create aggregator runner".to_string(),
129                error: Some(e),
130            })?;
131
132        Ok(Arc::new(epoch_settings_store))
133    }
134
135    /// Get a configured [EpochSettingsStorer].
136    pub async fn get_epoch_settings_store(&mut self) -> Result<Arc<EpochSettingsStore>> {
137        get_dependency!(self.epoch_settings_store)
138    }
139
140    async fn build_immutable_cache_provider(
141        &mut self,
142    ) -> Result<Arc<dyn ImmutableFileDigestCacheProvider>> {
143        let cache_provider =
144            ImmutableFileDigestRepository::new(self.get_sqlite_connection().await?);
145        if self.configuration.reset_digests_cache() {
146            cache_provider
147                .reset()
148                .await
149                .with_context(|| "Failure occurred when resetting immutable file digest cache")?;
150        }
151
152        Ok(Arc::new(cache_provider))
153    }
154
155    /// Get an [ImmutableFileDigestCacheProvider]
156    pub async fn get_immutable_cache_provider(
157        &mut self,
158    ) -> Result<Arc<dyn ImmutableFileDigestCacheProvider>> {
159        get_dependency!(self.immutable_cache_provider)
160    }
161
162    async fn build_transaction_repository(&mut self) -> Result<Arc<CardanoTransactionRepository>> {
163        let transaction_store = CardanoTransactionRepository::new(
164            self.get_sqlite_connection_cardano_transaction_pool().await?,
165        );
166
167        Ok(Arc::new(transaction_store))
168    }
169
170    /// Transaction repository.
171    pub async fn get_transaction_repository(
172        &mut self,
173    ) -> Result<Arc<CardanoTransactionRepository>> {
174        get_dependency!(self.transaction_repository)
175    }
176
177    async fn build_immutable_file_digest_mapper(
178        &mut self,
179    ) -> Result<Arc<dyn ImmutableFileDigestMapper>> {
180        let mapper = ImmutableFileDigestRepository::new(self.get_sqlite_connection().await?);
181
182        Ok(Arc::new(mapper))
183    }
184
185    /// Immutable digest mapper.
186    pub async fn get_immutable_file_digest_mapper(
187        &mut self,
188    ) -> Result<Arc<dyn ImmutableFileDigestMapper>> {
189        get_dependency!(self.immutable_file_digest_mapper)
190    }
191
192    async fn build_signer_store(&mut self) -> Result<Arc<SignerStore>> {
193        let signer_store = Arc::new(SignerStore::new(self.get_sqlite_connection().await?));
194
195        Ok(signer_store)
196    }
197
198    /// [SignerStore] service
199    pub async fn get_signer_store(&mut self) -> Result<Arc<SignerStore>> {
200        get_dependency!(self.signer_store)
201    }
202
203    async fn build_signed_entity_storer(&mut self) -> Result<Arc<dyn SignedEntityStorer>> {
204        let signed_entity_storer =
205            Arc::new(SignedEntityStore::new(self.get_sqlite_connection().await?));
206
207        Ok(signed_entity_storer)
208    }
209
210    /// [SignedEntityStorer] service
211    pub async fn get_signed_entity_storer(&mut self) -> Result<Arc<dyn SignedEntityStorer>> {
212        get_dependency!(self.signed_entity_storer)
213    }
214
215    /// Create a [SignersImporter] instance.
216    pub async fn create_signer_importer(
217        &mut self,
218        cexplorer_pools_url: &str,
219    ) -> Result<SignersImporter> {
220        let retriever = CExplorerSignerRetriever::new(
221            cexplorer_pools_url,
222            Some(Duration::from_secs(30)),
223            self.root_logger(),
224        )?;
225        let persister = self.get_signer_store().await?;
226
227        Ok(SignersImporter::new(
228            Arc::new(retriever),
229            persister,
230            self.root_logger(),
231        ))
232    }
233}