1use std::fs;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{anyhow, Context};
6use slog::Logger;
7use tokio::sync::{Mutex, RwLock};
8
9use mithril_common::api_version::APIVersionProvider;
10use mithril_common::cardano_block_scanner::CardanoBlockScanner;
11use mithril_common::chain_observer::{
12 CardanoCliRunner, ChainObserver, ChainObserverBuilder, ChainObserverType,
13};
14use mithril_common::chain_reader::PallasChainReader;
15use mithril_common::crypto_helper::{OpCert, ProtocolPartyId, SerDeShelleyFileFormat};
16use mithril_common::digesters::cache::{
17 ImmutableFileDigestCacheProvider, JsonImmutableFileDigestCacheProviderBuilder,
18};
19use mithril_common::digesters::{
20 CardanoImmutableDigester, ImmutableFileObserver, ImmutableFileSystemObserver,
21};
22use mithril_common::era::{EraChecker, EraReader};
23use mithril_common::signable_builder::{
24 CardanoDatabaseSignableBuilder, CardanoImmutableFilesFullSignableBuilder,
25 CardanoStakeDistributionSignableBuilder, CardanoTransactionsSignableBuilder,
26 MithrilSignableBuilderService, MithrilStakeDistributionSignableBuilder,
27 SignableBuilderServiceDependencies,
28};
29use mithril_common::{MithrilTickerService, StdResult, TickerService};
30use mithril_signed_entity_lock::SignedEntityTypeLock;
31use mithril_signed_entity_preloader::CardanoTransactionsPreloader;
32
33use mithril_persistence::database::repository::CardanoTransactionRepository;
34use mithril_persistence::database::{ApplicationNodeType, SqlMigration};
35use mithril_persistence::sqlite::{ConnectionBuilder, SqliteConnection, SqliteConnectionPool};
36
37use crate::database::repository::{
38 ProtocolInitializerRepository, SignedBeaconRepository, StakePoolStore,
39};
40use crate::dependency_injection::SignerDependencyContainer;
41use crate::services::{
42 AggregatorHTTPClient, CardanoTransactionsImporter,
43 CardanoTransactionsPreloaderActivationSigner, MithrilEpochService, MithrilSingleSigner,
44 SignerCertifierService, SignerSignableSeedBuilder, SignerSignedEntityConfigProvider,
45 SignerUpkeepService, TransactionsImporterByChunk, TransactionsImporterWithPruner,
46 TransactionsImporterWithVacuum,
47};
48use crate::store::MKTreeStoreSqlite;
49use crate::{
50 Configuration, MetricsService, HTTP_REQUEST_TIMEOUT_DURATION, SQLITE_FILE,
51 SQLITE_FILE_CARDANO_TRANSACTION,
52};
53
54pub struct DependenciesBuilder<'a> {
58 config: &'a Configuration,
59 chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
60 immutable_file_observer_builder:
61 fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
62 root_logger: Logger,
63}
64
65impl<'a> DependenciesBuilder<'a> {
66 pub fn new(config: &'a Configuration, root_logger: Logger) -> Self {
68 let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
69 |config: &Configuration| {
70 let chain_observer_type = ChainObserverType::Pallas;
71 let cardano_cli_path = &config.cardano_cli_path;
72 let cardano_node_socket_path = &config.cardano_node_socket_path;
73 let cardano_network = &config.get_network().with_context(|| {
74 "Dependencies Builder can not get Cardano network while building the chain observer"
75 })?;
76 let cardano_cli_runner = &CardanoCliRunner::new(
77 cardano_cli_path.to_owned(),
78 cardano_node_socket_path.to_owned(),
79 cardano_network.to_owned(),
80 );
81
82 let chain_observer_builder = ChainObserverBuilder::new(
83 &chain_observer_type,
84 cardano_node_socket_path,
85 cardano_network,
86 Some(cardano_cli_runner),
87 );
88
89 chain_observer_builder
90 .build()
91 .with_context(|| "Dependencies Builder can not build chain observer")
92 };
93
94 let immutable_file_observer_builder: fn(
95 &Configuration,
96 )
97 -> StdResult<Arc<dyn ImmutableFileObserver>> = |config: &Configuration| {
98 Ok(Arc::new(ImmutableFileSystemObserver::new(
99 &config.db_directory,
100 )))
101 };
102
103 Self {
104 config,
105 chain_observer_builder,
106 immutable_file_observer_builder,
107 root_logger,
108 }
109 }
110
111 pub fn override_immutable_file_observer_builder(
113 &mut self,
114 builder: fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
115 ) -> &mut Self {
116 self.immutable_file_observer_builder = builder;
117
118 self
119 }
120
121 pub fn root_logger(&self) -> Logger {
123 self.root_logger.clone()
124 }
125
126 pub fn override_chain_observer_builder(
128 &mut self,
129 builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
130 ) -> &mut Self {
131 self.chain_observer_builder = builder;
132
133 self
134 }
135
136 fn compute_protocol_party_id(&self) -> StdResult<ProtocolPartyId> {
138 match &self.config.operational_certificate_path {
139 Some(operational_certificate_path) => {
140 let opcert: OpCert = OpCert::from_file(operational_certificate_path)
141 .with_context(|| "Could not decode operational certificate")?;
142 Ok(opcert
143 .compute_protocol_party_id()
144 .with_context(|| "Could not compute party_id from operational certificate")?)
145 }
146 _ => Ok(self
147 .config
148 .party_id
149 .to_owned()
150 .ok_or(anyhow!("A party_id should at least be provided"))?),
151 }
152 }
153
154 async fn build_digester_cache_provider(
155 &self,
156 ) -> StdResult<Option<Arc<dyn ImmutableFileDigestCacheProvider>>> {
157 if self.config.disable_digests_cache {
158 return Ok(None);
159 }
160
161 let cache_provider = JsonImmutableFileDigestCacheProviderBuilder::new(
162 &self.config.data_stores_directory,
163 &format!("immutables_digests_{}.json", self.config.network),
164 )
165 .should_reset_digests_cache(self.config.reset_digests_cache)
166 .with_logger(self.root_logger())
167 .build()
168 .await?;
169
170 Ok(Some(Arc::new(cache_provider)))
171 }
172
173 pub async fn build_sqlite_connection(
175 &self,
176 sqlite_file_name: &str,
177 migrations: Vec<SqlMigration>,
178 ) -> StdResult<SqliteConnection> {
179 let sqlite_db_path = self.config.get_sqlite_file(sqlite_file_name)?;
180 let connection = ConnectionBuilder::open_file(&sqlite_db_path)
181 .with_node_type(ApplicationNodeType::Signer)
182 .with_migrations(migrations)
183 .with_logger(self.root_logger())
184 .build()
185 .with_context(|| "Database connection initialisation error")?;
186
187 Ok(connection)
188 }
189
190 pub async fn build(&self) -> StdResult<SignerDependencyContainer> {
192 if !self.config.data_stores_directory.exists() {
193 fs::create_dir_all(self.config.data_stores_directory.clone()).with_context(|| {
194 format!(
195 "Could not create data stores directory: `{}`",
196 self.config.data_stores_directory.display()
197 )
198 })?;
199 }
200
201 let network = self.config.get_network()?;
202 let sqlite_connection = Arc::new(
203 self.build_sqlite_connection(SQLITE_FILE, crate::database::migration::get_migrations())
204 .await?,
205 );
206 let transaction_sqlite_connection = self
207 .build_sqlite_connection(
208 SQLITE_FILE_CARDANO_TRANSACTION,
209 mithril_persistence::database::cardano_transaction_migration::get_migrations(),
210 )
211 .await?;
212 let sqlite_connection_cardano_transaction_pool = Arc::new(
213 SqliteConnectionPool::build_from_connection(transaction_sqlite_connection),
214 );
215
216 let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
217
218 let protocol_initializer_store = Arc::new(ProtocolInitializerRepository::new(
219 sqlite_connection.clone(),
220 self.config.store_retention_limit.map(|limit| limit as u64),
221 ));
222
223 let digester = Arc::new(CardanoImmutableDigester::new(
224 network.to_string(),
225 self.build_digester_cache_provider().await?,
226 self.root_logger(),
227 ));
228 let stake_store = Arc::new(StakePoolStore::new(
229 sqlite_connection.clone(),
230 self.config.store_retention_limit.map(|limit| limit as u64),
231 ));
232 let chain_observer = {
233 let builder = self.chain_observer_builder;
234 builder(self.config)?
235 };
236 let ticker_service = {
237 let builder = self.immutable_file_observer_builder;
238 Arc::new(MithrilTickerService::new(
239 chain_observer.clone(),
240 builder(self.config)?,
241 ))
242 };
243
244 let era_reader = Arc::new(EraReader::new(
245 self.config
246 .build_era_reader_adapter(chain_observer.clone())?,
247 ));
248 let era_epoch_token = era_reader
249 .read_era_epoch_token(ticker_service.get_current_epoch().await?)
250 .await?;
251 let era_checker = Arc::new(EraChecker::new(
252 era_epoch_token.get_current_supported_era()?,
253 era_epoch_token.get_current_epoch(),
254 ));
255
256 let api_version_provider = Arc::new(APIVersionProvider::new(era_checker.clone()));
257 let aggregator_client = Arc::new(AggregatorHTTPClient::new(
258 self.config.aggregator_endpoint.clone(),
259 self.config.relay_endpoint.clone(),
260 api_version_provider.clone(),
261 Some(Duration::from_millis(HTTP_REQUEST_TIMEOUT_DURATION)),
262 self.root_logger(),
263 ));
264
265 let cardano_immutable_snapshot_builder =
266 Arc::new(CardanoImmutableFilesFullSignableBuilder::new(
267 digester.clone(),
268 &self.config.db_directory,
269 self.root_logger(),
270 ));
271 let mithril_stake_distribution_signable_builder =
272 Arc::new(MithrilStakeDistributionSignableBuilder::default());
273 let transaction_store = Arc::new(CardanoTransactionRepository::new(
274 sqlite_connection_cardano_transaction_pool.clone(),
275 ));
276 let chain_block_reader = PallasChainReader::new(
277 &self.config.cardano_node_socket_path,
278 network,
279 self.root_logger(),
280 );
281 let block_scanner = Arc::new(CardanoBlockScanner::new(
282 Arc::new(Mutex::new(chain_block_reader)),
283 self.config
284 .cardano_transactions_block_streamer_max_roll_forwards_per_poll,
285 self.root_logger(),
286 ));
287 let transactions_importer = Arc::new(CardanoTransactionsImporter::new(
288 block_scanner,
289 transaction_store.clone(),
290 self.root_logger(),
291 ));
292 let transactions_importer = Arc::new(TransactionsImporterWithPruner::new(
294 self.config
295 .enable_transaction_pruning
296 .then_some(self.config.network_security_parameter),
297 transaction_store.clone(),
298 transactions_importer,
299 self.root_logger(),
300 ));
301 let state_machine_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
304 transaction_store.clone(),
305 transactions_importer.clone(),
306 self.config.transactions_import_block_chunk_size,
307 self.root_logger(),
308 ));
309 let preloader_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
312 transaction_store.clone(),
313 Arc::new(TransactionsImporterWithVacuum::new(
314 sqlite_connection_cardano_transaction_pool.clone(),
315 transactions_importer.clone(),
316 self.root_logger(),
317 )),
318 self.config.transactions_import_block_chunk_size,
319 self.root_logger(),
320 ));
321 let block_range_root_retriever = transaction_store.clone();
322 let cardano_transactions_builder = Arc::new(CardanoTransactionsSignableBuilder::<
323 MKTreeStoreSqlite,
324 >::new(
325 state_machine_transactions_importer,
326 block_range_root_retriever,
327 ));
328 let cardano_stake_distribution_signable_builder = Arc::new(
329 CardanoStakeDistributionSignableBuilder::new(stake_store.clone()),
330 );
331 let cardano_database_signable_builder = Arc::new(CardanoDatabaseSignableBuilder::new(
332 digester.clone(),
333 &self.config.db_directory,
334 self.root_logger(),
335 ));
336 let epoch_service = Arc::new(RwLock::new(MithrilEpochService::new(
337 stake_store.clone(),
338 protocol_initializer_store.clone(),
339 self.root_logger(),
340 )));
341 let single_signer = Arc::new(MithrilSingleSigner::new(
342 self.compute_protocol_party_id()?,
343 epoch_service.clone(),
344 self.root_logger(),
345 ));
346 let signable_seed_builder_service = Arc::new(SignerSignableSeedBuilder::new(
347 epoch_service.clone(),
348 protocol_initializer_store.clone(),
349 ));
350 let signable_builders_dependencies = SignableBuilderServiceDependencies::new(
351 mithril_stake_distribution_signable_builder,
352 cardano_immutable_snapshot_builder,
353 cardano_transactions_builder,
354 cardano_stake_distribution_signable_builder,
355 cardano_database_signable_builder,
356 );
357 let signable_builder_service = Arc::new(MithrilSignableBuilderService::new(
358 signable_seed_builder_service,
359 signable_builders_dependencies,
360 self.root_logger(),
361 ));
362 let metrics_service = Arc::new(MetricsService::new(self.root_logger())?);
363 let preloader_activation =
364 CardanoTransactionsPreloaderActivationSigner::new(aggregator_client.clone());
365 let cardano_transactions_preloader = Arc::new(CardanoTransactionsPreloader::new(
366 signed_entity_type_lock.clone(),
367 preloader_transactions_importer,
368 self.config.preload_security_parameter,
369 chain_observer.clone(),
370 self.root_logger(),
371 Arc::new(preloader_activation),
372 ));
373 let signed_beacon_repository = Arc::new(SignedBeaconRepository::new(
374 sqlite_connection.clone(),
375 self.config.store_retention_limit.map(|limit| limit as u64),
376 ));
377 let upkeep_service = Arc::new(SignerUpkeepService::new(
378 sqlite_connection.clone(),
379 sqlite_connection_cardano_transaction_pool,
380 signed_entity_type_lock.clone(),
381 vec![
382 signed_beacon_repository.clone(),
383 stake_store.clone(),
384 protocol_initializer_store.clone(),
385 ],
386 self.root_logger(),
387 ));
388 let certifier = Arc::new(SignerCertifierService::new(
389 signed_beacon_repository,
390 Arc::new(SignerSignedEntityConfigProvider::new(epoch_service.clone())),
391 signed_entity_type_lock.clone(),
392 single_signer.clone(),
393 aggregator_client.clone(),
394 self.root_logger(),
395 ));
396
397 let services = SignerDependencyContainer {
398 ticker_service,
399 certificate_handler: aggregator_client,
400 chain_observer,
401 digester,
402 single_signer,
403 stake_store,
404 protocol_initializer_store,
405 era_checker,
406 era_reader,
407 api_version_provider,
408 signable_builder_service,
409 metrics_service,
410 signed_entity_type_lock,
411 cardano_transactions_preloader,
412 upkeep_service,
413 epoch_service,
414 certifier,
415 };
416
417 Ok(services)
418 }
419}
420
421#[cfg(test)]
422mod tests {
423 use std::path::PathBuf;
424
425 use mithril_common::{
426 chain_observer::FakeObserver, digesters::DumbImmutableFileObserver, entities::TimePoint,
427 test_utils::TempDir,
428 };
429
430 use crate::test_tools::TestLogger;
431
432 use super::*;
433
434 fn get_test_dir(test_name: &str) -> PathBuf {
435 TempDir::create("signer_service", test_name)
436 }
437
438 #[tokio::test]
439 async fn test_auto_create_stores_directory() {
440 let stores_dir = get_test_dir("test_auto_create_stores_directory").join("stores");
441 let config = Configuration {
442 data_stores_directory: stores_dir.clone(),
443 ..Configuration::new_sample("party-123456")
444 };
445
446 assert!(!stores_dir.exists());
447 let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
448 |_config| Ok(Arc::new(FakeObserver::new(Some(TimePoint::dummy()))));
449 let immutable_file_observer_builder: fn(
450 &Configuration,
451 )
452 -> StdResult<Arc<dyn ImmutableFileObserver>> =
453 |_config: &Configuration| Ok(Arc::new(DumbImmutableFileObserver::default()));
454
455 let mut dependencies_builder = DependenciesBuilder::new(&config, TestLogger::stdout());
456 dependencies_builder
457 .override_chain_observer_builder(chain_observer_builder)
458 .override_immutable_file_observer_builder(immutable_file_observer_builder)
459 .build()
460 .await
461 .expect("service builder build should not fail");
462 assert!(stores_dir.exists());
463 }
464}