mithril_aggregator/dependency_injection/builder/
mod.rs

1mod enablers;
2mod protocol;
3mod support;
4
5use anyhow::Context;
6use slog::{Logger, debug};
7use std::{path::PathBuf, sync::Arc};
8use tokio::{
9    sync::{
10        Mutex,
11        mpsc::{UnboundedReceiver, UnboundedSender},
12        watch,
13    },
14    time::Duration,
15};
16use warp::Filter;
17
18use mithril_aggregator_client::AggregatorHttpClient;
19use mithril_cardano_node_chain::{
20    chain_observer::{CardanoCliRunner, ChainObserver},
21    chain_reader::ChainBlockReader,
22    chain_scanner::BlockScanner,
23};
24use mithril_cardano_node_internal_database::{
25    ImmutableFileObserver,
26    digesters::{ImmutableDigester, cache::ImmutableFileDigestCacheProvider},
27};
28use mithril_common::{
29    api_version::APIVersionProvider,
30    certificate_chain::CertificateVerifier,
31    crypto_helper::ProtocolGenesisVerifier,
32    signable_builder::{SignableBuilderService, SignableSeedBuilder, TransactionsImporter},
33};
34use mithril_era::{EraChecker, EraReader, EraReaderAdapter};
35use mithril_persistence::{
36    database::repository::CardanoTransactionRepository,
37    sqlite::{SqliteConnection, SqliteConnectionPool},
38};
39use mithril_protocol_config::interface::MithrilNetworkConfigurationProvider;
40use mithril_signed_entity_lock::SignedEntityTypeLock;
41use mithril_ticker::TickerService;
42
43use super::{
44    DatabaseCommandDependenciesContainer, DependenciesBuilderError, EpochServiceWrapper,
45    GenesisCommandDependenciesContainer, Result, ToolsCommandDependenciesContainer,
46};
47use crate::{
48    AggregatorConfig, AggregatorRunner, AggregatorRuntime, EpochSettingsStorer,
49    ImmutableFileDigestMapper, MetricsService, MithrilSignerRegistrationLeader, MultiSigner,
50    ProtocolParametersRetriever, ServeCommandDependenciesContainer, SignerRegisterer,
51    SignerRegistrationRoundOpener, SignerRegistrationVerifier, SingleSignatureAuthenticator,
52    VerificationKeyStorer,
53    configuration::ConfigurationSource,
54    database::repository::{
55        CertificateRepository, EpochSettingsStore, OpenMessageRepository, SignedEntityStorer,
56        SignerStore, StakePoolStore,
57    },
58    event_store::{EventMessage, TransmitterService},
59    file_uploaders::FileUploader,
60    http_server::routes::router::{self, RouterConfig, RouterState},
61    services::{
62        CertificateChainSynchronizer, CertifierService, MessageService,
63        MithrilSignerRegistrationFollower, ProverService, SignedEntityService, SignerSynchronizer,
64        Snapshotter, StakeDistributionService, UpkeepService,
65    },
66    tools::file_archiver::FileArchiver,
67};
68
/// Fetch a dependency stored in the builder, building it first if it is not set yet.
///
/// The attribute must be an `Option`. When it is `None`, the value is built, stored in
/// the attribute and a debug message is logged with the builder root logger. In every
/// case a clone of the stored value is returned, wrapped in a `Result`.
///
/// By default the value is built by calling the associated build function
/// (`build_<attribute_name>`) on the builder. To use something else than the default
/// build function, pass an expression that builds the value.
///
/// Usage examples:
/// get_dependency!(self.signer_registerer)
/// get_dependency!(self.signer_registerer = self.build_signer_registerer().await?)
#[macro_export]
macro_rules! get_dependency {
    // Default form: delegate to the explicit form with `build_<attribute>()` as builder.
    ( $self:ident.$attribute:ident ) => {{
        paste::paste! {
            get_dependency!($self.$attribute = $self.[<build_ $attribute>]().await?)
        }
    }};
    // Explicit form: `$builder` is only evaluated when the attribute is still unset.
    ( $self:ident.$attribute:ident = $builder:expr ) => {{
        if $self.$attribute.is_none() {
            slog::debug!($self.root_logger(), "Building dependency {}", stringify!($attribute));
            $self.$attribute = Some($builder);
        }

        // The attribute is necessarily `Some` at this point, so `unwrap` can not panic.
        let dependency: Result<_> = Ok($self.$attribute.clone().unwrap());
        dependency
    }};
}
94
// SQLite database file names.
// NOTE(review): these three names are not referenced in this part of the file; presumably
// the sibling builder modules use them when opening the connections — confirm.
const SQLITE_FILE: &str = "aggregator.sqlite3";
const SQLITE_FILE_CARDANO_TRANSACTION: &str = "cardano-transaction.sqlite3";
const SQLITE_MONITORING_FILE: &str = "monitoring.sqlite3";
/// Name of the sub-directory of the snapshot directory that holds the Cardano database
/// artifacts (see `get_cardano_db_artifacts_dir`).
const CARDANO_DB_ARTIFACTS_DIR: &str = "cardano-database";
/// Name of the sub-directory of the snapshot directory given to the HTTP router as its
/// snapshot directory (see `create_http_routes`).
const SNAPSHOT_ARTIFACTS_DIR: &str = "cardano-immutable-files-full";
/// Maximum offset that can be applied to the latest epoch when fetching artifacts.
const MAX_ARTIFACT_EPOCH_OFFSET: u64 = 5;
102
/// ## Dependencies container builder
///
/// This is meant to create SHARED DEPENDENCIES, i.e. dependency instances that
/// must be shared amongst several Tokio tasks. For example, database
/// repositories are NOT shared dependencies and therefore can be created ad hoc
/// whereas the database connection is a shared dependency.
///
/// Each shared dependency must implement a `build` and a `get` function. The
/// build function creates the dependency, the get function creates the
/// dependency at first call then returns a clone of the Arc containing the
/// dependency for all further calls.
///
/// Every dependency field is an `Option` that starts as `None` (see `new`) and is
/// filled in once the dependency has been built.
pub struct DependenciesBuilder {
    /// Configuration parameters
    pub configuration: Arc<dyn ConfigurationSource>,

