mithril_aggregator/dependency_injection/builder/support/
stores.rs1use anyhow::Context;
2use slog::debug;
3use std::sync::Arc;
4use std::time::Duration;
5
6use mithril_cardano_node_internal_database::digesters::cache::ImmutableFileDigestCacheProvider;
7use mithril_persistence::database::repository::CardanoTransactionRepository;
8
9use crate::database::repository::{
10 CertificateRepository, EpochSettingsStore, ImmutableFileDigestRepository,
11 OpenMessageRepository, SignedEntityStore, SignedEntityStorer, SignerRegistrationStore,
12 SignerStore, StakePoolStore,
13};
14use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
15use crate::get_dependency;
16use crate::{
17 CExplorerSignerRetriever, EpochSettingsStorer, ImmutableFileDigestMapper,
18 ProtocolParametersRetriever, SignersImporter, VerificationKeyStorer,
19};
20
21impl DependenciesBuilder {
22 async fn build_stake_store(&mut self) -> Result<Arc<StakePoolStore>> {
23 let stake_pool_store = Arc::new(StakePoolStore::new(
24 self.get_sqlite_connection().await?,
25 self.configuration.safe_epoch_retention_limit(),
26 ));
27
28 Ok(stake_pool_store)
29 }
30
31 pub async fn get_stake_store(&mut self) -> Result<Arc<StakePoolStore>> {
33 get_dependency!(self.stake_store)
34 }
35
36 async fn build_certificate_repository(&mut self) -> Result<Arc<CertificateRepository>> {
37 Ok(Arc::new(CertificateRepository::new(
38 self.get_sqlite_connection().await?,
39 )))
40 }
41
42 pub async fn get_certificate_repository(&mut self) -> Result<Arc<CertificateRepository>> {
44 get_dependency!(self.certificate_repository)
45 }
46
47 async fn build_open_message_repository(&mut self) -> Result<Arc<OpenMessageRepository>> {
48 Ok(Arc::new(OpenMessageRepository::new(
49 self.get_sqlite_connection().await?,
50 )))
51 }
52
53 pub async fn get_open_message_repository(&mut self) -> Result<Arc<OpenMessageRepository>> {
55 get_dependency!(self.open_message_repository)
56 }
57
58 async fn build_verification_key_store(&mut self) -> Result<Arc<dyn VerificationKeyStorer>> {
59 Ok(Arc::new(SignerRegistrationStore::new(
60 self.get_sqlite_connection().await?,
61 self.configuration.safe_epoch_retention_limit(),
62 )))
63 }
64
65 pub async fn get_verification_key_store(&mut self) -> Result<Arc<dyn VerificationKeyStorer>> {
67 get_dependency!(self.verification_key_store)
68 }
69
70 async fn build_protocol_parameters_retriever(
71 &mut self,
72 ) -> Result<Arc<dyn ProtocolParametersRetriever>> {
73 let protocol_parameters_retriever =
74 EpochSettingsStore::new(self.get_sqlite_connection().await?, None);
75
76 Ok(Arc::new(protocol_parameters_retriever))
77 }
78
79 pub async fn get_protocol_parameters_retriever(
81 &mut self,
82 ) -> Result<Arc<dyn ProtocolParametersRetriever>> {
83 if self.protocol_parameters_retriever.is_none() {
84 self.protocol_parameters_retriever =
85 Some(self.build_protocol_parameters_retriever().await?);
86 }
87
88 Ok(self.protocol_parameters_retriever.as_ref().cloned().unwrap())
89 }
90
91 async fn build_epoch_settings_store(&mut self) -> Result<Arc<EpochSettingsStore>> {
92 let logger = self.root_logger();
93 let epoch_settings_store = EpochSettingsStore::new(
94 self.get_sqlite_connection().await?,
95 self.configuration.safe_epoch_retention_limit(),
96 );
97 let current_epoch = self
98 .get_chain_observer()
99 .await?
100 .get_current_epoch()
101 .await
102 .map_err(|e| DependenciesBuilderError::Initialization {
103 message: "cannot create aggregator runner: failed to retrieve current epoch."
104 .to_string(),
105 error: Some(e.into()),
106 })?
107 .ok_or(DependenciesBuilderError::Initialization {
108 message: "cannot build aggregator runner: no epoch returned.".to_string(),
109 error: None,
110 })?;
111 let retrieval_epoch = current_epoch
112 .offset_to_signer_retrieval_epoch()
113 .map_err(|e| DependenciesBuilderError::Initialization {
114 message: format!("cannot create aggregator runner: failed to offset current epoch '{current_epoch}' to signer retrieval epoch."),
115 error: Some(e.into()),
116 })?;
117
118 let epoch_settings_configuration = self.configuration.get_epoch_settings_configuration();
119 debug!(
120 logger,
121 "Handle discrepancies at startup of epoch settings store, will record epoch settings from the configuration for epoch {retrieval_epoch}";
122 "epoch_settings_configuration" => ?epoch_settings_configuration,
123 );
124 epoch_settings_store
125 .handle_discrepancies_at_startup(retrieval_epoch, &epoch_settings_configuration)
126 .await
127 .map_err(|e| DependenciesBuilderError::Initialization {
128 message: "can not create aggregator runner".to_string(),
129 error: Some(e),
130 })?;
131
132 Ok(Arc::new(epoch_settings_store))
133 }
134
135 pub async fn get_epoch_settings_store(&mut self) -> Result<Arc<EpochSettingsStore>> {
137 get_dependency!(self.epoch_settings_store)
138 }
139
140 async fn build_immutable_cache_provider(
141 &mut self,
142 ) -> Result<Arc<dyn ImmutableFileDigestCacheProvider>> {
143 let cache_provider =
144 ImmutableFileDigestRepository::new(self.get_sqlite_connection().await?);
145 if self.configuration.reset_digests_cache() {
146 cache_provider
147 .reset()
148 .await
149 .with_context(|| "Failure occurred when resetting immutable file digest cache")?;
150 }
151
152 Ok(Arc::new(cache_provider))
153 }
154
155 pub async fn get_immutable_cache_provider(
157 &mut self,
158 ) -> Result<Arc<dyn ImmutableFileDigestCacheProvider>> {
159 get_dependency!(self.immutable_cache_provider)
160 }
161
162 async fn build_transaction_repository(&mut self) -> Result<Arc<CardanoTransactionRepository>> {
163 let transaction_store = CardanoTransactionRepository::new(
164 self.get_sqlite_connection_cardano_transaction_pool().await?,
165 );
166
167 Ok(Arc::new(transaction_store))
168 }
169
170 pub async fn get_transaction_repository(
172 &mut self,
173 ) -> Result<Arc<CardanoTransactionRepository>> {
174 get_dependency!(self.transaction_repository)
175 }
176
177 async fn build_immutable_file_digest_mapper(
178 &mut self,
179 ) -> Result<Arc<dyn ImmutableFileDigestMapper>> {
180 let mapper = ImmutableFileDigestRepository::new(self.get_sqlite_connection().await?);
181
182 Ok(Arc::new(mapper))
183 }
184
185 pub async fn get_immutable_file_digest_mapper(
187 &mut self,
188 ) -> Result<Arc<dyn ImmutableFileDigestMapper>> {
189 get_dependency!(self.immutable_file_digest_mapper)
190 }
191
192 async fn build_signer_store(&mut self) -> Result<Arc<SignerStore>> {
193 let signer_store = Arc::new(SignerStore::new(self.get_sqlite_connection().await?));
194
195 Ok(signer_store)
196 }
197
198 pub async fn get_signer_store(&mut self) -> Result<Arc<SignerStore>> {
200 get_dependency!(self.signer_store)
201 }
202
203 async fn build_signed_entity_storer(&mut self) -> Result<Arc<dyn SignedEntityStorer>> {
204 let signed_entity_storer =
205 Arc::new(SignedEntityStore::new(self.get_sqlite_connection().await?));
206
207 Ok(signed_entity_storer)
208 }
209
210 pub async fn get_signed_entity_storer(&mut self) -> Result<Arc<dyn SignedEntityStorer>> {
212 get_dependency!(self.signed_entity_storer)
213 }
214
215 pub async fn create_signer_importer(
217 &mut self,
218 cexplorer_pools_url: &str,
219 ) -> Result<SignersImporter> {
220 let retriever = CExplorerSignerRetriever::new(
221 cexplorer_pools_url,
222 Some(Duration::from_secs(30)),
223 self.root_logger(),
224 )?;
225 let persister = self.get_signer_store().await?;
226
227 Ok(SignersImporter::new(
228 Arc::new(retriever),
229 persister,
230 self.root_logger(),
231 ))
232 }
233}