1use std::fs;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{anyhow, Context};
6use slog::Logger;
7use tokio::sync::{Mutex, RwLock};
8
9use mithril_common::api_version::APIVersionProvider;
10use mithril_common::cardano_block_scanner::CardanoBlockScanner;
11use mithril_common::chain_observer::{
12 CardanoCliRunner, ChainObserver, ChainObserverBuilder, ChainObserverType,
13};
14use mithril_common::chain_reader::PallasChainReader;
15use mithril_common::crypto_helper::{OpCert, ProtocolPartyId, SerDeShelleyFileFormat};
16use mithril_common::digesters::cache::{
17 ImmutableFileDigestCacheProvider, JsonImmutableFileDigestCacheProviderBuilder,
18};
19use mithril_common::digesters::{
20 CardanoImmutableDigester, ImmutableFileObserver, ImmutableFileSystemObserver,
21};
22use mithril_common::era::{EraChecker, EraReader};
23use mithril_common::signable_builder::{
24 CardanoDatabaseSignableBuilder, CardanoImmutableFilesFullSignableBuilder,
25 CardanoStakeDistributionSignableBuilder, CardanoTransactionsSignableBuilder,
26 MithrilSignableBuilderService, MithrilStakeDistributionSignableBuilder,
27 SignableBuilderServiceDependencies,
28};
29use mithril_common::{MithrilTickerService, StdResult, TickerService};
30use mithril_signed_entity_lock::SignedEntityTypeLock;
31use mithril_signed_entity_preloader::CardanoTransactionsPreloader;
32
33use mithril_persistence::database::repository::CardanoTransactionRepository;
34use mithril_persistence::database::{ApplicationNodeType, SqlMigration};
35use mithril_persistence::sqlite::{ConnectionBuilder, SqliteConnection, SqliteConnectionPool};
36
37use crate::database::repository::{
38 ProtocolInitializerRepository, SignedBeaconRepository, StakePoolStore,
39};
40use crate::dependency_injection::SignerDependencyContainer;
41use crate::services::{
42 AggregatorHTTPClient, CardanoTransactionsImporter,
43 CardanoTransactionsPreloaderActivationSigner, MithrilEpochService, MithrilSingleSigner,
44 SignaturePublishRetryPolicy, SignaturePublisherDelayer, SignaturePublisherNoop,
45 SignaturePublisherRetrier, SignerCertifierService, SignerSignableSeedBuilder,
46 SignerSignedEntityConfigProvider, SignerUpkeepService, TransactionsImporterByChunk,
47 TransactionsImporterWithPruner, TransactionsImporterWithVacuum,
48};
49use crate::store::MKTreeStoreSqlite;
50use crate::{
51 Configuration, MetricsService, HTTP_REQUEST_TIMEOUT_DURATION, SQLITE_FILE,
52 SQLITE_FILE_CARDANO_TRANSACTION,
53};
54
55pub struct DependenciesBuilder<'a> {
59 config: &'a Configuration,
60 chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
61 immutable_file_observer_builder:
62 fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
63 root_logger: Logger,
64}
65
66impl<'a> DependenciesBuilder<'a> {
67 pub fn new(config: &'a Configuration, root_logger: Logger) -> Self {
69 let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
70 |config: &Configuration| {
71 let chain_observer_type = ChainObserverType::Pallas;
72 let cardano_cli_path = &config.cardano_cli_path;
73 let cardano_node_socket_path = &config.cardano_node_socket_path;
74 let cardano_network = &config.get_network().with_context(|| {
75 "Dependencies Builder can not get Cardano network while building the chain observer"
76 })?;
77 let cardano_cli_runner = &CardanoCliRunner::new(
78 cardano_cli_path.to_owned(),
79 cardano_node_socket_path.to_owned(),
80 cardano_network.to_owned(),
81 );
82
83 let chain_observer_builder = ChainObserverBuilder::new(
84 &chain_observer_type,
85 cardano_node_socket_path,
86 cardano_network,
87 Some(cardano_cli_runner),
88 );
89
90 chain_observer_builder
91 .build()
92 .with_context(|| "Dependencies Builder can not build chain observer")
93 };
94
95 let immutable_file_observer_builder: fn(
96 &Configuration,
97 )
98 -> StdResult<Arc<dyn ImmutableFileObserver>> = |config: &Configuration| {
99 Ok(Arc::new(ImmutableFileSystemObserver::new(
100 &config.db_directory,
101 )))
102 };
103
104 Self {
105 config,
106 chain_observer_builder,
107 immutable_file_observer_builder,
108 root_logger,
109 }
110 }
111
112 pub fn override_immutable_file_observer_builder(
114 &mut self,
115 builder: fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
116 ) -> &mut Self {
117 self.immutable_file_observer_builder = builder;
118
119 self
120 }
121
122 pub fn root_logger(&self) -> Logger {
124 self.root_logger.clone()
125 }
126
127 pub fn override_chain_observer_builder(
129 &mut self,
130 builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
131 ) -> &mut Self {
132 self.chain_observer_builder = builder;
133
134 self
135 }
136
137 fn compute_protocol_party_id(&self) -> StdResult<ProtocolPartyId> {
139 match &self.config.operational_certificate_path {
140 Some(operational_certificate_path) => {
141 let opcert: OpCert = OpCert::from_file(operational_certificate_path)
142 .with_context(|| "Could not decode operational certificate")?;
143 Ok(opcert
144 .compute_protocol_party_id()
145 .with_context(|| "Could not compute party_id from operational certificate")?)
146 }
147 _ => Ok(self
148 .config
149 .party_id
150 .to_owned()
151 .ok_or(anyhow!("A party_id should at least be provided"))?),
152 }
153 }
154
155 async fn build_digester_cache_provider(
156 &self,
157 ) -> StdResult<Option<Arc<dyn ImmutableFileDigestCacheProvider>>> {
158 if self.config.disable_digests_cache {
159 return Ok(None);
160 }
161
162 let cache_provider = JsonImmutableFileDigestCacheProviderBuilder::new(
163 &self.config.data_stores_directory,
164 &format!("immutables_digests_{}.json", self.config.network),
165 )
166 .should_reset_digests_cache(self.config.reset_digests_cache)
167 .with_logger(self.root_logger())
168 .build()
169 .await?;
170
171 Ok(Some(Arc::new(cache_provider)))
172 }
173
174 pub async fn build_sqlite_connection(
176 &self,
177 sqlite_file_name: &str,
178 migrations: Vec<SqlMigration>,
179 ) -> StdResult<SqliteConnection> {
180 let sqlite_db_path = self.config.get_sqlite_file(sqlite_file_name)?;
181 let connection = ConnectionBuilder::open_file(&sqlite_db_path)
182 .with_node_type(ApplicationNodeType::Signer)
183 .with_migrations(migrations)
184 .with_logger(self.root_logger())
185 .build()
186 .with_context(|| "Database connection initialisation error")?;
187
188 Ok(connection)
189 }
190
191 pub async fn build(&self) -> StdResult<SignerDependencyContainer> {
193 if !self.config.data_stores_directory.exists() {
194 fs::create_dir_all(self.config.data_stores_directory.clone()).with_context(|| {
195 format!(
196 "Could not create data stores directory: `{}`",
197 self.config.data_stores_directory.display()
198 )
199 })?;
200 }
201
202 let network = self.config.get_network()?;
203 let sqlite_connection = Arc::new(
204 self.build_sqlite_connection(SQLITE_FILE, crate::database::migration::get_migrations())
205 .await?,
206 );
207 let transaction_sqlite_connection = self
208 .build_sqlite_connection(
209 SQLITE_FILE_CARDANO_TRANSACTION,
210 mithril_persistence::database::cardano_transaction_migration::get_migrations(),
211 )
212 .await?;
213 let sqlite_connection_cardano_transaction_pool = Arc::new(
214 SqliteConnectionPool::build_from_connection(transaction_sqlite_connection),
215 );
216
217 let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
218
219 let protocol_initializer_store = Arc::new(ProtocolInitializerRepository::new(
220 sqlite_connection.clone(),
221 self.config.store_retention_limit.map(|limit| limit as u64),
222 ));
223
224 let digester = Arc::new(CardanoImmutableDigester::new(
225 network.to_string(),
226 self.build_digester_cache_provider().await?,
227 self.root_logger(),
228 ));
229 let stake_store = Arc::new(StakePoolStore::new(
230 sqlite_connection.clone(),
231 self.config.store_retention_limit.map(|limit| limit as u64),
232 ));
233 let chain_observer = {
234 let builder = self.chain_observer_builder;
235 builder(self.config)?
236 };
237 let ticker_service = {
238 let builder = self.immutable_file_observer_builder;
239 Arc::new(MithrilTickerService::new(
240 chain_observer.clone(),
241 builder(self.config)?,
242 ))
243 };
244
245 let era_reader = Arc::new(EraReader::new(
246 self.config
247 .build_era_reader_adapter(chain_observer.clone())?,
248 ));
249 let era_epoch_token = era_reader
250 .read_era_epoch_token(ticker_service.get_current_epoch().await?)
251 .await?;
252 let era_checker = Arc::new(EraChecker::new(
253 era_epoch_token.get_current_supported_era()?,
254 era_epoch_token.get_current_epoch(),
255 ));
256
257 let api_version_provider = Arc::new(APIVersionProvider::new(era_checker.clone()));
258 let aggregator_client = Arc::new(AggregatorHTTPClient::new(
259 self.config.aggregator_endpoint.clone(),
260 self.config.relay_endpoint.clone(),
261 api_version_provider.clone(),
262 Some(Duration::from_millis(HTTP_REQUEST_TIMEOUT_DURATION)),
263 self.root_logger(),
264 ));
265
266 let cardano_immutable_snapshot_builder =
267 Arc::new(CardanoImmutableFilesFullSignableBuilder::new(
268 digester.clone(),
269 &self.config.db_directory,
270 self.root_logger(),
271 ));
272 let mithril_stake_distribution_signable_builder =
273 Arc::new(MithrilStakeDistributionSignableBuilder::default());
274 let transaction_store = Arc::new(CardanoTransactionRepository::new(
275 sqlite_connection_cardano_transaction_pool.clone(),
276 ));
277 let chain_block_reader = PallasChainReader::new(
278 &self.config.cardano_node_socket_path,
279 network,
280 self.root_logger(),
281 );
282 let block_scanner = Arc::new(CardanoBlockScanner::new(
283 Arc::new(Mutex::new(chain_block_reader)),
284 self.config
285 .cardano_transactions_block_streamer_max_roll_forwards_per_poll,
286 self.root_logger(),
287 ));
288 let transactions_importer = Arc::new(CardanoTransactionsImporter::new(
289 block_scanner,
290 transaction_store.clone(),
291 self.root_logger(),
292 ));
293 let transactions_importer = Arc::new(TransactionsImporterWithPruner::new(
295 self.config
296 .enable_transaction_pruning
297 .then_some(self.config.network_security_parameter),
298 transaction_store.clone(),
299 transactions_importer,
300 self.root_logger(),
301 ));
302 let state_machine_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
305 transaction_store.clone(),
306 transactions_importer.clone(),
307 self.config.transactions_import_block_chunk_size,
308 self.root_logger(),
309 ));
310 let preloader_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
313 transaction_store.clone(),
314 Arc::new(TransactionsImporterWithVacuum::new(
315 sqlite_connection_cardano_transaction_pool.clone(),
316 transactions_importer.clone(),
317 self.root_logger(),
318 )),
319 self.config.transactions_import_block_chunk_size,
320 self.root_logger(),
321 ));
322 let block_range_root_retriever = transaction_store.clone();
323 let cardano_transactions_builder = Arc::new(CardanoTransactionsSignableBuilder::<
324 MKTreeStoreSqlite,
325 >::new(
326 state_machine_transactions_importer,
327 block_range_root_retriever,
328 ));
329 let cardano_stake_distribution_signable_builder = Arc::new(
330 CardanoStakeDistributionSignableBuilder::new(stake_store.clone()),
331 );
332 let cardano_database_signable_builder = Arc::new(CardanoDatabaseSignableBuilder::new(
333 digester.clone(),
334 &self.config.db_directory,
335 self.root_logger(),
336 ));
337 let epoch_service = Arc::new(RwLock::new(MithrilEpochService::new(
338 stake_store.clone(),
339 protocol_initializer_store.clone(),
340 self.root_logger(),
341 )));
342 let single_signer = Arc::new(MithrilSingleSigner::new(
343 self.compute_protocol_party_id()?,
344 epoch_service.clone(),
345 self.root_logger(),
346 ));
347 let signable_seed_builder_service = Arc::new(SignerSignableSeedBuilder::new(
348 epoch_service.clone(),
349 protocol_initializer_store.clone(),
350 ));
351 let signable_builders_dependencies = SignableBuilderServiceDependencies::new(
352 mithril_stake_distribution_signable_builder,
353 cardano_immutable_snapshot_builder,
354 cardano_transactions_builder,
355 cardano_stake_distribution_signable_builder,
356 cardano_database_signable_builder,
357 );
358 let signable_builder_service = Arc::new(MithrilSignableBuilderService::new(
359 signable_seed_builder_service,
360 signable_builders_dependencies,
361 self.root_logger(),
362 ));
363 let metrics_service = Arc::new(MetricsService::new(self.root_logger())?);
364 let preloader_activation =
365 CardanoTransactionsPreloaderActivationSigner::new(aggregator_client.clone());
366 let cardano_transactions_preloader = Arc::new(CardanoTransactionsPreloader::new(
367 signed_entity_type_lock.clone(),
368 preloader_transactions_importer,
369 self.config.preload_security_parameter,
370 chain_observer.clone(),
371 self.root_logger(),
372 Arc::new(preloader_activation),
373 ));
374 let signed_beacon_repository = Arc::new(SignedBeaconRepository::new(
375 sqlite_connection.clone(),
376 self.config.store_retention_limit.map(|limit| limit as u64),
377 ));
378 let upkeep_service = Arc::new(SignerUpkeepService::new(
379 sqlite_connection.clone(),
380 sqlite_connection_cardano_transaction_pool,
381 signed_entity_type_lock.clone(),
382 vec![
383 signed_beacon_repository.clone(),
384 stake_store.clone(),
385 protocol_initializer_store.clone(),
386 ],
387 self.root_logger(),
388 ));
389
390 let signature_publisher = {
391 let first_publisher = SignaturePublisherRetrier::new(
393 Arc::new(SignaturePublisherNoop {}),
394 SignaturePublishRetryPolicy::never(),
395 );
396
397 let second_publisher = SignaturePublisherRetrier::new(
398 aggregator_client.clone(),
399 SignaturePublishRetryPolicy {
400 attempts: self.config.signature_publisher_config.retry_attempts,
401 delay_between_attempts: Duration::from_millis(
402 self.config.signature_publisher_config.retry_delay_ms,
403 ),
404 },
405 );
406
407 Arc::new(SignaturePublisherDelayer::new(
408 Arc::new(first_publisher),
409 Arc::new(second_publisher),
410 Duration::from_millis(self.config.signature_publisher_config.delayer_delay_ms),
411 self.root_logger(),
412 ))
413 };
414
415 let certifier = Arc::new(SignerCertifierService::new(
416 signed_beacon_repository,
417 Arc::new(SignerSignedEntityConfigProvider::new(epoch_service.clone())),
418 signed_entity_type_lock.clone(),
419 single_signer.clone(),
420 signature_publisher,
421 self.root_logger(),
422 ));
423
424 let services = SignerDependencyContainer {
425 ticker_service,
426 certificate_handler: aggregator_client,
427 chain_observer,
428 digester,
429 single_signer,
430 stake_store,
431 protocol_initializer_store,
432 era_checker,
433 era_reader,
434 api_version_provider,
435 signable_builder_service,
436 metrics_service,
437 signed_entity_type_lock,
438 cardano_transactions_preloader,
439 upkeep_service,
440 epoch_service,
441 certifier,
442 };
443
444 Ok(services)
445 }
446}
447
448#[cfg(test)]
449mod tests {
450 use std::path::PathBuf;
451
452 use mithril_common::{
453 chain_observer::FakeObserver, digesters::DumbImmutableFileObserver, entities::TimePoint,
454 test_utils::TempDir,
455 };
456
457 use crate::test_tools::TestLogger;
458
459 use super::*;
460
461 fn get_test_dir(test_name: &str) -> PathBuf {
462 TempDir::create("signer_service", test_name)
463 }
464
465 #[tokio::test]
466 async fn test_auto_create_stores_directory() {
467 let stores_dir = get_test_dir("test_auto_create_stores_directory").join("stores");
468 let config = Configuration {
469 data_stores_directory: stores_dir.clone(),
470 ..Configuration::new_sample("party-123456")
471 };
472
473 assert!(!stores_dir.exists());
474 let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
475 |_config| Ok(Arc::new(FakeObserver::new(Some(TimePoint::dummy()))));
476 let immutable_file_observer_builder: fn(
477 &Configuration,
478 )
479 -> StdResult<Arc<dyn ImmutableFileObserver>> =
480 |_config: &Configuration| Ok(Arc::new(DumbImmutableFileObserver::default()));
481
482 let mut dependencies_builder = DependenciesBuilder::new(&config, TestLogger::stdout());
483 dependencies_builder
484 .override_chain_observer_builder(chain_observer_builder)
485 .override_immutable_file_observer_builder(immutable_file_observer_builder)
486 .build()
487 .await
488 .expect("service builder build should not fail");
489 assert!(stores_dir.exists());
490 }
491}