mithril_aggregator/dependency_injection/builder/protocol/
artifacts.rs1use anyhow::Context;
2use semver::Version;
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use mithril_common::crypto_helper::ManifestSigner;
7
8use crate::artifact_builder::{
9 AncillaryArtifactBuilder, AncillaryFileUploader, CardanoDatabaseArtifactBuilder,
10 CardanoImmutableFilesFullArtifactBuilder, CardanoStakeDistributionArtifactBuilder,
11 CardanoTransactionsArtifactBuilder, DigestArtifactBuilder, DigestFileUploader,
12 DigestSnapshotter, ImmutableArtifactBuilder, ImmutableFilesUploader,
13 MithrilStakeDistributionArtifactBuilder,
14};
15use crate::configuration::AncillaryFilesSignerConfig;
16use crate::dependency_injection::builder::SNAPSHOT_ARTIFACTS_DIR;
17use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
18use crate::file_uploaders::{
19 CloudRemotePath, CloudUploader, FileUploadRetryPolicy, GCloudBackendUploader, LocalUploader,
20};
21use crate::get_dependency;
22use crate::http_server::{CARDANO_DATABASE_DOWNLOAD_PATH, SNAPSHOT_DOWNLOAD_PATH};
23use crate::services::ancillary_signer::{
24 AncillarySigner, AncillarySignerWithGcpKms, AncillarySignerWithSecretKey,
25};
26use crate::services::{
27 CompressedArchiveSnapshotter, DumbSnapshotter, MithrilSignedEntityService, SignedEntityService,
28 SignedEntityServiceArtifactsDependencies, Snapshotter,
29};
30use crate::tools::file_archiver::FileArchiver;
31use crate::tools::DEFAULT_GCP_CREDENTIALS_JSON_ENV_VAR;
32use crate::{DumbUploader, ExecutionEnvironment, FileUploader, SnapshotUploaderType};
33
34impl DependenciesBuilder {
35 async fn build_signed_entity_service(&mut self) -> Result<Arc<dyn SignedEntityService>> {
36 let logger = self.root_logger();
37 let signed_entity_storer = self.get_signed_entity_storer().await?;
38 let epoch_service = self.get_epoch_service().await?;
39 let mithril_stake_distribution_artifact_builder = Arc::new(
40 MithrilStakeDistributionArtifactBuilder::new(epoch_service.clone()),
41 );
42 let snapshotter = self.get_snapshotter().await?;
43 let snapshot_uploader = self.get_snapshot_uploader().await?;
44 let cardano_node_version = Version::parse(&self.configuration.cardano_node_version())
45 .map_err(|e| DependenciesBuilderError::Initialization { message: format!("Could not parse configuration setting 'cardano_node_version' value '{}' as Semver.", self.configuration.cardano_node_version()), error: Some(e.into()) })?;
46 let cardano_immutable_files_full_artifact_builder =
47 Arc::new(CardanoImmutableFilesFullArtifactBuilder::new(
48 self.configuration.get_network()?,
49 &cardano_node_version,
50 snapshotter,
51 snapshot_uploader,
52 logger.clone(),
53 ));
54 let prover_service = self.get_prover_service().await?;
55 let cardano_transactions_artifact_builder = Arc::new(
56 CardanoTransactionsArtifactBuilder::new(prover_service.clone()),
57 );
58 let stake_store = self.get_stake_store().await?;
59 let cardano_stake_distribution_artifact_builder =
60 Arc::new(CardanoStakeDistributionArtifactBuilder::new(stake_store));
61 let cardano_database_artifact_builder = Arc::new(
62 self.build_cardano_database_artifact_builder(cardano_node_version)
63 .await?,
64 );
65 let dependencies = SignedEntityServiceArtifactsDependencies::new(
66 mithril_stake_distribution_artifact_builder,
67 cardano_immutable_files_full_artifact_builder,
68 cardano_transactions_artifact_builder,
69 cardano_stake_distribution_artifact_builder,
70 cardano_database_artifact_builder,
71 );
72 let signed_entity_service = Arc::new(MithrilSignedEntityService::new(
73 signed_entity_storer,
74 dependencies,
75 self.get_signed_entity_type_lock().await?,
76 self.get_metrics_service().await?,
77 logger,
78 ));
79
80 if let Some(signed_entity) = signed_entity_service
84 .get_last_cardano_transaction_snapshot()
85 .await?
86 {
87 prover_service
88 .compute_cache(signed_entity.artifact.block_number)
89 .await?;
90 }
91
92 Ok(signed_entity_service)
93 }
94
95 pub async fn get_signed_entity_service(&mut self) -> Result<Arc<dyn SignedEntityService>> {
97 get_dependency!(self.signed_entity_service)
98 }
99
100 async fn build_file_archiver(&mut self) -> Result<Arc<FileArchiver>> {
101 let archive_verification_directory =
102 std::env::temp_dir().join("mithril_archiver_verify_archive");
103 let file_archiver = Arc::new(FileArchiver::new(
104 self.configuration
105 .zstandard_parameters()
106 .unwrap_or_default(),
107 archive_verification_directory,
108 self.root_logger(),
109 ));
110
111 Ok(file_archiver)
112 }
113
114 async fn get_file_archiver(&mut self) -> Result<Arc<FileArchiver>> {
115 get_dependency!(self.file_archiver)
116 }
117
118 async fn get_ancillary_signer(&self) -> Result<Arc<dyn AncillarySigner>> {
119 match &self.configuration.ancillary_files_signer_config() {
120 AncillaryFilesSignerConfig::SecretKey { secret_key } => {
121 let manifest_signer = ManifestSigner::from_secret_key(
122 secret_key
123 .as_str()
124 .try_into()
125 .with_context(|| "Failed to build ancillary signer: Invalid secret key")?,
126 );
127
128 Ok(Arc::new(AncillarySignerWithSecretKey::new(
129 manifest_signer,
130 self.root_logger(),
131 )))
132 }
133 AncillaryFilesSignerConfig::GcpKms {
134 resource_name,
135 credentials_json_env_var,
136 } => {
137 let service = AncillarySignerWithGcpKms::new(
138 resource_name.clone(),
139 credentials_json_env_var.clone(),
140 self.root_logger(),
141 )
142 .await?;
143
144 Ok(Arc::new(service))
145 }
146 }
147 }
148
149 async fn build_snapshotter(&mut self) -> Result<Arc<dyn Snapshotter>> {
150 let snapshotter: Arc<dyn Snapshotter> = match self.configuration.environment() {
151 ExecutionEnvironment::Production => {
152 let ongoing_snapshot_directory = self
153 .configuration
154 .get_snapshot_dir()?
155 .join("pending_snapshot");
156
157 Arc::new(CompressedArchiveSnapshotter::new(
158 self.configuration.db_directory().clone(),
159 ongoing_snapshot_directory,
160 self.configuration.snapshot_compression_algorithm(),
161 self.get_file_archiver().await?,
162 self.get_ancillary_signer().await?,
163 self.root_logger(),
164 )?)
165 }
166 _ => Arc::new(DumbSnapshotter::new(
167 self.configuration.snapshot_compression_algorithm(),
168 )),
169 };
170
171 Ok(snapshotter)
172 }
173
174 async fn build_digests_snapshotter(
175 &mut self,
176 digests_path: PathBuf,
177 ) -> Result<DigestSnapshotter> {
178 Ok(DigestSnapshotter {
179 file_archiver: self.get_file_archiver().await?,
180 target_location: digests_path,
181 compression_algorithm: self.configuration.snapshot_compression_algorithm(),
182 })
183 }
184
185 pub async fn get_snapshotter(&mut self) -> Result<Arc<dyn Snapshotter>> {
187 get_dependency!(self.snapshotter)
188 }
189
190 async fn build_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
191 let logger = self.root_logger();
192 if self.configuration.environment() == ExecutionEnvironment::Production {
193 match self.configuration.snapshot_uploader_type() {
194 SnapshotUploaderType::Gcp => {
195 let allow_overwrite = true;
196 let remote_folder_path = CloudRemotePath::new("cardano-immutable-files-full");
197
198 Ok(Arc::new(
199 self.build_gcp_uploader(remote_folder_path, allow_overwrite)
200 .await?,
201 ))
202 }
203 SnapshotUploaderType::Local => {
204 let server_url_prefix = self.configuration.get_server_url()?;
205 let snapshot_url_prefix =
206 server_url_prefix.sanitize_join(SNAPSHOT_DOWNLOAD_PATH)?;
207 let snapshot_artifacts_dir = self
208 .configuration
209 .get_snapshot_dir()?
210 .join(SNAPSHOT_ARTIFACTS_DIR);
211 std::fs::create_dir_all(&snapshot_artifacts_dir).map_err(|e| {
212 DependenciesBuilderError::Initialization {
213 message: format!(
214 "Cannot create '{snapshot_artifacts_dir:?}' directory."
215 ),
216 error: Some(e.into()),
217 }
218 })?;
219
220 Ok(Arc::new(LocalUploader::new(
221 snapshot_url_prefix,
222 &snapshot_artifacts_dir,
223 FileUploadRetryPolicy::default(),
224 logger,
225 )))
226 }
227 }
228 } else {
229 Ok(Arc::new(DumbUploader::new(FileUploadRetryPolicy::never())))
230 }
231 }
232
233 pub async fn get_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
235 get_dependency!(self.snapshot_uploader)
236 }
237
238 async fn build_gcp_uploader(
239 &self,
240 remote_folder_path: CloudRemotePath,
241 allow_overwrite: bool,
242 ) -> Result<CloudUploader> {
243 let logger = self.root_logger();
244 let bucket = self
245 .configuration
246 .snapshot_bucket_name()
247 .to_owned()
248 .ok_or_else(|| {
249 DependenciesBuilderError::MissingConfiguration("snapshot_bucket_name".to_string())
250 })?;
251
252 Ok(CloudUploader::new(
253 Arc::new(
254 GCloudBackendUploader::try_new(
255 bucket,
256 self.configuration.snapshot_use_cdn_domain(),
257 DEFAULT_GCP_CREDENTIALS_JSON_ENV_VAR.to_string(),
258 logger.clone(),
259 )
260 .await?,
261 ),
262 remote_folder_path,
263 allow_overwrite,
264 FileUploadRetryPolicy::default(),
265 ))
266 }
267
268 async fn build_cardano_database_ancillary_uploaders(
269 &self,
270 ) -> Result<Vec<Arc<dyn AncillaryFileUploader>>> {
271 let logger = self.root_logger();
272 if self.configuration.environment() == ExecutionEnvironment::Production {
273 match self.configuration.snapshot_uploader_type() {
274 SnapshotUploaderType::Gcp => {
275 let allow_overwrite = true;
276 let remote_folder_path =
277 CloudRemotePath::new("cardano-database").join("ancillary");
278
279 Ok(vec![Arc::new(
280 self.build_gcp_uploader(remote_folder_path, allow_overwrite)
281 .await?,
282 )])
283 }
284 SnapshotUploaderType::Local => {
285 let server_url_prefix = self.configuration.get_server_url()?;
286 let ancillary_url_prefix = server_url_prefix
287 .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/ancillary/"))?;
288 let target_dir = self.get_cardano_db_artifacts_dir()?.join("ancillary");
289
290 std::fs::create_dir_all(&target_dir).map_err(|e| {
291 DependenciesBuilderError::Initialization {
292 message: format!("Cannot create '{target_dir:?}' directory."),
293 error: Some(e.into()),
294 }
295 })?;
296
297 Ok(vec![Arc::new(LocalUploader::new(
298 ancillary_url_prefix,
299 &target_dir,
300 FileUploadRetryPolicy::default(),
301 logger,
302 ))])
303 }
304 }
305 } else {
306 Ok(vec![Arc::new(DumbUploader::new(
307 FileUploadRetryPolicy::never(),
308 ))])
309 }
310 }
311
312 async fn build_cardano_database_immutable_uploaders(
313 &self,
314 ) -> Result<Vec<Arc<dyn ImmutableFilesUploader>>> {
315 let logger = self.root_logger();
316 if self.configuration.environment() == ExecutionEnvironment::Production {
317 match self.configuration.snapshot_uploader_type() {
318 SnapshotUploaderType::Gcp => {
319 let allow_overwrite = false;
320 let remote_folder_path =
321 CloudRemotePath::new("cardano-database").join("immutable");
322
323 Ok(vec![Arc::new(
324 self.build_gcp_uploader(remote_folder_path, allow_overwrite)
325 .await?,
326 )])
327 }
328 SnapshotUploaderType::Local => {
329 let server_url_prefix = self.configuration.get_server_url()?;
330 let immutable_url_prefix = server_url_prefix
331 .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/immutable/"))?;
332
333 Ok(vec![Arc::new(LocalUploader::new_without_copy(
334 immutable_url_prefix,
335 FileUploadRetryPolicy::default(),
336 logger,
337 ))])
338 }
339 }
340 } else {
341 Ok(vec![Arc::new(DumbUploader::new(
342 FileUploadRetryPolicy::never(),
343 ))])
344 }
345 }
346
347 async fn build_cardano_database_digests_uploaders(
348 &self,
349 ) -> Result<Vec<Arc<dyn DigestFileUploader>>> {
350 let logger = self.root_logger();
351 if self.configuration.environment() == ExecutionEnvironment::Production {
352 match self.configuration.snapshot_uploader_type() {
353 SnapshotUploaderType::Gcp => {
354 let allow_overwrite = false;
355 let remote_folder_path =
356 CloudRemotePath::new("cardano-database").join("digests");
357
358 Ok(vec![Arc::new(
359 self.build_gcp_uploader(remote_folder_path, allow_overwrite)
360 .await?,
361 )])
362 }
363 SnapshotUploaderType::Local => {
364 let server_url_prefix = self.configuration.get_server_url()?;
365 let digests_url_prefix = server_url_prefix
366 .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/digests/"))?;
367 let target_dir = self.get_cardano_db_artifacts_dir()?.join("digests");
368
369 std::fs::create_dir_all(&target_dir).map_err(|e| {
370 DependenciesBuilderError::Initialization {
371 message: format!("Cannot create '{target_dir:?}' directory."),
372 error: Some(e.into()),
373 }
374 })?;
375
376 Ok(vec![Arc::new(LocalUploader::new(
377 digests_url_prefix,
378 &target_dir,
379 FileUploadRetryPolicy::default(),
380 logger,
381 ))])
382 }
383 }
384 } else {
385 Ok(vec![Arc::new(DumbUploader::new(
386 FileUploadRetryPolicy::never(),
387 ))])
388 }
389 }
390
391 async fn build_cardano_database_artifact_builder(
392 &mut self,
393 cardano_node_version: Version,
394 ) -> Result<CardanoDatabaseArtifactBuilder> {
395 let snapshot_dir = self.configuration.get_snapshot_dir()?;
396 let immutable_dir = self.get_cardano_db_artifacts_dir()?.join("immutable");
397
398 let ancillary_builder = Arc::new(AncillaryArtifactBuilder::new(
399 self.build_cardano_database_ancillary_uploaders().await?,
400 self.get_snapshotter().await?,
401 self.configuration.get_network()?,
402 self.root_logger(),
403 )?);
404
405 let immutable_builder = Arc::new(ImmutableArtifactBuilder::new(
406 immutable_dir,
407 self.build_cardano_database_immutable_uploaders().await?,
408 self.get_snapshotter().await?,
409 self.root_logger(),
410 )?);
411
412 let digests_path = snapshot_dir.join("pending_cardano_database_digests");
413 let digests_snapshotter = self.build_digests_snapshotter(digests_path.clone()).await?;
414
415 let digest_builder = Arc::new(DigestArtifactBuilder::new(
416 self.configuration.get_server_url()?,
417 self.build_cardano_database_digests_uploaders().await?,
418 digests_snapshotter,
419 self.configuration.get_network()?,
420 digests_path,
421 self.get_immutable_file_digest_mapper().await?,
422 self.root_logger(),
423 )?);
424
425 Ok(CardanoDatabaseArtifactBuilder::new(
426 self.configuration.get_network()?,
427 &cardano_node_version,
428 ancillary_builder,
429 immutable_builder,
430 digest_builder,
431 ))
432 }
433}
434
435#[cfg(test)]
436mod tests {
437 use mithril_common::temp_dir_create;
438 use mithril_persistence::sqlite::ConnectionBuilder;
439
440 use crate::dependency_injection::builder::CARDANO_DB_ARTIFACTS_DIR;
441 use crate::ServeCommandConfiguration;
442
443 use super::*;
444
445 #[tokio::test]
446 async fn if_not_local_uploader_create_cardano_database_immutable_dirs() {
447 let snapshot_directory = temp_dir_create!();
448 let cdb_dir = snapshot_directory.join(CARDANO_DB_ARTIFACTS_DIR);
449 let ancillary_dir = cdb_dir.join("ancillary");
450 let immutable_dir = cdb_dir.join("immutable");
451 let digests_dir = cdb_dir.join("digests");
452
453 let mut dep_builder = {
454 let config = ServeCommandConfiguration {
455 environment: ExecutionEnvironment::Test,
457 ..ServeCommandConfiguration::new_sample(snapshot_directory)
458 };
459
460 DependenciesBuilder::new_with_stdout_logger(Arc::new(config))
461 };
462
463 assert!(!ancillary_dir.exists());
464 assert!(!immutable_dir.exists());
465 assert!(!digests_dir.exists());
466
467 dep_builder
468 .build_cardano_database_artifact_builder(Version::parse("1.0.0").unwrap())
469 .await
470 .unwrap();
471
472 assert!(!ancillary_dir.exists());
473 assert!(immutable_dir.exists());
474 assert!(!digests_dir.exists());
475 }
476
477 #[tokio::test]
478 async fn if_local_uploader_creates_all_cardano_database_subdirs() {
479 let snapshot_directory = temp_dir_create!();
480 let cdb_dir = snapshot_directory.join(CARDANO_DB_ARTIFACTS_DIR);
481 let ancillary_dir = cdb_dir.join("ancillary");
482 let immutable_dir = cdb_dir.join("immutable");
483 let digests_dir = cdb_dir.join("digests");
484
485 let mut dep_builder = {
486 let config = ServeCommandConfiguration {
487 environment: ExecutionEnvironment::Production,
489 snapshot_uploader_type: SnapshotUploaderType::Local,
490 ..ServeCommandConfiguration::new_sample(snapshot_directory)
491 };
492
493 DependenciesBuilder::new_with_stdout_logger(Arc::new(config))
494 };
495 dep_builder.sqlite_connection =
498 Some(Arc::new(ConnectionBuilder::open_memory().build().unwrap()));
499
500 assert!(!ancillary_dir.exists());
501 assert!(!immutable_dir.exists());
502 assert!(!digests_dir.exists());
503
504 dep_builder
505 .build_cardano_database_artifact_builder(Version::parse("1.0.0").unwrap())
506 .await
507 .unwrap();
508
509 assert!(ancillary_dir.exists());
510 assert!(immutable_dir.exists());
511 assert!(digests_dir.exists());
512 }
513}