mithril_signer/dependency_injection/
builder.rs

1use std::fs;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, anyhow};
6use slog::Logger;
7use tokio::sync::{Mutex, RwLock};
8
9use mithril_aggregator_client::AggregatorHttpClient;
10use mithril_cardano_node_chain::{
11    chain_observer::{CardanoCliRunner, ChainObserver, ChainObserverBuilder, ChainObserverType},
12    chain_reader::PallasChainReader,
13    chain_scanner::CardanoBlockScanner,
14};
15use mithril_cardano_node_internal_database::{
16    ImmutableFileObserver, ImmutableFileSystemObserver,
17    digesters::CardanoImmutableDigester,
18    digesters::cache::{
19        ImmutableFileDigestCacheProvider, JsonImmutableFileDigestCacheProviderBuilder,
20    },
21    signable_builder::{CardanoDatabaseSignableBuilder, CardanoImmutableFilesFullSignableBuilder},
22};
23use mithril_common::StdResult;
24use mithril_common::api_version::APIVersionProvider;
25use mithril_common::crypto_helper::{
26    KesSigner, KesSignerStandard, OpCert, ProtocolPartyId, SerDeShelleyFileFormat,
27};
28use mithril_common::messages::RegisterSignatureMessageDmq;
29use mithril_common::signable_builder::{
30    CardanoStakeDistributionSignableBuilder, CardanoTransactionsSignableBuilder,
31    MithrilSignableBuilderService, MithrilStakeDistributionSignableBuilder,
32    SignableBuilderServiceDependencies,
33};
34
35use mithril_era::{EraChecker, EraReader};
36use mithril_signed_entity_lock::SignedEntityTypeLock;
37use mithril_signed_entity_preloader::CardanoTransactionsPreloader;
38use mithril_ticker::{MithrilTickerService, TickerService};
39
40use mithril_persistence::database::repository::CardanoTransactionRepository;
41use mithril_persistence::database::{ApplicationNodeType, SqlMigration};
42use mithril_persistence::sqlite::{ConnectionBuilder, SqliteConnection, SqliteConnectionPool};
43
44use mithril_protocol_config::http::HttpMithrilNetworkConfigurationProvider;
45
46use mithril_dmq::{DmqMessageBuilder, DmqPublisherClientPallas};
47
48use crate::dependency_injection::SignerDependencyContainer;
49use crate::services::SignaturePublisherDmq;
50use crate::services::{
51    CardanoTransactionsImporter, CardanoTransactionsPreloaderActivationSigner, MithrilEpochService,
52    MithrilSingleSigner, SignaturePublishRetryPolicy, SignaturePublisherDelayer,
53    SignaturePublisherNoop, SignaturePublisherRetrier, SignerCertifierService,
54    SignerSignableSeedBuilder, SignerSignedEntityConfigProvider, SignerUpkeepService,
55    TransactionsImporterByChunk, TransactionsImporterWithPruner, TransactionsImporterWithVacuum,
56};
57use crate::store::MKTreeStoreSqlite;
58use crate::{
59    Configuration, HTTP_REQUEST_TIMEOUT_DURATION, MetricsService, SQLITE_FILE,
60    SQLITE_FILE_CARDANO_TRANSACTION,
61};
62use crate::{
63    database::repository::{ProtocolInitializerRepository, SignedBeaconRepository, StakePoolStore},
64    services::SignaturePublisher,
65};
66
67/// The `DependenciesBuilder` is intended to manage Services instance creation.
68///
69/// The goal of this is to put all this code out of the way of business code.
70pub struct DependenciesBuilder<'a> {
71    config: &'a Configuration,
72    chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
73    immutable_file_observer_builder:
74        fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
75    root_logger: Logger,
76}
77
78impl<'a> DependenciesBuilder<'a> {
79    /// Create a new `DependenciesBuilder`.
80    pub fn new(config: &'a Configuration, root_logger: Logger) -> Self {
81        let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
82            |config: &Configuration| {
83                let chain_observer_type = ChainObserverType::Pallas;
84                let cardano_cli_path = &config.cardano_cli_path;
85                let cardano_node_socket_path = &config.cardano_node_socket_path;
86                let cardano_network = &config.get_network().with_context(|| {
87                    "Dependencies Builder can not get Cardano network while building the chain observer"
88                })?;
89                let cardano_cli_runner = &CardanoCliRunner::new(
90                    cardano_cli_path.to_owned(),
91                    cardano_node_socket_path.to_owned(),
92                    cardano_network.to_owned(),
93                );
94
95                let chain_observer_builder = ChainObserverBuilder::new(
96                    &chain_observer_type,
97                    cardano_node_socket_path,
98                    cardano_network,
99                    Some(cardano_cli_runner),
100                );
101
102                chain_observer_builder
103                    .build()
104                    .with_context(|| "Dependencies Builder can not build chain observer")
105            };
106
107        let immutable_file_observer_builder: fn(
108            &Configuration,
109        )
110            -> StdResult<Arc<dyn ImmutableFileObserver>> = |config: &Configuration| {
111            Ok(Arc::new(ImmutableFileSystemObserver::new(
112                &config.db_directory,
113            )))
114        };
115
116        Self {
117            config,
118            chain_observer_builder,
119            immutable_file_observer_builder,
120            root_logger,
121        }
122    }
123
124    /// Override immutable file observer builder.
125    pub fn override_immutable_file_observer_builder(
126        &mut self,
127        builder: fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
128    ) -> &mut Self {
129        self.immutable_file_observer_builder = builder;
130
131        self
132    }
133
134    /// Return a copy of the root logger.
135    pub fn root_logger(&self) -> Logger {
136        self.root_logger.clone()
137    }
138
139    /// Override default chain observer builder.
140    pub fn override_chain_observer_builder(
141        &mut self,
142        builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
143    ) -> &mut Self {
144        self.chain_observer_builder = builder;
145
146        self
147    }
148
149    /// Compute protocol party id
150    fn compute_protocol_party_id(&self) -> StdResult<ProtocolPartyId> {
151        match &self.config.operational_certificate_path {
152            Some(operational_certificate_path) => {
153                let opcert: OpCert = OpCert::from_file(operational_certificate_path)
154                    .with_context(|| "Could not decode operational certificate")?;
155                Ok(opcert
156                    .compute_protocol_party_id()
157                    .with_context(|| "Could not compute party_id from operational certificate")?)
158            }
159            _ => Ok(self
160                .config
161                .party_id
162                .to_owned()
163                .with_context(|| "A party_id should at least be provided")?),
164        }
165    }
166
167    async fn build_digester_cache_provider(
168        &self,
169    ) -> StdResult<Option<Arc<dyn ImmutableFileDigestCacheProvider>>> {
170        if self.config.disable_digests_cache {
171            return Ok(None);
172        }
173
174        let cache_provider = JsonImmutableFileDigestCacheProviderBuilder::new(
175            &self.config.data_stores_directory,
176            &format!("immutables_digests_{}.json", self.config.network),
177        )
178        .should_reset_digests_cache(self.config.reset_digests_cache)
179        .with_logger(self.root_logger())
180        .build()
181        .await?;
182
183        Ok(Some(Arc::new(cache_provider)))
184    }
185
186    /// Build a SQLite connection.
187    pub async fn build_sqlite_connection(
188        &self,
189        sqlite_file_name: &str,
190        migrations: Vec<SqlMigration>,
191    ) -> StdResult<SqliteConnection> {
192        let sqlite_db_path = self.config.get_sqlite_file(sqlite_file_name)?;
193        let connection = ConnectionBuilder::open_file(&sqlite_db_path)
194            .with_node_type(ApplicationNodeType::Signer)
195            .with_migrations(migrations)
196            .with_logger(self.root_logger())
197            .build()
198            .with_context(|| "Database connection initialisation error")?;
199
200        Ok(connection)
201    }
202
203    /// Build dependencies for the Production environment.
204    pub async fn build(&self) -> StdResult<SignerDependencyContainer> {
205        if !self.config.data_stores_directory.exists() {
206            fs::create_dir_all(self.config.data_stores_directory.clone()).with_context(|| {
207                format!(
208                    "Could not create data stores directory: `{}`",
209                    self.config.data_stores_directory.display()
210                )
211            })?;
212        }
213
214        let network = self.config.get_network()?;
215        let sqlite_connection = Arc::new(
216            self.build_sqlite_connection(SQLITE_FILE, crate::database::migration::get_migrations())
217                .await?,
218        );
219        let transaction_sqlite_connection = self
220            .build_sqlite_connection(
221                SQLITE_FILE_CARDANO_TRANSACTION,
222                mithril_persistence::database::cardano_transaction_migration::get_migrations(),
223            )
224            .await?;
225        let sqlite_connection_cardano_transaction_pool = Arc::new(
226            SqliteConnectionPool::build_from_connection(transaction_sqlite_connection),
227        );
228
229        let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
230
231        let protocol_initializer_store = Arc::new(ProtocolInitializerRepository::new(
232            sqlite_connection.clone(),
233            self.config.store_retention_limit.map(|limit| limit as u64),
234        ));
235
236        let digester = Arc::new(CardanoImmutableDigester::new(
237            network.to_string(),
238            self.build_digester_cache_provider().await?,
239            self.root_logger(),
240        ));
241        let stake_store = Arc::new(StakePoolStore::new(
242            sqlite_connection.clone(),
243            self.config.store_retention_limit.map(|limit| limit as u64),
244        ));
245        let chain_observer = {
246            let builder = self.chain_observer_builder;
247            builder(self.config)?
248        };
249        let ticker_service = {
250            let builder = self.immutable_file_observer_builder;
251            Arc::new(MithrilTickerService::new(
252                chain_observer.clone(),
253                builder(self.config)?,
254            ))
255        };
256
257        let era_reader = Arc::new(EraReader::new(
258            self.config.build_era_reader_adapter(chain_observer.clone())?,
259        ));
260        let era_epoch_token = era_reader
261            .read_era_epoch_token(ticker_service.get_current_epoch().await?)
262            .await?;
263        let era_checker = Arc::new(EraChecker::new(
264            era_epoch_token.get_current_supported_era()?,
265            era_epoch_token.get_current_epoch(),
266        ));
267
268        let api_version_provider = Arc::new(APIVersionProvider::new(era_checker.clone()));
269        let aggregator_client = Arc::new(
270            AggregatorHttpClient::builder(self.config.aggregator_endpoint.clone())
271                .with_relay_endpoint(self.config.relay_endpoint.clone())
272                .with_api_version_provider(api_version_provider.clone())
273                .with_timeout(Duration::from_millis(HTTP_REQUEST_TIMEOUT_DURATION))
274                .with_logger(self.root_logger())
275                .build()?,
276        );
277
278        let cardano_immutable_snapshot_builder =
279            Arc::new(CardanoImmutableFilesFullSignableBuilder::new(
280                digester.clone(),
281                &self.config.db_directory,
282                self.root_logger(),
283            ));
284        let mithril_stake_distribution_signable_builder =
285            Arc::new(MithrilStakeDistributionSignableBuilder::default());
286        let transaction_store = Arc::new(CardanoTransactionRepository::new(
287            sqlite_connection_cardano_transaction_pool.clone(),
288        ));
289        let chain_block_reader = PallasChainReader::new(
290            &self.config.cardano_node_socket_path,
291            network,
292            self.root_logger(),
293        );
294        let block_scanner = Arc::new(CardanoBlockScanner::new(
295            Arc::new(Mutex::new(chain_block_reader)),
296            self.config
297                .cardano_transactions_block_streamer_max_roll_forwards_per_poll,
298            self.root_logger(),
299        ));
300        let transactions_importer = Arc::new(CardanoTransactionsImporter::new(
301            block_scanner,
302            transaction_store.clone(),
303            self.root_logger(),
304        ));
305        // Wrap the transaction importer with decorator to prune the transactions after import
306        let transactions_importer = Arc::new(TransactionsImporterWithPruner::new(
307            self.config
308                .enable_transaction_pruning
309                .then_some(self.config.network_security_parameter),
310            transaction_store.clone(),
311            transactions_importer,
312            self.root_logger(),
313        ));
314        // Wrap the transaction importer with decorator to chunk its workload, so it prunes
315        // transactions after each chunk, reducing the storage footprint
316        let state_machine_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
317            transaction_store.clone(),
318            transactions_importer.clone(),
319            self.config.transactions_import_block_chunk_size,
320            self.root_logger(),
321        ));
322        // For the preloader, we want to vacuum the database after each chunk, to reclaim disk space
323        // earlier than with just auto_vacuum (that execute only after the end of all import).
324        let preloader_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
325            transaction_store.clone(),
326            Arc::new(TransactionsImporterWithVacuum::new(
327                sqlite_connection_cardano_transaction_pool.clone(),
328                transactions_importer.clone(),
329                self.root_logger(),
330            )),
331            self.config.transactions_import_block_chunk_size,
332            self.root_logger(),
333        ));
334        let block_range_root_retriever = transaction_store.clone();
335        let cardano_transactions_builder = Arc::new(CardanoTransactionsSignableBuilder::<
336            MKTreeStoreSqlite,
337        >::new(
338            state_machine_transactions_importer,
339            block_range_root_retriever,
340        ));
341        let cardano_stake_distribution_signable_builder = Arc::new(
342            CardanoStakeDistributionSignableBuilder::new(stake_store.clone()),
343        );
344        let cardano_database_signable_builder = Arc::new(CardanoDatabaseSignableBuilder::new(
345            digester.clone(),
346            &self.config.db_directory,
347            self.root_logger(),
348        ));
349        let epoch_service = Arc::new(RwLock::new(MithrilEpochService::new(
350            stake_store.clone(),
351            protocol_initializer_store.clone(),
352            self.root_logger(),
353        )));
354        let single_signer = Arc::new(MithrilSingleSigner::new(
355            self.compute_protocol_party_id()?,
356            epoch_service.clone(),
357            self.root_logger(),
358        ));
359        let signable_seed_builder_service = Arc::new(SignerSignableSeedBuilder::new(
360            epoch_service.clone(),
361            protocol_initializer_store.clone(),
362        ));
363        let signable_builders_dependencies = SignableBuilderServiceDependencies::new(
364            mithril_stake_distribution_signable_builder,
365            cardano_immutable_snapshot_builder,
366            cardano_transactions_builder,
367            cardano_stake_distribution_signable_builder,
368            cardano_database_signable_builder,
369        );
370        let signable_builder_service = Arc::new(MithrilSignableBuilderService::new(
371            signable_seed_builder_service,
372            signable_builders_dependencies,
373            self.root_logger(),
374        ));
375        let metrics_service = Arc::new(MetricsService::new(self.root_logger())?);
376        let network_configuration_service = Arc::new(HttpMithrilNetworkConfigurationProvider::new(
377            aggregator_client.clone(),
378        ));
379        let preloader_activation = CardanoTransactionsPreloaderActivationSigner::new(
380            network_configuration_service.clone(),
381            ticker_service.clone(),
382        );
383        let cardano_transactions_preloader = Arc::new(CardanoTransactionsPreloader::new(
384            signed_entity_type_lock.clone(),
385            preloader_transactions_importer,
386            self.config.preload_security_parameter,
387            chain_observer.clone(),
388            self.root_logger(),
389            Arc::new(preloader_activation),
390        ));
391        let signed_beacon_repository = Arc::new(SignedBeaconRepository::new(
392            sqlite_connection.clone(),
393            self.config.store_retention_limit.map(|limit| limit as u64),
394        ));
395        let upkeep_service = Arc::new(SignerUpkeepService::new(
396            sqlite_connection.clone(),
397            sqlite_connection_cardano_transaction_pool,
398            signed_entity_type_lock.clone(),
399            vec![
400                signed_beacon_repository.clone(),
401                stake_store.clone(),
402                protocol_initializer_store.clone(),
403            ],
404            self.root_logger(),
405        ));
406
407        let kes_signer = match (
408            &self.config.kes_secret_key_path,
409            &self.config.operational_certificate_path,
410        ) {
411            (Some(kes_secret_key_path), Some(operational_certificate_path)) => {
412                Some(Arc::new(KesSignerStandard::new(
413                    kes_secret_key_path.clone(),
414                    operational_certificate_path.clone(),
415                )) as Arc<dyn KesSigner>)
416            }
417            (Some(_), None) | (None, Some(_)) => {
418                return Err(anyhow!(
419                    "kes_secret_key and operational_certificate are both mandatory".to_string(),
420                ));
421            }
422            _ => None,
423        };
424
425        let signature_publisher = {
426            let first_publisher = SignaturePublisherRetrier::new(
427                match &self.config.dmq_node_socket_path {
428                    Some(dmq_node_socket_path) => {
429                        let dmq_network = &self.config.get_dmq_network()?;
430                        let dmq_message_builder = DmqMessageBuilder::new(
431                            kes_signer.clone().with_context(
432                                || "A KES signer is mandatory to sign DMQ messages",
433                            )?,
434                            chain_observer.clone(),
435                        );
436                        Arc::new(SignaturePublisherDmq::new(Arc::new(
437                            DmqPublisherClientPallas::<RegisterSignatureMessageDmq>::new(
438                                dmq_node_socket_path.to_owned(),
439                                *dmq_network,
440                                dmq_message_builder,
441                                self.root_logger(),
442                            ),
443                        ))) as Arc<dyn SignaturePublisher>
444                    }
445                    _ => Arc::new(SignaturePublisherNoop) as Arc<dyn SignaturePublisher>,
446                },
447                SignaturePublishRetryPolicy::never(),
448            );
449
450            let second_publisher = SignaturePublisherRetrier::new(
451                aggregator_client.clone(),
452                SignaturePublishRetryPolicy {
453                    attempts: self.config.signature_publisher_config.retry_attempts,
454                    delay_between_attempts: Duration::from_millis(
455                        self.config.signature_publisher_config.retry_delay_ms,
456                    ),
457                },
458            );
459
460            if self.config.signature_publisher_config.skip_delayer {
461                Arc::new(first_publisher) as Arc<dyn SignaturePublisher>
462            } else {
463                Arc::new(SignaturePublisherDelayer::new(
464                    Arc::new(first_publisher),
465                    Arc::new(second_publisher),
466                    Duration::from_millis(self.config.signature_publisher_config.delayer_delay_ms),
467                    self.root_logger(),
468                )) as Arc<dyn SignaturePublisher>
469            }
470        };
471
472        let certifier = Arc::new(SignerCertifierService::new(
473            signed_beacon_repository,
474            Arc::new(SignerSignedEntityConfigProvider::new(epoch_service.clone())),
475            signed_entity_type_lock.clone(),
476            single_signer.clone(),
477            signature_publisher,
478            self.root_logger(),
479        ));
480
481        let services = SignerDependencyContainer {
482            ticker_service,
483            chain_observer,
484            digester,
485            single_signer,
486            stake_store,
487            protocol_initializer_store,
488            era_checker,
489            era_reader,
490            api_version_provider,
491            signable_builder_service,
492            metrics_service,
493            signed_entity_type_lock,
494            cardano_transactions_preloader,
495            upkeep_service,
496            epoch_service,
497            certifier,
498            signer_registration_publisher: aggregator_client.clone(),
499            signers_registration_retriever: aggregator_client,
500            kes_signer,
501            network_configuration_service,
502        };
503
504        Ok(services)
505    }
506}
507
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use mithril_cardano_node_chain::test::double::FakeChainObserver;
    use mithril_cardano_node_internal_database::test::double::DumbImmutableFileObserver;
    use mithril_common::test::double::Dummy;
    use mithril_common::{entities::TimePoint, test::TempDir};

    use crate::test_tools::TestLogger;

    use super::*;

    /// Create the temporary directory dedicated to the test named `test_name`.
    fn get_test_dir(test_name: &str) -> PathBuf {
        TempDir::create("signer_service", test_name)
    }

    /// Chain observer factory returning a fake observer set on a dummy time point.
    fn fake_chain_observer(_config: &Configuration) -> StdResult<Arc<dyn ChainObserver>> {
        Ok(Arc::new(FakeChainObserver::new(Some(TimePoint::dummy()))))
    }

    /// Immutable file observer factory returning a default dumb observer.
    fn dumb_immutable_file_observer(
        _config: &Configuration,
    ) -> StdResult<Arc<dyn ImmutableFileObserver>> {
        Ok(Arc::new(DumbImmutableFileObserver::default()))
    }

    /// Building the dependencies must create the data stores directory when it is missing.
    #[tokio::test]
    async fn test_auto_create_stores_directory() {
        let stores_dir = get_test_dir("test_auto_create_stores_directory").join("stores");
        let config = Configuration {
            data_stores_directory: stores_dir.clone(),
            ..Configuration::new_sample("party-123456")
        };
        assert!(!stores_dir.exists());

        let mut dependencies_builder = DependenciesBuilder::new(&config, TestLogger::stdout());
        dependencies_builder
            .override_chain_observer_builder(fake_chain_observer)
            .override_immutable_file_observer_builder(dumb_immutable_file_observer)
            .build()
            .await
            .expect("service builder build should not fail");

        assert!(stores_dir.exists());
    }
}