mithril_aggregator/dependency_injection/builder/protocol/
artifacts.rs

1use anyhow::Context;
2use semver::Version;
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use mithril_common::crypto_helper::ManifestSigner;
7
8use crate::artifact_builder::{
9    AncillaryArtifactBuilder, AncillaryFileUploader, CardanoDatabaseArtifactBuilder,
10    CardanoImmutableFilesFullArtifactBuilder, CardanoStakeDistributionArtifactBuilder,
11    CardanoTransactionsArtifactBuilder, DigestArtifactBuilder, DigestFileUploader,
12    DigestSnapshotter, ImmutableArtifactBuilder, ImmutableFilesUploader,
13    MithrilStakeDistributionArtifactBuilder,
14};
15use crate::configuration::AncillaryFilesSignerConfig;
16use crate::dependency_injection::builder::SNAPSHOT_ARTIFACTS_DIR;
17use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
18use crate::file_uploaders::{
19    CloudRemotePath, FileUploadRetryPolicy, GcpBackendUploader, GcpUploader, LocalUploader,
20};
21use crate::http_server::{CARDANO_DATABASE_DOWNLOAD_PATH, SNAPSHOT_DOWNLOAD_PATH};
22use crate::services::ancillary_signer::{AncillarySigner, AncillarySignerWithSecretKey};
23use crate::services::{
24    CompressedArchiveSnapshotter, DumbSnapshotter, MithrilSignedEntityService, SignedEntityService,
25    SignedEntityServiceArtifactsDependencies, Snapshotter,
26};
27use crate::tools::file_archiver::FileArchiver;
28use crate::{DumbUploader, ExecutionEnvironment, FileUploader, SnapshotUploaderType};
29
30impl DependenciesBuilder {
31    async fn build_signed_entity_service(&mut self) -> Result<Arc<dyn SignedEntityService>> {
32        let logger = self.root_logger();
33        let signed_entity_storer = self.get_signed_entity_storer().await?;
34        let epoch_service = self.get_epoch_service().await?;
35        let mithril_stake_distribution_artifact_builder = Arc::new(
36            MithrilStakeDistributionArtifactBuilder::new(epoch_service.clone()),
37        );
38        let snapshotter = self.get_snapshotter().await?;
39        let snapshot_uploader = self.get_snapshot_uploader().await?;
40        let cardano_node_version = Version::parse(&self.configuration.cardano_node_version)
41            .map_err(|e| DependenciesBuilderError::Initialization { message: format!("Could not parse configuration setting 'cardano_node_version' value '{}' as Semver.", self.configuration.cardano_node_version), error: Some(e.into()) })?;
42        let cardano_immutable_files_full_artifact_builder =
43            Arc::new(CardanoImmutableFilesFullArtifactBuilder::new(
44                self.configuration.get_network()?,
45                &cardano_node_version,
46                snapshotter,
47                snapshot_uploader,
48                logger.clone(),
49            ));
50        let prover_service = self.get_prover_service().await?;
51        let cardano_transactions_artifact_builder = Arc::new(
52            CardanoTransactionsArtifactBuilder::new(prover_service.clone()),
53        );
54        let stake_store = self.get_stake_store().await?;
55        let cardano_stake_distribution_artifact_builder =
56            Arc::new(CardanoStakeDistributionArtifactBuilder::new(stake_store));
57        let cardano_database_artifact_builder = Arc::new(
58            self.build_cardano_database_artifact_builder(cardano_node_version)
59                .await?,
60        );
61        let dependencies = SignedEntityServiceArtifactsDependencies::new(
62            mithril_stake_distribution_artifact_builder,
63            cardano_immutable_files_full_artifact_builder,
64            cardano_transactions_artifact_builder,
65            cardano_stake_distribution_artifact_builder,
66            cardano_database_artifact_builder,
67        );
68        let signed_entity_service = Arc::new(MithrilSignedEntityService::new(
69            signed_entity_storer,
70            dependencies,
71            self.get_signed_entity_lock().await?,
72            self.get_metrics_service().await?,
73            logger,
74        ));
75
76        // Compute the cache pool for prover service
77        // This is done here to avoid circular dependencies between the prover service and the signed entity service
78        // TODO: Make this part of a warmup phase of the aggregator?
79        if let Some(signed_entity) = signed_entity_service
80            .get_last_cardano_transaction_snapshot()
81            .await?
82        {
83            prover_service
84                .compute_cache(signed_entity.artifact.block_number)
85                .await?;
86        }
87
88        Ok(signed_entity_service)
89    }
90
91    /// [SignedEntityService] service
92    pub async fn get_signed_entity_service(&mut self) -> Result<Arc<dyn SignedEntityService>> {
93        if self.signed_entity_service.is_none() {
94            self.signed_entity_service = Some(self.build_signed_entity_service().await?);
95        }
96
97        Ok(self.signed_entity_service.as_ref().cloned().unwrap())
98    }
99
100    async fn build_file_archiver(&mut self) -> Result<Arc<FileArchiver>> {
101        let archive_verification_directory =
102            std::env::temp_dir().join("mithril_archiver_verify_archive");
103        let file_archiver = Arc::new(FileArchiver::new(
104            self.configuration.zstandard_parameters.unwrap_or_default(),
105            archive_verification_directory,
106            self.root_logger(),
107        ));
108
109        Ok(file_archiver)
110    }
111
112    async fn get_file_archiver(&mut self) -> Result<Arc<FileArchiver>> {
113        if self.file_archiver.is_none() {
114            self.file_archiver = Some(self.build_file_archiver().await?);
115        }
116
117        Ok(self.file_archiver.as_ref().cloned().unwrap())
118    }
119
120    async fn get_ancillary_signer(&self) -> Result<Arc<dyn AncillarySigner>> {
121        match &self.configuration.ancillary_files_signer_config {
122            AncillaryFilesSignerConfig::SecretKey { secret_key } => {
123                let manifest_signer = ManifestSigner::from_secret_key(
124                    secret_key
125                        .as_str()
126                        .try_into()
127                        .with_context(|| "Failed to build ancillary signer: Invalid secret key")?,
128                );
129
130                Ok(Arc::new(AncillarySignerWithSecretKey::new(
131                    manifest_signer,
132                    self.root_logger(),
133                )))
134            }
135        }
136    }
137
138    async fn build_snapshotter(&mut self) -> Result<Arc<dyn Snapshotter>> {
139        let snapshotter: Arc<dyn Snapshotter> = match self.configuration.environment {
140            ExecutionEnvironment::Production => {
141                let ongoing_snapshot_directory = self
142                    .configuration
143                    .get_snapshot_dir()?
144                    .join("pending_snapshot");
145
146                Arc::new(CompressedArchiveSnapshotter::new(
147                    self.configuration.db_directory.clone(),
148                    ongoing_snapshot_directory,
149                    self.configuration.snapshot_compression_algorithm,
150                    self.get_file_archiver().await?,
151                    self.get_ancillary_signer().await?,
152                    self.root_logger(),
153                )?)
154            }
155            _ => Arc::new(DumbSnapshotter::new(
156                self.configuration.snapshot_compression_algorithm,
157            )),
158        };
159
160        Ok(snapshotter)
161    }
162
163    async fn build_digests_snapshotter(
164        &mut self,
165        digests_path: PathBuf,
166    ) -> Result<DigestSnapshotter> {
167        Ok(DigestSnapshotter {
168            file_archiver: self.get_file_archiver().await?,
169            target_location: digests_path,
170            compression_algorithm: self.configuration.snapshot_compression_algorithm,
171        })
172    }
173
174    /// [Snapshotter] service.
175    pub async fn get_snapshotter(&mut self) -> Result<Arc<dyn Snapshotter>> {
176        if self.snapshotter.is_none() {
177            self.snapshotter = Some(self.build_snapshotter().await?);
178        }
179
180        Ok(self.snapshotter.as_ref().cloned().unwrap())
181    }
182
183    async fn build_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
184        let logger = self.root_logger();
185        if self.configuration.environment == ExecutionEnvironment::Production {
186            match self.configuration.snapshot_uploader_type {
187                SnapshotUploaderType::Gcp => {
188                    let allow_overwrite = true;
189                    let remote_folder_path = CloudRemotePath::new("cardano-immutable-files-full");
190
191                    Ok(Arc::new(
192                        self.build_gcp_uploader(remote_folder_path, allow_overwrite)?,
193                    ))
194                }
195                SnapshotUploaderType::Local => {
196                    let server_url_prefix = self.configuration.get_server_url()?;
197                    let snapshot_url_prefix =
198                        server_url_prefix.sanitize_join(SNAPSHOT_DOWNLOAD_PATH)?;
199                    let snapshot_artifacts_dir = self
200                        .configuration
201                        .get_snapshot_dir()?
202                        .join(SNAPSHOT_ARTIFACTS_DIR);
203                    std::fs::create_dir_all(&snapshot_artifacts_dir).map_err(|e| {
204                        DependenciesBuilderError::Initialization {
205                            message: format!(
206                                "Cannot create '{snapshot_artifacts_dir:?}' directory."
207                            ),
208                            error: Some(e.into()),
209                        }
210                    })?;
211
212                    Ok(Arc::new(LocalUploader::new(
213                        snapshot_url_prefix,
214                        &snapshot_artifacts_dir,
215                        FileUploadRetryPolicy::default(),
216                        logger,
217                    )))
218                }
219            }
220        } else {
221            Ok(Arc::new(DumbUploader::new(FileUploadRetryPolicy::never())))
222        }
223    }
224
225    /// Get a [FileUploader]
226    pub async fn get_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
227        if self.snapshot_uploader.is_none() {
228            self.snapshot_uploader = Some(self.build_snapshot_uploader().await?);
229        }
230
231        Ok(self.snapshot_uploader.as_ref().cloned().unwrap())
232    }
233
234    fn build_gcp_uploader(
235        &self,
236        remote_folder_path: CloudRemotePath,
237        allow_overwrite: bool,
238    ) -> Result<GcpUploader> {
239        let logger = self.root_logger();
240        let bucket = self
241            .configuration
242            .snapshot_bucket_name
243            .to_owned()
244            .ok_or_else(|| {
245                DependenciesBuilderError::MissingConfiguration("snapshot_bucket_name".to_string())
246            })?;
247
248        Ok(GcpUploader::new(
249            Arc::new(GcpBackendUploader::try_new(
250                bucket,
251                self.configuration.snapshot_use_cdn_domain,
252                logger.clone(),
253            )?),
254            remote_folder_path,
255            allow_overwrite,
256            FileUploadRetryPolicy::default(),
257        ))
258    }
259
260    fn build_cardano_database_ancillary_uploaders(
261        &self,
262    ) -> Result<Vec<Arc<dyn AncillaryFileUploader>>> {
263        let logger = self.root_logger();
264        if self.configuration.environment == ExecutionEnvironment::Production {
265            match self.configuration.snapshot_uploader_type {
266                SnapshotUploaderType::Gcp => {
267                    let allow_overwrite = true;
268                    let remote_folder_path =
269                        CloudRemotePath::new("cardano-database").join("ancillary");
270
271                    Ok(vec![Arc::new(self.build_gcp_uploader(
272                        remote_folder_path,
273                        allow_overwrite,
274                    )?)])
275                }
276                SnapshotUploaderType::Local => {
277                    let server_url_prefix = self.configuration.get_server_url()?;
278                    let ancillary_url_prefix = server_url_prefix
279                        .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/ancillary/"))?;
280                    let target_dir = self.get_cardano_db_artifacts_dir()?.join("ancillary");
281
282                    std::fs::create_dir_all(&target_dir).map_err(|e| {
283                        DependenciesBuilderError::Initialization {
284                            message: format!("Cannot create '{target_dir:?}' directory."),
285                            error: Some(e.into()),
286                        }
287                    })?;
288
289                    Ok(vec![Arc::new(LocalUploader::new(
290                        ancillary_url_prefix,
291                        &target_dir,
292                        FileUploadRetryPolicy::default(),
293                        logger,
294                    ))])
295                }
296            }
297        } else {
298            Ok(vec![Arc::new(DumbUploader::new(
299                FileUploadRetryPolicy::never(),
300            ))])
301        }
302    }
303
304    fn build_cardano_database_immutable_uploaders(
305        &self,
306    ) -> Result<Vec<Arc<dyn ImmutableFilesUploader>>> {
307        let logger = self.root_logger();
308        if self.configuration.environment == ExecutionEnvironment::Production {
309            match self.configuration.snapshot_uploader_type {
310                SnapshotUploaderType::Gcp => {
311                    let allow_overwrite = false;
312                    let remote_folder_path =
313                        CloudRemotePath::new("cardano-database").join("immutable");
314
315                    Ok(vec![Arc::new(self.build_gcp_uploader(
316                        remote_folder_path,
317                        allow_overwrite,
318                    )?)])
319                }
320                SnapshotUploaderType::Local => {
321                    let server_url_prefix = self.configuration.get_server_url()?;
322                    let immutable_url_prefix = server_url_prefix
323                        .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/immutable/"))?;
324
325                    Ok(vec![Arc::new(LocalUploader::new_without_copy(
326                        immutable_url_prefix,
327                        FileUploadRetryPolicy::default(),
328                        logger,
329                    ))])
330                }
331            }
332        } else {
333            Ok(vec![Arc::new(DumbUploader::new(
334                FileUploadRetryPolicy::never(),
335            ))])
336        }
337    }
338
339    fn build_cardano_database_digests_uploaders(&self) -> Result<Vec<Arc<dyn DigestFileUploader>>> {
340        let logger = self.root_logger();
341        if self.configuration.environment == ExecutionEnvironment::Production {
342            match self.configuration.snapshot_uploader_type {
343                SnapshotUploaderType::Gcp => {
344                    let allow_overwrite = false;
345                    let remote_folder_path =
346                        CloudRemotePath::new("cardano-database").join("digests");
347
348                    Ok(vec![Arc::new(self.build_gcp_uploader(
349                        remote_folder_path,
350                        allow_overwrite,
351                    )?)])
352                }
353                SnapshotUploaderType::Local => {
354                    let server_url_prefix = self.configuration.get_server_url()?;
355                    let digests_url_prefix = server_url_prefix
356                        .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/digests/"))?;
357                    let target_dir = self.get_cardano_db_artifacts_dir()?.join("digests");
358
359                    std::fs::create_dir_all(&target_dir).map_err(|e| {
360                        DependenciesBuilderError::Initialization {
361                            message: format!("Cannot create '{target_dir:?}' directory."),
362                            error: Some(e.into()),
363                        }
364                    })?;
365
366                    Ok(vec![Arc::new(LocalUploader::new(
367                        digests_url_prefix,
368                        &target_dir,
369                        FileUploadRetryPolicy::default(),
370                        logger,
371                    ))])
372                }
373            }
374        } else {
375            Ok(vec![Arc::new(DumbUploader::new(
376                FileUploadRetryPolicy::never(),
377            ))])
378        }
379    }
380
381    async fn build_cardano_database_artifact_builder(
382        &mut self,
383        cardano_node_version: Version,
384    ) -> Result<CardanoDatabaseArtifactBuilder> {
385        let snapshot_dir = self.configuration.get_snapshot_dir()?;
386        let immutable_dir = self.get_cardano_db_artifacts_dir()?.join("immutable");
387
388        let ancillary_builder = Arc::new(AncillaryArtifactBuilder::new(
389            self.build_cardano_database_ancillary_uploaders()?,
390            self.get_snapshotter().await?,
391            self.configuration.get_network()?,
392            self.root_logger(),
393        )?);
394
395        let immutable_builder = Arc::new(ImmutableArtifactBuilder::new(
396            immutable_dir,
397            self.build_cardano_database_immutable_uploaders()?,
398            self.get_snapshotter().await?,
399            self.root_logger(),
400        )?);
401
402        let digests_path = snapshot_dir.join("pending_cardano_database_digests");
403        let digests_snapshotter = self.build_digests_snapshotter(digests_path.clone()).await?;
404
405        let digest_builder = Arc::new(DigestArtifactBuilder::new(
406            self.configuration.get_server_url()?,
407            self.build_cardano_database_digests_uploaders()?,
408            digests_snapshotter,
409            self.configuration.get_network()?,
410            digests_path,
411            self.get_immutable_file_digest_mapper().await?,
412            self.root_logger(),
413        )?);
414
415        Ok(CardanoDatabaseArtifactBuilder::new(
416            self.configuration.get_network()?,
417            &cardano_node_version,
418            ancillary_builder,
419            immutable_builder,
420            digest_builder,
421        ))
422    }
423}
424
425#[cfg(test)]
426mod tests {
427    use mithril_common::temp_dir_create;
428    use mithril_persistence::sqlite::ConnectionBuilder;
429
430    use crate::dependency_injection::builder::CARDANO_DB_ARTIFACTS_DIR;
431    use crate::Configuration;
432
433    use super::*;
434
435    #[tokio::test]
436    async fn if_not_local_uploader_create_cardano_database_immutable_dirs() {
437        let snapshot_directory = temp_dir_create!();
438        let cdb_dir = snapshot_directory.join(CARDANO_DB_ARTIFACTS_DIR);
439        let ancillary_dir = cdb_dir.join("ancillary");
440        let immutable_dir = cdb_dir.join("immutable");
441        let digests_dir = cdb_dir.join("digests");
442
443        let mut dep_builder = {
444            let config = Configuration {
445                // Test environment yield dumb uploaders
446                environment: ExecutionEnvironment::Test,
447                ..Configuration::new_sample(snapshot_directory)
448            };
449
450            DependenciesBuilder::new_with_stdout_logger(config)
451        };
452
453        assert!(!ancillary_dir.exists());
454        assert!(!immutable_dir.exists());
455        assert!(!digests_dir.exists());
456
457        dep_builder
458            .build_cardano_database_artifact_builder(Version::parse("1.0.0").unwrap())
459            .await
460            .unwrap();
461
462        assert!(!ancillary_dir.exists());
463        assert!(immutable_dir.exists());
464        assert!(!digests_dir.exists());
465    }
466
467    #[tokio::test]
468    async fn if_local_uploader_creates_all_cardano_database_subdirs() {
469        let snapshot_directory = temp_dir_create!();
470        let cdb_dir = snapshot_directory.join(CARDANO_DB_ARTIFACTS_DIR);
471        let ancillary_dir = cdb_dir.join("ancillary");
472        let immutable_dir = cdb_dir.join("immutable");
473        let digests_dir = cdb_dir.join("digests");
474
475        let mut dep_builder = {
476            let config = Configuration {
477                // Must use production environment to make `snapshot_uploader_type` effective
478                environment: ExecutionEnvironment::Production,
479                snapshot_uploader_type: SnapshotUploaderType::Local,
480                ..Configuration::new_sample(snapshot_directory)
481            };
482
483            DependenciesBuilder::new_with_stdout_logger(config)
484        };
485        // In production environment the builder can't create in-memory SQLite connections, we
486        // need to provide it manually to avoid creations of unnecessary files.
487        dep_builder.sqlite_connection =
488            Some(Arc::new(ConnectionBuilder::open_memory().build().unwrap()));
489
490        assert!(!ancillary_dir.exists());
491        assert!(!immutable_dir.exists());
492        assert!(!digests_dir.exists());
493
494        dep_builder
495            .build_cardano_database_artifact_builder(Version::parse("1.0.0").unwrap())
496            .await
497            .unwrap();
498
499        assert!(ancillary_dir.exists());
500        assert!(immutable_dir.exists());
501        assert!(digests_dir.exists());
502    }
503}