1use std::collections::HashMap;
2use std::fs;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, anyhow};
7use slog::Logger;
8use tokio::sync::{Mutex, RwLock};
9
10use mithril_aggregator_client::AggregatorHttpClient;
11use mithril_cardano_node_chain::{
12 chain_importer::{
13 CardanoChainDataImporter, ChainDataImporterByChunk, ChainDataImporterWithPruner,
14 },
15 chain_observer::{CardanoCliRunner, ChainObserver, ChainObserverBuilder, ChainObserverType},
16 chain_reader::PallasChainReader,
17 chain_scanner::CardanoBlockScanner,
18};
19use mithril_cardano_node_internal_database::{
20 ImmutableFileObserver, ImmutableFileSystemObserver,
21 digesters::CardanoImmutableDigester,
22 digesters::cache::{
23 ImmutableFileDigestCacheProvider, JsonImmutableFileDigestCacheProviderBuilder,
24 },
25 signable_builder::{CardanoDatabaseSignableBuilder, CardanoImmutableFilesFullSignableBuilder},
26};
27use mithril_common::api_version::APIVersionProvider;
28use mithril_common::crypto_helper::{
29 KesSigner, KesSignerStandard, OpCert, ProtocolPartyId, SerDeShelleyFileFormat,
30};
31use mithril_common::messages::RegisterSignatureMessageDmq;
32use mithril_common::signable_builder::{
33 CardanoBlocksTransactionsSignableBuilder, CardanoStakeDistributionSignableBuilder,
34 CardanoTransactionsSignableBuilder, MithrilSignableBuilderService,
35 MithrilStakeDistributionSignableBuilder, SignableBuilderServiceDependencies,
36};
37use mithril_common::{MITHRIL_SIGNER_VERSION_HEADER, StdResult};
38
39use mithril_era::{EraChecker, EraReader};
40use mithril_signed_entity_lock::SignedEntityTypeLock;
41use mithril_signed_entity_preloader::CardanoTransactionsPreloader;
42use mithril_ticker::{MithrilTickerService, TickerService};
43
44use mithril_persistence::database::{ApplicationNodeType, SqlMigration};
45use mithril_persistence::sqlite::{
46 ConnectionBuilder, ConnectionOptions, SqliteConnection, SqliteConnectionPool,
47};
48
49use mithril_protocol_config::http::HttpMithrilNetworkConfigurationProvider;
50
51use mithril_dmq::{DmqMessageBuilder, DmqPublisherClientPallas};
52
53use crate::database::repository::{
54 ProtocolInitializerRepository, SignedBeaconRepository, SignerCardanoChainDataRepository,
55 StakePoolStore,
56};
57use crate::dependency_injection::SignerDependencyContainer;
58use crate::services::{
59 CardanoTransactionsPreloaderActivationSigner, ChainDataImporterWithVacuum, MithrilEpochService,
60 MithrilSingleSigner, SignaturePublishRetryPolicy, SignaturePublisher,
61 SignaturePublisherDelayer, SignaturePublisherDmq, SignaturePublisherNoop,
62 SignaturePublisherRetrier, SignerCertifierService, SignerChainDataImporter,
63 SignerSignableSeedBuilder, SignerSignedEntityConfigProvider, SignerUpkeepService,
64};
65use crate::store::MKTreeStoreSqlite;
66use crate::{
67 Configuration, HTTP_REQUEST_TIMEOUT_DURATION, MetricsService, SQLITE_FILE,
68 SQLITE_FILE_CARDANO_TRANSACTION,
69};
70
71pub struct DependenciesBuilder<'a> {
75 config: &'a Configuration,
76 chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
77 immutable_file_observer_builder:
78 fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
79 root_logger: Logger,
80}
81
82impl<'a> DependenciesBuilder<'a> {
83 pub fn new(config: &'a Configuration, root_logger: Logger) -> Self {
85 let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
86 |config: &Configuration| {
87 let chain_observer_type = ChainObserverType::Pallas;
88 let cardano_cli_path = &config.cardano_cli_path;
89 let cardano_node_socket_path = &config.cardano_node_socket_path;
90 let cardano_network = &config.get_network().with_context(|| {
91 "Dependencies Builder can not get Cardano network while building the chain observer"
92 })?;
93 let cardano_cli_runner = &CardanoCliRunner::new(
94 cardano_cli_path.to_owned(),
95 cardano_node_socket_path.to_owned(),
96 cardano_network.to_owned(),
97 );
98
99 let chain_observer_builder = ChainObserverBuilder::new(
100 &chain_observer_type,
101 cardano_node_socket_path,
102 cardano_network,
103 Some(cardano_cli_runner),
104 );
105
106 chain_observer_builder
107 .build()
108 .with_context(|| "Dependencies Builder can not build chain observer")
109 };
110
111 let immutable_file_observer_builder: fn(
112 &Configuration,
113 )
114 -> StdResult<Arc<dyn ImmutableFileObserver>> = |config: &Configuration| {
115 Ok(Arc::new(ImmutableFileSystemObserver::new(
116 &config.db_directory,
117 )))
118 };
119
120 Self {
121 config,
122 chain_observer_builder,
123 immutable_file_observer_builder,
124 root_logger,
125 }
126 }
127
128 pub fn override_immutable_file_observer_builder(
130 &mut self,
131 builder: fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
132 ) -> &mut Self {
133 self.immutable_file_observer_builder = builder;
134
135 self
136 }
137
138 pub fn root_logger(&self) -> Logger {
140 self.root_logger.clone()
141 }
142
143 pub fn override_chain_observer_builder(
145 &mut self,
146 builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
147 ) -> &mut Self {
148 self.chain_observer_builder = builder;
149
150 self
151 }
152
153 fn compute_protocol_party_id(&self) -> StdResult<ProtocolPartyId> {
155 match &self.config.operational_certificate_path {
156 Some(operational_certificate_path) => {
157 let opcert: OpCert = OpCert::from_file(operational_certificate_path)
158 .with_context(|| "Could not decode operational certificate")?;
159 Ok(opcert
160 .compute_protocol_party_id()
161 .with_context(|| "Could not compute party_id from operational certificate")?)
162 }
163 _ => Ok(self
164 .config
165 .party_id
166 .to_owned()
167 .with_context(|| "A party_id should at least be provided")?),
168 }
169 }
170
171 async fn build_digester_cache_provider(
172 &self,
173 ) -> StdResult<Option<Arc<dyn ImmutableFileDigestCacheProvider>>> {
174 if self.config.disable_digests_cache {
175 return Ok(None);
176 }
177
178 let cache_provider = JsonImmutableFileDigestCacheProviderBuilder::new(
179 &self.config.data_stores_directory,
180 &format!("immutables_digests_{}.json", self.config.network),
181 )
182 .should_reset_digests_cache(self.config.reset_digests_cache)
183 .with_logger(self.root_logger())
184 .build()
185 .await?;
186
187 Ok(Some(Arc::new(cache_provider)))
188 }
189
190 pub async fn build_main_sqlite_connection(
192 &self,
193 sqlite_file_name: &str,
194 ) -> StdResult<SqliteConnection> {
195 self.setup_sqlite_connection_builder(
196 sqlite_file_name,
197 crate::database::migration::get_migrations(),
198 &[],
199 )?
200 .build()
201 .with_context(|| "Main database connection initialisation error")
202 }
203
204 pub async fn build_cardano_tx_sqlite_connection_pool(
206 &self,
207 sqlite_file_name: &str,
208 pool_size: usize,
209 ) -> StdResult<SqliteConnectionPool> {
210 self.setup_sqlite_connection_builder(
211 sqlite_file_name,
212 mithril_persistence::database::cardano_transaction_migration::get_migrations(),
213 &[ConnectionOptions::EnableForeignKeys],
214 )?
215 .build_pool(pool_size)
216 .with_context(|| "'cardano_tx' Database connection initialisation error")
217 }
218
219 fn setup_sqlite_connection_builder(
220 &self,
221 sqlite_file_name: &str,
222 migrations: Vec<SqlMigration>,
223 options: &[ConnectionOptions],
224 ) -> StdResult<ConnectionBuilder> {
225 let sqlite_db_path = self.config.get_sqlite_file(sqlite_file_name)?;
226 let builder = ConnectionBuilder::open_file(&sqlite_db_path)
227 .with_node_type(ApplicationNodeType::Signer)
228 .with_migrations(migrations)
229 .with_options(options)
230 .with_logger(self.root_logger());
231
232 Ok(builder)
233 }
234
235 fn build_aggregator_client(
236 &self,
237 api_version_provider: &Arc<APIVersionProvider>,
238 ) -> StdResult<Arc<AggregatorHttpClient>> {
239 let client = AggregatorHttpClient::builder(self.config.aggregator_endpoint.clone())
240 .with_headers(HashMap::from([(
241 MITHRIL_SIGNER_VERSION_HEADER.to_string(),
242 env!("CARGO_PKG_VERSION").to_string(),
243 )]))
244 .with_relay_endpoint(self.config.relay_endpoint.clone())
245 .with_api_version_provider(api_version_provider.clone())
246 .with_timeout(Duration::from_millis(HTTP_REQUEST_TIMEOUT_DURATION))
247 .with_logger(self.root_logger())
248 .build()?;
249 Ok(Arc::new(client))
250 }
251
252 pub async fn build(&self) -> StdResult<SignerDependencyContainer> {
254 if !self.config.data_stores_directory.exists() {
255 fs::create_dir_all(self.config.data_stores_directory.clone()).with_context(|| {
256 format!(
257 "Could not create data stores directory: `{}`",
258 self.config.data_stores_directory.display()
259 )
260 })?;
261 }
262
263 let network = self.config.get_network()?;
264 let sqlite_connection = Arc::new(self.build_main_sqlite_connection(SQLITE_FILE).await?);
265 let sqlite_connection_cardano_transaction_pool = Arc::new(
266 self.build_cardano_tx_sqlite_connection_pool(SQLITE_FILE_CARDANO_TRANSACTION, 1)
267 .await?,
268 );
269
270 let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
271
272 let protocol_initializer_store = Arc::new(ProtocolInitializerRepository::new(
273 sqlite_connection.clone(),
274 self.config.store_retention_limit.map(|limit| limit as u64),
275 ));
276
277 let digester = Arc::new(CardanoImmutableDigester::new(
278 network.to_string(),
279 self.build_digester_cache_provider().await?,
280 self.root_logger(),
281 ));
282 let stake_store = Arc::new(StakePoolStore::new(
283 sqlite_connection.clone(),
284 self.config.store_retention_limit.map(|limit| limit as u64),
285 ));
286 let chain_observer = {
287 let builder = self.chain_observer_builder;
288 builder(self.config)?
289 };
290 let ticker_service = {
291 let builder = self.immutable_file_observer_builder;
292 Arc::new(MithrilTickerService::new(
293 chain_observer.clone(),
294 builder(self.config)?,
295 ))
296 };
297
298 let era_reader = Arc::new(EraReader::new(
299 self.config.build_era_reader_adapter(chain_observer.clone())?,
300 ));
301 let era_epoch_token = era_reader
302 .read_era_epoch_token(ticker_service.get_current_epoch().await?)
303 .await?;
304 let era_checker = Arc::new(EraChecker::new(
305 era_epoch_token.get_current_supported_era()?,
306 era_epoch_token.get_current_epoch(),
307 ));
308
309 let api_version_provider = Arc::new(APIVersionProvider::new(era_checker.clone()));
310 let aggregator_client = self.build_aggregator_client(&api_version_provider)?;
311
312 let cardano_immutable_snapshot_builder =
313 Arc::new(CardanoImmutableFilesFullSignableBuilder::new(
314 digester.clone(),
315 &self.config.db_directory,
316 self.root_logger(),
317 ));
318 let mithril_stake_distribution_signable_builder =
319 Arc::new(MithrilStakeDistributionSignableBuilder::default());
320 let chain_data_store = Arc::new(SignerCardanoChainDataRepository::new(
321 sqlite_connection_cardano_transaction_pool.clone(),
322 ));
323 let chain_block_reader = PallasChainReader::new(
324 &self.config.cardano_node_socket_path,
325 network,
326 self.root_logger(),
327 );
328 let block_scanner = Arc::new(
329 CardanoBlockScanner::new(
330 Arc::new(Mutex::new(chain_block_reader)),
331 self.config
332 .cardano_transactions_block_streamer_max_roll_forwards_per_poll,
333 self.root_logger(),
334 )
335 .set_throttling_interval(
336 self.config
337 .cardano_transactions_block_streamer_throttling_interval
338 .map(Duration::from_millis),
339 ),
340 );
341 let chain_data_importer = Arc::new(CardanoChainDataImporter::new(
342 block_scanner,
343 chain_data_store.clone(),
344 self.root_logger(),
345 ));
346 let chain_data_importer = Arc::new(ChainDataImporterWithPruner::new(
348 self.config
349 .enable_transaction_pruning
350 .then_some(self.config.network_security_parameter),
351 chain_data_store.clone(),
352 chain_data_importer,
353 self.root_logger(),
354 ));
355 let state_machine_transactions_importer = Arc::new(SignerChainDataImporter::new(Arc::new(
358 ChainDataImporterByChunk::new(
359 chain_data_store.clone(),
360 chain_data_importer.clone(),
361 self.config.transactions_import_block_chunk_size,
362 self.root_logger(),
363 ),
364 )));
365 let preloader_chain_data_importer = Arc::new(SignerChainDataImporter::new(Arc::new(
368 ChainDataImporterByChunk::new(
369 chain_data_store.clone(),
370 Arc::new(ChainDataImporterWithVacuum::new(
371 sqlite_connection_cardano_transaction_pool.clone(),
372 chain_data_importer.clone(),
373 self.root_logger(),
374 )),
375 self.config.transactions_import_block_chunk_size,
376 self.root_logger(),
377 ),
378 )));
379 let block_range_root_retriever = chain_data_store.clone();
380 let cardano_transactions_builder = Arc::new(CardanoTransactionsSignableBuilder::<
381 MKTreeStoreSqlite,
382 >::new(
383 state_machine_transactions_importer.clone(),
384 block_range_root_retriever.clone(),
385 ));
386 let cardano_blocks_transactions_builder = Arc::new(
387 CardanoBlocksTransactionsSignableBuilder::<MKTreeStoreSqlite>::new(
388 state_machine_transactions_importer,
389 block_range_root_retriever,
390 ),
391 );
392 let cardano_stake_distribution_signable_builder = Arc::new(
393 CardanoStakeDistributionSignableBuilder::new(stake_store.clone()),
394 );
395 let cardano_database_signable_builder = Arc::new(CardanoDatabaseSignableBuilder::new(
396 digester.clone(),
397 &self.config.db_directory,
398 self.root_logger(),
399 ));
400 let epoch_service = Arc::new(RwLock::new(MithrilEpochService::new(
401 stake_store.clone(),
402 protocol_initializer_store.clone(),
403 self.root_logger(),
404 )));
405 let single_signer = Arc::new(MithrilSingleSigner::new(
406 self.compute_protocol_party_id()?,
407 epoch_service.clone(),
408 self.root_logger(),
409 ));
410 let signable_seed_builder_service = Arc::new(SignerSignableSeedBuilder::new(
411 epoch_service.clone(),
412 protocol_initializer_store.clone(),
413 ));
414 let signable_builders_dependencies = SignableBuilderServiceDependencies::new(
415 mithril_stake_distribution_signable_builder,
416 cardano_immutable_snapshot_builder,
417 cardano_transactions_builder,
418 cardano_blocks_transactions_builder,
419 cardano_stake_distribution_signable_builder,
420 cardano_database_signable_builder,
421 );
422 let signable_builder_service = Arc::new(MithrilSignableBuilderService::new(
423 signable_seed_builder_service,
424 signable_builders_dependencies,
425 self.root_logger(),
426 ));
427 let metrics_service = Arc::new(MetricsService::new(self.root_logger())?);
428 let network_configuration_service = Arc::new(HttpMithrilNetworkConfigurationProvider::new(
429 aggregator_client.clone(),
430 ));
431 let preloader_activation = CardanoTransactionsPreloaderActivationSigner::new(
432 network_configuration_service.clone(),
433 ticker_service.clone(),
434 );
435 let cardano_transactions_preloader = Arc::new(CardanoTransactionsPreloader::new(
436 signed_entity_type_lock.clone(),
437 preloader_chain_data_importer,
438 self.config.preload_security_parameter,
439 chain_observer.clone(),
440 self.root_logger(),
441 Arc::new(preloader_activation),
442 ));
443 let signed_beacon_repository = Arc::new(SignedBeaconRepository::new(
444 sqlite_connection.clone(),
445 self.config.store_retention_limit.map(|limit| limit as u64),
446 ));
447 let upkeep_service = Arc::new(SignerUpkeepService::new(
448 sqlite_connection.clone(),
449 sqlite_connection_cardano_transaction_pool,
450 signed_entity_type_lock.clone(),
451 vec![
452 signed_beacon_repository.clone(),
453 stake_store.clone(),
454 protocol_initializer_store.clone(),
455 ],
456 self.root_logger(),
457 ));
458
459 let kes_signer = match (
460 &self.config.kes_secret_key_path,
461 &self.config.operational_certificate_path,
462 ) {
463 (Some(kes_secret_key_path), Some(operational_certificate_path)) => {
464 Some(Arc::new(KesSignerStandard::new(
465 kes_secret_key_path.clone(),
466 operational_certificate_path.clone(),
467 )) as Arc<dyn KesSigner>)
468 }
469 (Some(_), None) | (None, Some(_)) => {
470 return Err(anyhow!(
471 "kes_secret_key and operational_certificate are both mandatory".to_string(),
472 ));
473 }
474 _ => None,
475 };
476
477 let signature_publisher = {
478 let first_publisher = SignaturePublisherRetrier::new(
479 match &self.config.dmq_node_socket_path {
480 Some(dmq_node_socket_path) => {
481 let dmq_network = &self.config.get_dmq_network()?;
482 let dmq_message_builder = DmqMessageBuilder::new(
483 kes_signer.clone().with_context(
484 || "A KES signer is mandatory to sign DMQ messages",
485 )?,
486 chain_observer.clone(),
487 );
488 Arc::new(SignaturePublisherDmq::new(Arc::new(
489 DmqPublisherClientPallas::<RegisterSignatureMessageDmq>::new(
490 dmq_node_socket_path.to_owned(),
491 *dmq_network,
492 dmq_message_builder,
493 self.root_logger(),
494 ),
495 ))) as Arc<dyn SignaturePublisher>
496 }
497 _ => Arc::new(SignaturePublisherNoop) as Arc<dyn SignaturePublisher>,
498 },
499 SignaturePublishRetryPolicy::never(),
500 );
501
502 let second_publisher = SignaturePublisherRetrier::new(
503 aggregator_client.clone(),
504 SignaturePublishRetryPolicy {
505 attempts: self.config.signature_publisher_config.retry_attempts,
506 delay_between_attempts: Duration::from_millis(
507 self.config.signature_publisher_config.retry_delay_ms,
508 ),
509 },
510 );
511
512 if self.config.signature_publisher_config.skip_delayer {
513 Arc::new(first_publisher) as Arc<dyn SignaturePublisher>
514 } else {
515 Arc::new(SignaturePublisherDelayer::new(
516 Arc::new(first_publisher),
517 Arc::new(second_publisher),
518 Duration::from_millis(self.config.signature_publisher_config.delayer_delay_ms),
519 self.root_logger(),
520 )) as Arc<dyn SignaturePublisher>
521 }
522 };
523
524 let certifier = Arc::new(SignerCertifierService::new(
525 signed_beacon_repository,
526 Arc::new(SignerSignedEntityConfigProvider::new(epoch_service.clone())),
527 signed_entity_type_lock.clone(),
528 single_signer.clone(),
529 signature_publisher,
530 self.root_logger(),
531 ));
532
533 let services = SignerDependencyContainer {
534 ticker_service,
535 chain_observer,
536 digester,
537 single_signer,
538 stake_store,
539 protocol_initializer_store,
540 era_checker,
541 era_reader,
542 api_version_provider,
543 signable_builder_service,
544 metrics_service,
545 signed_entity_type_lock,
546 cardano_transactions_preloader,
547 upkeep_service,
548 epoch_service,
549 certifier,
550 signer_registration_publisher: aggregator_client.clone(),
551 signers_registration_retriever: aggregator_client,
552 kes_signer,
553 network_configuration_service,
554 };
555
556 Ok(services)
557 }
558}
559
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use mithril_aggregator_client::query::GetAggregatorFeaturesQuery;
    use mithril_cardano_node_chain::test::double::FakeChainObserver;
    use mithril_cardano_node_internal_database::test::double::DumbImmutableFileObserver;
    use mithril_common::{
        entities::TimePoint,
        messages::AggregatorFeaturesMessage,
        test::{TempDir, api_version_extensions::ApiVersionProviderTestExtension, double::Dummy},
    };

    use crate::test::TestLogger;

    use super::*;

    /// Creates the temporary directory dedicated to the test named `test_name`.
    fn get_test_dir(test_name: &str) -> PathBuf {
        TempDir::create("signer_service", test_name)
    }

    /// `build` must create the data stores directory when it does not exist yet.
    #[tokio::test]
    async fn test_auto_create_stores_directory() {
        // Test doubles replacing the default observer factories.
        fn fake_chain_observer(_config: &Configuration) -> StdResult<Arc<dyn ChainObserver>> {
            Ok(Arc::new(FakeChainObserver::new(Some(TimePoint::dummy()))))
        }
        fn dumb_immutable_file_observer(
            _config: &Configuration,
        ) -> StdResult<Arc<dyn ImmutableFileObserver>> {
            Ok(Arc::new(DumbImmutableFileObserver::default()))
        }

        let stores_dir = get_test_dir("test_auto_create_stores_directory").join("stores");
        let config = Configuration {
            data_stores_directory: stores_dir.clone(),
            ..Configuration::new_sample("party-123456")
        };
        assert!(!stores_dir.exists());

        let mut builder = DependenciesBuilder::new(&config, TestLogger::stdout());
        builder
            .override_chain_observer_builder(fake_chain_observer)
            .override_immutable_file_observer_builder(dumb_immutable_file_observer)
            .build()
            .await
            .expect("service builder build should not fail");

        assert!(stores_dir.exists());
    }

    /// The built aggregator client must send the signer version header with its requests.
    #[tokio::test]
    async fn set_signer_version_header_to_built_aggregator_client() {
        // The mock only answers requests carrying the expected signer version header, so the
        // final `unwrap` fails if the header is missing or wrong.
        let server = httpmock::MockServer::start();
        server.mock(|when, then| {
            when.is_true(|req| {
                req.headers()
                    .get(MITHRIL_SIGNER_VERSION_HEADER)
                    .is_some_and(|header| header == env!("CARGO_PKG_VERSION"))
            });
            then.status(200)
                .json_body(serde_json::json!(AggregatorFeaturesMessage::dummy()));
        });

        let config = Configuration {
            aggregator_endpoint: server.base_url(),
            ..Configuration::new_sample("party-123456")
        };
        let api_version_provider = Arc::new(APIVersionProvider::new_failing());
        let client = DependenciesBuilder::new(&config, TestLogger::stdout())
            .build_aggregator_client(&api_version_provider)
            .unwrap();

        client
            .send(GetAggregatorFeaturesQuery::current())
            .await
            .unwrap();
    }
}