1use std::fs;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, anyhow};
6use slog::Logger;
7use tokio::sync::{Mutex, RwLock};
8
9use mithril_aggregator_client::AggregatorHttpClient;
10use mithril_cardano_node_chain::{
11 chain_observer::{CardanoCliRunner, ChainObserver, ChainObserverBuilder, ChainObserverType},
12 chain_reader::PallasChainReader,
13 chain_scanner::CardanoBlockScanner,
14};
15use mithril_cardano_node_internal_database::{
16 ImmutableFileObserver, ImmutableFileSystemObserver,
17 digesters::CardanoImmutableDigester,
18 digesters::cache::{
19 ImmutableFileDigestCacheProvider, JsonImmutableFileDigestCacheProviderBuilder,
20 },
21 signable_builder::{CardanoDatabaseSignableBuilder, CardanoImmutableFilesFullSignableBuilder},
22};
23use mithril_common::StdResult;
24use mithril_common::api_version::APIVersionProvider;
25use mithril_common::crypto_helper::{
26 KesSigner, KesSignerStandard, OpCert, ProtocolPartyId, SerDeShelleyFileFormat,
27};
28#[cfg(feature = "future_dmq")]
29use mithril_common::messages::RegisterSignatureMessageDmq;
30use mithril_common::signable_builder::{
31 CardanoStakeDistributionSignableBuilder, CardanoTransactionsSignableBuilder,
32 MithrilSignableBuilderService, MithrilStakeDistributionSignableBuilder,
33 SignableBuilderServiceDependencies,
34};
35
36use mithril_era::{EraChecker, EraReader};
37use mithril_signed_entity_lock::SignedEntityTypeLock;
38use mithril_signed_entity_preloader::CardanoTransactionsPreloader;
39use mithril_ticker::{MithrilTickerService, TickerService};
40
41use mithril_persistence::database::repository::CardanoTransactionRepository;
42use mithril_persistence::database::{ApplicationNodeType, SqlMigration};
43use mithril_persistence::sqlite::{ConnectionBuilder, SqliteConnection, SqliteConnectionPool};
44
45use mithril_protocol_config::http::HttpMithrilNetworkConfigurationProvider;
46
47#[cfg(feature = "future_dmq")]
48use mithril_dmq::{DmqMessageBuilder, DmqPublisherClientPallas};
49
50use crate::dependency_injection::SignerDependencyContainer;
51#[cfg(feature = "future_dmq")]
52use crate::services::SignaturePublisherDmq;
53use crate::services::{
54 CardanoTransactionsImporter, CardanoTransactionsPreloaderActivationSigner, MithrilEpochService,
55 MithrilSingleSigner, SignaturePublishRetryPolicy, SignaturePublisherDelayer,
56 SignaturePublisherNoop, SignaturePublisherRetrier, SignerCertifierService,
57 SignerSignableSeedBuilder, SignerSignedEntityConfigProvider, SignerUpkeepService,
58 TransactionsImporterByChunk, TransactionsImporterWithPruner, TransactionsImporterWithVacuum,
59};
60use crate::store::MKTreeStoreSqlite;
61use crate::{
62 Configuration, HTTP_REQUEST_TIMEOUT_DURATION, MetricsService, SQLITE_FILE,
63 SQLITE_FILE_CARDANO_TRANSACTION,
64};
65use crate::{
66 database::repository::{ProtocolInitializerRepository, SignedBeaconRepository, StakePoolStore},
67 services::SignaturePublisher,
68};
69
70pub struct DependenciesBuilder<'a> {
74 config: &'a Configuration,
75 chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
76 immutable_file_observer_builder:
77 fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
78 root_logger: Logger,
79}
80
81impl<'a> DependenciesBuilder<'a> {
82 pub fn new(config: &'a Configuration, root_logger: Logger) -> Self {
84 let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
85 |config: &Configuration| {
86 let chain_observer_type = ChainObserverType::Pallas;
87 let cardano_cli_path = &config.cardano_cli_path;
88 let cardano_node_socket_path = &config.cardano_node_socket_path;
89 let cardano_network = &config.get_network().with_context(|| {
90 "Dependencies Builder can not get Cardano network while building the chain observer"
91 })?;
92 let cardano_cli_runner = &CardanoCliRunner::new(
93 cardano_cli_path.to_owned(),
94 cardano_node_socket_path.to_owned(),
95 cardano_network.to_owned(),
96 );
97
98 let chain_observer_builder = ChainObserverBuilder::new(
99 &chain_observer_type,
100 cardano_node_socket_path,
101 cardano_network,
102 Some(cardano_cli_runner),
103 );
104
105 chain_observer_builder
106 .build()
107 .with_context(|| "Dependencies Builder can not build chain observer")
108 };
109
110 let immutable_file_observer_builder: fn(
111 &Configuration,
112 )
113 -> StdResult<Arc<dyn ImmutableFileObserver>> = |config: &Configuration| {
114 Ok(Arc::new(ImmutableFileSystemObserver::new(
115 &config.db_directory,
116 )))
117 };
118
119 Self {
120 config,
121 chain_observer_builder,
122 immutable_file_observer_builder,
123 root_logger,
124 }
125 }
126
127 pub fn override_immutable_file_observer_builder(
129 &mut self,
130 builder: fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
131 ) -> &mut Self {
132 self.immutable_file_observer_builder = builder;
133
134 self
135 }
136
137 pub fn root_logger(&self) -> Logger {
139 self.root_logger.clone()
140 }
141
142 pub fn override_chain_observer_builder(
144 &mut self,
145 builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
146 ) -> &mut Self {
147 self.chain_observer_builder = builder;
148
149 self
150 }
151
152 fn compute_protocol_party_id(&self) -> StdResult<ProtocolPartyId> {
154 match &self.config.operational_certificate_path {
155 Some(operational_certificate_path) => {
156 let opcert: OpCert = OpCert::from_file(operational_certificate_path)
157 .with_context(|| "Could not decode operational certificate")?;
158 Ok(opcert
159 .compute_protocol_party_id()
160 .with_context(|| "Could not compute party_id from operational certificate")?)
161 }
162 _ => Ok(self
163 .config
164 .party_id
165 .to_owned()
166 .ok_or(anyhow!("A party_id should at least be provided"))?),
167 }
168 }
169
170 async fn build_digester_cache_provider(
171 &self,
172 ) -> StdResult<Option<Arc<dyn ImmutableFileDigestCacheProvider>>> {
173 if self.config.disable_digests_cache {
174 return Ok(None);
175 }
176
177 let cache_provider = JsonImmutableFileDigestCacheProviderBuilder::new(
178 &self.config.data_stores_directory,
179 &format!("immutables_digests_{}.json", self.config.network),
180 )
181 .should_reset_digests_cache(self.config.reset_digests_cache)
182 .with_logger(self.root_logger())
183 .build()
184 .await?;
185
186 Ok(Some(Arc::new(cache_provider)))
187 }
188
189 pub async fn build_sqlite_connection(
191 &self,
192 sqlite_file_name: &str,
193 migrations: Vec<SqlMigration>,
194 ) -> StdResult<SqliteConnection> {
195 let sqlite_db_path = self.config.get_sqlite_file(sqlite_file_name)?;
196 let connection = ConnectionBuilder::open_file(&sqlite_db_path)
197 .with_node_type(ApplicationNodeType::Signer)
198 .with_migrations(migrations)
199 .with_logger(self.root_logger())
200 .build()
201 .with_context(|| "Database connection initialisation error")?;
202
203 Ok(connection)
204 }
205
206 pub async fn build(&self) -> StdResult<SignerDependencyContainer> {
208 if !self.config.data_stores_directory.exists() {
209 fs::create_dir_all(self.config.data_stores_directory.clone()).with_context(|| {
210 format!(
211 "Could not create data stores directory: `{}`",
212 self.config.data_stores_directory.display()
213 )
214 })?;
215 }
216
217 let network = self.config.get_network()?;
218 let sqlite_connection = Arc::new(
219 self.build_sqlite_connection(SQLITE_FILE, crate::database::migration::get_migrations())
220 .await?,
221 );
222 let transaction_sqlite_connection = self
223 .build_sqlite_connection(
224 SQLITE_FILE_CARDANO_TRANSACTION,
225 mithril_persistence::database::cardano_transaction_migration::get_migrations(),
226 )
227 .await?;
228 let sqlite_connection_cardano_transaction_pool = Arc::new(
229 SqliteConnectionPool::build_from_connection(transaction_sqlite_connection),
230 );
231
232 let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
233
234 let protocol_initializer_store = Arc::new(ProtocolInitializerRepository::new(
235 sqlite_connection.clone(),
236 self.config.store_retention_limit.map(|limit| limit as u64),
237 ));
238
239 let digester = Arc::new(CardanoImmutableDigester::new(
240 network.to_string(),
241 self.build_digester_cache_provider().await?,
242 self.root_logger(),
243 ));
244 let stake_store = Arc::new(StakePoolStore::new(
245 sqlite_connection.clone(),
246 self.config.store_retention_limit.map(|limit| limit as u64),
247 ));
248 let chain_observer = {
249 let builder = self.chain_observer_builder;
250 builder(self.config)?
251 };
252 let ticker_service = {
253 let builder = self.immutable_file_observer_builder;
254 Arc::new(MithrilTickerService::new(
255 chain_observer.clone(),
256 builder(self.config)?,
257 ))
258 };
259
260 let era_reader = Arc::new(EraReader::new(
261 self.config.build_era_reader_adapter(chain_observer.clone())?,
262 ));
263 let era_epoch_token = era_reader
264 .read_era_epoch_token(ticker_service.get_current_epoch().await?)
265 .await?;
266 let era_checker = Arc::new(EraChecker::new(
267 era_epoch_token.get_current_supported_era()?,
268 era_epoch_token.get_current_epoch(),
269 ));
270
271 let api_version_provider = Arc::new(APIVersionProvider::new(era_checker.clone()));
272 let aggregator_client = Arc::new(
273 AggregatorHttpClient::builder(self.config.aggregator_endpoint.clone())
274 .with_relay_endpoint(self.config.relay_endpoint.clone())
275 .with_api_version_provider(api_version_provider.clone())
276 .with_timeout(Duration::from_millis(HTTP_REQUEST_TIMEOUT_DURATION))
277 .with_logger(self.root_logger())
278 .build()?,
279 );
280
281 let cardano_immutable_snapshot_builder =
282 Arc::new(CardanoImmutableFilesFullSignableBuilder::new(
283 digester.clone(),
284 &self.config.db_directory,
285 self.root_logger(),
286 ));
287 let mithril_stake_distribution_signable_builder =
288 Arc::new(MithrilStakeDistributionSignableBuilder::default());
289 let transaction_store = Arc::new(CardanoTransactionRepository::new(
290 sqlite_connection_cardano_transaction_pool.clone(),
291 ));
292 let chain_block_reader = PallasChainReader::new(
293 &self.config.cardano_node_socket_path,
294 network,
295 self.root_logger(),
296 );
297 let block_scanner = Arc::new(CardanoBlockScanner::new(
298 Arc::new(Mutex::new(chain_block_reader)),
299 self.config
300 .cardano_transactions_block_streamer_max_roll_forwards_per_poll,
301 self.root_logger(),
302 ));
303 let transactions_importer = Arc::new(CardanoTransactionsImporter::new(
304 block_scanner,
305 transaction_store.clone(),
306 self.root_logger(),
307 ));
308 let transactions_importer = Arc::new(TransactionsImporterWithPruner::new(
310 self.config
311 .enable_transaction_pruning
312 .then_some(self.config.network_security_parameter),
313 transaction_store.clone(),
314 transactions_importer,
315 self.root_logger(),
316 ));
317 let state_machine_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
320 transaction_store.clone(),
321 transactions_importer.clone(),
322 self.config.transactions_import_block_chunk_size,
323 self.root_logger(),
324 ));
325 let preloader_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
328 transaction_store.clone(),
329 Arc::new(TransactionsImporterWithVacuum::new(
330 sqlite_connection_cardano_transaction_pool.clone(),
331 transactions_importer.clone(),
332 self.root_logger(),
333 )),
334 self.config.transactions_import_block_chunk_size,
335 self.root_logger(),
336 ));
337 let block_range_root_retriever = transaction_store.clone();
338 let cardano_transactions_builder = Arc::new(CardanoTransactionsSignableBuilder::<
339 MKTreeStoreSqlite,
340 >::new(
341 state_machine_transactions_importer,
342 block_range_root_retriever,
343 ));
344 let cardano_stake_distribution_signable_builder = Arc::new(
345 CardanoStakeDistributionSignableBuilder::new(stake_store.clone()),
346 );
347 let cardano_database_signable_builder = Arc::new(CardanoDatabaseSignableBuilder::new(
348 digester.clone(),
349 &self.config.db_directory,
350 self.root_logger(),
351 ));
352 let epoch_service = Arc::new(RwLock::new(MithrilEpochService::new(
353 stake_store.clone(),
354 protocol_initializer_store.clone(),
355 self.root_logger(),
356 )));
357 let single_signer = Arc::new(MithrilSingleSigner::new(
358 self.compute_protocol_party_id()?,
359 epoch_service.clone(),
360 self.root_logger(),
361 ));
362 let signable_seed_builder_service = Arc::new(SignerSignableSeedBuilder::new(
363 epoch_service.clone(),
364 protocol_initializer_store.clone(),
365 ));
366 let signable_builders_dependencies = SignableBuilderServiceDependencies::new(
367 mithril_stake_distribution_signable_builder,
368 cardano_immutable_snapshot_builder,
369 cardano_transactions_builder,
370 cardano_stake_distribution_signable_builder,
371 cardano_database_signable_builder,
372 );
373 let signable_builder_service = Arc::new(MithrilSignableBuilderService::new(
374 signable_seed_builder_service,
375 signable_builders_dependencies,
376 self.root_logger(),
377 ));
378 let metrics_service = Arc::new(MetricsService::new(self.root_logger())?);
379 let network_configuration_service = Arc::new(HttpMithrilNetworkConfigurationProvider::new(
380 aggregator_client.clone(),
381 ));
382 let preloader_activation = CardanoTransactionsPreloaderActivationSigner::new(
383 network_configuration_service.clone(),
384 ticker_service.clone(),
385 );
386 let cardano_transactions_preloader = Arc::new(CardanoTransactionsPreloader::new(
387 signed_entity_type_lock.clone(),
388 preloader_transactions_importer,
389 self.config.preload_security_parameter,
390 chain_observer.clone(),
391 self.root_logger(),
392 Arc::new(preloader_activation),
393 ));
394 let signed_beacon_repository = Arc::new(SignedBeaconRepository::new(
395 sqlite_connection.clone(),
396 self.config.store_retention_limit.map(|limit| limit as u64),
397 ));
398 let upkeep_service = Arc::new(SignerUpkeepService::new(
399 sqlite_connection.clone(),
400 sqlite_connection_cardano_transaction_pool,
401 signed_entity_type_lock.clone(),
402 vec![
403 signed_beacon_repository.clone(),
404 stake_store.clone(),
405 protocol_initializer_store.clone(),
406 ],
407 self.root_logger(),
408 ));
409
410 let kes_signer = match (
411 &self.config.kes_secret_key_path,
412 &self.config.operational_certificate_path,
413 ) {
414 (Some(kes_secret_key_path), Some(operational_certificate_path)) => {
415 Some(Arc::new(KesSignerStandard::new(
416 kes_secret_key_path.clone(),
417 operational_certificate_path.clone(),
418 )) as Arc<dyn KesSigner>)
419 }
420 (Some(_), None) | (None, Some(_)) => {
421 return Err(anyhow!(
422 "kes_secret_key and operational_certificate are both mandatory".to_string(),
423 ));
424 }
425 _ => None,
426 };
427
428 #[cfg(feature = "future_dmq")]
429 let signature_publisher = {
430 let first_publisher = SignaturePublisherRetrier::new(
431 match &self.config.dmq_node_socket_path {
432 Some(dmq_node_socket_path) => {
433 let dmq_network = &self.config.get_dmq_network()?;
434 let dmq_message_builder = DmqMessageBuilder::new(
435 kes_signer
436 .clone()
437 .ok_or(anyhow!("A KES signer is mandatory to sign DMQ messages"))?,
438 chain_observer.clone(),
439 );
440 Arc::new(SignaturePublisherDmq::new(Arc::new(
441 DmqPublisherClientPallas::<RegisterSignatureMessageDmq>::new(
442 dmq_node_socket_path.to_owned(),
443 *dmq_network,
444 dmq_message_builder,
445 self.root_logger(),
446 ),
447 ))) as Arc<dyn SignaturePublisher>
448 }
449 _ => Arc::new(SignaturePublisherNoop) as Arc<dyn SignaturePublisher>,
450 },
451 SignaturePublishRetryPolicy::never(),
452 );
453
454 let second_publisher = SignaturePublisherRetrier::new(
455 aggregator_client.clone(),
456 SignaturePublishRetryPolicy {
457 attempts: self.config.signature_publisher_config.retry_attempts,
458 delay_between_attempts: Duration::from_millis(
459 self.config.signature_publisher_config.retry_delay_ms,
460 ),
461 },
462 );
463
464 if self.config.signature_publisher_config.skip_delayer {
465 Arc::new(first_publisher) as Arc<dyn SignaturePublisher>
466 } else {
467 Arc::new(SignaturePublisherDelayer::new(
468 Arc::new(first_publisher),
469 Arc::new(second_publisher),
470 Duration::from_millis(self.config.signature_publisher_config.delayer_delay_ms),
471 self.root_logger(),
472 )) as Arc<dyn SignaturePublisher>
473 }
474 };
475
476 #[cfg(not(feature = "future_dmq"))]
477 let signature_publisher = {
478 let first_publisher = SignaturePublisherRetrier::new(
479 Arc::new(SignaturePublisherNoop) as Arc<dyn SignaturePublisher>,
480 SignaturePublishRetryPolicy::never(),
481 );
482
483 let second_publisher = SignaturePublisherRetrier::new(
484 aggregator_client.clone(),
485 SignaturePublishRetryPolicy {
486 attempts: self.config.signature_publisher_config.retry_attempts,
487 delay_between_attempts: Duration::from_millis(
488 self.config.signature_publisher_config.retry_delay_ms,
489 ),
490 },
491 );
492
493 if self.config.signature_publisher_config.skip_delayer {
494 Arc::new(second_publisher) as Arc<dyn SignaturePublisher>
495 } else {
496 Arc::new(SignaturePublisherDelayer::new(
497 Arc::new(first_publisher),
498 Arc::new(second_publisher),
499 Duration::from_millis(self.config.signature_publisher_config.delayer_delay_ms),
500 self.root_logger(),
501 )) as Arc<dyn SignaturePublisher>
502 }
503 };
504
505 let certifier = Arc::new(SignerCertifierService::new(
506 signed_beacon_repository,
507 Arc::new(SignerSignedEntityConfigProvider::new(epoch_service.clone())),
508 signed_entity_type_lock.clone(),
509 single_signer.clone(),
510 signature_publisher,
511 self.root_logger(),
512 ));
513
514 let services = SignerDependencyContainer {
515 ticker_service,
516 certificate_handler: aggregator_client,
517 chain_observer,
518 digester,
519 single_signer,
520 stake_store,
521 protocol_initializer_store,
522 era_checker,
523 era_reader,
524 api_version_provider,
525 signable_builder_service,
526 metrics_service,
527 signed_entity_type_lock,
528 cardano_transactions_preloader,
529 upkeep_service,
530 epoch_service,
531 certifier,
532 kes_signer,
533 network_configuration_service,
534 };
535
536 Ok(services)
537 }
538}
539
540#[cfg(test)]
541mod tests {
542 use std::path::PathBuf;
543
544 use mithril_cardano_node_chain::test::double::FakeChainObserver;
545 use mithril_cardano_node_internal_database::test::double::DumbImmutableFileObserver;
546 use mithril_common::test::double::Dummy;
547 use mithril_common::{entities::TimePoint, test::TempDir};
548
549 use crate::test_tools::TestLogger;
550
551 use super::*;
552
553 fn get_test_dir(test_name: &str) -> PathBuf {
554 TempDir::create("signer_service", test_name)
555 }
556
557 #[tokio::test]
558 async fn test_auto_create_stores_directory() {
559 let stores_dir = get_test_dir("test_auto_create_stores_directory").join("stores");
560 let config = Configuration {
561 data_stores_directory: stores_dir.clone(),
562 ..Configuration::new_sample("party-123456")
563 };
564
565 assert!(!stores_dir.exists());
566 let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
567 |_config| Ok(Arc::new(FakeChainObserver::new(Some(TimePoint::dummy()))));
568 let immutable_file_observer_builder: fn(
569 &Configuration,
570 )
571 -> StdResult<Arc<dyn ImmutableFileObserver>> =
572 |_config: &Configuration| Ok(Arc::new(DumbImmutableFileObserver::default()));
573
574 let mut dependencies_builder = DependenciesBuilder::new(&config, TestLogger::stdout());
575 dependencies_builder
576 .override_chain_observer_builder(chain_observer_builder)
577 .override_immutable_file_observer_builder(immutable_file_observer_builder)
578 .build()
579 .await
580 .expect("service builder build should not fail");
581 assert!(stores_dir.exists());
582 }
583}