mithril_aggregator/dependency_injection/builder/
mod.rs

1mod enablers;
2mod protocol;
3mod support;
4
5use anyhow::Context;
6use slog::Logger;
7use std::{path::PathBuf, sync::Arc};
8use tokio::{
9    sync::{
10        mpsc::{UnboundedReceiver, UnboundedSender},
11        watch, Mutex,
12    },
13    time::Duration,
14};
15use warp::Filter;
16
17use mithril_cardano_node_chain::{
18    chain_observer::{CardanoCliRunner, ChainObserver},
19    chain_reader::ChainBlockReader,
20    chain_scanner::BlockScanner,
21};
22use mithril_cardano_node_internal_database::{
23    digesters::{cache::ImmutableFileDigestCacheProvider, ImmutableDigester},
24    ImmutableFileObserver,
25};
26use mithril_common::{
27    api_version::APIVersionProvider,
28    certificate_chain::CertificateVerifier,
29    crypto_helper::ProtocolGenesisVerifier,
30    signable_builder::{SignableBuilderService, SignableSeedBuilder, TransactionsImporter},
31};
32use mithril_era::{EraChecker, EraReader, EraReaderAdapter};
33use mithril_persistence::{
34    database::repository::CardanoTransactionRepository,
35    sqlite::{SqliteConnection, SqliteConnectionPool},
36};
37use mithril_signed_entity_lock::SignedEntityTypeLock;
38use mithril_ticker::TickerService;
39
40use super::{
41    DatabaseCommandDependenciesContainer, DependenciesBuilderError, EpochServiceWrapper,
42    GenesisCommandDependenciesContainer, Result, ToolsCommandDependenciesContainer,
43};
44use crate::{
45    configuration::ConfigurationSource,
46    database::repository::{
47        CertificateRepository, EpochSettingsStore, OpenMessageRepository, SignedEntityStorer,
48        SignerStore, StakePoolStore,
49    },
50    event_store::{EventMessage, TransmitterService},
51    file_uploaders::FileUploader,
52    http_server::routes::router::{self, RouterConfig, RouterState},
53    services::{
54        AggregatorClient, CertifierService, MessageService, MithrilSignerRegistrationFollower,
55        ProverService, SignedEntityService, SignerSynchronizer, Snapshotter,
56        StakeDistributionService, UpkeepService,
57    },
58    tools::file_archiver::FileArchiver,
59    AggregatorConfig, AggregatorRunner, AggregatorRuntime, ImmutableFileDigestMapper,
60    MetricsService, MithrilSignerRegistrationLeader, MultiSigner, ProtocolParametersRetriever,
61    ServeCommandDependenciesContainer, SignerRegisterer, SignerRegistrationRoundOpener,
62    SignerRegistrationVerifier, SingleSignatureAuthenticator, VerificationKeyStorer,
63};
64
/// Retrieve an attribute stored in the builder, building it on first access.
///
/// If the attribute is not yet initialized (`None`), it is instantiated and stored, then a
/// clone of the stored value is returned wrapped in `Ok`.
/// By default the value is built by calling the associated build function
/// (`build_<attribute_name>`), which is awaited and whose error is propagated with `?`.
/// If we don't want to use the default build function, we can pass an expression that builds
/// the value.
///
/// The expansion names `Result` with a single type parameter, so such an alias must be in
/// scope where the macro is invoked.
///
/// Usage examples:
/// get_dependency!(self.signer_registerer)
/// get_dependency!(self.signer_registerer = self.build_signer_registerer().await?)
#[macro_export]
macro_rules! get_dependency {
    ( $self:ident.$attribute:ident ) => {{
        // `paste` is only needed here, to forge the `build_<attribute>` method name.
        paste::paste! {
            // `$crate::` keeps the recursive call resolvable wherever the macro is invoked,
            // even when it is not imported by name at the call site.
            $crate::get_dependency!($self.$attribute = $self.[<build_ $attribute>]().await?)
        }
    }};
    ( $self:ident.$attribute:ident = $builder:expr ) => {{
        // The builder expression is only evaluated when the attribute is still empty.
        if $self.$attribute.is_none() {
            $self.$attribute = Some($builder);
        }

        let r: Result<_> = Ok($self
            .$attribute
            .clone()
            .expect("dependency is always initialized at this point"));
        r
    }};
}
89
// NOTE(review): the three SQLite file names below are not referenced in this part of the
// file; presumably the builder submodules use them to open the databases — confirm.
const SQLITE_FILE: &str = "aggregator.sqlite3";
const SQLITE_FILE_CARDANO_TRANSACTION: &str = "cardano-transaction.sqlite3";
const SQLITE_MONITORING_FILE: &str = "monitoring.sqlite3";
/// Name of the sub-directory of the snapshot directory that holds the Cardano database
/// artifacts.
const CARDANO_DB_ARTIFACTS_DIR: &str = "cardano-database";
/// Name of the sub-directory of the snapshot directory that is handed to the HTTP router as
/// its snapshot directory.
const SNAPSHOT_ARTIFACTS_DIR: &str = "cardano-immutable-files-full";
95
/// ## Dependencies container builder
///
/// This is meant to create SHARED DEPENDENCIES, i.e. dependency instances that
/// must be shared amongst several Tokio tasks. For example, database
/// repositories are NOT shared dependencies and therefore can be created ad hoc
/// whereas the database connection is a shared dependency.
///
/// Each shared dependency must implement a `build` and a `get` function. The
/// build function creates the dependency, the get function creates the
/// dependency at first call then returns a clone of the Arc containing the
/// dependency for all further calls.
///
/// Apart from the configuration and the root logger, every field is a slot that is `None`
/// until the matching dependency has been built.
pub struct DependenciesBuilder {
    /// Configuration parameters
    pub configuration: Arc<dyn ConfigurationSource>,

