mithril_signer/dependency_injection/
builder.rs

1use std::fs;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{anyhow, Context};
6use slog::Logger;
7use tokio::sync::{Mutex, RwLock};
8
9use mithril_cardano_node_chain::{
10    chain_observer::{CardanoCliRunner, ChainObserver, ChainObserverBuilder, ChainObserverType},
11    chain_reader::PallasChainReader,
12    chain_scanner::CardanoBlockScanner,
13};
14use mithril_cardano_node_internal_database::{
15    digesters::cache::{
16        ImmutableFileDigestCacheProvider, JsonImmutableFileDigestCacheProviderBuilder,
17    },
18    digesters::CardanoImmutableDigester,
19    signable_builder::{CardanoDatabaseSignableBuilder, CardanoImmutableFilesFullSignableBuilder},
20    ImmutableFileObserver, ImmutableFileSystemObserver,
21};
22use mithril_common::api_version::APIVersionProvider;
23use mithril_common::crypto_helper::{
24    KesSigner, KesSignerStandard, OpCert, ProtocolPartyId, SerDeShelleyFileFormat,
25};
26#[cfg(feature = "future_dmq")]
27use mithril_common::messages::RegisterSignatureMessageDmq;
28use mithril_common::signable_builder::{
29    CardanoStakeDistributionSignableBuilder, CardanoTransactionsSignableBuilder,
30    MithrilSignableBuilderService, MithrilStakeDistributionSignableBuilder,
31    SignableBuilderServiceDependencies,
32};
33use mithril_common::StdResult;
34
35use mithril_era::{EraChecker, EraReader};
36use mithril_signed_entity_lock::SignedEntityTypeLock;
37use mithril_signed_entity_preloader::CardanoTransactionsPreloader;
38use mithril_ticker::{MithrilTickerService, TickerService};
39
40use mithril_persistence::database::repository::CardanoTransactionRepository;
41use mithril_persistence::database::{ApplicationNodeType, SqlMigration};
42use mithril_persistence::sqlite::{ConnectionBuilder, SqliteConnection, SqliteConnectionPool};
43
44#[cfg(feature = "future_dmq")]
45use mithril_dmq::{DmqMessageBuilder, DmqPublisherPallas};
46
47use crate::dependency_injection::SignerDependencyContainer;
48#[cfg(feature = "future_dmq")]
49use crate::services::SignaturePublisherDmq;
50use crate::services::{
51    AggregatorHTTPClient, CardanoTransactionsImporter,
52    CardanoTransactionsPreloaderActivationSigner, MithrilEpochService, MithrilSingleSigner,
53    SignaturePublishRetryPolicy, SignaturePublisherDelayer, SignaturePublisherNoop,
54    SignaturePublisherRetrier, SignerCertifierService, SignerSignableSeedBuilder,
55    SignerSignedEntityConfigProvider, SignerUpkeepService, TransactionsImporterByChunk,
56    TransactionsImporterWithPruner, TransactionsImporterWithVacuum,
57};
58use crate::store::MKTreeStoreSqlite;
59use crate::{
60    database::repository::{ProtocolInitializerRepository, SignedBeaconRepository, StakePoolStore},
61    services::SignaturePublisher,
62};
63use crate::{
64    Configuration, MetricsService, HTTP_REQUEST_TIMEOUT_DURATION, SQLITE_FILE,
65    SQLITE_FILE_CARDANO_TRANSACTION,
66};
67
68/// The `DependenciesBuilder` is intended to manage Services instance creation.
69///
70/// The goal of this is to put all this code out of the way of business code.
71pub struct DependenciesBuilder<'a> {
72    config: &'a Configuration,
73    chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
74    immutable_file_observer_builder:
75        fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
76    root_logger: Logger,
77}
78
79impl<'a> DependenciesBuilder<'a> {
80    /// Create a new `DependenciesBuilder`.
81    pub fn new(config: &'a Configuration, root_logger: Logger) -> Self {
82        let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
83            |config: &Configuration| {
84                let chain_observer_type = ChainObserverType::Pallas;
85                let cardano_cli_path = &config.cardano_cli_path;
86                let cardano_node_socket_path = &config.cardano_node_socket_path;
87                let cardano_network = &config.get_network().with_context(|| {
88                    "Dependencies Builder can not get Cardano network while building the chain observer"
89                })?;
90                let cardano_cli_runner = &CardanoCliRunner::new(
91                    cardano_cli_path.to_owned(),
92                    cardano_node_socket_path.to_owned(),
93                    cardano_network.to_owned(),
94                );
95
96                let chain_observer_builder = ChainObserverBuilder::new(
97                    &chain_observer_type,
98                    cardano_node_socket_path,
99                    cardano_network,
100                    Some(cardano_cli_runner),
101                );
102
103                chain_observer_builder
104                    .build()
105                    .with_context(|| "Dependencies Builder can not build chain observer")
106            };
107
108        let immutable_file_observer_builder: fn(
109            &Configuration,
110        )
111            -> StdResult<Arc<dyn ImmutableFileObserver>> = |config: &Configuration| {
112            Ok(Arc::new(ImmutableFileSystemObserver::new(
113                &config.db_directory,
114            )))
115        };
116
117        Self {
118            config,
119            chain_observer_builder,
120            immutable_file_observer_builder,
121            root_logger,
122        }
123    }
124
125    /// Override immutable file observer builder.
126    pub fn override_immutable_file_observer_builder(
127        &mut self,
128        builder: fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
129    ) -> &mut Self {
130        self.immutable_file_observer_builder = builder;
131
132        self
133    }
134
135    /// Return a copy of the root logger.
136    pub fn root_logger(&self) -> Logger {
137        self.root_logger.clone()
138    }
139
140    /// Override default chain observer builder.
141    pub fn override_chain_observer_builder(
142        &mut self,
143        builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
144    ) -> &mut Self {
145        self.chain_observer_builder = builder;
146
147        self
148    }
149
150    /// Compute protocol party id
151    fn compute_protocol_party_id(&self) -> StdResult<ProtocolPartyId> {
152        match &self.config.operational_certificate_path {
153            Some(operational_certificate_path) => {
154                let opcert: OpCert = OpCert::from_file(operational_certificate_path)
155                    .with_context(|| "Could not decode operational certificate")?;
156                Ok(opcert
157                    .compute_protocol_party_id()
158                    .with_context(|| "Could not compute party_id from operational certificate")?)
159            }
160            _ => Ok(self
161                .config
162                .party_id
163                .to_owned()
164                .ok_or(anyhow!("A party_id should at least be provided"))?),
165        }
166    }
167
168    async fn build_digester_cache_provider(
169        &self,
170    ) -> StdResult<Option<Arc<dyn ImmutableFileDigestCacheProvider>>> {
171        if self.config.disable_digests_cache {
172            return Ok(None);
173        }
174
175        let cache_provider = JsonImmutableFileDigestCacheProviderBuilder::new(
176            &self.config.data_stores_directory,
177            &format!("immutables_digests_{}.json", self.config.network),
178        )
179        .should_reset_digests_cache(self.config.reset_digests_cache)
180        .with_logger(self.root_logger())
181        .build()
182        .await?;
183
184        Ok(Some(Arc::new(cache_provider)))
185    }
186
187    /// Build a SQLite connection.
188    pub async fn build_sqlite_connection(
189        &self,
190        sqlite_file_name: &str,
191        migrations: Vec<SqlMigration>,
192    ) -> StdResult<SqliteConnection> {
193        let sqlite_db_path = self.config.get_sqlite_file(sqlite_file_name)?;
194        let connection = ConnectionBuilder::open_file(&sqlite_db_path)
195            .with_node_type(ApplicationNodeType::Signer)
196            .with_migrations(migrations)
197            .with_logger(self.root_logger())
198            .build()
199            .with_context(|| "Database connection initialisation error")?;
200
201        Ok(connection)
202    }
203
    /// Build dependencies for the Production environment.
    ///
    /// Creates the data stores directory when it does not exist yet, builds the two SQLite
    /// connections (one for `SQLITE_FILE`, one for `SQLITE_FILE_CARDANO_TRANSACTION`), then
    /// instantiates the services and gathers them in a [`SignerDependencyContainer`].
    ///
    /// # Errors
    ///
    /// Returns the error of the first fallible step that fails, for instance when the data
    /// stores directory cannot be created, a SQLite connection cannot be built, an observer
    /// builder fails, the era cannot be read, the party id cannot be computed, or when only
    /// one of `kes_secret_key_path` and `operational_certificate_path` is configured.
    pub async fn build(&self) -> StdResult<SignerDependencyContainer> {
        // Make sure the directory holding the stores exists before anything is created in it.
        if !self.config.data_stores_directory.exists() {
            fs::create_dir_all(self.config.data_stores_directory.clone()).with_context(|| {
                format!(
                    "Could not create data stores directory: `{}`",
                    self.config.data_stores_directory.display()
                )
            })?;
        }

        let network = self.config.get_network()?;
        // Main connection, shared through an `Arc` by the repositories created below.
        let sqlite_connection = Arc::new(
            self.build_sqlite_connection(SQLITE_FILE, crate::database::migration::get_migrations())
                .await?,
        );
        // The Cardano transactions database has its own connection, wrapped in a pool.
        let transaction_sqlite_connection = self
            .build_sqlite_connection(
                SQLITE_FILE_CARDANO_TRANSACTION,
                mithril_persistence::database::cardano_transaction_migration::get_migrations(),
            )
            .await?;
        let sqlite_connection_cardano_transaction_pool = Arc::new(
            SqliteConnectionPool::build_from_connection(transaction_sqlite_connection),
        );

        let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());

        let protocol_initializer_store = Arc::new(ProtocolInitializerRepository::new(
            sqlite_connection.clone(),
            self.config.store_retention_limit.map(|limit| limit as u64),
        ));

        let digester = Arc::new(CardanoImmutableDigester::new(
            network.to_string(),
            self.build_digester_cache_provider().await?,
            self.root_logger(),
        ));
        let stake_store = Arc::new(StakePoolStore::new(
            sqlite_connection.clone(),
            self.config.store_retention_limit.map(|limit| limit as u64),
        ));
        // Both observers are created through the (possibly overridden) builder functions.
        let chain_observer = {
            let builder = self.chain_observer_builder;
            builder(self.config)?
        };
        let ticker_service = {
            let builder = self.immutable_file_observer_builder;
            Arc::new(MithrilTickerService::new(
                chain_observer.clone(),
                builder(self.config)?,
            ))
        };

        // The era checker is initialised with the era token read for the current epoch
        // reported by the ticker service.
        let era_reader = Arc::new(EraReader::new(
            self.config
                .build_era_reader_adapter(chain_observer.clone())?,
        ));
        let era_epoch_token = era_reader
            .read_era_epoch_token(ticker_service.get_current_epoch().await?)
            .await?;
        let era_checker = Arc::new(EraChecker::new(
            era_epoch_token.get_current_supported_era()?,
            era_epoch_token.get_current_epoch(),
        ));

        let api_version_provider = Arc::new(APIVersionProvider::new(era_checker.clone()));
        let aggregator_client = Arc::new(AggregatorHTTPClient::new(
            self.config.aggregator_endpoint.clone(),
            self.config.relay_endpoint.clone(),
            api_version_provider.clone(),
            Some(Duration::from_millis(HTTP_REQUEST_TIMEOUT_DURATION)),
            self.root_logger(),
        ));

        // Signable builders, later gathered in the signable builder service.
        let cardano_immutable_snapshot_builder =
            Arc::new(CardanoImmutableFilesFullSignableBuilder::new(
                digester.clone(),
                &self.config.db_directory,
                self.root_logger(),
            ));
        let mithril_stake_distribution_signable_builder =
            Arc::new(MithrilStakeDistributionSignableBuilder::default());
        let transaction_store = Arc::new(CardanoTransactionRepository::new(
            sqlite_connection_cardano_transaction_pool.clone(),
        ));
        let chain_block_reader = PallasChainReader::new(
            &self.config.cardano_node_socket_path,
            network,
            self.root_logger(),
        );
        let block_scanner = Arc::new(CardanoBlockScanner::new(
            Arc::new(Mutex::new(chain_block_reader)),
            self.config
                .cardano_transactions_block_streamer_max_roll_forwards_per_poll,
            self.root_logger(),
        ));
        let transactions_importer = Arc::new(CardanoTransactionsImporter::new(
            block_scanner,
            transaction_store.clone(),
            self.root_logger(),
        ));
        // Wrap the transaction importer with decorator to prune the transactions after import
        let transactions_importer = Arc::new(TransactionsImporterWithPruner::new(
            // `None` is passed when transaction pruning is not enabled.
            self.config
                .enable_transaction_pruning
                .then_some(self.config.network_security_parameter),
            transaction_store.clone(),
            transactions_importer,
            self.root_logger(),
        ));
        // Wrap the transaction importer with decorator to chunk its workload, so it prunes
        // transactions after each chunk, reducing the storage footprint
        let state_machine_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
            transaction_store.clone(),
            transactions_importer.clone(),
            self.config.transactions_import_block_chunk_size,
            self.root_logger(),
        ));
        // For the preloader, we want to vacuum the database after each chunk, to reclaim disk space
        // earlier than with just auto_vacuum (that execute only after the end of all import).
        let preloader_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
            transaction_store.clone(),
            Arc::new(TransactionsImporterWithVacuum::new(
                sqlite_connection_cardano_transaction_pool.clone(),
                transactions_importer.clone(),
                self.root_logger(),
            )),
            self.config.transactions_import_block_chunk_size,
            self.root_logger(),
        ));
        // The transaction repository is also the block range root retriever.
        let block_range_root_retriever = transaction_store.clone();
        let cardano_transactions_builder = Arc::new(CardanoTransactionsSignableBuilder::<
            MKTreeStoreSqlite,
        >::new(
            state_machine_transactions_importer,
            block_range_root_retriever,
        ));
        let cardano_stake_distribution_signable_builder = Arc::new(
            CardanoStakeDistributionSignableBuilder::new(stake_store.clone()),
        );
        let cardano_database_signable_builder = Arc::new(CardanoDatabaseSignableBuilder::new(
            digester.clone(),
            &self.config.db_directory,
            self.root_logger(),
        ));
        let epoch_service = Arc::new(RwLock::new(MithrilEpochService::new(
            stake_store.clone(),
            protocol_initializer_store.clone(),
            self.root_logger(),
        )));
        let single_signer = Arc::new(MithrilSingleSigner::new(
            self.compute_protocol_party_id()?,
            epoch_service.clone(),
            self.root_logger(),
        ));
        let signable_seed_builder_service = Arc::new(SignerSignableSeedBuilder::new(
            epoch_service.clone(),
            protocol_initializer_store.clone(),
        ));
        let signable_builders_dependencies = SignableBuilderServiceDependencies::new(
            mithril_stake_distribution_signable_builder,
            cardano_immutable_snapshot_builder,
            cardano_transactions_builder,
            cardano_stake_distribution_signable_builder,
            cardano_database_signable_builder,
        );
        let signable_builder_service = Arc::new(MithrilSignableBuilderService::new(
            signable_seed_builder_service,
            signable_builders_dependencies,
            self.root_logger(),
        ));
        let metrics_service = Arc::new(MetricsService::new(self.root_logger())?);
        let preloader_activation =
            CardanoTransactionsPreloaderActivationSigner::new(aggregator_client.clone());
        let cardano_transactions_preloader = Arc::new(CardanoTransactionsPreloader::new(
            signed_entity_type_lock.clone(),
            preloader_transactions_importer,
            self.config.preload_security_parameter,
            chain_observer.clone(),
            self.root_logger(),
            Arc::new(preloader_activation),
        ));
        let signed_beacon_repository = Arc::new(SignedBeaconRepository::new(
            sqlite_connection.clone(),
            self.config.store_retention_limit.map(|limit| limit as u64),
        ));
        // The upkeep service receives both database handles and the three stores created
        // with the configured retention limit.
        let upkeep_service = Arc::new(SignerUpkeepService::new(
            sqlite_connection.clone(),
            sqlite_connection_cardano_transaction_pool,
            signed_entity_type_lock.clone(),
            vec![
                signed_beacon_repository.clone(),
                stake_store.clone(),
                protocol_initializer_store.clone(),
            ],
            self.root_logger(),
        ));

        // A KES signer is created only when both the KES secret key path and the operational
        // certificate path are configured; providing only one of them is an error, providing
        // none yields no KES signer.
        let kes_signer = match (
            &self.config.kes_secret_key_path,
            &self.config.operational_certificate_path,
        ) {
            (Some(kes_secret_key_path), Some(operational_certificate_path)) => {
                Some(Arc::new(KesSignerStandard::new(
                    kes_secret_key_path.clone(),
                    operational_certificate_path.clone(),
                )) as Arc<dyn KesSigner>)
            }
            (Some(_), None) | (None, Some(_)) => {
                return Err(anyhow!(
                    "kes_secret_key and operational_certificate are both mandatory".to_string(),
                ))
            }
            _ => None,
        };

        // Two publishers are handed to the `SignaturePublisherDelayer`, each wrapped in a
        // retrier with its own retry policy.
        let signature_publisher = {
            // First publisher: DMQ when the `future_dmq` feature is enabled and a DMQ node
            // socket path is configured, a no-op publisher otherwise. It uses the `never`
            // retry policy.
            let first_publisher = SignaturePublisherRetrier::new(
                {
                    #[cfg(feature = "future_dmq")]
                    let publisher = match &self.config.dmq_node_socket_path {
                        Some(dmq_node_socket_path) => {
                            let cardano_network = &self.config.get_network()?;
                            let dmq_message_builder = DmqMessageBuilder::new(
                                kes_signer.clone().ok_or(anyhow!(
                                    "A KES signer is mandatory to sign DMQ messages"
                                ))?,
                                chain_observer.clone(),
                            );
                            Arc::new(SignaturePublisherDmq::new(Arc::new(DmqPublisherPallas::<
                                RegisterSignatureMessageDmq,
                            >::new(
                                dmq_node_socket_path.to_owned(),
                                *cardano_network,
                                dmq_message_builder,
                                self.root_logger(),
                            )))) as Arc<dyn SignaturePublisher>
                        }
                        _ => Arc::new(SignaturePublisherNoop) as Arc<dyn SignaturePublisher>,
                    };
                    #[cfg(not(feature = "future_dmq"))]
                    let publisher = Arc::new(SignaturePublisherNoop) as Arc<dyn SignaturePublisher>;

                    publisher
                },
                SignaturePublishRetryPolicy::never(),
            );

            // Second publisher: the aggregator HTTP client, retried according to the
            // configured attempts and delay.
            let second_publisher = SignaturePublisherRetrier::new(
                aggregator_client.clone(),
                SignaturePublishRetryPolicy {
                    attempts: self.config.signature_publisher_config.retry_attempts,
                    delay_between_attempts: Duration::from_millis(
                        self.config.signature_publisher_config.retry_delay_ms,
                    ),
                },
            );

            Arc::new(SignaturePublisherDelayer::new(
                Arc::new(first_publisher),
                Arc::new(second_publisher),
                Duration::from_millis(self.config.signature_publisher_config.delayer_delay_ms),
                self.root_logger(),
            ))
        };

        let certifier = Arc::new(SignerCertifierService::new(
            signed_beacon_repository,
            Arc::new(SignerSignedEntityConfigProvider::new(epoch_service.clone())),
            signed_entity_type_lock.clone(),
            single_signer.clone(),
            signature_publisher,
            self.root_logger(),
        ));

        // Gather every service in the container; the aggregator client is exposed as the
        // certificate handler.
        let services = SignerDependencyContainer {
            ticker_service,
            certificate_handler: aggregator_client,
            chain_observer,
            digester,
            single_signer,
            stake_store,
            protocol_initializer_store,
            era_checker,
            era_reader,
            api_version_provider,
            signable_builder_service,
            metrics_service,
            signed_entity_type_lock,
            cardano_transactions_preloader,
            upkeep_service,
            epoch_service,
            certifier,
            kes_signer,
        };

        Ok(services)
    }
