mithril_aggregator/tools/file_archiver/
appender.rs

1use anyhow::{anyhow, Context};
2use serde::Serialize;
3use std::fs::File;
4use std::io::Write;
5use std::path::PathBuf;
6
7use mithril_common::StdResult;
8
9use crate::tools::file_size;
10
/// Unix permission bits (octal `rw-rw-rw-`) set on data entries appended via [AppenderData].
const READ_WRITE_PERMISSION: u32 = 0o666;
12
/// Define multiple ways to append content to a tar archive.
pub trait TarAppender: Send {
    /// Append this appender's content to the given tar builder.
    fn append<T: Write>(&self, tar: &mut tar::Builder<T>) -> StdResult<()>;

    /// Compute the size in bytes of the data that [TarAppender::append] would add,
    /// before any compression is applied.
    fn compute_uncompressed_data_size(&self) -> StdResult<u64>;

    /// Chain this appender with another one: `self` content is appended first,
    /// then `appender_right` content.
    fn chain<A2: TarAppender>(self, appender_right: A2) -> ChainAppender<Self, A2>
    where
        Self: Sized,
    {
        ChainAppender::new(self, appender_right)
    }
}
26
/// Append the whole content of a directory to the archive.
pub struct AppenderDirAll {
    // Directory whose full content is appended at the root of the archive.
    target_directory: PathBuf,
}
30
impl AppenderDirAll {
    // Note: Not used anymore outside of tests but useful tool to keep around if we ever need to archive a directory
    /// Create a new instance that will archive the full content of `target_directory`.
    #[cfg(test)]
    pub fn new(target_directory: PathBuf) -> Self {
        Self { target_directory }
    }
}
38
39impl TarAppender for AppenderDirAll {
40    fn append<T: Write>(&self, tar: &mut tar::Builder<T>) -> StdResult<()> {
41        tar.append_dir_all(".", &self.target_directory)
42            .with_context(|| {
43                format!(
44                    "Create archive error:  Can not add directory: '{}' to the archive",
45                    self.target_directory.display()
46                )
47            })?;
48        Ok(())
49    }
50
51    fn compute_uncompressed_data_size(&self) -> StdResult<u64> {
52        file_size::compute_size_of_path(&self.target_directory)
53    }
54}
55
/// Append a single file to the archive.
pub struct AppenderFile {
    /// Location of the file in the archive.
    location_in_archive: PathBuf,
    /// Path to the file to add to the archive.
    target_file: PathBuf,
}
62
63impl AppenderFile {
64    /// Append the file at the root of the archive, keeping the same file name.
65    pub fn append_at_archive_root(target_file: PathBuf) -> StdResult<Self> {
66        if !target_file.is_file() {
67            return Err(anyhow!(
68                "The target file is not a file, path: {}",
69                target_file.display()
70            ));
71        }
72
73        let location_in_archive = target_file
74            .file_name()
75            .ok_or_else(|| {
76                anyhow!(
77                    "Can not get the file name from the target file path: '{}'",
78                    target_file.display()
79                )
80            })?
81            .to_owned();
82
83        Ok(Self {
84            location_in_archive: PathBuf::from(location_in_archive),
85            target_file,
86        })
87    }
88}
89
90impl TarAppender for AppenderFile {
91    fn append<T: Write>(&self, tar: &mut tar::Builder<T>) -> StdResult<()> {
92        let mut file = File::open(&self.target_file)
93            .with_context(|| format!("Can not open file: '{}'", self.target_file.display()))?;
94        tar.append_file(&self.location_in_archive, &mut file)
95            .with_context(|| {
96                format!(
97                    "Can not add file: '{}' to the archive",
98                    self.target_file.display()
99                )
100            })?;
101        Ok(())
102    }
103
104    fn compute_uncompressed_data_size(&self) -> StdResult<u64> {
105        file_size::compute_size_of_path(&self.target_file)
106    }
107}
108
/// Append a selection of files and directories to the archive.
pub struct AppenderEntries {
    // Paths of the entries to append, relative to `base_directory`.
    entries: Vec<PathBuf>,
    // Directory against which each entry is resolved on disk.
    base_directory: PathBuf,
}
113
114impl AppenderEntries {
115    /// Create a new instance of `AppenderEntries`.
116    ///
117    /// Returns an error if the `entries` are empty.
118    pub fn new(entries: Vec<PathBuf>, base_directory: PathBuf) -> StdResult<Self> {
119        if entries.is_empty() {
120            return Err(anyhow!("The entries can not be empty"));
121        }
122
123        Ok(Self {
124            entries,
125            base_directory,
126        })
127    }
128}
129
130impl TarAppender for AppenderEntries {
131    fn append<T: Write>(&self, tar: &mut tar::Builder<T>) -> StdResult<()> {
132        for entry in &self.entries {
133            let entry_path = self.base_directory.join(entry);
134            if entry_path.is_dir() {
135                tar.append_dir_all(entry, entry_path.clone())
136                    .with_context(|| {
137                        format!(
138                            "Can not add directory: '{}' to the archive",
139                            entry_path.display()
140                        )
141                    })?;
142            } else if entry_path.is_file() {
143                let mut file = File::open(entry_path.clone())?;
144                tar.append_file(entry, &mut file).with_context(|| {
145                    format!(
146                        "Can not add file: '{}' to the archive",
147                        entry_path.display()
148                    )
149                })?;
150            } else {
151                return Err(anyhow!(
152                    "The entry: '{}' is not valid",
153                    entry_path.display()
154                ));
155            }
156        }
157        Ok(())
158    }
159
160    fn compute_uncompressed_data_size(&self) -> StdResult<u64> {
161        let full_entries_path = self
162            .entries
163            .iter()
164            .map(|entry| self.base_directory.join(entry))
165            .collect();
166        file_size::compute_size(full_entries_path)
167    }
168}
169
/// Append in-memory data to the archive.
pub struct AppenderData {
    /// Location of the file in the archive where the data will be appended.
    location_in_archive: PathBuf,
    /// Byte array of the data to append.
    bytes: Vec<u8>,
}
177
178impl AppenderData {
179    /// Create a new instance of `AppenderData` from an object that will be serialized to JSON.
180    pub fn from_json<T: Serialize + Send>(
181        location_in_archive: PathBuf,
182        object: &T,
183    ) -> StdResult<Self> {
184        let json_bytes = serde_json::to_vec(object).with_context(|| {
185            format!(
186                "Can not serialize JSON to file in archive: {:?}",
187                location_in_archive.display()
188            )
189        })?;
190
191        Ok(Self::from_raw_bytes(location_in_archive, json_bytes))
192    }
193
194    /// Create a new instance of `AppenderData` from a byte array.
195    pub fn from_raw_bytes(location_in_archive: PathBuf, bytes: Vec<u8>) -> Self {
196        Self {
197            location_in_archive,
198            bytes,
199        }
200    }
201}
202
203impl TarAppender for AppenderData {
204    fn append<T: Write>(&self, tar: &mut tar::Builder<T>) -> StdResult<()> {
205        let mut header = tar::Header::new_gnu();
206        header.set_size(self.bytes.len() as u64);
207        header.set_mode(READ_WRITE_PERMISSION);
208        header.set_mtime(chrono::Utc::now().timestamp() as u64);
209        header.set_cksum();
210
211        tar.append_data(
212            &mut header,
213            &self.location_in_archive,
214            self.bytes.as_slice(),
215        )
216        .with_context(|| {
217            format!(
218                "Can not add file: '{}' to the archive",
219                self.location_in_archive.display()
220            )
221        })?;
222
223        Ok(())
224    }
225
226    fn compute_uncompressed_data_size(&self) -> StdResult<u64> {
227        Ok(self.bytes.len() as u64)
228    }
229}
230
/// Chain multiple `TarAppender` instances together.
pub struct ChainAppender<L, R> {
    // Appender whose content is written first.
    appender_left: L,
    // Appender whose content is written second (may overwrite left entries).
    appender_right: R,
}
236
impl<L: TarAppender, R: TarAppender> ChainAppender<L, R> {
    /// Create a new chain that appends `appender_left` content then `appender_right` content.
    pub fn new(appender_left: L, appender_right: R) -> Self {
        Self {
            appender_left,
            appender_right,
        }
    }
}
245
246impl<L: TarAppender, R: TarAppender> TarAppender for ChainAppender<L, R> {
247    fn append<T: Write>(&self, tar: &mut tar::Builder<T>) -> StdResult<()> {
248        self.appender_left.append(tar)?;
249        self.appender_right.append(tar)
250    }
251
252    fn compute_uncompressed_data_size(&self) -> StdResult<u64> {
253        // Size is aggregated even if the data is overwritten by the right appender because we
254        // can't know if there is an overlap or not
255        Ok(self.appender_left.compute_uncompressed_data_size()?
256            + self.appender_right.compute_uncompressed_data_size()?)
257    }
258}
259
#[cfg(test)]
mod tests {
    use mithril_common::digesters::{
        DummyCardanoDbBuilder, IMMUTABLE_DIR, LEDGER_DIR, VOLATILE_DIR,
    };
    use mithril_common::entities::CompressionAlgorithm;
    use mithril_common::temp_dir_create;

