mithril_aggregator/dependency_injection/builder/protocol/
artifacts.rs

1use anyhow::Context;
2use semver::Version;
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use mithril_common::crypto_helper::ManifestSigner;
7
8use crate::artifact_builder::{
9    AncillaryArtifactBuilder, AncillaryFileUploader, CardanoDatabaseArtifactBuilder,
10    CardanoImmutableFilesFullArtifactBuilder, CardanoStakeDistributionArtifactBuilder,
11    CardanoTransactionsArtifactBuilder, DigestArtifactBuilder, DigestFileUploader,
12    DigestSnapshotter, ImmutableArtifactBuilder, ImmutableFilesUploader,
13    MithrilStakeDistributionArtifactBuilder,
14};
15use crate::configuration::AncillaryFilesSignerConfig;
16use crate::dependency_injection::builder::SNAPSHOT_ARTIFACTS_DIR;
17use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
18use crate::file_uploaders::{
19    CloudRemotePath, FileUploadRetryPolicy, GcpBackendUploader, GcpUploader, LocalUploader,
20};
21use crate::get_dependency;
22use crate::http_server::{CARDANO_DATABASE_DOWNLOAD_PATH, SNAPSHOT_DOWNLOAD_PATH};
23use crate::services::ancillary_signer::{
24    AncillarySigner, AncillarySignerWithGcpKms, AncillarySignerWithSecretKey,
25};
26use crate::services::{
27    CompressedArchiveSnapshotter, DumbSnapshotter, MithrilSignedEntityService, SignedEntityService,
28    SignedEntityServiceArtifactsDependencies, Snapshotter,
29};
30use crate::tools::file_archiver::FileArchiver;
31use crate::{DumbUploader, ExecutionEnvironment, FileUploader, SnapshotUploaderType};
32impl DependenciesBuilder {
33    async fn build_signed_entity_service(&mut self) -> Result<Arc<dyn SignedEntityService>> {
34        let logger = self.root_logger();
35        let signed_entity_storer = self.get_signed_entity_storer().await?;
36        let epoch_service = self.get_epoch_service().await?;
37        let mithril_stake_distribution_artifact_builder = Arc::new(
38            MithrilStakeDistributionArtifactBuilder::new(epoch_service.clone()),
39        );
40        let snapshotter = self.get_snapshotter().await?;
41        let snapshot_uploader = self.get_snapshot_uploader().await?;
42        let cardano_node_version = Version::parse(&self.configuration.cardano_node_version())
43            .map_err(|e| DependenciesBuilderError::Initialization { message: format!("Could not parse configuration setting 'cardano_node_version' value '{}' as Semver.", self.configuration.cardano_node_version()), error: Some(e.into()) })?;
44        let cardano_immutable_files_full_artifact_builder =
45            Arc::new(CardanoImmutableFilesFullArtifactBuilder::new(
46                self.configuration.get_network()?,
47                &cardano_node_version,
48                snapshotter,
49                snapshot_uploader,
50                logger.clone(),
51            ));
52        let prover_service = self.get_prover_service().await?;
53        let cardano_transactions_artifact_builder = Arc::new(
54            CardanoTransactionsArtifactBuilder::new(prover_service.clone()),
55        );
56        let stake_store = self.get_stake_store().await?;
57        let cardano_stake_distribution_artifact_builder =
58            Arc::new(CardanoStakeDistributionArtifactBuilder::new(stake_store));
59        let cardano_database_artifact_builder = Arc::new(
60            self.build_cardano_database_artifact_builder(cardano_node_version)
61                .await?,
62        );
63        let dependencies = SignedEntityServiceArtifactsDependencies::new(
64            mithril_stake_distribution_artifact_builder,
65            cardano_immutable_files_full_artifact_builder,
66            cardano_transactions_artifact_builder,
67            cardano_stake_distribution_artifact_builder,
68            cardano_database_artifact_builder,
69        );
70        let signed_entity_service = Arc::new(MithrilSignedEntityService::new(
71            signed_entity_storer,
72            dependencies,
73            self.get_signed_entity_type_lock().await?,
74            self.get_metrics_service().await?,
75            logger,
76        ));
77
78        // Compute the cache pool for prover service
79        // This is done here to avoid circular dependencies between the prover service and the signed entity service
80        // TODO: Make this part of a warmup phase of the aggregator?
81        if let Some(signed_entity) = signed_entity_service
82            .get_last_cardano_transaction_snapshot()
83            .await?
84        {
85            prover_service
86                .compute_cache(signed_entity.artifact.block_number)
87                .await?;
88        }
89
90        Ok(signed_entity_service)
91    }
92
    /// [SignedEntityService] service
    ///
    /// Resolved through the `get_dependency!` macro applied to `self.signed_entity_service`.
    pub async fn get_signed_entity_service(&mut self) -> Result<Arc<dyn SignedEntityService>> {
        // NOTE(review): the macro is defined outside this file; presumably it builds the service
        // on first access and hands back the stored instance afterwards — confirm.
        get_dependency!(self.signed_entity_service)
    }
97
98    async fn build_file_archiver(&mut self) -> Result<Arc<FileArchiver>> {
99        let archive_verification_directory =
100            std::env::temp_dir().join("mithril_archiver_verify_archive");
101        let file_archiver = Arc::new(FileArchiver::new(
102            self.configuration
103                .zstandard_parameters()
104                .unwrap_or_default(),
105            archive_verification_directory,
106            self.root_logger(),
107        ));
108
109        Ok(file_archiver)
110    }
111
    /// Get the [FileArchiver], resolved through the `get_dependency!` macro applied to
    /// `self.file_archiver`.
    async fn get_file_archiver(&mut self) -> Result<Arc<FileArchiver>> {
        // NOTE(review): the macro is defined outside this file; presumably it builds the archiver
        // on first access and hands back the stored instance afterwards — confirm.
        get_dependency!(self.file_archiver)
    }
115
116    async fn get_ancillary_signer(&self) -> Result<Arc<dyn AncillarySigner>> {
117        match &self.configuration.ancillary_files_signer_config() {
118            AncillaryFilesSignerConfig::SecretKey { secret_key } => {
119                let manifest_signer = ManifestSigner::from_secret_key(
120                    secret_key
121                        .as_str()
122                        .try_into()
123                        .with_context(|| "Failed to build ancillary signer: Invalid secret key")?,
124                );
125
126                Ok(Arc::new(AncillarySignerWithSecretKey::new(
127                    manifest_signer,
128                    self.root_logger(),
129                )))
130            }
131            AncillaryFilesSignerConfig::GcpKms {
132                resource_name,
133                credentials_json_env_var,
134            } => {
135                let service = AncillarySignerWithGcpKms::new(
136                    resource_name.clone(),
137                    credentials_json_env_var.clone(),
138                    self.root_logger(),
139                )
140                .await?;
141
142                Ok(Arc::new(service))
143            }
144        }
145    }
146
147    async fn build_snapshotter(&mut self) -> Result<Arc<dyn Snapshotter>> {
148        let snapshotter: Arc<dyn Snapshotter> = match self.configuration.environment() {
149            ExecutionEnvironment::Production => {
150                let ongoing_snapshot_directory = self
151                    .configuration
152                    .get_snapshot_dir()?
153                    .join("pending_snapshot");
154
155                Arc::new(CompressedArchiveSnapshotter::new(
156                    self.configuration.db_directory().clone(),
157                    ongoing_snapshot_directory,
158                    self.configuration.snapshot_compression_algorithm(),
159                    self.get_file_archiver().await?,
160                    self.get_ancillary_signer().await?,
161                    self.root_logger(),
162                )?)
163            }
164            _ => Arc::new(DumbSnapshotter::new(
165                self.configuration.snapshot_compression_algorithm(),
166            )),
167        };
168
169        Ok(snapshotter)
170    }
171
172    async fn build_digests_snapshotter(
173        &mut self,
174        digests_path: PathBuf,
175    ) -> Result<DigestSnapshotter> {
176        Ok(DigestSnapshotter {
177            file_archiver: self.get_file_archiver().await?,
178            target_location: digests_path,
179            compression_algorithm: self.configuration.snapshot_compression_algorithm(),
180        })
181    }
182
    /// [Snapshotter] service.
    ///
    /// Resolved through the `get_dependency!` macro applied to `self.snapshotter`.
    pub async fn get_snapshotter(&mut self) -> Result<Arc<dyn Snapshotter>> {
        // NOTE(review): the macro is defined outside this file; presumably it builds the
        // snapshotter on first access and hands back the stored instance afterwards — confirm.
        get_dependency!(self.snapshotter)
    }
187
188    async fn build_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
189        let logger = self.root_logger();
190        if self.configuration.environment() == ExecutionEnvironment::Production {
191            match self.configuration.snapshot_uploader_type() {
192                SnapshotUploaderType::Gcp => {
193                    let allow_overwrite = true;
194                    let remote_folder_path = CloudRemotePath::new("cardano-immutable-files-full");
195
196                    Ok(Arc::new(
197                        self.build_gcp_uploader(remote_folder_path, allow_overwrite)?,
198                    ))
199                }
200                SnapshotUploaderType::Local => {
201                    let server_url_prefix = self.configuration.get_server_url()?;
202                    let snapshot_url_prefix =
203                        server_url_prefix.sanitize_join(SNAPSHOT_DOWNLOAD_PATH)?;
204                    let snapshot_artifacts_dir = self
205                        .configuration
206                        .get_snapshot_dir()?
207                        .join(SNAPSHOT_ARTIFACTS_DIR);
208                    std::fs::create_dir_all(&snapshot_artifacts_dir).map_err(|e| {
209                        DependenciesBuilderError::Initialization {
210                            message: format!(
211                                "Cannot create '{snapshot_artifacts_dir:?}' directory."
212                            ),
213                            error: Some(e.into()),
214                        }
215                    })?;
216
217                    Ok(Arc::new(LocalUploader::new(
218                        snapshot_url_prefix,
219                        &snapshot_artifacts_dir,
220                        FileUploadRetryPolicy::default(),
221                        logger,
222                    )))
223                }
224            }
225        } else {
226            Ok(Arc::new(DumbUploader::new(FileUploadRetryPolicy::never())))
227        }
228    }
229
    /// Get a [FileUploader]
    ///
    /// Resolved through the `get_dependency!` macro applied to `self.snapshot_uploader`.
    pub async fn get_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
        // NOTE(review): the macro is defined outside this file; presumably it builds the uploader
        // on first access and hands back the stored instance afterwards — confirm.
        get_dependency!(self.snapshot_uploader)
    }
