1use std::{
2 fs,
3 path::{Path, PathBuf},
4 sync::Arc,
5};
6
7use anyhow::Context;
8use async_trait::async_trait;
9use mithril_common::{
10 entities::{CardanoDbBeacon, CompressionAlgorithm, DigestLocation},
11 logging::LoggerExtensions,
12 messages::CardanoDatabaseDigestListItemMessage,
13 CardanoNetwork, StdResult,
14};
15use slog::{error, Logger};
16
17use crate::{
18 file_uploaders::{GcpUploader, LocalUploader},
19 tools::{
20 file_archiver::{appender::AppenderFile, ArchiveParameters, FileArchive, FileArchiver},
21 url_sanitizer::SanitizedUrlWithTrailingSlash,
22 },
23 DumbUploader, FileUploader, ImmutableFileDigestMapper,
24};
25
/// Uploads a digests archive file and reports the location it can be retrieved from.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait DigestFileUploader: Send + Sync {
    /// Upload the file at `filepath`, returning the [DigestLocation] where it is now available.
    ///
    /// `compression_algorithm` is the algorithm the file was compressed with (if any); it is
    /// echoed back in the returned location so consumers know how to decompress the download.
    async fn upload(
        &self,
        filepath: &Path,
        compression_algorithm: Option<CompressionAlgorithm>,
    ) -> StdResult<DigestLocation>;
}
37
38#[async_trait]
39impl DigestFileUploader for DumbUploader {
40 async fn upload(
41 &self,
42 filepath: &Path,
43 compression_algorithm: Option<CompressionAlgorithm>,
44 ) -> StdResult<DigestLocation> {
45 let uri = FileUploader::upload(self, filepath).await?.into();
46
47 Ok(DigestLocation::CloudStorage {
48 uri,
49 compression_algorithm,
50 })
51 }
52}
53
54#[async_trait]
55impl DigestFileUploader for LocalUploader {
56 async fn upload(
57 &self,
58 filepath: &Path,
59 compression_algorithm: Option<CompressionAlgorithm>,
60 ) -> StdResult<DigestLocation> {
61 let uri = FileUploader::upload(self, filepath).await?.into();
62
63 Ok(DigestLocation::CloudStorage {
64 uri,
65 compression_algorithm,
66 })
67 }
68}
69
70#[async_trait]
71impl DigestFileUploader for GcpUploader {
72 async fn upload(
73 &self,
74 filepath: &Path,
75 compression_algorithm: Option<CompressionAlgorithm>,
76 ) -> StdResult<DigestLocation> {
77 let uri = FileUploader::upload(self, filepath).await?.into();
78
79 Ok(DigestLocation::CloudStorage {
80 uri,
81 compression_algorithm,
82 })
83 }
84}
85
/// Result of a digests artifact upload: where the archive can be fetched from and its size.
#[derive(Debug)]
pub struct DigestUpload {
    /// Locations from which the uploaded digests archive can be retrieved.
    pub locations: Vec<DigestLocation>,
    /// Uncompressed size, in bytes, of the archived digest file.
    pub size: u64,
}
91
/// Builds a compressed archive containing the digests file.
pub struct DigestSnapshotter {
    /// Archiver used to create the archive.
    pub file_archiver: Arc<FileArchiver>,
    /// Directory in which the archive is created.
    pub target_location: PathBuf,
    /// Compression algorithm applied when archiving.
    pub compression_algorithm: CompressionAlgorithm,
}
97
98impl DigestSnapshotter {
99 fn create_archive_file(
100 &self,
101 filename_without_extensions: &str,
102 digest_file_path: &Path,
103 ) -> StdResult<FileArchive> {
104 let digests_archive = self
105 .file_archiver
106 .archive(
107 ArchiveParameters {
108 archive_name_without_extension: filename_without_extensions.to_string(),
109 target_directory: self.target_location.clone(),
110 compression_algorithm: self.compression_algorithm,
111 },
112 AppenderFile::append_at_archive_root(digest_file_path.to_path_buf())?,
113 )
114 .with_context(|| {
115 format!(
116 "Could not create snapshot of digest file: '{}'",
117 digest_file_path.display()
118 )
119 })?;
120
121 Ok(digests_archive)
122 }
123}
124
/// Builds and uploads the digests artifact of a Cardano database snapshot.
pub struct DigestArtifactBuilder {
    /// Base URL of the aggregator, used to build the fallback digests route location.
    aggregator_url_prefix: SanitizedUrlWithTrailingSlash,