    /// Application root logger
    pub root_logger: Logger,

    /// SQLite database connection
    pub sqlite_connection: Option<Arc<SqliteConnection>>,

    /// Event store SQLite database connection
    pub sqlite_connection_event_store: Option<Arc<SqliteConnection>>,

    /// Cardano transactions SQLite database connection pool
    pub sqlite_connection_cardano_transaction_pool: Option<Arc<SqliteConnectionPool>>,

    /// Stake Store used by the StakeDistributionService
    /// It shall be a private dependency.
    pub stake_store: Option<Arc<StakePoolStore>>,

    /// Snapshot uploader service.
    pub snapshot_uploader: Option<Arc<dyn FileUploader>>,

    /// Multisigner service.
    pub multi_signer: Option<Arc<dyn MultiSigner>>,

    /// Certificate repository.
    pub certificate_repository: Option<Arc<CertificateRepository>>,

    /// Open message repository.
    pub open_message_repository: Option<Arc<OpenMessageRepository>>,

    /// Verification key store.
    pub verification_key_store: Option<Arc<dyn VerificationKeyStorer>>,

    /// Epoch settings store.
    pub epoch_settings_store: Option<Arc<EpochSettingsStore>>,

    /// Cardano CLI Runner for the [ChainObserver]
    pub cardano_cli_runner: Option<Box<CardanoCliRunner>>,

    /// Chain observer service.
    pub chain_observer: Option<Arc<dyn ChainObserver>>,

    /// Chain block reader
    pub chain_block_reader: Option<Arc<Mutex<dyn ChainBlockReader>>>,

    /// Cardano transactions repository.
    pub transaction_repository: Option<Arc<CardanoTransactionRepository>>,

    /// Cardano block scanner.
    pub block_scanner: Option<Arc<dyn BlockScanner>>,

    /// Immutable file digester service.
    pub immutable_digester: Option<Arc<dyn ImmutableDigester>>,

    /// Immutable file observer service.
    pub immutable_file_observer: Option<Arc<dyn ImmutableFileObserver>>,

    /// Immutable cache provider service.
    pub immutable_cache_provider: Option<Arc<dyn ImmutableFileDigestCacheProvider>>,

    /// Immutable file digest mapper service.
    pub immutable_file_digest_mapper: Option<Arc<dyn ImmutableFileDigestMapper>>,

    /// Digester service.
    // NOTE(review): same trait object type as `immutable_digester`; how the two slots differ
    // is not visible in this part of the file — confirm against the build functions.
    pub digester: Option<Arc<dyn ImmutableDigester>>,

    /// File archiver service.
    pub file_archiver: Option<Arc<FileArchiver>>,

    /// Snapshotter service.
    pub snapshotter: Option<Arc<dyn Snapshotter>>,

    /// Certificate verifier service.
    pub certificate_verifier: Option<Arc<dyn CertificateVerifier>>,

    /// Genesis signature verifier service.
    pub genesis_verifier: Option<Arc<ProtocolGenesisVerifier>>,

