mithril_aggregator/dependency_injection/builder/
mod.rs

1mod enablers;
2mod protocol;
3mod support;
4
5use anyhow::Context;
6use slog::Logger;
7use std::{path::PathBuf, sync::Arc};
8use tokio::{
9    sync::{
10        Mutex,
11        mpsc::{UnboundedReceiver, UnboundedSender},
12        watch,
13    },
14    time::Duration,
15};
16use warp::Filter;
17
18use mithril_cardano_node_chain::{
19    chain_observer::{CardanoCliRunner, ChainObserver},
20    chain_reader::ChainBlockReader,
21    chain_scanner::BlockScanner,
22};
23use mithril_cardano_node_internal_database::{
24    ImmutableFileObserver,
25    digesters::{ImmutableDigester, cache::ImmutableFileDigestCacheProvider},
26};
27use mithril_common::{
28    api_version::APIVersionProvider,
29    certificate_chain::CertificateVerifier,
30    crypto_helper::ProtocolGenesisVerifier,
31    signable_builder::{SignableBuilderService, SignableSeedBuilder, TransactionsImporter},
32};
33use mithril_era::{EraChecker, EraReader, EraReaderAdapter};
34use mithril_persistence::{
35    database::repository::CardanoTransactionRepository,
36    sqlite::{SqliteConnection, SqliteConnectionPool},
37};
38use mithril_signed_entity_lock::SignedEntityTypeLock;
39use mithril_ticker::TickerService;
40
41use super::{
42    DatabaseCommandDependenciesContainer, DependenciesBuilderError, EpochServiceWrapper,
43    GenesisCommandDependenciesContainer, Result, ToolsCommandDependenciesContainer,
44};
45use crate::{
46    AggregatorConfig, AggregatorRunner, AggregatorRuntime, ImmutableFileDigestMapper,
47    MetricsService, MithrilSignerRegistrationLeader, MultiSigner, ProtocolParametersRetriever,
48    ServeCommandDependenciesContainer, SignerRegisterer, SignerRegistrationRoundOpener,
49    SignerRegistrationVerifier, SingleSignatureAuthenticator, VerificationKeyStorer,
50    configuration::ConfigurationSource,
51    database::repository::{
52        CertificateRepository, EpochSettingsStore, OpenMessageRepository, SignedEntityStorer,
53        SignerStore, StakePoolStore,
54    },
55    event_store::{EventMessage, TransmitterService},
56    file_uploaders::FileUploader,
57    http_server::routes::router::{self, RouterConfig, RouterState},
58    services::{
59        AggregatorHTTPClient, CertificateChainSynchronizer, CertifierService, MessageService,
60        MithrilSignerRegistrationFollower, ProverService, SignedEntityService, SignerSynchronizer,
61        Snapshotter, StakeDistributionService, UpkeepService,
62    },
63    tools::file_archiver::FileArchiver,
64};
65
/// Retrieve an attribute stored in the builder, initializing it on first access.
///
/// If the attribute is still `None` it is initialized before being returned:
/// * `get_dependency!(self.attribute)` initializes it by calling the associated build
///   function (`build_<attribute_name>`), awaiting it and propagating its error with `?`.
/// * `get_dependency!(self.attribute = <expression>)` initializes it with the given
///   expression, for the cases where the default build function must not be used.
///
/// Both forms evaluate to a `Result<_>` wrapping a clone of the stored value. The build
/// expression is only evaluated when the attribute is not initialized yet.
///
/// The first form expands to `.await?`, so it can only be used inside an `async` context
/// that is able to propagate the error of the build function.
///
/// Usage examples:
/// get_dependency!(self.signer_registerer)
/// get_dependency!(self.signer_registerer = self.build_signer_registerer().await?)
#[macro_export]
macro_rules! get_dependency {
    ( $self:ident.$attribute:ident ) => {{
        paste::paste! {
            // `$crate::` keeps the recursive call resolvable even when the macro is invoked
            // by path from a scope where it has not been imported by name.
            $crate::get_dependency!($self.$attribute = $self.[<build_ $attribute>]().await?)
        }
    }};
    ( $self:ident.$attribute:ident = $builder:expr ) => {{
        if $self.$attribute.is_none() {
            $self.$attribute = Some($builder);
        }

        // The attribute was initialized just above when it was empty, so `unwrap` can not panic.
        let r: Result<_> = Ok($self.$attribute.clone().unwrap());
        r
    }};
}
90
/// File name `aggregator.sqlite3`.
/// NOTE(review): not referenced in this part of the file, presumably the main database file.
const SQLITE_FILE: &str = "aggregator.sqlite3";