    /// Uploaders used to publish the digests archive.
    uploaders: Vec<Arc<dyn DigestFileUploader>>,

    /// Snapshotter that archives the digests file before upload.
    digest_snapshotter: DigestSnapshotter,

    /// Cardano network, included in the digests file name.
    network: CardanoNetwork,

    /// Directory where the digests JSON file is created.
    digests_dir: PathBuf,

    /// Source of the immutable-file-name -> digest mapping.
    immutable_file_digest_mapper: Arc<dyn ImmutableFileDigestMapper>,

    logger: Logger,
}
142
143impl DigestArtifactBuilder {
144 pub fn new(
146 aggregator_url_prefix: SanitizedUrlWithTrailingSlash,
147 uploaders: Vec<Arc<dyn DigestFileUploader>>,
148 digest_snapshotter: DigestSnapshotter,
149 network: CardanoNetwork,
150 digests_dir: PathBuf,
151 immutable_file_digest_mapper: Arc<dyn ImmutableFileDigestMapper>,
152 logger: Logger,
153 ) -> StdResult<Self> {
154 Ok(Self {
155 aggregator_url_prefix,
156 uploaders,
157 digest_snapshotter,
158 network,
159 digests_dir,
160 immutable_file_digest_mapper,
161 logger: logger.new_with_component_name::<Self>(),
162 })
163 }
164
165 pub async fn upload(&self, beacon: &CardanoDbBeacon) -> StdResult<DigestUpload> {
166 let filename_without_extensions =
167 Self::get_digests_file_name_without_extension(&self.network, beacon);
168 let digest_path = self
169 .create_digest_file(&filename_without_extensions)
170 .await?;
171 let digest_archive = self
172 .digest_snapshotter
173 .create_archive_file(&filename_without_extensions, &digest_path)?;
174
175 let locations = self.upload_digest_file(&digest_archive).await;
176
177 self.cleanup_uploaded_artifacts(&digest_path, &digest_archive)?;
178
179 Ok(DigestUpload {
180 locations: locations?,
181 size: digest_archive.get_uncompressed_size(),
182 })
183 }
184
185 fn cleanup_uploaded_artifacts(
186 &self,
187 digest_path: &PathBuf,
188 digest_archive: &FileArchive,
189 ) -> StdResult<()> {
190 fs::remove_file(digest_path).with_context(|| {
191 format!("Could not remove digest file: '{}'", digest_path.display())
192 })?;
193
194 let digest_archive_path = digest_archive.get_file_path();
195 if digest_archive_path.exists() {
196 fs::remove_file(digest_archive_path).with_context(|| {
197 format!(
198 "Could not remove digest archive file: '{}'",
199 digest_archive_path.display()
200 )
201 })?;
202 }
203 Ok(())
204 }
205
206 async fn create_digest_file(&self, filename_without_extensions: &str) -> StdResult<PathBuf> {
207 let immutable_file_digest_map = self
208 .immutable_file_digest_mapper
209 .get_immutable_file_digest_map()
210 .await?
211 .into_iter()
212 .map(
213 |(immutable_file_name, digest)| CardanoDatabaseDigestListItemMessage {
214 immutable_file_name,
215 digest,
216 },
217 )
218 .collect::<Vec<_>>();
219
220 let digests_file_path = self
221 .digests_dir
222 .join(format!("{filename_without_extensions}.json"));
223
224 if let Some(digests_dir) = digests_file_path.parent() {
225 fs::create_dir_all(digests_dir).with_context(|| {
226 format!(
227 "Can not create digests directory: '{}'",
228 digests_dir.display()
229 )
230 })?;
231 }
232
233 let digest_file = fs::File::create(digests_file_path.clone())?;
234 serde_json::to_writer(digest_file, &immutable_file_digest_map)?;
235
236 Ok(digests_file_path)
237 }
238
239 async fn upload_digest_file(
241 &self,
242 digest_archive: &FileArchive,
243 ) -> StdResult<Vec<DigestLocation>> {
244 let mut locations = Vec::<DigestLocation>::new();
245 for uploader in &self.uploaders {
246 let result = uploader
247 .upload(
248 digest_archive.get_file_path(),
249 Some(digest_archive.get_compression_algorithm()),
250 )
251 .await;
252 match result {
253 Ok(location) => {
254 locations.push(location);
255 }
256 Err(e) => {
257 error!(
258 self.logger,
259 "Failed to upload digest file";
260 "error" => e.to_string()
261 );
262 }
263 }
264 }
265
266 locations.push(self.aggregator_digests_route_location()?);
267
268 Ok(locations)
269 }
270
271 fn aggregator_digests_route_location(&self) -> StdResult<DigestLocation> {
272 Ok(DigestLocation::Aggregator {
273 uri: self
274 .aggregator_url_prefix
275 .join("artifact/cardano-database/digests")?
276 .to_string(),
277 })
278 }
279
280 fn get_digests_file_name_without_extension(
281 network: &CardanoNetwork,
282 beacon: &CardanoDbBeacon,
283 ) -> String {
284 format!(
285 "{}-e{}-i{}.digests",
286 network, *beacon.epoch, beacon.immutable_file_number
287 )
288 }
289}
290
#[cfg(test)]
mod tests {
    use anyhow::anyhow;
    use flate2::read::GzDecoder;
    use std::{
        collections::BTreeMap,
        fs::{read_to_string, File},
    };
    use tar::Archive;

