mithril_aggregator/dependency_injection/builder/protocol/
artifacts.rs1use anyhow::Context;
2use semver::Version;
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use mithril_common::crypto_helper::ManifestSigner;
7
8use crate::artifact_builder::{
9 AncillaryArtifactBuilder, AncillaryFileUploader, CardanoDatabaseArtifactBuilder,
10 CardanoImmutableFilesFullArtifactBuilder, CardanoStakeDistributionArtifactBuilder,
11 CardanoTransactionsArtifactBuilder, DigestArtifactBuilder, DigestFileUploader,
12 DigestSnapshotter, ImmutableArtifactBuilder, ImmutableFilesUploader,
13 MithrilStakeDistributionArtifactBuilder,
14};
15use crate::configuration::AncillaryFilesSignerConfig;
16use crate::dependency_injection::builder::SNAPSHOT_ARTIFACTS_DIR;
17use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
18use crate::file_uploaders::{
19 CloudRemotePath, FileUploadRetryPolicy, GcpBackendUploader, GcpUploader, LocalUploader,
20};
21use crate::http_server::{CARDANO_DATABASE_DOWNLOAD_PATH, SNAPSHOT_DOWNLOAD_PATH};
22use crate::services::ancillary_signer::{AncillarySigner, AncillarySignerWithSecretKey};
23use crate::services::{
24 CompressedArchiveSnapshotter, DumbSnapshotter, MithrilSignedEntityService, SignedEntityService,
25 SignedEntityServiceArtifactsDependencies, Snapshotter,
26};
27use crate::tools::file_archiver::FileArchiver;
28use crate::{DumbUploader, ExecutionEnvironment, FileUploader, SnapshotUploaderType};
29
30impl DependenciesBuilder {
31 async fn build_signed_entity_service(&mut self) -> Result<Arc<dyn SignedEntityService>> {
32 let logger = self.root_logger();
33 let signed_entity_storer = self.get_signed_entity_storer().await?;
34 let epoch_service = self.get_epoch_service().await?;
35 let mithril_stake_distribution_artifact_builder = Arc::new(
36 MithrilStakeDistributionArtifactBuilder::new(epoch_service.clone()),
37 );
38 let snapshotter = self.get_snapshotter().await?;
39 let snapshot_uploader = self.get_snapshot_uploader().await?;
40 let cardano_node_version = Version::parse(&self.configuration.cardano_node_version)
41 .map_err(|e| DependenciesBuilderError::Initialization { message: format!("Could not parse configuration setting 'cardano_node_version' value '{}' as Semver.", self.configuration.cardano_node_version), error: Some(e.into()) })?;
42 let cardano_immutable_files_full_artifact_builder =
43 Arc::new(CardanoImmutableFilesFullArtifactBuilder::new(
44 self.configuration.get_network()?,
45 &cardano_node_version,
46 snapshotter,
47 snapshot_uploader,
48 logger.clone(),
49 ));
50 let prover_service = self.get_prover_service().await?;
51 let cardano_transactions_artifact_builder = Arc::new(
52 CardanoTransactionsArtifactBuilder::new(prover_service.clone()),
53 );
54 let stake_store = self.get_stake_store().await?;
55 let cardano_stake_distribution_artifact_builder =
56 Arc::new(CardanoStakeDistributionArtifactBuilder::new(stake_store));
57 let cardano_database_artifact_builder = Arc::new(
58 self.build_cardano_database_artifact_builder(cardano_node_version)
59 .await?,
60 );
61 let dependencies = SignedEntityServiceArtifactsDependencies::new(
62 mithril_stake_distribution_artifact_builder,
63 cardano_immutable_files_full_artifact_builder,
64 cardano_transactions_artifact_builder,
65 cardano_stake_distribution_artifact_builder,
66 cardano_database_artifact_builder,
67 );
68 let signed_entity_service = Arc::new(MithrilSignedEntityService::new(
69 signed_entity_storer,
70 dependencies,
71 self.get_signed_entity_lock().await?,
72 self.get_metrics_service().await?,
73 logger,
74 ));
75
76 if let Some(signed_entity) = signed_entity_service
80 .get_last_cardano_transaction_snapshot()
81 .await?
82 {
83 prover_service
84 .compute_cache(signed_entity.artifact.block_number)
85 .await?;
86 }
87
88 Ok(signed_entity_service)
89 }
90
91 pub async fn get_signed_entity_service(&mut self) -> Result<Arc<dyn SignedEntityService>> {
93 if self.signed_entity_service.is_none() {
94 self.signed_entity_service = Some(self.build_signed_entity_service().await?);
95 }
96
97 Ok(self.signed_entity_service.as_ref().cloned().unwrap())
98 }
99
100 async fn build_file_archiver(&mut self) -> Result<Arc<FileArchiver>> {
101 let archive_verification_directory =
102 std::env::temp_dir().join("mithril_archiver_verify_archive");
103 let file_archiver = Arc::new(FileArchiver::new(
104 self.configuration.zstandard_parameters.unwrap_or_default(),
105 archive_verification_directory,
106 self.root_logger(),
107 ));
108
109 Ok(file_archiver)
110 }
111
112 async fn get_file_archiver(&mut self) -> Result<Arc<FileArchiver>> {
113 if self.file_archiver.is_none() {
114 self.file_archiver = Some(self.build_file_archiver().await?);
115 }
116
117 Ok(self.file_archiver.as_ref().cloned().unwrap())
118 }
119
120 async fn get_ancillary_signer(&self) -> Result<Arc<dyn AncillarySigner>> {
121 match &self.configuration.ancillary_files_signer_config {
122 AncillaryFilesSignerConfig::SecretKey { secret_key } => {
123 let manifest_signer = ManifestSigner::from_secret_key(
124 secret_key
125 .as_str()
126 .try_into()
127 .with_context(|| "Failed to build ancillary signer: Invalid secret key")?,
128 );
129
130 Ok(Arc::new(AncillarySignerWithSecretKey::new(
131 manifest_signer,
132 self.root_logger(),
133 )))
134 }
135 }
136 }
137
138 async fn build_snapshotter(&mut self) -> Result<Arc<dyn Snapshotter>> {
139 let snapshotter: Arc<dyn Snapshotter> = match self.configuration.environment {
140 ExecutionEnvironment::Production => {
141 let ongoing_snapshot_directory = self
142 .configuration
143 .get_snapshot_dir()?
144 .join("pending_snapshot");
145
146 Arc::new(CompressedArchiveSnapshotter::new(
147 self.configuration.db_directory.clone(),
148 ongoing_snapshot_directory,
149 self.configuration.snapshot_compression_algorithm,
150 self.get_file_archiver().await?,
151 self.get_ancillary_signer().await?,
152 self.root_logger(),
153 )?)
154 }
155 _ => Arc::new(DumbSnapshotter::new(
156 self.configuration.snapshot_compression_algorithm,
157 )),
158 };
159
160 Ok(snapshotter)
161 }
162
163 async fn build_digests_snapshotter(
164 &mut self,
165 digests_path: PathBuf,
166 ) -> Result<DigestSnapshotter> {
167 Ok(DigestSnapshotter {
168 file_archiver: self.get_file_archiver().await?,
169 target_location: digests_path,
170 compression_algorithm: self.configuration.snapshot_compression_algorithm,
171 })
172 }
173
174 pub async fn get_snapshotter(&mut self) -> Result<Arc<dyn Snapshotter>> {
176 if self.snapshotter.is_none() {
177 self.snapshotter = Some(self.build_snapshotter().await?);
178 }
179
180 Ok(self.snapshotter.as_ref().cloned().unwrap())
181 }
182
183 async fn build_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
184 let logger = self.root_logger();
185 if self.configuration.environment == ExecutionEnvironment::Production {
186 match self.configuration.snapshot_uploader_type {
187 SnapshotUploaderType::Gcp => {
188 let allow_overwrite = true;
189 let remote_folder_path = CloudRemotePath::new("cardano-immutable-files-full");
190
191 Ok(Arc::new(
192 self.build_gcp_uploader(remote_folder_path, allow_overwrite)?,
193 ))
194 }
195 SnapshotUploaderType::Local => {
196 let server_url_prefix = self.configuration.get_server_url()?;
197 let snapshot_url_prefix =
198 server_url_prefix.sanitize_join(SNAPSHOT_DOWNLOAD_PATH)?;
199 let snapshot_artifacts_dir = self
200 .configuration
201 .get_snapshot_dir()?
202 .join(SNAPSHOT_ARTIFACTS_DIR);
203 std::fs::create_dir_all(&snapshot_artifacts_dir).map_err(|e| {
204 DependenciesBuilderError::Initialization {
205 message: format!(
206 "Cannot create '{snapshot_artifacts_dir:?}' directory."
207 ),
208 error: Some(e.into()),
209 }
210 })?;
211
212 Ok(Arc::new(LocalUploader::new(
213 snapshot_url_prefix,
214 &snapshot_artifacts_dir,
215 FileUploadRetryPolicy::default(),
216 logger,
217 )))
218 }
219 }
220 } else {
221 Ok(Arc::new(DumbUploader::new(FileUploadRetryPolicy::never())))
222 }
223 }
224
225 pub async fn get_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
227 if self.snapshot_uploader.is_none() {
228 self.snapshot_uploader = Some(self.build_snapshot_uploader().await?);
229 }
230
231 Ok(self.snapshot_uploader.as_ref().cloned().unwrap())
232 }
233
234 fn build_gcp_uploader(
235 &self,
236 remote_folder_path: CloudRemotePath,
237 allow_overwrite: bool,
238 ) -> Result<GcpUploader> {
239 let logger = self.root_logger();
240 let bucket = self
241 .configuration
242 .snapshot_bucket_name
243 .to_owned()
244 .ok_or_else(|| {
245 DependenciesBuilderError::MissingConfiguration("snapshot_bucket_name".to_string())
246 })?;
247
248 Ok(GcpUploader::new(
249 Arc::new(GcpBackendUploader::try_new(
250 bucket,
251 self.configuration.snapshot_use_cdn_domain,
252 logger.clone(),
253 )?),
254 remote_folder_path,
255 allow_overwrite,
256 FileUploadRetryPolicy::default(),
257 ))
258 }
259
260 fn build_cardano_database_ancillary_uploaders(
261 &self,
262 ) -> Result<Vec<Arc<dyn AncillaryFileUploader>>> {
263 let logger = self.root_logger();
264 if self.configuration.environment == ExecutionEnvironment::Production {
265 match self.configuration.snapshot_uploader_type {
266 SnapshotUploaderType::Gcp => {
267 let allow_overwrite = true;
268 let remote_folder_path =
269 CloudRemotePath::new("cardano-database").join("ancillary");
270
271 Ok(vec![Arc::new(self.build_gcp_uploader(
272 remote_folder_path,
273 allow_overwrite,
274 )?)])
275 }
276 SnapshotUploaderType::Local => {
277 let server_url_prefix = self.configuration.get_server_url()?;
278 let ancillary_url_prefix = server_url_prefix
279 .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/ancillary/"))?;
280 let target_dir = self.get_cardano_db_artifacts_dir()?.join("ancillary");
281
282 std::fs::create_dir_all(&target_dir).map_err(|e| {
283 DependenciesBuilderError::Initialization {
284 message: format!("Cannot create '{target_dir:?}' directory."),
285 error: Some(e.into()),
286 }
287 })?;
288
289 Ok(vec![Arc::new(LocalUploader::new(
290 ancillary_url_prefix,
291 &target_dir,
292 FileUploadRetryPolicy::default(),
293 logger,
294 ))])
295 }
296 }
297 } else {
298 Ok(vec![Arc::new(DumbUploader::new(
299 FileUploadRetryPolicy::never(),
300 ))])
301 }
302 }
303
304 fn build_cardano_database_immutable_uploaders(
305 &self,
306 ) -> Result<Vec<Arc<dyn ImmutableFilesUploader>>> {
307 let logger = self.root_logger();
308 if self.configuration.environment == ExecutionEnvironment::Production {
309 match self.configuration.snapshot_uploader_type {
310 SnapshotUploaderType::Gcp => {
311 let allow_overwrite = false;
312 let remote_folder_path =
313 CloudRemotePath::new("cardano-database").join("immutable");
314
315 Ok(vec![Arc::new(self.build_gcp_uploader(
316 remote_folder_path,
317 allow_overwrite,
318 )?)])
319 }
320 SnapshotUploaderType::Local => {
321 let server_url_prefix = self.configuration.get_server_url()?;
322 let immutable_url_prefix = server_url_prefix
323 .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/immutable/"))?;
324
325 Ok(vec![Arc::new(LocalUploader::new_without_copy(
326 immutable_url_prefix,
327 FileUploadRetryPolicy::default(),
328 logger,
329 ))])
330 }
331 }
332 } else {
333 Ok(vec![Arc::new(DumbUploader::new(
334 FileUploadRetryPolicy::never(),
335 ))])
336 }
337 }
338
339 fn build_cardano_database_digests_uploaders(&self) -> Result<Vec<Arc<dyn DigestFileUploader>>> {
340 let logger = self.root_logger();
341 if self.configuration.environment == ExecutionEnvironment::Production {
342 match self.configuration.snapshot_uploader_type {
343 SnapshotUploaderType::Gcp => {
344 let allow_overwrite = false;
345 let remote_folder_path =
346 CloudRemotePath::new("cardano-database").join("digests");
347
348 Ok(vec![Arc::new(self.build_gcp_uploader(
349 remote_folder_path,
350 allow_overwrite,
351 )?)])
352 }
353 SnapshotUploaderType::Local => {
354 let server_url_prefix = self.configuration.get_server_url()?;
355 let digests_url_prefix = server_url_prefix
356 .sanitize_join(&format!("{CARDANO_DATABASE_DOWNLOAD_PATH}/digests/"))?;
357 let target_dir = self.get_cardano_db_artifacts_dir()?.join("digests");
358
359 std::fs::create_dir_all(&target_dir).map_err(|e| {
360 DependenciesBuilderError::Initialization {
361 message: format!("Cannot create '{target_dir:?}' directory."),
362 error: Some(e.into()),
363 }
364 })?;
365
366 Ok(vec![Arc::new(LocalUploader::new(
367 digests_url_prefix,
368 &target_dir,
369 FileUploadRetryPolicy::default(),
370 logger,
371 ))])
372 }
373 }
374 } else {
375 Ok(vec![Arc::new(DumbUploader::new(
376 FileUploadRetryPolicy::never(),
377 ))])
378 }
379 }
380
381 async fn build_cardano_database_artifact_builder(
382 &mut self,
383 cardano_node_version: Version,
384 ) -> Result<CardanoDatabaseArtifactBuilder> {
385 let snapshot_dir = self.configuration.get_snapshot_dir()?;
386 let immutable_dir = self.get_cardano_db_artifacts_dir()?.join("immutable");
387
388 let ancillary_builder = Arc::new(AncillaryArtifactBuilder::new(
389 self.build_cardano_database_ancillary_uploaders()?,
390 self.get_snapshotter().await?,
391 self.configuration.get_network()?,
392 self.root_logger(),
393 )?);
394
395 let immutable_builder = Arc::new(ImmutableArtifactBuilder::new(
396 immutable_dir,
397 self.build_cardano_database_immutable_uploaders()?,
398 self.get_snapshotter().await?,
399 self.root_logger(),
400 )?);
401
402 let digests_path = snapshot_dir.join("pending_cardano_database_digests");
403 let digests_snapshotter = self.build_digests_snapshotter(digests_path.clone()).await?;
404
405 let digest_builder = Arc::new(DigestArtifactBuilder::new(
406 self.configuration.get_server_url()?,
407 self.build_cardano_database_digests_uploaders()?,
408 digests_snapshotter,
409 self.configuration.get_network()?,
410 digests_path,
411 self.get_immutable_file_digest_mapper().await?,
412 self.root_logger(),
413 )?);
414
415 Ok(CardanoDatabaseArtifactBuilder::new(
416 self.configuration.get_network()?,
417 &cardano_node_version,
418 ancillary_builder,
419 immutable_builder,
420 digest_builder,
421 ))
422 }
423}
424
#[cfg(test)]
mod tests {
    use mithril_common::temp_dir_create;
    use mithril_persistence::sqlite::ConnectionBuilder;