/// File name `cardano-transaction.sqlite3`.
/// NOTE(review): not referenced in this part of the file, presumably the Cardano transactions
/// database file.
const SQLITE_FILE_CARDANO_TRANSACTION: &str = "cardano-transaction.sqlite3";

/// File name `monitoring.sqlite3`.
/// NOTE(review): not referenced in this part of the file, presumably the event store
/// database file.
const SQLITE_MONITORING_FILE: &str = "monitoring.sqlite3";

/// Name of the sub-directory of the snapshot directory holding the Cardano database artifacts.
const CARDANO_DB_ARTIFACTS_DIR: &str = "cardano-database";

/// Name of the sub-directory of the snapshot directory given to the HTTP router as its
/// snapshot directory.
const SNAPSHOT_ARTIFACTS_DIR: &str = "cardano-immutable-files-full";

/// Maximum offset that can be applied to the latest epoch when fetching artifacts.
const MAX_ARTIFACT_EPOCH_OFFSET: u64 = 5;
98
99/// ## Dependencies container builder
100///
101/// This is meant to create SHARED DEPENDENCIES, ie: dependencies instances that
102/// must be shared amongst several Tokio tasks. For example, database
103/// repositories are NOT shared dependencies and therefor can be created ad hoc
104/// whereas the database connection is a shared dependency.
105///
106/// Each shared dependency must implement a `build` and a `get` function. The
107/// build function creates the dependency, the get function creates the
108/// dependency at first call then return a clone of the Arc containing the
109/// dependency for all further calls.
110pub struct DependenciesBuilder {
111    /// Configuration parameters
112    pub configuration: Arc<dyn ConfigurationSource>,
113
114    /// Application root logger
115    pub root_logger: Logger,
116
117    /// SQLite database connection
118    pub sqlite_connection: Option<Arc<SqliteConnection>>,
119
120    /// Event store SQLite database connection
121    pub sqlite_connection_event_store: Option<Arc<SqliteConnection>>,
122
123    /// Cardano transactions SQLite database connection pool
124    pub sqlite_connection_cardano_transaction_pool: Option<Arc<SqliteConnectionPool>>,
125
126    /// Stake Store used by the StakeDistributionService
127    /// It shall be a private dependency.
128    pub stake_store: Option<Arc<StakePoolStore>>,
129
130    /// Snapshot uploader service.
131    pub snapshot_uploader: Option<Arc<dyn FileUploader>>,
132
133    /// Multisigner service.
134    pub multi_signer: Option<Arc<dyn MultiSigner>>,
135
136    /// Certificate repository.
137    pub certificate_repository: Option<Arc<CertificateRepository>>,
138
139    /// Open message repository.
140    pub open_message_repository: Option<Arc<OpenMessageRepository>>,
141
142    /// Verification key store.
143    pub verification_key_store: Option<Arc<dyn VerificationKeyStorer>>,
144
145    /// Epoch settings store.
146    pub epoch_settings_store: Option<Arc<EpochSettingsStore>>,
147
148    /// Cardano CLI Runner for the [ChainObserver]
149    pub cardano_cli_runner: Option<Box<CardanoCliRunner>>,
150
151    /// Chain observer service.
152    pub chain_observer: Option<Arc<dyn ChainObserver>>,
153
154    /// Chain block reader
155    pub chain_block_reader: Option<Arc<Mutex<dyn ChainBlockReader>>>,
156
157    /// Cardano transactions repository.
158    pub transaction_repository: Option<Arc<CardanoTransactionRepository>>,
159
160    /// Cardano block scanner.
161    pub block_scanner: Option<Arc<dyn BlockScanner>>,
162
163    /// Immutable file digester service.
164    pub immutable_digester: Option<Arc<dyn ImmutableDigester>>,
165
166    /// Immutable file observer service.
167    pub immutable_file_observer: Option<Arc<dyn ImmutableFileObserver>>,
168
169    /// Immutable cache provider service.
170    pub immutable_cache_provider: Option<Arc<dyn ImmutableFileDigestCacheProvider>>,
171
172    /// Immutable file digest mapper service.
173    pub immutable_file_digest_mapper: Option<Arc<dyn ImmutableFileDigestMapper>>,
174
175    /// Digester service.
176    pub digester: Option<Arc<dyn ImmutableDigester>>,
177
178    /// File archiver service.
179    pub file_archiver: Option<Arc<FileArchiver>>,
180
181    /// Snapshotter service.
182    pub snapshotter: Option<Arc<dyn Snapshotter>>,
183
184    /// Certificate verifier service.
185    pub certificate_verifier: Option<Arc<dyn CertificateVerifier>>,
186
187    /// Genesis signature verifier service.
188    pub genesis_verifier: Option<Arc<ProtocolGenesisVerifier>>,
189
190    /// Certificate chain synchronizer service
191    pub certificate_chain_synchronizer: Option<Arc<dyn CertificateChainSynchronizer>>,
192
193    /// Mithril signer registration leader service
194    pub mithril_signer_registration_leader: Option<Arc<MithrilSignerRegistrationLeader>>,
195
196    /// Mithril signer registration follower service
197    pub mithril_signer_registration_follower: Option<Arc<MithrilSignerRegistrationFollower>>,
198
199    /// Signer registerer service
200    pub signer_registerer: Option<Arc<dyn SignerRegisterer>>,
201
202    /// Signer synchronizer service
203    pub signer_synchronizer: Option<Arc<dyn SignerSynchronizer>>,
204
205    /// Signer registration verifier
206    pub signer_registration_verifier: Option<Arc<dyn SignerRegistrationVerifier>>,
207
208    /// Signer registration round opener service
209    pub signer_registration_round_opener: Option<Arc<dyn SignerRegistrationRoundOpener>>,
210
211    /// Era checker service
212    pub era_checker: Option<Arc<EraChecker>>,
213
214    /// Adapter for [EraReader]
215    pub era_reader_adapter: Option<Arc<dyn EraReaderAdapter>>,
216
217    /// Era reader service
218    pub era_reader: Option<Arc<EraReader>>,
219
220    /// Event Transmitter Service
221    pub event_transmitter: Option<Arc<TransmitterService<EventMessage>>>,
222
223    /// Event transmitter Channel Sender endpoint
224    pub event_transmitter_channel: (
225        Option<UnboundedReceiver<EventMessage>>,
226        Option<UnboundedSender<EventMessage>>,
227    ),
228
229    /// API Version provider
230    pub api_version_provider: Option<Arc<APIVersionProvider>>,
231
232    /// Stake Distribution Service
233    pub stake_distribution_service: Option<Arc<dyn StakeDistributionService>>,
234
235    /// Ticker Service
236    pub ticker_service: Option<Arc<dyn TickerService>>,
237
238    /// Signer Store
239    pub signer_store: Option<Arc<SignerStore>>,
240
241    /// Signable Seed Builder
242    pub signable_seed_builder: Option<Arc<dyn SignableSeedBuilder>>,
243
244    /// Signable Builder Service
245    pub signable_builder_service: Option<Arc<dyn SignableBuilderService>>,
246
247    /// Signed Entity Service
248    pub signed_entity_service: Option<Arc<dyn SignedEntityService>>,
249
250    /// Certifier service
251    pub certifier_service: Option<Arc<dyn CertifierService>>,
252
253    /// Epoch service.
254    pub epoch_service: Option<EpochServiceWrapper>,
255
256    /// Signed Entity storer
257    pub signed_entity_storer: Option<Arc<dyn SignedEntityStorer>>,
258
259    /// HTTP Message service
260    pub message_service: Option<Arc<dyn MessageService>>,
261
262    /// Prover service
263    pub prover_service: Option<Arc<dyn ProverService>>,
264
265    /// Signed Entity Type Lock
266    pub signed_entity_type_lock: Option<Arc<SignedEntityTypeLock>>,
267
268    /// Transactions Importer
269    pub transactions_importer: Option<Arc<dyn TransactionsImporter>>,
270
271    /// Upkeep service
272    pub upkeep_service: Option<Arc<dyn UpkeepService>>,
273
274    /// Single signer authenticator
275    pub single_signature_authenticator: Option<Arc<SingleSignatureAuthenticator>>,
276
277    /// Metrics service
278    pub metrics_service: Option<Arc<MetricsService>>,
279
280    /// Leader aggregator client
281    pub leader_aggregator_client: Option<Arc<AggregatorHTTPClient>>,
282
283    /// Protocol parameters retriever
284    pub protocol_parameters_retriever: Option<Arc<dyn ProtocolParametersRetriever>>,
285
286    /// Stop signal channel
287    pub stop_signal_channel: Option<(watch::Sender<()>, watch::Receiver<()>)>,
288}
289
290impl DependenciesBuilder {
291    /// Create a new clean dependency builder
292    pub fn new(root_logger: Logger, configuration: Arc<dyn ConfigurationSource>) -> Self {
293        Self {
294            configuration,
295            root_logger,
296            sqlite_connection: None,
297            sqlite_connection_event_store: None,
298            sqlite_connection_cardano_transaction_pool: None,
299            stake_store: None,
300            snapshot_uploader: None,
301            multi_signer: None,
302            certificate_repository: None,
303            open_message_repository: None,
304            verification_key_store: None,
305            epoch_settings_store: None,
306            cardano_cli_runner: None,
307            chain_observer: None,
308            chain_block_reader: None,
309            block_scanner: None,
310            transaction_repository: None,
311            immutable_digester: None,
312            immutable_file_observer: None,
313            immutable_cache_provider: None,
314            immutable_file_digest_mapper: None,
315            digester: None,
316            file_archiver: None,
317            snapshotter: None,
318            certificate_verifier: None,
319            genesis_verifier: None,
320            certificate_chain_synchronizer: None,
321            mithril_signer_registration_leader: None,
322            mithril_signer_registration_follower: None,
323            signer_registerer: None,
324            signer_synchronizer: None,
325            signer_registration_verifier: None,
326            signer_registration_round_opener: None,
327            era_reader_adapter: None,
328            era_checker: None,
329            era_reader: None,
330            event_transmitter: None,
331            event_transmitter_channel: (None, None),
332            api_version_provider: None,
333            stake_distribution_service: None,
334            ticker_service: None,
335            signer_store: None,
336            signable_seed_builder: None,
337            signable_builder_service: None,
338            signed_entity_service: None,
339            certifier_service: None,
340            epoch_service: None,
341            signed_entity_storer: None,
342            message_service: None,
343            prover_service: None,
344            signed_entity_type_lock: None,
345            transactions_importer: None,
346            upkeep_service: None,
347            single_signature_authenticator: None,
348            metrics_service: None,
349            leader_aggregator_client: None,
350            protocol_parameters_retriever: None,
351            stop_signal_channel: None,
352        }
353    }
354
355    fn get_cardano_db_artifacts_dir(&self) -> Result<PathBuf> {
356        let cardano_db_artifacts_dir =
357            self.configuration.get_snapshot_dir()?.join(CARDANO_DB_ARTIFACTS_DIR);
358
359        if !cardano_db_artifacts_dir.exists() {
360            std::fs::create_dir(&cardano_db_artifacts_dir).map_err(|e| {
361                DependenciesBuilderError::Initialization {
362                    message: format!("Cannot create '{cardano_db_artifacts_dir:?}' directory."),
363                    error: Some(e.into()),
364                }
365            })?;
366        }
367
368        Ok(cardano_db_artifacts_dir)
369    }
370
371    /// Return an unconfigured [ServeCommandDependenciesContainer]
372    pub async fn build_serve_dependencies_container(
373        &mut self,
374    ) -> Result<ServeCommandDependenciesContainer> {
375        #[allow(deprecated)]
376        let dependencies_manager = ServeCommandDependenciesContainer {
377            root_logger: self.root_logger(),
378            stake_store: self.get_stake_store().await?,
379            certificate_repository: self.get_certificate_repository().await?,
380            verification_key_store: self.get_verification_key_store().await?,
381            epoch_settings_storer: self.get_epoch_settings_store().await?,
382            chain_observer: self.get_chain_observer().await?,
383            certificate_chain_synchronizer: self.get_certificate_chain_synchronizer().await?,
384            signer_registerer: self.get_signer_registerer().await?,
385            signer_synchronizer: self.get_signer_synchronizer().await?,
386            signer_registration_round_opener: self.get_signer_registration_round_opener().await?,
387            era_checker: self.get_era_checker().await?,
388            era_reader: self.get_era_reader().await?,
389            event_transmitter: self.get_event_transmitter().await?,
390            api_version_provider: self.get_api_version_provider().await?,
391            stake_distribution_service: self.get_stake_distribution_service().await?,
392            signer_recorder: self.get_signer_store().await?,
393            signable_builder_service: self.get_signable_builder_service().await?,
394            signed_entity_service: self.get_signed_entity_service().await?,
395            certifier_service: self.get_certifier_service().await?,
396            epoch_service: self.get_epoch_service().await?,
397            ticker_service: self.get_ticker_service().await?,
398            signed_entity_storer: self.get_signed_entity_storer().await?,
399            signer_getter: self.get_signer_store().await?,
400            message_service: self.get_message_service().await?,
401            prover_service: self.get_prover_service().await?,
402            signed_entity_type_lock: self.get_signed_entity_type_lock().await?,
403            upkeep_service: self.get_upkeep_service().await?,
404            single_signer_authenticator: self.get_single_signature_authenticator().await?,
405            metrics_service: self.get_metrics_service().await?,
406        };
407
408        Ok(dependencies_manager)
409    }
410
411    /// Create the AggregatorRunner
412    pub async fn create_aggregator_runner(&mut self) -> Result<AggregatorRuntime> {
413        let dependency_container = Arc::new(self.build_serve_dependencies_container().await?);
414
415        let config = AggregatorConfig::new(
416            Duration::from_millis(self.configuration.run_interval()),
417            self.configuration.is_follower_aggregator(),
418        );
419        let runtime = AggregatorRuntime::new(
420            config,
421            None,
422            Arc::new(AggregatorRunner::new(dependency_container)),
423            self.root_logger(),
424        )
425        .await
426        .map_err(|e| DependenciesBuilderError::Initialization {
427            message: "Cannot initialize Aggregator runtime.".to_string(),
428            error: Some(e.into()),
429        })?;
430
431        Ok(runtime)
432    }
433
434    /// Create the HTTP route instance
435    pub async fn create_http_routes(
436        &mut self,
437    ) -> Result<
438        impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<>,
439    > {
440        let dependency_container = Arc::new(self.build_serve_dependencies_container().await?);
441        let snapshot_dir = self.configuration.get_snapshot_dir()?;
442        let router_state = RouterState::new(
443            dependency_container.clone(),
444            RouterConfig {
445                network: self.configuration.get_network()?,
446                server_url: self.configuration.get_server_url()?,
447                allowed_discriminants: self
448                    .configuration
449                    .compute_allowed_signed_entity_types_discriminants()?,
450                cardano_transactions_prover_max_hashes_allowed_by_request: self
451                    .configuration
452                    .cardano_transactions_prover_max_hashes_allowed_by_request(),
453                cardano_db_artifacts_directory: self.get_cardano_db_artifacts_dir()?,
454                max_artifact_epoch_offset: MAX_ARTIFACT_EPOCH_OFFSET,
455                snapshot_directory: snapshot_dir.join(SNAPSHOT_ARTIFACTS_DIR),
456                cardano_node_version: self.configuration.cardano_node_version(),
457                allow_http_serve_directory: self.configuration.allow_http_serve_directory(),
458                origin_tag_white_list: self.configuration.compute_origin_tag_white_list(),
459                aggregate_signature_type: self.configuration.aggregate_signature_type(),
460            },
461        );
462
463        Ok(router::routes(Arc::new(router_state)))
464    }
465
466    /// Create dependencies for genesis commands
467    pub async fn create_genesis_container(
468        &mut self,
469    ) -> Result<GenesisCommandDependenciesContainer> {
470        let network = self.configuration.get_network().with_context(
471            || "Dependencies Builder can not get Cardano network while building genesis container",
472        )?;
473
474        let dependencies = GenesisCommandDependenciesContainer {
475            network,
476            chain_observer: self.get_chain_observer().await?,
477            certificate_repository: self.get_certificate_repository().await?,
478            certificate_verifier: self.get_certificate_verifier().await?,
479            protocol_parameters_retriever: self.get_protocol_parameters_retriever().await?,
480            verification_key_store: self.get_verification_key_store().await?,
481        };
482
483        Ok(dependencies)
484    }
485
486    /// Create dependencies for database command
487    pub async fn create_database_command_container(
488        &mut self,
489    ) -> Result<DatabaseCommandDependenciesContainer> {
490        let main_db_connection = self
491            .get_sqlite_connection()
492            .await
493            .with_context(|| "Dependencies Builder can not get sqlite connection")?;
494
495        self.get_event_store_sqlite_connection()
496            .await
497            .with_context(|| "Dependencies Builder can not get event store sqlite connection")?;
498
499        self.get_sqlite_connection_cardano_transaction_pool()
500            .await
501            .with_context(
502                || "Dependencies Builder can not get cardano transaction pool sqlite connection",
503            )?;
504
505        let dependencies = DatabaseCommandDependenciesContainer { main_db_connection };
506
507        Ok(dependencies)
508    }
509
510    /// Create dependencies for tools command
511    pub async fn create_tools_command_container(
512        &mut self,
513    ) -> Result<ToolsCommandDependenciesContainer> {
514        let db_connection = self
515            .get_sqlite_connection()
516            .await
517            .with_context(|| "Dependencies Builder can not get sqlite connection")?;
518
519        let dependencies = ToolsCommandDependenciesContainer { db_connection };
520
521        Ok(dependencies)
522    }
523
524    /// Remove the dependencies builder from memory to release Arc instances.
525    pub async fn vanish(self) {
526        self.drop_sqlite_connections().await;
527    }
528}
529
#[cfg(test)]
impl DependenciesBuilder {
    /// Test helper: create a clean dependency builder whose root logger is the test stdout
    /// logger.
    pub(crate) fn new_with_stdout_logger(configuration: Arc<dyn ConfigurationSource>) -> Self {
        let root_logger = crate::test::TestLogger::stdout();

        Self::new(root_logger, configuration)
    }
}
535}