1use std::collections::HashMap;
2use std::fs;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, anyhow};
7use slog::Logger;
8use tokio::sync::{Mutex, RwLock};
9
10use mithril_aggregator_client::AggregatorHttpClient;
11use mithril_cardano_node_chain::{
12 chain_importer::{
13 CardanoChainDataImporter, ChainDataImporterByChunk, ChainDataImporterWithPruner,
14 },
15 chain_observer::{CardanoCliRunner, ChainObserver, ChainObserverBuilder, ChainObserverType},
16 chain_reader::PallasChainReader,
17 chain_scanner::CardanoBlockScanner,
18};
19use mithril_cardano_node_internal_database::{
20 ImmutableFileObserver, ImmutableFileSystemObserver,
21 digesters::CardanoImmutableDigester,
22 digesters::cache::{
23 ImmutableFileDigestCacheProvider, JsonImmutableFileDigestCacheProviderBuilder,
24 },
25 signable_builder::{CardanoDatabaseSignableBuilder, CardanoImmutableFilesFullSignableBuilder},
26};
27use mithril_common::api_version::APIVersionProvider;
28use mithril_common::crypto_helper::{
29 KesSigner, KesSignerStandard, OpCert, ProtocolPartyId, SerDeShelleyFileFormat,
30};
31use mithril_common::messages::RegisterSignatureMessageDmq;
32use mithril_common::signable_builder::{
33 CardanoBlocksTransactionsSignableBuilder, CardanoStakeDistributionSignableBuilder,
34 CardanoTransactionsSignableBuilder, MithrilSignableBuilderService,
35 MithrilStakeDistributionSignableBuilder, SignableBuilderServiceDependencies,
36};
37use mithril_common::{MITHRIL_SIGNER_VERSION_HEADER, StdResult};
38
39use mithril_era::{EraChecker, EraReader};
40use mithril_signed_entity_lock::SignedEntityTypeLock;
41use mithril_signed_entity_preloader::CardanoTransactionsPreloader;
42use mithril_ticker::{MithrilTickerService, TickerService};
43
44use mithril_persistence::database::{ApplicationNodeType, SqlMigration};
45use mithril_persistence::sqlite::{
46 ConnectionBuilder, ConnectionOptions, SqliteConnection, SqliteConnectionPool,
47};
48
49use mithril_protocol_config::http::HttpMithrilNetworkConfigurationProvider;
50
51use mithril_dmq::{DmqMessageBuilder, DmqPublisherClientPallas};
52
53use crate::database::repository::{
54 ProtocolInitializerRepository, SignedBeaconRepository, SignerCardanoChainDataRepository,
55 StakePoolStore,
56};
57use crate::dependency_injection::SignerDependencyContainer;
58use crate::services::{
59 CardanoTransactionsPreloaderActivationSigner, ChainDataImporterWithVacuum, MithrilEpochService,
60 MithrilSingleSigner, SignaturePublishRetryPolicy, SignaturePublisher,
61 SignaturePublisherDelayer, SignaturePublisherDmq, SignaturePublisherNoop,
62 SignaturePublisherRetrier, SignerCertifierService, SignerChainDataImporter,
63 SignerSignableSeedBuilder, SignerSignedEntityConfigProvider, SignerUpkeepService,
64};
65use crate::store::MKTreeStoreSqlite;
66use crate::{
67 Configuration, HTTP_REQUEST_TIMEOUT_DURATION, MetricsService, SQLITE_FILE,
68 SQLITE_FILE_CARDANO_TRANSACTION,
69};
70
71pub struct DependenciesBuilder<'a> {
75 config: &'a Configuration,
76 chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
77 immutable_file_observer_builder:
78 fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
79 root_logger: Logger,
80}
81
82impl<'a> DependenciesBuilder<'a> {
83 pub fn new(config: &'a Configuration, root_logger: Logger) -> Self {
85 let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
86 |config: &Configuration| {
87 let chain_observer_type = ChainObserverType::Pallas;
88 let cardano_cli_path = &config.cardano_cli_path;
89 let cardano_node_socket_path = &config.cardano_node_socket_path;
90 let cardano_network = &config.get_network().with_context(|| {
91 "Dependencies Builder can not get Cardano network while building the chain observer"
92 })?;
93 let cardano_cli_runner = &CardanoCliRunner::new(
94 cardano_cli_path.to_owned(),
95 cardano_node_socket_path.to_owned(),
96 cardano_network.to_owned(),
97 );
98
99 let chain_observer_builder = ChainObserverBuilder::new(
100 &chain_observer_type,
101 cardano_node_socket_path,
102 cardano_network,
103 Some(cardano_cli_runner),
104 );
105
106 chain_observer_builder
107 .build()
108 .with_context(|| "Dependencies Builder can not build chain observer")
109 };
110
111 let immutable_file_observer_builder: fn(
112 &Configuration,
113 )
114 -> StdResult<Arc<dyn ImmutableFileObserver>> = |config: &Configuration| {
115 Ok(Arc::new(ImmutableFileSystemObserver::new(
116 &config.db_directory,
117 )))
118 };
119
120 Self {
121 config,
122 chain_observer_builder,
123 immutable_file_observer_builder,
124 root_logger,
125 }
126 }
127
128 pub fn override_immutable_file_observer_builder(
130 &mut self,
131 builder: fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
132 ) -> &mut Self {
133 self.immutable_file_observer_builder = builder;
134
135 self
136 }
137
138 pub fn root_logger(&self) -> Logger {
140 self.root_logger.clone()
141 }
142
143 pub fn override_chain_observer_builder(
145 &mut self,
146 builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
147 ) -> &mut Self {
148 self.chain_observer_builder = builder;
149
150 self
151 }
152
153 fn compute_protocol_party_id(&self) -> StdResult<ProtocolPartyId> {
155 match &self.config.operational_certificate_path {
156 Some(operational_certificate_path) => {
157 let opcert: OpCert = OpCert::from_file(operational_certificate_path)
158 .with_context(|| "Could not decode operational certificate")?;
159 Ok(opcert
160 .compute_protocol_party_id()
161 .with_context(|| "Could not compute party_id from operational certificate")?)
162 }
163 _ => Ok(self
164 .config
165 .party_id
166 .to_owned()
167 .with_context(|| "A party_id should at least be provided")?),
168 }
169 }
170
171 async fn build_digester_cache_provider(
172 &self,
173 ) -> StdResult<Option<Arc<dyn ImmutableFileDigestCacheProvider>>> {
174 if self.config.disable_digests_cache {
175 return Ok(None);
176 }
177
178 let cache_provider = JsonImmutableFileDigestCacheProviderBuilder::new(
179 &self.config.data_stores_directory,
180 &format!("immutables_digests_{}.json", self.config.network),
181 )
182 .should_reset_digests_cache(self.config.reset_digests_cache)
183 .with_logger(self.root_logger())
184 .build()
185 .await?;
186
187 Ok(Some(Arc::new(cache_provider)))
188 }
189
190 pub async fn build_main_sqlite_connection(
192 &self,
193 sqlite_file_name: &str,
194 ) -> StdResult<SqliteConnection> {
195 self.build_sqlite_connection(
196 sqlite_file_name,
197 crate::database::migration::get_migrations(),
198 &[],
199 )
200 }
201
202 pub async fn build_cardano_tx_sqlite_connection(
204 &self,
205 sqlite_file_name: &str,
206 ) -> StdResult<SqliteConnection> {
207 self.build_sqlite_connection(
208 sqlite_file_name,
209 mithril_persistence::database::cardano_transaction_migration::get_migrations(),
210 &[ConnectionOptions::EnableForeignKeys],
211 )
212 }
213
214 fn build_sqlite_connection(
215 &self,
216 sqlite_file_name: &str,
217 migrations: Vec<SqlMigration>,
218 options: &[ConnectionOptions],
219 ) -> StdResult<SqliteConnection> {
220 let sqlite_db_path = self.config.get_sqlite_file(sqlite_file_name)?;
221 let connection = ConnectionBuilder::open_file(&sqlite_db_path)
222 .with_node_type(ApplicationNodeType::Signer)
223 .with_migrations(migrations)
224 .with_options(options)
225 .with_logger(self.root_logger())
226 .build()
227 .with_context(|| "Database connection initialisation error")?;
228
229 Ok(connection)
230 }
231
232 fn build_aggregator_client(
233 &self,
234 api_version_provider: &Arc<APIVersionProvider>,
235 ) -> StdResult<Arc<AggregatorHttpClient>> {
236 let client = AggregatorHttpClient::builder(self.config.aggregator_endpoint.clone())
237 .with_headers(HashMap::from([(
238 MITHRIL_SIGNER_VERSION_HEADER.to_string(),
239 env!("CARGO_PKG_VERSION").to_string(),
240 )]))
241 .with_relay_endpoint(self.config.relay_endpoint.clone())
242 .with_api_version_provider(api_version_provider.clone())
243 .with_timeout(Duration::from_millis(HTTP_REQUEST_TIMEOUT_DURATION))
244 .with_logger(self.root_logger())
245 .build()?;
246 Ok(Arc::new(client))
247 }
248
249 pub async fn build(&self) -> StdResult<SignerDependencyContainer> {
251 if !self.config.data_stores_directory.exists() {
252 fs::create_dir_all(self.config.data_stores_directory.clone()).with_context(|| {
253 format!(
254 "Could not create data stores directory: `{}`",
255 self.config.data_stores_directory.display()
256 )
257 })?;
258 }
259
260 let network = self.config.get_network()?;
261 let sqlite_connection = Arc::new(self.build_main_sqlite_connection(SQLITE_FILE).await?);
262 let transaction_sqlite_connection = self
263 .build_cardano_tx_sqlite_connection(SQLITE_FILE_CARDANO_TRANSACTION)
264 .await?;
265 let sqlite_connection_cardano_transaction_pool = Arc::new(
266 SqliteConnectionPool::build_from_connection(transaction_sqlite_connection),
267 );
268
269 let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
270
271 let protocol_initializer_store = Arc::new(ProtocolInitializerRepository::new(
272 sqlite_connection.clone(),
273 self.config.store_retention_limit.map(|limit| limit as u64),
274 ));
275
276 let digester = Arc::new(CardanoImmutableDigester::new(
277 network.to_string(),
278 self.build_digester_cache_provider().await?,
279 self.root_logger(),
280 ));
281 let stake_store = Arc::new(StakePoolStore::new(
282 sqlite_connection.clone(),
283 self.config.store_retention_limit.map(|limit| limit as u64),
284 ));
285 let chain_observer = {
286 let builder = self.chain_observer_builder;
287 builder(self.config)?
288 };
289 let ticker_service = {
290 let builder = self.immutable_file_observer_builder;
291 Arc::new(MithrilTickerService::new(
292 chain_observer.clone(),
293 builder(self.config)?,
294 ))
295 };
296
297 let era_reader = Arc::new(EraReader::new(
298 self.config.build_era_reader_adapter(chain_observer.clone())?,
299 ));
300 let era_epoch_token = era_reader
301 .read_era_epoch_token(ticker_service.get_current_epoch().await?)
302 .await?;
303 let era_checker = Arc::new(EraChecker::new(
304 era_epoch_token.get_current_supported_era()?,
305 era_epoch_token.get_current_epoch(),
306 ));
307
308 let api_version_provider = Arc::new(APIVersionProvider::new(era_checker.clone()));
309 let aggregator_client = self.build_aggregator_client(&api_version_provider)?;
310
311 let cardano_immutable_snapshot_builder =
312 Arc::new(CardanoImmutableFilesFullSignableBuilder::new(
313 digester.clone(),
314 &self.config.db_directory,
315 self.root_logger(),
316 ));
317 let mithril_stake_distribution_signable_builder =
318 Arc::new(MithrilStakeDistributionSignableBuilder::default());
319 let chain_data_store = Arc::new(SignerCardanoChainDataRepository::new(
320 sqlite_connection_cardano_transaction_pool.clone(),
321 ));
322 let chain_block_reader = PallasChainReader::new(
323 &self.config.cardano_node_socket_path,
324 network,
325 self.root_logger(),
326 );
327 let block_scanner = Arc::new(
328 CardanoBlockScanner::new(
329 Arc::new(Mutex::new(chain_block_reader)),
330 self.config
331 .cardano_transactions_block_streamer_max_roll_forwards_per_poll,
332 self.root_logger(),
333 )
334 .set_throttling_interval(
335 self.config
336 .cardano_transactions_block_streamer_throttling_interval
337 .map(Duration::from_millis),
338 ),
339 );
340 let chain_data_importer = Arc::new(CardanoChainDataImporter::new(
341 block_scanner,
342 chain_data_store.clone(),
343 self.root_logger(),
344 ));
345 let chain_data_importer = Arc::new(ChainDataImporterWithPruner::new(
347 self.config
348 .enable_transaction_pruning
349 .then_some(self.config.network_security_parameter),
350 chain_data_store.clone(),
351 chain_data_importer,
352 self.root_logger(),
353 ));
354 let state_machine_transactions_importer = Arc::new(SignerChainDataImporter::new(Arc::new(
357 ChainDataImporterByChunk::new(
358 chain_data_store.clone(),
359 chain_data_importer.clone(),
360 self.config.transactions_import_block_chunk_size,
361 self.root_logger(),
362 ),
363 )));
364 let preloader_chain_data_importer = Arc::new(SignerChainDataImporter::new(Arc::new(
367 ChainDataImporterByChunk::new(
368 chain_data_store.clone(),
369 Arc::new(ChainDataImporterWithVacuum::new(
370 sqlite_connection_cardano_transaction_pool.clone(),
371 chain_data_importer.clone(),
372 self.root_logger(),
373 )),
374 self.config.transactions_import_block_chunk_size,
375 self.root_logger(),
376 ),
377 )));
378 let block_range_root_retriever = chain_data_store.clone();
379 let cardano_transactions_builder = Arc::new(CardanoTransactionsSignableBuilder::<
380 MKTreeStoreSqlite,
381 >::new(
382 state_machine_transactions_importer.clone(),
383 block_range_root_retriever.clone(),
384 ));
385 let cardano_blocks_transactions_builder = Arc::new(
386 CardanoBlocksTransactionsSignableBuilder::<MKTreeStoreSqlite>::new(
387 state_machine_transactions_importer,
388 block_range_root_retriever,
389 ),
390 );
391 let cardano_stake_distribution_signable_builder = Arc::new(
392 CardanoStakeDistributionSignableBuilder::new(stake_store.clone()),
393 );
394 let cardano_database_signable_builder = Arc::new(CardanoDatabaseSignableBuilder::new(
395 digester.clone(),
396 &self.config.db_directory,
397 self.root_logger(),
398 ));
399 let epoch_service = Arc::new(RwLock::new(MithrilEpochService::new(
400 stake_store.clone(),
401 protocol_initializer_store.clone(),
402 self.root_logger(),
403 )));
404 let single_signer = Arc::new(MithrilSingleSigner::new(
405 self.compute_protocol_party_id()?,
406 epoch_service.clone(),
407 self.root_logger(),
408 ));
409 let signable_seed_builder_service = Arc::new(SignerSignableSeedBuilder::new(
410 epoch_service.clone(),
411 protocol_initializer_store.clone(),
412 ));
413 let signable_builders_dependencies = SignableBuilderServiceDependencies::new(
414 mithril_stake_distribution_signable_builder,
415 cardano_immutable_snapshot_builder,
416 cardano_transactions_builder,
417 cardano_blocks_transactions_builder,
418 cardano_stake_distribution_signable_builder,
419 cardano_database_signable_builder,
420 );
421 let signable_builder_service = Arc::new(MithrilSignableBuilderService::new(
422 signable_seed_builder_service,
423 signable_builders_dependencies,
424 self.root_logger(),
425 ));
426 let metrics_service = Arc::new(MetricsService::new(self.root_logger())?);
427 let network_configuration_service = Arc::new(HttpMithrilNetworkConfigurationProvider::new(
428 aggregator_client.clone(),
429 ));
430 let preloader_activation = CardanoTransactionsPreloaderActivationSigner::new(
431 network_configuration_service.clone(),
432 ticker_service.clone(),
433 );
434 let cardano_transactions_preloader = Arc::new(CardanoTransactionsPreloader::new(
435 signed_entity_type_lock.clone(),
436 preloader_chain_data_importer,
437 self.config.preload_security_parameter,
438 chain_observer.clone(),
439 self.root_logger(),
440 Arc::new(preloader_activation),
441 ));
442 let signed_beacon_repository = Arc::new(SignedBeaconRepository::new(
443 sqlite_connection.clone(),
444 self.config.store_retention_limit.map(|limit| limit as u64),
445 ));
446 let upkeep_service = Arc::new(SignerUpkeepService::new(
447 sqlite_connection.clone(),
448 sqlite_connection_cardano_transaction_pool,
449 signed_entity_type_lock.clone(),
450 vec![
451 signed_beacon_repository.clone(),
452 stake_store.clone(),
453 protocol_initializer_store.clone(),
454 ],
455 self.root_logger(),
456 ));
457
458 let kes_signer = match (
459 &self.config.kes_secret_key_path,
460 &self.config.operational_certificate_path,
461 ) {
462 (Some(kes_secret_key_path), Some(operational_certificate_path)) => {
463 Some(Arc::new(KesSignerStandard::new(
464 kes_secret_key_path.clone(),
465 operational_certificate_path.clone(),
466 )) as Arc<dyn KesSigner>)
467 }
468 (Some(_), None) | (None, Some(_)) => {
469 return Err(anyhow!(
470 "kes_secret_key and operational_certificate are both mandatory".to_string(),
471 ));
472 }
473 _ => None,
474 };
475
476 let signature_publisher = {
477 let first_publisher = SignaturePublisherRetrier::new(
478 match &self.config.dmq_node_socket_path {
479 Some(dmq_node_socket_path) => {
480 let dmq_network = &self.config.get_dmq_network()?;
481 let dmq_message_builder = DmqMessageBuilder::new(
482 kes_signer.clone().with_context(
483 || "A KES signer is mandatory to sign DMQ messages",
484 )?,
485 chain_observer.clone(),
486 );
487 Arc::new(SignaturePublisherDmq::new(Arc::new(
488 DmqPublisherClientPallas::<RegisterSignatureMessageDmq>::new(
489 dmq_node_socket_path.to_owned(),
490 *dmq_network,
491 dmq_message_builder,
492 self.root_logger(),
493 ),
494 ))) as Arc<dyn SignaturePublisher>
495 }
496 _ => Arc::new(SignaturePublisherNoop) as Arc<dyn SignaturePublisher>,
497 },
498 SignaturePublishRetryPolicy::never(),
499 );
500
501 let second_publisher = SignaturePublisherRetrier::new(
502 aggregator_client.clone(),
503 SignaturePublishRetryPolicy {
504 attempts: self.config.signature_publisher_config.retry_attempts,
505 delay_between_attempts: Duration::from_millis(
506 self.config.signature_publisher_config.retry_delay_ms,
507 ),
508 },
509 );
510
511 if self.config.signature_publisher_config.skip_delayer {
512 Arc::new(first_publisher) as Arc<dyn SignaturePublisher>
513 } else {
514 Arc::new(SignaturePublisherDelayer::new(
515 Arc::new(first_publisher),
516 Arc::new(second_publisher),
517 Duration::from_millis(self.config.signature_publisher_config.delayer_delay_ms),
518 self.root_logger(),
519 )) as Arc<dyn SignaturePublisher>
520 }
521 };
522
523 let certifier = Arc::new(SignerCertifierService::new(
524 signed_beacon_repository,
525 Arc::new(SignerSignedEntityConfigProvider::new(epoch_service.clone())),
526 signed_entity_type_lock.clone(),
527 single_signer.clone(),
528 signature_publisher,
529 self.root_logger(),
530 ));
531
532 let services = SignerDependencyContainer {
533 ticker_service,
534 chain_observer,
535 digester,
536 single_signer,
537 stake_store,
538 protocol_initializer_store,
539 era_checker,
540 era_reader,
541 api_version_provider,
542 signable_builder_service,
543 metrics_service,
544 signed_entity_type_lock,
545 cardano_transactions_preloader,
546 upkeep_service,
547 epoch_service,
548 certifier,
549 signer_registration_publisher: aggregator_client.clone(),
550 signers_registration_retriever: aggregator_client,
551 kes_signer,
552 network_configuration_service,
553 };
554
555 Ok(services)
556 }
557}
558
559#[cfg(test)]
560mod tests {
561 use std::path::PathBuf;
562
563 use mithril_aggregator_client::query::GetAggregatorFeaturesQuery;
564 use mithril_cardano_node_chain::test::double::FakeChainObserver;
565 use mithril_cardano_node_internal_database::test::double::DumbImmutableFileObserver;
566 use mithril_common::{
567 entities::TimePoint,
568 messages::AggregatorFeaturesMessage,
569 test::{TempDir, api_version_extensions::ApiVersionProviderTestExtension, double::Dummy},
570 };
571
572 use crate::test::TestLogger;
573
574 use super::*;
575
576 fn get_test_dir(test_name: &str) -> PathBuf {
577 TempDir::create("signer_service", test_name)
578 }
579
580 #[tokio::test]
581 async fn test_auto_create_stores_directory() {
582 let stores_dir = get_test_dir("test_auto_create_stores_directory").join("stores");
583 let config = Configuration {
584 data_stores_directory: stores_dir.clone(),
585 ..Configuration::new_sample("party-123456")
586 };
587
588 assert!(!stores_dir.exists());
589 let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
590 |_config| Ok(Arc::new(FakeChainObserver::new(Some(TimePoint::dummy()))));
591 let immutable_file_observer_builder: fn(
592 &Configuration,
593 )
594 -> StdResult<Arc<dyn ImmutableFileObserver>> =
595 |_config: &Configuration| Ok(Arc::new(DumbImmutableFileObserver::default()));
596
597 let mut dependencies_builder = DependenciesBuilder::new(&config, TestLogger::stdout());
598 dependencies_builder
599 .override_chain_observer_builder(chain_observer_builder)
600 .override_immutable_file_observer_builder(immutable_file_observer_builder)
601 .build()
602 .await
603 .expect("service builder build should not fail");
604 assert!(stores_dir.exists());
605 }
606
607 #[tokio::test]
608 async fn set_signer_version_header_to_built_aggregator_client() {
609 let server = httpmock::MockServer::start();
610 server.mock(|when, then| {
611 when.is_true(|req| {
612 req.headers()
613 .get(MITHRIL_SIGNER_VERSION_HEADER)
614 .is_some_and(|header| header == env!("CARGO_PKG_VERSION"))
615 });
616 then.status(200)
617 .json_body(serde_json::json!(AggregatorFeaturesMessage::dummy()));
618 });
619
620 let config = Configuration {
621 aggregator_endpoint: server.base_url(),
622 ..Configuration::new_sample("party-123456")
623 };
624 let dependencies_builder = DependenciesBuilder::new(&config, TestLogger::stdout());
625 let aggregator_client = dependencies_builder
626 .build_aggregator_client(&Arc::new(APIVersionProvider::new_failing()))
628 .unwrap();
629
630 aggregator_client
631 .send(GetAggregatorFeaturesQuery::current())
632 .await
633 .unwrap();
634 }
635}