1mod enablers;
2mod protocol;
3mod support;
4
5use anyhow::Context;
6use slog::Logger;
7use std::{path::PathBuf, sync::Arc};
8use tokio::{
9 sync::{
10 Mutex,
11 mpsc::{UnboundedReceiver, UnboundedSender},
12 watch,
13 },
14 time::Duration,
15};
16use warp::Filter;
17
18use mithril_cardano_node_chain::{
19 chain_observer::{CardanoCliRunner, ChainObserver},
20 chain_reader::ChainBlockReader,
21 chain_scanner::BlockScanner,
22};
23use mithril_cardano_node_internal_database::{
24 ImmutableFileObserver,
25 digesters::{ImmutableDigester, cache::ImmutableFileDigestCacheProvider},
26};
27use mithril_common::{
28 api_version::APIVersionProvider,
29 certificate_chain::CertificateVerifier,
30 crypto_helper::ProtocolGenesisVerifier,
31 signable_builder::{SignableBuilderService, SignableSeedBuilder, TransactionsImporter},
32};
33use mithril_era::{EraChecker, EraReader, EraReaderAdapter};
34use mithril_persistence::{
35 database::repository::CardanoTransactionRepository,
36 sqlite::{SqliteConnection, SqliteConnectionPool},
37};
38use mithril_signed_entity_lock::SignedEntityTypeLock;
39use mithril_ticker::TickerService;
40
41use super::{
42 DatabaseCommandDependenciesContainer, DependenciesBuilderError, EpochServiceWrapper,
43 GenesisCommandDependenciesContainer, Result, ToolsCommandDependenciesContainer,
44};
45use crate::{
46 AggregatorConfig, AggregatorRunner, AggregatorRuntime, ImmutableFileDigestMapper,
47 MetricsService, MithrilSignerRegistrationLeader, MultiSigner, ProtocolParametersRetriever,
48 ServeCommandDependenciesContainer, SignerRegisterer, SignerRegistrationRoundOpener,
49 SignerRegistrationVerifier, SingleSignatureAuthenticator, VerificationKeyStorer,
50 configuration::ConfigurationSource,
51 database::repository::{
52 CertificateRepository, EpochSettingsStore, OpenMessageRepository, SignedEntityStorer,
53 SignerStore, StakePoolStore,
54 },
55 event_store::{EventMessage, TransmitterService},
56 file_uploaders::FileUploader,
57 http_server::routes::router::{self, RouterConfig, RouterState},
58 services::{
59 AggregatorHTTPClient, CertificateChainSynchronizer, CertifierService, MessageService,
60 MithrilSignerRegistrationFollower, ProverService, SignedEntityService, SignerSynchronizer,
61 Snapshotter, StakeDistributionService, UpkeepService,
62 },
63 tools::file_archiver::FileArchiver,
64};
65
/// Returns a clone of a dependency held in an `Option` attribute, building and
/// storing it first when the attribute is still `None`.
///
/// Two forms are accepted:
///
/// * `get_dependency!(self.attribute)` — a missing value is built with
///   `self.build_<attribute>().await?`, so this form must be expanded inside an
///   `async` context whose return type accepts `?`.
/// * `get_dependency!(self.attribute = <expr>)` — a missing value is built by
///   evaluating `<expr>`. The expression is not evaluated when the attribute is
///   already filled.
///
/// Both forms evaluate to a `Result<_>` wrapping a clone of the stored value;
/// `Result` is resolved at the call site, so a one-parameter `Result` alias must
/// be in scope there.
#[macro_export]
macro_rules! get_dependency {
    ( $self:ident.$attribute:ident ) => {{
        paste::paste! {
            // `$crate::` keeps the recursive call resolvable even when the caller
            // reached this exported macro through a path instead of importing it.
            $crate::get_dependency!($self.$attribute = $self.[<build_ $attribute>]().await?)
        }
    }};
    ( $self:ident.$attribute:ident = $builder:expr ) => {{
        // Assigning through the attribute (rather than a temporary) lets the
        // attribute's declared type drive any coercion of the built value.
        if $self.$attribute.is_none() {
            $self.$attribute = Some($builder);
        }

        let r: Result<_> = Ok($self
            .$attribute
            .clone()
            .expect("dependency attribute is always filled at this point"));
        r
    }};
}
90
/// File name `aggregator.sqlite3`.
/// NOTE(review): presumably the main aggregator database; it is not referenced in
/// the visible part of this file — confirm in the submodules.
const SQLITE_FILE: &str = "aggregator.sqlite3";

