1mod enablers;
2mod protocol;
3mod support;
4
5use anyhow::Context;
6use slog::Logger;
7use std::{path::PathBuf, sync::Arc};
8use tokio::{
9 sync::{
10 mpsc::{UnboundedReceiver, UnboundedSender},
11 watch, Mutex,
12 },
13 time::Duration,
14};
15use warp::Filter;
16
17use mithril_cardano_node_chain::{
18 chain_observer::{CardanoCliRunner, ChainObserver},
19 chain_reader::ChainBlockReader,
20 chain_scanner::BlockScanner,
21};
22use mithril_cardano_node_internal_database::{
23 digesters::{cache::ImmutableFileDigestCacheProvider, ImmutableDigester},
24 ImmutableFileObserver,
25};
26use mithril_common::{
27 api_version::APIVersionProvider,
28 certificate_chain::CertificateVerifier,
29 crypto_helper::ProtocolGenesisVerifier,
30 signable_builder::{SignableBuilderService, SignableSeedBuilder, TransactionsImporter},
31};
32use mithril_era::{EraChecker, EraReader, EraReaderAdapter};
33use mithril_persistence::{
34 database::repository::CardanoTransactionRepository,
35 sqlite::{SqliteConnection, SqliteConnectionPool},
36};
37use mithril_signed_entity_lock::SignedEntityTypeLock;
38use mithril_ticker::TickerService;
39
40use super::{
41 DatabaseCommandDependenciesContainer, DependenciesBuilderError, EpochServiceWrapper,
42 GenesisCommandDependenciesContainer, Result, ToolsCommandDependenciesContainer,
43};
44use crate::{
45 configuration::ConfigurationSource,
46 database::repository::{
47 CertificateRepository, EpochSettingsStore, OpenMessageRepository, SignedEntityStorer,
48 SignerStore, StakePoolStore,
49 },
50 event_store::{EventMessage, TransmitterService},
51 file_uploaders::FileUploader,
52 http_server::routes::router::{self, RouterConfig, RouterState},
53 services::{
54 AggregatorClient, CertifierService, MessageService, MithrilSignerRegistrationFollower,
55 ProverService, SignedEntityService, SignerSynchronizer, Snapshotter,
56 StakeDistributionService, UpkeepService,
57 },
58 tools::file_archiver::FileArchiver,
59 AggregatorConfig, AggregatorRunner, AggregatorRuntime, ImmutableFileDigestMapper,
60 MetricsService, MithrilSignerRegistrationLeader, MultiSigner, ProtocolParametersRetriever,
61 ServeCommandDependenciesContainer, SignerRegisterer, SignerRegistrationRoundOpener,
62 SignerRegistrationVerifier, SingleSignatureAuthenticator, VerificationKeyStorer,
63};
64
/// Fetch a dependency cached on a builder, constructing it on first access.
///
/// Two invocation forms are accepted:
///
/// * `get_dependency!(self.attribute)` expands to the second form, using
///   `self.build_<attribute>().await?` as the builder expression. It must
///   therefore be used inside an `async` function in which `?` is usable.
/// * `get_dependency!(self.attribute = <expr>)` evaluates `<expr>` only when
///   `self.attribute` is `None`, and stores the outcome in that field.
///
/// Both forms evaluate to a `Result<_>` wrapping a clone of the stored value.
/// The `Result` alias is resolved at the call site, so a one-parameter
/// `Result<T>` alias must be in scope where the macro is invoked.
#[macro_export]
macro_rules! get_dependency {
    ( $self:ident.$attribute:ident ) => {{
        // `paste` is only required here, to glue `build_` onto the attribute name.
        // The recursion goes through `$crate::` so it resolves even when the caller
        // invoked the macro by path without importing its name.
        paste::paste! {
            $crate::get_dependency!($self.$attribute = $self.[<build_ $attribute>]().await?)
        }
    }};
    ( $self:ident.$attribute:ident = $builder:expr ) => {{
        // The builder expression is expanded inline (not inside a closure) so that
        // any `.await` / `?` it contains applies to the enclosing function.
        if $self.$attribute.is_none() {
            $self.$attribute = Some($builder);
        }

        let dependency: Result<_> = Ok($self
            .$attribute
            .as_ref()
            .cloned()
            .expect("dependency is always `Some` after the initialisation above"));
        dependency
    }};
}
89
/// File name `aggregator.sqlite3`.
/// NOTE(review): presumably the main aggregator database; its consumer is not visible in this section.
const SQLITE_FILE: &str = "aggregator.sqlite3";
/// File name `cardano-transaction.sqlite3`.
/// NOTE(review): presumably the Cardano transactions database; its consumer is not visible in this section.
const SQLITE_FILE_CARDANO_TRANSACTION: &str = "cardano-transaction.sqlite3";
/// File name `monitoring.sqlite3`.
/// NOTE(review): presumably the monitoring / event store database; its consumer is not visible in this section.
const SQLITE_MONITORING_FILE: &str = "monitoring.sqlite3";
/// Sub-directory of the snapshot directory used for the Cardano database artifacts
/// (joined onto the snapshot directory by `get_cardano_db_artifacts_dir`).
const CARDANO_DB_ARTIFACTS_DIR: &str = "cardano-database";
/// Sub-directory of the snapshot directory passed to the HTTP router as its
/// `snapshot_directory` (see `create_http_routes`).
const SNAPSHOT_ARTIFACTS_DIR: &str = "cardano-immutable-files-full";
95
/// Holder for the configuration, the root logger and every dependency slot of
/// the aggregator.
///
/// Apart from `configuration` and `root_logger`, every slot is optional and is
/// set to `None` (or `(None, None)`) by [`DependenciesBuilder::new`].
/// NOTE(review): the slots are presumably populated on demand through the
/// `get_dependency!` macro by `get_*` / `build_*` methods defined in sibling
/// modules — those methods are not visible in this section.
pub struct DependenciesBuilder {
    /// Source of the configuration values read by the builder.
    pub configuration: Arc<dyn ConfigurationSource>,

