mithril_aggregator/dependency_injection/builder/support/
stores.rs1use anyhow::Context;
2use slog::debug;
3use std::sync::Arc;
4use std::time::Duration;
5
6use mithril_common::digesters::cache::ImmutableFileDigestCacheProvider;
7use mithril_persistence::database::repository::CardanoTransactionRepository;
8
9use crate::database::repository::{
10 CertificateRepository, EpochSettingsStore, ImmutableFileDigestRepository,
11 OpenMessageRepository, SignedEntityStore, SignedEntityStorer, SignerRegistrationStore,
12 SignerStore, StakePoolStore,
13};
14use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
15use crate::get_dependency;
16use crate::{
17 CExplorerSignerRetriever, EpochSettingsStorer, ImmutableFileDigestMapper,
18 ProtocolParametersRetriever, SignersImporter, VerificationKeyStorer,
19};
20
21impl DependenciesBuilder {
22 async fn build_stake_store(&mut self) -> Result<Arc<StakePoolStore>> {
23 let stake_pool_store = Arc::new(StakePoolStore::new(
24 self.get_sqlite_connection().await?,
25 self.configuration.safe_epoch_retention_limit(),
26 ));
27
28 Ok(stake_pool_store)
29 }
30
/// Resolve the `stake_store` dependency as an `Arc<StakePoolStore>`.
///
/// Resolution is delegated to the `get_dependency!` macro imported from the crate root.
/// NOTE(review): the macro body is not visible here; presumably it calls
/// `build_stake_store` on first access and caches the result — confirm against the macro.
31 pub async fn get_stake_store(&mut self) -> Result<Arc<StakePoolStore>> {
33 get_dependency!(self.stake_store)
34 }
35
36 async fn build_certificate_repository(&mut self) -> Result<Arc<CertificateRepository>> {
37 Ok(Arc::new(CertificateRepository::new(
38 self.get_sqlite_connection().await?,
39 )))
40 }
41
/// Resolve the `certificate_repository` dependency as an `Arc<CertificateRepository>`.
///
/// Resolution is delegated to the `get_dependency!` macro imported from the crate root.
/// NOTE(review): the macro body is not visible here; presumably it calls
/// `build_certificate_repository` on first access and caches the result — confirm against the macro.
42 pub async fn get_certificate_repository(&mut self) -> Result<Arc<CertificateRepository>> {
44 get_dependency!(self.certificate_repository)
45 }
46
47 async fn build_open_message_repository(&mut self) -> Result<Arc<OpenMessageRepository>> {
48 Ok(Arc::new(OpenMessageRepository::new(
49 self.get_sqlite_connection().await?,
50 )))
51 }
52
/// Resolve the `open_message_repository` dependency as an `Arc<OpenMessageRepository>`.
///
/// Resolution is delegated to the `get_dependency!` macro imported from the crate root.
/// NOTE(review): the macro body is not visible here; presumably it calls
/// `build_open_message_repository` on first access and caches the result — confirm against the macro.
53 pub async fn get_open_message_repository(&mut self) -> Result<Arc<OpenMessageRepository>> {
55 get_dependency!(self.open_message_repository)
56 }
57
58 async fn build_verification_key_store(&mut self) -> Result<Arc<dyn VerificationKeyStorer>> {
59 Ok(Arc::new(SignerRegistrationStore::new(
60 self.get_sqlite_connection().await?,
61 self.configuration.safe_epoch_retention_limit(),
62 )))
63 }
64
/// Resolve the `verification_key_store` dependency as an `Arc<dyn VerificationKeyStorer>`.
///
/// Resolution is delegated to the `get_dependency!` macro imported from the crate root.
/// NOTE(review): the macro body is not visible here; presumably it calls
/// `build_verification_key_store` on first access and caches the result — confirm against the macro.
65 pub async fn get_verification_key_store(&mut self) -> Result<Arc<dyn VerificationKeyStorer>> {
67 get_dependency!(self.verification_key_store)
68 }
69
70 async fn build_protocol_parameters_retriever(
71 &mut self,
72 ) -> Result<Arc<dyn ProtocolParametersRetriever>> {
73 let protocol_parameters_retriever =
74 EpochSettingsStore::new(self.get_sqlite_connection().await?, None);
75
76 Ok(Arc::new(protocol_parameters_retriever))
77 }
78
79 pub async fn get_protocol_parameters_retriever(
81 &mut self,
82 ) -> Result<Arc<dyn ProtocolParametersRetriever>> {
83 if self.protocol_parameters_retriever.is_none() {
84 self.protocol_parameters_retriever =
85 Some(self.build_protocol_parameters_retriever().await?);
86 }
87
88 Ok(self
89 .protocol_parameters_retriever
90 .as_ref()
91 .cloned()
92 .unwrap())
93 }
94
95 async fn build_epoch_settings_store(&mut self) -> Result<Arc<EpochSettingsStore>> {
96 let logger = self.root_logger();
97 let epoch_settings_store = EpochSettingsStore::new(
98 self.get_sqlite_connection().await?,
99 self.configuration.safe_epoch_retention_limit(),
100 );
101 let current_epoch = self
102 .get_chain_observer()
103 .await?
104 .get_current_epoch()
105 .await
106 .map_err(|e| DependenciesBuilderError::Initialization {
107 message: "cannot create aggregator runner: failed to retrieve current epoch."
108 .to_string(),
109 error: Some(e.into()),
110 })?
111 .ok_or(DependenciesBuilderError::Initialization {
112 message: "cannot build aggregator runner: no epoch returned.".to_string(),
113 error: None,
114 })?;
115 let retrieval_epoch = current_epoch
116 .offset_to_signer_retrieval_epoch()
117 .map_err(|e| DependenciesBuilderError::Initialization {
118 message: format!("cannot create aggregator runner: failed to offset current epoch '{current_epoch}' to signer retrieval epoch."),
119 error: Some(e.into()),
120 })?;
121
122 let epoch_settings_configuration = self.configuration.get_epoch_settings_configuration();
123 debug!(
124 logger,
125 "Handle discrepancies at startup of epoch settings store, will record epoch settings from the configuration for epoch {retrieval_epoch}";
126 "epoch_settings_configuration" => ?epoch_settings_configuration,
127 );
128 epoch_settings_store
129 .handle_discrepancies_at_startup(retrieval_epoch, &epoch_settings_configuration)
130 .await
131 .map_err(|e| DependenciesBuilderError::Initialization {
132 message: "can not create aggregator runner".to_string(),
133 error: Some(e),
134 })?;
135
136 Ok(Arc::new(epoch_settings_store))
137 }
138
/// Resolve the `epoch_settings_store` dependency as an `Arc<EpochSettingsStore>`.
///
/// Resolution is delegated to the `get_dependency!` macro imported from the crate root.
/// NOTE(review): the macro body is not visible here; presumably it calls
/// `build_epoch_settings_store` on first access and caches the result — confirm against the macro.
139 pub async fn get_epoch_settings_store(&mut self) -> Result<Arc<EpochSettingsStore>> {
141 get_dependency!(self.epoch_settings_store)
142 }
143
144 async fn build_immutable_cache_provider(
145 &mut self,
146 ) -> Result<Arc<dyn ImmutableFileDigestCacheProvider>> {
147 let cache_provider =
148 ImmutableFileDigestRepository::new(self.get_sqlite_connection().await?);
149 if self.configuration.reset_digests_cache() {
150 cache_provider
151 .reset()
152 .await
153 .with_context(|| "Failure occurred when resetting immutable file digest cache")?;
154 }
155
156 Ok(Arc::new(cache_provider))
157 }
158
/// Resolve the `immutable_cache_provider` dependency as an
/// `Arc<dyn ImmutableFileDigestCacheProvider>`.
///
/// Resolution is delegated to the `get_dependency!` macro imported from the crate root.
/// NOTE(review): the macro body is not visible here; presumably it calls
/// `build_immutable_cache_provider` on first access and caches the result — confirm against the macro.
159 pub async fn get_immutable_cache_provider(
161 &mut self,
162 ) -> Result<Arc<dyn ImmutableFileDigestCacheProvider>> {
163 get_dependency!(self.immutable_cache_provider)
164 }
165
166 async fn build_transaction_repository(&mut self) -> Result<Arc<CardanoTransactionRepository>> {
167 let transaction_store = CardanoTransactionRepository::new(
168 self.get_sqlite_connection_cardano_transaction_pool()
169 .await?,
170 );
171
172 Ok(Arc::new(transaction_store))
173 }
174
/// Resolve the `transaction_repository` dependency as an `Arc<CardanoTransactionRepository>`.
///
/// Resolution is delegated to the `get_dependency!` macro imported from the crate root.
/// NOTE(review): the macro body is not visible here; presumably it calls
/// `build_transaction_repository` on first access and caches the result — confirm against the macro.
175 pub async fn get_transaction_repository(
177 &mut self,
178 ) -> Result<Arc<CardanoTransactionRepository>> {
179 get_dependency!(self.transaction_repository)
180 }
181
182 async fn build_immutable_file_digest_mapper(
183 &mut self,
184 ) -> Result<Arc<dyn ImmutableFileDigestMapper>> {
185 let mapper = ImmutableFileDigestRepository::new(self.get_sqlite_connection().await?);
186
187 Ok(Arc::new(mapper))
188 }
189
/// Resolve the `immutable_file_digest_mapper` dependency as an
/// `Arc<dyn ImmutableFileDigestMapper>`.
///
/// Resolution is delegated to the `get_dependency!` macro imported from the crate root.
/// NOTE(review): the macro body is not visible here; presumably it calls
/// `build_immutable_file_digest_mapper` on first access and caches the result — confirm against the macro.
190 pub async fn get_immutable_file_digest_mapper(
192 &mut self,
193 ) -> Result<Arc<dyn ImmutableFileDigestMapper>> {
194 get_dependency!(self.immutable_file_digest_mapper)
195 }
196
197 async fn build_signer_store(&mut self) -> Result<Arc<SignerStore>> {
198 let signer_store = Arc::new(SignerStore::new(self.get_sqlite_connection().await?));
199
200 Ok(signer_store)
201 }
202
/// Resolve the `signer_store` dependency as an `Arc<SignerStore>`.
///
/// Resolution is delegated to the `get_dependency!` macro imported from the crate root.
/// NOTE(review): the macro body is not visible here; presumably it calls
/// `build_signer_store` on first access and caches the result — confirm against the macro.
203 pub async fn get_signer_store(&mut self) -> Result<Arc<SignerStore>> {
205 get_dependency!(self.signer_store)
206 }
207
208 async fn build_signed_entity_storer(&mut self) -> Result<Arc<dyn SignedEntityStorer>> {
209 let signed_entity_storer =
210 Arc::new(SignedEntityStore::new(self.get_sqlite_connection().await?));
211
212 Ok(signed_entity_storer)
213 }
214
/// Resolve the `signed_entity_storer` dependency as an `Arc<dyn SignedEntityStorer>`.
///
/// Resolution is delegated to the `get_dependency!` macro imported from the crate root.
/// NOTE(review): the macro body is not visible here; presumably it calls
/// `build_signed_entity_storer` on first access and caches the result — confirm against the macro.
215 pub async fn get_signed_entity_storer(&mut self) -> Result<Arc<dyn SignedEntityStorer>> {
217 get_dependency!(self.signed_entity_storer)
218 }
219
220 pub async fn create_signer_importer(
222 &mut self,
223 cexplorer_pools_url: &str,
224 ) -> Result<SignersImporter> {
225 let retriever = CExplorerSignerRetriever::new(
226 cexplorer_pools_url,
227 Some(Duration::from_secs(30)),
228 self.root_logger(),
229 )?;
230 let persister = self.get_signer_store().await?;
231
232 Ok(SignersImporter::new(
233 Arc::new(retriever),
234 persister,
235 self.root_logger(),
236 ))
237 }
238}