    use mithril_common::{
        current_function,
        entities::{CardanoDbBeacon, CompressionAlgorithm},
        messages::{CardanoDatabaseDigestListItemMessage, CardanoDatabaseDigestListMessage},
        test_utils::{assert_equivalent, TempDir},
    };

    use crate::{
        file_uploaders::FileUploadRetryPolicy,
        immutable_file_digest_mapper::MockImmutableFileDigestMapper, test_tools::TestLogger,
        tools::file_archiver::FileArchiver,
    };

    use super::*;

    /// Build a mock uploader whose single `upload` call fails.
    fn fake_uploader_returning_error() -> MockDigestFileUploader {
        let mut uploader = MockDigestFileUploader::new();
        uploader
            .expect_upload()
            .return_once(|_, _| Err(anyhow!("Failure while uploading...")));

        uploader
    }

    /// Build a mock uploader that succeeds exactly once, returning the given
    /// cloud storage location.
    fn fake_uploader(
        location_uri: &str,
        compression_algorithm: Option<CompressionAlgorithm>,
    ) -> MockDigestFileUploader {
        let uri = location_uri.to_string();
        let mut uploader = MockDigestFileUploader::new();
        uploader.expect_upload().times(1).return_once(move |_, _| {
            Ok(DigestLocation::CloudStorage {
                uri,
                compression_algorithm,
            })
        });

        uploader
    }

    /// List the entries of a directory (panics on I/O errors).
    fn path_content(path: &Path) -> Vec<PathBuf> {
        std::fs::read_dir(path)
            .unwrap()
            .map(|res| res.unwrap().path())
            .collect()
    }

    /// Build a `LocalUploader` targeting `path` (the directory is created if needed).
    fn build_local_uploader(path: &Path) -> LocalUploader {
        std::fs::create_dir_all(path).unwrap();
        LocalUploader::new(
            SanitizedUrlWithTrailingSlash::parse("http://server/").unwrap(),
            path,
            FileUploadRetryPolicy::never(),
            TestLogger::stdout(),
        )
    }

    /// Build a digest mapper mock that always returns an empty map.
    fn build_dummy_immutable_file_digest_mapper() -> MockImmutableFileDigestMapper {
        let mut immutable_file_digest_mapper = MockImmutableFileDigestMapper::new();
        immutable_file_digest_mapper
            .expect_get_immutable_file_digest_map()
            .returning(|| Ok(BTreeMap::new()));
        immutable_file_digest_mapper
    }

    /// Unpack a `tar.gz` archive into `unpack_dir`.
    fn unpack_archive(archive_path: &Path, unpack_dir: &Path) -> StdResult<()> {
        let mut archive = {
            let file_tar_gz = File::open(archive_path)?;
            let file_tar_gz_decoder = GzDecoder::new(file_tar_gz);
            Archive::new(file_tar_gz_decoder)
        };

        archive.unpack(unpack_dir)?;
        Ok(())
    }

