mithril_aggregator/dependency_injection/builder/protocol/
artifacts.rs

1use anyhow::Context;
2use semver::Version;
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use mithril_common::crypto_helper::ManifestSigner;
7
8use crate::artifact_builder::{
9    AncillaryArtifactBuilder, AncillaryFileUploader, CardanoDatabaseArtifactBuilder,
10    CardanoImmutableFilesFullArtifactBuilder, CardanoStakeDistributionArtifactBuilder,
11    CardanoTransactionsArtifactBuilder, DigestArtifactBuilder, DigestFileUploader,
12    DigestSnapshotter, ImmutableArtifactBuilder, ImmutableFilesUploader,
13    MithrilStakeDistributionArtifactBuilder,
14};
15use crate::configuration::AncillaryFilesSignerConfig;
16use crate::dependency_injection::builder::SNAPSHOT_ARTIFACTS_DIR;
17use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
18use crate::file_uploaders::{
19    CloudRemotePath, CloudUploader, FileUploadRetryPolicy, GCloudBackendUploader, LocalUploader,
20};
21use crate::get_dependency;
22use crate::http_server::{CARDANO_DATABASE_DOWNLOAD_PATH, SNAPSHOT_DOWNLOAD_PATH};
23use crate::services::ancillary_signer::{
24    AncillarySigner, AncillarySignerWithGcpKms, AncillarySignerWithSecretKey,
25};
26use crate::services::{
27    CompressedArchiveSnapshotter, DumbSnapshotter, MithrilSignedEntityService, SignedEntityService,
28    SignedEntityServiceArtifactsDependencies, Snapshotter,
29};
30use crate::tools::file_archiver::FileArchiver;
31use crate::tools::DEFAULT_GCP_CREDENTIALS_JSON_ENV_VAR;
32use crate::{DumbUploader, ExecutionEnvironment, FileUploader, SnapshotUploaderType};
33
34impl DependenciesBuilder {
35    async fn build_signed_entity_service(&mut self) -> Result<Arc<dyn SignedEntityService>> {
36        let logger = self.root_logger();
37        let signed_entity_storer = self.get_signed_entity_storer().await?;
38        let epoch_service = self.get_epoch_service().await?;
39        let mithril_stake_distribution_artifact_builder = Arc::new(
40            MithrilStakeDistributionArtifactBuilder::new(epoch_service.clone()),
41        );
42        let snapshotter = self.get_snapshotter().await?;
43        let snapshot_uploader = self.get_snapshot_uploader().await?;
44        let cardano_node_version = Version::parse(&self.configuration.cardano_node_version())
45            .map_err(|e| DependenciesBuilderError::Initialization { message: format!("Could not parse configuration setting 'cardano_node_version' value '{}' as Semver.", self.configuration.cardano_node_version()), error: Some(e.into()) })?;
46        let cardano_immutable_files_full_artifact_builder =
47            Arc::new(CardanoImmutableFilesFullArtifactBuilder::new(
48                self.configuration.get_network()?,
49                &cardano_node_version,
50                snapshotter,
51                snapshot_uploader,
52                logger.clone(),
53            ));
54        let prover_service = self.get_prover_service().await?;
55        let cardano_transactions_artifact_builder = Arc::new(
56            CardanoTransactionsArtifactBuilder::new(prover_service.clone()),
57        );
58        let stake_store = self.get_stake_store().await?;
59        let cardano_stake_distribution_artifact_builder =
60            Arc::new(CardanoStakeDistributionArtifactBuilder::new(stake_store));
61        let cardano_database_artifact_builder = Arc::new(
62            self.build_cardano_database_artifact_builder(cardano_node_version)
63                .await?,
64        );
65        let dependencies = SignedEntityServiceArtifactsDependencies::new(
66            mithril_stake_distribution_artifact_builder,
67            cardano_immutable_files_full_artifact_builder,
68            cardano_transactions_artifact_builder,
69            cardano_stake_distribution_artifact_builder,
70            cardano_database_artifact_builder,
71        );
72        let signed_entity_service = Arc::new(MithrilSignedEntityService::new(
73            signed_entity_storer,
74            dependencies,
75            self.get_signed_entity_type_lock().await?,
76            self.get_metrics_service().await?,
77            logger,
78        ));
79
80        // Compute the cache pool for prover service
81        // This is done here to avoid circular dependencies between the prover service and the signed entity service
82        // TODO: Make this part of a warmup phase of the aggregator?
83        if let Some(signed_entity) = signed_entity_service
84            .get_last_cardano_transaction_snapshot()
85            .await?
86        {
87            prover_service
88                .compute_cache(signed_entity.artifact.block_number)
89                .await?;
90        }
91
92        Ok(signed_entity_service)
93    }
94
95    /// [SignedEntityService] service
96    pub async fn get_signed_entity_service(&mut self) -> Result<Arc<dyn SignedEntityService>> {
97        get_dependency!(self.signed_entity_service)
98    }
99
100    async fn build_file_archiver(&mut self) -> Result<Arc<FileArchiver>> {
101        let archive_verification_directory =
102            std::env::temp_dir().join("mithril_archiver_verify_archive");
103        let file_archiver = Arc::new(FileArchiver::new(
104            self.configuration
105                .zstandard_parameters()
106                .unwrap_or_default(),
107            archive_verification_directory,
108            self.root_logger(),
109        ));
110
111        Ok(file_archiver)
112    }
113
114    async fn get_file_archiver(&mut self) -> Result<Arc<FileArchiver>> {
115        get_dependency!(self.file_archiver)
116    }
117
118    async fn get_ancillary_signer(&self) -> Result<Arc<dyn AncillarySigner>> {
119        match &self.configuration.ancillary_files_signer_config() {
120            AncillaryFilesSignerConfig::SecretKey { secret_key } => {
121                let manifest_signer = ManifestSigner::from_secret_key(
122                    secret_key
123                        .as_str()
124                        .try_into()
125                        .with_context(|| "Failed to build ancillary signer: Invalid secret key")?,
126                );
127
128                Ok(Arc::new(AncillarySignerWithSecretKey::new(
129                    manifest_signer,
130                    self.root_logger(),
131                )))
132            }
133            AncillaryFilesSignerConfig::GcpKms {
134                resource_name,
135                credentials_json_env_var,
136            } => {
137                let service = AncillarySignerWithGcpKms::new(
138                    resource_name.clone(),
139                    credentials_json_env_var.clone(),
140                    self.root_logger(),
141                )
142                .await?;
143
144                Ok(Arc::new(service))
145            }
146        }
147    }
148
149    async fn build_snapshotter(&mut self) -> Result<Arc<dyn Snapshotter>> {
150        let snapshotter: Arc<dyn Snapshotter> = match self.configuration.environment() {
151            ExecutionEnvironment::Production => {
152                let ongoing_snapshot_directory = self
153                    .configuration
154                    .get_snapshot_dir()?
155                    .join("pending_snapshot");
156
157                Arc::new(CompressedArchiveSnapshotter::new(
158                    self.configuration.db_directory().clone(),
159                    ongoing_snapshot_directory,
160                    self.configuration.snapshot_compression_algorithm(),
161                    self.get_file_archiver().await?,
162                    self.get_ancillary_signer().await?,
163                    self.root_logger(),
164                )?)
165            }
166            _ => Arc::new(DumbSnapshotter::new(
167                self.configuration.snapshot_compression_algorithm(),
168            )),
169        };
170
171        Ok(snapshotter)
172    }
173
174    async fn build_digests_snapshotter(
175        &mut self,
176        digests_path: PathBuf,
177    ) -> Result<DigestSnapshotter> {
178        Ok(DigestSnapshotter {
179            file_archiver: self.get_file_archiver().await?,
180            target_location: digests_path,
181            compression_algorithm: self.configuration.snapshot_compression_algorithm(),
182        })
183    }
184
185    /// [Snapshotter] service.
186    pub async fn get_snapshotter(&mut self) -> Result<Arc<dyn Snapshotter>> {
187        get_dependency!(self.snapshotter)
188    }
189
190    async fn build_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
191        let logger = self.root_logger();
192        if self.configuration.environment() == ExecutionEnvironment::Production {
193            match self.configuration.snapshot_uploader_type() {
194                SnapshotUploaderType::Gcp => {
195                    let allow_overwrite = true;
196                    let remote_folder_path = CloudRemotePath::new("cardano-immutable-files-full");
197
198                    Ok(Arc::new(
199                        self.build_gcp_uploader(remote_folder_path, allow_overwrite)
200                            .await?,
201                    ))
202                }
203                SnapshotUploaderType::Local => {
204                    let server_url_prefix = self.configuration.get_server_url()?;
205                    let snapshot_url_prefix =
206                        server_url_prefix.sanitize_join(SNAPSHOT_DOWNLOAD_PATH)?;
207                    let snapshot_artifacts_dir = self
208                        .configuration
209                        .get_snapshot_dir()?
210                        .join(SNAPSHOT_ARTIFACTS_DIR);
211                    std::fs::create_dir_all(&snapshot_artifacts_dir).map_err(|e| {
212                        DependenciesBuilderError::Initialization {
213                            message: format!(
214                                "Cannot create '{snapshot_artifacts_dir:?}' directory."
215                            ),
216                            error: Some(e.into()),
217                        }
218                    })?;
219
220                    Ok(Arc::new(LocalUploader::new(
221                        snapshot_url_prefix,
222                        &snapshot_artifacts_dir,
223                        FileUploadRetryPolicy::default(),
224                        logger,
225                    )))
226                }
227            }
228        } else {
229            Ok(Arc::new(DumbUploader::new(FileUploadRetryPolicy::never())))
230        }
231    }
232
233    /// Get a [FileUploader]
234    pub async fn get_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
235        get_dependency!(self.snapshot_uploader)
236    }
237
238    async fn build_gcp_uploader(
239        &self,
240        remote_folder_path: CloudRemotePath,
241        allow_overwrite: bool,
242    ) -> Result<CloudUploader> {
243        let logger = self.root_logger();
244        let bucket = self
245            .configuration
246            .snapshot_bucket_name()
247            .to_owned()
248            .ok_or_else(|| {
249                DependenciesBuilderError::MissingConfiguration("snapshot_bucket_name".to_string())
250            })?;
251
252        Ok(CloudUploader::new(
253            Arc::new(
254                GCloudBackendUploader::try_new(
255                    bucket,
256                    self.configuration.snapshot_use_cdn_domain(),
257                    DEFAULT_GCP_CREDENTIALS_JSON_ENV_VAR.to_string(),
258                    logger.clone(),
259                )
260                .await?,
261            ),
262            remote_folder_path,
263            allow_overwrite,
264            FileUploadRetryPolicy::default(),
265        ))
266    }
267
268    async fn build_cardano_database_ancillary_uploaders(
269        &self,
270    ) -> Result<Vec<Arc<dyn AncillaryFileUploader>>> {
271        let logger = self.root_logger();
272        if self.configuration.environment() == ExecutionEnvironment::Production {
273            match self.configuration.snapshot_uploader_type() {
274                SnapshotUploaderType::Gcp => {
275                    let allow_overwrite = true;
276                    let remote_folder_path =
277                        CloudRemotePath::new("cardano-database").join("ancillary");
278
279                    Ok(vec![Arc::new(
280                        self.build_gcp_uploader(remote_folder_path, allow_overwrite)
281                            .await?,
282                    )])
283                }
284                SnapshotUploaderType::Local => {
285                    let server_url_prefix = self.configuration.get_server_url()?;
286                    let ancillary_url_prefix = server_url_prefix
287                        .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/ancillary/"))?;
288                    let target_dir = self.get_cardano_db_artifacts_dir()?.join("ancillary");
289
290                    std::fs::create_dir_all(&target_dir).map_err(|e| {
291                        DependenciesBuilderError::Initialization {
292                            message: format!("Cannot create '{target_dir:?}' directory."),
293                            error: Some(e.into()),
294                        }
295                    })?;
296
297                    Ok(vec![Arc::new(LocalUploader::new(
298                        ancillary_url_prefix,
299                        &target_dir,
300                        FileUploadRetryPolicy::default(),
301                        logger,
302                    ))])
303                }
304            }
305        } else {
306            Ok(vec![Arc::new(DumbUploader::new(
307                FileUploadRetryPolicy::never(),
308            ))])
309        }
310    }
311
312    async fn build_cardano_database_immutable_uploaders(
313        &self,
314    ) -> Result<Vec<Arc<dyn ImmutableFilesUploader>>> {
315        let logger = self.root_logger();
316        if self.configuration.environment() == ExecutionEnvironment::Production {
317            match self.configuration.snapshot_uploader_type() {
318                SnapshotUploaderType::Gcp => {
319                    let allow_overwrite = false;
320                    let remote_folder_path =
321                        CloudRemotePath::new("cardano-database").join("immutable");
322
323                    Ok(vec![Arc::new(
324                        self.build_gcp_uploader(remote_folder_path, allow_overwrite)
325                            .await?,
326                    )])
327                }
328                SnapshotUploaderType::Local => {
329                    let server_url_prefix = self.configuration.get_server_url()?;
330                    let immutable_url_prefix = server_url_prefix
331                        .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/immutable/"))?;
332
333                    Ok(vec![Arc::new(LocalUploader::new_without_copy(
334                        immutable_url_prefix,
335                        FileUploadRetryPolicy::default(),
336                        logger,
337                    ))])
338                }
339            }
340        } else {
341            Ok(vec![Arc::new(DumbUploader::new(
342                FileUploadRetryPolicy::never(),
343            ))])
344        }
345    }
346
347    async fn build_cardano_database_digests_uploaders(
348        &self,
349    ) -> Result<Vec<Arc<dyn DigestFileUploader>>> {
350        let logger = self.root_logger();
351        if self.configuration.environment() == ExecutionEnvironment::Production {
352            match self.configuration.snapshot_uploader_type() {
353                SnapshotUploaderType::Gcp => {
354                    let allow_overwrite = false;
355                    let remote_folder_path =
356                        CloudRemotePath::new("cardano-database").join("digests");
357
358                    Ok(vec![Arc::new(
359                        self.build_gcp_uploader(remote_folder_path, allow_overwrite)
360                            .await?,
361                    )])
362                }
363                SnapshotUploaderType::Local => {
364                    let server_url_prefix = self.configuration.get_server_url()?;
365                    let digests_url_prefix = server_url_prefix
366                        .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/digests/"))?;
367                    let target_dir = self.get_cardano_db_artifacts_dir()?.join("digests");
368
369                    std::fs::create_dir_all(&target_dir).map_err(|e| {
370                        DependenciesBuilderError::Initialization {
371                            message: format!("Cannot create '{target_dir:?}' directory."),
372                            error: Some(e.into()),
373                        }
374                    })?;
375
376                    Ok(vec![Arc::new(LocalUploader::new(
377                        digests_url_prefix,
378                        &target_dir,
379                        FileUploadRetryPolicy::default(),
380                        logger,
381                    ))])
382                }
383            }
384        } else {
385            Ok(vec![Arc::new(DumbUploader::new(
386                FileUploadRetryPolicy::never(),
387            ))])
388        }
389    }
390
391    async fn build_cardano_database_artifact_builder(
392        &mut self,
393        cardano_node_version: Version,
394    ) -> Result<CardanoDatabaseArtifactBuilder> {
395        let snapshot_dir = self.configuration.get_snapshot_dir()?;
396        let immutable_dir = self.get_cardano_db_artifacts_dir()?.join("immutable");
397
398        let ancillary_builder = Arc::new(AncillaryArtifactBuilder::new(
399            self.build_cardano_database_ancillary_uploaders().await?,
400            self.get_snapshotter().await?,
401            self.configuration.get_network()?,
402            self.root_logger(),
403        )?);
404
405        let immutable_builder = Arc::new(ImmutableArtifactBuilder::new(
406            immutable_dir,
407            self.build_cardano_database_immutable_uploaders().await?,
408            self.get_snapshotter().await?,
409            self.root_logger(),
410        )?);
411
412        let digests_path = snapshot_dir.join("pending_cardano_database_digests");
413        let digests_snapshotter = self.build_digests_snapshotter(digests_path.clone()).await?;
414
415        let digest_builder = Arc::new(DigestArtifactBuilder::new(
416            self.configuration.get_server_url()?,
417            self.build_cardano_database_digests_uploaders().await?,
418            digests_snapshotter,
419            self.configuration.get_network()?,
420            digests_path,
421            self.get_immutable_file_digest_mapper().await?,
422            self.root_logger(),
423        )?);
424
425        Ok(CardanoDatabaseArtifactBuilder::new(
426            self.configuration.get_network()?,
427            &cardano_node_version,
428            ancillary_builder,
429            immutable_builder,
430            digest_builder,
431        ))
432    }
433}
434
435#[cfg(test)]
436mod tests {
437    use mithril_common::temp_dir_create;
438    use mithril_persistence::sqlite::ConnectionBuilder;
439
440    use crate::dependency_injection::builder::CARDANO_DB_ARTIFACTS_DIR;
441    use crate::ServeCommandConfiguration;
442
443    use super::*;
444
445    #[tokio::test]
446    async fn if_not_local_uploader_create_cardano_database_immutable_dirs() {
447        let snapshot_directory = temp_dir_create!();
448        let cdb_dir = snapshot_directory.join(CARDANO_DB_ARTIFACTS_DIR);
449        let ancillary_dir = cdb_dir.join("ancillary");
450        let immutable_dir = cdb_dir.join("immutable");
451        let digests_dir = cdb_dir.join("digests");
452
453        let mut dep_builder = {
454            let config = ServeCommandConfiguration {
455                // Test environment yield dumb uploaders
456                environment: ExecutionEnvironment::Test,
457                ..ServeCommandConfiguration::new_sample(snapshot_directory)
458            };
459
460            DependenciesBuilder::new_with_stdout_logger(Arc::new(config))
461        };
462
463        assert!(!ancillary_dir.exists());
464        assert!(!immutable_dir.exists());
465        assert!(!digests_dir.exists());
466
467        dep_builder
468            .build_cardano_database_artifact_builder(Version::parse("1.0.0").unwrap())
469            .await
470            .unwrap();
471
472        assert!(!ancillary_dir.exists());
473        assert!(immutable_dir.exists());
474        assert!(!digests_dir.exists());
475    }
476
477    #[tokio::test]
478    async fn if_local_uploader_creates_all_cardano_database_subdirs() {
479        let snapshot_directory = temp_dir_create!();
480        let cdb_dir = snapshot_directory.join(CARDANO_DB_ARTIFACTS_DIR);
481        let ancillary_dir = cdb_dir.join("ancillary");
482        let immutable_dir = cdb_dir.join("immutable");
483        let digests_dir = cdb_dir.join("digests");
484
485        let mut dep_builder = {
486            let config = ServeCommandConfiguration {
487                // Must use production environment to make `snapshot_uploader_type` effective
488                environment: ExecutionEnvironment::Production,
489                snapshot_uploader_type: SnapshotUploaderType::Local,
490                ..ServeCommandConfiguration::new_sample(snapshot_directory)
491            };
492
493            DependenciesBuilder::new_with_stdout_logger(Arc::new(config))
494        };
495        // In production environment the builder can't create in-memory SQLite connections, we
496        // need to provide it manually to avoid creations of unnecessary files.
497        dep_builder.sqlite_connection =
498            Some(Arc::new(ConnectionBuilder::open_memory().build().unwrap()));
499
500        assert!(!ancillary_dir.exists());
501        assert!(!immutable_dir.exists());
502        assert!(!digests_dir.exists());
503
504        dep_builder
505            .build_cardano_database_artifact_builder(Version::parse("1.0.0").unwrap())
506            .await
507            .unwrap();
508
509        assert!(ancillary_dir.exists());
510        assert!(immutable_dir.exists());
511        assert!(digests_dir.exists());
512    }
513}