mithril_aggregator/dependency_injection/builder/
mod.rs

1mod enablers;
2mod protocol;
3mod support;
4
5use anyhow::Context;
6use slog::Logger;
7use std::{path::PathBuf, sync::Arc};
8use tokio::{
9    sync::{
10        Mutex,
11        mpsc::{UnboundedReceiver, UnboundedSender},
12        watch,
13    },
14    time::Duration,
15};
16use warp::Filter;
17
18use mithril_cardano_node_chain::{
19    chain_observer::{CardanoCliRunner, ChainObserver},
20    chain_reader::ChainBlockReader,
21    chain_scanner::BlockScanner,
22};
23use mithril_cardano_node_internal_database::{
24    ImmutableFileObserver,
25    digesters::{ImmutableDigester, cache::ImmutableFileDigestCacheProvider},
26};
27use mithril_common::{
28    api_version::APIVersionProvider,
29    certificate_chain::CertificateVerifier,
30    crypto_helper::ProtocolGenesisVerifier,
31    signable_builder::{SignableBuilderService, SignableSeedBuilder, TransactionsImporter},
32};
33use mithril_era::{EraChecker, EraReader, EraReaderAdapter};
34use mithril_persistence::{
35    database::repository::CardanoTransactionRepository,
36    sqlite::{SqliteConnection, SqliteConnectionPool},
37};
38use mithril_signed_entity_lock::SignedEntityTypeLock;
39use mithril_ticker::TickerService;
40
41use super::{
42    DatabaseCommandDependenciesContainer, DependenciesBuilderError, EpochServiceWrapper,
43    GenesisCommandDependenciesContainer, Result, ToolsCommandDependenciesContainer,
44};
45use crate::{
46    AggregatorConfig, AggregatorRunner, AggregatorRuntime, ImmutableFileDigestMapper,
47    MetricsService, MithrilSignerRegistrationLeader, MultiSigner, ProtocolParametersRetriever,
48    ServeCommandDependenciesContainer, SignerRegisterer, SignerRegistrationRoundOpener,
49    SignerRegistrationVerifier, SingleSignatureAuthenticator, VerificationKeyStorer,
50    configuration::ConfigurationSource,
51    database::repository::{
52        CertificateRepository, EpochSettingsStore, OpenMessageRepository, SignedEntityStorer,
53        SignerStore, StakePoolStore,
54    },
55    event_store::{EventMessage, TransmitterService},
56    file_uploaders::FileUploader,
57    http_server::routes::router::{self, RouterConfig, RouterState},
58    services::{
59        AggregatorHTTPClient, CertificateChainSynchronizer, CertifierService, MessageService,
60        MithrilSignerRegistrationFollower, ProverService, SignedEntityService, SignerSynchronizer,
61        Snapshotter, StakeDistributionService, UpkeepService,
62    },
63    tools::file_archiver::FileArchiver,
64};
65
/// Retrieve an attribute stored in the builder, building it on first access.
///
/// If the attribute is not yet initialized, it is instantiated by calling the associated build
/// function (`build_<attribute_name>`) and stored in the builder. Every call evaluates to a
/// `Result` wrapping a clone of the stored value.
/// If we don't want to use the default build function, we can pass an expression that builds
/// the value; that expression is only evaluated when the attribute is still unset.
///
/// The expansion names `Result` unqualified, so a one-parameter `Result<T>` alias must be in
/// scope at the call site. The default form also awaits the build function and propagates its
/// error with `?`, so it must be used in an `async` context returning a compatible `Result`.
///
/// Usage examples:
/// get_dependency!(self.signer_registerer)
/// get_dependency!(self.signer_registerer = self.build_signer_registerer().await?)
#[macro_export]
macro_rules! get_dependency {
    ( $self:ident.$attribute:ident ) => {{
        // `paste` is only required here, to forge the `build_<attribute>` method identifier.
        paste::paste! {
            get_dependency!($self.$attribute = $self.[<build_ $attribute>]().await?)
        }
    }};
    ( $self:ident.$attribute:ident = $builder:expr ) => {{
        if $self.$attribute.is_none() {
            $self.$attribute = Some($builder);
        }

        // The annotated binding pins the error type to the `Result` alias of the call site.
        let dependency: Result<_> = Ok($self
            .$attribute
            .clone()
            .expect("attribute is always initialized by the branch above"));
        dependency
    }};
}
90
// File names of the SQLite databases. They are not referenced in this part of the file;
// presumably they are consumed by the sibling builder modules — TODO confirm.
const SQLITE_FILE: &str = "aggregator.sqlite3";
const SQLITE_FILE_CARDANO_TRANSACTION: &str = "cardano-transaction.sqlite3";
const SQLITE_MONITORING_FILE: &str = "monitoring.sqlite3";
// Directory names joined onto the configured snapshot directory: the first one by
// `get_cardano_db_artifacts_dir`, the second one by `create_http_routes`.
const CARDANO_DB_ARTIFACTS_DIR: &str = "cardano-database";
const SNAPSHOT_ARTIFACTS_DIR: &str = "cardano-immutable-files-full";
96
/// ## Dependencies container builder
///
/// This is meant to create SHARED DEPENDENCIES, i.e. dependency instances that
/// must be shared amongst several Tokio tasks. For example, database
/// repositories are NOT shared dependencies and therefore can be created ad hoc,
/// whereas the database connection is a shared dependency.
///
/// Each shared dependency must implement a `build` and a `get` function. The
/// build function creates the dependency; the get function creates the
/// dependency at first call, then returns a clone of the Arc containing the
/// dependency for all further calls.
///
/// Every dependency field starts unset (see [DependenciesBuilder::new]) and is
/// filled lazily.
pub struct DependenciesBuilder {
    /// Configuration parameters
    pub configuration: Arc<dyn ConfigurationSource>,

