mithril_aggregator/dependency_injection/builder/protocol/
artifacts.rs1use anyhow::Context;
2use semver::Version;
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use mithril_common::crypto_helper::ManifestSigner;
7
8use crate::artifact_builder::{
9 AncillaryArtifactBuilder, AncillaryFileUploader, CardanoBlocksTransactionsArtifactBuilder,
10 CardanoDatabaseArtifactBuilder, CardanoImmutableFilesFullArtifactBuilder,
11 CardanoStakeDistributionArtifactBuilder, CardanoTransactionsArtifactBuilder,
12 DigestArtifactBuilder, DigestFileUploader, DigestSnapshotter, ImmutableArtifactBuilder,
13 ImmutableFilesUploader, MithrilStakeDistributionArtifactBuilder,
14};
15use crate::configuration::AncillaryFilesSignerConfig;
16use crate::dependency_injection::builder::SNAPSHOT_ARTIFACTS_DIR;
17use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
18use crate::file_uploaders::{
19 CloudRemotePath, CloudUploader, FileUploadRetryPolicy, GCloudBackendUploader, LocalUploader,
20};
21use crate::get_dependency;
22use crate::http_server::{CARDANO_DATABASE_DOWNLOAD_PATH, SNAPSHOT_DOWNLOAD_PATH};
23use crate::services::ancillary_signer::{
24 AncillarySigner, AncillarySignerWithGcpKms, AncillarySignerWithSecretKey,
25};
26use crate::services::{
27 CompressedArchiveSnapshotter, DumbSnapshotter, MithrilSignedEntityService, SignedEntityService,
28 SignedEntityServiceArtifactsDependencies, Snapshotter,
29};
30use crate::tools::DEFAULT_GCP_CREDENTIALS_JSON_ENV_VAR;
31use crate::tools::file_archiver::FileArchiver;
32use crate::{DumbUploader, ExecutionEnvironment, FileUploader, SnapshotUploaderType};
33
34impl DependenciesBuilder {
35 async fn build_signed_entity_service(&mut self) -> Result<Arc<dyn SignedEntityService>> {
36 let logger = self.root_logger();
37 let signed_entity_storer = self.get_signed_entity_storer().await?;
38 let epoch_service = self.get_epoch_service().await?;
39 let mithril_stake_distribution_artifact_builder = Arc::new(
40 MithrilStakeDistributionArtifactBuilder::new(epoch_service.clone()),
41 );
42 let snapshotter = self.get_snapshotter().await?;
43 let snapshot_uploader = self.get_snapshot_uploader().await?;
44 let cardano_node_version = Version::parse(&self.configuration.cardano_node_version())
45 .map_err(|e| DependenciesBuilderError::Initialization { message: format!("Could not parse configuration setting 'cardano_node_version' value '{}' as Semver.", self.configuration.cardano_node_version()), error: Some(e.into()) })?;
46 let cardano_immutable_files_full_artifact_builder =
47 Arc::new(CardanoImmutableFilesFullArtifactBuilder::new(
48 self.configuration.get_network()?,
49 &cardano_node_version,
50 snapshotter,
51 snapshot_uploader,
52 logger.clone(),
53 ));
54 let legacy_prover_service = self.get_legacy_prover_service().await?;
55 let cardano_transactions_artifact_builder = Arc::new(
56 CardanoTransactionsArtifactBuilder::new(legacy_prover_service.clone()),
57 );
58 let prover_service = self.get_prover_service().await?;
59 let mithril_network_configuration_provider =
60 self.get_mithril_network_configuration_provider().await?;
61 let cardano_blocks_transactions_artifact_builder =
62 Arc::new(CardanoBlocksTransactionsArtifactBuilder::new(
63 prover_service.clone(),
64 mithril_network_configuration_provider.clone(),
65 ));
66 let stake_store = self.get_stake_store().await?;
67 let cardano_stake_distribution_artifact_builder =
68 Arc::new(CardanoStakeDistributionArtifactBuilder::new(stake_store));
69 let cardano_database_artifact_builder = Arc::new(
70 self.build_cardano_database_artifact_builder(cardano_node_version)
71 .await?,
72 );
73 let dependencies = SignedEntityServiceArtifactsDependencies::new(
74 mithril_stake_distribution_artifact_builder,
75 cardano_immutable_files_full_artifact_builder,
76 cardano_transactions_artifact_builder,
77 cardano_blocks_transactions_artifact_builder,
78 cardano_stake_distribution_artifact_builder,
79 cardano_database_artifact_builder,
80 );
81 let signed_entity_service = Arc::new(MithrilSignedEntityService::new(
82 signed_entity_storer,
83 dependencies,
84 self.get_signed_entity_type_lock().await?,
85 self.get_metrics_service().await?,
86 logger,
87 ));
88
89 if let Some(signed_entity) =
93 signed_entity_service.get_last_cardano_transaction_snapshot().await?
94 {
95 legacy_prover_service
96 .compute_cache(signed_entity.artifact.block_number)
97 .await?;
98 }
99
100 Ok(signed_entity_service)
101 }
102
103 pub async fn get_signed_entity_service(&mut self) -> Result<Arc<dyn SignedEntityService>> {
105 get_dependency!(self.signed_entity_service)
106 }
107
108 async fn build_file_archiver(&mut self) -> Result<Arc<FileArchiver>> {
109 let archive_verification_directory =
110 std::env::temp_dir().join("mithril_archiver_verify_archive");
111 let file_archiver = Arc::new(FileArchiver::new(
112 self.configuration.zstandard_parameters().unwrap_or_default(),
113 archive_verification_directory,
114 self.root_logger(),
115 ));
116
117 Ok(file_archiver)
118 }
119
120 async fn get_file_archiver(&mut self) -> Result<Arc<FileArchiver>> {
121 get_dependency!(self.file_archiver)
122 }
123
124 async fn get_ancillary_signer(&self) -> Result<Arc<dyn AncillarySigner>> {
125 match &self.configuration.ancillary_files_signer_config() {
126 AncillaryFilesSignerConfig::SecretKey { secret_key } => {
127 let manifest_signer = ManifestSigner::from_secret_key(
128 secret_key
129 .as_str()
130 .try_into()
131 .with_context(|| "Failed to build ancillary signer: Invalid secret key")?,
132 );
133
134 Ok(Arc::new(AncillarySignerWithSecretKey::new(
135 manifest_signer,
136 self.root_logger(),
137 )))
138 }
139 AncillaryFilesSignerConfig::GcpKms {
140 resource_name,
141 credentials_json_env_var,
142 } => {
143 let service = AncillarySignerWithGcpKms::new(
144 resource_name.clone(),
145 credentials_json_env_var.clone(),
146 self.root_logger(),
147 )
148 .await?;
149
150 Ok(Arc::new(service))
151 }
152 }
153 }
154
155 async fn build_snapshotter(&mut self) -> Result<Arc<dyn Snapshotter>> {
156 let snapshotter: Arc<dyn Snapshotter> = match self.configuration.environment() {
157 ExecutionEnvironment::Production => {
158 let ongoing_snapshot_directory =
159 self.configuration.get_snapshot_dir()?.join("pending_snapshot");
160
161 Arc::new(CompressedArchiveSnapshotter::new(
162 self.configuration.db_directory().clone(),
163 ongoing_snapshot_directory,
164 self.configuration.snapshot_compression_algorithm(),
165 self.get_file_archiver().await?,
166 self.get_ancillary_signer().await?,
167 self.root_logger(),
168 )?)
169 }
170 _ => Arc::new(DumbSnapshotter::new(
171 self.configuration.snapshot_compression_algorithm(),
172 )),
173 };
174
175 Ok(snapshotter)
176 }
177
178 async fn build_digests_snapshotter(
179 &mut self,
180 digests_path: PathBuf,
181 ) -> Result<DigestSnapshotter> {
182 Ok(DigestSnapshotter {
183 file_archiver: self.get_file_archiver().await?,
184 target_location: digests_path,
185 compression_algorithm: self.configuration.snapshot_compression_algorithm(),
186 })
187 }
188
189 pub async fn get_snapshotter(&mut self) -> Result<Arc<dyn Snapshotter>> {
191 get_dependency!(self.snapshotter)
192 }
193
194 async fn build_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
195 let logger = self.root_logger();
196 if self.configuration.environment() == ExecutionEnvironment::Production {
197 match self.configuration.snapshot_uploader_type() {
198 SnapshotUploaderType::Gcp => {
199 let allow_overwrite = true;
200 let remote_folder_path = CloudRemotePath::new("cardano-immutable-files-full");
201
202 Ok(Arc::new(
203 self.build_gcp_uploader(remote_folder_path, allow_overwrite).await?,
204 ))
205 }
206 SnapshotUploaderType::Local => {
207 let server_url_prefix = self.configuration.get_server_url()?;
208 let snapshot_url_prefix =
209 server_url_prefix.sanitize_join(SNAPSHOT_DOWNLOAD_PATH)?;
210 let snapshot_artifacts_dir =
211 self.configuration.get_snapshot_dir()?.join(SNAPSHOT_ARTIFACTS_DIR);
212 std::fs::create_dir_all(&snapshot_artifacts_dir).map_err(|e| {
213 DependenciesBuilderError::Initialization {
214 message: format!(
215 "Cannot create '{snapshot_artifacts_dir:?}' directory."
216 ),
217 error: Some(e.into()),
218 }
219 })?;
220
221 Ok(Arc::new(LocalUploader::new(
222 snapshot_url_prefix,
223 &snapshot_artifacts_dir,
224 FileUploadRetryPolicy::default(),
225 logger,
226 )))
227 }
228 }
229 } else {
230 Ok(Arc::new(DumbUploader::new(FileUploadRetryPolicy::never())))
231 }
232 }
233
234 pub async fn get_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
236 get_dependency!(self.snapshot_uploader)
237 }
238
239 async fn build_gcp_uploader(
240 &self,
241 remote_folder_path: CloudRemotePath,
242 allow_overwrite: bool,
243 ) -> Result<CloudUploader> {
244 let logger = self.root_logger();
245 let bucket = self.configuration.snapshot_bucket_name().to_owned().ok_or_else(|| {
246 DependenciesBuilderError::MissingConfiguration("snapshot_bucket_name".to_string())
247 })?;
248
249 Ok(CloudUploader::new(
250 Arc::new(
251 GCloudBackendUploader::try_new(
252 bucket,
253 self.configuration.snapshot_use_cdn_domain(),
254 DEFAULT_GCP_CREDENTIALS_JSON_ENV_VAR.to_string(),
255 logger.clone(),
256 )
257 .await?,
258 ),
259 remote_folder_path,
260 allow_overwrite,
261 FileUploadRetryPolicy::default(),
262 ))
263 }
264
265 async fn build_cardano_database_ancillary_uploaders(
266 &self,
267 ) -> Result<Vec<Arc<dyn AncillaryFileUploader>>> {
268 let logger = self.root_logger();
269 if self.configuration.environment() == ExecutionEnvironment::Production {
270 match self.configuration.snapshot_uploader_type() {
271 SnapshotUploaderType::Gcp => {
272 let allow_overwrite = true;
273 let remote_folder_path =
274 CloudRemotePath::new("cardano-database").join("ancillary");
275
276 Ok(vec![Arc::new(
277 self.build_gcp_uploader(remote_folder_path, allow_overwrite).await?,
278 )])
279 }
280 SnapshotUploaderType::Local => {
281 let server_url_prefix = self.configuration.get_server_url()?;
282 let ancillary_url_prefix = server_url_prefix
283 .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/ancillary/"))?;
284 let target_dir = self.get_cardano_db_artifacts_dir()?.join("ancillary");
285
286 std::fs::create_dir_all(&target_dir).map_err(|e| {
287 DependenciesBuilderError::Initialization {
288 message: format!("Cannot create '{target_dir:?}' directory."),
289 error: Some(e.into()),
290 }
291 })?;
292
293 Ok(vec![Arc::new(LocalUploader::new(
294 ancillary_url_prefix,
295 &target_dir,
296 FileUploadRetryPolicy::default(),
297 logger,
298 ))])
299 }
300 }
301 } else {
302 Ok(vec![Arc::new(DumbUploader::new(
303 FileUploadRetryPolicy::never(),
304 ))])
305 }
306 }
307
308 async fn build_cardano_database_immutable_uploaders(
309 &self,
310 ) -> Result<Vec<Arc<dyn ImmutableFilesUploader>>> {
311 let logger = self.root_logger();
312 if self.configuration.environment() == ExecutionEnvironment::Production {
313 match self.configuration.snapshot_uploader_type() {
314 SnapshotUploaderType::Gcp => {
315 let allow_overwrite = false;
316 let remote_folder_path =
317 CloudRemotePath::new("cardano-database").join("immutable");
318
319 Ok(vec![Arc::new(
320 self.build_gcp_uploader(remote_folder_path, allow_overwrite).await?,
321 )])
322 }
323 SnapshotUploaderType::Local => {
324 let server_url_prefix = self.configuration.get_server_url()?;
325 let immutable_url_prefix = server_url_prefix
326 .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/immutable/"))?;
327
328 Ok(vec![Arc::new(LocalUploader::new_without_copy(
329 immutable_url_prefix,
330 FileUploadRetryPolicy::default(),
331 logger,
332 ))])
333 }
334 }
335 } else {
336 Ok(vec![Arc::new(DumbUploader::new(
337 FileUploadRetryPolicy::never(),
338 ))])
339 }
340 }
341
342 async fn build_cardano_database_digests_uploaders(
343 &self,
344 ) -> Result<Vec<Arc<dyn DigestFileUploader>>> {
345 let logger = self.root_logger();
346 if self.configuration.environment() == ExecutionEnvironment::Production {
347 match self.configuration.snapshot_uploader_type() {
348 SnapshotUploaderType::Gcp => {
349 let allow_overwrite = false;
350 let remote_folder_path =
351 CloudRemotePath::new("cardano-database").join("digests");
352
353 Ok(vec![Arc::new(
354 self.build_gcp_uploader(remote_folder_path, allow_overwrite).await?,
355 )])
356 }
357 SnapshotUploaderType::Local => {
358 let server_url_prefix = self.configuration.get_server_url()?;
359 let digests_url_prefix = server_url_prefix
360 .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/digests/"))?;
361 let target_dir = self.get_cardano_db_artifacts_dir()?.join("digests");
362
363 std::fs::create_dir_all(&target_dir).map_err(|e| {
364 DependenciesBuilderError::Initialization {
365 message: format!("Cannot create '{target_dir:?}' directory."),
366 error: Some(e.into()),
367 }
368 })?;
369
370 Ok(vec![Arc::new(LocalUploader::new(
371 digests_url_prefix,
372 &target_dir,
373 FileUploadRetryPolicy::default(),
374 logger,
375 ))])
376 }
377 }
378 } else {
379 Ok(vec![Arc::new(DumbUploader::new(
380 FileUploadRetryPolicy::never(),
381 ))])
382 }
383 }
384
385 async fn build_cardano_database_artifact_builder(
386 &mut self,
387 cardano_node_version: Version,
388 ) -> Result<CardanoDatabaseArtifactBuilder> {
389 let snapshot_dir = self.configuration.get_snapshot_dir()?;
390 let immutable_dir = self.get_cardano_db_artifacts_dir()?.join("immutable");
391
392 let ancillary_builder = Arc::new(AncillaryArtifactBuilder::new(
393 self.build_cardano_database_ancillary_uploaders().await?,
394 self.get_snapshotter().await?,
395 self.configuration.get_network()?,
396 self.root_logger(),
397 )?);
398
399 let immutable_builder = Arc::new(ImmutableArtifactBuilder::new(
400 immutable_dir,
401 self.build_cardano_database_immutable_uploaders().await?,
402 self.get_snapshotter().await?,
403 self.root_logger(),
404 )?);
405
406 let digests_path = snapshot_dir.join("pending_cardano_database_digests");
407 let digests_snapshotter = self.build_digests_snapshotter(digests_path.clone()).await?;
408
409 let digest_builder = Arc::new(DigestArtifactBuilder::new(
410 self.configuration.get_server_url()?,
411 self.build_cardano_database_digests_uploaders().await?,
412 digests_snapshotter,
413 self.configuration.get_network()?,
414 digests_path,
415 self.get_immutable_file_digest_mapper().await?,
416 self.root_logger(),
417 )?);
418
419 Ok(CardanoDatabaseArtifactBuilder::new(
420 self.configuration.get_network()?,
421 &cardano_node_version,
422 ancillary_builder,
423 immutable_builder,
424 digest_builder,
425 ))
426 }
427}
428
#[cfg(test)]
mod tests {
    use mithril_common::temp_dir_create;
    use mithril_persistence::sqlite::ConnectionBuilder;