    /// Root logger, supplied to [`DependenciesBuilder::new`].
    pub root_logger: Logger,

    /// Slot for a shared [`SqliteConnection`].
    pub sqlite_connection: Option<Arc<SqliteConnection>>,

    /// Slot for a shared [`SqliteConnection`] dedicated to the event store.
    pub sqlite_connection_event_store: Option<Arc<SqliteConnection>>,

    /// Slot for a shared [`SqliteConnectionPool`] for Cardano transactions.
    pub sqlite_connection_cardano_transaction_pool: Option<Arc<SqliteConnectionPool>>,

    /// Slot for the [`StakePoolStore`].
    pub stake_store: Option<Arc<StakePoolStore>>,

    /// Slot for the snapshot [`FileUploader`].
    pub snapshot_uploader: Option<Arc<dyn FileUploader>>,

    /// Slot for the [`MultiSigner`].
    pub multi_signer: Option<Arc<dyn MultiSigner>>,

    /// Slot for the [`CertificateRepository`].
    pub certificate_repository: Option<Arc<CertificateRepository>>,

    /// Slot for the [`OpenMessageRepository`].
    pub open_message_repository: Option<Arc<OpenMessageRepository>>,

    /// Slot for the [`VerificationKeyStorer`].
    pub verification_key_store: Option<Arc<dyn VerificationKeyStorer>>,

    /// Slot for the [`EpochSettingsStore`].
    pub epoch_settings_store: Option<Arc<EpochSettingsStore>>,

    /// Slot for the [`CardanoCliRunner`] (boxed rather than reference-counted).
    pub cardano_cli_runner: Option<Box<CardanoCliRunner>>,

    /// Slot for the [`ChainObserver`].
    pub chain_observer: Option<Arc<dyn ChainObserver>>,