    /// Mithril signer registration leader service
    pub mithril_signer_registration_leader: Option<Arc<MithrilSignerRegistrationLeader>>,

    /// Mithril signer registration follower service
    pub mithril_signer_registration_follower: Option<Arc<MithrilSignerRegistrationFollower>>,

    /// Signer registerer service
    pub signer_registerer: Option<Arc<dyn SignerRegisterer>>,

    /// Signer synchronizer service
    pub signer_synchronizer: Option<Arc<dyn SignerSynchronizer>>,

    /// Signer registration verifier
    pub signer_registration_verifier: Option<Arc<dyn SignerRegistrationVerifier>>,

    /// Signer registration round opener service
    pub signer_registration_round_opener: Option<Arc<dyn SignerRegistrationRoundOpener>>,

    /// Era checker service
    pub era_checker: Option<Arc<EraChecker>>,

    /// Adapter for [EraReader]
    pub era_reader_adapter: Option<Arc<dyn EraReaderAdapter>>,

    /// Era reader service
    pub era_reader: Option<Arc<EraReader>>,

    /// Event Transmitter Service
    pub event_transmitter: Option<Arc<TransmitterService<EventMessage>>>,

    /// Event transmitter channel endpoints, stored as `(receiver, sender)`.
    /// Each endpoint is optional on its own.
    pub event_transmitter_channel: (
        Option<UnboundedReceiver<EventMessage>>,
        Option<UnboundedSender<EventMessage>>,
    ),

    /// API Version provider
    pub api_version_provider: Option<Arc<APIVersionProvider>>,

    /// Stake Distribution Service
    pub stake_distribution_service: Option<Arc<dyn StakeDistributionService>>,

    /// Ticker Service
    pub ticker_service: Option<Arc<dyn TickerService>>,

    /// Signer Store
    pub signer_store: Option<Arc<SignerStore>>,

    /// Signable Seed Builder
    pub signable_seed_builder: Option<Arc<dyn SignableSeedBuilder>>,

    /// Signable Builder Service
    pub signable_builder_service: Option<Arc<dyn SignableBuilderService>>,

    /// Signed Entity Service
    pub signed_entity_service: Option<Arc<dyn SignedEntityService>>,

    /// Certifier service
    pub certifier_service: Option<Arc<dyn CertifierService>>,

    /// Epoch service.
    pub epoch_service: Option<EpochServiceWrapper>,

    /// Signed Entity storer
    pub signed_entity_storer: Option<Arc<dyn SignedEntityStorer>>,

    /// HTTP Message service
    pub message_service: Option<Arc<dyn MessageService>>,

    /// Prover service
    pub prover_service: Option<Arc<dyn ProverService>>,

    /// Signed Entity Type Lock
    pub signed_entity_type_lock: Option<Arc<SignedEntityTypeLock>>,

    /// Transactions Importer
    pub transactions_importer: Option<Arc<dyn TransactionsImporter>>,

    /// Upkeep service
    pub upkeep_service: Option<Arc<dyn UpkeepService>>,

    /// Single signer authenticator
    pub single_signature_authenticator: Option<Arc<SingleSignatureAuthenticator>>,

    /// Metrics service
    pub metrics_service: Option<Arc<MetricsService>>,

    /// Leader aggregator client
    pub leader_aggregator_client: Option<Arc<dyn AggregatorClient>>,

    /// Protocol parameters retriever
    pub protocol_parameters_retriever: Option<Arc<dyn ProtocolParametersRetriever>>,

