mithril_aggregator/dependency_injection/builder/support/
stores.rs1use anyhow::Context;
2use slog::debug;
3use std::sync::Arc;
4use std::time::Duration;
5
6use mithril_common::digesters::cache::ImmutableFileDigestCacheProvider;
7use mithril_persistence::database::repository::CardanoTransactionRepository;
8
9use crate::database::repository::{
10 CertificateRepository, EpochSettingsStore, ImmutableFileDigestRepository,
11 OpenMessageRepository, SignedEntityStore, SignedEntityStorer, SignerRegistrationStore,
12 SignerStore, StakePoolStore,
13};
14use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
15use crate::{
16 CExplorerSignerRetriever, EpochSettingsStorer, ImmutableFileDigestMapper, SignersImporter,
17 VerificationKeyStorer,
18};
19
20impl DependenciesBuilder {
21 async fn build_stake_store(&mut self) -> Result<Arc<StakePoolStore>> {
22 let stake_pool_store = Arc::new(StakePoolStore::new(
23 self.get_sqlite_connection().await?,
24 self.configuration.safe_epoch_retention_limit(),
25 ));
26
27 Ok(stake_pool_store)
28 }
29
30 pub async fn get_stake_store(&mut self) -> Result<Arc<StakePoolStore>> {
32 if self.stake_store.is_none() {
33 self.stake_store = Some(self.build_stake_store().await?);
34 }
35
36 Ok(self.stake_store.as_ref().cloned().unwrap())
37 }
38
39 async fn build_certificate_repository(&mut self) -> Result<Arc<CertificateRepository>> {
40 Ok(Arc::new(CertificateRepository::new(
41 self.get_sqlite_connection().await?,
42 )))
43 }
44
45 pub async fn get_certificate_repository(&mut self) -> Result<Arc<CertificateRepository>> {
47 if self.certificate_repository.is_none() {
48 self.certificate_repository = Some(self.build_certificate_repository().await?);
49 }
50
51 Ok(self.certificate_repository.as_ref().cloned().unwrap())
52 }
53
54 async fn build_open_message_repository(&mut self) -> Result<Arc<OpenMessageRepository>> {
55 Ok(Arc::new(OpenMessageRepository::new(
56 self.get_sqlite_connection().await?,
57 )))
58 }
59
60 pub async fn get_open_message_repository(&mut self) -> Result<Arc<OpenMessageRepository>> {
62 if self.open_message_repository.is_none() {
63 self.open_message_repository = Some(self.build_open_message_repository().await?);
64 }
65
66 Ok(self.open_message_repository.as_ref().cloned().unwrap())
67 }
68
69 async fn build_verification_key_store(&mut self) -> Result<Arc<dyn VerificationKeyStorer>> {
70 Ok(Arc::new(SignerRegistrationStore::new(
71 self.get_sqlite_connection().await?,
72 )))
73 }
74
75 pub async fn get_verification_key_store(&mut self) -> Result<Arc<dyn VerificationKeyStorer>> {
77 if self.verification_key_store.is_none() {
78 self.verification_key_store = Some(self.build_verification_key_store().await?);
79 }
80
81 Ok(self.verification_key_store.as_ref().cloned().unwrap())
82 }
83
84 async fn build_epoch_settings_store(&mut self) -> Result<Arc<EpochSettingsStore>> {
85 let logger = self.root_logger();
86 let epoch_settings_store = EpochSettingsStore::new(
87 self.get_sqlite_connection().await?,
88 self.configuration.safe_epoch_retention_limit(),
89 );
90 let current_epoch = self
91 .get_chain_observer()
92 .await?
93 .get_current_epoch()
94 .await
95 .map_err(|e| DependenciesBuilderError::Initialization {
96 message: "cannot create aggregator runner: failed to retrieve current epoch."
97 .to_string(),
98 error: Some(e.into()),
99 })?
100 .ok_or(DependenciesBuilderError::Initialization {
101 message: "cannot build aggregator runner: no epoch returned.".to_string(),
102 error: None,
103 })?;
104 let retrieval_epoch = current_epoch
105 .offset_to_signer_retrieval_epoch()
106 .map_err(|e| DependenciesBuilderError::Initialization {
107 message: format!("cannot create aggregator runner: failed to offset current epoch '{current_epoch}' to signer retrieval epoch."),
108 error: Some(e.into()),
109 })?;
110
111 let epoch_settings_configuration = self.configuration.get_epoch_settings_configuration();
112 debug!(
113 logger,
114 "Handle discrepancies at startup of epoch settings store, will record epoch settings from the configuration for epoch {retrieval_epoch}";
115 "epoch_settings_configuration" => ?epoch_settings_configuration,
116 );
117 epoch_settings_store
118 .handle_discrepancies_at_startup(retrieval_epoch, &epoch_settings_configuration)
119 .await
120 .map_err(|e| DependenciesBuilderError::Initialization {
121 message: "can not create aggregator runner".to_string(),
122 error: Some(e),
123 })?;
124
125 Ok(Arc::new(epoch_settings_store))
126 }
127
128 pub async fn get_epoch_settings_store(&mut self) -> Result<Arc<EpochSettingsStore>> {
130 if self.epoch_settings_store.is_none() {
131 self.epoch_settings_store = Some(self.build_epoch_settings_store().await?);
132 }
133
134 Ok(self.epoch_settings_store.as_ref().cloned().unwrap())
135 }
136
137 async fn build_immutable_cache_provider(
138 &mut self,
139 ) -> Result<Arc<dyn ImmutableFileDigestCacheProvider>> {
140 let cache_provider =
141 ImmutableFileDigestRepository::new(self.get_sqlite_connection().await?);
142 if self.configuration.reset_digests_cache {
143 cache_provider
144 .reset()
145 .await
146 .with_context(|| "Failure occurred when resetting immutable file digest cache")?;
147 }
148
149 Ok(Arc::new(cache_provider))
150 }
151
152 pub async fn get_immutable_cache_provider(
154 &mut self,
155 ) -> Result<Arc<dyn ImmutableFileDigestCacheProvider>> {
156 if self.immutable_cache_provider.is_none() {
157 self.immutable_cache_provider = Some(self.build_immutable_cache_provider().await?);
158 }
159
160 Ok(self.immutable_cache_provider.as_ref().cloned().unwrap())
161 }
162
163 async fn build_transaction_repository(&mut self) -> Result<Arc<CardanoTransactionRepository>> {
164 let transaction_store = CardanoTransactionRepository::new(
165 self.get_sqlite_connection_cardano_transaction_pool()
166 .await?,
167 );
168
169 Ok(Arc::new(transaction_store))
170 }
171
172 pub async fn get_transaction_repository(
174 &mut self,
175 ) -> Result<Arc<CardanoTransactionRepository>> {
176 if self.transaction_repository.is_none() {
177 self.transaction_repository = Some(self.build_transaction_repository().await?);
178 }
179
180 Ok(self.transaction_repository.as_ref().cloned().unwrap())
181 }
182
183 async fn build_immutable_file_digest_mapper(
184 &mut self,
185 ) -> Result<Arc<dyn ImmutableFileDigestMapper>> {
186 let mapper = ImmutableFileDigestRepository::new(self.get_sqlite_connection().await?);
187
188 Ok(Arc::new(mapper))
189 }
190
191 pub async fn get_immutable_file_digest_mapper(
193 &mut self,
194 ) -> Result<Arc<dyn ImmutableFileDigestMapper>> {
195 if self.immutable_file_digest_mapper.is_none() {
196 self.immutable_file_digest_mapper =
197 Some(self.build_immutable_file_digest_mapper().await?);
198 }
199
200 Ok(self.immutable_file_digest_mapper.as_ref().cloned().unwrap())
201 }
202
203 async fn build_signer_store(&mut self) -> Result<Arc<SignerStore>> {
204 let signer_store = Arc::new(SignerStore::new(self.get_sqlite_connection().await?));
205
206 Ok(signer_store)
207 }
208
209 pub async fn get_signer_store(&mut self) -> Result<Arc<SignerStore>> {
211 match self.signer_store.as_ref().cloned() {
212 None => {
213 let store = self.build_signer_store().await?;
214 self.signer_store = Some(store.clone());
215 Ok(store)
216 }
217 Some(store) => Ok(store),
218 }
219 }
220
221 async fn build_signed_entity_storer(&mut self) -> Result<Arc<dyn SignedEntityStorer>> {
222 let signed_entity_storer =
223 Arc::new(SignedEntityStore::new(self.get_sqlite_connection().await?));
224
225 Ok(signed_entity_storer)
226 }
227
228 pub async fn get_signed_entity_storer(&mut self) -> Result<Arc<dyn SignedEntityStorer>> {
230 if self.signed_entity_storer.is_none() {
231 self.signed_entity_storer = Some(self.build_signed_entity_storer().await?);
232 }
233
234 Ok(self.signed_entity_storer.as_ref().cloned().unwrap())
235 }
236
237 pub async fn create_signer_importer(
239 &mut self,
240 cexplorer_pools_url: &str,
241 ) -> Result<SignersImporter> {
242 let retriever = CExplorerSignerRetriever::new(
243 cexplorer_pools_url,
244 Some(Duration::from_secs(30)),
245 self.root_logger(),
246 )?;
247 let persister = self.get_signer_store().await?;
248
249 Ok(SignersImporter::new(
250 Arc::new(retriever),
251 persister,
252 self.root_logger(),
253 ))
254 }
255}