1mod enablers;
2mod protocol;
3mod support;
4
5use anyhow::Context;
6use slog::{Logger, debug};
7use std::{path::PathBuf, sync::Arc};
8use tokio::{
9 sync::{
10 Mutex,
11 mpsc::{UnboundedReceiver, UnboundedSender},
12 watch,
13 },
14 time::Duration,
15};
16use warp::Filter;
17
18use mithril_aggregator_client::AggregatorHttpClient;
19use mithril_cardano_node_chain::{
20 chain_observer::{CardanoCliRunner, ChainObserver},
21 chain_reader::ChainBlockReader,
22 chain_scanner::BlockScanner,
23};
24use mithril_cardano_node_internal_database::{
25 ImmutableFileObserver,
26 digesters::{ImmutableDigester, cache::ImmutableFileDigestCacheProvider},
27};
28use mithril_common::{
29 api_version::APIVersionProvider,
30 certificate_chain::CertificateVerifier,
31 crypto_helper::ProtocolGenesisVerifier,
32 signable_builder::{SignableBuilderService, SignableSeedBuilder},
33};
34use mithril_era::{EraChecker, EraReader, EraReaderAdapter};
35use mithril_persistence::sqlite::{SqliteConnection, SqliteConnectionPool};
36use mithril_protocol_config::interface::MithrilNetworkConfigurationProvider;
37use mithril_signed_entity_lock::SignedEntityTypeLock;
38use mithril_ticker::TickerService;
39
40use crate::{
41 AggregatorConfig, AggregatorRunner, AggregatorRuntime, EpochSettingsStorer,
42 ImmutableFileDigestMapper, MetricsService, MithrilSignerRegistrationLeader, MultiSigner,
43 ProtocolParametersRetriever, ServeCommandDependenciesContainer, SignerRegisterer,
44 SignerRegistrationRoundOpener, SignerRegistrationVerifier, SingleSignatureAuthenticator,
45 VerificationKeyStorer,
46 configuration::ConfigurationSource,
47 database::repository::{
48 AggregatorCardanoChainDataRepository, CertificateRepository, EpochSettingsStore,
49 OpenMessageRepository, SignedEntityStorer, SignerStore, StakePoolStore,
50 },
51 event_store::{EventMessage, TransmitterService},
52 file_uploaders::FileUploader,
53 http_server::routes::router::{self, RouterConfig, RouterState},
54 services::{
55 AggregatorChainDataImporter, CertificateChainSynchronizer, CertifierService,
56 LegacyProverService, MessageService, MithrilSignerRegistrationFollower, ProverService,
57 SignedEntityService, SignerSynchronizer, Snapshotter, StakeDistributionService,
58 UpkeepService,
59 },
60 tools::file_archiver::FileArchiver,
61};
62
63use super::{
64 DatabaseCommandDependenciesContainer, DependenciesBuilderError, EpochServiceWrapper,
65 GenesisCommandDependenciesContainer, Result, ToolsCommandDependenciesContainer,
66};
67
/// Fetches a lazily-built dependency from a builder, constructing and caching it on first use.
///
/// Two forms are accepted:
///
/// * `get_dependency!(self.foo)` builds the value with `self.build_foo().await?`
///   (the method name is assembled with `paste`).
/// * `get_dependency!(self.foo = <expr>)` builds the value with the given expression.
///
/// In both forms the builder expression is only evaluated when `self.foo` is `None`. The
/// result is stored in `self.foo`, and the macro evaluates to a `Result<_>` wrapping a clone
/// of the stored value.
///
/// The expansion refers to `Result`, `root_logger()`, `slog` and `paste` by name, so all of
/// them must be resolvable where the macro is invoked. The short form additionally needs an
/// `async` context whose return type accepts `?`.
#[macro_export]
macro_rules! get_dependency {
    // Short form: delegate to the explicit form using the `build_<attribute>` method.
    ( $self:ident.$attribute:ident ) => {{
        paste::paste! {
            get_dependency!($self.$attribute = $self.[<build_ $attribute>]().await?)
        }
    }};
    // Explicit form: build once, then always hand out a clone of the cached value.
    ( $self:ident.$attribute:ident = $builder:expr ) => {{
        paste::paste! {
            if $self.$attribute.is_none() {
                slog::debug!($self.root_logger(), "Building dependency {}", stringify!($attribute));
                $self.$attribute = Some($builder);
            }

            // The slot was filled just above whenever it was empty, so `unwrap` cannot fail.
            let dependency: Result<_> = Ok($self.$attribute.clone().unwrap());
            dependency
        }
    }};
}
93
/// File name of a SQLite database; not referenced in this part of the file
/// (presumably the main aggregator store — TODO confirm against the connection builders).
const SQLITE_FILE: &str = "aggregator.sqlite3";