    /// Stop signal channel, stored as a `watch` `(sender, receiver)` pair.
    pub stop_signal_channel: Option<(watch::Sender<()>, watch::Receiver<()>)>,
}
283
284impl DependenciesBuilder {
285    /// Create a new clean dependency builder
286    pub fn new(root_logger: Logger, configuration: Arc<dyn ConfigurationSource>) -> Self {
287        Self {
288            configuration,
289            root_logger,
290            sqlite_connection: None,
291            sqlite_connection_event_store: None,
292            sqlite_connection_cardano_transaction_pool: None,
293            stake_store: None,
294            snapshot_uploader: None,
295            multi_signer: None,
296            certificate_repository: None,
297            open_message_repository: None,
298            verification_key_store: None,
299            epoch_settings_store: None,
300            cardano_cli_runner: None,
301            chain_observer: None,
302            chain_block_reader: None,
303            block_scanner: None,
304            transaction_repository: None,
305            immutable_digester: None,
306            immutable_file_observer: None,
307            immutable_cache_provider: None,
308            immutable_file_digest_mapper: None,
309            digester: None,
310            file_archiver: None,
311            snapshotter: None,
312            certificate_verifier: None,
313            genesis_verifier: None,
314            mithril_signer_registration_leader: None,
315            mithril_signer_registration_follower: None,
316            signer_registerer: None,
317            signer_synchronizer: None,
318            signer_registration_verifier: None,
319            signer_registration_round_opener: None,
320            era_reader_adapter: None,
321            era_checker: None,
322            era_reader: None,
323            event_transmitter: None,
324            event_transmitter_channel: (None, None),
325            api_version_provider: None,
326            stake_distribution_service: None,
327            ticker_service: None,
328            signer_store: None,
329            signable_seed_builder: None,
330            signable_builder_service: None,
331            signed_entity_service: None,
332            certifier_service: None,
333            epoch_service: None,
334            signed_entity_storer: None,
335            message_service: None,
336            prover_service: None,
337            signed_entity_type_lock: None,
338            transactions_importer: None,
339            upkeep_service: None,
340            single_signature_authenticator: None,
341            metrics_service: None,
342            leader_aggregator_client: None,
343            protocol_parameters_retriever: None,
344            stop_signal_channel: None,
345        }
346    }
347
348    fn get_cardano_db_artifacts_dir(&self) -> Result<PathBuf> {
349        let cardano_db_artifacts_dir = self
350            .configuration
351            .get_snapshot_dir()?
352            .join(CARDANO_DB_ARTIFACTS_DIR);
353
354        if !cardano_db_artifacts_dir.exists() {
355            std::fs::create_dir(&cardano_db_artifacts_dir).map_err(|e| {
356                DependenciesBuilderError::Initialization {
357                    message: format!("Cannot create '{cardano_db_artifacts_dir:?}' directory."),
358                    error: Some(e.into()),
359                }
360            })?;
361        }
362
363        Ok(cardano_db_artifacts_dir)
364    }
365
366    /// Return an unconfigured [ServeCommandDependenciesContainer]
367    pub async fn build_serve_dependencies_container(
368        &mut self,
369    ) -> Result<ServeCommandDependenciesContainer> {
370        #[allow(deprecated)]
371        let dependencies_manager = ServeCommandDependenciesContainer {
372            root_logger: self.root_logger(),
373            stake_store: self.get_stake_store().await?,
374            certificate_repository: self.get_certificate_repository().await?,
375            verification_key_store: self.get_verification_key_store().await?,
376            epoch_settings_storer: self.get_epoch_settings_store().await?,
377            chain_observer: self.get_chain_observer().await?,
378            signer_registerer: self.get_signer_registerer().await?,
379            signer_synchronizer: self.get_signer_synchronizer().await?,
380            signer_registration_round_opener: self.get_signer_registration_round_opener().await?,
381            era_checker: self.get_era_checker().await?,
382            era_reader: self.get_era_reader().await?,
383            event_transmitter: self.get_event_transmitter().await?,
384            api_version_provider: self.get_api_version_provider().await?,
385            stake_distribution_service: self.get_stake_distribution_service().await?,
386            signer_recorder: self.get_signer_store().await?,
387            signable_builder_service: self.get_signable_builder_service().await?,
388            signed_entity_service: self.get_signed_entity_service().await?,
389            certifier_service: self.get_certifier_service().await?,
390            epoch_service: self.get_epoch_service().await?,
391            ticker_service: self.get_ticker_service().await?,
392            signed_entity_storer: self.get_signed_entity_storer().await?,
393            signer_getter: self.get_signer_store().await?,
394            message_service: self.get_message_service().await?,
395            prover_service: self.get_prover_service().await?,
396            signed_entity_type_lock: self.get_signed_entity_type_lock().await?,
397            upkeep_service: self.get_upkeep_service().await?,
398            single_signer_authenticator: self.get_single_signature_authenticator().await?,
399            metrics_service: self.get_metrics_service().await?,
400        };
401
402        Ok(dependencies_manager)
403    }
404
405    /// Create the AggregatorRunner
406    pub async fn create_aggregator_runner(&mut self) -> Result<AggregatorRuntime> {
407        let dependency_container = Arc::new(self.build_serve_dependencies_container().await?);
408
409        let config = AggregatorConfig::new(
410            Duration::from_millis(self.configuration.run_interval()),
411            self.configuration.is_follower_aggregator(),
412        );
413        let runtime = AggregatorRuntime::new(
414            config,
415            None,
416            Arc::new(AggregatorRunner::new(dependency_container)),
417            self.root_logger(),
418        )
419        .await
420        .map_err(|e| DependenciesBuilderError::Initialization {
421            message: "Cannot initialize Aggregator runtime.".to_string(),
422            error: Some(e.into()),
423        })?;
424
425        Ok(runtime)
426    }
427
428    /// Create the HTTP route instance
429    pub async fn create_http_routes(
430        &mut self,
431    ) -> Result<impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone> {
432        let dependency_container = Arc::new(self.build_serve_dependencies_container().await?);
433        let snapshot_dir = self.configuration.get_snapshot_dir()?;
434        let router_state = RouterState::new(
435            dependency_container.clone(),
436            RouterConfig {
437                network: self.configuration.get_network()?,
438                server_url: self.configuration.get_server_url()?,
439                allowed_discriminants: self
440                    .configuration
441                    .compute_allowed_signed_entity_types_discriminants()?,
442                cardano_transactions_prover_max_hashes_allowed_by_request: self
443                    .configuration
444                    .cardano_transactions_prover_max_hashes_allowed_by_request(),
445                cardano_db_artifacts_directory: self.get_cardano_db_artifacts_dir()?,
446                snapshot_directory: snapshot_dir.join(SNAPSHOT_ARTIFACTS_DIR),
447                cardano_node_version: self.configuration.cardano_node_version(),
448                allow_http_serve_directory: self.configuration.allow_http_serve_directory(),
449                origin_tag_white_list: self.configuration.compute_origin_tag_white_list(),
450            },
451        );
452
453        Ok(router::routes(Arc::new(router_state)))
454    }
455
456    /// Create dependencies for genesis commands
457    pub async fn create_genesis_container(
458        &mut self,
459    ) -> Result<GenesisCommandDependenciesContainer> {
460        let network = self.configuration.get_network().with_context(|| {
461            "Dependencies Builder can not get Cardano network while building genesis container"
462        })?;
463
464        let dependencies = GenesisCommandDependenciesContainer {
465            network,
466            chain_observer: self.get_chain_observer().await?,
467            certificate_repository: self.get_certificate_repository().await?,
468            certificate_verifier: self.get_certificate_verifier().await?,
469            protocol_parameters_retriever: self.get_protocol_parameters_retriever().await?,
470            verification_key_store: self.get_verification_key_store().await?,
471        };
472
473        Ok(dependencies)
474    }
475
476    /// Create dependencies for database command
477    pub async fn create_database_command_container(
478        &mut self,
479    ) -> Result<DatabaseCommandDependenciesContainer> {
480        let main_db_connection = self
481            .get_sqlite_connection()
482            .await
483            .with_context(|| "Dependencies Builder can not get sqlite connection")?;
484
485        self.get_event_store_sqlite_connection()
486            .await
487            .with_context(|| "Dependencies Builder can not get event store sqlite connection")?;
488
489        self.get_sqlite_connection_cardano_transaction_pool()
490            .await
491            .with_context(|| {
492                "Dependencies Builder can not get cardano transaction pool sqlite connection"
493            })?;
494
495        let dependencies = DatabaseCommandDependenciesContainer { main_db_connection };
496
497        Ok(dependencies)
498    }
499
500    /// Create dependencies for tools command
501    pub async fn create_tools_command_container(
502        &mut self,
503    ) -> Result<ToolsCommandDependenciesContainer> {
504        let db_connection = self
505            .get_sqlite_connection()
506            .await
507            .with_context(|| "Dependencies Builder can not get sqlite connection")?;
508
509        let dependencies = ToolsCommandDependenciesContainer { db_connection };
510
511        Ok(dependencies)
512    }
513
    /// Remove the dependencies builder from memory to release Arc instances.
    ///
    /// The builder is taken by value, so it no longer exists once this function returns.
    /// `drop_sqlite_connections` is awaited first; it is defined outside this part of the
    /// file.
    pub async fn vanish(self) {
        self.drop_sqlite_connections().await;
    }
518}
519
#[cfg(test)]
impl DependenciesBuilder {
    /// Test helper: create a builder whose root logger is the test stdout logger.
    pub(crate) fn new_with_stdout_logger(configuration: Arc<dyn ConfigurationSource>) -> Self {
        let stdout_logger = crate::test_tools::TestLogger::stdout();

        Self::new(stdout_logger, configuration)
    }
}
525}