mithril_signer/dependency_injection/
builder.rs

1use std::fs;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{anyhow, Context};
6use slog::Logger;
7use tokio::sync::{Mutex, RwLock};
8
9use mithril_common::api_version::APIVersionProvider;
10use mithril_common::cardano_block_scanner::CardanoBlockScanner;
11use mithril_common::chain_observer::{
12    CardanoCliRunner, ChainObserver, ChainObserverBuilder, ChainObserverType,
13};
14use mithril_common::chain_reader::PallasChainReader;
15use mithril_common::crypto_helper::{OpCert, ProtocolPartyId, SerDeShelleyFileFormat};
16use mithril_common::digesters::cache::{
17    ImmutableFileDigestCacheProvider, JsonImmutableFileDigestCacheProviderBuilder,
18};
19use mithril_common::digesters::{
20    CardanoImmutableDigester, ImmutableFileObserver, ImmutableFileSystemObserver,
21};
22use mithril_common::era::{EraChecker, EraReader};
23use mithril_common::signable_builder::{
24    CardanoDatabaseSignableBuilder, CardanoImmutableFilesFullSignableBuilder,
25    CardanoStakeDistributionSignableBuilder, CardanoTransactionsSignableBuilder,
26    MithrilSignableBuilderService, MithrilStakeDistributionSignableBuilder,
27    SignableBuilderServiceDependencies,
28};
29use mithril_common::{MithrilTickerService, StdResult, TickerService};
30use mithril_signed_entity_lock::SignedEntityTypeLock;
31use mithril_signed_entity_preloader::CardanoTransactionsPreloader;
32
33use mithril_persistence::database::repository::CardanoTransactionRepository;
34use mithril_persistence::database::{ApplicationNodeType, SqlMigration};
35use mithril_persistence::sqlite::{ConnectionBuilder, SqliteConnection, SqliteConnectionPool};
36
37use crate::database::repository::{
38    ProtocolInitializerRepository, SignedBeaconRepository, StakePoolStore,
39};
40use crate::dependency_injection::SignerDependencyContainer;
41use crate::services::{
42    AggregatorHTTPClient, CardanoTransactionsImporter,
43    CardanoTransactionsPreloaderActivationSigner, MithrilEpochService, MithrilSingleSigner,
44    SignerCertifierService, SignerSignableSeedBuilder, SignerSignedEntityConfigProvider,
45    SignerUpkeepService, TransactionsImporterByChunk, TransactionsImporterWithPruner,
46    TransactionsImporterWithVacuum,
47};
48use crate::store::MKTreeStoreSqlite;
49use crate::{
50    Configuration, MetricsService, HTTP_REQUEST_TIMEOUT_DURATION, SQLITE_FILE,
51    SQLITE_FILE_CARDANO_TRANSACTION,
52};
53
54/// The `DependenciesBuilder` is intended to manage Services instance creation.
55///
56/// The goal of this is to put all this code out of the way of business code.
57pub struct DependenciesBuilder<'a> {
58    config: &'a Configuration,
59    chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
60    immutable_file_observer_builder:
61        fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
62    root_logger: Logger,
63}
64
65impl<'a> DependenciesBuilder<'a> {
66    /// Create a new `DependenciesBuilder`.
67    pub fn new(config: &'a Configuration, root_logger: Logger) -> Self {
68        let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
69            |config: &Configuration| {
70                let chain_observer_type = ChainObserverType::Pallas;
71                let cardano_cli_path = &config.cardano_cli_path;
72                let cardano_node_socket_path = &config.cardano_node_socket_path;
73                let cardano_network = &config.get_network().with_context(|| {
74                    "Dependencies Builder can not get Cardano network while building the chain observer"
75                })?;
76                let cardano_cli_runner = &CardanoCliRunner::new(
77                    cardano_cli_path.to_owned(),
78                    cardano_node_socket_path.to_owned(),
79                    cardano_network.to_owned(),
80                );
81
82                let chain_observer_builder = ChainObserverBuilder::new(
83                    &chain_observer_type,
84                    cardano_node_socket_path,
85                    cardano_network,
86                    Some(cardano_cli_runner),
87                );
88
89                chain_observer_builder
90                    .build()
91                    .with_context(|| "Dependencies Builder can not build chain observer")
92            };
93
94        let immutable_file_observer_builder: fn(
95            &Configuration,
96        )
97            -> StdResult<Arc<dyn ImmutableFileObserver>> = |config: &Configuration| {
98            Ok(Arc::new(ImmutableFileSystemObserver::new(
99                &config.db_directory,
100            )))
101        };
102
103        Self {
104            config,
105            chain_observer_builder,
106            immutable_file_observer_builder,
107            root_logger,
108        }
109    }
110
111    /// Override immutable file observer builder.
112    pub fn override_immutable_file_observer_builder(
113        &mut self,
114        builder: fn(&Configuration) -> StdResult<Arc<dyn ImmutableFileObserver>>,
115    ) -> &mut Self {
116        self.immutable_file_observer_builder = builder;
117
118        self
119    }
120
121    /// Return a copy of the root logger.
122    pub fn root_logger(&self) -> Logger {
123        self.root_logger.clone()
124    }
125
126    /// Override default chain observer builder.
127    pub fn override_chain_observer_builder(
128        &mut self,
129        builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>>,
130    ) -> &mut Self {
131        self.chain_observer_builder = builder;
132
133        self
134    }
135
136    /// Compute protocol party id
137    fn compute_protocol_party_id(&self) -> StdResult<ProtocolPartyId> {
138        match &self.config.operational_certificate_path {
139            Some(operational_certificate_path) => {
140                let opcert: OpCert = OpCert::from_file(operational_certificate_path)
141                    .with_context(|| "Could not decode operational certificate")?;
142                Ok(opcert
143                    .compute_protocol_party_id()
144                    .with_context(|| "Could not compute party_id from operational certificate")?)
145            }
146            _ => Ok(self
147                .config
148                .party_id
149                .to_owned()
150                .ok_or(anyhow!("A party_id should at least be provided"))?),
151        }
152    }
153
154    async fn build_digester_cache_provider(
155        &self,
156    ) -> StdResult<Option<Arc<dyn ImmutableFileDigestCacheProvider>>> {
157        if self.config.disable_digests_cache {
158            return Ok(None);
159        }
160
161        let cache_provider = JsonImmutableFileDigestCacheProviderBuilder::new(
162            &self.config.data_stores_directory,
163            &format!("immutables_digests_{}.json", self.config.network),
164        )
165        .should_reset_digests_cache(self.config.reset_digests_cache)
166        .with_logger(self.root_logger())
167        .build()
168        .await?;
169
170        Ok(Some(Arc::new(cache_provider)))
171    }
172
173    /// Build a SQLite connection.
174    pub async fn build_sqlite_connection(
175        &self,
176        sqlite_file_name: &str,
177        migrations: Vec<SqlMigration>,
178    ) -> StdResult<SqliteConnection> {
179        let sqlite_db_path = self.config.get_sqlite_file(sqlite_file_name)?;
180        let connection = ConnectionBuilder::open_file(&sqlite_db_path)
181            .with_node_type(ApplicationNodeType::Signer)
182            .with_migrations(migrations)
183            .with_logger(self.root_logger())
184            .build()
185            .with_context(|| "Database connection initialisation error")?;
186
187        Ok(connection)
188    }
189
190    /// Build dependencies for the Production environment.
191    pub async fn build(&self) -> StdResult<SignerDependencyContainer> {
192        if !self.config.data_stores_directory.exists() {
193            fs::create_dir_all(self.config.data_stores_directory.clone()).with_context(|| {
194                format!(
195                    "Could not create data stores directory: `{}`",
196                    self.config.data_stores_directory.display()
197                )
198            })?;
199        }
200
201        let network = self.config.get_network()?;
202        let sqlite_connection = Arc::new(
203            self.build_sqlite_connection(SQLITE_FILE, crate::database::migration::get_migrations())
204                .await?,
205        );
206        let transaction_sqlite_connection = self
207            .build_sqlite_connection(
208                SQLITE_FILE_CARDANO_TRANSACTION,
209                mithril_persistence::database::cardano_transaction_migration::get_migrations(),
210            )
211            .await?;
212        let sqlite_connection_cardano_transaction_pool = Arc::new(
213            SqliteConnectionPool::build_from_connection(transaction_sqlite_connection),
214        );
215
216        let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
217
218        let protocol_initializer_store = Arc::new(ProtocolInitializerRepository::new(
219            sqlite_connection.clone(),
220            self.config.store_retention_limit.map(|limit| limit as u64),
221        ));
222
223        let digester = Arc::new(CardanoImmutableDigester::new(
224            network.to_string(),
225            self.build_digester_cache_provider().await?,
226            self.root_logger(),
227        ));
228        let stake_store = Arc::new(StakePoolStore::new(
229            sqlite_connection.clone(),
230            self.config.store_retention_limit.map(|limit| limit as u64),
231        ));
232        let chain_observer = {
233            let builder = self.chain_observer_builder;
234            builder(self.config)?
235        };
236        let ticker_service = {
237            let builder = self.immutable_file_observer_builder;
238            Arc::new(MithrilTickerService::new(
239                chain_observer.clone(),
240                builder(self.config)?,
241            ))
242        };
243
244        let era_reader = Arc::new(EraReader::new(
245            self.config
246                .build_era_reader_adapter(chain_observer.clone())?,
247        ));
248        let era_epoch_token = era_reader
249            .read_era_epoch_token(ticker_service.get_current_epoch().await?)
250            .await?;
251        let era_checker = Arc::new(EraChecker::new(
252            era_epoch_token.get_current_supported_era()?,
253            era_epoch_token.get_current_epoch(),
254        ));
255
256        let api_version_provider = Arc::new(APIVersionProvider::new(era_checker.clone()));
257        let aggregator_client = Arc::new(AggregatorHTTPClient::new(
258            self.config.aggregator_endpoint.clone(),
259            self.config.relay_endpoint.clone(),
260            api_version_provider.clone(),
261            Some(Duration::from_millis(HTTP_REQUEST_TIMEOUT_DURATION)),
262            self.root_logger(),
263        ));
264
265        let cardano_immutable_snapshot_builder =
266            Arc::new(CardanoImmutableFilesFullSignableBuilder::new(
267                digester.clone(),
268                &self.config.db_directory,
269                self.root_logger(),
270            ));
271        let mithril_stake_distribution_signable_builder =
272            Arc::new(MithrilStakeDistributionSignableBuilder::default());
273        let transaction_store = Arc::new(CardanoTransactionRepository::new(
274            sqlite_connection_cardano_transaction_pool.clone(),
275        ));
276        let chain_block_reader = PallasChainReader::new(
277            &self.config.cardano_node_socket_path,
278            network,
279            self.root_logger(),
280        );
281        let block_scanner = Arc::new(CardanoBlockScanner::new(
282            Arc::new(Mutex::new(chain_block_reader)),
283            self.config
284                .cardano_transactions_block_streamer_max_roll_forwards_per_poll,
285            self.root_logger(),
286        ));
287        let transactions_importer = Arc::new(CardanoTransactionsImporter::new(
288            block_scanner,
289            transaction_store.clone(),
290            self.root_logger(),
291        ));
292        // Wrap the transaction importer with decorator to prune the transactions after import
293        let transactions_importer = Arc::new(TransactionsImporterWithPruner::new(
294            self.config
295                .enable_transaction_pruning
296                .then_some(self.config.network_security_parameter),
297            transaction_store.clone(),
298            transactions_importer,
299            self.root_logger(),
300        ));
301        // Wrap the transaction importer with decorator to chunk its workload, so it prunes
302        // transactions after each chunk, reducing the storage footprint
303        let state_machine_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
304            transaction_store.clone(),
305            transactions_importer.clone(),
306            self.config.transactions_import_block_chunk_size,
307            self.root_logger(),
308        ));
309        // For the preloader, we want to vacuum the database after each chunk, to reclaim disk space
310        // earlier than with just auto_vacuum (that execute only after the end of all import).
311        let preloader_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
312            transaction_store.clone(),
313            Arc::new(TransactionsImporterWithVacuum::new(
314                sqlite_connection_cardano_transaction_pool.clone(),
315                transactions_importer.clone(),
316                self.root_logger(),
317            )),
318            self.config.transactions_import_block_chunk_size,
319            self.root_logger(),
320        ));
321        let block_range_root_retriever = transaction_store.clone();
322        let cardano_transactions_builder = Arc::new(CardanoTransactionsSignableBuilder::<
323            MKTreeStoreSqlite,
324        >::new(
325            state_machine_transactions_importer,
326            block_range_root_retriever,
327        ));
328        let cardano_stake_distribution_signable_builder = Arc::new(
329            CardanoStakeDistributionSignableBuilder::new(stake_store.clone()),
330        );
331        let cardano_database_signable_builder = Arc::new(CardanoDatabaseSignableBuilder::new(
332            digester.clone(),
333            &self.config.db_directory,
334            self.root_logger(),
335        ));
336        let epoch_service = Arc::new(RwLock::new(MithrilEpochService::new(
337            stake_store.clone(),
338            protocol_initializer_store.clone(),
339            self.root_logger(),
340        )));
341        let single_signer = Arc::new(MithrilSingleSigner::new(
342            self.compute_protocol_party_id()?,
343            epoch_service.clone(),
344            self.root_logger(),
345        ));
346        let signable_seed_builder_service = Arc::new(SignerSignableSeedBuilder::new(
347            epoch_service.clone(),
348            protocol_initializer_store.clone(),
349        ));
350        let signable_builders_dependencies = SignableBuilderServiceDependencies::new(
351            mithril_stake_distribution_signable_builder,
352            cardano_immutable_snapshot_builder,
353            cardano_transactions_builder,
354            cardano_stake_distribution_signable_builder,
355            cardano_database_signable_builder,
356        );
357        let signable_builder_service = Arc::new(MithrilSignableBuilderService::new(
358            signable_seed_builder_service,
359            signable_builders_dependencies,
360            self.root_logger(),
361        ));
362        let metrics_service = Arc::new(MetricsService::new(self.root_logger())?);
363        let preloader_activation =
364            CardanoTransactionsPreloaderActivationSigner::new(aggregator_client.clone());
365        let cardano_transactions_preloader = Arc::new(CardanoTransactionsPreloader::new(
366            signed_entity_type_lock.clone(),
367            preloader_transactions_importer,
368            self.config.preload_security_parameter,
369            chain_observer.clone(),
370            self.root_logger(),
371            Arc::new(preloader_activation),
372        ));
373        let signed_beacon_repository = Arc::new(SignedBeaconRepository::new(
374            sqlite_connection.clone(),
375            self.config.store_retention_limit.map(|limit| limit as u64),
376        ));
377        let upkeep_service = Arc::new(SignerUpkeepService::new(
378            sqlite_connection.clone(),
379            sqlite_connection_cardano_transaction_pool,
380            signed_entity_type_lock.clone(),
381            vec![
382                signed_beacon_repository.clone(),
383                stake_store.clone(),
384                protocol_initializer_store.clone(),
385            ],
386            self.root_logger(),
387        ));
388        let certifier = Arc::new(SignerCertifierService::new(
389            signed_beacon_repository,
390            Arc::new(SignerSignedEntityConfigProvider::new(epoch_service.clone())),
391            signed_entity_type_lock.clone(),
392            single_signer.clone(),
393            aggregator_client.clone(),
394            self.root_logger(),
395        ));
396
397        let services = SignerDependencyContainer {
398            ticker_service,
399            certificate_handler: aggregator_client,
400            chain_observer,
401            digester,
402            single_signer,
403            stake_store,
404            protocol_initializer_store,
405            era_checker,
406            era_reader,
407            api_version_provider,
408            signable_builder_service,
409            metrics_service,
410            signed_entity_type_lock,
411            cardano_transactions_preloader,
412            upkeep_service,
413            epoch_service,
414            certifier,
415        };
416
417        Ok(services)
418    }
419}
420
421#[cfg(test)]
422mod tests {
423    use std::path::PathBuf;
424
425    use mithril_common::{
426        chain_observer::FakeObserver, digesters::DumbImmutableFileObserver, entities::TimePoint,
427        test_utils::TempDir,
428    };
429
430    use crate::test_tools::TestLogger;
431
432    use super::*;
433
434    fn get_test_dir(test_name: &str) -> PathBuf {
435        TempDir::create("signer_service", test_name)
436    }
437
438    #[tokio::test]
439    async fn test_auto_create_stores_directory() {
440        let stores_dir = get_test_dir("test_auto_create_stores_directory").join("stores");
441        let config = Configuration {
442            data_stores_directory: stores_dir.clone(),
443            ..Configuration::new_sample("party-123456")
444        };
445
446        assert!(!stores_dir.exists());
447        let chain_observer_builder: fn(&Configuration) -> StdResult<Arc<dyn ChainObserver>> =
448            |_config| Ok(Arc::new(FakeObserver::new(Some(TimePoint::dummy()))));
449        let immutable_file_observer_builder: fn(
450            &Configuration,
451        )
452            -> StdResult<Arc<dyn ImmutableFileObserver>> =
453            |_config: &Configuration| Ok(Arc::new(DumbImmutableFileObserver::default()));
454
455        let mut dependencies_builder = DependenciesBuilder::new(&config, TestLogger::stdout());
456        dependencies_builder
457            .override_chain_observer_builder(chain_observer_builder)
458            .override_immutable_file_observer_builder(immutable_file_observer_builder)
459            .build()
460            .await
461            .expect("service builder build should not fail");
462        assert!(stores_dir.exists());
463    }
464}