    /// Slot for the [`ChainBlockReader`], guarded by a tokio [`Mutex`].
    pub chain_block_reader: Option<Arc<Mutex<dyn ChainBlockReader>>>,

    /// Slot for the [`CardanoTransactionRepository`].
    pub transaction_repository: Option<Arc<CardanoTransactionRepository>>,

    /// Slot for the [`BlockScanner`].
    pub block_scanner: Option<Arc<dyn BlockScanner>>,

    /// Slot for an [`ImmutableDigester`].
    pub immutable_digester: Option<Arc<dyn ImmutableDigester>>,

    /// Slot for the [`ImmutableFileObserver`].
    pub immutable_file_observer: Option<Arc<dyn ImmutableFileObserver>>,

    /// Slot for the [`ImmutableFileDigestCacheProvider`].
    pub immutable_cache_provider: Option<Arc<dyn ImmutableFileDigestCacheProvider>>,

    /// Slot for the [`ImmutableFileDigestMapper`].
    pub immutable_file_digest_mapper: Option<Arc<dyn ImmutableFileDigestMapper>>,

    /// Slot for an [`ImmutableDigester`].
    /// NOTE(review): same type as `immutable_digester`; how the two slots differ
    /// is not visible in this section.
    pub digester: Option<Arc<dyn ImmutableDigester>>,

    /// Slot for the [`FileArchiver`].
    pub file_archiver: Option<Arc<FileArchiver>>,

    /// Slot for the [`Snapshotter`].
    pub snapshotter: Option<Arc<dyn Snapshotter>>,

    /// Slot for the [`CertificateVerifier`].
    pub certificate_verifier: Option<Arc<dyn CertificateVerifier>>,

    /// Slot for the [`ProtocolGenesisVerifier`].
    pub genesis_verifier: Option<Arc<ProtocolGenesisVerifier>>,

    /// Slot for the [`MithrilSignerRegistrationLeader`].
    pub mithril_signer_registration_leader: Option<Arc<MithrilSignerRegistrationLeader>>,

    /// Slot for the [`MithrilSignerRegistrationFollower`].
    pub mithril_signer_registration_follower: Option<Arc<MithrilSignerRegistrationFollower>>,

    /// Slot for the [`SignerRegisterer`].
    pub signer_registerer: Option<Arc<dyn SignerRegisterer>>,

    /// Slot for the [`SignerSynchronizer`].
    pub signer_synchronizer: Option<Arc<dyn SignerSynchronizer>>,

    /// Slot for the [`SignerRegistrationVerifier`].
    pub signer_registration_verifier: Option<Arc<dyn SignerRegistrationVerifier>>,

    /// Slot for the [`SignerRegistrationRoundOpener`].
    pub signer_registration_round_opener: Option<Arc<dyn SignerRegistrationRoundOpener>>,

    /// Slot for the [`EraChecker`].
    pub era_checker: Option<Arc<EraChecker>>,

    /// Slot for the [`EraReaderAdapter`].
    pub era_reader_adapter: Option<Arc<dyn EraReaderAdapter>>,

    /// Slot for the [`EraReader`].
    pub era_reader: Option<Arc<EraReader>>,

    /// Slot for the [`TransmitterService`] of [`EventMessage`]s.
    pub event_transmitter: Option<Arc<TransmitterService<EventMessage>>>,

    /// Receiver and sender halves of an unbounded [`EventMessage`] channel,
    /// each stored independently as an `Option`.
    pub event_transmitter_channel: (
        Option<UnboundedReceiver<EventMessage>>,
        Option<UnboundedSender<EventMessage>>,
    ),

    /// Slot for the [`APIVersionProvider`].
    pub api_version_provider: Option<Arc<APIVersionProvider>>,

    /// Slot for the [`StakeDistributionService`].
    pub stake_distribution_service: Option<Arc<dyn StakeDistributionService>>,