    use crate::ServeCommandConfiguration;
    use crate::dependency_injection::builder::CARDANO_DB_ARTIFACTS_DIR;

    use super::*;

    /// Paths of the `ancillary`, `immutable` and `digests` sub-directories of the Cardano
    /// database artifacts directory located under `snapshot_directory`, in that order.
    fn cardano_db_subdirs(snapshot_directory: &std::path::Path) -> [PathBuf; 3] {
        let cardano_db_dir = snapshot_directory.join(CARDANO_DB_ARTIFACTS_DIR);

        ["ancillary", "immutable", "digests"].map(|name| cardano_db_dir.join(name))
    }

    /// In the `Test` environment only the `immutable` sub-directory exists after the
    /// Cardano database artifact builder has been built.
    #[tokio::test]
    async fn if_not_local_uploader_create_cardano_database_immutable_dirs() {
        let snapshot_directory = temp_dir_create!();
        let [ancillary_dir, immutable_dir, digests_dir] = cardano_db_subdirs(&snapshot_directory);

        let config = ServeCommandConfiguration {
            environment: ExecutionEnvironment::Test,
            ..ServeCommandConfiguration::new_sample(snapshot_directory)
        };
        let mut dep_builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));

        // Nothing exists before the build.
        assert!(!ancillary_dir.exists());
        assert!(!immutable_dir.exists());
        assert!(!digests_dir.exists());

        let node_version = Version::parse("1.0.0").unwrap();
        dep_builder
            .build_cardano_database_artifact_builder(node_version)
            .await
            .unwrap();

        assert!(!ancillary_dir.exists());
        assert!(immutable_dir.exists());
        assert!(!digests_dir.exists());
    }

    /// In the `Production` environment with the `Local` uploader type, all three
    /// sub-directories exist after the Cardano database artifact builder has been built.
    #[tokio::test]
    async fn if_local_uploader_creates_all_cardano_database_subdirs() {
        let snapshot_directory = temp_dir_create!();
        let [ancillary_dir, immutable_dir, digests_dir] = cardano_db_subdirs(&snapshot_directory);

        let config = ServeCommandConfiguration {
            environment: ExecutionEnvironment::Production,
            snapshot_uploader_type: SnapshotUploaderType::Local,
            ..ServeCommandConfiguration::new_sample(snapshot_directory)
        };
        let mut dep_builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
        // An in-memory SQLite connection is injected; presumably this keeps the production
        // setup from opening a database file during the test (TODO confirm).
        let in_memory_connection = ConnectionBuilder::open_memory().build().unwrap();
        dep_builder.sqlite_connection = Some(Arc::new(in_memory_connection));

        // Nothing exists before the build.
        assert!(!ancillary_dir.exists());
        assert!(!immutable_dir.exists());
        assert!(!digests_dir.exists());

        let node_version = Version::parse("1.0.0").unwrap();
        dep_builder
            .build_cardano_database_artifact_builder(node_version)
            .await
            .unwrap();

        assert!(ancillary_dir.exists());
        assert!(immutable_dir.exists());
        assert!(digests_dir.exists());
    }
}