    /// Application root logger
    pub root_logger: Logger,

    /// SQLite database connection
    pub sqlite_connection: Option<Arc<SqliteConnection>>,

    /// Event store SQLite database connection
    pub sqlite_connection_event_store: Option<Arc<SqliteConnection>>,

    /// Cardano transactions SQLite database connection pool
    pub sqlite_connection_cardano_transaction_pool: Option<Arc<SqliteConnectionPool>>,

    /// Stake Store used by the StakeDistributionService
    /// It shall be a private dependency.
    pub stake_store: Option<Arc<StakePoolStore>>,

    /// Snapshot uploader service.
    pub snapshot_uploader: Option<Arc<dyn FileUploader>>,

    /// Multisigner service.
    pub multi_signer: Option<Arc<dyn MultiSigner>>,

    /// Certificate repository.
    pub certificate_repository: Option<Arc<CertificateRepository>>,

    /// Open message repository.
    pub open_message_repository: Option<Arc<OpenMessageRepository>>,

    /// Verification key store.
    pub verification_key_store: Option<Arc<dyn VerificationKeyStorer>>,

    /// Epoch settings store.
    pub epoch_settings_store: Option<Arc<EpochSettingsStore>>,

    /// Cardano CLI Runner for the [ChainObserver]
    pub cardano_cli_runner: Option<Box<CardanoCliRunner>>,

    /// Chain observer service.
    pub chain_observer: Option<Arc<dyn ChainObserver>>,

    /// Chain block reader
    pub chain_block_reader: Option<Arc<Mutex<dyn ChainBlockReader>>>,

    /// Cardano transactions repository.
    pub transaction_repository: Option<Arc<CardanoTransactionRepository>>,

    /// Cardano block scanner.
    pub block_scanner: Option<Arc<dyn BlockScanner>>,

    /// Immutable file digester service.
    pub immutable_digester: Option<Arc<dyn ImmutableDigester>>,

    /// Immutable file observer service.
    pub immutable_file_observer: Option<Arc<dyn ImmutableFileObserver>>,

    /// Immutable cache provider service.
    pub immutable_cache_provider: Option<Arc<dyn ImmutableFileDigestCacheProvider>>,

    /// Immutable file digest mapper service.
    pub immutable_file_digest_mapper: Option<Arc<dyn ImmutableFileDigestMapper>>,

    /// Digester service.
    pub digester: Option<Arc<dyn ImmutableDigester>>,

    /// File archiver service.
    pub file_archiver: Option<Arc<FileArchiver>>,

    /// Snapshotter service.
    pub snapshotter: Option<Arc<dyn Snapshotter>>,

    /// Certificate verifier service.
    pub certificate_verifier: Option<Arc<dyn CertificateVerifier>>,

    /// Genesis signature verifier service.
    pub genesis_verifier: Option<Arc<ProtocolGenesisVerifier>>,

    /// Certificate chain synchronizer service
    pub certificate_chain_synchronizer: Option<Arc<dyn CertificateChainSynchronizer>>,

    /// Mithril signer registration leader service
    pub mithril_signer_registration_leader: Option<Arc<MithrilSignerRegistrationLeader>>,

