1use std::fs;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, anyhow};
6use slog::Logger;
7use tokio::sync::{Mutex, RwLock};
8
9use mithril_aggregator_client::AggregatorHttpClient;
10use mithril_cardano_node_chain::{
11 chain_observer::{CardanoCliRunner, ChainObserver, ChainObserverBuilder, ChainObserverType},
12 chain_reader::PallasChainReader,
13 chain_scanner::CardanoBlockScanner,
14};
15use mithril_cardano_node_internal_database::{
16 ImmutableFileObserver, ImmutableFileSystemObserver,
17 digesters::CardanoImmutableDigester,
18 digesters::cache::{
19 ImmutableFileDigestCacheProvider, JsonImmutableFileDigestCacheProviderBuilder,
20 },
21 signable_builder::{CardanoDatabaseSignableBuilder, CardanoImmutableFilesFullSignableBuilder},
22};
23use mithril_common::StdResult;
24use mithril_common::api_version::APIVersionProvider;
25use mithril_common::crypto_helper::{
26 KesSigner, KesSignerStandard, OpCert, ProtocolPartyId, SerDeShelleyFileFormat,
27};
28use mithril_common::messages::RegisterSignatureMessageDmq;
29use mithril_common::signable_builder::{
30 CardanoStakeDistributionSignableBuilder, CardanoTransactionsSignableBuilder,
31 MithrilSignableBuilderService, MithrilStakeDistributionSignableBuilder,
32 SignableBuilderServiceDependencies,
33};
34
35use mithril_era::{EraChecker, EraReader};
36use mithril_signed_entity_lock::SignedEntityTypeLock;
37use mithril_signed_entity_preloader::CardanoTransactionsPreloader;
38use mithril_ticker::{MithrilTickerService, TickerService};
39
40use mithril_persistence::database::repository::CardanoTransactionRepository;
41use mithril_persistence::database::{ApplicationNodeType, SqlMigration};
42use mithril_persistence::sqlite::{ConnectionBuilder, SqliteConnection, SqliteConnectionPool};
43
44use mithril_protocol_config::http::HttpMithrilNetworkConfigurationProvider;
45
46use mithril_dmq::{DmqMessageBuilder, DmqPublisherClientPallas};
47
48use crate::dependency_injection::SignerDependencyContainer;
49use crate::services::SignaturePublisherDmq;
50use crate::services::{
51 CardanoTransactionsImporter, CardanoTransactionsPreloaderActivationSigner, MithrilEpochService,
52 MithrilSingleSigner, SignaturePublishRetryPolicy, SignaturePublisherDelayer,
53 SignaturePublisherNoop, SignaturePublisherRetrier, SignerCertifierService,
54 SignerSignableSeedBuilder, SignerSignedEntityConfigProvider, SignerUpkeepService,
55 TransactionsImporterByChunk, TransactionsImporterWithPruner, TransactionsImporterWithVacuum,
56};
57use crate::store::MKTreeStoreSqlite;
58use crate::{
59 Configuration, HTTP_REQUEST_TIMEOUT_DURATION, MetricsService, SQLITE_FILE,
60 SQLITE_FILE_CARDANO_TRANSACTION,
61};
62use crate::{
63 database::repository::{ProtocolInitializerRepository, SignedBeaconRepository, StakePoolStore},
64 services::SignaturePublisher,
65};
66
67pub struct DependenciesBuilder<'a> {
71 config: &'a Configuration,
72 chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
73 immutable_file_observer_builder:
74 fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
75 root_logger: Logger,
76}
77
78impl<'a> DependenciesBuilder<'a> {
79 pub fn new(config: &'a Configuration, root_logger: Logger) -> Self {
81 let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
82 |config: &Configuration| {
83 let chain_observer_type = ChainObserverType::Pallas;
84 let cardano_cli_path = &config.cardano_cli_path;
85 let cardano_node_socket_path = &config.cardano_node_socket_path;
86 let cardano_network = &config.get_network().with_context(|| {
87 "Dependencies Builder can not get Cardano network while building the chain observer"
88 })?;
89 let cardano_cli_runner = &CardanoCliRunner::new(
90 cardano_cli_path.to_owned(),
91 cardano_node_socket_path.to_owned(),
92 cardano_network.to_owned(),
93 );
94
95 let chain_observer_builder = ChainObserverBuilder::new(
96 &chain_observer_type,
97 cardano_node_socket_path,
98 cardano_network,
99 Some(cardano_cli_runner),
100 );
101
102 chain_observer_builder
103 .build()
104 .with_context(|| "Dependencies Builder can not build chain observer")
105 };
106
107 let immutable_file_observer_builder: fn(
108 &Configuration,
109 )
110 -> StdResult<Arc<dyn ImmutableFileObserver>> = |config: &Configuration| {
111 Ok(Arc::new(ImmutableFileSystemObserver::new(
112 &config.db_directory,
113 )))
114 };
115
116 Self {
117 config,
118 chain_observer_builder,
119 immutable_file_observer_builder,
120 root_logger,
121 }
122 }
123
124 pub fn override_immutable_file_observer_builder(
126 &mut self,
127 builder: fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
128 ) -> &mut Self {
129 self.immutable_file_observer_builder = builder;
130
131 self
132 }
133
134 pub fn root_logger(&self) -> Logger {
136 self.root_logger.clone()
137 }
138
139 pub fn override_chain_observer_builder(
141 &mut self,
142 builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
143 ) -> &mut Self {
144 self.chain_observer_builder = builder;
145
146 self
147 }
148
149 fn compute_protocol_party_id(&self) -> StdResult<ProtocolPartyId> {
151 match &self.config.operational_certificate_path {
152 Some(operational_certificate_path) => {
153 let opcert: OpCert = OpCert::from_file(operational_certificate_path)
154 .with_context(|| "Could not decode operational certificate")?;
155 Ok(opcert
156 .compute_protocol_party_id()
157 .with_context(|| "Could not compute party_id from operational certificate")?)
158 }
159 _ => Ok(self
160 .config
161 .party_id
162 .to_owned()
163 .with_context(|| "A party_id should at least be provided")?),
164 }
165 }
166
167 async fn build_digester_cache_provider(
168 &self,
169 ) -> StdResult<Option<Arc<dyn ImmutableFileDigestCacheProvider>>> {
170 if self.config.disable_digests_cache {
171 return Ok(None);
172 }
173
174 let cache_provider = JsonImmutableFileDigestCacheProviderBuilder::new(
175 &self.config.data_stores_directory,
176 &format!("immutables_digests_{}.json", self.config.network),
177 )
178 .should_reset_digests_cache(self.config.reset_digests_cache)
179 .with_logger(self.root_logger())
180 .build()
181 .await?;
182
183 Ok(Some(Arc::new(cache_provider)))
184 }
185
186 pub async fn build_sqlite_connection(
188 &self,
189 sqlite_file_name: &str,
190 migrations: Vec<SqlMigration>,
191 ) -> StdResult<SqliteConnection> {
192 let sqlite_db_path = self.config.get_sqlite_file(sqlite_file_name)?;
193 let connection = ConnectionBuilder::open_file(&sqlite_db_path)
194 .with_node_type(ApplicationNodeType::Signer)
195 .with_migrations(migrations)
196 .with_logger(self.root_logger())
197 .build()
198 .with_context(|| "Database connection initialisation error")?;
199
200 Ok(connection)
201 }
202
203 pub async fn build(&self) -> StdResult<SignerDependencyContainer> {
205 if !self.config.data_stores_directory.exists() {
206 fs::create_dir_all(self.config.data_stores_directory.clone()).with_context(|| {
207 format!(
208 "Could not create data stores directory: `{}`",
209 self.config.data_stores_directory.display()
210 )
211 })?;
212 }
213
214 let network = self.config.get_network()?;
215 let sqlite_connection = Arc::new(
216 self.build_sqlite_connection(SQLITE_FILE, crate::database::migration::get_migrations())
217 .await?,
218 );
219 let transaction_sqlite_connection = self
220 .build_sqlite_connection(
221 SQLITE_FILE_CARDANO_TRANSACTION,
222 mithril_persistence::database::cardano_transaction_migration::get_migrations(),
223 )
224 .await?;
225 let sqlite_connection_cardano_transaction_pool = Arc::new(
226 SqliteConnectionPool::build_from_connection(transaction_sqlite_connection),
227 );
228
229 let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
230
231 let protocol_initializer_store = Arc::new(ProtocolInitializerRepository::new(
232 sqlite_connection.clone(),
233 self.config.store_retention_limit.map(|limit| limit as u64),
234 ));
235
236 let digester = Arc::new(CardanoImmutableDigester::new(
237 network.to_string(),
238 self.build_digester_cache_provider().await?,
239 self.root_logger(),
240 ));
241 let stake_store = Arc::new(StakePoolStore::new(
242 sqlite_connection.clone(),
243 self.config.store_retention_limit.map(|limit| limit as u64),
244 ));
245 let chain_observer = {
246 let builder = self.chain_observer_builder;
247 builder(self.config)?
248 };
249 let ticker_service = {
250 let builder = self.immutable_file_observer_builder;
251 Arc::new(MithrilTickerService::new(
252 chain_observer.clone(),
253 builder(self.config)?,
254 ))
255 };
256
257 let era_reader = Arc::new(EraReader::new(
258 self.config.build_era_reader_adapter(chain_observer.clone())?,
259 ));
260 let era_epoch_token = era_reader
261 .read_era_epoch_token(ticker_service.get_current_epoch().await?)
262 .await?;
263 let era_checker = Arc::new(EraChecker::new(
264 era_epoch_token.get_current_supported_era()?,
265 era_epoch_token.get_current_epoch(),
266 ));
267
268 let api_version_provider = Arc::new(APIVersionProvider::new(era_checker.clone()));
269 let aggregator_client = Arc::new(
270 AggregatorHttpClient::builder(self.config.aggregator_endpoint.clone())
271 .with_relay_endpoint(self.config.relay_endpoint.clone())
272 .with_api_version_provider(api_version_provider.clone())
273 .with_timeout(Duration::from_millis(HTTP_REQUEST_TIMEOUT_DURATION))
274 .with_logger(self.root_logger())
275 .build()?,
276 );
277
278 let cardano_immutable_snapshot_builder =
279 Arc::new(CardanoImmutableFilesFullSignableBuilder::new(
280 digester.clone(),
281 &self.config.db_directory,
282 self.root_logger(),
283 ));
284 let mithril_stake_distribution_signable_builder =
285 Arc::new(MithrilStakeDistributionSignableBuilder::default());
286 let transaction_store = Arc::new(CardanoTransactionRepository::new(
287 sqlite_connection_cardano_transaction_pool.clone(),
288 ));
289 let chain_block_reader = PallasChainReader::new(
290 &self.config.cardano_node_socket_path,
291 network,
292 self.root_logger(),
293 );
294 let block_scanner = Arc::new(CardanoBlockScanner::new(
295 Arc::new(Mutex::new(chain_block_reader)),
296 self.config
297 .cardano_transactions_block_streamer_max_roll_forwards_per_poll,
298 self.root_logger(),
299 ));
300 let transactions_importer = Arc::new(CardanoTransactionsImporter::new(
301 block_scanner,
302 transaction_store.clone(),
303 self.root_logger(),
304 ));
305 let transactions_importer = Arc::new(TransactionsImporterWithPruner::new(
307 self.config
308 .enable_transaction_pruning
309 .then_some(self.config.network_security_parameter),
310 transaction_store.clone(),
311 transactions_importer,
312 self.root_logger(),
313 ));
314 let state_machine_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
317 transaction_store.clone(),
318 transactions_importer.clone(),
319 self.config.transactions_import_block_chunk_size,
320 self.root_logger(),
321 ));
322 let preloader_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
325 transaction_store.clone(),
326 Arc::new(TransactionsImporterWithVacuum::new(
327 sqlite_connection_cardano_transaction_pool.clone(),
328 transactions_importer.clone(),
329 self.root_logger(),
330 )),
331 self.config.transactions_import_block_chunk_size,
332 self.root_logger(),
333 ));
334 let block_range_root_retriever = transaction_store.clone();
335 let cardano_transactions_builder = Arc::new(CardanoTransactionsSignableBuilder::<
336 MKTreeStoreSqlite,
337 >::new(
338 state_machine_transactions_importer,
339 block_range_root_retriever,
340 ));
341 let cardano_stake_distribution_signable_builder = Arc::new(
342 CardanoStakeDistributionSignableBuilder::new(stake_store.clone()),
343 );
344 let cardano_database_signable_builder = Arc::new(CardanoDatabaseSignableBuilder::new(
345 digester.clone(),
346 &self.config.db_directory,
347 self.root_logger(),
348 ));
349 let epoch_service = Arc::new(RwLock::new(MithrilEpochService::new(
350 stake_store.clone(),
351 protocol_initializer_store.clone(),
352 self.root_logger(),
353 )));
354 let single_signer = Arc::new(MithrilSingleSigner::new(
355 self.compute_protocol_party_id()?,
356 epoch_service.clone(),
357 self.root_logger(),
358 ));
359 let signable_seed_builder_service = Arc::new(SignerSignableSeedBuilder::new(
360 epoch_service.clone(),
361 protocol_initializer_store.clone(),
362 ));
363 let signable_builders_dependencies = SignableBuilderServiceDependencies::new(
364 mithril_stake_distribution_signable_builder,
365 cardano_immutable_snapshot_builder,
366 cardano_transactions_builder,
367 cardano_stake_distribution_signable_builder,
368 cardano_database_signable_builder,
369 );
370 let signable_builder_service = Arc::new(MithrilSignableBuilderService::new(
371 signable_seed_builder_service,
372 signable_builders_dependencies,
373 self.root_logger(),
374 ));
375 let metrics_service = Arc::new(MetricsService::new(self.root_logger())?);
376 let network_configuration_service = Arc::new(HttpMithrilNetworkConfigurationProvider::new(
377 aggregator_client.clone(),
378 ));
379 let preloader_activation = CardanoTransactionsPreloaderActivationSigner::new(
380 network_configuration_service.clone(),
381 ticker_service.clone(),
382 );
383 let cardano_transactions_preloader = Arc::new(CardanoTransactionsPreloader::new(
384 signed_entity_type_lock.clone(),
385 preloader_transactions_importer,
386 self.config.preload_security_parameter,
387 chain_observer.clone(),
388 self.root_logger(),
389 Arc::new(preloader_activation),
390 ));
391 let signed_beacon_repository = Arc::new(SignedBeaconRepository::new(
392 sqlite_connection.clone(),
393 self.config.store_retention_limit.map(|limit| limit as u64),
394 ));
395 let upkeep_service = Arc::new(SignerUpkeepService::new(
396 sqlite_connection.clone(),
397 sqlite_connection_cardano_transaction_pool,
398 signed_entity_type_lock.clone(),
399 vec![
400 signed_beacon_repository.clone(),
401 stake_store.clone(),
402 protocol_initializer_store.clone(),
403 ],
404 self.root_logger(),
405 ));
406
407 let kes_signer = match (
408 &self.config.kes_secret_key_path,
409 &self.config.operational_certificate_path,
410 ) {
411 (Some(kes_secret_key_path), Some(operational_certificate_path)) => {
412 Some(Arc::new(KesSignerStandard::new(
413 kes_secret_key_path.clone(),
414 operational_certificate_path.clone(),
415 )) as Arc<dyn KesSigner>)
416 }
417 (Some(_), None) | (None, Some(_)) => {
418 return Err(anyhow!(
419 "kes_secret_key and operational_certificate are both mandatory".to_string(),
420 ));
421 }
422 _ => None,
423 };
424
425 let signature_publisher = {
426 let first_publisher = SignaturePublisherRetrier::new(
427 match &self.config.dmq_node_socket_path {
428 Some(dmq_node_socket_path) => {
429 let dmq_network = &self.config.get_dmq_network()?;
430 let dmq_message_builder = DmqMessageBuilder::new(
431 kes_signer.clone().with_context(
432 || "A KES signer is mandatory to sign DMQ messages",
433 )?,
434 chain_observer.clone(),
435 );
436 Arc::new(SignaturePublisherDmq::new(Arc::new(
437 DmqPublisherClientPallas::<RegisterSignatureMessageDmq>::new(
438 dmq_node_socket_path.to_owned(),
439 *dmq_network,
440 dmq_message_builder,
441 self.root_logger(),
442 ),
443 ))) as Arc<dyn SignaturePublisher>
444 }
445 _ => Arc::new(SignaturePublisherNoop) as Arc<dyn SignaturePublisher>,
446 },
447 SignaturePublishRetryPolicy::never(),
448 );
449
450 let second_publisher = SignaturePublisherRetrier::new(
451 aggregator_client.clone(),
452 SignaturePublishRetryPolicy {
453 attempts: self.config.signature_publisher_config.retry_attempts,
454 delay_between_attempts: Duration::from_millis(
455 self.config.signature_publisher_config.retry_delay_ms,
456 ),
457 },
458 );
459
460 if self.config.signature_publisher_config.skip_delayer {
461 Arc::new(first_publisher) as Arc<dyn SignaturePublisher>
462 } else {
463 Arc::new(SignaturePublisherDelayer::new(
464 Arc::new(first_publisher),
465 Arc::new(second_publisher),
466 Duration::from_millis(self.config.signature_publisher_config.delayer_delay_ms),
467 self.root_logger(),
468 )) as Arc<dyn SignaturePublisher>
469 }
470 };
471
472 let certifier = Arc::new(SignerCertifierService::new(
473 signed_beacon_repository,
474 Arc::new(SignerSignedEntityConfigProvider::new(epoch_service.clone())),
475 signed_entity_type_lock.clone(),
476 single_signer.clone(),
477 signature_publisher,
478 self.root_logger(),
479 ));
480
481 let services = SignerDependencyContainer {
482 ticker_service,
483 chain_observer,
484 digester,
485 single_signer,
486 stake_store,
487 protocol_initializer_store,
488 era_checker,
489 era_reader,
490 api_version_provider,
491 signable_builder_service,
492 metrics_service,
493 signed_entity_type_lock,
494 cardano_transactions_preloader,
495 upkeep_service,
496 epoch_service,
497 certifier,
498 signer_registration_publisher: aggregator_client.clone(),
499 signers_registration_retriever: aggregator_client,
500 kes_signer,
501 network_configuration_service,
502 };
503
504 Ok(services)
505 }
506}
507
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use mithril_cardano_node_chain::test::double::FakeChainObserver;
    use mithril_cardano_node_internal_database::test::double::DumbImmutableFileObserver;
    use mithril_common::test::double::Dummy;
    use mithril_common::{entities::TimePoint, test::TempDir};

    use crate::test_tools::TestLogger;

    use super::*;

    /// Path obtained from `TempDir::create` for the test named `test_name`.
    fn get_test_dir(test_name: &str) -> PathBuf {
        TempDir::create("signer_service", test_name)
    }

    /// Chain observer factory returning a fake observer seeded with a dummy time point.
    fn fake_chain_observer(_config: &Configuration) -> StdResult<Arc<dyn ChainObserver>> {
        Ok(Arc::new(FakeChainObserver::new(Some(TimePoint::dummy()))))
    }

    /// Immutable file observer factory returning a default dumb observer.
    fn dumb_immutable_file_observer(
        _config: &Configuration,
    ) -> StdResult<Arc<dyn ImmutableFileObserver>> {
        Ok(Arc::new(DumbImmutableFileObserver::default()))
    }

    /// Building the dependencies must create the data stores directory when it is missing.
    #[tokio::test]
    async fn test_auto_create_stores_directory() {
        let stores_dir = get_test_dir("test_auto_create_stores_directory").join("stores");
        let config = Configuration {
            data_stores_directory: stores_dir.clone(),
            ..Configuration::new_sample("party-123456")
        };
        assert!(!stores_dir.exists());

        // Swap the real observers for test doubles before building.
        let mut dependencies_builder = DependenciesBuilder::new(&config, TestLogger::stdout());
        dependencies_builder
            .override_chain_observer_builder(fake_chain_observer)
            .override_immutable_file_observer_builder(dumb_immutable_file_observer);

        dependencies_builder
            .build()
            .await
            .expect("service builder build should not fail");

        assert!(stores_dir.exists());
    }
}