mithril_aggregator/dependency_injection/builder/
mod.rs

1mod enablers;
2mod protocol;
3mod support;
4
5use anyhow::Context;
6use slog::Logger;
7use std::{path::PathBuf, sync::Arc};
8use tokio::{
9    sync::{
10        mpsc::{UnboundedReceiver, UnboundedSender},
11        Mutex,
12    },
13    time::Duration,
14};
15use warp::Filter;
16
17use mithril_common::{
18    api_version::APIVersionProvider,
19    cardano_block_scanner::BlockScanner,
20    certificate_chain::CertificateVerifier,
21    chain_observer::{CardanoCliRunner, ChainObserver},
22    chain_reader::ChainBlockReader,
23    crypto_helper::ProtocolGenesisVerifier,
24    digesters::{
25        cache::ImmutableFileDigestCacheProvider, ImmutableDigester, ImmutableFileObserver,
26    },
27    era::{EraChecker, EraReader, EraReaderAdapter},
28    signable_builder::{SignableBuilderService, SignableSeedBuilder, TransactionsImporter},
29    TickerService,
30};
31use mithril_persistence::{
32    database::repository::CardanoTransactionRepository,
33    sqlite::{SqliteConnection, SqliteConnectionPool},
34};
35use mithril_signed_entity_lock::SignedEntityTypeLock;
36
37use super::{
38    DatabaseCommandDependenciesContainer, DependenciesBuilderError, EpochServiceWrapper,
39    GenesisCommandDependenciesContainer, Result, ToolsCommandDependenciesContainer,
40};
41use crate::{
42    configuration::ConfigurationSource,
43    database::repository::{
44        CertificateRepository, EpochSettingsStore, OpenMessageRepository, SignedEntityStorer,
45        SignerStore, StakePoolStore,
46    },
47    event_store::{EventMessage, TransmitterService},
48    file_uploaders::FileUploader,
49    http_server::routes::router::{self, RouterConfig, RouterState},
50    services::{
51        AggregatorClient, CertifierService, MessageService, MithrilSignerRegistrationFollower,
52        ProverService, SignedEntityService, SignerSynchronizer, Snapshotter,
53        StakeDistributionService, UpkeepService,
54    },
55    tools::file_archiver::FileArchiver,
56    AggregatorConfig, AggregatorRunner, AggregatorRuntime, ImmutableFileDigestMapper,
57    MetricsService, MithrilSignerRegistrationLeader, MultiSigner, ProtocolParametersRetriever,
58    ServeCommandDependenciesContainer, SignerRegisterer, SignerRegistrationRoundOpener,
59    SignerRegistrationVerifier, SingleSignatureAuthenticator, VerificationKeyStorer,
60};
61
62/// Retrieve attribute stored in the builder.
63/// If not yet initialized, we instantiate it by calling the associated build function (build_<attribute_name>).
64/// If we don't want to to use the default build function, we can pass an expression that build the value.
65/// Usage examples:
66/// get_dependency!(self.signer_registerer)
67/// get_dependency!(self.signer_registerer = self.build_signer_registerer().await?)
68#[macro_export]
69macro_rules! get_dependency {
70    ( $self:ident.$attribute:ident ) => {{
71        paste::paste! {
72            get_dependency!($self.$attribute = $self.[<build_ $attribute>]().await?)
73        }
74    }};
75    ( $self:ident.$attribute:ident = $builder:expr ) => {{
76        paste::paste! {
77            if $self.$attribute.is_none() {
78                $self.$attribute = Some($builder);
79            }
80
81            let r:Result<_> = Ok($self.$attribute.as_ref().cloned().unwrap());
82            r
83        }
84    }};
85}
86
87const SQLITE_FILE: &str = "aggregator.sqlite3";
88const SQLITE_FILE_CARDANO_TRANSACTION: &str = "cardano-transaction.sqlite3";
89const SQLITE_MONITORING_FILE: &str = "monitoring.sqlite3";
90const CARDANO_DB_ARTIFACTS_DIR: &str = "cardano-database";
91const SNAPSHOT_ARTIFACTS_DIR: &str = "cardano-immutable-files-full";
92
93/// ## Dependencies container builder
94///
95/// This is meant to create SHARED DEPENDENCIES, ie: dependencies instances that
96/// must be shared amongst several Tokio tasks. For example, database
97/// repositories are NOT shared dependencies and therefor can be created ad hoc
98/// whereas the database connection is a shared dependency.
99///
100/// Each shared dependency must implement a `build` and a `get` function. The
101/// build function creates the dependency, the get function creates the
102/// dependency at first call then return a clone of the Arc containing the
103/// dependency for all further calls.
104pub struct DependenciesBuilder {
105    /// Configuration parameters
106    pub configuration: Arc<dyn ConfigurationSource>,
107
108    /// Application root logger
109    pub root_logger: Logger,
110
111    /// SQLite database connection
112    pub sqlite_connection: Option<Arc<SqliteConnection>>,
113
114    /// Event store SQLite database connection
115    pub sqlite_connection_event_store: Option<Arc<SqliteConnection>>,
116
117    /// Cardano transactions SQLite database connection pool
118    pub sqlite_connection_cardano_transaction_pool: Option<Arc<SqliteConnectionPool>>,
119
120    /// Stake Store used by the StakeDistributionService
121    /// It shall be a private dependency.
122    pub stake_store: Option<Arc<StakePoolStore>>,
123
124    /// Snapshot uploader service.
125    pub snapshot_uploader: Option<Arc<dyn FileUploader>>,
126
127    /// Multisigner service.
128    pub multi_signer: Option<Arc<dyn MultiSigner>>,
129
130    /// Certificate repository.
131    pub certificate_repository: Option<Arc<CertificateRepository>>,
132
133    /// Open message repository.
134    pub open_message_repository: Option<Arc<OpenMessageRepository>>,
135
136    /// Verification key store.
137    pub verification_key_store: Option<Arc<dyn VerificationKeyStorer>>,
138
139    /// Epoch settings store.
140    pub epoch_settings_store: Option<Arc<EpochSettingsStore>>,
141
142    /// Cardano CLI Runner for the [ChainObserver]
143    pub cardano_cli_runner: Option<Box<CardanoCliRunner>>,
144
145    /// Chain observer service.
146    pub chain_observer: Option<Arc<dyn ChainObserver>>,
147
148    /// Chain block reader
149    pub chain_block_reader: Option<Arc<Mutex<dyn ChainBlockReader>>>,
150
151    /// Cardano transactions repository.
152    pub transaction_repository: Option<Arc<CardanoTransactionRepository>>,
153
154    /// Cardano block scanner.
155    pub block_scanner: Option<Arc<dyn BlockScanner>>,
156
157    /// Immutable file digester service.
158    pub immutable_digester: Option<Arc<dyn ImmutableDigester>>,
159
160    /// Immutable file observer service.
161    pub immutable_file_observer: Option<Arc<dyn ImmutableFileObserver>>,
162
163    /// Immutable cache provider service.
164    pub immutable_cache_provider: Option<Arc<dyn ImmutableFileDigestCacheProvider>>,
165
166    /// Immutable file digest mapper service.
167    pub immutable_file_digest_mapper: Option<Arc<dyn ImmutableFileDigestMapper>>,
168
169    /// Digester service.
170    pub digester: Option<Arc<dyn ImmutableDigester>>,
171
172    /// File archiver service.
173    pub file_archiver: Option<Arc<FileArchiver>>,
174
175    /// Snapshotter service.
176    pub snapshotter: Option<Arc<dyn Snapshotter>>,
177
178    /// Certificate verifier service.
179    pub certificate_verifier: Option<Arc<dyn CertificateVerifier>>,
180
181    /// Genesis signature verifier service.
182    pub genesis_verifier: Option<Arc<ProtocolGenesisVerifier>>,
183
184    /// Mithril signer registration leader service
185    pub mithril_signer_registration_leader: Option<Arc<MithrilSignerRegistrationLeader>>,
186
187    /// Mithril signer registration follower service
188    pub mithril_signer_registration_follower: Option<Arc<MithrilSignerRegistrationFollower>>,
189
190    /// Signer registerer service
191    pub signer_registerer: Option<Arc<dyn SignerRegisterer>>,
192
193    /// Signer synchronizer service
194    pub signer_synchronizer: Option<Arc<dyn SignerSynchronizer>>,
195
196    /// Signer registration verifier
197    pub signer_registration_verifier: Option<Arc<dyn SignerRegistrationVerifier>>,
198
199    /// Signer registration round opener service
200    pub signer_registration_round_opener: Option<Arc<dyn SignerRegistrationRoundOpener>>,
201
202    /// Era checker service
203    pub era_checker: Option<Arc<EraChecker>>,
204
205    /// Adapter for [EraReader]
206    pub era_reader_adapter: Option<Arc<dyn EraReaderAdapter>>,
207
208    /// Era reader service
209    pub era_reader: Option<Arc<EraReader>>,
210
211    /// Event Transmitter Service
212    pub event_transmitter: Option<Arc<TransmitterService<EventMessage>>>,
213
214    /// Event transmitter Channel Sender endpoint
215    pub event_transmitter_channel: (
216        Option<UnboundedReceiver<EventMessage>>,
217        Option<UnboundedSender<EventMessage>>,
218    ),
219
220    /// API Version provider
221    pub api_version_provider: Option<Arc<APIVersionProvider>>,
222
223    /// Stake Distribution Service
224    pub stake_distribution_service: Option<Arc<dyn StakeDistributionService>>,
225
226    /// Ticker Service
227    pub ticker_service: Option<Arc<dyn TickerService>>,
228
229    /// Signer Store
230    pub signer_store: Option<Arc<SignerStore>>,
231
232    /// Signable Seed Builder
233    pub signable_seed_builder: Option<Arc<dyn SignableSeedBuilder>>,
234
235    /// Signable Builder Service
236    pub signable_builder_service: Option<Arc<dyn SignableBuilderService>>,
237
238    /// Signed Entity Service
239    pub signed_entity_service: Option<Arc<dyn SignedEntityService>>,
240
241    /// Certifier service
242    pub certifier_service: Option<Arc<dyn CertifierService>>,
243
244    /// Epoch service.
245    pub epoch_service: Option<EpochServiceWrapper>,
246
247    /// Signed Entity storer
248    pub signed_entity_storer: Option<Arc<dyn SignedEntityStorer>>,
249
250    /// HTTP Message service
251    pub message_service: Option<Arc<dyn MessageService>>,
252
253    /// Prover service
254    pub prover_service: Option<Arc<dyn ProverService>>,
255
256    /// Signed Entity Type Lock
257    pub signed_entity_type_lock: Option<Arc<SignedEntityTypeLock>>,
258
259    /// Transactions Importer
260    pub transactions_importer: Option<Arc<dyn TransactionsImporter>>,
261
262    /// Upkeep service
263    pub upkeep_service: Option<Arc<dyn UpkeepService>>,
264
265    /// Single signer authenticator
266    pub single_signature_authenticator: Option<Arc<SingleSignatureAuthenticator>>,
267
268    /// Metrics service
269    pub metrics_service: Option<Arc<MetricsService>>,
270
271    /// Leader aggregator client
272    pub leader_aggregator_client: Option<Arc<dyn AggregatorClient>>,
273
274    /// Protocol parameters retriever
275    pub protocol_parameters_retriever: Option<Arc<dyn ProtocolParametersRetriever>>,
276}
277
278impl DependenciesBuilder {
279    /// Create a new clean dependency builder
280    pub fn new(root_logger: Logger, configuration: Arc<dyn ConfigurationSource>) -> Self {
281        Self {
282            configuration,
283            root_logger,
284            sqlite_connection: None,
285            sqlite_connection_event_store: None,
286            sqlite_connection_cardano_transaction_pool: None,
287            stake_store: None,
288            snapshot_uploader: None,
289            multi_signer: None,
290            certificate_repository: None,
291            open_message_repository: None,
292            verification_key_store: None,
293            epoch_settings_store: None,
294            cardano_cli_runner: None,
295            chain_observer: None,
296            chain_block_reader: None,
297            block_scanner: None,
298            transaction_repository: None,
299            immutable_digester: None,
300            immutable_file_observer: None,
301            immutable_cache_provider: None,
302            immutable_file_digest_mapper: None,
303            digester: None,
304            file_archiver: None,
305            snapshotter: None,
306            certificate_verifier: None,
307            genesis_verifier: None,
308            mithril_signer_registration_leader: None,
309            mithril_signer_registration_follower: None,
310            signer_registerer: None,
311            signer_synchronizer: None,
312            signer_registration_verifier: None,
313            signer_registration_round_opener: None,
314            era_reader_adapter: None,
315            era_checker: None,
316            era_reader: None,
317            event_transmitter: None,
318            event_transmitter_channel: (None, None),
319            api_version_provider: None,
320            stake_distribution_service: None,
321            ticker_service: None,
322            signer_store: None,
323            signable_seed_builder: None,
324            signable_builder_service: None,
325            signed_entity_service: None,
326            certifier_service: None,
327            epoch_service: None,
328            signed_entity_storer: None,
329            message_service: None,
330            prover_service: None,
331            signed_entity_type_lock: None,
332            transactions_importer: None,
333            upkeep_service: None,
334            single_signature_authenticator: None,
335            metrics_service: None,
336            leader_aggregator_client: None,
337            protocol_parameters_retriever: None,
338        }
339    }
340
341    fn get_cardano_db_artifacts_dir(&self) -> Result<PathBuf> {
342        let cardano_db_artifacts_dir = self
343            .configuration
344            .get_snapshot_dir()?
345            .join(CARDANO_DB_ARTIFACTS_DIR);
346
347        if !cardano_db_artifacts_dir.exists() {
348            std::fs::create_dir(&cardano_db_artifacts_dir).map_err(|e| {
349                DependenciesBuilderError::Initialization {
350                    message: format!("Cannot create '{cardano_db_artifacts_dir:?}' directory."),
351                    error: Some(e.into()),
352                }
353            })?;
354        }
355
356        Ok(cardano_db_artifacts_dir)
357    }
358
359    /// Return an unconfigured [ServeCommandDependenciesContainer]
360    pub async fn build_serve_dependencies_container(
361        &mut self,
362    ) -> Result<ServeCommandDependenciesContainer> {
363        #[allow(deprecated)]
364        let dependencies_manager = ServeCommandDependenciesContainer {
365            root_logger: self.root_logger(),
366            stake_store: self.get_stake_store().await?,
367            certificate_repository: self.get_certificate_repository().await?,
368            verification_key_store: self.get_verification_key_store().await?,
369            epoch_settings_storer: self.get_epoch_settings_store().await?,
370            chain_observer: self.get_chain_observer().await?,
371            signer_registerer: self.get_signer_registerer().await?,
372            signer_synchronizer: self.get_signer_synchronizer().await?,
373            signer_registration_round_opener: self.get_signer_registration_round_opener().await?,
374            era_checker: self.get_era_checker().await?,
375            era_reader: self.get_era_reader().await?,
376            event_transmitter: self.get_event_transmitter().await?,
377            api_version_provider: self.get_api_version_provider().await?,
378            stake_distribution_service: self.get_stake_distribution_service().await?,
379            signer_recorder: self.get_signer_store().await?,
380            signable_builder_service: self.get_signable_builder_service().await?,
381            signed_entity_service: self.get_signed_entity_service().await?,
382            certifier_service: self.get_certifier_service().await?,
383            epoch_service: self.get_epoch_service().await?,
384            ticker_service: self.get_ticker_service().await?,
385            signed_entity_storer: self.get_signed_entity_storer().await?,
386            signer_getter: self.get_signer_store().await?,
387            message_service: self.get_message_service().await?,
388            prover_service: self.get_prover_service().await?,
389            signed_entity_type_lock: self.get_signed_entity_type_lock().await?,
390            upkeep_service: self.get_upkeep_service().await?,
391            single_signer_authenticator: self.get_single_signature_authenticator().await?,
392            metrics_service: self.get_metrics_service().await?,
393        };
394
395        Ok(dependencies_manager)
396    }
397
398    /// Create the AggregatorRunner
399    pub async fn create_aggregator_runner(&mut self) -> Result<AggregatorRuntime> {
400        let dependency_container = Arc::new(self.build_serve_dependencies_container().await?);
401
402        let config = AggregatorConfig::new(
403            Duration::from_millis(self.configuration.run_interval()),
404            self.configuration.is_follower_aggregator(),
405        );
406        let runtime = AggregatorRuntime::new(
407            config,
408            None,
409            Arc::new(AggregatorRunner::new(dependency_container)),
410            self.root_logger(),
411        )
412        .await
413        .map_err(|e| DependenciesBuilderError::Initialization {
414            message: "Cannot initialize Aggregator runtime.".to_string(),
415            error: Some(e.into()),
416        })?;
417
418        Ok(runtime)
419    }
420
421    /// Create the HTTP route instance
422    pub async fn create_http_routes(
423        &mut self,
424    ) -> Result<impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone> {
425        let dependency_container = Arc::new(self.build_serve_dependencies_container().await?);
426        let snapshot_dir = self.configuration.get_snapshot_dir()?;
427        let router_state = RouterState::new(
428            dependency_container.clone(),
429            RouterConfig {
430                network: self.configuration.get_network()?,
431                server_url: self.configuration.get_server_url()?,
432                allowed_discriminants: self
433                    .configuration
434                    .compute_allowed_signed_entity_types_discriminants()?,
435                cardano_transactions_prover_max_hashes_allowed_by_request: self
436                    .configuration
437                    .cardano_transactions_prover_max_hashes_allowed_by_request(),
438                cardano_db_artifacts_directory: self.get_cardano_db_artifacts_dir()?,
439                snapshot_directory: snapshot_dir.join(SNAPSHOT_ARTIFACTS_DIR),
440                cardano_node_version: self.configuration.cardano_node_version(),
441                allow_http_serve_directory: self.configuration.allow_http_serve_directory(),
442                origin_tag_white_list: self.configuration.compute_origin_tag_white_list(),
443            },
444        );
445
446        Ok(router::routes(Arc::new(router_state)))
447    }
448
449    /// Create dependencies for genesis commands
450    pub async fn create_genesis_container(
451        &mut self,
452    ) -> Result<GenesisCommandDependenciesContainer> {
453        let network = self.configuration.get_network().with_context(|| {
454            "Dependencies Builder can not get Cardano network while building genesis container"
455        })?;
456
457        let dependencies = GenesisCommandDependenciesContainer {
458            network,
459            chain_observer: self.get_chain_observer().await?,
460            certificate_repository: self.get_certificate_repository().await?,
461            certificate_verifier: self.get_certificate_verifier().await?,
462            protocol_parameters_retriever: self.get_protocol_parameters_retriever().await?,
463            verification_key_store: self.get_verification_key_store().await?,
464        };
465
466        Ok(dependencies)
467    }
468
469    /// Create dependencies for database command
470    pub async fn create_database_command_container(
471        &mut self,
472    ) -> Result<DatabaseCommandDependenciesContainer> {
473        let main_db_connection = self
474            .get_sqlite_connection()
475            .await
476            .with_context(|| "Dependencies Builder can not get sqlite connection")?;
477
478        self.get_event_store_sqlite_connection()
479            .await
480            .with_context(|| "Dependencies Builder can not get event store sqlite connection")?;
481
482        self.get_sqlite_connection_cardano_transaction_pool()
483            .await
484            .with_context(|| {
485                "Dependencies Builder can not get cardano transaction pool sqlite connection"
486            })?;
487
488        let dependencies = DatabaseCommandDependenciesContainer { main_db_connection };
489
490        Ok(dependencies)
491    }
492
493    /// Create dependencies for tools command
494    pub async fn create_tools_command_container(
495        &mut self,
496    ) -> Result<ToolsCommandDependenciesContainer> {
497        let db_connection = self
498            .get_sqlite_connection()
499            .await
500            .with_context(|| "Dependencies Builder can not get sqlite connection")?;
501
502        let dependencies = ToolsCommandDependenciesContainer { db_connection };
503
504        Ok(dependencies)
505    }
506
507    /// Remove the dependencies builder from memory to release Arc instances.
508    pub async fn vanish(self) {
509        self.drop_sqlite_connections().await;
510    }
511}
512
513#[cfg(test)]
514impl DependenciesBuilder {
515    pub(crate) fn new_with_stdout_logger(configuration: Arc<dyn ConfigurationSource>) -> Self {
516        Self::new(crate::test_tools::TestLogger::stdout(), configuration)
517    }
518}