    /// Mithril signer registration follower service
    pub mithril_signer_registration_follower: Option<Arc<MithrilSignerRegistrationFollower>>,

    /// Signer registerer service
    pub signer_registerer: Option<Arc<dyn SignerRegisterer>>,

    /// Signer synchronizer service
    pub signer_synchronizer: Option<Arc<dyn SignerSynchronizer>>,

    /// Signer registration verifier
    pub signer_registration_verifier: Option<Arc<dyn SignerRegistrationVerifier>>,

    /// Signer registration round opener service
    pub signer_registration_round_opener: Option<Arc<dyn SignerRegistrationRoundOpener>>,

    /// Era checker service
    pub era_checker: Option<Arc<EraChecker>>,

    /// Adapter for [EraReader]
    pub era_reader_adapter: Option<Arc<dyn EraReaderAdapter>>,

    /// Era reader service
    pub era_reader: Option<Arc<EraReader>>,

    /// Event Transmitter Service
    pub event_transmitter: Option<Arc<TransmitterService<EventMessage>>>,

    /// Event transmitter channel endpoints, stored as `(receiver, sender)`.
    /// Each endpoint is optional and starts unset.
    pub event_transmitter_channel: (
        Option<UnboundedReceiver<EventMessage>>,
        Option<UnboundedSender<EventMessage>>,
    ),

    /// API Version provider
    pub api_version_provider: Option<Arc<APIVersionProvider>>,

    /// Stake Distribution Service
    pub stake_distribution_service: Option<Arc<dyn StakeDistributionService>>,

    /// Ticker Service
    pub ticker_service: Option<Arc<dyn TickerService>>,

    /// Signer Store
    pub signer_store: Option<Arc<SignerStore>>,

    /// Signable Seed Builder
    pub signable_seed_builder: Option<Arc<dyn SignableSeedBuilder>>,

    /// Signable Builder Service
    pub signable_builder_service: Option<Arc<dyn SignableBuilderService>>,

    /// Signed Entity Service
    pub signed_entity_service: Option<Arc<dyn SignedEntityService>>,

    /// Certifier service
    pub certifier_service: Option<Arc<dyn CertifierService>>,

    /// Epoch service.
    pub epoch_service: Option<EpochServiceWrapper>,

    /// Signed Entity storer
    pub signed_entity_storer: Option<Arc<dyn SignedEntityStorer>>,

    /// HTTP Message service
    pub message_service: Option<Arc<dyn MessageService>>,

    /// Prover service
    pub prover_service: Option<Arc<dyn ProverService>>,

    /// Signed Entity Type Lock
    pub signed_entity_type_lock: Option<Arc<SignedEntityTypeLock>>,

    /// Transactions Importer
    pub transactions_importer: Option<Arc<dyn TransactionsImporter>>,

    /// Upkeep service
    pub upkeep_service: Option<Arc<dyn UpkeepService>>,

    /// Single signer authenticator
    pub single_signature_authenticator: Option<Arc<SingleSignatureAuthenticator>>,

    /// Metrics service
    pub metrics_service: Option<Arc<MetricsService>>,

    /// Leader aggregator client
    pub leader_aggregator_client: Option<Arc<AggregatorHTTPClient>>,

    /// Protocol parameters retriever
    pub protocol_parameters_retriever: Option<Arc<dyn ProtocolParametersRetriever>>,

