1mod enablers;
2mod protocol;
3mod support;
4
5use anyhow::Context;
6use slog::{Logger, debug};
7use std::{path::PathBuf, sync::Arc};
8use tokio::{
9 sync::{
10 Mutex,
11 mpsc::{UnboundedReceiver, UnboundedSender},
12 watch,
13 },
14 time::Duration,
15};
16use warp::Filter;
17
18use mithril_aggregator_client::AggregatorHttpClient;
19use mithril_cardano_node_chain::{
20 chain_observer::{CardanoCliRunner, ChainObserver},
21 chain_reader::ChainBlockReader,
22 chain_scanner::BlockScanner,
23};
24use mithril_cardano_node_internal_database::{
25 ImmutableFileObserver,
26 digesters::{ImmutableDigester, cache::ImmutableFileDigestCacheProvider},
27};
28use mithril_common::{
29 api_version::APIVersionProvider,
30 certificate_chain::CertificateVerifier,
31 crypto_helper::ProtocolGenesisVerifier,
32 signable_builder::{SignableBuilderService, SignableSeedBuilder, TransactionsImporter},
33};
34use mithril_era::{EraChecker, EraReader, EraReaderAdapter};
35use mithril_persistence::{
36 database::repository::CardanoTransactionRepository,
37 sqlite::{SqliteConnection, SqliteConnectionPool},
38};
39use mithril_protocol_config::interface::MithrilNetworkConfigurationProvider;
40use mithril_signed_entity_lock::SignedEntityTypeLock;
41use mithril_ticker::TickerService;
42
43use super::{
44 DatabaseCommandDependenciesContainer, DependenciesBuilderError, EpochServiceWrapper,
45 GenesisCommandDependenciesContainer, Result, ToolsCommandDependenciesContainer,
46};
47use crate::{
48 AggregatorConfig, AggregatorRunner, AggregatorRuntime, EpochSettingsStorer,
49 ImmutableFileDigestMapper, MetricsService, MithrilSignerRegistrationLeader, MultiSigner,
50 ProtocolParametersRetriever, ServeCommandDependenciesContainer, SignerRegisterer,
51 SignerRegistrationRoundOpener, SignerRegistrationVerifier, SingleSignatureAuthenticator,
52 VerificationKeyStorer,
53 configuration::ConfigurationSource,
54 database::repository::{
55 CertificateRepository, EpochSettingsStore, OpenMessageRepository, SignedEntityStorer,
56 SignerStore, StakePoolStore,
57 },
58 event_store::{EventMessage, TransmitterService},
59 file_uploaders::FileUploader,
60 http_server::routes::router::{self, RouterConfig, RouterState},
61 services::{
62 CertificateChainSynchronizer, CertifierService, MessageService,
63 MithrilSignerRegistrationFollower, ProverService, SignedEntityService, SignerSynchronizer,
64 Snapshotter, StakeDistributionService, UpkeepService,
65 },
66 tools::file_archiver::FileArchiver,
67};
68
69#[macro_export]
76macro_rules! get_dependency {
77 ( $self:ident.$attribute:ident ) => {{
78 paste::paste! {
79 get_dependency!($self.$attribute = $self.[<build_ $attribute>]().await?)
80 }
81 }};
82 ( $self:ident.$attribute:ident = $builder:expr ) => {{
83 paste::paste! {
84 if $self.$attribute.is_none() {
85 slog::debug!($self.root_logger(), "Building dependency {}", stringify!($attribute));
86 $self.$attribute = Some($builder);
87 }
88
89 let r:Result<_> = Ok($self.$attribute.as_ref().cloned().unwrap());
90 r
91 }
92 }};
93}
94
95const SQLITE_FILE: &str = "aggregator.sqlite3";
96const SQLITE_FILE_CARDANO_TRANSACTION: &str = "cardano-transaction.sqlite3";
97const SQLITE_MONITORING_FILE: &str = "monitoring.sqlite3";
98const CARDANO_DB_ARTIFACTS_DIR: &str = "cardano-database";
99const SNAPSHOT_ARTIFACTS_DIR: &str = "cardano-immutable-files-full";
100const MAX_ARTIFACT_EPOCH_OFFSET: u64 = 5;
102
103pub struct DependenciesBuilder {
115 pub configuration: Arc<dyn ConfigurationSource>,
117
118 pub root_logger: Logger,
120
121 pub sqlite_connection: Option<Arc<SqliteConnection>>,
123
124 pub sqlite_connection_event_store: Option<Arc<SqliteConnection>>,
126
127 pub sqlite_connection_cardano_transaction_pool: Option<Arc<SqliteConnectionPool>>,
129
130 pub stake_store: Option<Arc<StakePoolStore>>,
133
134 pub snapshot_uploader: Option<Arc<dyn FileUploader>>,
136
137 pub multi_signer: Option<Arc<dyn MultiSigner>>,
139
140 pub certificate_repository: Option<Arc<CertificateRepository>>,
142
143 pub open_message_repository: Option<Arc<OpenMessageRepository>>,
145
146 pub verification_key_store: Option<Arc<dyn VerificationKeyStorer>>,
148
149 pub epoch_settings_store: Option<Arc<EpochSettingsStore>>,
151
152 pub cardano_cli_runner: Option<Box<CardanoCliRunner>>,
154
155 pub chain_observer: Option<Arc<dyn ChainObserver>>,
157
158 pub chain_block_reader: Option<Arc<Mutex<dyn ChainBlockReader>>>,
160
161 pub transaction_repository: Option<Arc<CardanoTransactionRepository>>,
163
164 pub block_scanner: Option<Arc<dyn BlockScanner>>,
166
167 pub immutable_digester: Option<Arc<dyn ImmutableDigester>>,
169
170 pub immutable_file_observer: Option<Arc<dyn ImmutableFileObserver>>,
172
173 pub immutable_cache_provider: Option<Arc<dyn ImmutableFileDigestCacheProvider>>,
175
176 pub immutable_file_digest_mapper: Option<Arc<dyn ImmutableFileDigestMapper>>,
178
179 pub digester: Option<Arc<dyn ImmutableDigester>>,
181
182 pub file_archiver: Option<Arc<FileArchiver>>,
184
185 pub snapshotter: Option<Arc<dyn Snapshotter>>,
187
188 pub certificate_verifier: Option<Arc<dyn CertificateVerifier>>,
190
191 pub genesis_verifier: Option<Arc<ProtocolGenesisVerifier>>,
193
194 pub certificate_chain_synchronizer: Option<Arc<dyn CertificateChainSynchronizer>>,
196
197 pub mithril_signer_registration_leader: Option<Arc<MithrilSignerRegistrationLeader>>,
199
200 pub mithril_signer_registration_follower: Option<Arc<MithrilSignerRegistrationFollower>>,
202
203 pub signer_registerer: Option<Arc<dyn SignerRegisterer>>,
205
206 pub signer_synchronizer: Option<Arc<dyn SignerSynchronizer>>,
208
209 pub signer_registration_verifier: Option<Arc<dyn SignerRegistrationVerifier>>,
211
212 pub signer_registration_round_opener: Option<Arc<dyn SignerRegistrationRoundOpener>>,
214
215 pub era_checker: Option<Arc<EraChecker>>,
217
218 pub era_reader_adapter: Option<Arc<dyn EraReaderAdapter>>,
220
221 pub era_reader: Option<Arc<EraReader>>,
223
224 pub event_transmitter: Option<Arc<TransmitterService<EventMessage>>>,
226
227 pub event_transmitter_channel: (
229 Option<UnboundedReceiver<EventMessage>>,
230 Option<UnboundedSender<EventMessage>>,
231 ),
232
233 pub api_version_provider: Option<Arc<APIVersionProvider>>,
235
236 pub stake_distribution_service: Option<Arc<dyn StakeDistributionService>>,
238
239 pub ticker_service: Option<Arc<dyn TickerService>>,
241
242 pub signer_store: Option<Arc<SignerStore>>,
244
245 pub signable_seed_builder: Option<Arc<dyn SignableSeedBuilder>>,
247
248 pub signable_builder_service: Option<Arc<dyn SignableBuilderService>>,
250
251 pub signed_entity_service: Option<Arc<dyn SignedEntityService>>,
253
254 pub certifier_service: Option<Arc<dyn CertifierService>>,
256
257 pub epoch_service: Option<EpochServiceWrapper>,
259
260 pub mithril_network_configuration_provider:
262 Option<Arc<dyn MithrilNetworkConfigurationProvider>>,
263
264 pub signed_entity_storer: Option<Arc<dyn SignedEntityStorer>>,
266
267 pub message_service: Option<Arc<dyn MessageService>>,
269
270 pub prover_service: Option<Arc<dyn ProverService>>,
272
273 pub signed_entity_type_lock: Option<Arc<SignedEntityTypeLock>>,
275
276 pub transactions_importer: Option<Arc<dyn TransactionsImporter>>,
278
279 pub upkeep_service: Option<Arc<dyn UpkeepService>>,
281
282 pub single_signature_authenticator: Option<Arc<SingleSignatureAuthenticator>>,
284
285 pub metrics_service: Option<Arc<MetricsService>>,
287
288 pub leader_aggregator_client: Option<Arc<AggregatorHttpClient>>,
290
291 pub protocol_parameters_retriever: Option<Arc<dyn ProtocolParametersRetriever>>,
293
294 pub stop_signal_channel: Option<(watch::Sender<()>, watch::Receiver<()>)>,
296}
297
298impl DependenciesBuilder {
299 pub fn new(root_logger: Logger, configuration: Arc<dyn ConfigurationSource>) -> Self {
301 Self {
302 configuration,
303 root_logger,
304 sqlite_connection: None,
305 sqlite_connection_event_store: None,
306 sqlite_connection_cardano_transaction_pool: None,
307 stake_store: None,
308 snapshot_uploader: None,
309 multi_signer: None,
310 certificate_repository: None,
311 open_message_repository: None,
312 verification_key_store: None,
313 epoch_settings_store: None,
314 cardano_cli_runner: None,
315 chain_observer: None,
316 chain_block_reader: None,
317 block_scanner: None,
318 transaction_repository: None,
319 immutable_digester: None,
320 immutable_file_observer: None,
321 immutable_cache_provider: None,
322 immutable_file_digest_mapper: None,
323 digester: None,
324 file_archiver: None,
325 snapshotter: None,
326 certificate_verifier: None,
327 genesis_verifier: None,
328 certificate_chain_synchronizer: None,
329 mithril_signer_registration_leader: None,
330 mithril_signer_registration_follower: None,
331 signer_registerer: None,
332 signer_synchronizer: None,
333 signer_registration_verifier: None,
334 signer_registration_round_opener: None,
335 era_reader_adapter: None,
336 era_checker: None,
337 era_reader: None,
338 event_transmitter: None,
339 event_transmitter_channel: (None, None),
340 api_version_provider: None,
341 stake_distribution_service: None,
342 ticker_service: None,
343 signer_store: None,
344 signable_seed_builder: None,
345 signable_builder_service: None,
346 signed_entity_service: None,
347 certifier_service: None,
348 epoch_service: None,
349 mithril_network_configuration_provider: None,
350 signed_entity_storer: None,
351 message_service: None,
352 prover_service: None,
353 signed_entity_type_lock: None,
354 transactions_importer: None,
355 upkeep_service: None,
356 single_signature_authenticator: None,
357 metrics_service: None,
358 leader_aggregator_client: None,
359 protocol_parameters_retriever: None,
360 stop_signal_channel: None,
361 }
362 }
363
364 fn get_cardano_db_artifacts_dir(&self) -> Result<PathBuf> {
365 let cardano_db_artifacts_dir =
366 self.configuration.get_snapshot_dir()?.join(CARDANO_DB_ARTIFACTS_DIR);
367
368 if !cardano_db_artifacts_dir.exists() {
369 std::fs::create_dir(&cardano_db_artifacts_dir).map_err(|e| {
370 DependenciesBuilderError::Initialization {
371 message: format!("Cannot create '{cardano_db_artifacts_dir:?}' directory."),
372 error: Some(e.into()),
373 }
374 })?;
375 }
376
377 Ok(cardano_db_artifacts_dir)
378 }
379
380 pub async fn build_serve_dependencies_container(
382 &mut self,
383 ) -> Result<ServeCommandDependenciesContainer> {
384 #[allow(deprecated)]
385 let dependencies_manager = ServeCommandDependenciesContainer {
386 root_logger: self.root_logger(),
387 stake_store: self.get_stake_store().await?,
388 certificate_repository: self.get_certificate_repository().await?,
389 verification_key_store: self.get_verification_key_store().await?,
390 epoch_settings_storer: self.get_epoch_settings_store().await?,
391 certificate_chain_synchronizer: self.get_certificate_chain_synchronizer().await?,
392 signer_registerer: self.get_signer_registerer().await?,
393 signer_synchronizer: self.get_signer_synchronizer().await?,
394 signer_registration_round_opener: self.get_signer_registration_round_opener().await?,
395 era_checker: self.get_era_checker().await?,
396 era_reader: self.get_era_reader().await?,
397 event_transmitter: self.get_event_transmitter().await?,
398 api_version_provider: self.get_api_version_provider().await?,
399 stake_distribution_service: self.get_stake_distribution_service().await?,
400 signer_recorder: self.get_signer_store().await?,
401 signable_builder_service: self.get_signable_builder_service().await?,
402 signed_entity_service: self.get_signed_entity_service().await?,
403 certifier_service: self.get_certifier_service().await?,
404 epoch_service: self.get_epoch_service().await?,
405 ticker_service: self.get_ticker_service().await?,
406 signed_entity_storer: self.get_signed_entity_storer().await?,
407 signer_getter: self.get_signer_store().await?,
408 message_service: self.get_message_service().await?,
409 prover_service: self.get_prover_service().await?,
410 signed_entity_type_lock: self.get_signed_entity_type_lock().await?,
411 upkeep_service: self.get_upkeep_service().await?,
412 single_signer_authenticator: self.get_single_signature_authenticator().await?,
413 metrics_service: self.get_metrics_service().await?,
414 };
415
416 self.handle_discrepancies_at_startup().await?;
417
418 Ok(dependencies_manager)
419 }
420
421 pub async fn create_aggregator_runner(&mut self) -> Result<AggregatorRuntime> {
423 let dependency_container = Arc::new(self.build_serve_dependencies_container().await?);
424
425 let config = AggregatorConfig::new(
426 Duration::from_millis(self.configuration.run_interval()),
427 self.configuration.is_follower_aggregator(),
428 );
429 let runtime = AggregatorRuntime::new(
430 config,
431 None,
432 Arc::new(AggregatorRunner::new(dependency_container)),
433 self.root_logger(),
434 )
435 .await
436 .map_err(|e| DependenciesBuilderError::Initialization {
437 message: "Cannot initialize Aggregator runtime.".to_string(),
438 error: Some(e.into()),
439 })?;
440
441 Ok(runtime)
442 }
443
444 pub async fn create_http_routes(
446 &mut self,
447 ) -> Result<
448 impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<>,
449 > {
450 let dependency_container = Arc::new(self.build_serve_dependencies_container().await?);
451 let snapshot_dir = self.configuration.get_snapshot_dir()?;
452 let router_state = RouterState::new(
453 dependency_container.clone(),
454 RouterConfig {
455 network: self.configuration.get_network()?,
456 server_url: self.configuration.get_server_url()?,
457 allowed_discriminants: self
458 .configuration
459 .compute_allowed_signed_entity_types_discriminants()?,
460 cardano_transactions_prover_max_hashes_allowed_by_request: self
461 .configuration
462 .cardano_transactions_prover_max_hashes_allowed_by_request(),
463 cardano_db_artifacts_directory: self.get_cardano_db_artifacts_dir()?,
464 max_artifact_epoch_offset: MAX_ARTIFACT_EPOCH_OFFSET,
465 snapshot_directory: snapshot_dir.join(SNAPSHOT_ARTIFACTS_DIR),
466 cardano_node_version: self.configuration.cardano_node_version(),
467 allow_http_serve_directory: self.configuration.allow_http_serve_directory(),
468 origin_tag_white_list: self.configuration.compute_origin_tag_white_list(),
469 aggregate_signature_type: self.configuration.aggregate_signature_type(),
470 },
471 );
472
473 Ok(router::routes(Arc::new(router_state)))
474 }
475
476 pub async fn create_genesis_container(
478 &mut self,
479 ) -> Result<GenesisCommandDependenciesContainer> {
480 let network = self.configuration.get_network().with_context(
481 || "Dependencies Builder can not get Cardano network while building genesis container",
482 )?;
483
484 let dependencies = GenesisCommandDependenciesContainer {
485 network,
486 chain_observer: self.get_chain_observer().await?,
487 certificate_repository: self.get_certificate_repository().await?,
488 certificate_verifier: self.get_certificate_verifier().await?,
489 protocol_parameters_retriever: self.get_protocol_parameters_retriever().await?,
490 verification_key_store: self.get_verification_key_store().await?,
491 };
492
493 Ok(dependencies)
494 }
495
496 pub async fn create_database_command_container(
498 &mut self,
499 ) -> Result<DatabaseCommandDependenciesContainer> {
500 let main_db_connection = self
501 .get_sqlite_connection()
502 .await
503 .with_context(|| "Dependencies Builder can not get sqlite connection")?;
504
505 self.get_event_store_sqlite_connection()
506 .await
507 .with_context(|| "Dependencies Builder can not get event store sqlite connection")?;
508
509 self.get_sqlite_connection_cardano_transaction_pool()
510 .await
511 .with_context(
512 || "Dependencies Builder can not get cardano transaction pool sqlite connection",
513 )?;
514
515 let dependencies = DatabaseCommandDependenciesContainer { main_db_connection };
516
517 Ok(dependencies)
518 }
519
520 pub async fn create_tools_command_container(
522 &mut self,
523 ) -> Result<ToolsCommandDependenciesContainer> {
524 let db_connection = self
525 .get_sqlite_connection()
526 .await
527 .with_context(|| "Dependencies Builder can not get sqlite connection")?;
528
529 let dependencies = ToolsCommandDependenciesContainer { db_connection };
530
531 Ok(dependencies)
532 }
533
534 pub async fn handle_discrepancies_at_startup(&mut self) -> Result<()> {
540 let logger = self.root_logger();
541 let current_epoch = self
542 .get_chain_observer()
543 .await?
544 .get_current_epoch()
545 .await
546 .map_err(|e| DependenciesBuilderError::Initialization {
547 message: "cannot handle startup discrepancies: failed to retrieve current epoch."
548 .to_string(),
549 error: Some(e.into()),
550 })?
551 .ok_or(DependenciesBuilderError::Initialization {
552 message: "cannot handle startup discrepancies: no epoch returned.".to_string(),
553 error: None,
554 })?;
555 let network_configuration = self
556 .get_mithril_network_configuration_provider()
557 .await?
558 .get_network_configuration(current_epoch)
559 .await
560 .map_err(|e| DependenciesBuilderError::Initialization {
561 message: format!("cannot handle startup discrepancies: failed to retrieve network configuration for epoch {current_epoch}"),
562 error: Some(e),
563 })?;
564 let epoch_settings_store = self.get_epoch_settings_store().await?;
565
566 debug!(
567 logger,
568 "Handle discrepancies at startup of epoch settings store, will record epoch settings from the configuration for epoch {current_epoch}";
569 "network_configuration" => ?network_configuration,
570 );
571 epoch_settings_store
572 .handle_discrepancies_at_startup(&network_configuration)
573 .await
574 .map_err(|e| DependenciesBuilderError::Initialization {
575 message: "can not create aggregator runner".to_string(),
576 error: Some(e),
577 })?;
578 Ok(())
579 }
580
581 pub async fn vanish(self) {
583 self.drop_sqlite_connections().await;
584 }
585}
586
587#[cfg(test)]
588impl DependenciesBuilder {
589 pub(crate) fn new_with_stdout_logger(configuration: Arc<dyn ConfigurationSource>) -> Self {
590 Self::new(crate::test::TestLogger::stdout(), configuration)
591 }
592}