mithril_aggregator/dependency_injection/builder/protocol/
artifacts.rs

1use anyhow::Context;
2use semver::Version;
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use mithril_common::crypto_helper::ManifestSigner;
7
8use crate::artifact_builder::{
9    AncillaryArtifactBuilder, AncillaryFileUploader, CardanoBlocksTransactionsArtifactBuilder,
10    CardanoDatabaseArtifactBuilder, CardanoImmutableFilesFullArtifactBuilder,
11    CardanoStakeDistributionArtifactBuilder, CardanoTransactionsArtifactBuilder,
12    DigestArtifactBuilder, DigestFileUploader, DigestSnapshotter, ImmutableArtifactBuilder,
13    ImmutableFilesUploader, MithrilStakeDistributionArtifactBuilder,
14};
15use crate::configuration::AncillaryFilesSignerConfig;
16use crate::dependency_injection::builder::SNAPSHOT_ARTIFACTS_DIR;
17use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
18use crate::file_uploaders::{
19    CloudRemotePath, CloudUploader, FileUploadRetryPolicy, GCloudBackendUploader, LocalUploader,
20};
21use crate::get_dependency;
22use crate::http_server::{CARDANO_DATABASE_DOWNLOAD_PATH, SNAPSHOT_DOWNLOAD_PATH};
23use crate::services::ancillary_signer::{
24    AncillarySigner, AncillarySignerWithGcpKms, AncillarySignerWithSecretKey,
25};
26use crate::services::{
27    CompressedArchiveSnapshotter, DumbSnapshotter, MithrilSignedEntityService, SignedEntityService,
28    SignedEntityServiceArtifactsDependencies, Snapshotter,
29};
30use crate::tools::DEFAULT_GCP_CREDENTIALS_JSON_ENV_VAR;
31use crate::tools::file_archiver::FileArchiver;
32use crate::{DumbUploader, ExecutionEnvironment, FileUploader, SnapshotUploaderType};
33
34impl DependenciesBuilder {
35    async fn build_signed_entity_service(&mut self) -> Result<Arc<dyn SignedEntityService>> {
36        let logger = self.root_logger();
37        let signed_entity_storer = self.get_signed_entity_storer().await?;
38        let epoch_service = self.get_epoch_service().await?;
39        let mithril_stake_distribution_artifact_builder = Arc::new(
40            MithrilStakeDistributionArtifactBuilder::new(epoch_service.clone()),
41        );
42        let snapshotter = self.get_snapshotter().await?;
43        let snapshot_uploader = self.get_snapshot_uploader().await?;
44        let cardano_node_version = Version::parse(&self.configuration.cardano_node_version())
45            .map_err(|e| DependenciesBuilderError::Initialization { message: format!("Could not parse configuration setting 'cardano_node_version' value '{}' as Semver.", self.configuration.cardano_node_version()), error: Some(e.into()) })?;
46        let cardano_immutable_files_full_artifact_builder =
47            Arc::new(CardanoImmutableFilesFullArtifactBuilder::new(
48                self.configuration.get_network()?,
49                &cardano_node_version,
50                snapshotter,
51                snapshot_uploader,
52                logger.clone(),
53            ));
54        let legacy_prover_service = self.get_legacy_prover_service().await?;
55        let cardano_transactions_artifact_builder = Arc::new(
56            CardanoTransactionsArtifactBuilder::new(legacy_prover_service.clone()),
57        );
58        let prover_service = self.get_prover_service().await?;
59        let mithril_network_configuration_provider =
60            self.get_mithril_network_configuration_provider().await?;
61        let cardano_blocks_transactions_artifact_builder =
62            Arc::new(CardanoBlocksTransactionsArtifactBuilder::new(
63                prover_service.clone(),
64                mithril_network_configuration_provider.clone(),
65            ));
66        let stake_store = self.get_stake_store().await?;
67        let cardano_stake_distribution_artifact_builder =
68            Arc::new(CardanoStakeDistributionArtifactBuilder::new(stake_store));
69        let cardano_database_artifact_builder = Arc::new(
70            self.build_cardano_database_artifact_builder(cardano_node_version)
71                .await?,
72        );
73        let dependencies = SignedEntityServiceArtifactsDependencies::new(
74            mithril_stake_distribution_artifact_builder,
75            cardano_immutable_files_full_artifact_builder,
76            cardano_transactions_artifact_builder,
77            cardano_blocks_transactions_artifact_builder,
78            cardano_stake_distribution_artifact_builder,
79            cardano_database_artifact_builder,
80        );
81        let signed_entity_service = Arc::new(MithrilSignedEntityService::new(
82            signed_entity_storer,
83            dependencies,
84            self.get_signed_entity_type_lock().await?,
85            self.get_metrics_service().await?,
86            logger,
87        ));
88
89        // Compute the cache pool for prover service
90        // This is done here to avoid circular dependencies between the prover service and the signed entity service
91        // TODO: Make this part of a warmup phase of the aggregator?
92        if let Some(signed_entity) =
93            signed_entity_service.get_last_cardano_transaction_snapshot().await?
94        {
95            legacy_prover_service
96                .compute_cache(signed_entity.artifact.block_number)
97                .await?;
98        }
99
100        Ok(signed_entity_service)
101    }
102
103    /// [SignedEntityService] service
104    pub async fn get_signed_entity_service(&mut self) -> Result<Arc<dyn SignedEntityService>> {
105        get_dependency!(self.signed_entity_service)
106    }
107
108    async fn build_file_archiver(&mut self) -> Result<Arc<FileArchiver>> {
109        let archive_verification_directory =
110            std::env::temp_dir().join("mithril_archiver_verify_archive");
111        let file_archiver = Arc::new(FileArchiver::new(
112            self.configuration.zstandard_parameters().unwrap_or_default(),
113            archive_verification_directory,
114            self.root_logger(),
115        ));
116
117        Ok(file_archiver)
118    }
119
120    async fn get_file_archiver(&mut self) -> Result<Arc<FileArchiver>> {
121        get_dependency!(self.file_archiver)
122    }
123
124    async fn get_ancillary_signer(&self) -> Result<Arc<dyn AncillarySigner>> {
125        match &self.configuration.ancillary_files_signer_config() {
126            AncillaryFilesSignerConfig::SecretKey { secret_key } => {
127                let manifest_signer = ManifestSigner::from_secret_key(
128                    secret_key
129                        .as_str()
130                        .try_into()
131                        .with_context(|| "Failed to build ancillary signer: Invalid secret key")?,
132                );
133
134                Ok(Arc::new(AncillarySignerWithSecretKey::new(
135                    manifest_signer,
136                    self.root_logger(),
137                )))
138            }
139            AncillaryFilesSignerConfig::GcpKms {
140                resource_name,
141                credentials_json_env_var,
142            } => {
143                let service = AncillarySignerWithGcpKms::new(
144                    resource_name.clone(),
145                    credentials_json_env_var.clone(),
146                    self.root_logger(),
147                )
148                .await?;
149
150                Ok(Arc::new(service))
151            }
152        }
153    }
154
155    async fn build_snapshotter(&mut self) -> Result<Arc<dyn Snapshotter>> {
156        let snapshotter: Arc<dyn Snapshotter> = match self.configuration.environment() {
157            ExecutionEnvironment::Production => {
158                let ongoing_snapshot_directory =
159                    self.configuration.get_snapshot_dir()?.join("pending_snapshot");
160
161                Arc::new(CompressedArchiveSnapshotter::new(
162                    self.configuration.db_directory().clone(),
163                    ongoing_snapshot_directory,
164                    self.configuration.snapshot_compression_algorithm(),
165                    self.get_file_archiver().await?,
166                    self.get_ancillary_signer().await?,
167                    self.root_logger(),
168                )?)
169            }
170            _ => Arc::new(DumbSnapshotter::new(
171                self.configuration.snapshot_compression_algorithm(),
172            )),
173        };
174
175        Ok(snapshotter)
176    }
177
178    async fn build_digests_snapshotter(
179        &mut self,
180        digests_path: PathBuf,
181    ) -> Result<DigestSnapshotter> {
182        Ok(DigestSnapshotter {
183            file_archiver: self.get_file_archiver().await?,
184            target_location: digests_path,
185            compression_algorithm: self.configuration.snapshot_compression_algorithm(),
186        })
187    }
188
189    /// [Snapshotter] service.
190    pub async fn get_snapshotter(&mut self) -> Result<Arc<dyn Snapshotter>> {
191        get_dependency!(self.snapshotter)
192    }
193
194    async fn build_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
195        let logger = self.root_logger();
196        if self.configuration.environment() == ExecutionEnvironment::Production {
197            match self.configuration.snapshot_uploader_type() {
198                SnapshotUploaderType::Gcp => {
199                    let allow_overwrite = true;
200                    let remote_folder_path = CloudRemotePath::new("cardano-immutable-files-full");
201
202                    Ok(Arc::new(
203                        self.build_gcp_uploader(remote_folder_path, allow_overwrite).await?,
204                    ))
205                }
206                SnapshotUploaderType::Local => {
207                    let server_url_prefix = self.configuration.get_server_url()?;
208                    let snapshot_url_prefix =
209                        server_url_prefix.sanitize_join(SNAPSHOT_DOWNLOAD_PATH)?;
210                    let snapshot_artifacts_dir =
211                        self.configuration.get_snapshot_dir()?.join(SNAPSHOT_ARTIFACTS_DIR);
212                    std::fs::create_dir_all(&snapshot_artifacts_dir).map_err(|e| {
213                        DependenciesBuilderError::Initialization {
214                            message: format!(
215                                "Cannot create '{snapshot_artifacts_dir:?}' directory."
216                            ),
217                            error: Some(e.into()),
218                        }
219                    })?;
220
221                    Ok(Arc::new(LocalUploader::new(
222                        snapshot_url_prefix,
223                        &snapshot_artifacts_dir,
224                        FileUploadRetryPolicy::default(),
225                        logger,
226                    )))
227                }
228            }
229        } else {
230            Ok(Arc::new(DumbUploader::new(FileUploadRetryPolicy::never())))
231        }
232    }
233
234    /// Get a [FileUploader]
235    pub async fn get_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
236        get_dependency!(self.snapshot_uploader)
237    }
238
239    async fn build_gcp_uploader(
240        &self,
241        remote_folder_path: CloudRemotePath,
242        allow_overwrite: bool,
243    ) -> Result<CloudUploader> {
244        let logger = self.root_logger();
245        let bucket = self.configuration.snapshot_bucket_name().to_owned().ok_or_else(|| {
246            DependenciesBuilderError::MissingConfiguration("snapshot_bucket_name".to_string())
247        })?;
248
249        Ok(CloudUploader::new(
250            Arc::new(
251                GCloudBackendUploader::try_new(
252                    bucket,
253                    self.configuration.snapshot_use_cdn_domain(),
254                    DEFAULT_GCP_CREDENTIALS_JSON_ENV_VAR.to_string(),
255                    logger.clone(),
256                )
257                .await?,
258            ),
259            remote_folder_path,
260            allow_overwrite,
261            FileUploadRetryPolicy::default(),
262        ))
263    }
264
265    async fn build_cardano_database_ancillary_uploaders(
266        &self,
267    ) -> Result<Vec<Arc<dyn AncillaryFileUploader>>> {
268        let logger = self.root_logger();
269        if self.configuration.environment() == ExecutionEnvironment::Production {
270            match self.configuration.snapshot_uploader_type() {
271                SnapshotUploaderType::Gcp => {
272                    let allow_overwrite = true;
273                    let remote_folder_path =
274                        CloudRemotePath::new("cardano-database").join("ancillary");
275
276                    Ok(vec![Arc::new(
277                        self.build_gcp_uploader(remote_folder_path, allow_overwrite).await?,
278                    )])
279                }
280                SnapshotUploaderType::Local => {
281                    let server_url_prefix = self.configuration.get_server_url()?;
282                    let ancillary_url_prefix = server_url_prefix
283                        .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/ancillary/"))?;
284                    let target_dir = self.get_cardano_db_artifacts_dir()?.join("ancillary");
285
286                    std::fs::create_dir_all(&target_dir).map_err(|e| {
287                        DependenciesBuilderError::Initialization {
288                            message: format!("Cannot create '{target_dir:?}' directory."),
289                            error: Some(e.into()),
290                        }
291                    })?;
292
293                    Ok(vec![Arc::new(LocalUploader::new(
294                        ancillary_url_prefix,
295                        &target_dir,
296                        FileUploadRetryPolicy::default(),
297                        logger,
298                    ))])
299                }
300            }
301        } else {
302            Ok(vec![Arc::new(DumbUploader::new(
303                FileUploadRetryPolicy::never(),
304            ))])
305        }
306    }
307
308    async fn build_cardano_database_immutable_uploaders(
309        &self,
310    ) -> Result<Vec<Arc<dyn ImmutableFilesUploader>>> {
311        let logger = self.root_logger();
312        if self.configuration.environment() == ExecutionEnvironment::Production {
313            match self.configuration.snapshot_uploader_type() {
314                SnapshotUploaderType::Gcp => {
315                    let allow_overwrite = false;
316                    let remote_folder_path =
317                        CloudRemotePath::new("cardano-database").join("immutable");
318
319                    Ok(vec![Arc::new(
320                        self.build_gcp_uploader(remote_folder_path, allow_overwrite).await?,
321                    )])
322                }
323                SnapshotUploaderType::Local => {
324                    let server_url_prefix = self.configuration.get_server_url()?;
325                    let immutable_url_prefix = server_url_prefix
326                        .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/immutable/"))?;
327
328                    Ok(vec![Arc::new(LocalUploader::new_without_copy(
329                        immutable_url_prefix,
330                        FileUploadRetryPolicy::default(),
331                        logger,
332                    ))])
333                }
334            }
335        } else {
336            Ok(vec![Arc::new(DumbUploader::new(
337                FileUploadRetryPolicy::never(),
338            ))])
339        }
340    }
341
342    async fn build_cardano_database_digests_uploaders(
343        &self,
344    ) -> Result<Vec<Arc<dyn DigestFileUploader>>> {
345        let logger = self.root_logger();
346        if self.configuration.environment() == ExecutionEnvironment::Production {
347            match self.configuration.snapshot_uploader_type() {
348                SnapshotUploaderType::Gcp => {
349                    let allow_overwrite = false;
350                    let remote_folder_path =
351                        CloudRemotePath::new("cardano-database").join("digests");
352
353                    Ok(vec![Arc::new(
354                        self.build_gcp_uploader(remote_folder_path, allow_overwrite).await?,
355                    )])
356                }
357                SnapshotUploaderType::Local => {
358                    let server_url_prefix = self.configuration.get_server_url()?;
359                    let digests_url_prefix = server_url_prefix
360                        .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/digests/"))?;
361                    let target_dir = self.get_cardano_db_artifacts_dir()?.join("digests");
362
363                    std::fs::create_dir_all(&target_dir).map_err(|e| {
364                        DependenciesBuilderError::Initialization {
365                            message: format!("Cannot create '{target_dir:?}' directory."),
366                            error: Some(e.into()),
367                        }
368                    })?;
369
370                    Ok(vec![Arc::new(LocalUploader::new(
371                        digests_url_prefix,
372                        &target_dir,
373                        FileUploadRetryPolicy::default(),
374                        logger,
375                    ))])
376                }
377            }
378        } else {
379            Ok(vec![Arc::new(DumbUploader::new(
380                FileUploadRetryPolicy::never(),
381            ))])
382        }
383    }
384
385    async fn build_cardano_database_artifact_builder(
386        &mut self,
387        cardano_node_version: Version,
388    ) -> Result<CardanoDatabaseArtifactBuilder> {
389        let snapshot_dir = self.configuration.get_snapshot_dir()?;
390        let immutable_dir = self.get_cardano_db_artifacts_dir()?.join("immutable");
391
392        let ancillary_builder = Arc::new(AncillaryArtifactBuilder::new(
393            self.build_cardano_database_ancillary_uploaders().await?,
394            self.get_snapshotter().await?,
395            self.configuration.get_network()?,
396            self.root_logger(),
397        )?);
398
399        let immutable_builder = Arc::new(ImmutableArtifactBuilder::new(
400            immutable_dir,
401            self.build_cardano_database_immutable_uploaders().await?,
402            self.get_snapshotter().await?,
403            self.root_logger(),
404        )?);
405
406        let digests_path = snapshot_dir.join("pending_cardano_database_digests");
407        let digests_snapshotter = self.build_digests_snapshotter(digests_path.clone()).await?;
408
409        let digest_builder = Arc::new(DigestArtifactBuilder::new(
410            self.configuration.get_server_url()?,
411            self.build_cardano_database_digests_uploaders().await?,
412            digests_snapshotter,
413            self.configuration.get_network()?,
414            digests_path,
415            self.get_immutable_file_digest_mapper().await?,
416            self.root_logger(),
417        )?);
418
419        Ok(CardanoDatabaseArtifactBuilder::new(
420            self.configuration.get_network()?,
421            &cardano_node_version,
422            ancillary_builder,
423            immutable_builder,
424            digest_builder,
425        ))
426    }
427}
428
429#[cfg(test)]
430mod tests {
431    use mithril_common::temp_dir_create;
432    use mithril_persistence::sqlite::ConnectionBuilder;
433
434    use crate::ServeCommandConfiguration;
435    use crate::dependency_injection::builder::CARDANO_DB_ARTIFACTS_DIR;
436
437    use super::*;
438
439    #[tokio::test]
440    async fn if_not_local_uploader_create_cardano_database_immutable_dirs() {
441        let snapshot_directory = temp_dir_create!();
442        let cdb_dir = snapshot_directory.join(CARDANO_DB_ARTIFACTS_DIR);
443        let ancillary_dir = cdb_dir.join("ancillary");
444        let immutable_dir = cdb_dir.join("immutable");
445        let digests_dir = cdb_dir.join("digests");
446
447        let mut dep_builder = {
448            let config = ServeCommandConfiguration {
449                // Test environment yield dumb uploaders
450                environment: ExecutionEnvironment::Test,
451                ..ServeCommandConfiguration::new_sample(snapshot_directory)
452            };
453
454            DependenciesBuilder::new_with_stdout_logger(Arc::new(config))
455        };
456
457        assert!(!ancillary_dir.exists());
458        assert!(!immutable_dir.exists());
459        assert!(!digests_dir.exists());
460
461        dep_builder
462            .build_cardano_database_artifact_builder(Version::parse("1.0.0").unwrap())
463            .await
464            .unwrap();
465
466        assert!(!ancillary_dir.exists());
467        assert!(immutable_dir.exists());
468        assert!(!digests_dir.exists());
469    }
470
471    #[tokio::test]
472    async fn if_local_uploader_creates_all_cardano_database_subdirs() {
473        let snapshot_directory = temp_dir_create!();
474        let cdb_dir = snapshot_directory.join(CARDANO_DB_ARTIFACTS_DIR);
475        let ancillary_dir = cdb_dir.join("ancillary");
476        let immutable_dir = cdb_dir.join("immutable");
477        let digests_dir = cdb_dir.join("digests");
478
479        let mut dep_builder = {
480            let config = ServeCommandConfiguration {
481                // Must use production environment to make `snapshot_uploader_type` effective
482                environment: ExecutionEnvironment::Production,
483                snapshot_uploader_type: SnapshotUploaderType::Local,
484                ..ServeCommandConfiguration::new_sample(snapshot_directory)
485            };
486
487            DependenciesBuilder::new_with_stdout_logger(Arc::new(config))
488        };
489        // In production environment the builder can't create in-memory SQLite connections, we
490        // need to provide it manually to avoid creations of unnecessary files.
491        dep_builder.sqlite_connection =
492            Some(Arc::new(ConnectionBuilder::open_memory().build().unwrap()));
493
494        assert!(!ancillary_dir.exists());
495        assert!(!immutable_dir.exists());
496        assert!(!digests_dir.exists());
497
498        dep_builder
499            .build_cardano_database_artifact_builder(Version::parse("1.0.0").unwrap())
500            .await
501            .unwrap();
502
503        assert!(ancillary_dir.exists());
504        assert!(immutable_dir.exists());
505        assert!(digests_dir.exists());
506    }
507}