    /// Stop signal channel, stored as its `(sender, receiver)` pair.
    pub stop_signal_channel: Option<(watch::Sender<()>, watch::Receiver<()>)>,
}
287
288impl DependenciesBuilder {
289    /// Create a new clean dependency builder
290    pub fn new(root_logger: Logger, configuration: Arc<dyn ConfigurationSource>) -> Self {
291        Self {
292            configuration,
293            root_logger,
294            sqlite_connection: None,
295            sqlite_connection_event_store: None,
296            sqlite_connection_cardano_transaction_pool: None,
297            stake_store: None,
298            snapshot_uploader: None,
299            multi_signer: None,
300            certificate_repository: None,
301            open_message_repository: None,
302            verification_key_store: None,
303            epoch_settings_store: None,
304            cardano_cli_runner: None,
305            chain_observer: None,
306            chain_block_reader: None,
307            block_scanner: None,
308            transaction_repository: None,
309            immutable_digester: None,
310            immutable_file_observer: None,
311            immutable_cache_provider: None,
312            immutable_file_digest_mapper: None,
313            digester: None,
314            file_archiver: None,
315            snapshotter: None,
316            certificate_verifier: None,
317            genesis_verifier: None,
318            certificate_chain_synchronizer: None,
319            mithril_signer_registration_leader: None,
320            mithril_signer_registration_follower: None,
321            signer_registerer: None,
322            signer_synchronizer: None,
323            signer_registration_verifier: None,
324            signer_registration_round_opener: None,
325            era_reader_adapter: None,
326            era_checker: None,
327            era_reader: None,
328            event_transmitter: None,
329            event_transmitter_channel: (None, None),
330            api_version_provider: None,
331            stake_distribution_service: None,
332            ticker_service: None,
333            signer_store: None,
334            signable_seed_builder: None,
335            signable_builder_service: None,
336            signed_entity_service: None,
337            certifier_service: None,
338            epoch_service: None,
339            signed_entity_storer: None,
340            message_service: None,
341            prover_service: None,
342            signed_entity_type_lock: None,
343            transactions_importer: None,
344            upkeep_service: None,
345            single_signature_authenticator: None,
346            metrics_service: None,
347            leader_aggregator_client: None,
348            protocol_parameters_retriever: None,
349            stop_signal_channel: None,
350        }
351    }
352
353    fn get_cardano_db_artifacts_dir(&self) -> Result<PathBuf> {
354        let cardano_db_artifacts_dir =
355            self.configuration.get_snapshot_dir()?.join(CARDANO_DB_ARTIFACTS_DIR);
356
357        if !cardano_db_artifacts_dir.exists() {
358            std::fs::create_dir(&cardano_db_artifacts_dir).map_err(|e| {
359                DependenciesBuilderError::Initialization {
360                    message: format!("Cannot create '{cardano_db_artifacts_dir:?}' directory."),
361                    error: Some(e.into()),
362                }
363            })?;
364        }
365
366        Ok(cardano_db_artifacts_dir)
367    }
368
369    /// Return an unconfigured [ServeCommandDependenciesContainer]
370    pub async fn build_serve_dependencies_container(
371        &mut self,
372    ) -> Result<ServeCommandDependenciesContainer> {
373        #[allow(deprecated)]
374        let dependencies_manager = ServeCommandDependenciesContainer {
375            root_logger: self.root_logger(),
376            stake_store: self.get_stake_store().await?,
377            certificate_repository: self.get_certificate_repository().await?,
378            verification_key_store: self.get_verification_key_store().await?,
379            epoch_settings_storer: self.get_epoch_settings_store().await?,
380            chain_observer: self.get_chain_observer().await?,
381            certificate_chain_synchronizer: self.get_certificate_chain_synchronizer().await?,
382            signer_registerer: self.get_signer_registerer().await?,
383            signer_synchronizer: self.get_signer_synchronizer().await?,
384            signer_registration_round_opener: self.get_signer_registration_round_opener().await?,
385            era_checker: self.get_era_checker().await?,
386            era_reader: self.get_era_reader().await?,
387            event_transmitter: self.get_event_transmitter().await?,
388            api_version_provider: self.get_api_version_provider().await?,
389            stake_distribution_service: self.get_stake_distribution_service().await?,
390            signer_recorder: self.get_signer_store().await?,
391            signable_builder_service: self.get_signable_builder_service().await?,
392            signed_entity_service: self.get_signed_entity_service().await?,
393            certifier_service: self.get_certifier_service().await?,
394            epoch_service: self.get_epoch_service().await?,
395            ticker_service: self.get_ticker_service().await?,
396            signed_entity_storer: self.get_signed_entity_storer().await?,
397            signer_getter: self.get_signer_store().await?,
398            message_service: self.get_message_service().await?,
399            prover_service: self.get_prover_service().await?,
400            signed_entity_type_lock: self.get_signed_entity_type_lock().await?,
401            upkeep_service: self.get_upkeep_service().await?,
402            single_signer_authenticator: self.get_single_signature_authenticator().await?,
403            metrics_service: self.get_metrics_service().await?,
404        };
405
406        Ok(dependencies_manager)
407    }
408
409    /// Create the AggregatorRunner
410    pub async fn create_aggregator_runner(&mut self) -> Result<AggregatorRuntime> {
411        let dependency_container = Arc::new(self.build_serve_dependencies_container().await?);
412
413        let config = AggregatorConfig::new(
414            Duration::from_millis(self.configuration.run_interval()),
415            self.configuration.is_follower_aggregator(),
416        );
417        let runtime = AggregatorRuntime::new(
418            config,
419            None,
420            Arc::new(AggregatorRunner::new(dependency_container)),
421            self.root_logger(),
422        )
423        .await
424        .map_err(|e| DependenciesBuilderError::Initialization {
425            message: "Cannot initialize Aggregator runtime.".to_string(),
426            error: Some(e.into()),
427        })?;
428
429        Ok(runtime)
430    }
431
432    /// Create the HTTP route instance
433    pub async fn create_http_routes(
434        &mut self,
435    ) -> Result<
436        impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<>,
437    > {
438        let dependency_container = Arc::new(self.build_serve_dependencies_container().await?);
439        let snapshot_dir = self.configuration.get_snapshot_dir()?;
440        let router_state = RouterState::new(
441            dependency_container.clone(),
442            RouterConfig {
443                network: self.configuration.get_network()?,
444                server_url: self.configuration.get_server_url()?,
445                allowed_discriminants: self
446                    .configuration
447                    .compute_allowed_signed_entity_types_discriminants()?,
448                cardano_transactions_prover_max_hashes_allowed_by_request: self
449                    .configuration
450                    .cardano_transactions_prover_max_hashes_allowed_by_request(),
451                cardano_db_artifacts_directory: self.get_cardano_db_artifacts_dir()?,
452                snapshot_directory: snapshot_dir.join(SNAPSHOT_ARTIFACTS_DIR),
453                cardano_node_version: self.configuration.cardano_node_version(),
454                allow_http_serve_directory: self.configuration.allow_http_serve_directory(),
455                origin_tag_white_list: self.configuration.compute_origin_tag_white_list(),
456            },
457        );
458
459        Ok(router::routes(Arc::new(router_state)))
460    }
461
462    /// Create dependencies for genesis commands
463    pub async fn create_genesis_container(
464        &mut self,
465    ) -> Result<GenesisCommandDependenciesContainer> {
466        let network = self.configuration.get_network().with_context(
467            || "Dependencies Builder can not get Cardano network while building genesis container",
468        )?;
469
470        let dependencies = GenesisCommandDependenciesContainer {
471            network,
472            chain_observer: self.get_chain_observer().await?,
473            certificate_repository: self.get_certificate_repository().await?,
474            certificate_verifier: self.get_certificate_verifier().await?,
475            protocol_parameters_retriever: self.get_protocol_parameters_retriever().await?,
476            verification_key_store: self.get_verification_key_store().await?,
477        };
478
479        Ok(dependencies)
480    }
481
482    /// Create dependencies for database command
483    pub async fn create_database_command_container(
484        &mut self,
485    ) -> Result<DatabaseCommandDependenciesContainer> {
486        let main_db_connection = self
487            .get_sqlite_connection()
488            .await
489            .with_context(|| "Dependencies Builder can not get sqlite connection")?;
490
491        self.get_event_store_sqlite_connection()
492            .await
493            .with_context(|| "Dependencies Builder can not get event store sqlite connection")?;
494
495        self.get_sqlite_connection_cardano_transaction_pool()
496            .await
497            .with_context(
498                || "Dependencies Builder can not get cardano transaction pool sqlite connection",
499            )?;
500
501        let dependencies = DatabaseCommandDependenciesContainer { main_db_connection };
502
503        Ok(dependencies)
504    }
505
506    /// Create dependencies for tools command
507    pub async fn create_tools_command_container(
508        &mut self,
509    ) -> Result<ToolsCommandDependenciesContainer> {
510        let db_connection = self
511            .get_sqlite_connection()
512            .await
513            .with_context(|| "Dependencies Builder can not get sqlite connection")?;
514
515        let dependencies = ToolsCommandDependenciesContainer { db_connection };
516
517        Ok(dependencies)
518    }
519
    /// Remove the dependencies builder from memory to release Arc instances.
    ///
    /// Takes the builder by value, so it cannot be used after this call. The only action
    /// visible here is awaiting `drop_sqlite_connections`, which is defined outside this
    /// part of the file.
    pub async fn vanish(self) {
        self.drop_sqlite_connections().await;
    }
524}
525
#[cfg(test)]
impl DependenciesBuilder {
    /// Test helper: create a builder whose root logger is `crate::test::TestLogger::stdout()`.
    pub(crate) fn new_with_stdout_logger(configuration: Arc<dyn ConfigurationSource>) -> Self {
        let stdout_logger = crate::test::TestLogger::stdout();

        Self::new(stdout_logger, configuration)
    }
}