234
235    fn build_gcp_uploader(
236        &self,
237        remote_folder_path: CloudRemotePath,
238        allow_overwrite: bool,
239    ) -> Result<GcpUploader> {
240        let logger = self.root_logger();
241        let bucket = self
242            .configuration
243            .snapshot_bucket_name()
244            .to_owned()
245            .ok_or_else(|| {
246                DependenciesBuilderError::MissingConfiguration("snapshot_bucket_name".to_string())
247            })?;
248
249        Ok(GcpUploader::new(
250            Arc::new(GcpBackendUploader::try_new(
251                bucket,
252                self.configuration.snapshot_use_cdn_domain(),
253                logger.clone(),
254            )?),
255            remote_folder_path,
256            allow_overwrite,
257            FileUploadRetryPolicy::default(),
258        ))
259    }
260
261    fn build_cardano_database_ancillary_uploaders(
262        &self,
263    ) -> Result<Vec<Arc<dyn AncillaryFileUploader>>> {
264        let logger = self.root_logger();
265        if self.configuration.environment() == ExecutionEnvironment::Production {
266            match self.configuration.snapshot_uploader_type() {
267                SnapshotUploaderType::Gcp => {
268                    let allow_overwrite = true;
269                    let remote_folder_path =
270                        CloudRemotePath::new("cardano-database").join("ancillary");
271
272                    Ok(vec![Arc::new(self.build_gcp_uploader(
273                        remote_folder_path,
274                        allow_overwrite,
275                    )?)])
276                }
277                SnapshotUploaderType::Local => {
278                    let server_url_prefix = self.configuration.get_server_url()?;
279                    let ancillary_url_prefix = server_url_prefix
280                        .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/ancillary/"))?;
281                    let target_dir = self.get_cardano_db_artifacts_dir()?.join("ancillary");
282
283                    std::fs::create_dir_all(&target_dir).map_err(|e| {
284                        DependenciesBuilderError::Initialization {
285                            message: format!("Cannot create '{target_dir:?}' directory."),
286                            error: Some(e.into()),
287                        }
288                    })?;
289
290                    Ok(vec![Arc::new(LocalUploader::new(
291                        ancillary_url_prefix,
292                        &target_dir,
293                        FileUploadRetryPolicy::default(),
294                        logger,
295                    ))])
296                }
297            }
298        } else {
299            Ok(vec![Arc::new(DumbUploader::new(
300                FileUploadRetryPolicy::never(),
301            ))])
302        }
303    }
304
305    fn build_cardano_database_immutable_uploaders(
306        &self,
307    ) -> Result<Vec<Arc<dyn ImmutableFilesUploader>>> {
308        let logger = self.root_logger();
309        if self.configuration.environment() == ExecutionEnvironment::Production {
310            match self.configuration.snapshot_uploader_type() {
311                SnapshotUploaderType::Gcp => {
312                    let allow_overwrite = false;
313                    let remote_folder_path =
314                        CloudRemotePath::new("cardano-database").join("immutable");
315
316                    Ok(vec![Arc::new(self.build_gcp_uploader(
317                        remote_folder_path,
318                        allow_overwrite,
319                    )?)])
320                }
321                SnapshotUploaderType::Local => {
322                    let server_url_prefix = self.configuration.get_server_url()?;
323                    let immutable_url_prefix = server_url_prefix
324                        .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/immutable/"))?;
325
326                    Ok(vec![Arc::new(LocalUploader::new_without_copy(
327                        immutable_url_prefix,
328                        FileUploadRetryPolicy::default(),
329                        logger,
330                    ))])
331                }
332            }
333        } else {
334            Ok(vec![Arc::new(DumbUploader::new(
335                FileUploadRetryPolicy::never(),
336            ))])
337        }
338    }
339
340    fn build_cardano_database_digests_uploaders(&self) -> Result<Vec<Arc<dyn DigestFileUploader>>> {
341        let logger = self.root_logger();
342        if self.configuration.environment() == ExecutionEnvironment::Production {
343            match self.configuration.snapshot_uploader_type() {
344                SnapshotUploaderType::Gcp => {
345                    let allow_overwrite = false;
346                    let remote_folder_path =
347                        CloudRemotePath::new("cardano-database").join("digests");
348
349                    Ok(vec![Arc::new(self.build_gcp_uploader(
350                        remote_folder_path,
351                        allow_overwrite,
352                    )?)])
353                }
354                SnapshotUploaderType::Local => {
355                    let server_url_prefix = self.configuration.get_server_url()?;
356                    let digests_url_prefix = server_url_prefix
357                        .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/digests/"))?;
358                    let target_dir = self.get_cardano_db_artifacts_dir()?.join("digests");
359
360                    std::fs::create_dir_all(&target_dir).map_err(|e| {
361                        DependenciesBuilderError::Initialization {
362                            message: format!("Cannot create '{target_dir:?}' directory."),
363                            error: Some(e.into()),
364                        }
365                    })?;
366
367                    Ok(vec![Arc::new(LocalUploader::new(
368                        digests_url_prefix,
369                        &target_dir,
370                        FileUploadRetryPolicy::default(),
371                        logger,
372                    ))])
373                }
374            }
375        } else {
376            Ok(vec![Arc::new(DumbUploader::new(
377                FileUploadRetryPolicy::never(),
378            ))])
379        }
380    }
381
382    async fn build_cardano_database_artifact_builder(
383        &mut self,
384        cardano_node_version: Version,
385    ) -> Result<CardanoDatabaseArtifactBuilder> {
386        let snapshot_dir = self.configuration.get_snapshot_dir()?;
387        let immutable_dir = self.get_cardano_db_artifacts_dir()?.join("immutable");
388
389        let ancillary_builder = Arc::new(AncillaryArtifactBuilder::new(
390            self.build_cardano_database_ancillary_uploaders()?,
391            self.get_snapshotter().await?,
392            self.configuration.get_network()?,
393            self.root_logger(),
394        )?);
395
396        let immutable_builder = Arc::new(ImmutableArtifactBuilder::new(
397            immutable_dir,
398            self.build_cardano_database_immutable_uploaders()?,
399            self.get_snapshotter().await?,
400            self.root_logger(),
401        )?);
402
403        let digests_path = snapshot_dir.join("pending_cardano_database_digests");
404        let digests_snapshotter = self.build_digests_snapshotter(digests_path.clone()).await?;
405
406        let digest_builder = Arc::new(DigestArtifactBuilder::new(
407            self.configuration.get_server_url()?,
408            self.build_cardano_database_digests_uploaders()?,
409            digests_snapshotter,
410            self.configuration.get_network()?,
411            digests_path,
412            self.get_immutable_file_digest_mapper().await?,
413            self.root_logger(),
414        )?);
415
416        Ok(CardanoDatabaseArtifactBuilder::new(
417            self.configuration.get_network()?,
418            &cardano_node_version,
419            ancillary_builder,
420            immutable_builder,
421            digest_builder,
422        ))
423    }
424}
425
#[cfg(test)]
mod tests {
    use mithril_common::temp_dir_create;
    use mithril_persistence::sqlite::ConnectionBuilder;

