mithril_signer/dependency_injection/
builder.rs

1use std::fs;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, anyhow};
6use slog::Logger;
7use tokio::sync::{Mutex, RwLock};
8
9use mithril_aggregator_client::AggregatorHttpClient;
10use mithril_cardano_node_chain::{
11    chain_observer::{CardanoCliRunner, ChainObserver, ChainObserverBuilder, ChainObserverType},
12    chain_reader::PallasChainReader,
13    chain_scanner::CardanoBlockScanner,
14};
15use mithril_cardano_node_internal_database::{
16    ImmutableFileObserver, ImmutableFileSystemObserver,
17    digesters::CardanoImmutableDigester,
18    digesters::cache::{
19        ImmutableFileDigestCacheProvider, JsonImmutableFileDigestCacheProviderBuilder,
20    },
21    signable_builder::{CardanoDatabaseSignableBuilder, CardanoImmutableFilesFullSignableBuilder},
22};
23use mithril_common::StdResult;
24use mithril_common::api_version::APIVersionProvider;
25use mithril_common::crypto_helper::{
26    KesSigner, KesSignerStandard, OpCert, ProtocolPartyId, SerDeShelleyFileFormat,
27};
28#[cfg(feature = "future_dmq")]
29use mithril_common::messages::RegisterSignatureMessageDmq;
30use mithril_common::signable_builder::{
31    CardanoStakeDistributionSignableBuilder, CardanoTransactionsSignableBuilder,
32    MithrilSignableBuilderService, MithrilStakeDistributionSignableBuilder,
33    SignableBuilderServiceDependencies,
34};
35
36use mithril_era::{EraChecker, EraReader};
37use mithril_signed_entity_lock::SignedEntityTypeLock;
38use mithril_signed_entity_preloader::CardanoTransactionsPreloader;
39use mithril_ticker::{MithrilTickerService, TickerService};
40
41use mithril_persistence::database::repository::CardanoTransactionRepository;
42use mithril_persistence::database::{ApplicationNodeType, SqlMigration};
43use mithril_persistence::sqlite::{ConnectionBuilder, SqliteConnection, SqliteConnectionPool};
44
45use mithril_protocol_config::http::HttpMithrilNetworkConfigurationProvider;
46
47#[cfg(feature = "future_dmq")]
48use mithril_dmq::{DmqMessageBuilder, DmqPublisherClientPallas};
49
50use crate::dependency_injection::SignerDependencyContainer;
51#[cfg(feature = "future_dmq")]
52use crate::services::SignaturePublisherDmq;
53use crate::services::{
54    CardanoTransactionsImporter, CardanoTransactionsPreloaderActivationSigner, MithrilEpochService,
55    MithrilSingleSigner, SignaturePublishRetryPolicy, SignaturePublisherDelayer,
56    SignaturePublisherNoop, SignaturePublisherRetrier, SignerCertifierService,
57    SignerSignableSeedBuilder, SignerSignedEntityConfigProvider, SignerUpkeepService,
58    TransactionsImporterByChunk, TransactionsImporterWithPruner, TransactionsImporterWithVacuum,
59};
60use crate::store::MKTreeStoreSqlite;
61use crate::{
62    Configuration, HTTP_REQUEST_TIMEOUT_DURATION, MetricsService, SQLITE_FILE,
63    SQLITE_FILE_CARDANO_TRANSACTION,
64};
65use crate::{
66    database::repository::{ProtocolInitializerRepository, SignedBeaconRepository, StakePoolStore},
67    services::SignaturePublisher,
68};
69
70/// The `DependenciesBuilder` is intended to manage Services instance creation.
71///
72/// The goal of this is to put all this code out of the way of business code.
73pub struct DependenciesBuilder<'a> {
74    config: &'a Configuration,
75    chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
76    immutable_file_observer_builder:
77        fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
78    root_logger: Logger,
79}
80
81impl<'a> DependenciesBuilder<'a> {
82    /// Create a new `DependenciesBuilder`.
83    pub fn new(config: &'a Configuration, root_logger: Logger) -> Self {
84        let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
85            |config: &Configuration| {
86                let chain_observer_type = ChainObserverType::Pallas;
87                let cardano_cli_path = &config.cardano_cli_path;
88                let cardano_node_socket_path = &config.cardano_node_socket_path;
89                let cardano_network = &config.get_network().with_context(|| {
90                    "Dependencies Builder can not get Cardano network while building the chain observer"
91                })?;
92                let cardano_cli_runner = &CardanoCliRunner::new(
93                    cardano_cli_path.to_owned(),
94                    cardano_node_socket_path.to_owned(),
95                    cardano_network.to_owned(),
96                );
97
98                let chain_observer_builder = ChainObserverBuilder::new(
99                    &chain_observer_type,
100                    cardano_node_socket_path,
101                    cardano_network,
102                    Some(cardano_cli_runner),
103                );
104
105                chain_observer_builder
106                    .build()
107                    .with_context(|| "Dependencies Builder can not build chain observer")
108            };
109
110        let immutable_file_observer_builder: fn(
111            &Configuration,
112        )
113            -> StdResult<Arc<dyn ImmutableFileObserver>> = |config: &Configuration| {
114            Ok(Arc::new(ImmutableFileSystemObserver::new(
115                &config.db_directory,
116            )))
117        };
118
119        Self {
120            config,
121            chain_observer_builder,
122            immutable_file_observer_builder,
123            root_logger,
124        }
125    }
126
127    /// Override immutable file observer builder.
128    pub fn override_immutable_file_observer_builder(
129        &mut self,
130        builder: fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
131    ) -> &mut Self {
132        self.immutable_file_observer_builder = builder;
133
134        self
135    }
136
137    /// Return a copy of the root logger.
138    pub fn root_logger(&self) -> Logger {
139        self.root_logger.clone()
140    }
141
142    /// Override default chain observer builder.
143    pub fn override_chain_observer_builder(
144        &mut self,
145        builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
146    ) -> &mut Self {
147        self.chain_observer_builder = builder;
148
149        self
150    }
151
152    /// Compute protocol party id
153    fn compute_protocol_party_id(&self) -> StdResult<ProtocolPartyId> {
154        match &self.config.operational_certificate_path {
155            Some(operational_certificate_path) => {
156                let opcert: OpCert = OpCert::from_file(operational_certificate_path)
157                    .with_context(|| "Could not decode operational certificate")?;
158                Ok(opcert
159                    .compute_protocol_party_id()
160                    .with_context(|| "Could not compute party_id from operational certificate")?)
161            }
162            _ => Ok(self
163                .config
164                .party_id
165                .to_owned()
166                .ok_or(anyhow!("A party_id should at least be provided"))?),
167        }
168    }
169
170    async fn build_digester_cache_provider(
171        &self,
172    ) -> StdResult<Option<Arc<dyn ImmutableFileDigestCacheProvider>>> {
173        if self.config.disable_digests_cache {
174            return Ok(None);
175        }
176
177        let cache_provider = JsonImmutableFileDigestCacheProviderBuilder::new(
178            &self.config.data_stores_directory,
179            &format!("immutables_digests_{}.json", self.config.network),
180        )
181        .should_reset_digests_cache(self.config.reset_digests_cache)
182        .with_logger(self.root_logger())
183        .build()
184        .await?;
185
186        Ok(Some(Arc::new(cache_provider)))
187    }
188
189    /// Build a SQLite connection.
190    pub async fn build_sqlite_connection(
191        &self,
192        sqlite_file_name: &str,
193        migrations: Vec<SqlMigration>,
194    ) -> StdResult<SqliteConnection> {
195        let sqlite_db_path = self.config.get_sqlite_file(sqlite_file_name)?;
196        let connection = ConnectionBuilder::open_file(&sqlite_db_path)
197            .with_node_type(ApplicationNodeType::Signer)
198            .with_migrations(migrations)
199            .with_logger(self.root_logger())
200            .build()
201            .with_context(|| "Database connection initialisation error")?;
202
203        Ok(connection)
204    }
205
206    /// Build dependencies for the Production environment.
207    pub async fn build(&self) -> StdResult<SignerDependencyContainer> {
208        if !self.config.data_stores_directory.exists() {
209            fs::create_dir_all(self.config.data_stores_directory.clone()).with_context(|| {
210                format!(
211                    "Could not create data stores directory: `{}`",
212                    self.config.data_stores_directory.display()
213                )
214            })?;
215        }
216
217        let network = self.config.get_network()?;
218        let sqlite_connection = Arc::new(
219            self.build_sqlite_connection(SQLITE_FILE, crate::database::migration::get_migrations())
220                .await?,
221        );
222        let transaction_sqlite_connection = self
223            .build_sqlite_connection(
224                SQLITE_FILE_CARDANO_TRANSACTION,
225                mithril_persistence::database::cardano_transaction_migration::get_migrations(),
226            )
227            .await?;
228        let sqlite_connection_cardano_transaction_pool = Arc::new(
229            SqliteConnectionPool::build_from_connection(transaction_sqlite_connection),
230        );
231
232        let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
233
234        let protocol_initializer_store = Arc::new(ProtocolInitializerRepository::new(
235            sqlite_connection.clone(),
236            self.config.store_retention_limit.map(|limit| limit as u64),
237        ));
238
239        let digester = Arc::new(CardanoImmutableDigester::new(
240            network.to_string(),
241            self.build_digester_cache_provider().await?,
242            self.root_logger(),
243        ));
244        let stake_store = Arc::new(StakePoolStore::new(
245            sqlite_connection.clone(),
246            self.config.store_retention_limit.map(|limit| limit as u64),
247        ));
248        let chain_observer = {
249            let builder = self.chain_observer_builder;
250            builder(self.config)?
251        };
252        let ticker_service = {
253            let builder = self.immutable_file_observer_builder;
254            Arc::new(MithrilTickerService::new(
255                chain_observer.clone(),
256                builder(self.config)?,
257            ))
258        };
259
260        let era_reader = Arc::new(EraReader::new(
261            self.config.build_era_reader_adapter(chain_observer.clone())?,
262        ));
263        let era_epoch_token = era_reader
264            .read_era_epoch_token(ticker_service.get_current_epoch().await?)
265            .await?;
266        let era_checker = Arc::new(EraChecker::new(
267            era_epoch_token.get_current_supported_era()?,
268            era_epoch_token.get_current_epoch(),
269        ));
270
271        let api_version_provider = Arc::new(APIVersionProvider::new(era_checker.clone()));
272        let aggregator_client = Arc::new(
273            AggregatorHttpClient::builder(self.config.aggregator_endpoint.clone())
274                .with_relay_endpoint(self.config.relay_endpoint.clone())
275                .with_api_version_provider(api_version_provider.clone())
276                .with_timeout(Duration::from_millis(HTTP_REQUEST_TIMEOUT_DURATION))
277                .with_logger(self.root_logger())
278                .build()?,
279        );
280
281        let cardano_immutable_snapshot_builder =
282            Arc::new(CardanoImmutableFilesFullSignableBuilder::new(
283                digester.clone(),
284                &self.config.db_directory,
285                self.root_logger(),
286            ));
287        let mithril_stake_distribution_signable_builder =
288            Arc::new(MithrilStakeDistributionSignableBuilder::default());
289        let transaction_store = Arc::new(CardanoTransactionRepository::new(
290            sqlite_connection_cardano_transaction_pool.clone(),
291        ));
292        let chain_block_reader = PallasChainReader::new(
293            &self.config.cardano_node_socket_path,
294            network,
295            self.root_logger(),
296        );
297        let block_scanner = Arc::new(CardanoBlockScanner::new(
298            Arc::new(Mutex::new(chain_block_reader)),
299            self.config
300                .cardano_transactions_block_streamer_max_roll_forwards_per_poll,
301            self.root_logger(),
302        ));
303        let transactions_importer = Arc::new(CardanoTransactionsImporter::new(
304            block_scanner,
305            transaction_store.clone(),
306            self.root_logger(),
307        ));
308        // Wrap the transaction importer with decorator to prune the transactions after import
309        let transactions_importer = Arc::new(TransactionsImporterWithPruner::new(
310            self.config
311                .enable_transaction_pruning
312                .then_some(self.config.network_security_parameter),
313            transaction_store.clone(),
314            transactions_importer,
315            self.root_logger(),
316        ));
317        // Wrap the transaction importer with decorator to chunk its workload, so it prunes
318        // transactions after each chunk, reducing the storage footprint
319        let state_machine_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
320            transaction_store.clone(),
321            transactions_importer.clone(),
322            self.config.transactions_import_block_chunk_size,
323            self.root_logger(),
324        ));
325        // For the preloader, we want to vacuum the database after each chunk, to reclaim disk space
326        // earlier than with just auto_vacuum (that execute only after the end of all import).
327        let preloader_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
328            transaction_store.clone(),
329            Arc::new(TransactionsImporterWithVacuum::new(
330                sqlite_connection_cardano_transaction_pool.clone(),
331                transactions_importer.clone(),
332                self.root_logger(),
333            )),
334            self.config.transactions_import_block_chunk_size,
335            self.root_logger(),
336        ));
337        let block_range_root_retriever = transaction_store.clone();
338        let cardano_transactions_builder = Arc::new(CardanoTransactionsSignableBuilder::<
339            MKTreeStoreSqlite,
340        >::new(
341            state_machine_transactions_importer,
342            block_range_root_retriever,
343        ));
344        let cardano_stake_distribution_signable_builder = Arc::new(
345            CardanoStakeDistributionSignableBuilder::new(stake_store.clone()),
346        );
347        let cardano_database_signable_builder = Arc::new(CardanoDatabaseSignableBuilder::new(
348            digester.clone(),
349            &self.config.db_directory,
350            self.root_logger(),
351        ));
352        let epoch_service = Arc::new(RwLock::new(MithrilEpochService::new(
353            stake_store.clone(),
354            protocol_initializer_store.clone(),
355            self.root_logger(),
356        )));
357        let single_signer = Arc::new(MithrilSingleSigner::new(
358            self.compute_protocol_party_id()?,
359            epoch_service.clone(),
360            self.root_logger(),
361        ));
362        let signable_seed_builder_service = Arc::new(SignerSignableSeedBuilder::new(
363            epoch_service.clone(),
364            protocol_initializer_store.clone(),
365        ));
366        let signable_builders_dependencies = SignableBuilderServiceDependencies::new(
367            mithril_stake_distribution_signable_builder,
368            cardano_immutable_snapshot_builder,
369            cardano_transactions_builder,
370            cardano_stake_distribution_signable_builder,
371            cardano_database_signable_builder,
372        );
373        let signable_builder_service = Arc::new(MithrilSignableBuilderService::new(
374            signable_seed_builder_service,
375            signable_builders_dependencies,
376            self.root_logger(),
377        ));
378        let metrics_service = Arc::new(MetricsService::new(self.root_logger())?);
379        let network_configuration_service = Arc::new(HttpMithrilNetworkConfigurationProvider::new(
380            aggregator_client.clone(),
381        ));
382        let preloader_activation = CardanoTransactionsPreloaderActivationSigner::new(
383            network_configuration_service.clone(),
384            ticker_service.clone(),
385        );
386        let cardano_transactions_preloader = Arc::new(CardanoTransactionsPreloader::new(
387            signed_entity_type_lock.clone(),
388            preloader_transactions_importer,
389            self.config.preload_security_parameter,
390            chain_observer.clone(),
391            self.root_logger(),
392            Arc::new(preloader_activation),
393        ));
394        let signed_beacon_repository = Arc::new(SignedBeaconRepository::new(
395            sqlite_connection.clone(),
396            self.config.store_retention_limit.map(|limit| limit as u64),
397        ));
398        let upkeep_service = Arc::new(SignerUpkeepService::new(
399            sqlite_connection.clone(),
400            sqlite_connection_cardano_transaction_pool,
401            signed_entity_type_lock.clone(),
402            vec![
403                signed_beacon_repository.clone(),
404                stake_store.clone(),
405                protocol_initializer_store.clone(),
406            ],
407            self.root_logger(),
408        ));
409
410        let kes_signer = match (
411            &self.config.kes_secret_key_path,
412            &self.config.operational_certificate_path,
413        ) {
414            (Some(kes_secret_key_path), Some(operational_certificate_path)) => {
415                Some(Arc::new(KesSignerStandard::new(
416                    kes_secret_key_path.clone(),
417                    operational_certificate_path.clone(),
418                )) as Arc<dyn KesSigner>)
419            }
420            (Some(_), None) | (None, Some(_)) => {
421                return Err(anyhow!(
422                    "kes_secret_key and operational_certificate are both mandatory".to_string(),
423                ));
424            }
425            _ => None,
426        };
427
428        #[cfg(feature = "future_dmq")]
429        let signature_publisher = {
430            let first_publisher = SignaturePublisherRetrier::new(
431                match &self.config.dmq_node_socket_path {
432                    Some(dmq_node_socket_path) => {
433                        let dmq_network = &self.config.get_dmq_network()?;
434                        let dmq_message_builder = DmqMessageBuilder::new(
435                            kes_signer
436                                .clone()
437                                .ok_or(anyhow!("A KES signer is mandatory to sign DMQ messages"))?,
438                            chain_observer.clone(),
439                        );
440                        Arc::new(SignaturePublisherDmq::new(Arc::new(
441                            DmqPublisherClientPallas::<RegisterSignatureMessageDmq>::new(
442                                dmq_node_socket_path.to_owned(),
443                                *dmq_network,
444                                dmq_message_builder,
445                                self.root_logger(),
446                            ),
447                        ))) as Arc<dyn SignaturePublisher>
448                    }
449                    _ => Arc::new(SignaturePublisherNoop) as Arc<dyn SignaturePublisher>,
450                },
451                SignaturePublishRetryPolicy::never(),
452            );
453
454            let second_publisher = SignaturePublisherRetrier::new(
455                aggregator_client.clone(),
456                SignaturePublishRetryPolicy {
457                    attempts: self.config.signature_publisher_config.retry_attempts,
458                    delay_between_attempts: Duration::from_millis(
459                        self.config.signature_publisher_config.retry_delay_ms,
460                    ),
461                },
462            );
463
464            if self.config.signature_publisher_config.skip_delayer {
465                Arc::new(first_publisher) as Arc<dyn SignaturePublisher>
466            } else {
467                Arc::new(SignaturePublisherDelayer::new(
468                    Arc::new(first_publisher),
469                    Arc::new(second_publisher),
470                    Duration::from_millis(self.config.signature_publisher_config.delayer_delay_ms),
471                    self.root_logger(),
472                )) as Arc<dyn SignaturePublisher>
473            }
474        };
475
476        #[cfg(not(feature = "future_dmq"))]
477        let signature_publisher = {
478            let first_publisher = SignaturePublisherRetrier::new(
479                Arc::new(SignaturePublisherNoop) as Arc<dyn SignaturePublisher>,
480                SignaturePublishRetryPolicy::never(),
481            );
482
483            let second_publisher = SignaturePublisherRetrier::new(
484                aggregator_client.clone(),
485                SignaturePublishRetryPolicy {
486                    attempts: self.config.signature_publisher_config.retry_attempts,
487                    delay_between_attempts: Duration::from_millis(
488                        self.config.signature_publisher_config.retry_delay_ms,
489                    ),
490                },
491            );
492
493            if self.config.signature_publisher_config.skip_delayer {
494                Arc::new(second_publisher) as Arc<dyn SignaturePublisher>
495            } else {
496                Arc::new(SignaturePublisherDelayer::new(
497                    Arc::new(first_publisher),
498                    Arc::new(second_publisher),
499                    Duration::from_millis(self.config.signature_publisher_config.delayer_delay_ms),
500                    self.root_logger(),
501                )) as Arc<dyn SignaturePublisher>
502            }
503        };
504
505        let certifier = Arc::new(SignerCertifierService::new(
506            signed_beacon_repository,
507            Arc::new(SignerSignedEntityConfigProvider::new(epoch_service.clone())),
508            signed_entity_type_lock.clone(),
509            single_signer.clone(),
510            signature_publisher,
511            self.root_logger(),
512        ));
513
514        let services = SignerDependencyContainer {
515            ticker_service,
516            certificate_handler: aggregator_client,
517            chain_observer,
518            digester,
519            single_signer,
520            stake_store,
521            protocol_initializer_store,
522            era_checker,
523            era_reader,
524            api_version_provider,
525            signable_builder_service,
526            metrics_service,
527            signed_entity_type_lock,
528            cardano_transactions_preloader,
529            upkeep_service,
530            epoch_service,
531            certifier,
532            kes_signer,
533            network_configuration_service,
534        };
535
536        Ok(services)
537    }
538}
539
540#[cfg(test)]
541mod tests {
542    use std::path::PathBuf;
543
544    use mithril_cardano_node_chain::test::double::FakeChainObserver;
545    use mithril_cardano_node_internal_database::test::double::DumbImmutableFileObserver;
546    use mithril_common::test::double::Dummy;
547    use mithril_common::{entities::TimePoint, test::TempDir};
548
549    use crate::test_tools::TestLogger;
550
551    use super::*;
552
553    fn get_test_dir(test_name: &str) -> PathBuf {
554        TempDir::create("signer_service", test_name)
555    }
556
557    #[tokio::test]
558    async fn test_auto_create_stores_directory() {
559        let stores_dir = get_test_dir("test_auto_create_stores_directory").join("stores");
560        let config = Configuration {
561            data_stores_directory: stores_dir.clone(),
562            ..Configuration::new_sample("party-123456")
563        };
564
565        assert!(!stores_dir.exists());
566        let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
567            |_config| Ok(Arc::new(FakeChainObserver::new(Some(TimePoint::dummy()))));
568        let immutable_file_observer_builder: fn(
569            &Configuration,
570        )
571            -> StdResult<Arc<dyn ImmutableFileObserver>> =
572            |_config: &Configuration| Ok(Arc::new(DumbImmutableFileObserver::default()));
573
574        let mut dependencies_builder = DependenciesBuilder::new(&config, TestLogger::stdout());
575        dependencies_builder
576            .override_chain_observer_builder(chain_observer_builder)
577            .override_immutable_file_observer_builder(immutable_file_observer_builder)
578            .build()
579            .await
580            .expect("service builder build should not fail");
581        assert!(stores_dir.exists());
582    }
583}