mithril_signer/dependency_injection/
builder.rs

1use std::fs;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{anyhow, Context};
6use slog::Logger;
7use tokio::sync::{Mutex, RwLock};
8
9use mithril_common::api_version::APIVersionProvider;
10use mithril_common::cardano_block_scanner::CardanoBlockScanner;
11use mithril_common::chain_observer::{
12    CardanoCliRunner, ChainObserver, ChainObserverBuilder, ChainObserverType,
13};
14use mithril_common::chain_reader::PallasChainReader;
15use mithril_common::crypto_helper::{OpCert, ProtocolPartyId, SerDeShelleyFileFormat};
16use mithril_common::digesters::cache::{
17    ImmutableFileDigestCacheProvider, JsonImmutableFileDigestCacheProviderBuilder,
18};
19use mithril_common::digesters::{
20    CardanoImmutableDigester, ImmutableFileObserver, ImmutableFileSystemObserver,
21};
22use mithril_common::era::{EraChecker, EraReader};
23use mithril_common::signable_builder::{
24    CardanoDatabaseSignableBuilder, CardanoImmutableFilesFullSignableBuilder,
25    CardanoStakeDistributionSignableBuilder, CardanoTransactionsSignableBuilder,
26    MithrilSignableBuilderService, MithrilStakeDistributionSignableBuilder,
27    SignableBuilderServiceDependencies,
28};
29use mithril_common::{MithrilTickerService, StdResult, TickerService};
30use mithril_signed_entity_lock::SignedEntityTypeLock;
31use mithril_signed_entity_preloader::CardanoTransactionsPreloader;
32
33use mithril_persistence::database::repository::CardanoTransactionRepository;
34use mithril_persistence::database::{ApplicationNodeType, SqlMigration};
35use mithril_persistence::sqlite::{ConnectionBuilder, SqliteConnection, SqliteConnectionPool};
36
37use crate::database::repository::{
38    ProtocolInitializerRepository, SignedBeaconRepository, StakePoolStore,
39};
40use crate::dependency_injection::SignerDependencyContainer;
41use crate::services::{
42    AggregatorHTTPClient, CardanoTransactionsImporter,
43    CardanoTransactionsPreloaderActivationSigner, MithrilEpochService, MithrilSingleSigner,
44    SignaturePublishRetryPolicy, SignaturePublisherDelayer, SignaturePublisherNoop,
45    SignaturePublisherRetrier, SignerCertifierService, SignerSignableSeedBuilder,
46    SignerSignedEntityConfigProvider, SignerUpkeepService, TransactionsImporterByChunk,
47    TransactionsImporterWithPruner, TransactionsImporterWithVacuum,
48};
49use crate::store::MKTreeStoreSqlite;
50use crate::{
51    Configuration, MetricsService, HTTP_REQUEST_TIMEOUT_DURATION, SQLITE_FILE,
52    SQLITE_FILE_CARDANO_TRANSACTION,
53};
54
55/// The `DependenciesBuilder` is intended to manage Services instance creation.
56///
57/// The goal of this is to put all this code out of the way of business code.
58pub struct DependenciesBuilder<'a> {
59    config: &'a Configuration,
60    chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
61    immutable_file_observer_builder:
62        fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
63    root_logger: Logger,
64}
65
66impl<'a> DependenciesBuilder<'a> {
67    /// Create a new `DependenciesBuilder`.
68    pub fn new(config: &'a Configuration, root_logger: Logger) -> Self {
69        let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
70            |config: &Configuration| {
71                let chain_observer_type = ChainObserverType::Pallas;
72                let cardano_cli_path = &config.cardano_cli_path;
73                let cardano_node_socket_path = &config.cardano_node_socket_path;
74                let cardano_network = &config.get_network().with_context(|| {
75                    "Dependencies Builder can not get Cardano network while building the chain observer"
76                })?;
77                let cardano_cli_runner = &CardanoCliRunner::new(
78                    cardano_cli_path.to_owned(),
79                    cardano_node_socket_path.to_owned(),
80                    cardano_network.to_owned(),
81                );
82
83                let chain_observer_builder = ChainObserverBuilder::new(
84                    &chain_observer_type,
85                    cardano_node_socket_path,
86                    cardano_network,
87                    Some(cardano_cli_runner),
88                );
89
90                chain_observer_builder
91                    .build()
92                    .with_context(|| "Dependencies Builder can not build chain observer")
93            };
94
95        let immutable_file_observer_builder: fn(
96            &Configuration,
97        )
98            -> StdResult<Arc<dyn ImmutableFileObserver>> = |config: &Configuration| {
99            Ok(Arc::new(ImmutableFileSystemObserver::new(
100                &config.db_directory,
101            )))
102        };
103
104        Self {
105            config,
106            chain_observer_builder,
107            immutable_file_observer_builder,
108            root_logger,
109        }
110    }
111
112    /// Override immutable file observer builder.
113    pub fn override_immutable_file_observer_builder(
114        &mut self,
115        builder: fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
116    ) -> &mut Self {
117        self.immutable_file_observer_builder = builder;
118
119        self
120    }
121
122    /// Return a copy of the root logger.
123    pub fn root_logger(&self) -> Logger {
124        self.root_logger.clone()
125    }
126
127    /// Override default chain observer builder.
128    pub fn override_chain_observer_builder(
129        &mut self,
130        builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
131    ) -> &mut Self {
132        self.chain_observer_builder = builder;
133
134        self
135    }
136
137    /// Compute protocol party id
138    fn compute_protocol_party_id(&self) -> StdResult<ProtocolPartyId> {
139        match &self.config.operational_certificate_path {
140            Some(operational_certificate_path) => {
141                let opcert: OpCert = OpCert::from_file(operational_certificate_path)
142                    .with_context(|| "Could not decode operational certificate")?;
143                Ok(opcert
144                    .compute_protocol_party_id()
145                    .with_context(|| "Could not compute party_id from operational certificate")?)
146            }
147            _ => Ok(self
148                .config
149                .party_id
150                .to_owned()
151                .ok_or(anyhow!("A party_id should at least be provided"))?),
152        }
153    }
154
155    async fn build_digester_cache_provider(
156        &self,
157    ) -> StdResult<Option<Arc<dyn ImmutableFileDigestCacheProvider>>> {
158        if self.config.disable_digests_cache {
159            return Ok(None);
160        }
161
162        let cache_provider = JsonImmutableFileDigestCacheProviderBuilder::new(
163            &self.config.data_stores_directory,
164            &format!("immutables_digests_{}.json", self.config.network),
165        )
166        .should_reset_digests_cache(self.config.reset_digests_cache)
167        .with_logger(self.root_logger())
168        .build()
169        .await?;
170
171        Ok(Some(Arc::new(cache_provider)))
172    }
173
174    /// Build a SQLite connection.
175    pub async fn build_sqlite_connection(
176        &self,
177        sqlite_file_name: &str,
178        migrations: Vec<SqlMigration>,
179    ) -> StdResult<SqliteConnection> {
180        let sqlite_db_path = self.config.get_sqlite_file(sqlite_file_name)?;
181        let connection = ConnectionBuilder::open_file(&sqlite_db_path)
182            .with_node_type(ApplicationNodeType::Signer)
183            .with_migrations(migrations)
184            .with_logger(self.root_logger())
185            .build()
186            .with_context(|| "Database connection initialisation error")?;
187
188        Ok(connection)
189    }
190
191    /// Build dependencies for the Production environment.
192    pub async fn build(&self) -> StdResult<SignerDependencyContainer> {
193        if !self.config.data_stores_directory.exists() {
194            fs::create_dir_all(self.config.data_stores_directory.clone()).with_context(|| {
195                format!(
196                    "Could not create data stores directory: `{}`",
197                    self.config.data_stores_directory.display()
198                )
199            })?;
200        }
201
202        let network = self.config.get_network()?;
203        let sqlite_connection = Arc::new(
204            self.build_sqlite_connection(SQLITE_FILE, crate::database::migration::get_migrations())
205                .await?,
206        );
207        let transaction_sqlite_connection = self
208            .build_sqlite_connection(
209                SQLITE_FILE_CARDANO_TRANSACTION,
210                mithril_persistence::database::cardano_transaction_migration::get_migrations(),
211            )
212            .await?;
213        let sqlite_connection_cardano_transaction_pool = Arc::new(
214            SqliteConnectionPool::build_from_connection(transaction_sqlite_connection),
215        );
216
217        let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
218
219        let protocol_initializer_store = Arc::new(ProtocolInitializerRepository::new(
220            sqlite_connection.clone(),
221            self.config.store_retention_limit.map(|limit| limit as u64),
222        ));
223
224        let digester = Arc::new(CardanoImmutableDigester::new(
225            network.to_string(),
226            self.build_digester_cache_provider().await?,
227            self.root_logger(),
228        ));
229        let stake_store = Arc::new(StakePoolStore::new(
230            sqlite_connection.clone(),
231            self.config.store_retention_limit.map(|limit| limit as u64),
232        ));
233        let chain_observer = {
234            let builder = self.chain_observer_builder;
235            builder(self.config)?
236        };
237        let ticker_service = {
238            let builder = self.immutable_file_observer_builder;
239            Arc::new(MithrilTickerService::new(
240                chain_observer.clone(),
241                builder(self.config)?,
242            ))
243        };
244
245        let era_reader = Arc::new(EraReader::new(
246            self.config
247                .build_era_reader_adapter(chain_observer.clone())?,
248        ));
249        let era_epoch_token = era_reader
250            .read_era_epoch_token(ticker_service.get_current_epoch().await?)
251            .await?;
252        let era_checker = Arc::new(EraChecker::new(
253            era_epoch_token.get_current_supported_era()?,
254            era_epoch_token.get_current_epoch(),
255        ));
256
257        let api_version_provider = Arc::new(APIVersionProvider::new(era_checker.clone()));
258        let aggregator_client = Arc::new(AggregatorHTTPClient::new(
259            self.config.aggregator_endpoint.clone(),
260            self.config.relay_endpoint.clone(),
261            api_version_provider.clone(),
262            Some(Duration::from_millis(HTTP_REQUEST_TIMEOUT_DURATION)),
263            self.root_logger(),
264        ));
265
266        let cardano_immutable_snapshot_builder =
267            Arc::new(CardanoImmutableFilesFullSignableBuilder::new(
268                digester.clone(),
269                &self.config.db_directory,
270                self.root_logger(),
271            ));
272        let mithril_stake_distribution_signable_builder =
273            Arc::new(MithrilStakeDistributionSignableBuilder::default());
274        let transaction_store = Arc::new(CardanoTransactionRepository::new(
275            sqlite_connection_cardano_transaction_pool.clone(),
276        ));
277        let chain_block_reader = PallasChainReader::new(
278            &self.config.cardano_node_socket_path,
279            network,
280            self.root_logger(),
281        );
282        let block_scanner = Arc::new(CardanoBlockScanner::new(
283            Arc::new(Mutex::new(chain_block_reader)),
284            self.config
285                .cardano_transactions_block_streamer_max_roll_forwards_per_poll,
286            self.root_logger(),
287        ));
288        let transactions_importer = Arc::new(CardanoTransactionsImporter::new(
289            block_scanner,
290            transaction_store.clone(),
291            self.root_logger(),
292        ));
293        // Wrap the transaction importer with decorator to prune the transactions after import
294        let transactions_importer = Arc::new(TransactionsImporterWithPruner::new(
295            self.config
296                .enable_transaction_pruning
297                .then_some(self.config.network_security_parameter),
298            transaction_store.clone(),
299            transactions_importer,
300            self.root_logger(),
301        ));
302        // Wrap the transaction importer with decorator to chunk its workload, so it prunes
303        // transactions after each chunk, reducing the storage footprint
304        let state_machine_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
305            transaction_store.clone(),
306            transactions_importer.clone(),
307            self.config.transactions_import_block_chunk_size,
308            self.root_logger(),
309        ));
310        // For the preloader, we want to vacuum the database after each chunk, to reclaim disk space
311        // earlier than with just auto_vacuum (that execute only after the end of all import).
312        let preloader_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
313            transaction_store.clone(),
314            Arc::new(TransactionsImporterWithVacuum::new(
315                sqlite_connection_cardano_transaction_pool.clone(),
316                transactions_importer.clone(),
317                self.root_logger(),
318            )),
319            self.config.transactions_import_block_chunk_size,
320            self.root_logger(),
321        ));
322        let block_range_root_retriever = transaction_store.clone();
323        let cardano_transactions_builder = Arc::new(CardanoTransactionsSignableBuilder::<
324            MKTreeStoreSqlite,
325        >::new(
326            state_machine_transactions_importer,
327            block_range_root_retriever,
328        ));
329        let cardano_stake_distribution_signable_builder = Arc::new(
330            CardanoStakeDistributionSignableBuilder::new(stake_store.clone()),
331        );
332        let cardano_database_signable_builder = Arc::new(CardanoDatabaseSignableBuilder::new(
333            digester.clone(),
334            &self.config.db_directory,
335            self.root_logger(),
336        ));
337        let epoch_service = Arc::new(RwLock::new(MithrilEpochService::new(
338            stake_store.clone(),
339            protocol_initializer_store.clone(),
340            self.root_logger(),
341        )));
342        let single_signer = Arc::new(MithrilSingleSigner::new(
343            self.compute_protocol_party_id()?,
344            epoch_service.clone(),
345            self.root_logger(),
346        ));
347        let signable_seed_builder_service = Arc::new(SignerSignableSeedBuilder::new(
348            epoch_service.clone(),
349            protocol_initializer_store.clone(),
350        ));
351        let signable_builders_dependencies = SignableBuilderServiceDependencies::new(
352            mithril_stake_distribution_signable_builder,
353            cardano_immutable_snapshot_builder,
354            cardano_transactions_builder,
355            cardano_stake_distribution_signable_builder,
356            cardano_database_signable_builder,
357        );
358        let signable_builder_service = Arc::new(MithrilSignableBuilderService::new(
359            signable_seed_builder_service,
360            signable_builders_dependencies,
361            self.root_logger(),
362        ));
363        let metrics_service = Arc::new(MetricsService::new(self.root_logger())?);
364        let preloader_activation =
365            CardanoTransactionsPreloaderActivationSigner::new(aggregator_client.clone());
366        let cardano_transactions_preloader = Arc::new(CardanoTransactionsPreloader::new(
367            signed_entity_type_lock.clone(),
368            preloader_transactions_importer,
369            self.config.preload_security_parameter,
370            chain_observer.clone(),
371            self.root_logger(),
372            Arc::new(preloader_activation),
373        ));
374        let signed_beacon_repository = Arc::new(SignedBeaconRepository::new(
375            sqlite_connection.clone(),
376            self.config.store_retention_limit.map(|limit| limit as u64),
377        ));
378        let upkeep_service = Arc::new(SignerUpkeepService::new(
379            sqlite_connection.clone(),
380            sqlite_connection_cardano_transaction_pool,
381            signed_entity_type_lock.clone(),
382            vec![
383                signed_beacon_repository.clone(),
384                stake_store.clone(),
385                protocol_initializer_store.clone(),
386            ],
387            self.root_logger(),
388        ));
389
390        let signature_publisher = {
391            // Temporary no-op publisher before a DMQ-based implementation is available.
392            let first_publisher = SignaturePublisherRetrier::new(
393                Arc::new(SignaturePublisherNoop {}),
394                SignaturePublishRetryPolicy::never(),
395            );
396
397            let second_publisher = SignaturePublisherRetrier::new(
398                aggregator_client.clone(),
399                SignaturePublishRetryPolicy {
400                    attempts: self.config.signature_publisher_config.retry_attempts,
401                    delay_between_attempts: Duration::from_millis(
402                        self.config.signature_publisher_config.retry_delay_ms,
403                    ),
404                },
405            );
406
407            Arc::new(SignaturePublisherDelayer::new(
408                Arc::new(first_publisher),
409                Arc::new(second_publisher),
410                Duration::from_millis(self.config.signature_publisher_config.delayer_delay_ms),
411                self.root_logger(),
412            ))
413        };
414
415        let certifier = Arc::new(SignerCertifierService::new(
416            signed_beacon_repository,
417            Arc::new(SignerSignedEntityConfigProvider::new(epoch_service.clone())),
418            signed_entity_type_lock.clone(),
419            single_signer.clone(),
420            signature_publisher,
421            self.root_logger(),
422        ));
423
424        let services = SignerDependencyContainer {
425            ticker_service,
426            certificate_handler: aggregator_client,
427            chain_observer,
428            digester,
429            single_signer,
430            stake_store,
431            protocol_initializer_store,
432            era_checker,
433            era_reader,
434            api_version_provider,
435            signable_builder_service,
436            metrics_service,
437            signed_entity_type_lock,
438            cardano_transactions_preloader,
439            upkeep_service,
440            epoch_service,
441            certifier,
442        };
443
444        Ok(services)
445    }
446}
447
448#[cfg(test)]
449mod tests {
450    use std::path::PathBuf;
451
452    use mithril_common::{
453        chain_observer::FakeObserver, digesters::DumbImmutableFileObserver, entities::TimePoint,
454        test_utils::TempDir,
455    };
456
457    use crate::test_tools::TestLogger;
458
459    use super::*;
460
461    fn get_test_dir(test_name: &str) -> PathBuf {
462        TempDir::create("signer_service", test_name)
463    }
464
465    #[tokio::test]
466    async fn test_auto_create_stores_directory() {
467        let stores_dir = get_test_dir("test_auto_create_stores_directory").join("stores");
468        let config = Configuration {
469            data_stores_directory: stores_dir.clone(),
470            ..Configuration::new_sample("party-123456")
471        };
472
473        assert!(!stores_dir.exists());
474        let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
475            |_config| Ok(Arc::new(FakeObserver::new(Some(TimePoint::dummy()))));
476        let immutable_file_observer_builder: fn(
477            &Configuration,
478        )
479            -> StdResult<Arc<dyn ImmutableFileObserver>> =
480            |_config: &Configuration| Ok(Arc::new(DumbImmutableFileObserver::default()));
481
482        let mut dependencies_builder = DependenciesBuilder::new(&config, TestLogger::stdout());
483        dependencies_builder
484            .override_chain_observer_builder(chain_observer_builder)
485            .override_immutable_file_observer_builder(immutable_file_observer_builder)
486            .build()
487            .await
488            .expect("service builder build should not fail");
489        assert!(stores_dir.exists());
490    }
491}