    /// Application root logger
    pub root_logger: Logger,

    /// SQLite database connection
    pub sqlite_connection: Option<Arc<SqliteConnection>>,

    /// Event store SQLite database connection
    pub sqlite_connection_event_store: Option<Arc<SqliteConnection>>,

    /// Cardano transactions SQLite database connection pool
    pub sqlite_connection_cardano_transaction_pool: Option<Arc<SqliteConnectionPool>>,

    /// Stake Store used by the StakeDistributionService
    /// It shall be a private dependency.
    pub stake_store: Option<Arc<StakePoolStore>>,

    /// Snapshot uploader service.
    pub snapshot_uploader: Option<Arc<dyn FileUploader>>,

    /// Multisigner service.
    pub multi_signer: Option<Arc<dyn MultiSigner>>,

    /// Certificate repository.
    pub certificate_repository: Option<Arc<CertificateRepository>>,

    /// Open message repository.
    pub open_message_repository: Option<Arc<OpenMessageRepository>>,

    /// Verification key store.
    pub verification_key_store: Option<Arc<dyn VerificationKeyStorer>>,

    /// Epoch settings store.
    pub epoch_settings_store: Option<Arc<EpochSettingsStore>>,

    /// Cardano CLI Runner for the [ChainObserver]
    pub cardano_cli_runner: Option<Box<CardanoCliRunner>>,

    /// Chain observer service.
    pub chain_observer: Option<Arc<dyn ChainObserver>>,

    /// Chain block reader
    pub chain_block_reader: Option<Arc<Mutex<dyn ChainBlockReader>>>,

    /// Cardano transactions repository.
    pub transaction_repository: Option<Arc<CardanoTransactionRepository>>,

    /// Cardano block scanner.
    pub block_scanner: Option<Arc<dyn BlockScanner>>,

    /// Immutable file digester service.
    pub immutable_digester: Option<Arc<dyn ImmutableDigester>>,

    /// Immutable file observer service.
    pub immutable_file_observer: Option<Arc<dyn ImmutableFileObserver>>,

    /// Immutable cache provider service.
    pub immutable_cache_provider: Option<Arc<dyn ImmutableFileDigestCacheProvider>>,

    /// Immutable file digest mapper service.
    pub immutable_file_digest_mapper: Option<Arc<dyn ImmutableFileDigestMapper>>,

