mithril_aggregator/dependency_injection/builder/
mod.rs

1mod enablers;
2mod protocol;
3mod support;
4
5use anyhow::Context;
6use slog::{Logger, debug};
7use std::{path::PathBuf, sync::Arc};
8use tokio::{
9    sync::{
10        Mutex,
11        mpsc::{UnboundedReceiver, UnboundedSender},
12        watch,
13    },
14    time::Duration,
15};
16use warp::Filter;
17
18use mithril_aggregator_client::AggregatorHttpClient;
19use mithril_cardano_node_chain::{
20    chain_observer::{CardanoCliRunner, ChainObserver},
21    chain_reader::ChainBlockReader,
22    chain_scanner::BlockScanner,
23};
24use mithril_cardano_node_internal_database::{
25    ImmutableFileObserver,
26    digesters::{ImmutableDigester, cache::ImmutableFileDigestCacheProvider},
27};
28use mithril_common::{
29    api_version::APIVersionProvider,
30    certificate_chain::CertificateVerifier,
31    crypto_helper::ProtocolGenesisVerifier,
32    signable_builder::{SignableBuilderService, SignableSeedBuilder},
33};
34use mithril_era::{EraChecker, EraReader, EraReaderAdapter};
35use mithril_persistence::sqlite::{SqliteConnection, SqliteConnectionPool};
36use mithril_protocol_config::interface::MithrilNetworkConfigurationProvider;
37use mithril_signed_entity_lock::SignedEntityTypeLock;
38use mithril_ticker::TickerService;
39
40use crate::{
41    AggregatorConfig, AggregatorRunner, AggregatorRuntime, EpochSettingsStorer,
42    ImmutableFileDigestMapper, MetricsService, MithrilSignerRegistrationLeader, MultiSigner,
43    ProtocolParametersRetriever, ServeCommandDependenciesContainer, SignerRegisterer,
44    SignerRegistrationRoundOpener, SignerRegistrationVerifier, SingleSignatureAuthenticator,
45    VerificationKeyStorer,
46    configuration::ConfigurationSource,
47    database::repository::{
48        AggregatorCardanoChainDataRepository, CertificateRepository, EpochSettingsStore,
49        OpenMessageRepository, SignedEntityStorer, SignerStore, StakePoolStore,
50    },
51    event_store::{EventMessage, TransmitterService},
52    file_uploaders::FileUploader,
53    http_server::routes::router::{self, RouterConfig, RouterState},
54    services::{
55        AggregatorChainDataImporter, CertificateChainSynchronizer, CertifierService,
56        LegacyProverService, MessageService, MithrilSignerRegistrationFollower, ProverService,
57        SignedEntityService, SignerSynchronizer, Snapshotter, StakeDistributionService,
58        UpkeepService,
59    },
60    tools::file_archiver::FileArchiver,
61};
62
63use super::{
64    DatabaseCommandDependenciesContainer, DependenciesBuilderError, EpochServiceWrapper,
65    GenesisCommandDependenciesContainer, Result, ToolsCommandDependenciesContainer,
66};
67
68/// Retrieve attribute stored in the builder.
69/// If not yet initialized, we instantiate it by calling the associated build function (build_<attribute_name>).
70/// If we don't want to to use the default build function, we can pass an expression that build the value.
71/// Usage examples:
72/// get_dependency!(self.signer_registerer)
73/// get_dependency!(self.signer_registerer = self.build_signer_registerer().await?)
74#[macro_export]
75macro_rules! get_dependency {
76    ( $self:ident.$attribute:ident ) => {{
77        paste::paste! {
78            get_dependency!($self.$attribute = $self.[<build_ $attribute>]().await?)
79        }
80    }};
81    ( $self:ident.$attribute:ident = $builder:expr ) => {{
82        paste::paste! {
83            if $self.$attribute.is_none() {
84                slog::debug!($self.root_logger(), "Building dependency {}", stringify!($attribute));
85                $self.$attribute = Some($builder);
86            }
87
88            let r:Result<_> = Ok($self.$attribute.as_ref().cloned().unwrap());
89            r
90        }
91    }};
92}
93
94const SQLITE_FILE: &str = "aggregator.sqlite3";
95const SQLITE_FILE_CARDANO_TRANSACTION: &str = "cardano-transaction.sqlite3";
96const SQLITE_MONITORING_FILE: &str = "monitoring.sqlite3";
97const CARDANO_DB_ARTIFACTS_DIR: &str = "cardano-database";
98const SNAPSHOT_ARTIFACTS_DIR: &str = "cardano-immutable-files-full";
99/// Maximum offset that can be applied to the latest epoch when fetching artifacts.
100const MAX_ARTIFACT_EPOCH_OFFSET: u64 = 5;
101
102/// ## Dependencies container builder
103///
104/// This is meant to create SHARED DEPENDENCIES, ie: dependencies instances that
105/// must be shared amongst several Tokio tasks. For example, database
106/// repositories are NOT shared dependencies and therefor can be created ad hoc
107/// whereas the database connection is a shared dependency.
108///
109/// Each shared dependency must implement a `build` and a `get` function. The
110/// build function creates the dependency, the get function creates the
111/// dependency at first call then return a clone of the Arc containing the
112/// dependency for all further calls.
113pub struct DependenciesBuilder {
114    /// Configuration parameters
115    pub configuration: Arc<dyn ConfigurationSource>,
116
117    /// Application root logger
118    pub root_logger: Logger,
119
120    /// SQLite database connection
121    pub sqlite_connection: Option<Arc<SqliteConnection>>,
122
123    /// Event store SQLite database connection
124    pub sqlite_connection_event_store: Option<Arc<SqliteConnection>>,
125
126    /// Cardano transactions SQLite database connection pool
127    pub sqlite_connection_cardano_transaction_pool: Option<Arc<SqliteConnectionPool>>,
128
129    /// Stake Store used by the StakeDistributionService
130    /// It shall be a private dependency.
131    pub stake_store: Option<Arc<StakePoolStore>>,
132
133    /// Snapshot uploader service.
134    pub snapshot_uploader: Option<Arc<dyn FileUploader>>,
135
136    /// Multisigner service.
137    pub multi_signer: Option<Arc<dyn MultiSigner>>,
138
139    /// Certificate repository.
140    pub certificate_repository: Option<Arc<CertificateRepository>>,
141
142    /// Open message repository.
143    pub open_message_repository: Option<Arc<OpenMessageRepository>>,
144
145    /// Verification key store.
146    pub verification_key_store: Option<Arc<dyn VerificationKeyStorer>>,
147
148    /// Epoch settings store.
149    pub epoch_settings_store: Option<Arc<EpochSettingsStore>>,
150
151    /// Cardano CLI Runner for the [ChainObserver]
152    pub cardano_cli_runner: Option<Box<CardanoCliRunner>>,
153
154    /// Chain observer service.
155    pub chain_observer: Option<Arc<dyn ChainObserver>>,
156
157    /// Chain block reader
158    pub chain_block_reader: Option<Arc<Mutex<dyn ChainBlockReader>>>,
159
160    /// Cardano chain data repository.
161    pub chain_data_repository: Option<Arc<AggregatorCardanoChainDataRepository>>,
162
163    /// Cardano block scanner.
164    pub block_scanner: Option<Arc<dyn BlockScanner>>,
165
166    /// Immutable file digester service.
167    pub immutable_digester: Option<Arc<dyn ImmutableDigester>>,
168
169    /// Immutable file observer service.
170    pub immutable_file_observer: Option<Arc<dyn ImmutableFileObserver>>,
171
172    /// Immutable cache provider service.
173    pub immutable_cache_provider: Option<Arc<dyn ImmutableFileDigestCacheProvider>>,
174
175    /// Immutable file digest mapper service.
176    pub immutable_file_digest_mapper: Option<Arc<dyn ImmutableFileDigestMapper>>,
177
178    /// Digester service.
179    pub digester: Option<Arc<dyn ImmutableDigester>>,
180
181    /// File archiver service.
182    pub file_archiver: Option<Arc<FileArchiver>>,
183
184    /// Snapshotter service.
185    pub snapshotter: Option<Arc<dyn Snapshotter>>,
186
187    /// Certificate verifier service.
188    pub certificate_verifier: Option<Arc<dyn CertificateVerifier>>,
189
190    /// Genesis signature verifier service.
191    pub genesis_verifier: Option<Arc<ProtocolGenesisVerifier>>,
192
193    /// Certificate chain synchronizer service
194    pub certificate_chain_synchronizer: Option<Arc<dyn CertificateChainSynchronizer>>,
195
196    /// Mithril signer registration leader service
197    pub mithril_signer_registration_leader: Option<Arc<MithrilSignerRegistrationLeader>>,
198
199    /// Mithril signer registration follower service
200    pub mithril_signer_registration_follower: Option<Arc<MithrilSignerRegistrationFollower>>,
201
202    /// Signer registerer service
203    pub signer_registerer: Option<Arc<dyn SignerRegisterer>>,
204
205    /// Signer synchronizer service
206    pub signer_synchronizer: Option<Arc<dyn SignerSynchronizer>>,
207
208    /// Signer registration verifier
209    pub signer_registration_verifier: Option<Arc<dyn SignerRegistrationVerifier>>,
210
211    /// Signer registration round opener service
212    pub signer_registration_round_opener: Option<Arc<dyn SignerRegistrationRoundOpener>>,
213
214    /// Era checker service
215    pub era_checker: Option<Arc<EraChecker>>,
216
217    /// Adapter for [EraReader]
218    pub era_reader_adapter: Option<Arc<dyn EraReaderAdapter>>,
219
220    /// Era reader service
221    pub era_reader: Option<Arc<EraReader>>,
222
223    /// Event Transmitter Service
224    pub event_transmitter: Option<Arc<TransmitterService<EventMessage>>>,
225
226    /// Event transmitter Channel Sender endpoint
227    pub event_transmitter_channel: (
228        Option<UnboundedReceiver<EventMessage>>,
229        Option<UnboundedSender<EventMessage>>,
230    ),
231
232    /// API Version provider
233    pub api_version_provider: Option<Arc<APIVersionProvider>>,
234
235    /// Stake Distribution Service
236    pub stake_distribution_service: Option<Arc<dyn StakeDistributionService>>,
237
238    /// Ticker Service
239    pub ticker_service: Option<Arc<dyn TickerService>>,
240
241    /// Signer Store
242    pub signer_store: Option<Arc<SignerStore>>,
243
244    /// Signable Seed Builder
245    pub signable_seed_builder: Option<Arc<dyn SignableSeedBuilder>>,
246
247    /// Signable Builder Service
248    pub signable_builder_service: Option<Arc<dyn SignableBuilderService>>,
249
250    /// Signed Entity Service
251    pub signed_entity_service: Option<Arc<dyn SignedEntityService>>,
252
253    /// Certifier service
254    pub certifier_service: Option<Arc<dyn CertifierService>>,
255
256    /// Epoch service.
257    pub epoch_service: Option<EpochServiceWrapper>,
258
259    /// Mithril network configuration provider
260    pub mithril_network_configuration_provider:
261        Option<Arc<dyn MithrilNetworkConfigurationProvider>>,
262
263    /// Signed Entity storer
264    pub signed_entity_storer: Option<Arc<dyn SignedEntityStorer>>,
265
266    /// HTTP Message service
267    pub message_service: Option<Arc<dyn MessageService>>,
268
269    /// Legacy Prover service
270    pub legacy_prover_service: Option<Arc<dyn LegacyProverService>>,
271
272    /// Prover service
273    pub prover_service: Option<Arc<dyn ProverService>>,
274
275    /// Signed Entity Type Lock
276    pub signed_entity_type_lock: Option<Arc<SignedEntityTypeLock>>,
277
278    /// Chain Data Importer
279    pub chain_data_importer: Option<Arc<AggregatorChainDataImporter>>,
280
281    /// Upkeep service
282    pub upkeep_service: Option<Arc<dyn UpkeepService>>,
283
284    /// Single signer authenticator
285    pub single_signature_authenticator: Option<Arc<SingleSignatureAuthenticator>>,
286
287    /// Metrics service
288    pub metrics_service: Option<Arc<MetricsService>>,
289
290    /// Leader aggregator client
291    pub leader_aggregator_client: Option<Arc<AggregatorHttpClient>>,
292
293    /// Protocol parameters retriever
294    pub protocol_parameters_retriever: Option<Arc<dyn ProtocolParametersRetriever>>,
295
296    /// Stop signal channel
297    pub stop_signal_channel: Option<(watch::Sender<()>, watch::Receiver<()>)>,
298}
299
300impl DependenciesBuilder {
301    /// Create a new clean dependency builder
302    pub fn new(root_logger: Logger, configuration: Arc<dyn ConfigurationSource>) -> Self {
303        Self {
304            configuration,
305            root_logger,
306            sqlite_connection: None,
307            sqlite_connection_event_store: None,
308            sqlite_connection_cardano_transaction_pool: None,
309            stake_store: None,
310            snapshot_uploader: None,
311            multi_signer: None,
312            certificate_repository: None,
313            open_message_repository: None,
314            verification_key_store: None,
315            epoch_settings_store: None,
316            cardano_cli_runner: None,
317            chain_observer: None,
318            chain_block_reader: None,
319            block_scanner: None,
320            immutable_digester: None,
321            immutable_file_observer: None,
322            immutable_cache_provider: None,
323            immutable_file_digest_mapper: None,
324            digester: None,
325            file_archiver: None,
326            snapshotter: None,
327            certificate_verifier: None,
328            genesis_verifier: None,
329            certificate_chain_synchronizer: None,
330            mithril_signer_registration_leader: None,
331            mithril_signer_registration_follower: None,
332            signer_registerer: None,
333            signer_synchronizer: None,
334            signer_registration_verifier: None,
335            signer_registration_round_opener: None,
336            era_reader_adapter: None,
337            era_checker: None,
338            era_reader: None,
339            event_transmitter: None,
340            event_transmitter_channel: (None, None),
341            api_version_provider: None,
342            stake_distribution_service: None,
343            ticker_service: None,
344            signer_store: None,
345            signable_seed_builder: None,
346            signable_builder_service: None,
347            signed_entity_service: None,
348            certifier_service: None,
349            epoch_service: None,
350            mithril_network_configuration_provider: None,
351            signed_entity_storer: None,
352            message_service: None,
353            legacy_prover_service: None,
354            prover_service: None,
355            signed_entity_type_lock: None,
356            chain_data_importer: None,
357            upkeep_service: None,
358            single_signature_authenticator: None,
359            metrics_service: None,
360            leader_aggregator_client: None,
361            protocol_parameters_retriever: None,
362            stop_signal_channel: None,
363            chain_data_repository: None,
364        }
365    }
366
367    fn get_cardano_db_artifacts_dir(&self) -> Result<PathBuf> {
368        let cardano_db_artifacts_dir =
369            self.configuration.get_snapshot_dir()?.join(CARDANO_DB_ARTIFACTS_DIR);
370
371        if !cardano_db_artifacts_dir.exists() {
372            std::fs::create_dir(&cardano_db_artifacts_dir).map_err(|e| {
373                DependenciesBuilderError::Initialization {
374                    message: format!("Cannot create '{cardano_db_artifacts_dir:?}' directory."),
375                    error: Some(e.into()),
376                }
377            })?;
378        }
379
380        Ok(cardano_db_artifacts_dir)
381    }
382
383    /// Return an unconfigured [ServeCommandDependenciesContainer]
384    pub async fn build_serve_dependencies_container(
385        &mut self,
386    ) -> Result<ServeCommandDependenciesContainer> {
387        #[allow(deprecated)]
388        let dependencies_manager = ServeCommandDependenciesContainer {
389            root_logger: self.root_logger(),
390            stake_store: self.get_stake_store().await?,
391            certificate_repository: self.get_certificate_repository().await?,
392            verification_key_store: self.get_verification_key_store().await?,
393            epoch_settings_storer: self.get_epoch_settings_store().await?,
394            certificate_chain_synchronizer: self.get_certificate_chain_synchronizer().await?,
395            signer_registerer: self.get_signer_registerer().await?,
396            signer_synchronizer: self.get_signer_synchronizer().await?,
397            signer_registration_round_opener: self.get_signer_registration_round_opener().await?,
398            era_checker: self.get_era_checker().await?,
399            era_reader: self.get_era_reader().await?,
400            event_transmitter: self.get_event_transmitter().await?,
401            api_version_provider: self.get_api_version_provider().await?,
402            stake_distribution_service: self.get_stake_distribution_service().await?,
403            signer_recorder: self.get_signer_store().await?,
404            signable_builder_service: self.get_signable_builder_service().await?,
405            signed_entity_service: self.get_signed_entity_service().await?,
406            certifier_service: self.get_certifier_service().await?,
407            epoch_service: self.get_epoch_service().await?,
408            ticker_service: self.get_ticker_service().await?,
409            signed_entity_storer: self.get_signed_entity_storer().await?,
410            signer_getter: self.get_signer_store().await?,
411            message_service: self.get_message_service().await?,
412            legacy_prover_service: self.get_legacy_prover_service().await?,
413            prover_service: self.get_prover_service().await?,
414            signed_entity_type_lock: self.get_signed_entity_type_lock().await?,
415            upkeep_service: self.get_upkeep_service().await?,
416            single_signer_authenticator: self.get_single_signature_authenticator().await?,
417            metrics_service: self.get_metrics_service().await?,
418        };
419
420        self.handle_discrepancies_at_startup().await?;
421
422        Ok(dependencies_manager)
423    }
424
425    /// Create the AggregatorRunner
426    pub async fn create_aggregator_runner(&mut self) -> Result<AggregatorRuntime> {
427        let dependency_container = Arc::new(self.build_serve_dependencies_container().await?);
428
429        let config = AggregatorConfig::new(
430            Duration::from_millis(self.configuration.run_interval()),
431            self.configuration.is_follower_aggregator(),
432        );
433        let runtime = AggregatorRuntime::new(
434            config,
435            None,
436            Arc::new(AggregatorRunner::new(dependency_container)),
437            self.root_logger(),
438        )
439        .await
440        .map_err(|e| DependenciesBuilderError::Initialization {
441            message: "Cannot initialize Aggregator runtime.".to_string(),
442            error: Some(e.into()),
443        })?;
444
445        Ok(runtime)
446    }
447
448    /// Create the HTTP route instance
449    pub async fn create_http_routes(
450        &mut self,
451    ) -> Result<
452        impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<>,
453    > {
454        let dependency_container = Arc::new(self.build_serve_dependencies_container().await?);
455        let snapshot_dir = self.configuration.get_snapshot_dir()?;
456        let router_state = RouterState::new(
457            dependency_container.clone(),
458            RouterConfig {
459                network: self.configuration.get_network()?,
460                server_url: self.configuration.get_server_url()?,
461                allowed_discriminants: self
462                    .configuration
463                    .compute_allowed_signed_entity_types_discriminants()?,
464                cardano_transactions_prover_max_hashes_allowed_by_request: self
465                    .configuration
466                    .cardano_transactions_prover_max_hashes_allowed_by_request(),
467                cardano_db_artifacts_directory: self.get_cardano_db_artifacts_dir()?,
468                max_artifact_epoch_offset: MAX_ARTIFACT_EPOCH_OFFSET,
469                snapshot_directory: snapshot_dir.join(SNAPSHOT_ARTIFACTS_DIR),
470                cardano_node_version: self.configuration.cardano_node_version(),
471                allow_http_serve_directory: self.configuration.allow_http_serve_directory(),
472                origin_tag_white_list: self.configuration.compute_origin_tag_white_list(),
473                aggregate_signature_type: self.configuration.aggregate_signature_type(),
474            },
475        );
476
477        Ok(router::routes(Arc::new(router_state)))
478    }
479
480    /// Create dependencies for genesis commands
481    pub async fn create_genesis_container(
482        &mut self,
483    ) -> Result<GenesisCommandDependenciesContainer> {
484        let network = self.configuration.get_network().with_context(
485            || "Dependencies Builder can not get Cardano network while building genesis container",
486        )?;
487
488        let dependencies = GenesisCommandDependenciesContainer {
489            network,
490            chain_observer: self.get_chain_observer().await?,
491            certificate_repository: self.get_certificate_repository().await?,
492            certificate_verifier: self.get_certificate_verifier().await?,
493            protocol_parameters_retriever: self.get_protocol_parameters_retriever().await?,
494            verification_key_store: self.get_verification_key_store().await?,
495        };
496
497        Ok(dependencies)
498    }
499
500    /// Create dependencies for database command
501    pub async fn create_database_command_container(
502        &mut self,
503    ) -> Result<DatabaseCommandDependenciesContainer> {
504        let main_db_connection = self
505            .get_sqlite_connection()
506            .await
507            .with_context(|| "Dependencies Builder can not get sqlite connection")?;
508
509        self.get_event_store_sqlite_connection()
510            .await
511            .with_context(|| "Dependencies Builder can not get event store sqlite connection")?;
512
513        self.get_sqlite_connection_cardano_transaction_pool()
514            .await
515            .with_context(
516                || "Dependencies Builder can not get cardano transaction pool sqlite connection",
517            )?;
518
519        let dependencies = DatabaseCommandDependenciesContainer { main_db_connection };
520
521        Ok(dependencies)
522    }
523
524    /// Create dependencies for tools command
525    pub async fn create_tools_command_container(
526        &mut self,
527    ) -> Result<ToolsCommandDependenciesContainer> {
528        let db_connection = self
529            .get_sqlite_connection()
530            .await
531            .with_context(|| "Dependencies Builder can not get sqlite connection")?;
532
533        let dependencies = ToolsCommandDependenciesContainer { db_connection };
534
535        Ok(dependencies)
536    }
537
538    /// Look for discrepancies in stored data and fix them.
539    ///
540    /// Fix discrepancies for:
541    /// - epoch settings: ensure that the network configuration parameters for the three working epochs
542    ///   window are stored in the database (see [mithril_protocol_config::model::MithrilNetworkConfiguration])
543    pub async fn handle_discrepancies_at_startup(&mut self) -> Result<()> {
544        let logger = self.root_logger();
545        let current_epoch = self
546            .get_chain_observer()
547            .await?
548            .get_current_epoch()
549            .await
550            .map_err(|e| DependenciesBuilderError::Initialization {
551                message: "cannot handle startup discrepancies: failed to retrieve current epoch."
552                    .to_string(),
553                error: Some(e.into()),
554            })?
555            .ok_or(DependenciesBuilderError::Initialization {
556                message: "cannot handle startup discrepancies: no epoch returned.".to_string(),
557                error: None,
558            })?;
559        let network_configuration = self
560            .get_mithril_network_configuration_provider()
561            .await?
562            .get_network_configuration(current_epoch)
563            .await
564            .map_err(|e| DependenciesBuilderError::Initialization {
565                message: format!("cannot handle startup discrepancies: failed to retrieve network configuration for epoch {current_epoch}"),
566                error: Some(e),
567            })?;
568        let epoch_settings_store = self.get_epoch_settings_store().await?;
569
570        debug!(
571            logger,
572            "Handle discrepancies at startup of epoch settings store, will record epoch settings from the configuration for epoch {current_epoch}";
573            "network_configuration" => ?network_configuration,
574        );
575        epoch_settings_store
576            .handle_discrepancies_at_startup(&network_configuration)
577            .await
578            .map_err(|e| DependenciesBuilderError::Initialization {
579                message: "can not create aggregator runner".to_string(),
580                error: Some(e),
581            })?;
582        Ok(())
583    }
584
585    /// Remove the dependencies builder from memory to release Arc instances.
586    pub async fn vanish(self) {
587        self.drop_sqlite_connections().await;
588    }
589}
590
591#[cfg(test)]
592impl DependenciesBuilder {
593    pub(crate) fn new_with_stdout_logger(configuration: Arc<dyn ConfigurationSource>) -> Self {
594        Self::new(crate::test::TestLogger::stdout(), configuration)
595    }
596}