    use crate::tools::file_archiver::test_tools::*;
    use crate::tools::file_archiver::{ArchiveParameters, FileArchiver};

    use super::*;

    // One submodule per appender implementation, plus one for chaining.
    mod appender_entries {
        use super::*;

        #[test]
        fn create_archive_only_for_specified_directories_and_files() {
            let test_dir = temp_dir_create!();
            let source = test_dir.join(create_dir(&test_dir, "source"));

            let directory_to_archive_path = create_dir(&source, "directory_to_archive");
            let file_to_archive_path = create_file(&source, "file_to_archive.txt");
            let directory_not_to_archive_path = create_dir(&source, "directory_not_to_archive");
            let file_not_to_archive_path = create_file(&source, "file_not_to_archive.txt");

            let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));

            let archive = file_archiver
                .archive(
                    ArchiveParameters {
                        archive_name_without_extension: "archive".to_string(),
                        target_directory: test_dir.clone(),
                        compression_algorithm: CompressionAlgorithm::Gzip,
                    },
                    AppenderEntries::new(
                        vec![
                            directory_to_archive_path.clone(),
                            file_to_archive_path.clone(),
                        ],
                        source,
                    )
                    .unwrap(),
                )
                .unwrap();

            let unpack_path = archive.unpack_gzip(&test_dir);