    /// Digester service.
    // NOTE(review): same type as `immutable_digester`; how the two differ is not visible
    // in this part of the file — confirm against the build functions.
    pub digester: Option<Arc<dyn ImmutableDigester>>,

    /// File archiver service.
    pub file_archiver: Option<Arc<FileArchiver>>,

    /// Snapshotter service.
    pub snapshotter: Option<Arc<dyn Snapshotter>>,

    /// Certificate verifier service.
    pub certificate_verifier: Option<Arc<dyn CertificateVerifier>>,

    /// Genesis signature verifier service.
    pub genesis_verifier: Option<Arc<ProtocolGenesisVerifier>>,

    /// Certificate chain synchronizer service
    pub certificate_chain_synchronizer: Option<Arc<dyn CertificateChainSynchronizer>>,

    /// Mithril signer registration leader service
    pub mithril_signer_registration_leader: Option<Arc<MithrilSignerRegistrationLeader>>,

    /// Mithril signer registration follower service
    pub mithril_signer_registration_follower: Option<Arc<MithrilSignerRegistrationFollower>>,

    /// Signer registerer service
    pub signer_registerer: Option<Arc<dyn SignerRegisterer>>,

    /// Signer synchronizer service
    pub signer_synchronizer: Option<Arc<dyn SignerSynchronizer>>,

    /// Signer registration verifier
    pub signer_registration_verifier: Option<Arc<dyn SignerRegistrationVerifier>>,

    /// Signer registration round opener service
    pub signer_registration_round_opener: Option<Arc<dyn SignerRegistrationRoundOpener>>,

    /// Era checker service
    pub era_checker: Option<Arc<EraChecker>>,

    /// Adapter for [EraReader]
    pub era_reader_adapter: Option<Arc<dyn EraReaderAdapter>>,

    /// Era reader service
    pub era_reader: Option<Arc<EraReader>>,

    /// Event Transmitter Service
    pub event_transmitter: Option<Arc<TransmitterService<EventMessage>>>,

    /// Event transmitter channel endpoints: the receiver first, then the sender.
    /// Both are `None` until the channel is created.
    pub event_transmitter_channel: (
        Option<UnboundedReceiver<EventMessage>>,
        Option<UnboundedSender<EventMessage>>,
    ),

    /// API Version provider
    pub api_version_provider: Option<Arc<APIVersionProvider>>,

    /// Stake Distribution Service
    pub stake_distribution_service: Option<Arc<dyn StakeDistributionService>>,

    /// Ticker Service
    pub ticker_service: Option<Arc<dyn TickerService>>,

    /// Signer Store
    pub signer_store: Option<Arc<SignerStore>>,

    /// Signable Seed Builder
    pub signable_seed_builder: Option<Arc<dyn SignableSeedBuilder>>,

    /// Signable Builder Service
    pub signable_builder_service: Option<Arc<dyn SignableBuilderService>>,

    /// Signed Entity Service
    pub signed_entity_service: Option<Arc<dyn SignedEntityService>>,

    /// Certifier service
    pub certifier_service: Option<Arc<dyn CertifierService>>,

    /// Epoch service.
    pub epoch_service: Option<EpochServiceWrapper>,

    /// Mithril network configuration provider
    pub mithril_network_configuration_provider:
        Option<Arc<dyn MithrilNetworkConfigurationProvider>>,

    /// Signed Entity storer
    pub signed_entity_storer: Option<Arc<dyn SignedEntityStorer>>,

    /// HTTP Message service
    pub message_service: Option<Arc<dyn MessageService>>,

    /// Prover service
    pub prover_service: Option<Arc<dyn ProverService>>,

    /// Signed Entity Type Lock
    pub signed_entity_type_lock: Option<Arc<SignedEntityTypeLock>>,

    /// Transactions Importer
    pub transactions_importer: Option<Arc<dyn TransactionsImporter>>,

    /// Upkeep service
    pub upkeep_service: Option<Arc<dyn UpkeepService>>,

    /// Single signature authenticator
    pub single_signature_authenticator: Option<Arc<SingleSignatureAuthenticator>>,

    /// Metrics service
    pub metrics_service: Option<Arc<MetricsService>>,

    /// Leader aggregator client
    pub leader_aggregator_client: Option<Arc<AggregatorHttpClient>>,

