mithril_signer/dependency_injection/
builder.rs

1use std::collections::HashMap;
2use std::fs;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, anyhow};
7use slog::Logger;
8use tokio::sync::{Mutex, RwLock};
9
10use mithril_aggregator_client::AggregatorHttpClient;
11use mithril_cardano_node_chain::{
12    chain_importer::{
13        CardanoChainDataImporter, ChainDataImporterByChunk, ChainDataImporterWithPruner,
14    },
15    chain_observer::{CardanoCliRunner, ChainObserver, ChainObserverBuilder, ChainObserverType},
16    chain_reader::PallasChainReader,
17    chain_scanner::CardanoBlockScanner,
18};
19use mithril_cardano_node_internal_database::{
20    ImmutableFileObserver, ImmutableFileSystemObserver,
21    digesters::CardanoImmutableDigester,
22    digesters::cache::{
23        ImmutableFileDigestCacheProvider, JsonImmutableFileDigestCacheProviderBuilder,
24    },
25    signable_builder::{CardanoDatabaseSignableBuilder, CardanoImmutableFilesFullSignableBuilder},
26};
27use mithril_common::api_version::APIVersionProvider;
28use mithril_common::crypto_helper::{
29    KesSigner, KesSignerStandard, OpCert, ProtocolPartyId, SerDeShelleyFileFormat,
30};
31use mithril_common::messages::RegisterSignatureMessageDmq;
32use mithril_common::signable_builder::{
33    CardanoBlocksTransactionsSignableBuilder, CardanoStakeDistributionSignableBuilder,
34    CardanoTransactionsSignableBuilder, MithrilSignableBuilderService,
35    MithrilStakeDistributionSignableBuilder, SignableBuilderServiceDependencies,
36};
37use mithril_common::{MITHRIL_SIGNER_VERSION_HEADER, StdResult};
38
39use mithril_era::{EraChecker, EraReader};
40use mithril_signed_entity_lock::SignedEntityTypeLock;
41use mithril_signed_entity_preloader::CardanoTransactionsPreloader;
42use mithril_ticker::{MithrilTickerService, TickerService};
43
44use mithril_persistence::database::{ApplicationNodeType, SqlMigration};
45use mithril_persistence::sqlite::{
46    ConnectionBuilder, ConnectionOptions, SqliteConnection, SqliteConnectionPool,
47};
48
49use mithril_protocol_config::http::HttpMithrilNetworkConfigurationProvider;
50
51use mithril_dmq::{DmqMessageBuilder, DmqPublisherClientPallas};
52
53use crate::database::repository::{
54    ProtocolInitializerRepository, SignedBeaconRepository, SignerCardanoChainDataRepository,
55    StakePoolStore,
56};
57use crate::dependency_injection::SignerDependencyContainer;
58use crate::services::{
59    CardanoTransactionsPreloaderActivationSigner, ChainDataImporterWithVacuum, MithrilEpochService,
60    MithrilSingleSigner, SignaturePublishRetryPolicy, SignaturePublisher,
61    SignaturePublisherDelayer, SignaturePublisherDmq, SignaturePublisherNoop,
62    SignaturePublisherRetrier, SignerCertifierService, SignerChainDataImporter,
63    SignerSignableSeedBuilder, SignerSignedEntityConfigProvider, SignerUpkeepService,
64};
65use crate::store::MKTreeStoreSqlite;
66use crate::{
67    Configuration, HTTP_REQUEST_TIMEOUT_DURATION, MetricsService, SQLITE_FILE,
68    SQLITE_FILE_CARDANO_TRANSACTION,
69};
70
71/// The `DependenciesBuilder` is intended to manage Services instance creation.
72///
73/// The goal of this is to put all this code out of the way of business code.
74pub struct DependenciesBuilder<'a> {
75    config: &'a Configuration,
76    chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
77    immutable_file_observer_builder:
78        fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
79    root_logger: Logger,
80}
81
82impl<'a> DependenciesBuilder<'a> {
83    /// Create a new `DependenciesBuilder`.
84    pub fn new(config: &'a Configuration, root_logger: Logger) -> Self {
85        let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
86            |config: &Configuration| {
87                let chain_observer_type = ChainObserverType::Pallas;
88                let cardano_cli_path = &config.cardano_cli_path;
89                let cardano_node_socket_path = &config.cardano_node_socket_path;
90                let cardano_network = &config.get_network().with_context(|| {
91                    "Dependencies Builder can not get Cardano network while building the chain observer"
92                })?;
93                let cardano_cli_runner = &CardanoCliRunner::new(
94                    cardano_cli_path.to_owned(),
95                    cardano_node_socket_path.to_owned(),
96                    cardano_network.to_owned(),
97                );
98
99                let chain_observer_builder = ChainObserverBuilder::new(
100                    &chain_observer_type,
101                    cardano_node_socket_path,
102                    cardano_network,
103                    Some(cardano_cli_runner),
104                );
105
106                chain_observer_builder
107                    .build()
108                    .with_context(|| "Dependencies Builder can not build chain observer")
109            };
110
111        let immutable_file_observer_builder: fn(
112            &Configuration,
113        )
114            -> StdResult<Arc<dyn ImmutableFileObserver>> = |config: &Configuration| {
115            Ok(Arc::new(ImmutableFileSystemObserver::new(
116                &config.db_directory,
117            )))
118        };
119
120        Self {
121            config,
122            chain_observer_builder,
123            immutable_file_observer_builder,
124            root_logger,
125        }
126    }
127
128    /// Override immutable file observer builder.
129    pub fn override_immutable_file_observer_builder(
130        &mut self,
131        builder: fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
132    ) -> &mut Self {
133        self.immutable_file_observer_builder = builder;
134
135        self
136    }
137
138    /// Return a copy of the root logger.
139    pub fn root_logger(&self) -> Logger {
140        self.root_logger.clone()
141    }
142
143    /// Override default chain observer builder.
144    pub fn override_chain_observer_builder(
145        &mut self,
146        builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
147    ) -> &mut Self {
148        self.chain_observer_builder = builder;
149
150        self
151    }
152
153    /// Compute protocol party id
154    fn compute_protocol_party_id(&self) -> StdResult<ProtocolPartyId> {
155        match &self.config.operational_certificate_path {
156            Some(operational_certificate_path) => {
157                let opcert: OpCert = OpCert::from_file(operational_certificate_path)
158                    .with_context(|| "Could not decode operational certificate")?;
159                Ok(opcert
160                    .compute_protocol_party_id()
161                    .with_context(|| "Could not compute party_id from operational certificate")?)
162            }
163            _ => Ok(self
164                .config
165                .party_id
166                .to_owned()
167                .with_context(|| "A party_id should at least be provided")?),
168        }
169    }
170
171    async fn build_digester_cache_provider(
172        &self,
173    ) -> StdResult<Option<Arc<dyn ImmutableFileDigestCacheProvider>>> {
174        if self.config.disable_digests_cache {
175            return Ok(None);
176        }
177
178        let cache_provider = JsonImmutableFileDigestCacheProviderBuilder::new(
179            &self.config.data_stores_directory,
180            &format!("immutables_digests_{}.json", self.config.network),
181        )
182        .should_reset_digests_cache(self.config.reset_digests_cache)
183        .with_logger(self.root_logger())
184        .build()
185        .await?;
186
187        Ok(Some(Arc::new(cache_provider)))
188    }
189
190    /// Build an SQLite connection for the main database.
191    pub async fn build_main_sqlite_connection(
192        &self,
193        sqlite_file_name: &str,
194    ) -> StdResult<SqliteConnection> {
195        self.build_sqlite_connection(
196            sqlite_file_name,
197            crate::database::migration::get_migrations(),
198            &[],
199        )
200    }
201
202    /// Build an SQLite connection for the Cardano transactions database.
203    pub async fn build_cardano_tx_sqlite_connection(
204        &self,
205        sqlite_file_name: &str,
206    ) -> StdResult<SqliteConnection> {
207        self.build_sqlite_connection(
208            sqlite_file_name,
209            mithril_persistence::database::cardano_transaction_migration::get_migrations(),
210            &[ConnectionOptions::EnableForeignKeys],
211        )
212    }
213
214    fn build_sqlite_connection(
215        &self,
216        sqlite_file_name: &str,
217        migrations: Vec<SqlMigration>,
218        options: &[ConnectionOptions],
219    ) -> StdResult<SqliteConnection> {
220        let sqlite_db_path = self.config.get_sqlite_file(sqlite_file_name)?;
221        let connection = ConnectionBuilder::open_file(&sqlite_db_path)
222            .with_node_type(ApplicationNodeType::Signer)
223            .with_migrations(migrations)
224            .with_options(options)
225            .with_logger(self.root_logger())
226            .build()
227            .with_context(|| "Database connection initialisation error")?;
228
229        Ok(connection)
230    }
231
232    fn build_aggregator_client(
233        &self,
234        api_version_provider: &Arc<APIVersionProvider>,
235    ) -> StdResult<Arc<AggregatorHttpClient>> {
236        let client = AggregatorHttpClient::builder(self.config.aggregator_endpoint.clone())
237            .with_headers(HashMap::from([(
238                MITHRIL_SIGNER_VERSION_HEADER.to_string(),
239                env!("CARGO_PKG_VERSION").to_string(),
240            )]))
241            .with_relay_endpoint(self.config.relay_endpoint.clone())
242            .with_api_version_provider(api_version_provider.clone())
243            .with_timeout(Duration::from_millis(HTTP_REQUEST_TIMEOUT_DURATION))
244            .with_logger(self.root_logger())
245            .build()?;
246        Ok(Arc::new(client))
247    }
248
249    /// Build dependencies for the Production environment.
250    pub async fn build(&self) -> StdResult<SignerDependencyContainer> {
251        if !self.config.data_stores_directory.exists() {
252            fs::create_dir_all(self.config.data_stores_directory.clone()).with_context(|| {
253                format!(
254                    "Could not create data stores directory: `{}`",
255                    self.config.data_stores_directory.display()
256                )
257            })?;
258        }
259
260        let network = self.config.get_network()?;
261        let sqlite_connection = Arc::new(self.build_main_sqlite_connection(SQLITE_FILE).await?);
262        let transaction_sqlite_connection = self
263            .build_cardano_tx_sqlite_connection(SQLITE_FILE_CARDANO_TRANSACTION)
264            .await?;
265        let sqlite_connection_cardano_transaction_pool = Arc::new(
266            SqliteConnectionPool::build_from_connection(transaction_sqlite_connection),
267        );
268
269        let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
270
271        let protocol_initializer_store = Arc::new(ProtocolInitializerRepository::new(
272            sqlite_connection.clone(),
273            self.config.store_retention_limit.map(|limit| limit as u64),
274        ));
275
276        let digester = Arc::new(CardanoImmutableDigester::new(
277            network.to_string(),
278            self.build_digester_cache_provider().await?,
279            self.root_logger(),
280        ));
281        let stake_store = Arc::new(StakePoolStore::new(
282            sqlite_connection.clone(),
283            self.config.store_retention_limit.map(|limit| limit as u64),
284        ));
285        let chain_observer = {
286            let builder = self.chain_observer_builder;
287            builder(self.config)?
288        };
289        let ticker_service = {
290            let builder = self.immutable_file_observer_builder;
291            Arc::new(MithrilTickerService::new(
292                chain_observer.clone(),
293                builder(self.config)?,
294            ))
295        };
296
297        let era_reader = Arc::new(EraReader::new(
298            self.config.build_era_reader_adapter(chain_observer.clone())?,
299        ));
300        let era_epoch_token = era_reader
301            .read_era_epoch_token(ticker_service.get_current_epoch().await?)
302            .await?;
303        let era_checker = Arc::new(EraChecker::new(
304            era_epoch_token.get_current_supported_era()?,
305            era_epoch_token.get_current_epoch(),
306        ));
307
308        let api_version_provider = Arc::new(APIVersionProvider::new(era_checker.clone()));
309        let aggregator_client = self.build_aggregator_client(&api_version_provider)?;
310
311        let cardano_immutable_snapshot_builder =
312            Arc::new(CardanoImmutableFilesFullSignableBuilder::new(
313                digester.clone(),
314                &self.config.db_directory,
315                self.root_logger(),
316            ));
317        let mithril_stake_distribution_signable_builder =
318            Arc::new(MithrilStakeDistributionSignableBuilder::default());
319        let chain_data_store = Arc::new(SignerCardanoChainDataRepository::new(
320            sqlite_connection_cardano_transaction_pool.clone(),
321        ));
322        let chain_block_reader = PallasChainReader::new(
323            &self.config.cardano_node_socket_path,
324            network,
325            self.root_logger(),
326        );
327        let block_scanner = Arc::new(
328            CardanoBlockScanner::new(
329                Arc::new(Mutex::new(chain_block_reader)),
330                self.config
331                    .cardano_transactions_block_streamer_max_roll_forwards_per_poll,
332                self.root_logger(),
333            )
334            .set_throttling_interval(
335                self.config
336                    .cardano_transactions_block_streamer_throttling_interval
337                    .map(Duration::from_millis),
338            ),
339        );
340        let chain_data_importer = Arc::new(CardanoChainDataImporter::new(
341            block_scanner,
342            chain_data_store.clone(),
343            self.root_logger(),
344        ));
345        // Wrap the transaction importer with decorator to prune the transactions after import
346        let chain_data_importer = Arc::new(ChainDataImporterWithPruner::new(
347            self.config
348                .enable_transaction_pruning
349                .then_some(self.config.network_security_parameter),
350            chain_data_store.clone(),
351            chain_data_importer,
352            self.root_logger(),
353        ));
354        // Wrap the transaction importer with decorator to chunk its workload, so it prunes
355        // transactions after each chunk, reducing the storage footprint
356        let state_machine_transactions_importer = Arc::new(SignerChainDataImporter::new(Arc::new(
357            ChainDataImporterByChunk::new(
358                chain_data_store.clone(),
359                chain_data_importer.clone(),
360                self.config.transactions_import_block_chunk_size,
361                self.root_logger(),
362            ),
363        )));
364        // For the preloader, we want to vacuum the database after each chunk, to reclaim disk space
365        // earlier than with just auto_vacuum (that execute only after the end of all import).
366        let preloader_chain_data_importer = Arc::new(SignerChainDataImporter::new(Arc::new(
367            ChainDataImporterByChunk::new(
368                chain_data_store.clone(),
369                Arc::new(ChainDataImporterWithVacuum::new(
370                    sqlite_connection_cardano_transaction_pool.clone(),
371                    chain_data_importer.clone(),
372                    self.root_logger(),
373                )),
374                self.config.transactions_import_block_chunk_size,
375                self.root_logger(),
376            ),
377        )));
378        let block_range_root_retriever = chain_data_store.clone();
379        let cardano_transactions_builder = Arc::new(CardanoTransactionsSignableBuilder::<
380            MKTreeStoreSqlite,
381        >::new(
382            state_machine_transactions_importer.clone(),
383            block_range_root_retriever.clone(),
384        ));
385        let cardano_blocks_transactions_builder = Arc::new(
386            CardanoBlocksTransactionsSignableBuilder::<MKTreeStoreSqlite>::new(
387                state_machine_transactions_importer,
388                block_range_root_retriever,
389            ),
390        );
391        let cardano_stake_distribution_signable_builder = Arc::new(
392            CardanoStakeDistributionSignableBuilder::new(stake_store.clone()),
393        );
394        let cardano_database_signable_builder = Arc::new(CardanoDatabaseSignableBuilder::new(
395            digester.clone(),
396            &self.config.db_directory,
397            self.root_logger(),
398        ));
399        let epoch_service = Arc::new(RwLock::new(MithrilEpochService::new(
400            stake_store.clone(),
401            protocol_initializer_store.clone(),
402            self.root_logger(),
403        )));
404        let single_signer = Arc::new(MithrilSingleSigner::new(
405            self.compute_protocol_party_id()?,
406            epoch_service.clone(),
407            self.root_logger(),
408        ));
409        let signable_seed_builder_service = Arc::new(SignerSignableSeedBuilder::new(
410            epoch_service.clone(),
411            protocol_initializer_store.clone(),
412        ));
413        let signable_builders_dependencies = SignableBuilderServiceDependencies::new(
414            mithril_stake_distribution_signable_builder,
415            cardano_immutable_snapshot_builder,
416            cardano_transactions_builder,
417            cardano_blocks_transactions_builder,
418            cardano_stake_distribution_signable_builder,
419            cardano_database_signable_builder,
420        );
421        let signable_builder_service = Arc::new(MithrilSignableBuilderService::new(
422            signable_seed_builder_service,
423            signable_builders_dependencies,
424            self.root_logger(),
425        ));
426        let metrics_service = Arc::new(MetricsService::new(self.root_logger())?);
427        let network_configuration_service = Arc::new(HttpMithrilNetworkConfigurationProvider::new(
428            aggregator_client.clone(),
429        ));
430        let preloader_activation = CardanoTransactionsPreloaderActivationSigner::new(
431            network_configuration_service.clone(),
432            ticker_service.clone(),
433        );
434        let cardano_transactions_preloader = Arc::new(CardanoTransactionsPreloader::new(
435            signed_entity_type_lock.clone(),
436            preloader_chain_data_importer,
437            self.config.preload_security_parameter,
438            chain_observer.clone(),
439            self.root_logger(),
440            Arc::new(preloader_activation),
441        ));
442        let signed_beacon_repository = Arc::new(SignedBeaconRepository::new(
443            sqlite_connection.clone(),
444            self.config.store_retention_limit.map(|limit| limit as u64),
445        ));
446        let upkeep_service = Arc::new(SignerUpkeepService::new(
447            sqlite_connection.clone(),
448            sqlite_connection_cardano_transaction_pool,
449            signed_entity_type_lock.clone(),
450            vec![
451                signed_beacon_repository.clone(),
452                stake_store.clone(),
453                protocol_initializer_store.clone(),
454            ],
455            self.root_logger(),
456        ));
457
458        let kes_signer = match (
459            &self.config.kes_secret_key_path,
460            &self.config.operational_certificate_path,
461        ) {
462            (Some(kes_secret_key_path), Some(operational_certificate_path)) => {
463                Some(Arc::new(KesSignerStandard::new(
464                    kes_secret_key_path.clone(),
465                    operational_certificate_path.clone(),
466                )) as Arc<dyn KesSigner>)
467            }
468            (Some(_), None) | (None, Some(_)) => {
469                return Err(anyhow!(
470                    "kes_secret_key and operational_certificate are both mandatory".to_string(),
471                ));
472            }
473            _ => None,
474        };
475
476        let signature_publisher = {
477            let first_publisher = SignaturePublisherRetrier::new(
478                match &self.config.dmq_node_socket_path {
479                    Some(dmq_node_socket_path) => {
480                        let dmq_network = &self.config.get_dmq_network()?;
481                        let dmq_message_builder = DmqMessageBuilder::new(
482                            kes_signer.clone().with_context(
483                                || "A KES signer is mandatory to sign DMQ messages",
484                            )?,
485                            chain_observer.clone(),
486                        );
487                        Arc::new(SignaturePublisherDmq::new(Arc::new(
488                            DmqPublisherClientPallas::<RegisterSignatureMessageDmq>::new(
489                                dmq_node_socket_path.to_owned(),
490                                *dmq_network,
491                                dmq_message_builder,
492                                self.root_logger(),
493                            ),
494                        ))) as Arc<dyn SignaturePublisher>
495                    }
496                    _ => Arc::new(SignaturePublisherNoop) as Arc<dyn SignaturePublisher>,
497                },
498                SignaturePublishRetryPolicy::never(),
499            );
500
501            let second_publisher = SignaturePublisherRetrier::new(
502                aggregator_client.clone(),
503                SignaturePublishRetryPolicy {
504                    attempts: self.config.signature_publisher_config.retry_attempts,
505                    delay_between_attempts: Duration::from_millis(
506                        self.config.signature_publisher_config.retry_delay_ms,
507                    ),
508                },
509            );
510
511            if self.config.signature_publisher_config.skip_delayer {
512                Arc::new(first_publisher) as Arc<dyn SignaturePublisher>
513            } else {
514                Arc::new(SignaturePublisherDelayer::new(
515                    Arc::new(first_publisher),
516                    Arc::new(second_publisher),
517                    Duration::from_millis(self.config.signature_publisher_config.delayer_delay_ms),
518                    self.root_logger(),
519                )) as Arc<dyn SignaturePublisher>
520            }
521        };
522
523        let certifier = Arc::new(SignerCertifierService::new(
524            signed_beacon_repository,
525            Arc::new(SignerSignedEntityConfigProvider::new(epoch_service.clone())),
526            signed_entity_type_lock.clone(),
527            single_signer.clone(),
528            signature_publisher,
529            self.root_logger(),
530        ));
531
532        let services = SignerDependencyContainer {
533            ticker_service,
534            chain_observer,
535            digester,
536            single_signer,
537            stake_store,
538            protocol_initializer_store,
539            era_checker,
540            era_reader,
541            api_version_provider,
542            signable_builder_service,
543            metrics_service,
544            signed_entity_type_lock,
545            cardano_transactions_preloader,
546            upkeep_service,
547            epoch_service,
548            certifier,
549            signer_registration_publisher: aggregator_client.clone(),
550            signers_registration_retriever: aggregator_client,
551            kes_signer,
552            network_configuration_service,
553        };
554
555        Ok(services)
556    }
557}
558
559#[cfg(test)]
560mod tests {
561    use std::path::PathBuf;
562
563    use mithril_aggregator_client::query::GetAggregatorFeaturesQuery;
564    use mithril_cardano_node_chain::test::double::FakeChainObserver;
565    use mithril_cardano_node_internal_database::test::double::DumbImmutableFileObserver;
566    use mithril_common::{
567        entities::TimePoint,
568        messages::AggregatorFeaturesMessage,
569        test::{TempDir, api_version_extensions::ApiVersionProviderTestExtension, double::Dummy},
570    };
571
572    use crate::test::TestLogger;
573
574    use super::*;
575
576    fn get_test_dir(test_name: &str) -> PathBuf {
577        TempDir::create("signer_service", test_name)
578    }
579
580    #[tokio::test]
581    async fn test_auto_create_stores_directory() {
582        let stores_dir = get_test_dir("test_auto_create_stores_directory").join("stores");
583        let config = Configuration {
584            data_stores_directory: stores_dir.clone(),
585            ..Configuration::new_sample("party-123456")
586        };
587
588        assert!(!stores_dir.exists());
589        let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
590            |_config| Ok(Arc::new(FakeChainObserver::new(Some(TimePoint::dummy()))));
591        let immutable_file_observer_builder: fn(
592            &Configuration,
593        )
594            -> StdResult<Arc<dyn ImmutableFileObserver>> =
595            |_config: &Configuration| Ok(Arc::new(DumbImmutableFileObserver::default()));
596
597        let mut dependencies_builder = DependenciesBuilder::new(&config, TestLogger::stdout());
598        dependencies_builder
599            .override_chain_observer_builder(chain_observer_builder)
600            .override_immutable_file_observer_builder(immutable_file_observer_builder)
601            .build()
602            .await
603            .expect("service builder build should not fail");
604        assert!(stores_dir.exists());
605    }
606
607    #[tokio::test]
608    async fn set_signer_version_header_to_built_aggregator_client() {
609        let server = httpmock::MockServer::start();
610        server.mock(|when, then| {
611            when.is_true(|req| {
612                req.headers()
613                    .get(MITHRIL_SIGNER_VERSION_HEADER)
614                    .is_some_and(|header| header == env!("CARGO_PKG_VERSION"))
615            });
616            then.status(200)
617                .json_body(serde_json::json!(AggregatorFeaturesMessage::dummy()));
618        });
619
620        let config = Configuration {
621            aggregator_endpoint: server.base_url(),
622            ..Configuration::new_sample("party-123456")
623        };
624        let dependencies_builder = DependenciesBuilder::new(&config, TestLogger::stdout());
625        let aggregator_client = dependencies_builder
626            // Note: api version doesn't matter for this test
627            .build_aggregator_client(&Arc::new(APIVersionProvider::new_failing()))
628            .unwrap();
629
630        aggregator_client
631            .send(GetAggregatorFeaturesQuery::current())
632            .await
633            .unwrap();
634    }
635}