            // Only the listed entries must end up in the archive.
            assert!(unpack_path.join(directory_to_archive_path).is_dir());
            assert!(unpack_path.join(file_to_archive_path).is_file());
            assert!(!unpack_path.join(directory_not_to_archive_path).exists());
            assert!(!unpack_path.join(file_not_to_archive_path).exists());
        }

        #[test]
        fn return_error_when_appending_file_or_directory_that_does_not_exist() {
            let test_dir = temp_dir_create!();
            let target_archive = test_dir.join("whatever.tar.gz");
            let source = test_dir.join(create_dir(&test_dir, "source"));

            let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));

            file_archiver
                .archive(
                    ArchiveParameters {
                        archive_name_without_extension: "archive".to_string(),
                        target_directory: test_dir.clone(),
                        compression_algorithm: CompressionAlgorithm::Gzip,
                    },
                    AppenderEntries::new(vec![PathBuf::from("not_exist")], source).unwrap(),
                )
                .expect_err("AppenderEntries should return error when file or directory not exist");
            // No archive file must be left behind on failure.
            assert!(!target_archive.exists());
        }

        #[test]
        fn return_error_when_appending_empty_entries() {
            let appender_creation_result = AppenderEntries::new(vec![], PathBuf::new());
            assert!(appender_creation_result.is_err(),);
        }

        #[test]
        fn can_append_duplicate_files_and_directories() {
            let test_dir = temp_dir_create!();
            let source = test_dir.join(create_dir(&test_dir, "source"));

            let directory_to_archive_path = create_dir(&source, "directory_to_archive");
            let file_to_archive_path =
                create_file(&source, "directory_to_archive/file_to_archive.txt");

            let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));

            let archive = file_archiver
                .archive(
                    ArchiveParameters {
                        archive_name_without_extension: "archive".to_string(),
                        target_directory: test_dir.clone(),
                        compression_algorithm: CompressionAlgorithm::Gzip,
                    },
                    // Same directory and file listed twice on purpose.
                    AppenderEntries::new(
                        vec![
                            directory_to_archive_path.clone(),
                            directory_to_archive_path.clone(),
                            file_to_archive_path.clone(),
                            file_to_archive_path.clone(),
                        ],
                        source,
                    )
                    .unwrap(),
                )
                .unwrap();

            let unpack_path = archive.unpack_gzip(&test_dir);

            assert!(unpack_path.join(directory_to_archive_path).is_dir());
            assert!(unpack_path.join(file_to_archive_path).is_file());
        }

        #[test]
        fn compute_uncompressed_size_of_its_paths() {
            let test_dir = "compute_uncompressed_size_of_its_paths";

            let immutable_trio_file_size = 777;
            let ledger_file_size = 6666;
            let volatile_file_size = 99;

            let cardano_db = DummyCardanoDbBuilder::new(test_dir)
                .with_immutables(&[1, 2, 3])
                .set_immutable_trio_file_size(immutable_trio_file_size)
                .with_ledger_files(&["437", "537", "637", "737"])
                .set_ledger_file_size(ledger_file_size)
                .with_volatile_files(&["blocks-0.dat", "blocks-1.dat", "blocks-2.dat"])
                .set_volatile_file_size(volatile_file_size)
                .build();

            // Only a subset of the db is selected: all immutables, 2 ledger files, 1 volatile file.
            let appender_entries = AppenderEntries::new(
                vec![
                    PathBuf::from(IMMUTABLE_DIR),
                    PathBuf::from(LEDGER_DIR).join("437"),
                    PathBuf::from(LEDGER_DIR).join("537"),
                    PathBuf::from(VOLATILE_DIR).join("blocks-0.dat"),
                ],
                cardano_db.get_dir().clone(),
            )
            .unwrap();

            let entries_size = appender_entries.compute_uncompressed_data_size().unwrap();
            let expected_total_size =
                (immutable_trio_file_size * 3) + (2 * ledger_file_size) + volatile_file_size;
            assert_eq!(expected_total_size, entries_size);
        }
    }

    mod appender_file {
        use super::*;

        #[test]
        fn appending_file_to_tar() {
            let test_dir = temp_dir_create!();
            let file_to_archive = create_file(&test_dir, "test_file.txt");

            let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
            let archive = file_archiver
                .archive(
                    ArchiveParameters {
                        archive_name_without_extension: "archive".to_string(),
                        target_directory: test_dir.clone(),
                        compression_algorithm: CompressionAlgorithm::Gzip,
                    },
                    AppenderFile::append_at_archive_root(test_dir.join(&file_to_archive)).unwrap(),
                )
                .unwrap();

            let unpack_path = archive.unpack_gzip(&test_dir);

            assert!(unpack_path.join(file_to_archive).exists());
        }

        #[test]
        fn return_error_if_file_does_not_exist() {
            let target_file_path = PathBuf::from("non_existent_file.txt");
            assert!(AppenderFile::append_at_archive_root(target_file_path).is_err());
        }

        #[test]
        fn return_error_if_input_is_not_a_file() {
            let test_dir = temp_dir_create!();
            assert!(AppenderFile::append_at_archive_root(test_dir).is_err());
        }

        #[test]
        fn compute_uncompressed_size() {
            let test_dir = temp_dir_create!();

            // `set_len` creates a sparse file of exactly 777 bytes.
            let file_path = test_dir.join("file.txt");
            let file = File::create(&file_path).unwrap();
            file.set_len(777).unwrap();

            let appender_file = AppenderFile::append_at_archive_root(file_path).unwrap();

            let entries_size = appender_file.compute_uncompressed_data_size().unwrap();
            assert_eq!(777, entries_size);
        }
    }

    mod appender_dir_all {
        use super::*;

        #[test]
        fn compute_uncompressed_size() {
            let test_dir = "appender_dir_all_compute_size";

            let immutable_trio_file_size = 777;
            let ledger_file_size = 6666;
            let volatile_file_size = 99;

            let cardano_db = DummyCardanoDbBuilder::new(test_dir)
                .with_immutables(&[1, 2])
                .set_immutable_trio_file_size(immutable_trio_file_size)
                .with_ledger_files(&["437", "537", "637"])
                .set_ledger_file_size(ledger_file_size)
                .with_volatile_files(&["blocks-0.dat"])
                .set_volatile_file_size(volatile_file_size)
                .build();

            let appender_dir_all = AppenderDirAll::new(cardano_db.get_dir().clone());

            // Unlike AppenderEntries, the whole db directory is accounted for.
            let entries_size = appender_dir_all.compute_uncompressed_data_size().unwrap();
            let expected_total_size =
                (immutable_trio_file_size * 2) + (3 * ledger_file_size) + volatile_file_size;
            assert_eq!(expected_total_size, entries_size);
        }
    }

    mod appender_data {
        use flate2::read::GzDecoder;
        use serde::Deserialize;

        use super::*;

        #[derive(Debug, PartialEq, Serialize, Deserialize)]
        struct TestStruct {
            field1: String,
            field2: i32,
        }

        #[test]
        fn append_serializable_json() {
            let test_dir = temp_dir_create!();
            let object = TestStruct {
                field1: "test".to_string(),
                field2: 42,
            };
            let location_in_archive = PathBuf::from("folder").join("test.json");

            let data_appender =
                AppenderData::from_json(location_in_archive.clone(), &object).unwrap();
            let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
            let archive = file_archiver
                .archive(
                    ArchiveParameters {
                        archive_name_without_extension: "archive".to_string(),
                        target_directory: test_dir.clone(),
                        compression_algorithm: CompressionAlgorithm::Gzip,
                    },
                    data_appender,
                )
                .unwrap();

            let unpack_path = archive.unpack_gzip(&test_dir);
            let unpacked_file_path = unpack_path.join(&location_in_archive);

            assert!(unpacked_file_path.exists());

            // Round-trip: the unpacked JSON must deserialize back to the original object.
            let deserialized_object: TestStruct =
                serde_json::from_reader(File::open(unpacked_file_path).unwrap()).unwrap();
            assert_eq!(object, deserialized_object);
        }

        #[test]
        fn appended_entry_have_read_write_permissions_and_time_metadata() {
            let test_dir = temp_dir_create!();
            let object = TestStruct {
                field1: "test".to_string(),
                field2: 42,
            };
            let location_in_archive = PathBuf::from("folder").join("test.json");
            let start_time_stamp = chrono::Utc::now().timestamp() as u64;

            let data_appender =
                AppenderData::from_json(location_in_archive.clone(), &object).unwrap();
            let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
            let archive = file_archiver
                .archive(
                    ArchiveParameters {
                        archive_name_without_extension: "archive".to_string(),
                        target_directory: test_dir.clone(),
                        compression_algorithm: CompressionAlgorithm::Gzip,
                    },
                    data_appender,
                )
                .unwrap();

            // Read the raw tar entry back to inspect its header metadata.
            let archive_file = File::open(archive.get_file_path()).unwrap();
            let mut archive = tar::Archive::new(GzDecoder::new(archive_file));
            let mut archive_entries = archive.entries().unwrap();
            let appended_entry = archive_entries.next().unwrap().unwrap();

            assert_eq!(
                READ_WRITE_PERMISSION,
                appended_entry.header().mode().unwrap()
            );
            let mtime = appended_entry.header().mtime().unwrap();
            assert!(
                mtime >= start_time_stamp,
                "entry mtime should be greater than or equal to the timestamp before the archive\
                creation:\n {mtime} < {start_time_stamp}"
            );
        }

        #[test]
        fn compute_uncompressed_size() {
            let object = TestStruct {
                field1: "test".to_string(),
                field2: 42,
            };

            let data_appender =
                AppenderData::from_json(PathBuf::from("whatever.json"), &object).unwrap();

            let expected_size = serde_json::to_vec(&object).unwrap().len() as u64;
            let entry_size = data_appender.compute_uncompressed_data_size().unwrap();
            assert_eq!(expected_size, entry_size);
        }
    }

    mod chain_appender {
        use super::*;

        #[test]
        fn chain_non_overlapping_appenders() {
            let test_dir = temp_dir_create!();
            let file_to_archive = create_file(&test_dir, "test_file.txt");
            let json_location_in_archive = PathBuf::from("folder").join("test.json");

            let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
            let archive = file_archiver
                .archive(
                    ArchiveParameters {
                        archive_name_without_extension: "archive".to_string(),
                        target_directory: test_dir.clone(),
                        compression_algorithm: CompressionAlgorithm::Gzip,
                    },
                    ChainAppender::new(
                        AppenderFile::append_at_archive_root(test_dir.join(&file_to_archive))
                            .unwrap(),
                        AppenderData::from_json(json_location_in_archive.clone(), &"test").unwrap(),
                    ),
                )
                .unwrap();

            let unpack_path = archive.unpack_gzip(&test_dir);

            // Both appenders' outputs must coexist in the archive.
            assert!(unpack_path.join(file_to_archive).exists());
            assert!(unpack_path.join(json_location_in_archive).exists());
        }

        #[test]
        fn chain_overlapping_appenders_data_from_right_appender_overwrite_left_appender_data() {
            let test_dir = temp_dir_create!();
            let json_location_in_archive = PathBuf::from("test.json");

            let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
            let archive = file_archiver
                .archive(
                    ArchiveParameters {
                        archive_name_without_extension: "archive".to_string(),
                        target_directory: test_dir.clone(),
                        compression_algorithm: CompressionAlgorithm::Gzip,
                    },
                    // Both appenders target the same archive location.
                    ChainAppender::new(
                        AppenderData::from_json(
                            json_location_in_archive.clone(),
                            &"will be overwritten",
                        )
                        .unwrap(),
                        AppenderData::from_json(json_location_in_archive.clone(), &"test").unwrap(),
                    ),
                )
                .unwrap();

            let unpack_path = archive.unpack_gzip(&test_dir);
            let unpacked_json_path = unpack_path.join(&json_location_in_archive);

            let deserialized_object: String =
                serde_json::from_reader(File::open(&unpacked_json_path).unwrap()).unwrap();
            assert_eq!("test", deserialized_object);
        }

        #[test]
        fn compute_non_overlapping_uncompressed_size() {
            let left_appender =
                AppenderData::from_json(PathBuf::from("whatever1.json"), &"foo").unwrap();
            let right_appender =
                AppenderData::from_json(PathBuf::from("whatever2.json"), &"bar").unwrap();

            let expected_size = left_appender.compute_uncompressed_data_size().unwrap()
                + right_appender.compute_uncompressed_data_size().unwrap();

            let chain_appender = left_appender.chain(right_appender);
            let size = chain_appender.compute_uncompressed_data_size().unwrap();
            assert_eq!(expected_size, size);
        }

        #[test]
        fn compute_uncompressed_size_cant_discriminate_overlaps_and_return_aggregated_appenders_sizes(
        ) {
            let overlapping_path = PathBuf::from("whatever.json");
            let left_appender =
                AppenderData::from_json(overlapping_path.clone(), &"overwritten data").unwrap();
            let right_appender =
                AppenderData::from_json(overlapping_path.clone(), &"final data").unwrap();

            // Documented limitation: overlapping entries are still summed.
            let expected_size = left_appender.compute_uncompressed_data_size().unwrap()
                + right_appender.compute_uncompressed_data_size().unwrap();

            let chain_appender = left_appender.chain(right_appender);
            let size = chain_appender.compute_uncompressed_data_size().unwrap();
            assert_eq!(expected_size, size);
        }
    }
}