    #[tokio::test]
    async fn digest_artifact_builder_return_digests_route_on_aggregator() {
        let temp_dir = TempDir::create("digest", current_function!());

        let mut immutable_file_digest_mapper = MockImmutableFileDigestMapper::new();
        immutable_file_digest_mapper
            .expect_get_immutable_file_digest_map()
            .returning(|| Ok(BTreeMap::new()));

        let builder = DigestArtifactBuilder::new(
            SanitizedUrlWithTrailingSlash::parse("https://aggregator/").unwrap(),
            vec![],
            DigestSnapshotter {
                file_archiver: Arc::new(FileArchiver::new_for_test(temp_dir.join("verification"))),
                target_location: temp_dir.clone(),
                compression_algorithm: CompressionAlgorithm::Gzip,
            },
            CardanoNetwork::DevNet(123),
            temp_dir,
            Arc::new(immutable_file_digest_mapper),
            TestLogger::stdout(),
        )
        .unwrap();

        let upload_info = builder.upload(&CardanoDbBeacon::new(4, 123)).await.unwrap();
        assert_eq!(
            vec!(DigestLocation::Aggregator {
                uri: "https://aggregator/artifact/cardano-database/digests".to_string()
            }),
            upload_info.locations
        );
    }

    #[tokio::test]
    async fn digest_artifact_builder_return_size_of_digest_file() {
        let temp_dir = TempDir::create("digest", current_function!());

        let mut immutable_file_digest_mapper = MockImmutableFileDigestMapper::new();
        immutable_file_digest_mapper
            .expect_get_immutable_file_digest_map()
            .returning(|| Ok(BTreeMap::new()));

        let builder = DigestArtifactBuilder::new(
            SanitizedUrlWithTrailingSlash::parse("https://aggregator/").unwrap(),
            vec![],
            DigestSnapshotter {
                file_archiver: Arc::new(FileArchiver::new_for_test(temp_dir.join("verification"))),
                target_location: temp_dir.clone(),
                compression_algorithm: CompressionAlgorithm::Gzip,
            },
            CardanoNetwork::DevNet(123),
            temp_dir,
            Arc::new(immutable_file_digest_mapper),
            TestLogger::stdout(),
        )
        .unwrap();

        let beacon = CardanoDbBeacon::new(4, 123);
        let upload_info = builder.upload(&beacon).await.unwrap();

        // Recreate the digest file (it was deleted by `upload`) to measure its size.
        let digest_path = builder.create_digest_file("digests").await.unwrap();

        let expected_size = std::fs::metadata(digest_path).unwrap().len();
        assert!(expected_size > 0);
        assert_eq!(expected_size, upload_info.size);
    }

    #[tokio::test]
    async fn upload_digest_file_should_log_upload_errors() {
        let temp_dir = TempDir::create("digest", current_function!());
        let log_path = temp_dir.join("test.log");

        let mut uploader = MockDigestFileUploader::new();
        uploader
            .expect_upload()
            .return_once(|_, _| Err(anyhow!("Failure while uploading...")));

        // Scope the builder so the file logger is flushed before reading the log back.
        {
            let builder = DigestArtifactBuilder::new(
                SanitizedUrlWithTrailingSlash::parse("https://aggregator/").unwrap(),
                vec![Arc::new(uploader)],
                DigestSnapshotter {
                    file_archiver: Arc::new(FileArchiver::new_for_test(
                        temp_dir.join("verification"),
                    )),
                    target_location: temp_dir.clone(),
                    compression_algorithm: CompressionAlgorithm::Gzip,
                },
                CardanoNetwork::DevNet(123),
                PathBuf::from("/tmp/whatever"),
                Arc::new(MockImmutableFileDigestMapper::new()),
                TestLogger::file(&log_path),
            )
            .unwrap();

            let _ = builder.upload_digest_file(&FileArchive::dummy()).await;
        }

        let logs = std::fs::read_to_string(&log_path).unwrap();
        assert!(logs.contains("Failure while uploading..."));
    }