    use crate::dependency_injection::builder::CARDANO_DB_ARTIFACTS_DIR;
    use crate::Configuration;

    use super::*;

    /// Paths of the `ancillary`, `immutable` and `digests` sub-directories of the Cardano
    /// database artifacts directory located under `snapshot_directory`.
    fn cardano_database_subdirs(
        snapshot_directory: &std::path::Path,
    ) -> (PathBuf, PathBuf, PathBuf) {
        let cdb_dir = snapshot_directory.join(CARDANO_DB_ARTIFACTS_DIR);

        (
            cdb_dir.join("ancillary"),
            cdb_dir.join("immutable"),
            cdb_dir.join("digests"),
        )
    }

    /// With the `Test` environment only the `immutable` sub-directory is expected to exist
    /// once the Cardano database artifact builder has been built.
    #[tokio::test]
    async fn if_not_local_uploader_create_cardano_database_immutable_dirs() {
        let snapshot_directory = temp_dir_create!();
        let (ancillary_dir, immutable_dir, digests_dir) =
            cardano_database_subdirs(&snapshot_directory);

        let config = Configuration {
            environment: ExecutionEnvironment::Test,
            ..Configuration::new_sample(snapshot_directory)
        };
        let mut dep_builder = DependenciesBuilder::new_with_stdout_logger(config);

        assert!(!ancillary_dir.exists());
        assert!(!immutable_dir.exists());
        assert!(!digests_dir.exists());

        dep_builder
            .build_cardano_database_artifact_builder(Version::parse("1.0.0").unwrap())
            .await
            .unwrap();

        assert!(!ancillary_dir.exists());
        assert!(immutable_dir.exists());
        assert!(!digests_dir.exists());
    }