503}
504
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use mithril_cardano_node_chain::test::double::FakeChainObserver;
    use mithril_cardano_node_internal_database::test::double::DumbImmutableFileObserver;
    use mithril_common::{entities::TimePoint, test_utils::TempDir};

    use crate::test_tools::TestLogger;

    use super::*;

    /// Return the directory given by `TempDir::create` for the given test of this module.
    fn get_test_dir(test_name: &str) -> PathBuf {
        TempDir::create("signer_service", test_name)
    }

    /// Chain observer factory yielding a `FakeChainObserver` set with a dummy time point.
    fn fake_chain_observer(_config: &Configuration) -> StdResult<Arc<dyn ChainObserver>> {
        Ok(Arc::new(FakeChainObserver::new(Some(TimePoint::dummy()))))
    }

    /// Immutable file observer factory yielding a default `DumbImmutableFileObserver`.
    fn dumb_immutable_file_observer(
        _config: &Configuration,
    ) -> StdResult<Arc<dyn ImmutableFileObserver>> {
        Ok(Arc::new(DumbImmutableFileObserver::default()))
    }

    /// Building the dependencies must create the data stores directory when it is missing.
    #[tokio::test]
    async fn test_auto_create_stores_directory() {
        let data_stores_directory =
            get_test_dir("test_auto_create_stores_directory").join("stores");
        let config = Configuration {
            data_stores_directory: data_stores_directory.clone(),
            ..Configuration::new_sample("party-123456")
        };
        assert!(!data_stores_directory.exists());

        let mut builder = DependenciesBuilder::new(&config, TestLogger::stdout());
        builder
            .override_chain_observer_builder(fake_chain_observer)
            .override_immutable_file_observer_builder(dumb_immutable_file_observer);
        builder
            .build()
            .await
            .expect("service builder build should not fail");

        assert!(data_stores_directory.exists());
    }
}
547}