    #[tokio::test]
    async fn upload_digest_file_should_not_error_even_if_no_location_returned_from_uploaders() {
        let temp_dir = TempDir::create("digest", current_function!());
        let uploader = fake_uploader_returning_error();

        let builder = DigestArtifactBuilder::new(
            SanitizedUrlWithTrailingSlash::parse("https://aggregator/").unwrap(),
            vec![Arc::new(uploader)],
            DigestSnapshotter {
                file_archiver: Arc::new(FileArchiver::new_for_test(temp_dir.join("verification"))),
                target_location: temp_dir.clone(),
                compression_algorithm: CompressionAlgorithm::Gzip,
            },
            CardanoNetwork::DevNet(123),
            PathBuf::from("/tmp/whatever"),
            Arc::new(MockImmutableFileDigestMapper::new()),
            TestLogger::stdout(),
        )
        .unwrap();

        let locations = builder
            .upload_digest_file(&FileArchive::dummy())
            .await
            .unwrap();

        // The aggregator route is always appended, so the list is never empty.
        assert!(!locations.is_empty());
    }

    #[tokio::test]
    async fn upload_digest_file_should_return_location_even_with_uploaders_errors() {
        let temp_dir = TempDir::create("digest", current_function!());
        let first_uploader = fake_uploader_returning_error();
        let second_uploader = fake_uploader("an_uri", Some(CompressionAlgorithm::Gzip));
        let third_uploader = fake_uploader_returning_error();

        let uploaders: Vec<Arc<dyn DigestFileUploader>> = vec![
            Arc::new(first_uploader),
            Arc::new(second_uploader),
            Arc::new(third_uploader),
        ];

        let builder = DigestArtifactBuilder::new(
            SanitizedUrlWithTrailingSlash::parse("https://aggregator/").unwrap(),
            uploaders,
            DigestSnapshotter {
                file_archiver: Arc::new(FileArchiver::new_for_test(temp_dir.join("verification"))),
                target_location: temp_dir.clone(),
                compression_algorithm: CompressionAlgorithm::Gzip,
            },
            CardanoNetwork::DevNet(123),
            PathBuf::from("/tmp/whatever"),
            Arc::new(MockImmutableFileDigestMapper::new()),
            TestLogger::stdout(),
        )
        .unwrap();

        let locations = builder
            .upload_digest_file(&FileArchive::dummy())
            .await
            .unwrap();

        assert_equivalent(
            locations,
            vec![
                DigestLocation::CloudStorage {
                    uri: "an_uri".to_string(),
                    compression_algorithm: Some(CompressionAlgorithm::Gzip),
                },
                DigestLocation::Aggregator {
                    uri: "https://aggregator/artifact/cardano-database/digests".to_string(),
                },
            ],
        );
    }

    #[tokio::test]
    async fn upload_digest_file_should_return_all_uploaders_returned_locations() {
        let temp_dir = TempDir::create("digest", current_function!());
        let first_uploader = fake_uploader("an_uri", Some(CompressionAlgorithm::Gzip));
        let second_uploader = fake_uploader("another_uri", Some(CompressionAlgorithm::Gzip));

        let uploaders: Vec<Arc<dyn DigestFileUploader>> =
            vec![Arc::new(first_uploader), Arc::new(second_uploader)];

        let builder = DigestArtifactBuilder::new(
            SanitizedUrlWithTrailingSlash::parse("https://aggregator/").unwrap(),
            uploaders,
            DigestSnapshotter {
                file_archiver: Arc::new(FileArchiver::new_for_test(temp_dir.join("verification"))),
                target_location: temp_dir.clone(),
                compression_algorithm: CompressionAlgorithm::Gzip,
            },
            CardanoNetwork::DevNet(123),
            PathBuf::from("/tmp/whatever"),
            Arc::new(MockImmutableFileDigestMapper::new()),
            TestLogger::stdout(),
        )
        .unwrap();

        let locations = builder
            .upload_digest_file(&FileArchive::dummy())
            .await
            .unwrap();

        assert_equivalent(
            locations,
            vec![
                DigestLocation::CloudStorage {
                    uri: "an_uri".to_string(),
                    compression_algorithm: Some(CompressionAlgorithm::Gzip),
                },
                DigestLocation::CloudStorage {
                    uri: "another_uri".to_string(),
                    compression_algorithm: Some(CompressionAlgorithm::Gzip),
                },
                DigestLocation::Aggregator {
                    uri: "https://aggregator/artifact/cardano-database/digests".to_string(),
                },
            ],
        );
    }

