mithril_aggregator/dependency_injection/builder/protocol/
artifacts.rs1use anyhow::Context;
2use semver::Version;
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use mithril_common::crypto_helper::ManifestSigner;
7
8use crate::artifact_builder::{
9 AncillaryArtifactBuilder, AncillaryFileUploader, CardanoDatabaseArtifactBuilder,
10 CardanoImmutableFilesFullArtifactBuilder, CardanoStakeDistributionArtifactBuilder,
11 CardanoTransactionsArtifactBuilder, DigestArtifactBuilder, DigestFileUploader,
12 DigestSnapshotter, ImmutableArtifactBuilder, ImmutableFilesUploader,
13 MithrilStakeDistributionArtifactBuilder,
14};
15use crate::configuration::AncillaryFilesSignerConfig;
16use crate::dependency_injection::builder::SNAPSHOT_ARTIFACTS_DIR;
17use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
18use crate::file_uploaders::{
19 CloudRemotePath, CloudUploader, FileUploadRetryPolicy, GCloudBackendUploader, LocalUploader,
20};
21use crate::get_dependency;
22use crate::http_server::{CARDANO_DATABASE_DOWNLOAD_PATH, SNAPSHOT_DOWNLOAD_PATH};
23use crate::services::ancillary_signer::{
24 AncillarySigner, AncillarySignerWithGcpKms, AncillarySignerWithSecretKey,
25};
26use crate::services::{
27 CompressedArchiveSnapshotter, DumbSnapshotter, MithrilSignedEntityService, SignedEntityService,
28 SignedEntityServiceArtifactsDependencies, Snapshotter,
29};
30use crate::tools::DEFAULT_GCP_CREDENTIALS_JSON_ENV_VAR;
31use crate::tools::file_archiver::FileArchiver;
32use crate::{DumbUploader, ExecutionEnvironment, FileUploader, SnapshotUploaderType};
33
34impl DependenciesBuilder {
35 async fn build_signed_entity_service(&mut self) -> Result<Arc<dyn SignedEntityService>> {
36 let logger = self.root_logger();
37 let signed_entity_storer = self.get_signed_entity_storer().await?;
38 let epoch_service = self.get_epoch_service().await?;
39 let mithril_stake_distribution_artifact_builder = Arc::new(
40 MithrilStakeDistributionArtifactBuilder::new(epoch_service.clone()),
41 );
42 let snapshotter = self.get_snapshotter().await?;
43 let snapshot_uploader = self.get_snapshot_uploader().await?;
44 let cardano_node_version = Version::parse(&self.configuration.cardano_node_version())
45 .map_err(|e| DependenciesBuilderError::Initialization { message: format!("Could not parse configuration setting 'cardano_node_version' value '{}' as Semver.", self.configuration.cardano_node_version()), error: Some(e.into()) })?;
46 let cardano_immutable_files_full_artifact_builder =
47 Arc::new(CardanoImmutableFilesFullArtifactBuilder::new(
48 self.configuration.get_network()?,
49 &cardano_node_version,
50 snapshotter,
51 snapshot_uploader,
52 logger.clone(),
53 ));
54 let prover_service = self.get_prover_service().await?;
55 let cardano_transactions_artifact_builder = Arc::new(
56 CardanoTransactionsArtifactBuilder::new(prover_service.clone()),
57 );
58 let stake_store = self.get_stake_store().await?;
59 let cardano_stake_distribution_artifact_builder =
60 Arc::new(CardanoStakeDistributionArtifactBuilder::new(stake_store));
61 let cardano_database_artifact_builder = Arc::new(
62 self.build_cardano_database_artifact_builder(cardano_node_version)
63 .await?,
64 );
65 let dependencies = SignedEntityServiceArtifactsDependencies::new(
66 mithril_stake_distribution_artifact_builder,
67 cardano_immutable_files_full_artifact_builder,
68 cardano_transactions_artifact_builder,
69 cardano_stake_distribution_artifact_builder,
70 cardano_database_artifact_builder,
71 );
72 let signed_entity_service = Arc::new(MithrilSignedEntityService::new(
73 signed_entity_storer,
74 dependencies,
75 self.get_signed_entity_type_lock().await?,
76 self.get_metrics_service().await?,
77 logger,
78 ));
79
80 if let Some(signed_entity) =
84 signed_entity_service.get_last_cardano_transaction_snapshot().await?
85 {
86 prover_service
87 .compute_cache(signed_entity.artifact.block_number)
88 .await?;
89 }
90
91 Ok(signed_entity_service)
92 }
93
94 pub async fn get_signed_entity_service(&mut self) -> Result<Arc<dyn SignedEntityService>> {
96 get_dependency!(self.signed_entity_service)
97 }
98
99 async fn build_file_archiver(&mut self) -> Result<Arc<FileArchiver>> {
100 let archive_verification_directory =
101 std::env::temp_dir().join("mithril_archiver_verify_archive");
102 let file_archiver = Arc::new(FileArchiver::new(
103 self.configuration.zstandard_parameters().unwrap_or_default(),
104 archive_verification_directory,
105 self.root_logger(),
106 ));
107
108 Ok(file_archiver)
109 }
110
111 async fn get_file_archiver(&mut self) -> Result<Arc<FileArchiver>> {
112 get_dependency!(self.file_archiver)
113 }
114
115 async fn get_ancillary_signer(&self) -> Result<Arc<dyn AncillarySigner>> {
116 match &self.configuration.ancillary_files_signer_config() {
117 AncillaryFilesSignerConfig::SecretKey { secret_key } => {
118 let manifest_signer = ManifestSigner::from_secret_key(
119 secret_key
120 .as_str()
121 .try_into()
122 .with_context(|| "Failed to build ancillary signer: Invalid secret key")?,
123 );
124
125 Ok(Arc::new(AncillarySignerWithSecretKey::new(
126 manifest_signer,
127 self.root_logger(),
128 )))
129 }
130 AncillaryFilesSignerConfig::GcpKms {
131 resource_name,
132 credentials_json_env_var,
133 } => {
134 let service = AncillarySignerWithGcpKms::new(
135 resource_name.clone(),
136 credentials_json_env_var.clone(),
137 self.root_logger(),
138 )
139 .await?;
140
141 Ok(Arc::new(service))
142 }
143 }
144 }
145
146 async fn build_snapshotter(&mut self) -> Result<Arc<dyn Snapshotter>> {
147 let snapshotter: Arc<dyn Snapshotter> = match self.configuration.environment() {
148 ExecutionEnvironment::Production => {
149 let ongoing_snapshot_directory =
150 self.configuration.get_snapshot_dir()?.join("pending_snapshot");
151
152 Arc::new(CompressedArchiveSnapshotter::new(
153 self.configuration.db_directory().clone(),
154 ongoing_snapshot_directory,
155 self.configuration.snapshot_compression_algorithm(),
156 self.get_file_archiver().await?,
157 self.get_ancillary_signer().await?,
158 self.root_logger(),
159 )?)
160 }
161 _ => Arc::new(DumbSnapshotter::new(
162 self.configuration.snapshot_compression_algorithm(),
163 )),
164 };
165
166 Ok(snapshotter)
167 }
168
169 async fn build_digests_snapshotter(
170 &mut self,
171 digests_path: PathBuf,
172 ) -> Result<DigestSnapshotter> {
173 Ok(DigestSnapshotter {
174 file_archiver: self.get_file_archiver().await?,
175 target_location: digests_path,
176 compression_algorithm: self.configuration.snapshot_compression_algorithm(),
177 })
178 }
179
180 pub async fn get_snapshotter(&mut self) -> Result<Arc<dyn Snapshotter>> {
182 get_dependency!(self.snapshotter)
183 }
184
185 async fn build_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
186 let logger = self.root_logger();
187 if self.configuration.environment() == ExecutionEnvironment::Production {
188 match self.configuration.snapshot_uploader_type() {
189 SnapshotUploaderType::Gcp => {
190 let allow_overwrite = true;
191 let remote_folder_path = CloudRemotePath::new("cardano-immutable-files-full");
192
193 Ok(Arc::new(
194 self.build_gcp_uploader(remote_folder_path, allow_overwrite).await?,
195 ))
196 }
197 SnapshotUploaderType::Local => {
198 let server_url_prefix = self.configuration.get_server_url()?;
199 let snapshot_url_prefix =
200 server_url_prefix.sanitize_join(SNAPSHOT_DOWNLOAD_PATH)?;
201 let snapshot_artifacts_dir =
202 self.configuration.get_snapshot_dir()?.join(SNAPSHOT_ARTIFACTS_DIR);
203 std::fs::create_dir_all(&snapshot_artifacts_dir).map_err(|e| {
204 DependenciesBuilderError::Initialization {
205 message: format!(
206 "Cannot create '{snapshot_artifacts_dir:?}' directory."
207 ),
208 error: Some(e.into()),
209 }
210 })?;
211
212 Ok(Arc::new(LocalUploader::new(
213 snapshot_url_prefix,
214 &snapshot_artifacts_dir,
215 FileUploadRetryPolicy::default(),
216 logger,
217 )))
218 }
219 }
220 } else {
221 Ok(Arc::new(DumbUploader::new(FileUploadRetryPolicy::never())))
222 }
223 }
224
225 pub async fn get_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
227 get_dependency!(self.snapshot_uploader)
228 }
229
230 async fn build_gcp_uploader(
231 &self,
232 remote_folder_path: CloudRemotePath,
233 allow_overwrite: bool,
234 ) -> Result<CloudUploader> {
235 let logger = self.root_logger();
236 let bucket = self.configuration.snapshot_bucket_name().to_owned().ok_or_else(|| {
237 DependenciesBuilderError::MissingConfiguration("snapshot_bucket_name".to_string())
238 })?;
239
240 Ok(CloudUploader::new(
241 Arc::new(
242 GCloudBackendUploader::try_new(
243 bucket,
244 self.configuration.snapshot_use_cdn_domain(),
245 DEFAULT_GCP_CREDENTIALS_JSON_ENV_VAR.to_string(),
246 logger.clone(),
247 )
248 .await?,
249 ),
250 remote_folder_path,
251 allow_overwrite,
252 FileUploadRetryPolicy::default(),
253 ))
254 }
255
256 async fn build_cardano_database_ancillary_uploaders(
257 &self,
258 ) -> Result<Vec<Arc<dyn AncillaryFileUploader>>> {
259 let logger = self.root_logger();
260 if self.configuration.environment() == ExecutionEnvironment::Production {
261 match self.configuration.snapshot_uploader_type() {
262 SnapshotUploaderType::Gcp => {
263 let allow_overwrite = true;
264 let remote_folder_path =
265 CloudRemotePath::new("cardano-database").join("ancillary");
266
267 Ok(vec![Arc::new(
268 self.build_gcp_uploader(remote_folder_path, allow_overwrite).await?,
269 )])
270 }
271 SnapshotUploaderType::Local => {
272 let server_url_prefix = self.configuration.get_server_url()?;
273 let ancillary_url_prefix = server_url_prefix
274 .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/ancillary/"))?;
275 let target_dir = self.get_cardano_db_artifacts_dir()?.join("ancillary");
276
277 std::fs::create_dir_all(&target_dir).map_err(|e| {
278 DependenciesBuilderError::Initialization {
279 message: format!("Cannot create '{target_dir:?}' directory."),
280 error: Some(e.into()),
281 }
282 })?;
283
284 Ok(vec![Arc::new(LocalUploader::new(
285 ancillary_url_prefix,
286 &target_dir,
287 FileUploadRetryPolicy::default(),
288 logger,
289 ))])
290 }
291 }
292 } else {
293 Ok(vec![Arc::new(DumbUploader::new(
294 FileUploadRetryPolicy::never(),
295 ))])
296 }
297 }
298
299 async fn build_cardano_database_immutable_uploaders(
300 &self,
301 ) -> Result<Vec<Arc<dyn ImmutableFilesUploader>>> {
302 let logger = self.root_logger();
303 if self.configuration.environment() == ExecutionEnvironment::Production {
304 match self.configuration.snapshot_uploader_type() {
305 SnapshotUploaderType::Gcp => {
306 let allow_overwrite = false;
307 let remote_folder_path =
308 CloudRemotePath::new("cardano-database").join("immutable");
309
310 Ok(vec![Arc::new(
311 self.build_gcp_uploader(remote_folder_path, allow_overwrite).await?,
312 )])
313 }
314 SnapshotUploaderType::Local => {
315 let server_url_prefix = self.configuration.get_server_url()?;
316 let immutable_url_prefix = server_url_prefix
317 .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/immutable/"))?;
318
319 Ok(vec![Arc::new(LocalUploader::new_without_copy(
320 immutable_url_prefix,
321 FileUploadRetryPolicy::default(),
322 logger,
323 ))])
324 }
325 }
326 } else {
327 Ok(vec![Arc::new(DumbUploader::new(
328 FileUploadRetryPolicy::never(),
329 ))])
330 }
331 }
332
333 async fn build_cardano_database_digests_uploaders(
334 &self,
335 ) -> Result<Vec<Arc<dyn DigestFileUploader>>> {
336 let logger = self.root_logger();
337 if self.configuration.environment() == ExecutionEnvironment::Production {
338 match self.configuration.snapshot_uploader_type() {
339 SnapshotUploaderType::Gcp => {
340 let allow_overwrite = false;
341 let remote_folder_path =
342 CloudRemotePath::new("cardano-database").join("digests");
343
344 Ok(vec![Arc::new(
345 self.build_gcp_uploader(remote_folder_path, allow_overwrite).await?,
346 )])
347 }
348 SnapshotUploaderType::Local => {
349 let server_url_prefix = self.configuration.get_server_url()?;
350 let digests_url_prefix = server_url_prefix
351 .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/digests/"))?;
352 let target_dir = self.get_cardano_db_artifacts_dir()?.join("digests");
353
354 std::fs::create_dir_all(&target_dir).map_err(|e| {
355 DependenciesBuilderError::Initialization {
356 message: format!("Cannot create '{target_dir:?}' directory."),
357 error: Some(e.into()),
358 }
359 })?;
360
361 Ok(vec![Arc::new(LocalUploader::new(
362 digests_url_prefix,
363 &target_dir,
364 FileUploadRetryPolicy::default(),
365 logger,
366 ))])
367 }
368 }
369 } else {
370 Ok(vec![Arc::new(DumbUploader::new(
371 FileUploadRetryPolicy::never(),
372 ))])
373 }
374 }
375
376 async fn build_cardano_database_artifact_builder(
377 &mut self,
378 cardano_node_version: Version,
379 ) -> Result<CardanoDatabaseArtifactBuilder> {
380 let snapshot_dir = self.configuration.get_snapshot_dir()?;
381 let immutable_dir = self.get_cardano_db_artifacts_dir()?.join("immutable");
382
383 let ancillary_builder = Arc::new(AncillaryArtifactBuilder::new(
384 self.build_cardano_database_ancillary_uploaders().await?,
385 self.get_snapshotter().await?,
386 self.configuration.get_network()?,
387 self.root_logger(),
388 )?);
389
390 let immutable_builder = Arc::new(ImmutableArtifactBuilder::new(
391 immutable_dir,
392 self.build_cardano_database_immutable_uploaders().await?,
393 self.get_snapshotter().await?,
394 self.root_logger(),
395 )?);
396
397 let digests_path = snapshot_dir.join("pending_cardano_database_digests");
398 let digests_snapshotter = self.build_digests_snapshotter(digests_path.clone()).await?;
399
400 let digest_builder = Arc::new(DigestArtifactBuilder::new(
401 self.configuration.get_server_url()?,
402 self.build_cardano_database_digests_uploaders().await?,
403 digests_snapshotter,
404 self.configuration.get_network()?,
405 digests_path,
406 self.get_immutable_file_digest_mapper().await?,
407 self.root_logger(),
408 )?);
409
410 Ok(CardanoDatabaseArtifactBuilder::new(
411 self.configuration.get_network()?,
412 &cardano_node_version,
413 ancillary_builder,
414 immutable_builder,
415 digest_builder,
416 ))
417 }
418}
419
#[cfg(test)]
mod tests {
    use mithril_common::temp_dir_create;
    use mithril_persistence::sqlite::ConnectionBuilder;

