mithril_aggregator/dependency_injection/builder/
mod.rs

1mod enablers;
2mod protocol;
3mod support;
4
5use anyhow::Context;
6use slog::Logger;
7use std::{path::PathBuf, sync::Arc};
8use tokio::{
9    sync::{
10        mpsc::{UnboundedReceiver, UnboundedSender},
11        Mutex,
12    },
13    time::Duration,
14};
15use warp::Filter;
16
17use mithril_common::{
18    api_version::APIVersionProvider,
19    cardano_block_scanner::BlockScanner,
20    certificate_chain::CertificateVerifier,
21    chain_observer::{CardanoCliRunner, ChainObserver},
22    chain_reader::ChainBlockReader,
23    crypto_helper::ProtocolGenesisVerifier,
24    digesters::{
25        cache::ImmutableFileDigestCacheProvider, ImmutableDigester, ImmutableFileObserver,
26    },
27    era::{EraChecker, EraReader, EraReaderAdapter},
28    signable_builder::{SignableBuilderService, SignableSeedBuilder, TransactionsImporter},
29    TickerService,
30};
31use mithril_persistence::{
32    database::repository::CardanoTransactionRepository,
33    sqlite::{SqliteConnection, SqliteConnectionPool},
34};
35use mithril_signed_entity_lock::SignedEntityTypeLock;
36
37use super::{DependenciesBuilderError, EpochServiceWrapper, Result};
38use crate::{
39    database::repository::{
40        CertificateRepository, EpochSettingsStore, OpenMessageRepository, SignedEntityStorer,
41        SignerStore, StakePoolStore,
42    },
43    event_store::{EventMessage, TransmitterService},
44    file_uploaders::FileUploader,
45    http_server::routes::router::{self, RouterConfig, RouterState},
46    services::{
47        AggregatorClient, CertifierService, MessageService, MithrilSignerRegistrationFollower,
48        ProverService, SignedEntityService, SignerSynchronizer, Snapshotter,
49        StakeDistributionService, UpkeepService,
50    },
51    tools::{file_archiver::FileArchiver, GenesisToolsDependency},
52    AggregatorConfig, AggregatorRunner, AggregatorRuntime, Configuration, DependencyContainer,
53    ImmutableFileDigestMapper, MetricsService, MithrilSignerRegistrationLeader, MultiSigner,
54    SignerRegisterer, SignerRegistrationRoundOpener, SignerRegistrationVerifier,
55    SingleSignatureAuthenticator, VerificationKeyStorer,
56};
57
58const SQLITE_FILE: &str = "aggregator.sqlite3";
59const SQLITE_FILE_CARDANO_TRANSACTION: &str = "cardano-transaction.sqlite3";
60const SQLITE_MONITORING_FILE: &str = "monitoring.sqlite3";
61const CARDANO_DB_ARTIFACTS_DIR: &str = "cardano-database";
62const SNAPSHOT_ARTIFACTS_DIR: &str = "cardano-immutable-files-full";
63
64/// ## Dependencies container builder
65///
66/// This is meant to create SHARED DEPENDENCIES, ie: dependencies instances that
67/// must be shared amongst several Tokio tasks. For example, database
68/// repositories are NOT shared dependencies and therefor can be created ad hoc
69/// whereas the database connection is a shared dependency.
70///
71/// Each shared dependency must implement a `build` and a `get` function. The
72/// build function creates the dependency, the get function creates the
73/// dependency at first call then return a clone of the Arc containing the
74/// dependency for all further calls.
75pub struct DependenciesBuilder {
76    /// Configuration parameters
77    pub configuration: Configuration,
78
79    /// Application root logger
80    pub root_logger: Logger,
81
82    /// SQLite database connection
83    pub sqlite_connection: Option<Arc<SqliteConnection>>,
84
85    /// Event store SQLite database connection
86    pub sqlite_connection_event_store: Option<Arc<SqliteConnection>>,
87
88    /// Cardano transactions SQLite database connection pool
89    pub sqlite_connection_cardano_transaction_pool: Option<Arc<SqliteConnectionPool>>,
90
91    /// Stake Store used by the StakeDistributionService
92    /// It shall be a private dependency.
93    pub stake_store: Option<Arc<StakePoolStore>>,
94
95    /// Snapshot uploader service.
96    pub snapshot_uploader: Option<Arc<dyn FileUploader>>,
97
98    /// Multisigner service.
99    pub multi_signer: Option<Arc<dyn MultiSigner>>,
100
101    /// Certificate repository.
102    pub certificate_repository: Option<Arc<CertificateRepository>>,
103
104    /// Open message repository.
105    pub open_message_repository: Option<Arc<OpenMessageRepository>>,
106
107    /// Verification key store.
108    pub verification_key_store: Option<Arc<dyn VerificationKeyStorer>>,
109
110    /// Epoch settings store.
111    pub epoch_settings_store: Option<Arc<EpochSettingsStore>>,
112
113    /// Cardano CLI Runner for the [ChainObserver]
114    pub cardano_cli_runner: Option<Box<CardanoCliRunner>>,
115
116    /// Chain observer service.
117    pub chain_observer: Option<Arc<dyn ChainObserver>>,
118
119    /// Chain block reader
120    pub chain_block_reader: Option<Arc<Mutex<dyn ChainBlockReader>>>,
121
122    /// Cardano transactions repository.
123    pub transaction_repository: Option<Arc<CardanoTransactionRepository>>,
124
125    /// Cardano block scanner.
126    pub block_scanner: Option<Arc<dyn BlockScanner>>,
127
128    /// Immutable file digester service.
129    pub immutable_digester: Option<Arc<dyn ImmutableDigester>>,
130
131    /// Immutable file observer service.
132    pub immutable_file_observer: Option<Arc<dyn ImmutableFileObserver>>,
133
134    /// Immutable cache provider service.
135    pub immutable_cache_provider: Option<Arc<dyn ImmutableFileDigestCacheProvider>>,
136
137    /// Immutable file digest mapper service.
138    pub immutable_file_digest_mapper: Option<Arc<dyn ImmutableFileDigestMapper>>,
139
140    /// Digester service.
141    pub digester: Option<Arc<dyn ImmutableDigester>>,
142
143    /// File archiver service.
144    pub file_archiver: Option<Arc<FileArchiver>>,
145
146    /// Snapshotter service.
147    pub snapshotter: Option<Arc<dyn Snapshotter>>,
148
149    /// Certificate verifier service.
150    pub certificate_verifier: Option<Arc<dyn CertificateVerifier>>,
151
152    /// Genesis signature verifier service.
153    pub genesis_verifier: Option<Arc<ProtocolGenesisVerifier>>,
154
155    /// Mithril signer registration leader service
156    pub mithril_signer_registration_leader: Option<Arc<MithrilSignerRegistrationLeader>>,
157
158    /// Mithril signer registration follower service
159    pub mithril_signer_registration_follower: Option<Arc<MithrilSignerRegistrationFollower>>,
160
161    /// Signer registerer service
162    pub signer_registerer: Option<Arc<dyn SignerRegisterer>>,
163
164    /// Signer synchronizer service
165    pub signer_synchronizer: Option<Arc<dyn SignerSynchronizer>>,
166
167    /// Signer registration verifier
168    pub signer_registration_verifier: Option<Arc<dyn SignerRegistrationVerifier>>,
169
170    /// Signer registration round opener service
171    pub signer_registration_round_opener: Option<Arc<dyn SignerRegistrationRoundOpener>>,
172
173    /// Era checker service
174    pub era_checker: Option<Arc<EraChecker>>,
175
176    /// Adapter for [EraReader]
177    pub era_reader_adapter: Option<Arc<dyn EraReaderAdapter>>,
178
179    /// Era reader service
180    pub era_reader: Option<Arc<EraReader>>,
181
182    /// Event Transmitter Service
183    pub event_transmitter: Option<Arc<TransmitterService<EventMessage>>>,
184
185    /// Event transmitter Channel Sender endpoint
186    pub event_transmitter_channel: (
187        Option<UnboundedReceiver<EventMessage>>,
188        Option<UnboundedSender<EventMessage>>,
189    ),
190
191    /// API Version provider
192    pub api_version_provider: Option<Arc<APIVersionProvider>>,
193
194    /// Stake Distribution Service
195    pub stake_distribution_service: Option<Arc<dyn StakeDistributionService>>,
196
197    /// Ticker Service
198    pub ticker_service: Option<Arc<dyn TickerService>>,
199
200    /// Signer Store
201    pub signer_store: Option<Arc<SignerStore>>,
202
203    /// Signable Seed Builder
204    pub signable_seed_builder: Option<Arc<dyn SignableSeedBuilder>>,
205
206    /// Signable Builder Service
207    pub signable_builder_service: Option<Arc<dyn SignableBuilderService>>,
208
209    /// Signed Entity Service
210    pub signed_entity_service: Option<Arc<dyn SignedEntityService>>,
211
212    /// Certifier service
213    pub certifier_service: Option<Arc<dyn CertifierService>>,
214
215    /// Epoch service.
216    pub epoch_service: Option<EpochServiceWrapper>,
217
218    /// Signed Entity storer
219    pub signed_entity_storer: Option<Arc<dyn SignedEntityStorer>>,
220
221    /// HTTP Message service
222    pub message_service: Option<Arc<dyn MessageService>>,
223
224    /// Prover service
225    pub prover_service: Option<Arc<dyn ProverService>>,
226
227    /// Signed Entity Type Lock
228    pub signed_entity_type_lock: Option<Arc<SignedEntityTypeLock>>,
229
230    /// Transactions Importer
231    pub transactions_importer: Option<Arc<dyn TransactionsImporter>>,
232
233    /// Upkeep service
234    pub upkeep_service: Option<Arc<dyn UpkeepService>>,
235
236    /// Single signer authenticator
237    pub single_signer_authenticator: Option<Arc<SingleSignatureAuthenticator>>,
238
239    /// Metrics service
240    pub metrics_service: Option<Arc<MetricsService>>,
241
242    /// Leader aggregator client
243    pub leader_aggregator_client: Option<Arc<dyn AggregatorClient>>,
244}
245
246impl DependenciesBuilder {
247    /// Create a new clean dependency builder
248    pub fn new(root_logger: Logger, configuration: Configuration) -> Self {
249        Self {
250            configuration,
251            root_logger,
252            sqlite_connection: None,
253            sqlite_connection_event_store: None,
254            sqlite_connection_cardano_transaction_pool: None,
255            stake_store: None,
256            snapshot_uploader: None,
257            multi_signer: None,
258            certificate_repository: None,
259            open_message_repository: None,
260            verification_key_store: None,
261            epoch_settings_store: None,
262            cardano_cli_runner: None,
263            chain_observer: None,
264            chain_block_reader: None,
265            block_scanner: None,
266            transaction_repository: None,
267            immutable_digester: None,
268            immutable_file_observer: None,
269            immutable_cache_provider: None,
270            immutable_file_digest_mapper: None,
271            digester: None,
272            file_archiver: None,
273            snapshotter: None,
274            certificate_verifier: None,
275            genesis_verifier: None,
276            mithril_signer_registration_leader: None,
277            mithril_signer_registration_follower: None,
278            signer_registerer: None,
279            signer_synchronizer: None,
280            signer_registration_verifier: None,
281            signer_registration_round_opener: None,
282            era_reader_adapter: None,
283            era_checker: None,
284            era_reader: None,
285            event_transmitter: None,
286            event_transmitter_channel: (None, None),
287            api_version_provider: None,
288            stake_distribution_service: None,
289            ticker_service: None,
290            signer_store: None,
291            signable_seed_builder: None,
292            signable_builder_service: None,
293            signed_entity_service: None,
294            certifier_service: None,
295            epoch_service: None,
296            signed_entity_storer: None,
297            message_service: None,
298            prover_service: None,
299            signed_entity_type_lock: None,
300            transactions_importer: None,
301            upkeep_service: None,
302            single_signer_authenticator: None,
303            metrics_service: None,
304            leader_aggregator_client: None,
305        }
306    }
307
308    fn get_cardano_db_artifacts_dir(&self) -> Result<PathBuf> {
309        let cardano_db_artifacts_dir = self
310            .configuration
311            .get_snapshot_dir()?
312            .join(CARDANO_DB_ARTIFACTS_DIR);
313
314        if !cardano_db_artifacts_dir.exists() {
315            std::fs::create_dir(&cardano_db_artifacts_dir).map_err(|e| {
316                DependenciesBuilderError::Initialization {
317                    message: format!("Cannot create '{cardano_db_artifacts_dir:?}' directory."),
318                    error: Some(e.into()),
319                }
320            })?;
321        }
322
323        Ok(cardano_db_artifacts_dir)
324    }
325
326    /// Return an unconfigured [DependencyContainer]
327    pub async fn build_dependency_container(&mut self) -> Result<DependencyContainer> {
328        #[allow(deprecated)]
329        let dependency_manager = DependencyContainer {
330            root_logger: self.root_logger(),
331            sqlite_connection: self.get_sqlite_connection().await?,
332            sqlite_connection_cardano_transaction_pool: self
333                .get_sqlite_connection_cardano_transaction_pool()
334                .await?,
335            stake_store: self.get_stake_store().await?,
336            snapshot_uploader: self.get_snapshot_uploader().await?,
337            multi_signer: self.get_multi_signer().await?,
338            certificate_repository: self.get_certificate_repository().await?,
339            open_message_repository: self.get_open_message_repository().await?,
340            verification_key_store: self.get_verification_key_store().await?,
341            epoch_settings_storer: self.get_epoch_settings_store().await?,
342            chain_observer: self.get_chain_observer().await?,
343            immutable_file_observer: self.get_immutable_file_observer().await?,
344            digester: self.get_immutable_digester().await?,
345            snapshotter: self.get_snapshotter().await?,
346            certificate_verifier: self.get_certificate_verifier().await?,
347            genesis_verifier: self.get_genesis_verifier().await?,
348            signer_registerer: self.get_signer_registerer().await?,
349            signer_synchronizer: self.get_signer_synchronizer().await?,
350            signer_registration_verifier: self.get_signer_registration_verifier().await?,
351            signer_registration_round_opener: self.get_signer_registration_round_opener().await?,
352            era_checker: self.get_era_checker().await?,
353            era_reader: self.get_era_reader().await?,
354            event_transmitter: self.get_event_transmitter().await?,
355            api_version_provider: self.get_api_version_provider().await?,
356            stake_distribution_service: self.get_stake_distribution_service().await?,
357            signer_recorder: self.get_signer_store().await?,
358            signable_builder_service: self.get_signable_builder_service().await?,
359            signed_entity_service: self.get_signed_entity_service().await?,
360            certifier_service: self.get_certifier_service().await?,
361            epoch_service: self.get_epoch_service().await?,
362            ticker_service: self.get_ticker_service().await?,
363            signed_entity_storer: self.get_signed_entity_storer().await?,
364            signer_getter: self.get_signer_store().await?,
365            message_service: self.get_message_service().await?,
366            block_scanner: self.get_block_scanner().await?,
367            transaction_store: self.get_transaction_repository().await?,
368            prover_service: self.get_prover_service().await?,
369            signed_entity_type_lock: self.get_signed_entity_lock().await?,
370            upkeep_service: self.get_upkeep_service().await?,
371            single_signer_authenticator: self.get_single_signature_authenticator().await?,
372            metrics_service: self.get_metrics_service().await?,
373            leader_aggregator_client: self.get_leader_aggregator_client().await?,
374        };
375
376        Ok(dependency_manager)
377    }
378
379    /// Create the AggregatorRunner
380    pub async fn create_aggregator_runner(&mut self) -> Result<AggregatorRuntime> {
381        let dependency_container = Arc::new(self.build_dependency_container().await?);
382
383        let config = AggregatorConfig::new(
384            Duration::from_millis(self.configuration.run_interval),
385            self.configuration.is_follower_aggregator(),
386        );
387        let runtime = AggregatorRuntime::new(
388            config,
389            None,
390            Arc::new(AggregatorRunner::new(dependency_container)),
391            self.root_logger(),
392        )
393        .await
394        .map_err(|e| DependenciesBuilderError::Initialization {
395            message: "Cannot initialize Aggregator runtime.".to_string(),
396            error: Some(e.into()),
397        })?;
398
399        Ok(runtime)
400    }
401
402    /// Create the HTTP route instance
403    pub async fn create_http_routes(
404        &mut self,
405    ) -> Result<impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone> {
406        let dependency_container = Arc::new(self.build_dependency_container().await?);
407        let snapshot_dir = self.configuration.get_snapshot_dir()?;
408        let router_state = RouterState::new(
409            dependency_container.clone(),
410            RouterConfig {
411                network: self.configuration.get_network()?,
412                server_url: self.configuration.get_server_url()?,
413                allowed_discriminants: self
414                    .configuration
415                    .compute_allowed_signed_entity_types_discriminants()?,
416                cardano_transactions_prover_max_hashes_allowed_by_request: self
417                    .configuration
418                    .cardano_transactions_prover_max_hashes_allowed_by_request,
419                cardano_db_artifacts_directory: self.get_cardano_db_artifacts_dir()?,
420                snapshot_directory: snapshot_dir.join(SNAPSHOT_ARTIFACTS_DIR),
421                cardano_node_version: self.configuration.cardano_node_version.clone(),
422                allow_http_serve_directory: self.configuration.allow_http_serve_directory(),
423            },
424        );
425
426        Ok(router::routes(Arc::new(router_state)))
427    }
428
429    /// Create dependencies for genesis commands
430    pub async fn create_genesis_container(&mut self) -> Result<GenesisToolsDependency> {
431        let network = self.configuration.get_network().with_context(|| {
432            "Dependencies Builder can not get Cardano network while building genesis container"
433        })?;
434
435        // Disable store pruning for genesis commands
436        self.configuration.store_retention_limit = None;
437
438        let dependencies = GenesisToolsDependency {
439            network,
440            ticker_service: self.get_ticker_service().await?,
441            certificate_repository: self.get_certificate_repository().await?,
442            certificate_verifier: self.get_certificate_verifier().await?,
443            genesis_verifier: self.get_genesis_verifier().await?,
444            epoch_settings_storer: self.get_epoch_settings_store().await?,
445            verification_key_store: self.get_verification_key_store().await?,
446        };
447
448        Ok(dependencies)
449    }
450
451    /// Remove the dependencies builder from memory to release Arc instances.
452    pub async fn vanish(self) {
453        self.drop_sqlite_connections().await;
454    }
455}
456
457#[cfg(test)]
458impl DependenciesBuilder {
459    pub(crate) fn new_with_stdout_logger(configuration: Configuration) -> Self {
460        Self::new(crate::test_tools::TestLogger::stdout(), configuration)
461    }
462}