    #[tokio::test]
    async fn create_digest_file_should_create_json_file_with_all_digests() {
        let temp_dir = TempDir::create("digest", current_function!());
        let mut immutable_file_digest_mapper = MockImmutableFileDigestMapper::new();
        immutable_file_digest_mapper
            .expect_get_immutable_file_digest_map()
            .returning(|| {
                Ok(BTreeMap::from([(
                    "06685.chunk".to_string(),
                    "0af556ab2620dd9363bf76963a231abe8948a500ea6be31b131d87907ab09b1e".to_string(),
                )]))
            });

        let builder = DigestArtifactBuilder::new(
            SanitizedUrlWithTrailingSlash::parse("https://aggregator/").unwrap(),
            vec![],
            DigestSnapshotter {
                file_archiver: Arc::new(FileArchiver::new_for_test(temp_dir.join("verification"))),
                target_location: temp_dir.clone(),
                compression_algorithm: CompressionAlgorithm::Gzip,
            },
            CardanoNetwork::DevNet(123),
            temp_dir,
            Arc::new(immutable_file_digest_mapper),
            TestLogger::stdout(),
        )
        .unwrap();

        let digest_file = builder.create_digest_file("digests").await.unwrap();

        let file_content = read_to_string(digest_file).unwrap();
        let digest_content: CardanoDatabaseDigestListMessage =
            serde_json::from_str(&file_content).unwrap();

        assert_eq!(
            digest_content,
            vec![CardanoDatabaseDigestListItemMessage {
                immutable_file_name: "06685.chunk".to_string(),
                digest: "0af556ab2620dd9363bf76963a231abe8948a500ea6be31b131d87907ab09b1e"
                    .to_string(),
            }]
        );
    }

    #[tokio::test]
    async fn upload_should_upload_a_digest_archive_file_and_delete_created_files() {
        let tmp_dir = TempDir::create("digest", current_function!());
        let digests_dir = tmp_dir.join("digest");
        let digests_archive_dir = tmp_dir.join("archive");
        let uploader_path = tmp_dir.join("uploaded_digests");

        let compression_algorithm = CompressionAlgorithm::Gzip;
        let beacon = CardanoDbBeacon::new(3, 456);
        let network = CardanoNetwork::DevNet(24);

        let builder = DigestArtifactBuilder::new(
            SanitizedUrlWithTrailingSlash::parse("https://aggregator/").unwrap(),
            vec![Arc::new(build_local_uploader(&uploader_path))],
            DigestSnapshotter {
                file_archiver: Arc::new(FileArchiver::new_for_test(tmp_dir.join("verification"))),
                target_location: digests_archive_dir.clone(),
                compression_algorithm,
            },
            network,
            digests_dir.clone(),
            Arc::new(build_dummy_immutable_file_digest_mapper()),
            TestLogger::stdout(),
        )
        .unwrap();

        let _locations = builder.upload(&beacon).await.unwrap();

        // Verify the archive was uploaded and contains the digests file.
        {
            let file_name_without_extension =
                DigestArtifactBuilder::get_digests_file_name_without_extension(&network, &beacon);
            let digest_archive_path = uploader_path.join(format!(
                "{file_name_without_extension}.{}",
                compression_algorithm.tar_file_extension(),
            ));
            assert!(
                digest_archive_path.exists(),
                "Archive should have been uploaded to {}",
                digest_archive_path.display()
            );

            let unpack_dir = tmp_dir.join("unpack");
            unpack_archive(&digest_archive_path, &unpack_dir).unwrap();

            let digest_file_path = unpack_dir.join(format!("{file_name_without_extension}.json"));
            assert!(digest_file_path.is_file());
        }

        // Verify the locally created files were cleaned up.
        {
            let remaining_files = path_content(&digests_dir);
            assert!(
                remaining_files.is_empty(),
                "There should be no remaining files in digests folder, but found: {:?}",
                remaining_files
            );

            let remaining_files = path_content(&digests_archive_dir);
            assert!(
                remaining_files.is_empty(),
                "There should be no remaining files in archive folder, but found: {:?}",
                remaining_files
            );
        }
    }

    #[tokio::test]
    async fn get_digest_file_name_include_beacon_information() {
        let beacon = CardanoDbBeacon::new(5, 456);
        let network = CardanoNetwork::MainNet;
        let digest_name =
            DigestArtifactBuilder::get_digests_file_name_without_extension(&network, &beacon);

        assert_eq!(digest_name, "mainnet-e5-i456.digests");
    }
}