mithril_aggregator/dependency_injection/builder/protocol/
artifacts.rs1use anyhow::Context;
2use semver::Version;
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use mithril_common::crypto_helper::ManifestSigner;
7
8use crate::artifact_builder::{
9 AncillaryArtifactBuilder, AncillaryFileUploader, CardanoDatabaseArtifactBuilder,
10 CardanoImmutableFilesFullArtifactBuilder, CardanoStakeDistributionArtifactBuilder,
11 CardanoTransactionsArtifactBuilder, DigestArtifactBuilder, DigestFileUploader,
12 DigestSnapshotter, ImmutableArtifactBuilder, ImmutableFilesUploader,
13 MithrilStakeDistributionArtifactBuilder,
14};
15use crate::configuration::AncillaryFilesSignerConfig;
16use crate::dependency_injection::builder::SNAPSHOT_ARTIFACTS_DIR;
17use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
18use crate::file_uploaders::{
19 CloudRemotePath, FileUploadRetryPolicy, GcpBackendUploader, GcpUploader, LocalUploader,
20};
21use crate::get_dependency;
22use crate::http_server::{CARDANO_DATABASE_DOWNLOAD_PATH, SNAPSHOT_DOWNLOAD_PATH};
23use crate::services::ancillary_signer::{
24 AncillarySigner, AncillarySignerWithGcpKms, AncillarySignerWithSecretKey,
25};
26use crate::services::{
27 CompressedArchiveSnapshotter, DumbSnapshotter, MithrilSignedEntityService, SignedEntityService,
28 SignedEntityServiceArtifactsDependencies, Snapshotter,
29};
30use crate::tools::file_archiver::FileArchiver;
31use crate::{DumbUploader, ExecutionEnvironment, FileUploader, SnapshotUploaderType};
32impl DependenciesBuilder {
33 async fn build_signed_entity_service(&mut self) -> Result<Arc<dyn SignedEntityService>> {
34 let logger = self.root_logger();
35 let signed_entity_storer = self.get_signed_entity_storer().await?;
36 let epoch_service = self.get_epoch_service().await?;
37 let mithril_stake_distribution_artifact_builder = Arc::new(
38 MithrilStakeDistributionArtifactBuilder::new(epoch_service.clone()),
39 );
40 let snapshotter = self.get_snapshotter().await?;
41 let snapshot_uploader = self.get_snapshot_uploader().await?;
42 let cardano_node_version = Version::parse(&self.configuration.cardano_node_version())
43 .map_err(|e| DependenciesBuilderError::Initialization { message: format!("Could not parse configuration setting 'cardano_node_version' value '{}' as Semver.", self.configuration.cardano_node_version()), error: Some(e.into()) })?;
44 let cardano_immutable_files_full_artifact_builder =
45 Arc::new(CardanoImmutableFilesFullArtifactBuilder::new(
46 self.configuration.get_network()?,
47 &cardano_node_version,
48 snapshotter,
49 snapshot_uploader,
50 logger.clone(),
51 ));
52 let prover_service = self.get_prover_service().await?;
53 let cardano_transactions_artifact_builder = Arc::new(
54 CardanoTransactionsArtifactBuilder::new(prover_service.clone()),
55 );
56 let stake_store = self.get_stake_store().await?;
57 let cardano_stake_distribution_artifact_builder =
58 Arc::new(CardanoStakeDistributionArtifactBuilder::new(stake_store));
59 let cardano_database_artifact_builder = Arc::new(
60 self.build_cardano_database_artifact_builder(cardano_node_version)
61 .await?,
62 );
63 let dependencies = SignedEntityServiceArtifactsDependencies::new(
64 mithril_stake_distribution_artifact_builder,
65 cardano_immutable_files_full_artifact_builder,
66 cardano_transactions_artifact_builder,
67 cardano_stake_distribution_artifact_builder,
68 cardano_database_artifact_builder,
69 );
70 let signed_entity_service = Arc::new(MithrilSignedEntityService::new(
71 signed_entity_storer,
72 dependencies,
73 self.get_signed_entity_type_lock().await?,
74 self.get_metrics_service().await?,
75 logger,
76 ));
77
78 if let Some(signed_entity) = signed_entity_service
82 .get_last_cardano_transaction_snapshot()
83 .await?
84 {
85 prover_service
86 .compute_cache(signed_entity.artifact.block_number)
87 .await?;
88 }
89
90 Ok(signed_entity_service)
91 }
92
93 pub async fn get_signed_entity_service(&mut self) -> Result<Arc<dyn SignedEntityService>> {
95 get_dependency!(self.signed_entity_service)
96 }
97
98 async fn build_file_archiver(&mut self) -> Result<Arc<FileArchiver>> {
99 let archive_verification_directory =
100 std::env::temp_dir().join("mithril_archiver_verify_archive");
101 let file_archiver = Arc::new(FileArchiver::new(
102 self.configuration
103 .zstandard_parameters()
104 .unwrap_or_default(),
105 archive_verification_directory,
106 self.root_logger(),
107 ));
108
109 Ok(file_archiver)
110 }
111
112 async fn get_file_archiver(&mut self) -> Result<Arc<FileArchiver>> {
113 get_dependency!(self.file_archiver)
114 }
115
116 async fn get_ancillary_signer(&self) -> Result<Arc<dyn AncillarySigner>> {
117 match &self.configuration.ancillary_files_signer_config() {
118 AncillaryFilesSignerConfig::SecretKey { secret_key } => {
119 let manifest_signer = ManifestSigner::from_secret_key(
120 secret_key
121 .as_str()
122 .try_into()
123 .with_context(|| "Failed to build ancillary signer: Invalid secret key")?,
124 );
125
126 Ok(Arc::new(AncillarySignerWithSecretKey::new(
127 manifest_signer,
128 self.root_logger(),
129 )))
130 }
131 AncillaryFilesSignerConfig::GcpKms {
132 resource_name,
133 credentials_json_env_var,
134 } => {
135 let service = AncillarySignerWithGcpKms::new(
136 resource_name.clone(),
137 credentials_json_env_var.clone(),
138 self.root_logger(),
139 )
140 .await?;
141
142 Ok(Arc::new(service))
143 }
144 }
145 }
146
147 async fn build_snapshotter(&mut self) -> Result<Arc<dyn Snapshotter>> {
148 let snapshotter: Arc<dyn Snapshotter> = match self.configuration.environment() {
149 ExecutionEnvironment::Production => {
150 let ongoing_snapshot_directory = self
151 .configuration
152 .get_snapshot_dir()?
153 .join("pending_snapshot");
154
155 Arc::new(CompressedArchiveSnapshotter::new(
156 self.configuration.db_directory().clone(),
157 ongoing_snapshot_directory,
158 self.configuration.snapshot_compression_algorithm(),
159 self.get_file_archiver().await?,
160 self.get_ancillary_signer().await?,
161 self.root_logger(),
162 )?)
163 }
164 _ => Arc::new(DumbSnapshotter::new(
165 self.configuration.snapshot_compression_algorithm(),
166 )),
167 };
168
169 Ok(snapshotter)
170 }
171
172 async fn build_digests_snapshotter(
173 &mut self,
174 digests_path: PathBuf,
175 ) -> Result<DigestSnapshotter> {
176 Ok(DigestSnapshotter {
177 file_archiver: self.get_file_archiver().await?,
178 target_location: digests_path,
179 compression_algorithm: self.configuration.snapshot_compression_algorithm(),
180 })
181 }
182
183 pub async fn get_snapshotter(&mut self) -> Result<Arc<dyn Snapshotter>> {
185 get_dependency!(self.snapshotter)
186 }
187
188 async fn build_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
189 let logger = self.root_logger();
190 if self.configuration.environment() == ExecutionEnvironment::Production {
191 match self.configuration.snapshot_uploader_type() {
192 SnapshotUploaderType::Gcp => {
193 let allow_overwrite = true;
194 let remote_folder_path = CloudRemotePath::new("cardano-immutable-files-full");
195
196 Ok(Arc::new(
197 self.build_gcp_uploader(remote_folder_path, allow_overwrite)?,
198 ))
199 }
200 SnapshotUploaderType::Local => {
201 let server_url_prefix = self.configuration.get_server_url()?;
202 let snapshot_url_prefix =
203 server_url_prefix.sanitize_join(SNAPSHOT_DOWNLOAD_PATH)?;
204 let snapshot_artifacts_dir = self
205 .configuration
206 .get_snapshot_dir()?
207 .join(SNAPSHOT_ARTIFACTS_DIR);
208 std::fs::create_dir_all(&snapshot_artifacts_dir).map_err(|e| {
209 DependenciesBuilderError::Initialization {
210 message: format!(
211 "Cannot create '{snapshot_artifacts_dir:?}' directory."
212 ),
213 error: Some(e.into()),
214 }
215 })?;
216
217 Ok(Arc::new(LocalUploader::new(
218 snapshot_url_prefix,
219 &snapshot_artifacts_dir,
220 FileUploadRetryPolicy::default(),
221 logger,
222 )))
223 }
224 }
225 } else {
226 Ok(Arc::new(DumbUploader::new(FileUploadRetryPolicy::never())))
227 }
228 }
229
230 pub async fn get_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
232 get_dependency!(self.snapshot_uploader)
233 }
234
235 fn build_gcp_uploader(
236 &self,
237 remote_folder_path: CloudRemotePath,
238 allow_overwrite: bool,
239 ) -> Result<GcpUploader> {
240 let logger = self.root_logger();
241 let bucket = self
242 .configuration
243 .snapshot_bucket_name()
244 .to_owned()
245 .ok_or_else(|| {
246 DependenciesBuilderError::MissingConfiguration("snapshot_bucket_name".to_string())
247 })?;
248
249 Ok(GcpUploader::new(
250 Arc::new(GcpBackendUploader::try_new(
251 bucket,
252 self.configuration.snapshot_use_cdn_domain(),
253 logger.clone(),
254 )?),
255 remote_folder_path,
256 allow_overwrite,
257 FileUploadRetryPolicy::default(),
258 ))
259 }
260
261 fn build_cardano_database_ancillary_uploaders(
262 &self,
263 ) -> Result<Vec<Arc<dyn AncillaryFileUploader>>> {
264 let logger = self.root_logger();
265 if self.configuration.environment() == ExecutionEnvironment::Production {
266 match self.configuration.snapshot_uploader_type() {
267 SnapshotUploaderType::Gcp => {
268 let allow_overwrite = true;
269 let remote_folder_path =
270 CloudRemotePath::new("cardano-database").join("ancillary");
271
272 Ok(vec![Arc::new(self.build_gcp_uploader(
273 remote_folder_path,
274 allow_overwrite,
275 )?)])
276 }
277 SnapshotUploaderType::Local => {
278 let server_url_prefix = self.configuration.get_server_url()?;
279 let ancillary_url_prefix = server_url_prefix
280 .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/ancillary/"))?;
281 let target_dir = self.get_cardano_db_artifacts_dir()?.join("ancillary");
282
283 std::fs::create_dir_all(&target_dir).map_err(|e| {
284 DependenciesBuilderError::Initialization {
285 message: format!("Cannot create '{target_dir:?}' directory."),
286 error: Some(e.into()),
287 }
288 })?;
289
290 Ok(vec![Arc::new(LocalUploader::new(
291 ancillary_url_prefix,
292 &target_dir,
293 FileUploadRetryPolicy::default(),
294 logger,
295 ))])
296 }
297 }
298 } else {
299 Ok(vec![Arc::new(DumbUploader::new(
300 FileUploadRetryPolicy::never(),
301 ))])
302 }
303 }
304
305 fn build_cardano_database_immutable_uploaders(
306 &self,
307 ) -> Result<Vec<Arc<dyn ImmutableFilesUploader>>> {
308 let logger = self.root_logger();
309 if self.configuration.environment() == ExecutionEnvironment::Production {
310 match self.configuration.snapshot_uploader_type() {
311 SnapshotUploaderType::Gcp => {
312 let allow_overwrite = false;
313 let remote_folder_path =
314 CloudRemotePath::new("cardano-database").join("immutable");
315
316 Ok(vec![Arc::new(self.build_gcp_uploader(
317 remote_folder_path,
318 allow_overwrite,
319 )?)])
320 }
321 SnapshotUploaderType::Local => {
322 let server_url_prefix = self.configuration.get_server_url()?;
323 let immutable_url_prefix = server_url_prefix
324 .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/immutable/"))?;
325
326 Ok(vec![Arc::new(LocalUploader::new_without_copy(
327 immutable_url_prefix,
328 FileUploadRetryPolicy::default(),
329 logger,
330 ))])
331 }
332 }
333 } else {
334 Ok(vec![Arc::new(DumbUploader::new(
335 FileUploadRetryPolicy::never(),
336 ))])
337 }
338 }
339
340 fn build_cardano_database_digests_uploaders(&self) -> Result<Vec<Arc<dyn DigestFileUploader>>> {
341 let logger = self.root_logger();
342 if self.configuration.environment() == ExecutionEnvironment::Production {
343 match self.configuration.snapshot_uploader_type() {
344 SnapshotUploaderType::Gcp => {
345 let allow_overwrite = false;
346 let remote_folder_path =
347 CloudRemotePath::new("cardano-database").join("digests");
348
349 Ok(vec![Arc::new(self.build_gcp_uploader(
350 remote_folder_path,
351 allow_overwrite,
352 )?)])
353 }
354 SnapshotUploaderType::Local => {
355 let server_url_prefix = self.configuration.get_server_url()?;
356 let digests_url_prefix = server_url_prefix
357 .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/digests/"))?;
358 let target_dir = self.get_cardano_db_artifacts_dir()?.join("digests");
359
360 std::fs::create_dir_all(&target_dir).map_err(|e| {
361 DependenciesBuilderError::Initialization {
362 message: format!("Cannot create '{target_dir:?}' directory."),
363 error: Some(e.into()),
364 }
365 })?;
366
367 Ok(vec![Arc::new(LocalUploader::new(
368 digests_url_prefix,
369 &target_dir,
370 FileUploadRetryPolicy::default(),
371 logger,
372 ))])
373 }
374 }
375 } else {
376 Ok(vec![Arc::new(DumbUploader::new(
377 FileUploadRetryPolicy::never(),
378 ))])
379 }
380 }
381
382 async fn build_cardano_database_artifact_builder(
383 &mut self,
384 cardano_node_version: Version,
385 ) -> Result<CardanoDatabaseArtifactBuilder> {
386 let snapshot_dir = self.configuration.get_snapshot_dir()?;
387 let immutable_dir = self.get_cardano_db_artifacts_dir()?.join("immutable");
388
389 let ancillary_builder = Arc::new(AncillaryArtifactBuilder::new(
390 self.build_cardano_database_ancillary_uploaders()?,
391 self.get_snapshotter().await?,
392 self.configuration.get_network()?,
393 self.root_logger(),
394 )?);
395
396 let immutable_builder = Arc::new(ImmutableArtifactBuilder::new(
397 immutable_dir,
398 self.build_cardano_database_immutable_uploaders()?,
399 self.get_snapshotter().await?,
400 self.root_logger(),
401 )?);
402
403 let digests_path = snapshot_dir.join("pending_cardano_database_digests");
404 let digests_snapshotter = self.build_digests_snapshotter(digests_path.clone()).await?;
405
406 let digest_builder = Arc::new(DigestArtifactBuilder::new(
407 self.configuration.get_server_url()?,
408 self.build_cardano_database_digests_uploaders()?,
409 digests_snapshotter,
410 self.configuration.get_network()?,
411 digests_path,
412 self.get_immutable_file_digest_mapper().await?,
413 self.root_logger(),
414 )?);
415
416 Ok(CardanoDatabaseArtifactBuilder::new(
417 self.configuration.get_network()?,
418 &cardano_node_version,
419 ancillary_builder,
420 immutable_builder,
421 digest_builder,
422 ))
423 }
424}
425
#[cfg(test)]
mod tests {
    use mithril_common::temp_dir_create;
    use mithril_persistence::sqlite::ConnectionBuilder;