/// File name `cardano-transaction.sqlite3`.
/// NOTE(review): presumably the Cardano transactions database — confirm in the
/// submodules.
const SQLITE_FILE_CARDANO_TRANSACTION: &str = "cardano-transaction.sqlite3";

/// File name `monitoring.sqlite3`.
/// NOTE(review): presumably the event store / monitoring database — confirm in the
/// submodules.
const SQLITE_MONITORING_FILE: &str = "monitoring.sqlite3";

/// Name of the sub-directory, under the configured snapshot directory, used for
/// Cardano database artifacts.
const CARDANO_DB_ARTIFACTS_DIR: &str = "cardano-database";

/// Name of the sub-directory, under the configured snapshot directory, handed to
/// the HTTP router as its snapshot directory.
const SNAPSHOT_ARTIFACTS_DIR: &str = "cardano-immutable-files-full";

/// Value passed to the HTTP router configuration as `max_artifact_epoch_offset`.
const MAX_ARTIFACT_EPOCH_OFFSET: u64 = 5;
98
99pub struct DependenciesBuilder {
111 pub configuration: Arc<dyn ConfigurationSource>,
113
114 pub root_logger: Logger,
116
117 pub sqlite_connection: Option<Arc<SqliteConnection>>,
119
120 pub sqlite_connection_event_store: Option<Arc<SqliteConnection>>,
122
123 pub sqlite_connection_cardano_transaction_pool: Option<Arc<SqliteConnectionPool>>,
125
126 pub stake_store: Option<Arc<StakePoolStore>>,
129
130 pub snapshot_uploader: Option<Arc<dyn FileUploader>>,
132
133 pub multi_signer: Option<Arc<dyn MultiSigner>>,
135
136 pub certificate_repository: Option<Arc<CertificateRepository>>,
138
139 pub open_message_repository: Option<Arc<OpenMessageRepository>>,
141
142 pub verification_key_store: Option<Arc<dyn VerificationKeyStorer>>,
144
145 pub epoch_settings_store: Option<Arc<EpochSettingsStore>>,
147
148 pub cardano_cli_runner: Option<Box<CardanoCliRunner>>,
150
151 pub chain_observer: Option<Arc<dyn ChainObserver>>,
153
154 pub chain_block_reader: Option<Arc<Mutex<dyn ChainBlockReader>>>,
156
157 pub transaction_repository: Option<Arc<CardanoTransactionRepository>>,
159
160 pub block_scanner: Option<Arc<dyn BlockScanner>>,
162
163 pub immutable_digester: Option<Arc<dyn ImmutableDigester>>,
165
166 pub immutable_file_observer: Option<Arc<dyn ImmutableFileObserver>>,
168
169 pub immutable_cache_provider: Option<Arc<dyn ImmutableFileDigestCacheProvider>>,
171
172 pub immutable_file_digest_mapper: Option<Arc<dyn ImmutableFileDigestMapper>>,
174
175 pub digester: Option<Arc<dyn ImmutableDigester>>,
177
178 pub file_archiver: Option<Arc<FileArchiver>>,
180
181 pub snapshotter: Option<Arc<dyn Snapshotter>>,
183
184 pub certificate_verifier: Option<Arc<dyn CertificateVerifier>>,
186
187 pub genesis_verifier: Option<Arc<ProtocolGenesisVerifier>>,
189
190 pub certificate_chain_synchronizer: Option<Arc<dyn CertificateChainSynchronizer>>,
192
193 pub mithril_signer_registration_leader: Option<Arc<MithrilSignerRegistrationLeader>>,
195
196 pub mithril_signer_registration_follower: Option<Arc<MithrilSignerRegistrationFollower>>,
198
199 pub signer_registerer: Option<Arc<dyn SignerRegisterer>>,
201
202 pub signer_synchronizer: Option<Arc<dyn SignerSynchronizer>>,
204
205 pub signer_registration_verifier: Option<Arc<dyn SignerRegistrationVerifier>>,
207
208 pub signer_registration_round_opener: Option<Arc<dyn SignerRegistrationRoundOpener>>,
210
211 pub era_checker: Option<Arc<EraChecker>>,
213
214 pub era_reader_adapter: Option<Arc<dyn EraReaderAdapter>>,
216
217 pub era_reader: Option<Arc<EraReader>>,
219
220 pub event_transmitter: Option<Arc<TransmitterService<EventMessage>>>,
222
223 pub event_transmitter_channel: (
225 Option<UnboundedReceiver<EventMessage>>,
226 Option<UnboundedSender<EventMessage>>,
227 ),
228
229 pub api_version_provider: Option<Arc<APIVersionProvider>>,
231
232 pub stake_distribution_service: Option<Arc<dyn StakeDistributionService>>,
234
235 pub ticker_service: Option<Arc<dyn TickerService>>,
237
238 pub signer_store: Option<Arc<SignerStore>>,
240
241 pub signable_seed_builder: Option<Arc<dyn SignableSeedBuilder>>,
243
244 pub signable_builder_service: Option<Arc<dyn SignableBuilderService>>,
246
247 pub signed_entity_service: Option<Arc<dyn SignedEntityService>>,
249
250 pub certifier_service: Option<Arc<dyn CertifierService>>,
252
253 pub epoch_service: Option<EpochServiceWrapper>,
255
256 pub signed_entity_storer: Option<Arc<dyn SignedEntityStorer>>,
258
259 pub message_service: Option<Arc<dyn MessageService>>,
261
262 pub prover_service: Option<Arc<dyn ProverService>>,
264
265 pub signed_entity_type_lock: Option<Arc<SignedEntityTypeLock>>,
267
268 pub transactions_importer: Option<Arc<dyn TransactionsImporter>>,
270
271 pub upkeep_service: Option<Arc<dyn UpkeepService>>,
273
274 pub single_signature_authenticator: Option<Arc<SingleSignatureAuthenticator>>,
276
277 pub metrics_service: Option<Arc<MetricsService>>,
279
280 pub leader_aggregator_client: Option<Arc<AggregatorHTTPClient>>,
282
283 pub protocol_parameters_retriever: Option<Arc<dyn ProtocolParametersRetriever>>,
285
286 pub stop_signal_channel: Option<(watch::Sender<()>, watch::Receiver<()>)>,
288}
289
290impl DependenciesBuilder {
291 pub fn new(root_logger: Logger, configuration: Arc<dyn ConfigurationSource>) -> Self {
293 Self {
294 configuration,
295 root_logger,
296 sqlite_connection: None,
297 sqlite_connection_event_store: None,
298 sqlite_connection_cardano_transaction_pool: None,
299 stake_store: None,
300 snapshot_uploader: None,
301 multi_signer: None,
302 certificate_repository: None,
303 open_message_repository: None,
304 verification_key_store: None,
305 epoch_settings_store: None,
306 cardano_cli_runner: None,
307 chain_observer: None,
308 chain_block_reader: None,
309 block_scanner: None,
310 transaction_repository: None,
311 immutable_digester: None,
312 immutable_file_observer: None,
313 immutable_cache_provider: None,
314 immutable_file_digest_mapper: None,
315 digester: None,
316 file_archiver: None,
317 snapshotter: None,
318 certificate_verifier: None,
319 genesis_verifier: None,
320 certificate_chain_synchronizer: None,
321 mithril_signer_registration_leader: None,
322 mithril_signer_registration_follower: None,
323 signer_registerer: None,
324 signer_synchronizer: None,
325 signer_registration_verifier: None,
326 signer_registration_round_opener: None,
327 era_reader_adapter: None,
328 era_checker: None,
329 era_reader: None,
330 event_transmitter: None,
331 event_transmitter_channel: (None, None),
332 api_version_provider: None,
333 stake_distribution_service: None,
334 ticker_service: None,
335 signer_store: None,
336 signable_seed_builder: None,
337 signable_builder_service: None,
338 signed_entity_service: None,
339 certifier_service: None,
340 epoch_service: None,
341 signed_entity_storer: None,
342 message_service: None,
343 prover_service: None,
344 signed_entity_type_lock: None,
345 transactions_importer: None,
346 upkeep_service: None,
347 single_signature_authenticator: None,
348 metrics_service: None,
349 leader_aggregator_client: None,
350 protocol_parameters_retriever: None,
351 stop_signal_channel: None,
352 }
353 }
354
355 fn get_cardano_db_artifacts_dir(&self) -> Result<PathBuf> {
356 let cardano_db_artifacts_dir =
357 self.configuration.get_snapshot_dir()?.join(CARDANO_DB_ARTIFACTS_DIR);
358
359 if !cardano_db_artifacts_dir.exists() {
360 std::fs::create_dir(&cardano_db_artifacts_dir).map_err(|e| {
361 DependenciesBuilderError::Initialization {
362 message: format!("Cannot create '{cardano_db_artifacts_dir:?}' directory."),
363 error: Some(e.into()),
364 }
365 })?;
366 }
367
368 Ok(cardano_db_artifacts_dir)
369 }
370
371 pub async fn build_serve_dependencies_container(
373 &mut self,
374 ) -> Result<ServeCommandDependenciesContainer> {
375 #[allow(deprecated)]
376 let dependencies_manager = ServeCommandDependenciesContainer {
377 root_logger: self.root_logger(),
378 stake_store: self.get_stake_store().await?,
379 certificate_repository: self.get_certificate_repository().await?,
380 verification_key_store: self.get_verification_key_store().await?,
381 epoch_settings_storer: self.get_epoch_settings_store().await?,
382 chain_observer: self.get_chain_observer().await?,
383 certificate_chain_synchronizer: self.get_certificate_chain_synchronizer().await?,
384 signer_registerer: self.get_signer_registerer().await?,
385 signer_synchronizer: self.get_signer_synchronizer().await?,
386 signer_registration_round_opener: self.get_signer_registration_round_opener().await?,
387 era_checker: self.get_era_checker().await?,
388 era_reader: self.get_era_reader().await?,
389 event_transmitter: self.get_event_transmitter().await?,
390 api_version_provider: self.get_api_version_provider().await?,
391 stake_distribution_service: self.get_stake_distribution_service().await?,
392 signer_recorder: self.get_signer_store().await?,
393 signable_builder_service: self.get_signable_builder_service().await?,
394 signed_entity_service: self.get_signed_entity_service().await?,
395 certifier_service: self.get_certifier_service().await?,
396 epoch_service: self.get_epoch_service().await?,
397 ticker_service: self.get_ticker_service().await?,
398 signed_entity_storer: self.get_signed_entity_storer().await?,
399 signer_getter: self.get_signer_store().await?,
400 message_service: self.get_message_service().await?,
401 prover_service: self.get_prover_service().await?,
402 signed_entity_type_lock: self.get_signed_entity_type_lock().await?,
403 upkeep_service: self.get_upkeep_service().await?,
404 single_signer_authenticator: self.get_single_signature_authenticator().await?,
405 metrics_service: self.get_metrics_service().await?,
406 };
407
408 Ok(dependencies_manager)
409 }
410
411 pub async fn create_aggregator_runner(&mut self) -> Result<AggregatorRuntime> {
413 let dependency_container = Arc::new(self.build_serve_dependencies_container().await?);
414
415 let config = AggregatorConfig::new(
416 Duration::from_millis(self.configuration.run_interval()),
417 self.configuration.is_follower_aggregator(),
418 );
419 let runtime = AggregatorRuntime::new(
420 config,
421 None,
422 Arc::new(AggregatorRunner::new(dependency_container)),
423 self.root_logger(),
424 )
425 .await
426 .map_err(|e| DependenciesBuilderError::Initialization {
427 message: "Cannot initialize Aggregator runtime.".to_string(),
428 error: Some(e.into()),
429 })?;
430
431 Ok(runtime)
432 }
433
434 pub async fn create_http_routes(
436 &mut self,
437 ) -> Result<
438 impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<>,
439 > {
440 let dependency_container = Arc::new(self.build_serve_dependencies_container().await?);
441 let snapshot_dir = self.configuration.get_snapshot_dir()?;
442 let router_state = RouterState::new(
443 dependency_container.clone(),
444 RouterConfig {
445 network: self.configuration.get_network()?,
446 server_url: self.configuration.get_server_url()?,
447 allowed_discriminants: self
448 .configuration
449 .compute_allowed_signed_entity_types_discriminants()?,
450 cardano_transactions_prover_max_hashes_allowed_by_request: self
451 .configuration
452 .cardano_transactions_prover_max_hashes_allowed_by_request(),
453 cardano_db_artifacts_directory: self.get_cardano_db_artifacts_dir()?,
454 max_artifact_epoch_offset: MAX_ARTIFACT_EPOCH_OFFSET,
455 snapshot_directory: snapshot_dir.join(SNAPSHOT_ARTIFACTS_DIR),
456 cardano_node_version: self.configuration.cardano_node_version(),
457 allow_http_serve_directory: self.configuration.allow_http_serve_directory(),
458 origin_tag_white_list: self.configuration.compute_origin_tag_white_list(),
459 aggregate_signature_type: self.configuration.aggregate_signature_type(),
460 },
461 );
462
463 Ok(router::routes(Arc::new(router_state)))
464 }
465
466 pub async fn create_genesis_container(
468 &mut self,
469 ) -> Result<GenesisCommandDependenciesContainer> {
470 let network = self.configuration.get_network().with_context(
471 || "Dependencies Builder can not get Cardano network while building genesis container",
472 )?;
473
474 let dependencies = GenesisCommandDependenciesContainer {
475 network,
476 chain_observer: self.get_chain_observer().await?,
477 certificate_repository: self.get_certificate_repository().await?,
478 certificate_verifier: self.get_certificate_verifier().await?,
479 protocol_parameters_retriever: self.get_protocol_parameters_retriever().await?,
480 verification_key_store: self.get_verification_key_store().await?,
481 };
482
483 Ok(dependencies)
484 }
485
486 pub async fn create_database_command_container(
488 &mut self,
489 ) -> Result<DatabaseCommandDependenciesContainer> {
490 let main_db_connection = self
491 .get_sqlite_connection()
492 .await
493 .with_context(|| "Dependencies Builder can not get sqlite connection")?;
494
495 self.get_event_store_sqlite_connection()
496 .await
497 .with_context(|| "Dependencies Builder can not get event store sqlite connection")?;
498
499 self.get_sqlite_connection_cardano_transaction_pool()
500 .await
501 .with_context(
502 || "Dependencies Builder can not get cardano transaction pool sqlite connection",
503 )?;
504
505 let dependencies = DatabaseCommandDependenciesContainer { main_db_connection };
506
507 Ok(dependencies)
508 }
509
510 pub async fn create_tools_command_container(
512 &mut self,
513 ) -> Result<ToolsCommandDependenciesContainer> {
514 let db_connection = self
515 .get_sqlite_connection()
516 .await
517 .with_context(|| "Dependencies Builder can not get sqlite connection")?;
518
519 let dependencies = ToolsCommandDependenciesContainer { db_connection };
520
521 Ok(dependencies)
522 }
523
524 pub async fn vanish(self) {
526 self.drop_sqlite_connections().await;
527 }
528}
529
#[cfg(test)]
impl DependenciesBuilder {
    /// Test-only constructor: same as [`DependenciesBuilder::new`], with the root
    /// logger taken from `TestLogger::stdout()`.
    pub(crate) fn new_with_stdout_logger(configuration: Arc<dyn ConfigurationSource>) -> Self {
        let stdout_logger = crate::test::TestLogger::stdout();

        Self::new(stdout_logger, configuration)
    }
}