    /// With the `Production` environment and the `Local` uploader type, all three
    /// sub-directories are expected to exist once the artifact builder has been built.
    #[tokio::test]
    async fn if_local_uploader_creates_all_cardano_database_subdirs() {
        let snapshot_directory = temp_dir_create!();
        let (ancillary_dir, immutable_dir, digests_dir) =
            cardano_database_subdirs(&snapshot_directory);

        let config = Configuration {
            environment: ExecutionEnvironment::Production,
            snapshot_uploader_type: SnapshotUploaderType::Local,
            ..Configuration::new_sample(snapshot_directory)
        };
        let mut dep_builder = DependenciesBuilder::new_with_stdout_logger(config);
        // Provide an in-memory SQLite connection to the builder before building.
        let in_memory_connection = ConnectionBuilder::open_memory().build().unwrap();
        dep_builder.sqlite_connection = Some(Arc::new(in_memory_connection));

        assert!(!ancillary_dir.exists());
        assert!(!immutable_dir.exists());
        assert!(!digests_dir.exists());

        dep_builder
            .build_cardano_database_artifact_builder(Version::parse("1.0.0").unwrap())
            .await
            .unwrap();

        assert!(ancillary_dir.exists());
        assert!(immutable_dir.exists());
        assert!(digests_dir.exists());
    }
}