    use crate::dependency_injection::builder::CARDANO_DB_ARTIFACTS_DIR;
    use crate::ServeCommandConfiguration;

    use super::*;

    /// Paths of the Cardano database artifact sub-directories that building the
    /// `CardanoDatabaseArtifactBuilder` may create.
    struct CardanoDatabaseDirs {
        ancillary: PathBuf,
        immutable: PathBuf,
        digests: PathBuf,
    }

    impl CardanoDatabaseDirs {
        /// Compute the sub-directory paths located under the given snapshot directory.
        fn under(snapshot_directory: &std::path::Path) -> Self {
            let root = snapshot_directory.join(CARDANO_DB_ARTIFACTS_DIR);

            Self {
                ancillary: root.join("ancillary"),
                immutable: root.join("immutable"),
                digests: root.join("digests"),
            }
        }

        /// Assert that none of the sub-directories exist yet.
        fn assert_none_exist(&self) {
            assert!(!self.ancillary.exists());
            assert!(!self.immutable.exists());
            assert!(!self.digests.exists());
        }
    }

    #[tokio::test]
    async fn if_not_local_uploader_create_cardano_database_immutable_dirs() {
        // `temp_dir_create!` is invoked directly in the test function: it may derive the
        // directory from the calling function, so it must not be moved into a helper.
        let snapshot_directory = temp_dir_create!();
        let dirs = CardanoDatabaseDirs::under(&snapshot_directory);

        // In the `Test` environment the uploaders are dumb ones, which create no directory.
        let config = ServeCommandConfiguration {
            environment: ExecutionEnvironment::Test,
            ..ServeCommandConfiguration::new_sample(snapshot_directory)
        };
        let mut dep_builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));

        dirs.assert_none_exist();

        dep_builder
            .build_cardano_database_artifact_builder(Version::parse("1.0.0").unwrap())
            .await
            .unwrap();

        assert!(!dirs.ancillary.exists());
        assert!(dirs.immutable.exists());
        assert!(!dirs.digests.exists());
    }

    #[tokio::test]
    async fn if_local_uploader_creates_all_cardano_database_subdirs() {
        let snapshot_directory = temp_dir_create!();
        let dirs = CardanoDatabaseDirs::under(&snapshot_directory);

        // The local uploaders are only built in the `Production` environment.
        let config = ServeCommandConfiguration {
            environment: ExecutionEnvironment::Production,
            snapshot_uploader_type: SnapshotUploaderType::Local,
            ..ServeCommandConfiguration::new_sample(snapshot_directory)
        };
        let mut dep_builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
        // Provide an in-memory SQLite connection up front (presumably so that resolving the
        // production dependencies does not open a database file — TODO confirm).
        let in_memory_connection = ConnectionBuilder::open_memory().build().unwrap();
        dep_builder.sqlite_connection = Some(Arc::new(in_memory_connection));

        dirs.assert_none_exist();

        dep_builder
            .build_cardano_database_artifact_builder(Version::parse("1.0.0").unwrap())
            .await
            .unwrap();

        assert!(dirs.ancillary.exists());
        assert!(dirs.immutable.exists());
        assert!(dirs.digests.exists());
    }
}