mithril_aggregator/dependency_injection/builder/protocol/
artifacts.rs

1use anyhow::Context;
2use semver::Version;
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use mithril_common::crypto_helper::ManifestSigner;
7
8use crate::artifact_builder::{
9    AncillaryArtifactBuilder, AncillaryFileUploader, CardanoDatabaseArtifactBuilder,
10    CardanoImmutableFilesFullArtifactBuilder, CardanoStakeDistributionArtifactBuilder,
11    CardanoTransactionsArtifactBuilder, DigestArtifactBuilder, DigestFileUploader,
12    DigestSnapshotter, ImmutableArtifactBuilder, ImmutableFilesUploader,
13    MithrilStakeDistributionArtifactBuilder,
14};
15use crate::configuration::AncillaryFilesSignerConfig;
16use crate::dependency_injection::builder::SNAPSHOT_ARTIFACTS_DIR;
17use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
18use crate::file_uploaders::{
19    CloudRemotePath, CloudUploader, FileUploadRetryPolicy, GCloudBackendUploader, LocalUploader,
20};
21use crate::get_dependency;
22use crate::http_server::{CARDANO_DATABASE_DOWNLOAD_PATH, SNAPSHOT_DOWNLOAD_PATH};
23use crate::services::ancillary_signer::{
24    AncillarySigner, AncillarySignerWithGcpKms, AncillarySignerWithSecretKey,
25};
26use crate::services::{
27    CompressedArchiveSnapshotter, DumbSnapshotter, MithrilSignedEntityService, SignedEntityService,
28    SignedEntityServiceArtifactsDependencies, Snapshotter,
29};
30use crate::tools::DEFAULT_GCP_CREDENTIALS_JSON_ENV_VAR;
31use crate::tools::file_archiver::FileArchiver;
32use crate::{DumbUploader, ExecutionEnvironment, FileUploader, SnapshotUploaderType};
33
34impl DependenciesBuilder {
35    async fn build_signed_entity_service(&mut self) -> Result<Arc<dyn SignedEntityService>> {
36        let logger = self.root_logger();
37        let signed_entity_storer = self.get_signed_entity_storer().await?;
38        let epoch_service = self.get_epoch_service().await?;
39        let mithril_stake_distribution_artifact_builder = Arc::new(
40            MithrilStakeDistributionArtifactBuilder::new(epoch_service.clone()),
41        );
42        let snapshotter = self.get_snapshotter().await?;
43        let snapshot_uploader = self.get_snapshot_uploader().await?;
44        let cardano_node_version = Version::parse(&self.configuration.cardano_node_version())
45            .map_err(|e| DependenciesBuilderError::Initialization { message: format!("Could not parse configuration setting 'cardano_node_version' value '{}' as Semver.", self.configuration.cardano_node_version()), error: Some(e.into()) })?;
46        let cardano_immutable_files_full_artifact_builder =
47            Arc::new(CardanoImmutableFilesFullArtifactBuilder::new(
48                self.configuration.get_network()?,
49                &cardano_node_version,
50                snapshotter,
51                snapshot_uploader,
52                logger.clone(),
53            ));
54        let prover_service = self.get_prover_service().await?;
55        let cardano_transactions_artifact_builder = Arc::new(
56            CardanoTransactionsArtifactBuilder::new(prover_service.clone()),
57        );
58        let stake_store = self.get_stake_store().await?;
59        let cardano_stake_distribution_artifact_builder =
60            Arc::new(CardanoStakeDistributionArtifactBuilder::new(stake_store));
61        let cardano_database_artifact_builder = Arc::new(
62            self.build_cardano_database_artifact_builder(cardano_node_version)
63                .await?,
64        );
65        let dependencies = SignedEntityServiceArtifactsDependencies::new(
66            mithril_stake_distribution_artifact_builder,
67            cardano_immutable_files_full_artifact_builder,
68            cardano_transactions_artifact_builder,
69            cardano_stake_distribution_artifact_builder,
70            cardano_database_artifact_builder,
71        );
72        let signed_entity_service = Arc::new(MithrilSignedEntityService::new(
73            signed_entity_storer,
74            dependencies,
75            self.get_signed_entity_type_lock().await?,
76            self.get_metrics_service().await?,
77            logger,
78        ));
79
80        // Compute the cache pool for prover service
81        // This is done here to avoid circular dependencies between the prover service and the signed entity service
82        // TODO: Make this part of a warmup phase of the aggregator?
83        if let Some(signed_entity) =
84            signed_entity_service.get_last_cardano_transaction_snapshot().await?
85        {
86            prover_service
87                .compute_cache(signed_entity.artifact.block_number)
88                .await?;
89        }
90
91        Ok(signed_entity_service)
92    }
93
94    /// [SignedEntityService] service
95    pub async fn get_signed_entity_service(&mut self) -> Result<Arc<dyn SignedEntityService>> {
96        get_dependency!(self.signed_entity_service)
97    }
98
99    async fn build_file_archiver(&mut self) -> Result<Arc<FileArchiver>> {
100        let archive_verification_directory =
101            std::env::temp_dir().join("mithril_archiver_verify_archive");
102        let file_archiver = Arc::new(FileArchiver::new(
103            self.configuration.zstandard_parameters().unwrap_or_default(),
104            archive_verification_directory,
105            self.root_logger(),
106        ));
107
108        Ok(file_archiver)
109    }
110
111    async fn get_file_archiver(&mut self) -> Result<Arc<FileArchiver>> {
112        get_dependency!(self.file_archiver)
113    }
114
115    async fn get_ancillary_signer(&self) -> Result<Arc<dyn AncillarySigner>> {
116        match &self.configuration.ancillary_files_signer_config() {
117            AncillaryFilesSignerConfig::SecretKey { secret_key } => {
118                let manifest_signer = ManifestSigner::from_secret_key(
119                    secret_key
120                        .as_str()
121                        .try_into()
122                        .with_context(|| "Failed to build ancillary signer: Invalid secret key")?,
123                );
124
125                Ok(Arc::new(AncillarySignerWithSecretKey::new(
126                    manifest_signer,
127                    self.root_logger(),
128                )))
129            }
130            AncillaryFilesSignerConfig::GcpKms {
131                resource_name,
132                credentials_json_env_var,
133            } => {
134                let service = AncillarySignerWithGcpKms::new(
135                    resource_name.clone(),
136                    credentials_json_env_var.clone(),
137                    self.root_logger(),
138                )
139                .await?;
140
141                Ok(Arc::new(service))
142            }
143        }
144    }
145
146    async fn build_snapshotter(&mut self) -> Result<Arc<dyn Snapshotter>> {
147        let snapshotter: Arc<dyn Snapshotter> = match self.configuration.environment() {
148            ExecutionEnvironment::Production => {
149                let ongoing_snapshot_directory =
150                    self.configuration.get_snapshot_dir()?.join("pending_snapshot");
151
152                Arc::new(CompressedArchiveSnapshotter::new(
153                    self.configuration.db_directory().clone(),
154                    ongoing_snapshot_directory,
155                    self.configuration.snapshot_compression_algorithm(),
156                    self.get_file_archiver().await?,
157                    self.get_ancillary_signer().await?,
158                    self.root_logger(),
159                )?)
160            }
161            _ => Arc::new(DumbSnapshotter::new(
162                self.configuration.snapshot_compression_algorithm(),
163            )),
164        };
165
166        Ok(snapshotter)
167    }
168
169    async fn build_digests_snapshotter(
170        &mut self,
171        digests_path: PathBuf,
172    ) -> Result<DigestSnapshotter> {
173        Ok(DigestSnapshotter {
174            file_archiver: self.get_file_archiver().await?,
175            target_location: digests_path,
176            compression_algorithm: self.configuration.snapshot_compression_algorithm(),
177        })
178    }
179
180    /// [Snapshotter] service.
181    pub async fn get_snapshotter(&mut self) -> Result<Arc<dyn Snapshotter>> {
182        get_dependency!(self.snapshotter)
183    }
184
185    async fn build_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
186        let logger = self.root_logger();
187        if self.configuration.environment() == ExecutionEnvironment::Production {
188            match self.configuration.snapshot_uploader_type() {
189                SnapshotUploaderType::Gcp => {
190                    let allow_overwrite = true;
191                    let remote_folder_path = CloudRemotePath::new("cardano-immutable-files-full");
192
193                    Ok(Arc::new(
194                        self.build_gcp_uploader(remote_folder_path, allow_overwrite).await?,
195                    ))
196                }
197                SnapshotUploaderType::Local => {
198                    let server_url_prefix = self.configuration.get_server_url()?;
199                    let snapshot_url_prefix =
200                        server_url_prefix.sanitize_join(SNAPSHOT_DOWNLOAD_PATH)?;
201                    let snapshot_artifacts_dir =
202                        self.configuration.get_snapshot_dir()?.join(SNAPSHOT_ARTIFACTS_DIR);
203                    std::fs::create_dir_all(&snapshot_artifacts_dir).map_err(|e| {
204                        DependenciesBuilderError::Initialization {
205                            message: format!(
206                                "Cannot create '{snapshot_artifacts_dir:?}' directory."
207                            ),
208                            error: Some(e.into()),
209                        }
210                    })?;
211
212                    Ok(Arc::new(LocalUploader::new(
213                        snapshot_url_prefix,
214                        &snapshot_artifacts_dir,
215                        FileUploadRetryPolicy::default(),
216                        logger,
217                    )))
218                }
219            }
220        } else {
221            Ok(Arc::new(DumbUploader::new(FileUploadRetryPolicy::never())))
222        }
223    }
224
225    /// Get a [FileUploader]
226    pub async fn get_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
227        get_dependency!(self.snapshot_uploader)
228    }
229
230    async fn build_gcp_uploader(
231        &self,
232        remote_folder_path: CloudRemotePath,
233        allow_overwrite: bool,
234    ) -> Result<CloudUploader> {
235        let logger = self.root_logger();
236        let bucket = self.configuration.snapshot_bucket_name().to_owned().ok_or_else(|| {
237            DependenciesBuilderError::MissingConfiguration("snapshot_bucket_name".to_string())
238        })?;
239
240        Ok(CloudUploader::new(
241            Arc::new(
242                GCloudBackendUploader::try_new(
243                    bucket,
244                    self.configuration.snapshot_use_cdn_domain(),
245                    DEFAULT_GCP_CREDENTIALS_JSON_ENV_VAR.to_string(),
246                    logger.clone(),
247                )
248                .await?,
249            ),
250            remote_folder_path,
251            allow_overwrite,
252            FileUploadRetryPolicy::default(),
253        ))
254    }
255
256    async fn build_cardano_database_ancillary_uploaders(
257        &self,
258    ) -> Result<Vec<Arc<dyn AncillaryFileUploader>>> {
259        let logger = self.root_logger();
260        if self.configuration.environment() == ExecutionEnvironment::Production {
261            match self.configuration.snapshot_uploader_type() {
262                SnapshotUploaderType::Gcp => {
263                    let allow_overwrite = true;
264                    let remote_folder_path =
265                        CloudRemotePath::new("cardano-database").join("ancillary");
266
267                    Ok(vec![Arc::new(
268                        self.build_gcp_uploader(remote_folder_path, allow_overwrite).await?,
269                    )])
270                }
271                SnapshotUploaderType::Local => {
272                    let server_url_prefix = self.configuration.get_server_url()?;
273                    let ancillary_url_prefix = server_url_prefix
274                        .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/ancillary/"))?;
275                    let target_dir = self.get_cardano_db_artifacts_dir()?.join("ancillary");
276
277                    std::fs::create_dir_all(&target_dir).map_err(|e| {
278                        DependenciesBuilderError::Initialization {
279                            message: format!("Cannot create '{target_dir:?}' directory."),
280                            error: Some(e.into()),
281                        }
282                    })?;
283
284                    Ok(vec![Arc::new(LocalUploader::new(
285                        ancillary_url_prefix,
286                        &target_dir,
287                        FileUploadRetryPolicy::default(),
288                        logger,
289                    ))])
290                }
291            }
292        } else {
293            Ok(vec![Arc::new(DumbUploader::new(
294                FileUploadRetryPolicy::never(),
295            ))])
296        }
297    }
298
299    async fn build_cardano_database_immutable_uploaders(
300        &self,
301    ) -> Result<Vec<Arc<dyn ImmutableFilesUploader>>> {
302        let logger = self.root_logger();
303        if self.configuration.environment() == ExecutionEnvironment::Production {
304            match self.configuration.snapshot_uploader_type() {
305                SnapshotUploaderType::Gcp => {
306                    let allow_overwrite = false;
307                    let remote_folder_path =
308                        CloudRemotePath::new("cardano-database").join("immutable");
309
310                    Ok(vec![Arc::new(
311                        self.build_gcp_uploader(remote_folder_path, allow_overwrite).await?,
312                    )])
313                }
314                SnapshotUploaderType::Local => {
315                    let server_url_prefix = self.configuration.get_server_url()?;
316                    let immutable_url_prefix = server_url_prefix
317                        .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/immutable/"))?;
318
319                    Ok(vec![Arc::new(LocalUploader::new_without_copy(
320                        immutable_url_prefix,
321                        FileUploadRetryPolicy::default(),
322                        logger,
323                    ))])
324                }
325            }
326        } else {
327            Ok(vec![Arc::new(DumbUploader::new(
328                FileUploadRetryPolicy::never(),
329            ))])
330        }
331    }
332
333    async fn build_cardano_database_digests_uploaders(
334        &self,
335    ) -> Result<Vec<Arc<dyn DigestFileUploader>>> {
336        let logger = self.root_logger();
337        if self.configuration.environment() == ExecutionEnvironment::Production {
338            match self.configuration.snapshot_uploader_type() {
339                SnapshotUploaderType::Gcp => {
340                    let allow_overwrite = false;
341                    let remote_folder_path =
342                        CloudRemotePath::new("cardano-database").join("digests");
343
344                    Ok(vec![Arc::new(
345                        self.build_gcp_uploader(remote_folder_path, allow_overwrite).await?,
346                    )])
347                }
348                SnapshotUploaderType::Local => {
349                    let server_url_prefix = self.configuration.get_server_url()?;
350                    let digests_url_prefix = server_url_prefix
351                        .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/digests/"))?;
352                    let target_dir = self.get_cardano_db_artifacts_dir()?.join("digests");
353
354                    std::fs::create_dir_all(&target_dir).map_err(|e| {
355                        DependenciesBuilderError::Initialization {
356                            message: format!("Cannot create '{target_dir:?}' directory."),
357                            error: Some(e.into()),
358                        }
359                    })?;
360
361                    Ok(vec![Arc::new(LocalUploader::new(
362                        digests_url_prefix,
363                        &target_dir,
364                        FileUploadRetryPolicy::default(),
365                        logger,
366                    ))])
367                }
368            }
369        } else {
370            Ok(vec![Arc::new(DumbUploader::new(
371                FileUploadRetryPolicy::never(),
372            ))])
373        }
374    }
375
376    async fn build_cardano_database_artifact_builder(
377        &mut self,
378        cardano_node_version: Version,
379    ) -> Result<CardanoDatabaseArtifactBuilder> {
380        let snapshot_dir = self.configuration.get_snapshot_dir()?;
381        let immutable_dir = self.get_cardano_db_artifacts_dir()?.join("immutable");
382
383        let ancillary_builder = Arc::new(AncillaryArtifactBuilder::new(
384            self.build_cardano_database_ancillary_uploaders().await?,
385            self.get_snapshotter().await?,
386            self.configuration.get_network()?,
387            self.root_logger(),
388        )?);
389
390        let immutable_builder = Arc::new(ImmutableArtifactBuilder::new(
391            immutable_dir,
392            self.build_cardano_database_immutable_uploaders().await?,
393            self.get_snapshotter().await?,
394            self.root_logger(),
395        )?);
396
397        let digests_path = snapshot_dir.join("pending_cardano_database_digests");
398        let digests_snapshotter = self.build_digests_snapshotter(digests_path.clone()).await?;
399
400        let digest_builder = Arc::new(DigestArtifactBuilder::new(
401            self.configuration.get_server_url()?,
402            self.build_cardano_database_digests_uploaders().await?,
403            digests_snapshotter,
404            self.configuration.get_network()?,
405            digests_path,
406            self.get_immutable_file_digest_mapper().await?,
407            self.root_logger(),
408        )?);
409
410        Ok(CardanoDatabaseArtifactBuilder::new(
411            self.configuration.get_network()?,
412            &cardano_node_version,
413            ancillary_builder,
414            immutable_builder,
415            digest_builder,
416        ))
417    }
418}
419
420#[cfg(test)]
421mod tests {
422    use mithril_common::temp_dir_create;
423    use mithril_persistence::sqlite::ConnectionBuilder;
424
425    use crate::ServeCommandConfiguration;
426    use crate::dependency_injection::builder::CARDANO_DB_ARTIFACTS_DIR;
427
428    use super::*;
429
430    #[tokio::test]
431    async fn if_not_local_uploader_create_cardano_database_immutable_dirs() {
432        let snapshot_directory = temp_dir_create!();
433        let cdb_dir = snapshot_directory.join(CARDANO_DB_ARTIFACTS_DIR);
434        let ancillary_dir = cdb_dir.join("ancillary");
435        let immutable_dir = cdb_dir.join("immutable");
436        let digests_dir = cdb_dir.join("digests");
437
438        let mut dep_builder = {
439            let config = ServeCommandConfiguration {
440                // Test environment yield dumb uploaders
441                environment: ExecutionEnvironment::Test,
442                ..ServeCommandConfiguration::new_sample(snapshot_directory)
443            };
444
445            DependenciesBuilder::new_with_stdout_logger(Arc::new(config))
446        };
447
448        assert!(!ancillary_dir.exists());
449        assert!(!immutable_dir.exists());
450        assert!(!digests_dir.exists());
451
452        dep_builder
453            .build_cardano_database_artifact_builder(Version::parse("1.0.0").unwrap())
454            .await
455            .unwrap();
456
457        assert!(!ancillary_dir.exists());
458        assert!(immutable_dir.exists());
459        assert!(!digests_dir.exists());
460    }
461
462    #[tokio::test]
463    async fn if_local_uploader_creates_all_cardano_database_subdirs() {
464        let snapshot_directory = temp_dir_create!();
465        let cdb_dir = snapshot_directory.join(CARDANO_DB_ARTIFACTS_DIR);
466        let ancillary_dir = cdb_dir.join("ancillary");
467        let immutable_dir = cdb_dir.join("immutable");
468        let digests_dir = cdb_dir.join("digests");
469
470        let mut dep_builder = {
471            let config = ServeCommandConfiguration {
472                // Must use production environment to make `snapshot_uploader_type` effective
473                environment: ExecutionEnvironment::Production,
474                snapshot_uploader_type: SnapshotUploaderType::Local,
475                ..ServeCommandConfiguration::new_sample(snapshot_directory)
476            };
477
478            DependenciesBuilder::new_with_stdout_logger(Arc::new(config))
479        };
480        // In production environment the builder can't create in-memory SQLite connections, we
481        // need to provide it manually to avoid creations of unnecessary files.
482        dep_builder.sqlite_connection =
483            Some(Arc::new(ConnectionBuilder::open_memory().build().unwrap()));
484
485        assert!(!ancillary_dir.exists());
486        assert!(!immutable_dir.exists());
487        assert!(!digests_dir.exists());
488
489        dep_builder
490            .build_cardano_database_artifact_builder(Version::parse("1.0.0").unwrap())
491            .await
492            .unwrap();
493
494        assert!(ancillary_dir.exists());
495        assert!(immutable_dir.exists());
496        assert!(digests_dir.exists());
497    }
498}