1use std::fs;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, anyhow};
6use slog::Logger;
7use tokio::sync::{Mutex, RwLock};
8
9use mithril_cardano_node_chain::{
10 chain_observer::{CardanoCliRunner, ChainObserver, ChainObserverBuilder, ChainObserverType},
11 chain_reader::PallasChainReader,
12 chain_scanner::CardanoBlockScanner,
13};
14use mithril_cardano_node_internal_database::{
15 ImmutableFileObserver, ImmutableFileSystemObserver,
16 digesters::CardanoImmutableDigester,
17 digesters::cache::{
18 ImmutableFileDigestCacheProvider, JsonImmutableFileDigestCacheProviderBuilder,
19 },
20 signable_builder::{CardanoDatabaseSignableBuilder, CardanoImmutableFilesFullSignableBuilder},
21};
22use mithril_common::StdResult;
23use mithril_common::api_version::APIVersionProvider;
24use mithril_common::crypto_helper::{
25 KesSigner, KesSignerStandard, OpCert, ProtocolPartyId, SerDeShelleyFileFormat,
26};
27#[cfg(feature = "future_dmq")]
28use mithril_common::messages::RegisterSignatureMessageDmq;
29use mithril_common::signable_builder::{
30 CardanoStakeDistributionSignableBuilder, CardanoTransactionsSignableBuilder,
31 MithrilSignableBuilderService, MithrilStakeDistributionSignableBuilder,
32 SignableBuilderServiceDependencies,
33};
34
35use mithril_era::{EraChecker, EraReader};
36use mithril_signed_entity_lock::SignedEntityTypeLock;
37use mithril_signed_entity_preloader::CardanoTransactionsPreloader;
38use mithril_ticker::{MithrilTickerService, TickerService};
39
40use mithril_persistence::database::repository::CardanoTransactionRepository;
41use mithril_persistence::database::{ApplicationNodeType, SqlMigration};
42use mithril_persistence::sqlite::{ConnectionBuilder, SqliteConnection, SqliteConnectionPool};
43
44#[cfg(feature = "future_dmq")]
45use mithril_dmq::{DmqMessageBuilder, DmqPublisherPallas};
46
47use crate::dependency_injection::SignerDependencyContainer;
48#[cfg(feature = "future_dmq")]
49use crate::services::SignaturePublisherDmq;
50use crate::services::{
51 AggregatorHTTPClient, CardanoTransactionsImporter,
52 CardanoTransactionsPreloaderActivationSigner, MithrilEpochService, MithrilSingleSigner,
53 SignaturePublishRetryPolicy, SignaturePublisherDelayer, SignaturePublisherNoop,
54 SignaturePublisherRetrier, SignerCertifierService, SignerSignableSeedBuilder,
55 SignerSignedEntityConfigProvider, SignerUpkeepService, TransactionsImporterByChunk,
56 TransactionsImporterWithPruner, TransactionsImporterWithVacuum,
57};
58use crate::store::MKTreeStoreSqlite;
59use crate::{
60 Configuration, HTTP_REQUEST_TIMEOUT_DURATION, MetricsService, SQLITE_FILE,
61 SQLITE_FILE_CARDANO_TRANSACTION,
62};
63use crate::{
64 database::repository::{ProtocolInitializerRepository, SignedBeaconRepository, StakePoolStore},
65 services::SignaturePublisher,
66};
67
68pub struct DependenciesBuilder<'a> {
72 config: &'a Configuration,
73 chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
74 immutable_file_observer_builder:
75 fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
76 root_logger: Logger,
77}
78
79impl<'a> DependenciesBuilder<'a> {
80 pub fn new(config: &'a Configuration, root_logger: Logger) -> Self {
82 let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
83 |config: &Configuration| {
84 let chain_observer_type = ChainObserverType::Pallas;
85 let cardano_cli_path = &config.cardano_cli_path;
86 let cardano_node_socket_path = &config.cardano_node_socket_path;
87 let cardano_network = &config.get_network().with_context(|| {
88 "Dependencies Builder can not get Cardano network while building the chain observer"
89 })?;
90 let cardano_cli_runner = &CardanoCliRunner::new(
91 cardano_cli_path.to_owned(),
92 cardano_node_socket_path.to_owned(),
93 cardano_network.to_owned(),
94 );
95
96 let chain_observer_builder = ChainObserverBuilder::new(
97 &chain_observer_type,
98 cardano_node_socket_path,
99 cardano_network,
100 Some(cardano_cli_runner),
101 );
102
103 chain_observer_builder
104 .build()
105 .with_context(|| "Dependencies Builder can not build chain observer")
106 };
107
108 let immutable_file_observer_builder: fn(
109 &Configuration,
110 )
111 -> StdResult<Arc<dyn ImmutableFileObserver>> = |config: &Configuration| {
112 Ok(Arc::new(ImmutableFileSystemObserver::new(
113 &config.db_directory,
114 )))
115 };
116
117 Self {
118 config,
119 chain_observer_builder,
120 immutable_file_observer_builder,
121 root_logger,
122 }
123 }
124
125 pub fn override_immutable_file_observer_builder(
127 &mut self,
128 builder: fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
129 ) -> &mut Self {
130 self.immutable_file_observer_builder = builder;
131
132 self
133 }
134
135 pub fn root_logger(&self) -> Logger {
137 self.root_logger.clone()
138 }
139
140 pub fn override_chain_observer_builder(
142 &mut self,
143 builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
144 ) -> &mut Self {
145 self.chain_observer_builder = builder;
146
147 self
148 }
149
150 fn compute_protocol_party_id(&self) -> StdResult<ProtocolPartyId> {
152 match &self.config.operational_certificate_path {
153 Some(operational_certificate_path) => {
154 let opcert: OpCert = OpCert::from_file(operational_certificate_path)
155 .with_context(|| "Could not decode operational certificate")?;
156 Ok(opcert
157 .compute_protocol_party_id()
158 .with_context(|| "Could not compute party_id from operational certificate")?)
159 }
160 _ => Ok(self
161 .config
162 .party_id
163 .to_owned()
164 .ok_or(anyhow!("A party_id should at least be provided"))?),
165 }
166 }
167
168 async fn build_digester_cache_provider(
169 &self,
170 ) -> StdResult<Option<Arc<dyn ImmutableFileDigestCacheProvider>>> {
171 if self.config.disable_digests_cache {
172 return Ok(None);
173 }
174
175 let cache_provider = JsonImmutableFileDigestCacheProviderBuilder::new(
176 &self.config.data_stores_directory,
177 &format!("immutables_digests_{}.json", self.config.network),
178 )
179 .should_reset_digests_cache(self.config.reset_digests_cache)
180 .with_logger(self.root_logger())
181 .build()
182 .await?;
183
184 Ok(Some(Arc::new(cache_provider)))
185 }
186
187 pub async fn build_sqlite_connection(
189 &self,
190 sqlite_file_name: &str,
191 migrations: Vec<SqlMigration>,
192 ) -> StdResult<SqliteConnection> {
193 let sqlite_db_path = self.config.get_sqlite_file(sqlite_file_name)?;
194 let connection = ConnectionBuilder::open_file(&sqlite_db_path)
195 .with_node_type(ApplicationNodeType::Signer)
196 .with_migrations(migrations)
197 .with_logger(self.root_logger())
198 .build()
199 .with_context(|| "Database connection initialisation error")?;
200
201 Ok(connection)
202 }
203
204 pub async fn build(&self) -> StdResult<SignerDependencyContainer> {
206 if !self.config.data_stores_directory.exists() {
207 fs::create_dir_all(self.config.data_stores_directory.clone()).with_context(|| {
208 format!(
209 "Could not create data stores directory: `{}`",
210 self.config.data_stores_directory.display()
211 )
212 })?;
213 }
214
215 let network = self.config.get_network()?;
216 let sqlite_connection = Arc::new(
217 self.build_sqlite_connection(SQLITE_FILE, crate::database::migration::get_migrations())
218 .await?,
219 );
220 let transaction_sqlite_connection = self
221 .build_sqlite_connection(
222 SQLITE_FILE_CARDANO_TRANSACTION,
223 mithril_persistence::database::cardano_transaction_migration::get_migrations(),
224 )
225 .await?;
226 let sqlite_connection_cardano_transaction_pool = Arc::new(
227 SqliteConnectionPool::build_from_connection(transaction_sqlite_connection),
228 );
229
230 let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
231
232 let protocol_initializer_store = Arc::new(ProtocolInitializerRepository::new(
233 sqlite_connection.clone(),
234 self.config.store_retention_limit.map(|limit| limit as u64),
235 ));
236
237 let digester = Arc::new(CardanoImmutableDigester::new(
238 network.to_string(),
239 self.build_digester_cache_provider().await?,
240 self.root_logger(),
241 ));
242 let stake_store = Arc::new(StakePoolStore::new(
243 sqlite_connection.clone(),
244 self.config.store_retention_limit.map(|limit| limit as u64),
245 ));
246 let chain_observer = {
247 let builder = self.chain_observer_builder;
248 builder(self.config)?
249 };
250 let ticker_service = {
251 let builder = self.immutable_file_observer_builder;
252 Arc::new(MithrilTickerService::new(
253 chain_observer.clone(),
254 builder(self.config)?,
255 ))
256 };
257
258 let era_reader = Arc::new(EraReader::new(
259 self.config.build_era_reader_adapter(chain_observer.clone())?,
260 ));
261 let era_epoch_token = era_reader
262 .read_era_epoch_token(ticker_service.get_current_epoch().await?)
263 .await?;
264 let era_checker = Arc::new(EraChecker::new(
265 era_epoch_token.get_current_supported_era()?,
266 era_epoch_token.get_current_epoch(),
267 ));
268
269 let api_version_provider = Arc::new(APIVersionProvider::new(era_checker.clone()));
270 let aggregator_client = Arc::new(AggregatorHTTPClient::new(
271 self.config.aggregator_endpoint.clone(),
272 self.config.relay_endpoint.clone(),
273 api_version_provider.clone(),
274 Some(Duration::from_millis(HTTP_REQUEST_TIMEOUT_DURATION)),
275 self.root_logger(),
276 ));
277
278 let cardano_immutable_snapshot_builder =
279 Arc::new(CardanoImmutableFilesFullSignableBuilder::new(
280 digester.clone(),
281 &self.config.db_directory,
282 self.root_logger(),
283 ));
284 let mithril_stake_distribution_signable_builder =
285 Arc::new(MithrilStakeDistributionSignableBuilder::default());
286 let transaction_store = Arc::new(CardanoTransactionRepository::new(
287 sqlite_connection_cardano_transaction_pool.clone(),
288 ));
289 let chain_block_reader = PallasChainReader::new(
290 &self.config.cardano_node_socket_path,
291 network,
292 self.root_logger(),
293 );
294 let block_scanner = Arc::new(CardanoBlockScanner::new(
295 Arc::new(Mutex::new(chain_block_reader)),
296 self.config
297 .cardano_transactions_block_streamer_max_roll_forwards_per_poll,
298 self.root_logger(),
299 ));
300 let transactions_importer = Arc::new(CardanoTransactionsImporter::new(
301 block_scanner,
302 transaction_store.clone(),
303 self.root_logger(),
304 ));
305 let transactions_importer = Arc::new(TransactionsImporterWithPruner::new(
307 self.config
308 .enable_transaction_pruning
309 .then_some(self.config.network_security_parameter),
310 transaction_store.clone(),
311 transactions_importer,
312 self.root_logger(),
313 ));
314 let state_machine_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
317 transaction_store.clone(),
318 transactions_importer.clone(),
319 self.config.transactions_import_block_chunk_size,
320 self.root_logger(),
321 ));
322 let preloader_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
325 transaction_store.clone(),
326 Arc::new(TransactionsImporterWithVacuum::new(
327 sqlite_connection_cardano_transaction_pool.clone(),
328 transactions_importer.clone(),
329 self.root_logger(),
330 )),
331 self.config.transactions_import_block_chunk_size,
332 self.root_logger(),
333 ));
334 let block_range_root_retriever = transaction_store.clone();
335 let cardano_transactions_builder = Arc::new(CardanoTransactionsSignableBuilder::<
336 MKTreeStoreSqlite,
337 >::new(
338 state_machine_transactions_importer,
339 block_range_root_retriever,
340 ));
341 let cardano_stake_distribution_signable_builder = Arc::new(
342 CardanoStakeDistributionSignableBuilder::new(stake_store.clone()),
343 );
344 let cardano_database_signable_builder = Arc::new(CardanoDatabaseSignableBuilder::new(
345 digester.clone(),
346 &self.config.db_directory,
347 self.root_logger(),
348 ));
349 let epoch_service = Arc::new(RwLock::new(MithrilEpochService::new(
350 stake_store.clone(),
351 protocol_initializer_store.clone(),
352 self.root_logger(),
353 )));
354 let single_signer = Arc::new(MithrilSingleSigner::new(
355 self.compute_protocol_party_id()?,
356 epoch_service.clone(),
357 self.root_logger(),
358 ));
359 let signable_seed_builder_service = Arc::new(SignerSignableSeedBuilder::new(
360 epoch_service.clone(),
361 protocol_initializer_store.clone(),
362 ));
363 let signable_builders_dependencies = SignableBuilderServiceDependencies::new(
364 mithril_stake_distribution_signable_builder,
365 cardano_immutable_snapshot_builder,
366 cardano_transactions_builder,
367 cardano_stake_distribution_signable_builder,
368 cardano_database_signable_builder,
369 );
370 let signable_builder_service = Arc::new(MithrilSignableBuilderService::new(
371 signable_seed_builder_service,
372 signable_builders_dependencies,
373 self.root_logger(),
374 ));
375 let metrics_service = Arc::new(MetricsService::new(self.root_logger())?);
376 let preloader_activation =
377 CardanoTransactionsPreloaderActivationSigner::new(aggregator_client.clone());
378 let cardano_transactions_preloader = Arc::new(CardanoTransactionsPreloader::new(
379 signed_entity_type_lock.clone(),
380 preloader_transactions_importer,
381 self.config.preload_security_parameter,
382 chain_observer.clone(),
383 self.root_logger(),
384 Arc::new(preloader_activation),
385 ));
386 let signed_beacon_repository = Arc::new(SignedBeaconRepository::new(
387 sqlite_connection.clone(),
388 self.config.store_retention_limit.map(|limit| limit as u64),
389 ));
390 let upkeep_service = Arc::new(SignerUpkeepService::new(
391 sqlite_connection.clone(),
392 sqlite_connection_cardano_transaction_pool,
393 signed_entity_type_lock.clone(),
394 vec![
395 signed_beacon_repository.clone(),
396 stake_store.clone(),
397 protocol_initializer_store.clone(),
398 ],
399 self.root_logger(),
400 ));
401
402 let kes_signer = match (
403 &self.config.kes_secret_key_path,
404 &self.config.operational_certificate_path,
405 ) {
406 (Some(kes_secret_key_path), Some(operational_certificate_path)) => {
407 Some(Arc::new(KesSignerStandard::new(
408 kes_secret_key_path.clone(),
409 operational_certificate_path.clone(),
410 )) as Arc<dyn KesSigner>)
411 }
412 (Some(_), None) | (None, Some(_)) => {
413 return Err(anyhow!(
414 "kes_secret_key and operational_certificate are both mandatory".to_string(),
415 ));
416 }
417 _ => None,
418 };
419
420 let signature_publisher = {
421 let first_publisher = SignaturePublisherRetrier::new(
422 {
423 #[cfg(feature = "future_dmq")]
424 let publisher = match &self.config.dmq_node_socket_path {
425 Some(dmq_node_socket_path) => {
426 let cardano_network = &self.config.get_network()?;
427 let dmq_message_builder = DmqMessageBuilder::new(
428 kes_signer.clone().ok_or(anyhow!(
429 "A KES signer is mandatory to sign DMQ messages"
430 ))?,
431 chain_observer.clone(),
432 );
433 Arc::new(SignaturePublisherDmq::new(Arc::new(DmqPublisherPallas::<
434 RegisterSignatureMessageDmq,
435 >::new(
436 dmq_node_socket_path.to_owned(),
437 *cardano_network,
438 dmq_message_builder,
439 self.root_logger(),
440 )))) as Arc<dyn SignaturePublisher>
441 }
442 _ => Arc::new(SignaturePublisherNoop) as Arc<dyn SignaturePublisher>,
443 };
444 #[cfg(not(feature = "future_dmq"))]
445 let publisher = Arc::new(SignaturePublisherNoop) as Arc<dyn SignaturePublisher>;
446
447 publisher
448 },
449 SignaturePublishRetryPolicy::never(),
450 );
451
452 let second_publisher = SignaturePublisherRetrier::new(
453 aggregator_client.clone(),
454 SignaturePublishRetryPolicy {
455 attempts: self.config.signature_publisher_config.retry_attempts,
456 delay_between_attempts: Duration::from_millis(
457 self.config.signature_publisher_config.retry_delay_ms,
458 ),
459 },
460 );
461
462 Arc::new(SignaturePublisherDelayer::new(
463 Arc::new(first_publisher),
464 Arc::new(second_publisher),
465 Duration::from_millis(self.config.signature_publisher_config.delayer_delay_ms),
466 self.root_logger(),
467 ))
468 };
469
470 let certifier = Arc::new(SignerCertifierService::new(
471 signed_beacon_repository,
472 Arc::new(SignerSignedEntityConfigProvider::new(epoch_service.clone())),
473 signed_entity_type_lock.clone(),
474 single_signer.clone(),
475 signature_publisher,
476 self.root_logger(),
477 ));
478
479 let services = SignerDependencyContainer {
480 ticker_service,
481 certificate_handler: aggregator_client,
482 chain_observer,
483 digester,
484 single_signer,
485 stake_store,
486 protocol_initializer_store,
487 era_checker,
488 era_reader,
489 api_version_provider,
490 signable_builder_service,
491 metrics_service,
492 signed_entity_type_lock,
493 cardano_transactions_preloader,
494 upkeep_service,
495 epoch_service,
496 certifier,
497 kes_signer,
498 };
499
500 Ok(services)
501 }
502}
503
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use mithril_cardano_node_chain::test::double::FakeChainObserver;
    use mithril_cardano_node_internal_database::test::double::DumbImmutableFileObserver;
    use mithril_common::test::double::Dummy;
    use mithril_common::{entities::TimePoint, test::TempDir};

    use crate::test_tools::TestLogger;

    use super::*;

    /// Returns the path given by `TempDir::create` for the `signer_service` tests and
    /// `test_name`.
    fn get_test_dir(test_name: &str) -> PathBuf {
        TempDir::create("signer_service", test_name)
    }

    /// Chain observer factory returning a `FakeChainObserver` set on a dummy time point.
    fn fake_chain_observer(_config: &Configuration) -> StdResult<Arc<dyn ChainObserver>> {
        Ok(Arc::new(FakeChainObserver::new(Some(TimePoint::dummy()))))
    }

    /// Immutable file observer factory returning a default `DumbImmutableFileObserver`.
    fn dumb_immutable_file_observer(
        _config: &Configuration,
    ) -> StdResult<Arc<dyn ImmutableFileObserver>> {
        Ok(Arc::new(DumbImmutableFileObserver::default()))
    }

    /// Building the dependencies must create the data stores directory when it is missing.
    #[tokio::test]
    async fn test_auto_create_stores_directory() {
        let stores_dir = get_test_dir("test_auto_create_stores_directory").join("stores");
        let config = Configuration {
            data_stores_directory: stores_dir.clone(),
            ..Configuration::new_sample("party-123456")
        };
        assert!(!stores_dir.exists());

        let mut dependencies_builder = DependenciesBuilder::new(&config, TestLogger::stdout());
        dependencies_builder
            .override_chain_observer_builder(fake_chain_observer)
            .override_immutable_file_observer_builder(dumb_immutable_file_observer);
        dependencies_builder
            .build()
            .await
            .expect("service builder build should not fail");

        assert!(stores_dir.exists());
    }
}