    use crate::dependency_injection::builder::CARDANO_DB_ARTIFACTS_DIR;
    use crate::ServeCommandConfiguration;

    use super::*;

    /// With the dumb uploaders of the test environment, building the Cardano database artifact
    /// builder leaves only the `immutable` sub-directory on disk.
    #[tokio::test]
    async fn if_not_local_uploader_create_cardano_database_immutable_dirs() {
        let snapshot_directory = temp_dir_create!();
        let cdb_dir = snapshot_directory.join(CARDANO_DB_ARTIFACTS_DIR);
        let (ancillary_dir, immutable_dir, digests_dir) = (
            cdb_dir.join("ancillary"),
            cdb_dir.join("immutable"),
            cdb_dir.join("digests"),
        );

        // Test environment yield dumb uploaders
        let config = ServeCommandConfiguration {
            environment: ExecutionEnvironment::Test,
            ..ServeCommandConfiguration::new_sample(snapshot_directory)
        };
        let mut dep_builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));

        // Nothing exists before the build.
        assert!(!ancillary_dir.exists());
        assert!(!immutable_dir.exists());
        assert!(!digests_dir.exists());

        let node_version = Version::parse("1.0.0").unwrap();
        dep_builder
            .build_cardano_database_artifact_builder(node_version)
            .await
            .unwrap();

        // Only the immutable directory has been created.
        assert!(!ancillary_dir.exists());
        assert!(immutable_dir.exists());
        assert!(!digests_dir.exists());
    }

    /// With the local uploader in production, building the Cardano database artifact builder
    /// creates the `ancillary`, `immutable` and `digests` sub-directories.
    #[tokio::test]
    async fn if_local_uploader_creates_all_cardano_database_subdirs() {
        let snapshot_directory = temp_dir_create!();
        let cdb_dir = snapshot_directory.join(CARDANO_DB_ARTIFACTS_DIR);
        let (ancillary_dir, immutable_dir, digests_dir) = (
            cdb_dir.join("ancillary"),
            cdb_dir.join("immutable"),
            cdb_dir.join("digests"),
        );

        // Must use production environment to make `snapshot_uploader_type` effective
        let config = ServeCommandConfiguration {
            environment: ExecutionEnvironment::Production,
            snapshot_uploader_type: SnapshotUploaderType::Local,
            ..ServeCommandConfiguration::new_sample(snapshot_directory)
        };
        let mut dep_builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
        // In production environment the builder can't create in-memory SQLite connections, we
        // need to provide it manually to avoid creations of unnecessary files.
        let in_memory_connection = ConnectionBuilder::open_memory().build().unwrap();
        dep_builder.sqlite_connection = Some(Arc::new(in_memory_connection));

        // Nothing exists before the build.
        assert!(!ancillary_dir.exists());
        assert!(!immutable_dir.exists());
        assert!(!digests_dir.exists());

        let node_version = Version::parse("1.0.0").unwrap();
        dep_builder
            .build_cardano_database_artifact_builder(node_version)
            .await
            .unwrap();

        // Every sub-directory has been created.
        assert!(ancillary_dir.exists());
        assert!(immutable_dir.exists());
        assert!(digests_dir.exists());
    }
}