/// File name of a SQLite database; not referenced in this part of the file
/// (presumably the Cardano transactions store — TODO confirm).
const SQLITE_FILE_CARDANO_TRANSACTION: &str = "cardano-transaction.sqlite3";

/// File name of a SQLite database; not referenced in this part of the file
/// (presumably the monitoring / event store — TODO confirm).
const SQLITE_MONITORING_FILE: &str = "monitoring.sqlite3";

/// Sub-directory of the snapshot directory that holds the Cardano database artifacts.
const CARDANO_DB_ARTIFACTS_DIR: &str = "cardano-database";

/// Sub-directory of the snapshot directory passed to the HTTP router as its snapshot directory.
const SNAPSHOT_ARTIFACTS_DIR: &str = "cardano-immutable-files-full";

/// Value handed to the HTTP router as `max_artifact_epoch_offset`.
const MAX_ARTIFACT_EPOCH_OFFSET: u64 = 5;
101
102pub struct DependenciesBuilder {
114 pub configuration: Arc<dyn ConfigurationSource>,
116
117 pub root_logger: Logger,
119
120 pub sqlite_connection: Option<Arc<SqliteConnection>>,
122
123 pub sqlite_connection_event_store: Option<Arc<SqliteConnection>>,
125
126 pub sqlite_connection_cardano_transaction_pool: Option<Arc<SqliteConnectionPool>>,
128
129 pub stake_store: Option<Arc<StakePoolStore>>,
132
133 pub snapshot_uploader: Option<Arc<dyn FileUploader>>,
135
136 pub multi_signer: Option<Arc<dyn MultiSigner>>,
138
139 pub certificate_repository: Option<Arc<CertificateRepository>>,
141
142 pub open_message_repository: Option<Arc<OpenMessageRepository>>,
144
145 pub verification_key_store: Option<Arc<dyn VerificationKeyStorer>>,
147
148 pub epoch_settings_store: Option<Arc<EpochSettingsStore>>,
150
151 pub cardano_cli_runner: Option<Box<CardanoCliRunner>>,
153
154 pub chain_observer: Option<Arc<dyn ChainObserver>>,
156
157 pub chain_block_reader: Option<Arc<Mutex<dyn ChainBlockReader>>>,
159
160 pub chain_data_repository: Option<Arc<AggregatorCardanoChainDataRepository>>,
162
163 pub block_scanner: Option<Arc<dyn BlockScanner>>,
165
166 pub immutable_digester: Option<Arc<dyn ImmutableDigester>>,
168
169 pub immutable_file_observer: Option<Arc<dyn ImmutableFileObserver>>,
171
172 pub immutable_cache_provider: Option<Arc<dyn ImmutableFileDigestCacheProvider>>,
174
175 pub immutable_file_digest_mapper: Option<Arc<dyn ImmutableFileDigestMapper>>,
177
178 pub digester: Option<Arc<dyn ImmutableDigester>>,
180
181 pub file_archiver: Option<Arc<FileArchiver>>,
183
184 pub snapshotter: Option<Arc<dyn Snapshotter>>,
186
187 pub certificate_verifier: Option<Arc<dyn CertificateVerifier>>,
189
190 pub genesis_verifier: Option<Arc<ProtocolGenesisVerifier>>,
192
193 pub certificate_chain_synchronizer: Option<Arc<dyn CertificateChainSynchronizer>>,
195
196 pub mithril_signer_registration_leader: Option<Arc<MithrilSignerRegistrationLeader>>,
198
199 pub mithril_signer_registration_follower: Option<Arc<MithrilSignerRegistrationFollower>>,
201
202 pub signer_registerer: Option<Arc<dyn SignerRegisterer>>,
204
205 pub signer_synchronizer: Option<Arc<dyn SignerSynchronizer>>,
207
208 pub signer_registration_verifier: Option<Arc<dyn SignerRegistrationVerifier>>,
210
211 pub signer_registration_round_opener: Option<Arc<dyn SignerRegistrationRoundOpener>>,
213
214 pub era_checker: Option<Arc<EraChecker>>,
216
217 pub era_reader_adapter: Option<Arc<dyn EraReaderAdapter>>,
219
220 pub era_reader: Option<Arc<EraReader>>,
222
223 pub event_transmitter: Option<Arc<TransmitterService<EventMessage>>>,
225
226 pub event_transmitter_channel: (
228 Option<UnboundedReceiver<EventMessage>>,
229 Option<UnboundedSender<EventMessage>>,
230 ),
231
232 pub api_version_provider: Option<Arc<APIVersionProvider>>,
234
235 pub stake_distribution_service: Option<Arc<dyn StakeDistributionService>>,
237
238 pub ticker_service: Option<Arc<dyn TickerService>>,
240
241 pub signer_store: Option<Arc<SignerStore>>,
243
244 pub signable_seed_builder: Option<Arc<dyn SignableSeedBuilder>>,
246
247 pub signable_builder_service: Option<Arc<dyn SignableBuilderService>>,
249
250 pub signed_entity_service: Option<Arc<dyn SignedEntityService>>,
252
253 pub certifier_service: Option<Arc<dyn CertifierService>>,
255
256 pub epoch_service: Option<EpochServiceWrapper>,
258
259 pub mithril_network_configuration_provider:
261 Option<Arc<dyn MithrilNetworkConfigurationProvider>>,
262
263 pub signed_entity_storer: Option<Arc<dyn SignedEntityStorer>>,
265
266 pub message_service: Option<Arc<dyn MessageService>>,
268
269 pub legacy_prover_service: Option<Arc<dyn LegacyProverService>>,
271
272 pub prover_service: Option<Arc<dyn ProverService>>,
274
275 pub signed_entity_type_lock: Option<Arc<SignedEntityTypeLock>>,
277
278 pub chain_data_importer: Option<Arc<AggregatorChainDataImporter>>,
280
281 pub upkeep_service: Option<Arc<dyn UpkeepService>>,
283
284 pub single_signature_authenticator: Option<Arc<SingleSignatureAuthenticator>>,
286
287 pub metrics_service: Option<Arc<MetricsService>>,
289
290 pub leader_aggregator_client: Option<Arc<AggregatorHttpClient>>,
292
293 pub protocol_parameters_retriever: Option<Arc<dyn ProtocolParametersRetriever>>,
295
296 pub stop_signal_channel: Option<(watch::Sender<()>, watch::Receiver<()>)>,
298}
299
300impl DependenciesBuilder {
301 pub fn new(root_logger: Logger, configuration: Arc<dyn ConfigurationSource>) -> Self {
303 Self {
304 configuration,
305 root_logger,
306 sqlite_connection: None,
307 sqlite_connection_event_store: None,
308 sqlite_connection_cardano_transaction_pool: None,
309 stake_store: None,
310 snapshot_uploader: None,
311 multi_signer: None,
312 certificate_repository: None,
313 open_message_repository: None,
314 verification_key_store: None,
315 epoch_settings_store: None,
316 cardano_cli_runner: None,
317 chain_observer: None,
318 chain_block_reader: None,
319 block_scanner: None,
320 immutable_digester: None,
321 immutable_file_observer: None,
322 immutable_cache_provider: None,
323 immutable_file_digest_mapper: None,
324 digester: None,
325 file_archiver: None,
326 snapshotter: None,
327 certificate_verifier: None,
328 genesis_verifier: None,
329 certificate_chain_synchronizer: None,
330 mithril_signer_registration_leader: None,
331 mithril_signer_registration_follower: None,
332 signer_registerer: None,
333 signer_synchronizer: None,
334 signer_registration_verifier: None,
335 signer_registration_round_opener: None,
336 era_reader_adapter: None,
337 era_checker: None,
338 era_reader: None,
339 event_transmitter: None,
340 event_transmitter_channel: (None, None),
341 api_version_provider: None,
342 stake_distribution_service: None,
343 ticker_service: None,
344 signer_store: None,
345 signable_seed_builder: None,
346 signable_builder_service: None,
347 signed_entity_service: None,
348 certifier_service: None,
349 epoch_service: None,
350 mithril_network_configuration_provider: None,
351 signed_entity_storer: None,
352 message_service: None,
353 legacy_prover_service: None,
354 prover_service: None,
355 signed_entity_type_lock: None,
356 chain_data_importer: None,
357 upkeep_service: None,
358 single_signature_authenticator: None,
359 metrics_service: None,
360 leader_aggregator_client: None,
361 protocol_parameters_retriever: None,
362 stop_signal_channel: None,
363 chain_data_repository: None,
364 }
365 }
366
367 fn get_cardano_db_artifacts_dir(&self) -> Result<PathBuf> {
368 let cardano_db_artifacts_dir =
369 self.configuration.get_snapshot_dir()?.join(CARDANO_DB_ARTIFACTS_DIR);
370
371 if !cardano_db_artifacts_dir.exists() {
372 std::fs::create_dir(&cardano_db_artifacts_dir).map_err(|e| {
373 DependenciesBuilderError::Initialization {
374 message: format!("Cannot create '{cardano_db_artifacts_dir:?}' directory."),
375 error: Some(e.into()),
376 }
377 })?;
378 }
379
380 Ok(cardano_db_artifacts_dir)
381 }
382
383 pub async fn build_serve_dependencies_container(
385 &mut self,
386 ) -> Result<ServeCommandDependenciesContainer> {
387 #[allow(deprecated)]
388 let dependencies_manager = ServeCommandDependenciesContainer {
389 root_logger: self.root_logger(),
390 stake_store: self.get_stake_store().await?,
391 certificate_repository: self.get_certificate_repository().await?,
392 verification_key_store: self.get_verification_key_store().await?,
393 epoch_settings_storer: self.get_epoch_settings_store().await?,
394 certificate_chain_synchronizer: self.get_certificate_chain_synchronizer().await?,
395 signer_registerer: self.get_signer_registerer().await?,
396 signer_synchronizer: self.get_signer_synchronizer().await?,
397 signer_registration_round_opener: self.get_signer_registration_round_opener().await?,
398 era_checker: self.get_era_checker().await?,
399 era_reader: self.get_era_reader().await?,
400 event_transmitter: self.get_event_transmitter().await?,
401 api_version_provider: self.get_api_version_provider().await?,
402 stake_distribution_service: self.get_stake_distribution_service().await?,
403 signer_recorder: self.get_signer_store().await?,
404 signable_builder_service: self.get_signable_builder_service().await?,
405 signed_entity_service: self.get_signed_entity_service().await?,
406 certifier_service: self.get_certifier_service().await?,
407 epoch_service: self.get_epoch_service().await?,
408 ticker_service: self.get_ticker_service().await?,
409 signed_entity_storer: self.get_signed_entity_storer().await?,
410 signer_getter: self.get_signer_store().await?,
411 message_service: self.get_message_service().await?,
412 legacy_prover_service: self.get_legacy_prover_service().await?,
413 prover_service: self.get_prover_service().await?,
414 signed_entity_type_lock: self.get_signed_entity_type_lock().await?,
415 upkeep_service: self.get_upkeep_service().await?,
416 single_signer_authenticator: self.get_single_signature_authenticator().await?,
417 metrics_service: self.get_metrics_service().await?,
418 };
419
420 self.handle_discrepancies_at_startup().await?;
421
422 Ok(dependencies_manager)
423 }
424
425 pub async fn create_aggregator_runner(&mut self) -> Result<AggregatorRuntime> {
427 let dependency_container = Arc::new(self.build_serve_dependencies_container().await?);
428
429 let config = AggregatorConfig::new(
430 Duration::from_millis(self.configuration.run_interval()),
431 self.configuration.is_follower_aggregator(),
432 );
433 let runtime = AggregatorRuntime::new(
434 config,
435 None,
436 Arc::new(AggregatorRunner::new(dependency_container)),
437 self.root_logger(),
438 )
439 .await
440 .map_err(|e| DependenciesBuilderError::Initialization {
441 message: "Cannot initialize Aggregator runtime.".to_string(),
442 error: Some(e.into()),
443 })?;
444
445 Ok(runtime)
446 }
447
448 pub async fn create_http_routes(
450 &mut self,
451 ) -> Result<
452 impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<>,
453 > {
454 let dependency_container = Arc::new(self.build_serve_dependencies_container().await?);
455 let snapshot_dir = self.configuration.get_snapshot_dir()?;
456 let router_state = RouterState::new(
457 dependency_container.clone(),
458 RouterConfig {
459 network: self.configuration.get_network()?,
460 server_url: self.configuration.get_server_url()?,
461 allowed_discriminants: self
462 .configuration
463 .compute_allowed_signed_entity_types_discriminants()?,
464 cardano_transactions_prover_max_hashes_allowed_by_request: self
465 .configuration
466 .cardano_transactions_prover_max_hashes_allowed_by_request(),
467 cardano_db_artifacts_directory: self.get_cardano_db_artifacts_dir()?,
468 max_artifact_epoch_offset: MAX_ARTIFACT_EPOCH_OFFSET,
469 snapshot_directory: snapshot_dir.join(SNAPSHOT_ARTIFACTS_DIR),
470 cardano_node_version: self.configuration.cardano_node_version(),
471 allow_http_serve_directory: self.configuration.allow_http_serve_directory(),
472 origin_tag_white_list: self.configuration.compute_origin_tag_white_list(),
473 aggregate_signature_type: self.configuration.aggregate_signature_type(),
474 },
475 );
476
477 Ok(router::routes(Arc::new(router_state)))
478 }
479
480 pub async fn create_genesis_container(
482 &mut self,
483 ) -> Result<GenesisCommandDependenciesContainer> {
484 let network = self.configuration.get_network().with_context(
485 || "Dependencies Builder can not get Cardano network while building genesis container",
486 )?;
487
488 let dependencies = GenesisCommandDependenciesContainer {
489 network,
490 chain_observer: self.get_chain_observer().await?,
491 certificate_repository: self.get_certificate_repository().await?,
492 certificate_verifier: self.get_certificate_verifier().await?,
493 protocol_parameters_retriever: self.get_protocol_parameters_retriever().await?,
494 verification_key_store: self.get_verification_key_store().await?,
495 };
496
497 Ok(dependencies)
498 }
499
500 pub async fn create_database_command_container(
502 &mut self,
503 ) -> Result<DatabaseCommandDependenciesContainer> {
504 let main_db_connection = self
505 .get_sqlite_connection()
506 .await
507 .with_context(|| "Dependencies Builder can not get sqlite connection")?;
508
509 self.get_event_store_sqlite_connection()
510 .await
511 .with_context(|| "Dependencies Builder can not get event store sqlite connection")?;
512
513 self.get_sqlite_connection_cardano_transaction_pool()
514 .await
515 .with_context(
516 || "Dependencies Builder can not get cardano transaction pool sqlite connection",
517 )?;
518
519 let dependencies = DatabaseCommandDependenciesContainer { main_db_connection };
520
521 Ok(dependencies)
522 }
523
524 pub async fn create_tools_command_container(
526 &mut self,
527 ) -> Result<ToolsCommandDependenciesContainer> {
528 let db_connection = self
529 .get_sqlite_connection()
530 .await
531 .with_context(|| "Dependencies Builder can not get sqlite connection")?;
532
533 let dependencies = ToolsCommandDependenciesContainer { db_connection };
534
535 Ok(dependencies)
536 }
537
538 pub async fn handle_discrepancies_at_startup(&mut self) -> Result<()> {
544 let logger = self.root_logger();
545 let current_epoch = self
546 .get_chain_observer()
547 .await?
548 .get_current_epoch()
549 .await
550 .map_err(|e| DependenciesBuilderError::Initialization {
551 message: "cannot handle startup discrepancies: failed to retrieve current epoch."
552 .to_string(),
553 error: Some(e.into()),
554 })?
555 .ok_or(DependenciesBuilderError::Initialization {
556 message: "cannot handle startup discrepancies: no epoch returned.".to_string(),
557 error: None,
558 })?;
559 let network_configuration = self
560 .get_mithril_network_configuration_provider()
561 .await?
562 .get_network_configuration(current_epoch)
563 .await
564 .map_err(|e| DependenciesBuilderError::Initialization {
565 message: format!("cannot handle startup discrepancies: failed to retrieve network configuration for epoch {current_epoch}"),
566 error: Some(e),
567 })?;
568 let epoch_settings_store = self.get_epoch_settings_store().await?;
569
570 debug!(
571 logger,
572 "Handle discrepancies at startup of epoch settings store, will record epoch settings from the configuration for epoch {current_epoch}";
573 "network_configuration" => ?network_configuration,
574 );
575 epoch_settings_store
576 .handle_discrepancies_at_startup(&network_configuration)
577 .await
578 .map_err(|e| DependenciesBuilderError::Initialization {
579 message: "can not create aggregator runner".to_string(),
580 error: Some(e),
581 })?;
582 Ok(())
583 }
584
585 pub async fn vanish(self) {
587 self.drop_sqlite_connections().await;
588 }
589}
590
#[cfg(test)]
impl DependenciesBuilder {
    /// Test-only constructor: same as [`DependenciesBuilder::new`], with the root logger
    /// taken from `crate::test::TestLogger::stdout()`.
    pub(crate) fn new_with_stdout_logger(configuration: Arc<dyn ConfigurationSource>) -> Self {
        let stdout_logger = crate::test::TestLogger::stdout();

        Self::new(stdout_logger, configuration)
    }
}