    /// Slot for the [`TickerService`].
    pub ticker_service: Option<Arc<dyn TickerService>>,

    /// Slot for the [`SignerStore`].
    pub signer_store: Option<Arc<SignerStore>>,

    /// Slot for the [`SignableSeedBuilder`].
    pub signable_seed_builder: Option<Arc<dyn SignableSeedBuilder>>,

    /// Slot for the [`SignableBuilderService`].
    pub signable_builder_service: Option<Arc<dyn SignableBuilderService>>,

    /// Slot for the [`SignedEntityService`].
    pub signed_entity_service: Option<Arc<dyn SignedEntityService>>,

    /// Slot for the [`CertifierService`].
    pub certifier_service: Option<Arc<dyn CertifierService>>,

    /// Slot for the epoch service, stored as an [`EpochServiceWrapper`].
    pub epoch_service: Option<EpochServiceWrapper>,

    /// Slot for the [`SignedEntityStorer`].
    pub signed_entity_storer: Option<Arc<dyn SignedEntityStorer>>,

    /// Slot for the [`MessageService`].
    pub message_service: Option<Arc<dyn MessageService>>,

    /// Slot for the [`ProverService`].
    pub prover_service: Option<Arc<dyn ProverService>>,

    /// Slot for the [`SignedEntityTypeLock`].
    pub signed_entity_type_lock: Option<Arc<SignedEntityTypeLock>>,

    /// Slot for the [`TransactionsImporter`].
    pub transactions_importer: Option<Arc<dyn TransactionsImporter>>,

    /// Slot for the [`UpkeepService`].
    pub upkeep_service: Option<Arc<dyn UpkeepService>>,

    /// Slot for the [`SingleSignatureAuthenticator`].
    pub single_signature_authenticator: Option<Arc<SingleSignatureAuthenticator>>,

    /// Slot for the [`MetricsService`].
    pub metrics_service: Option<Arc<MetricsService>>,

    /// Slot for the [`AggregatorClient`] used to talk to a leader aggregator.
    pub leader_aggregator_client: Option<Arc<dyn AggregatorClient>>,

    /// Slot for the [`ProtocolParametersRetriever`].
    pub protocol_parameters_retriever: Option<Arc<dyn ProtocolParametersRetriever>>,