    use crate::ServeCommandConfiguration;
    use crate::dependency_injection::builder::CARDANO_DB_ARTIFACTS_DIR;

    use super::*;

    /// Outside of production the uploader builders return dumb uploaders without touching the
    /// file system, so only the `immutable` directory is expected to appear.
    #[tokio::test]
    async fn if_not_local_uploader_create_cardano_database_immutable_dirs() {
        let snapshot_directory = temp_dir_create!();
        let cdb_dir = snapshot_directory.join(CARDANO_DB_ARTIFACTS_DIR);
        let [ancillary_dir, immutable_dir, digests_dir] =
            ["ancillary", "immutable", "digests"].map(|subdir| cdb_dir.join(subdir));

        let configuration = ServeCommandConfiguration {
            environment: ExecutionEnvironment::Test,
            ..ServeCommandConfiguration::new_sample(snapshot_directory)
        };
        let mut dep_builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(configuration));

        assert!(!ancillary_dir.exists());
        assert!(!immutable_dir.exists());
        assert!(!digests_dir.exists());

        let node_version = Version::parse("1.0.0").unwrap();
        dep_builder
            .build_cardano_database_artifact_builder(node_version)
            .await
            .unwrap();

        // NOTE(review): the `immutable` directory is not created by the uploader builders;
        // presumably `ImmutableArtifactBuilder::new` creates it — confirm.
        assert!(!ancillary_dir.exists());
        assert!(immutable_dir.exists());
        assert!(!digests_dir.exists());
    }

    /// With the local uploader in production, the `ancillary` and `digests` directories are
    /// created by the uploader builders in addition to the `immutable` one.
    #[tokio::test]
    async fn if_local_uploader_creates_all_cardano_database_subdirs() {
        let snapshot_directory = temp_dir_create!();
        let cdb_dir = snapshot_directory.join(CARDANO_DB_ARTIFACTS_DIR);
        let [ancillary_dir, immutable_dir, digests_dir] =
            ["ancillary", "immutable", "digests"].map(|subdir| cdb_dir.join(subdir));

        // The uploader type is only consulted when the environment is `Production`.
        let configuration = ServeCommandConfiguration {
            environment: ExecutionEnvironment::Production,
            snapshot_uploader_type: SnapshotUploaderType::Local,
            ..ServeCommandConfiguration::new_sample(snapshot_directory)
        };
        let mut dep_builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(configuration));
        // NOTE(review): presumably an in-memory connection is injected so that resolving the
        // dependencies of the digest builder does not open an on-disk database — confirm.
        let in_memory_connection = ConnectionBuilder::open_memory().build().unwrap();
        dep_builder.sqlite_connection = Some(Arc::new(in_memory_connection));

        assert!(!ancillary_dir.exists());
        assert!(!immutable_dir.exists());
        assert!(!digests_dir.exists());

        let node_version = Version::parse("1.0.0").unwrap();
        dep_builder
            .build_cardano_database_artifact_builder(node_version)
            .await
            .unwrap();

        assert!(ancillary_dir.exists());
        assert!(immutable_dir.exists());
        assert!(digests_dir.exists());
    }
}