    /// Protocol parameters retriever
    pub protocol_parameters_retriever: Option<Arc<dyn ProtocolParametersRetriever>>,

    /// Stop signal channel: the sender and receiver ends of a `watch` channel.
    pub stop_signal_channel: Option<(watch::Sender<()>, watch::Receiver<()>)>,
}
297
298impl DependenciesBuilder {
299    /// Create a new clean dependency builder
300    pub fn new(root_logger: Logger, configuration: Arc<dyn ConfigurationSource>) -> Self {
301        Self {
302            configuration,
303            root_logger,
304            sqlite_connection: None,
305            sqlite_connection_event_store: None,
306            sqlite_connection_cardano_transaction_pool: None,
307            stake_store: None,
308            snapshot_uploader: None,
309            multi_signer: None,
310            certificate_repository: None,
311            open_message_repository: None,
312            verification_key_store: None,
313            epoch_settings_store: None,
314            cardano_cli_runner: None,
315            chain_observer: None,
316            chain_block_reader: None,
317            block_scanner: None,
318            transaction_repository: None,
319            immutable_digester: None,
320            immutable_file_observer: None,
321            immutable_cache_provider: None,
322            immutable_file_digest_mapper: None,
323            digester: None,
324            file_archiver: None,
325            snapshotter: None,
326            certificate_verifier: None,
327            genesis_verifier: None,
328            certificate_chain_synchronizer: None,
329            mithril_signer_registration_leader: None,
330            mithril_signer_registration_follower: None,
331            signer_registerer: None,
332            signer_synchronizer: None,
333            signer_registration_verifier: None,
334            signer_registration_round_opener: None,
335            era_reader_adapter: None,
336            era_checker: None,
337            era_reader: None,
338            event_transmitter: None,
339            event_transmitter_channel: (None, None),
340            api_version_provider: None,
341            stake_distribution_service: None,
342            ticker_service: None,
343            signer_store: None,
344            signable_seed_builder: None,
345            signable_builder_service: None,
346            signed_entity_service: None,
347            certifier_service: None,
348            epoch_service: None,
349            mithril_network_configuration_provider: None,
350            signed_entity_storer: None,
351            message_service: None,
352            prover_service: None,
353            signed_entity_type_lock: None,
354            transactions_importer: None,
355            upkeep_service: None,
356            single_signature_authenticator: None,
357            metrics_service: None,
358            leader_aggregator_client: None,
359            protocol_parameters_retriever: None,
360            stop_signal_channel: None,
361        }
362    }
363
364    fn get_cardano_db_artifacts_dir(&self) -> Result<PathBuf> {
365        let cardano_db_artifacts_dir =
366            self.configuration.get_snapshot_dir()?.join(CARDANO_DB_ARTIFACTS_DIR);
367
368        if !cardano_db_artifacts_dir.exists() {
369            std::fs::create_dir(&cardano_db_artifacts_dir).map_err(|e| {
370                DependenciesBuilderError::Initialization {
371                    message: format!("Cannot create '{cardano_db_artifacts_dir:?}' directory."),
372                    error: Some(e.into()),
373                }
374            })?;
375        }
376
377        Ok(cardano_db_artifacts_dir)
378    }
379
    /// Return an unconfigured [ServeCommandDependenciesContainer]
    ///
    /// Every field of the container is obtained through a `get_*` method of this
    /// builder, evaluated in the order written below; the first getter that fails
    /// aborts the build and its error is returned.
    ///
    /// Once the container is assembled, [Self::handle_discrepancies_at_startup] is run;
    /// if it fails, its error is returned instead of the container.
    pub async fn build_serve_dependencies_container(
        &mut self,
    ) -> Result<ServeCommandDependenciesContainer> {
        // NOTE(review): the deprecated item silenced here is not visible in this file;
        // presumably one of the container fields — confirm before removing the attribute.
        #[allow(deprecated)]
        let dependencies_manager = ServeCommandDependenciesContainer {
            root_logger: self.root_logger(),
            stake_store: self.get_stake_store().await?,
            certificate_repository: self.get_certificate_repository().await?,
            verification_key_store: self.get_verification_key_store().await?,
            epoch_settings_storer: self.get_epoch_settings_store().await?,
            certificate_chain_synchronizer: self.get_certificate_chain_synchronizer().await?,
            signer_registerer: self.get_signer_registerer().await?,
            signer_synchronizer: self.get_signer_synchronizer().await?,
            signer_registration_round_opener: self.get_signer_registration_round_opener().await?,
            era_checker: self.get_era_checker().await?,
            era_reader: self.get_era_reader().await?,
            event_transmitter: self.get_event_transmitter().await?,
            api_version_provider: self.get_api_version_provider().await?,
            stake_distribution_service: self.get_stake_distribution_service().await?,
            // Both `signer_recorder` and `signer_getter` (below) come from the signer store.
            signer_recorder: self.get_signer_store().await?,
            signable_builder_service: self.get_signable_builder_service().await?,
            signed_entity_service: self.get_signed_entity_service().await?,
            certifier_service: self.get_certifier_service().await?,
            epoch_service: self.get_epoch_service().await?,
            ticker_service: self.get_ticker_service().await?,
            signed_entity_storer: self.get_signed_entity_storer().await?,
            signer_getter: self.get_signer_store().await?,
            message_service: self.get_message_service().await?,
            prover_service: self.get_prover_service().await?,
            signed_entity_type_lock: self.get_signed_entity_type_lock().await?,
            upkeep_service: self.get_upkeep_service().await?,
            single_signer_authenticator: self.get_single_signature_authenticator().await?,
            metrics_service: self.get_metrics_service().await?,
        };

        // Stored data is checked (and fixed) only after all dependencies have been obtained.
        self.handle_discrepancies_at_startup().await?;

        Ok(dependencies_manager)
    }
420
421    /// Create the AggregatorRunner
422    pub async fn create_aggregator_runner(&mut self) -> Result<AggregatorRuntime> {
423        let dependency_container = Arc::new(self.build_serve_dependencies_container().await?);
424
425        let config = AggregatorConfig::new(
426            Duration::from_millis(self.configuration.run_interval()),
427            self.configuration.is_follower_aggregator(),
428        );
429        let runtime = AggregatorRuntime::new(
430            config,
431            None,
432            Arc::new(AggregatorRunner::new(dependency_container)),
433            self.root_logger(),
434        )
435        .await
436        .map_err(|e| DependenciesBuilderError::Initialization {
437            message: "Cannot initialize Aggregator runtime.".to_string(),
438            error: Some(e.into()),
439        })?;
440
441        Ok(runtime)
442    }
443
444    /// Create the HTTP route instance
445    pub async fn create_http_routes(
446        &mut self,
447    ) -> Result<
448        impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<>,
449    > {
450        let dependency_container = Arc::new(self.build_serve_dependencies_container().await?);
451        let snapshot_dir = self.configuration.get_snapshot_dir()?;
452        let router_state = RouterState::new(
453            dependency_container.clone(),
454            RouterConfig {
455                network: self.configuration.get_network()?,
456                server_url: self.configuration.get_server_url()?,
457                allowed_discriminants: self
458                    .configuration
459                    .compute_allowed_signed_entity_types_discriminants()?,
460                cardano_transactions_prover_max_hashes_allowed_by_request: self
461                    .configuration
462                    .cardano_transactions_prover_max_hashes_allowed_by_request(),
463                cardano_db_artifacts_directory: self.get_cardano_db_artifacts_dir()?,
464                max_artifact_epoch_offset: MAX_ARTIFACT_EPOCH_OFFSET,
465                snapshot_directory: snapshot_dir.join(SNAPSHOT_ARTIFACTS_DIR),
466                cardano_node_version: self.configuration.cardano_node_version(),
467                allow_http_serve_directory: self.configuration.allow_http_serve_directory(),
468                origin_tag_white_list: self.configuration.compute_origin_tag_white_list(),
469                aggregate_signature_type: self.configuration.aggregate_signature_type(),
470            },
471        );
472
473        Ok(router::routes(Arc::new(router_state)))
474    }
475
476    /// Create dependencies for genesis commands
477    pub async fn create_genesis_container(
478        &mut self,
479    ) -> Result<GenesisCommandDependenciesContainer> {
480        let network = self.configuration.get_network().with_context(
481            || "Dependencies Builder can not get Cardano network while building genesis container",
482        )?;
483
484        let dependencies = GenesisCommandDependenciesContainer {
485            network,
486            chain_observer: self.get_chain_observer().await?,
487            certificate_repository: self.get_certificate_repository().await?,
488            certificate_verifier: self.get_certificate_verifier().await?,
489            protocol_parameters_retriever: self.get_protocol_parameters_retriever().await?,
490            verification_key_store: self.get_verification_key_store().await?,
491        };
492
493        Ok(dependencies)
494    }
495
496    /// Create dependencies for database command
497    pub async fn create_database_command_container(
498        &mut self,
499    ) -> Result<DatabaseCommandDependenciesContainer> {
500        let main_db_connection = self
501            .get_sqlite_connection()
502            .await
503            .with_context(|| "Dependencies Builder can not get sqlite connection")?;
504
505        self.get_event_store_sqlite_connection()
506            .await
507            .with_context(|| "Dependencies Builder can not get event store sqlite connection")?;
508
509        self.get_sqlite_connection_cardano_transaction_pool()
510            .await
511            .with_context(
512                || "Dependencies Builder can not get cardano transaction pool sqlite connection",
513            )?;
514
515        let dependencies = DatabaseCommandDependenciesContainer { main_db_connection };
516
517        Ok(dependencies)
518    }
519
520    /// Create dependencies for tools command
521    pub async fn create_tools_command_container(
522        &mut self,
523    ) -> Result<ToolsCommandDependenciesContainer> {
524        let db_connection = self
525            .get_sqlite_connection()
526            .await
527            .with_context(|| "Dependencies Builder can not get sqlite connection")?;
528
529        let dependencies = ToolsCommandDependenciesContainer { db_connection };
530
531        Ok(dependencies)
532    }
533
534    /// Look for discrepancies in stored data and fix them.
535    ///
536    /// Fix discrepancies for:
537    /// - epoch settings: ensure that the network configuration parameters for the three working epochs
538    ///   window are stored in the database (see [mithril_protocol_config::model::MithrilNetworkConfiguration])
539    pub async fn handle_discrepancies_at_startup(&mut self) -> Result<()> {
540        let logger = self.root_logger();
541        let current_epoch = self
542            .get_chain_observer()
543            .await?
544            .get_current_epoch()
545            .await
546            .map_err(|e| DependenciesBuilderError::Initialization {
547                message: "cannot handle startup discrepancies: failed to retrieve current epoch."
548                    .to_string(),
549                error: Some(e.into()),
550            })?
551            .ok_or(DependenciesBuilderError::Initialization {
552                message: "cannot handle startup discrepancies: no epoch returned.".to_string(),
553                error: None,
554            })?;
555        let network_configuration = self
556            .get_mithril_network_configuration_provider()
557            .await?
558            .get_network_configuration(current_epoch)
559            .await
560            .map_err(|e| DependenciesBuilderError::Initialization {
561                message: format!("cannot handle startup discrepancies: failed to retrieve network configuration for epoch {current_epoch}"),
562                error: Some(e),
563            })?;
564        let epoch_settings_store = self.get_epoch_settings_store().await?;
565
566        debug!(
567            logger,
568            "Handle discrepancies at startup of epoch settings store, will record epoch settings from the configuration for epoch {current_epoch}";
569            "network_configuration" => ?network_configuration,
570        );
571        epoch_settings_store
572            .handle_discrepancies_at_startup(&network_configuration)
573            .await
574            .map_err(|e| DependenciesBuilderError::Initialization {
575                message: "can not create aggregator runner".to_string(),
576                error: Some(e),
577            })?;
578        Ok(())
579    }
580
    /// Remove the dependencies builder from memory to release Arc instances.
    ///
    /// The builder is taken by value, so it can not be used after this call. Its SQLite
    /// connections are dropped through `drop_sqlite_connections` (defined outside this
    /// part of the file).
    pub async fn vanish(self) {
        self.drop_sqlite_connections().await;
    }
585}
586
#[cfg(test)]
impl DependenciesBuilder {
    /// Test helper: create a clean builder that uses the test stdout logger as its
    /// root logger.
    pub(crate) fn new_with_stdout_logger(configuration: Arc<dyn ConfigurationSource>) -> Self {
        let stdout_logger = crate::test::TestLogger::stdout();

        Self::new(stdout_logger, configuration)
    }
}