    /// Sender and receiver of a [`watch`] channel carrying `()`.
    /// NOTE(review): presumably used to broadcast a stop signal — confirm at the usage sites.
    pub stop_signal_channel: Option<(watch::Sender<()>, watch::Receiver<()>)>,
}
283
284impl DependenciesBuilder {
285 pub fn new(root_logger: Logger, configuration: Arc<dyn ConfigurationSource>) -> Self {
287 Self {
288 configuration,
289 root_logger,
290 sqlite_connection: None,
291 sqlite_connection_event_store: None,
292 sqlite_connection_cardano_transaction_pool: None,
293 stake_store: None,
294 snapshot_uploader: None,
295 multi_signer: None,
296 certificate_repository: None,
297 open_message_repository: None,
298 verification_key_store: None,
299 epoch_settings_store: None,
300 cardano_cli_runner: None,
301 chain_observer: None,
302 chain_block_reader: None,
303 block_scanner: None,
304 transaction_repository: None,
305 immutable_digester: None,
306 immutable_file_observer: None,
307 immutable_cache_provider: None,
308 immutable_file_digest_mapper: None,
309 digester: None,
310 file_archiver: None,
311 snapshotter: None,
312 certificate_verifier: None,
313 genesis_verifier: None,
314 mithril_signer_registration_leader: None,
315 mithril_signer_registration_follower: None,
316 signer_registerer: None,
317 signer_synchronizer: None,
318 signer_registration_verifier: None,
319 signer_registration_round_opener: None,
320 era_reader_adapter: None,
321 era_checker: None,
322 era_reader: None,
323 event_transmitter: None,
324 event_transmitter_channel: (None, None),
325 api_version_provider: None,
326 stake_distribution_service: None,
327 ticker_service: None,
328 signer_store: None,
329 signable_seed_builder: None,
330 signable_builder_service: None,
331 signed_entity_service: None,
332 certifier_service: None,
333 epoch_service: None,
334 signed_entity_storer: None,
335 message_service: None,
336 prover_service: None,
337 signed_entity_type_lock: None,
338 transactions_importer: None,
339 upkeep_service: None,
340 single_signature_authenticator: None,
341 metrics_service: None,
342 leader_aggregator_client: None,
343 protocol_parameters_retriever: None,
344 stop_signal_channel: None,
345 }
346 }
347
348 fn get_cardano_db_artifacts_dir(&self) -> Result<PathBuf> {
349 let cardano_db_artifacts_dir = self
350 .configuration
351 .get_snapshot_dir()?
352 .join(CARDANO_DB_ARTIFACTS_DIR);
353
354 if !cardano_db_artifacts_dir.exists() {
355 std::fs::create_dir(&cardano_db_artifacts_dir).map_err(|e| {
356 DependenciesBuilderError::Initialization {
357 message: format!("Cannot create '{cardano_db_artifacts_dir:?}' directory."),
358 error: Some(e.into()),
359 }
360 })?;
361 }
362
363 Ok(cardano_db_artifacts_dir)
364 }
365
366 pub async fn build_serve_dependencies_container(
368 &mut self,
369 ) -> Result<ServeCommandDependenciesContainer> {
370 #[allow(deprecated)]
371 let dependencies_manager = ServeCommandDependenciesContainer {
372 root_logger: self.root_logger(),
373 stake_store: self.get_stake_store().await?,
374 certificate_repository: self.get_certificate_repository().await?,
375 verification_key_store: self.get_verification_key_store().await?,
376 epoch_settings_storer: self.get_epoch_settings_store().await?,
377 chain_observer: self.get_chain_observer().await?,
378 signer_registerer: self.get_signer_registerer().await?,
379 signer_synchronizer: self.get_signer_synchronizer().await?,
380 signer_registration_round_opener: self.get_signer_registration_round_opener().await?,
381 era_checker: self.get_era_checker().await?,
382 era_reader: self.get_era_reader().await?,
383 event_transmitter: self.get_event_transmitter().await?,
384 api_version_provider: self.get_api_version_provider().await?,
385 stake_distribution_service: self.get_stake_distribution_service().await?,
386 signer_recorder: self.get_signer_store().await?,
387 signable_builder_service: self.get_signable_builder_service().await?,
388 signed_entity_service: self.get_signed_entity_service().await?,
389 certifier_service: self.get_certifier_service().await?,
390 epoch_service: self.get_epoch_service().await?,
391 ticker_service: self.get_ticker_service().await?,
392 signed_entity_storer: self.get_signed_entity_storer().await?,
393 signer_getter: self.get_signer_store().await?,
394 message_service: self.get_message_service().await?,
395 prover_service: self.get_prover_service().await?,
396 signed_entity_type_lock: self.get_signed_entity_type_lock().await?,
397 upkeep_service: self.get_upkeep_service().await?,
398 single_signer_authenticator: self.get_single_signature_authenticator().await?,
399 metrics_service: self.get_metrics_service().await?,
400 };
401
402 Ok(dependencies_manager)
403 }
404
405 pub async fn create_aggregator_runner(&mut self) -> Result<AggregatorRuntime> {
407 let dependency_container = Arc::new(self.build_serve_dependencies_container().await?);
408
409 let config = AggregatorConfig::new(
410 Duration::from_millis(self.configuration.run_interval()),
411 self.configuration.is_follower_aggregator(),
412 );
413 let runtime = AggregatorRuntime::new(
414 config,
415 None,
416 Arc::new(AggregatorRunner::new(dependency_container)),
417 self.root_logger(),
418 )
419 .await
420 .map_err(|e| DependenciesBuilderError::Initialization {
421 message: "Cannot initialize Aggregator runtime.".to_string(),
422 error: Some(e.into()),
423 })?;
424
425 Ok(runtime)
426 }
427
428 pub async fn create_http_routes(
430 &mut self,
431 ) -> Result<impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone> {
432 let dependency_container = Arc::new(self.build_serve_dependencies_container().await?);
433 let snapshot_dir = self.configuration.get_snapshot_dir()?;
434 let router_state = RouterState::new(
435 dependency_container.clone(),
436 RouterConfig {
437 network: self.configuration.get_network()?,
438 server_url: self.configuration.get_server_url()?,
439 allowed_discriminants: self
440 .configuration
441 .compute_allowed_signed_entity_types_discriminants()?,
442 cardano_transactions_prover_max_hashes_allowed_by_request: self
443 .configuration
444 .cardano_transactions_prover_max_hashes_allowed_by_request(),
445 cardano_db_artifacts_directory: self.get_cardano_db_artifacts_dir()?,
446 snapshot_directory: snapshot_dir.join(SNAPSHOT_ARTIFACTS_DIR),
447 cardano_node_version: self.configuration.cardano_node_version(),
448 allow_http_serve_directory: self.configuration.allow_http_serve_directory(),
449 origin_tag_white_list: self.configuration.compute_origin_tag_white_list(),
450 },
451 );
452
453 Ok(router::routes(Arc::new(router_state)))
454 }
455
456 pub async fn create_genesis_container(
458 &mut self,
459 ) -> Result<GenesisCommandDependenciesContainer> {
460 let network = self.configuration.get_network().with_context(|| {
461 "Dependencies Builder can not get Cardano network while building genesis container"
462 })?;
463
464 let dependencies = GenesisCommandDependenciesContainer {
465 network,
466 chain_observer: self.get_chain_observer().await?,
467 certificate_repository: self.get_certificate_repository().await?,
468 certificate_verifier: self.get_certificate_verifier().await?,
469 protocol_parameters_retriever: self.get_protocol_parameters_retriever().await?,
470 verification_key_store: self.get_verification_key_store().await?,
471 };
472
473 Ok(dependencies)
474 }
475
476 pub async fn create_database_command_container(
478 &mut self,
479 ) -> Result<DatabaseCommandDependenciesContainer> {
480 let main_db_connection = self
481 .get_sqlite_connection()
482 .await
483 .with_context(|| "Dependencies Builder can not get sqlite connection")?;
484
485 self.get_event_store_sqlite_connection()
486 .await
487 .with_context(|| "Dependencies Builder can not get event store sqlite connection")?;
488
489 self.get_sqlite_connection_cardano_transaction_pool()
490 .await
491 .with_context(|| {
492 "Dependencies Builder can not get cardano transaction pool sqlite connection"
493 })?;
494
495 let dependencies = DatabaseCommandDependenciesContainer { main_db_connection };
496
497 Ok(dependencies)
498 }
499
500 pub async fn create_tools_command_container(
502 &mut self,
503 ) -> Result<ToolsCommandDependenciesContainer> {
504 let db_connection = self
505 .get_sqlite_connection()
506 .await
507 .with_context(|| "Dependencies Builder can not get sqlite connection")?;
508
509 let dependencies = ToolsCommandDependenciesContainer { db_connection };
510
511 Ok(dependencies)
512 }
513
514 pub async fn vanish(self) {
516 self.drop_sqlite_connections().await;
517 }
518}
519
#[cfg(test)]
impl DependenciesBuilder {
    /// Test-only constructor: build a [`DependenciesBuilder`] whose root logger
    /// is the one returned by `TestLogger::stdout()`.
    pub(crate) fn new_with_stdout_logger(configuration: Arc<dyn ConfigurationSource>) -> Self {
        let stdout_logger = crate::test_tools::TestLogger::stdout();

        Self::new(stdout_logger, configuration)
    }
}