mithril_aggregator/tools/file_archiver/appender.rs

use anyhow::{Context, anyhow};
use serde::Serialize;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;

use mithril_common::StdResult;

use crate::tools::file_size;

const READ_WRITE_PERMISSION: u32 = 0o666;

/// Define multiple ways to append content to a tar archive.
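///
/// # Example
///
/// A minimal sketch of a custom implementation (the `NoopAppender` type is
/// hypothetical, for illustration only):
///
/// ```ignore
/// struct NoopAppender;
///
/// impl TarAppender for NoopAppender {
///     // Appends nothing to the archive.
///     fn append<T: Write>(&self, _tar: &mut tar::Builder<T>) -> StdResult<()> {
///         Ok(())
///     }
///
///     // Reports the total size of the data that `append` would write.
///     fn compute_uncompressed_data_size(&self) -> StdResult<u64> {
///         Ok(0)
///     }
/// }
/// ```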
pub trait TarAppender: Send {
    /// Append the content to the given tar archive builder.
    fn append<T: Write>(&self, tar: &mut tar::Builder<T>) -> StdResult<()>;

    /// Compute the total size, in bytes, of the data to append, before compression.
    fn compute_uncompressed_data_size(&self) -> StdResult<u64>;

    /// Chain this appender with another one, appending this appender's content first.
    fn chain<A2: TarAppender>(self, appender_right: A2) -> ChainAppender<Self, A2>
    where
        Self: Sized,
    {
        ChainAppender::new(self, appender_right)
    }
}

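/// Append the entire content of a directory to the archive.
///
/// A minimal usage sketch (the path is illustrative; the constructor is
/// currently test-only):
///
/// ```ignore
/// let appender = AppenderDirAll::new(PathBuf::from("/path/to/directory"));
/// ```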
pub struct AppenderDirAll {
    target_directory: PathBuf,
}

impl AppenderDirAll {
    // Note: no longer used outside of tests, but a useful tool to keep around if we ever need to archive a directory
    #[cfg(test)]
    pub fn new(target_directory: PathBuf) -> Self {
        Self { target_directory }
    }
}
impl TarAppender for AppenderDirAll {
    fn append<T: Write>(&self, tar: &mut tar::Builder<T>) -> StdResult<()> {
        tar.append_dir_all(".", &self.target_directory).with_context(|| {
            format!(
                "Create archive error: Cannot add directory: '{}' to the archive",
                self.target_directory.display()
            )
        })?;
        Ok(())
    }

    fn compute_uncompressed_data_size(&self) -> StdResult<u64> {
        file_size::compute_size_of_path(&self.target_directory)
    }
}

/// Append a single file to the archive.
pub struct AppenderFile {
    /// Location of the file in the archive.
    location_in_archive: PathBuf,
    /// Path to the file to add to the archive.
    target_file: PathBuf,
}

impl AppenderFile {
    /// Append the file at the root of the archive, keeping the same file name.
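    ///
    /// A minimal usage sketch (the path is illustrative):
    ///
    /// ```ignore
    /// let appender = AppenderFile::append_at_archive_root(PathBuf::from("db/snapshot.json"))?;
    /// // The file will be stored as `snapshot.json` at the archive root.
    /// ```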
    pub fn append_at_archive_root(target_file: PathBuf) -> StdResult<Self> {
        if !target_file.is_file() {
            return Err(anyhow!(
                "The target path is not a file: '{}'",
                target_file.display()
            ));
        }

        let location_in_archive = target_file
            .file_name()
            .ok_or_else(|| {
                anyhow!(
                    "Cannot get the file name from the target file path: '{}'",
                    target_file.display()
                )
            })?
            .to_owned();

        Ok(Self {
            location_in_archive: PathBuf::from(location_in_archive),
            target_file,
        })
    }
}

impl TarAppender for AppenderFile {
    fn append<T: Write>(&self, tar: &mut tar::Builder<T>) -> StdResult<()> {
        let mut file = File::open(&self.target_file)
            .with_context(|| format!("Cannot open file: '{}'", self.target_file.display()))?;
        tar.append_file(&self.location_in_archive, &mut file)
            .with_context(|| {
                format!(
                    "Cannot add file: '{}' to the archive",
                    self.target_file.display()
                )
            })?;
        Ok(())
    }

    fn compute_uncompressed_data_size(&self) -> StdResult<u64> {
        file_size::compute_size_of_path(&self.target_file)
    }
}

/// Append files and directories, resolved relative to a base directory, to the archive.
pub struct AppenderEntries {
    entries: Vec<PathBuf>,
    base_directory: PathBuf,
}

impl AppenderEntries {
    /// Create a new instance of `AppenderEntries`.
    ///
    /// Returns an error if the `entries` are empty.
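    ///
    /// A minimal usage sketch (paths are illustrative); each entry is resolved
    /// against `base_directory` and keeps its relative path in the archive:
    ///
    /// ```ignore
    /// let appender = AppenderEntries::new(
    ///     vec![PathBuf::from("immutable"), PathBuf::from("ledger/437")],
    ///     PathBuf::from("/path/to/db"),
    /// )?;
    /// ```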
    pub fn new(entries: Vec<PathBuf>, base_directory: PathBuf) -> StdResult<Self> {
        if entries.is_empty() {
            return Err(anyhow!("The entries cannot be empty"));
        }

        Ok(Self {
            entries,
            base_directory,
        })
    }
}

impl TarAppender for AppenderEntries {
    fn append<T: Write>(&self, tar: &mut tar::Builder<T>) -> StdResult<()> {
        for entry in &self.entries {
            let entry_path = self.base_directory.join(entry);
            if entry_path.is_dir() {
                tar.append_dir_all(entry, &entry_path).with_context(|| {
                    format!(
                        "Cannot add directory: '{}' to the archive",
                        entry_path.display()
                    )
                })?;
            } else if entry_path.is_file() {
                let mut file = File::open(&entry_path)
                    .with_context(|| format!("Cannot open file: '{}'", entry_path.display()))?;
                tar.append_file(entry, &mut file).with_context(|| {
                    format!(
                        "Cannot add file: '{}' to the archive",
                        entry_path.display()
                    )
                })?;
            } else {
                return Err(anyhow!(
                    "The entry: '{}' is neither a file nor a directory",
                    entry_path.display()
                ));
            }
        }
        Ok(())
    }

    fn compute_uncompressed_data_size(&self) -> StdResult<u64> {
        let full_entries_path = self
            .entries
            .iter()
            .map(|entry| self.base_directory.join(entry))
            .collect();
        file_size::compute_size(full_entries_path)
    }
}

/// Append in-memory data as a file in the archive.
pub struct AppenderData {
    /// Location of the file in the archive where the data will be appended.
    location_in_archive: PathBuf,
    /// Byte array of the data to append.
    bytes: Vec<u8>,
}

impl AppenderData {
    /// Create a new instance of `AppenderData` from an object that will be serialized to JSON.
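    ///
    /// A minimal usage sketch (the location and serialized value are illustrative):
    ///
    /// ```ignore
    /// let appender = AppenderData::from_json(
    ///     PathBuf::from("folder/manifest.json"),
    ///     &serde_json::json!({ "version": 1 }),
    /// )?;
    /// ```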
    pub fn from_json<T: Serialize + Send>(
        location_in_archive: PathBuf,
        object: &T,
    ) -> StdResult<Self> {
        let json_bytes = serde_json::to_vec(object).with_context(|| {
            format!(
                "Cannot serialize JSON to file in archive: '{}'",
                location_in_archive.display()
            )
        })?;

        Ok(Self::from_raw_bytes(location_in_archive, json_bytes))
    }

    /// Create a new instance of `AppenderData` from a byte array.
    pub fn from_raw_bytes(location_in_archive: PathBuf, bytes: Vec<u8>) -> Self {
        Self {
            location_in_archive,
            bytes,
        }
    }
}

impl TarAppender for AppenderData {
    fn append<T: Write>(&self, tar: &mut tar::Builder<T>) -> StdResult<()> {
        let mut header = tar::Header::new_gnu();
        header.set_size(self.bytes.len() as u64);
        header.set_mode(READ_WRITE_PERMISSION);
        header.set_mtime(chrono::Utc::now().timestamp() as u64);
        header.set_cksum();

        tar.append_data(
            &mut header,
            &self.location_in_archive,
            self.bytes.as_slice(),
        )
        .with_context(|| {
            format!(
                "Cannot add file: '{}' to the archive",
                self.location_in_archive.display()
            )
        })?;

        Ok(())
    }

    fn compute_uncompressed_data_size(&self) -> StdResult<u64> {
        Ok(self.bytes.len() as u64)
    }
}

/// Chain multiple `TarAppender` instances together.
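///
/// A minimal usage sketch (appenders and paths are illustrative); in practice it
/// is usually built through [TarAppender::chain]:
///
/// ```ignore
/// let appender = AppenderFile::append_at_archive_root(PathBuf::from("snapshot.json"))?
///     .chain(AppenderData::from_raw_bytes(
///         PathBuf::from("VERSION"),
///         b"1.0".to_vec(),
///     ));
/// ```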
pub struct ChainAppender<L, R> {
    appender_left: L,
    appender_right: R,
}

impl<L: TarAppender, R: TarAppender> ChainAppender<L, R> {
    pub fn new(appender_left: L, appender_right: R) -> Self {
        Self {
            appender_left,
            appender_right,
        }
    }
}

impl<L: TarAppender, R: TarAppender> TarAppender for ChainAppender<L, R> {
    fn append<T: Write>(&self, tar: &mut tar::Builder<T>) -> StdResult<()> {
        self.appender_left.append(tar)?;
        self.appender_right.append(tar)
    }

    fn compute_uncompressed_data_size(&self) -> StdResult<u64> {
        // Sizes are always summed, even if the right appender overwrites data from
        // the left one, because we can't know whether the entries overlap
        Ok(self.appender_left.compute_uncompressed_data_size()?
            + self.appender_right.compute_uncompressed_data_size()?)
    }
}

#[cfg(test)]
mod tests {
    use mithril_cardano_node_internal_database::test::DummyCardanoDbBuilder;
    use mithril_cardano_node_internal_database::{IMMUTABLE_DIR, LEDGER_DIR, VOLATILE_DIR};
    use mithril_common::entities::CompressionAlgorithm;
    use mithril_common::{assert_dir_eq, temp_dir_create};

    use crate::tools::file_archiver::test_tools::*;
    use crate::tools::file_archiver::{ArchiveParameters, FileArchiver};

    use super::*;

    mod appender_entries {
        use super::*;

        #[test]
        fn create_archive_only_for_specified_directories_and_files() {
            let test_dir = temp_dir_create!();
            let source = test_dir.join(create_dir(&test_dir, "source"));

            let directory_to_archive_path = create_dir(&source, "directory_to_archive");
            let file_to_archive_path = create_file(&source, "file_to_archive.txt");
            create_dir(&source, "directory_not_to_archive");
            create_file(&source, "file_not_to_archive.txt");

            let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));

            let archive = file_archiver
                .archive(
                    ArchiveParameters {
                        archive_name_without_extension: "archive".to_string(),
                        target_directory: test_dir.clone(),
                        compression_algorithm: CompressionAlgorithm::Gzip,
                    },
                    AppenderEntries::new(
                        vec![directory_to_archive_path.clone(), file_to_archive_path.clone()],
                        source,
                    )
                    .unwrap(),
                )
                .unwrap();

            let unpack_path = archive.unpack_gzip(&test_dir);

            assert_dir_eq!(
                &unpack_path,
                "* directory_to_archive/
                 * file_to_archive.txt"
            );
        }

        #[test]
        fn return_error_when_appending_file_or_directory_that_does_not_exist() {
            let test_dir = temp_dir_create!();
            let target_archive = test_dir.join("whatever.tar.gz");
            let source = test_dir.join(create_dir(&test_dir, "source"));

            let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));

            file_archiver
                .archive(
                    ArchiveParameters {
                        archive_name_without_extension: "archive".to_string(),
                        target_directory: test_dir.clone(),
                        compression_algorithm: CompressionAlgorithm::Gzip,
                    },
                    AppenderEntries::new(vec![PathBuf::from("not_exist")], source).unwrap(),
                )
                .expect_err(
                    "AppenderEntries should return an error when the file or directory does not exist",
                );
            assert!(!target_archive.exists());
        }

        #[test]
        fn return_error_when_appending_empty_entries() {
            let appender_creation_result = AppenderEntries::new(vec![], PathBuf::new());
            assert!(appender_creation_result.is_err());
        }

        #[test]
        fn can_append_duplicate_files_and_directories() {
            let test_dir = temp_dir_create!();
            let source = test_dir.join(create_dir(&test_dir, "source"));

            let directory_to_archive_path = create_dir(&source, "directory_to_archive");
            let file_to_archive_path =
                create_file(&source, "directory_to_archive/file_to_archive.txt");

            let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));

            let archive = file_archiver
                .archive(
                    ArchiveParameters {
                        archive_name_without_extension: "archive".to_string(),
                        target_directory: test_dir.clone(),
                        compression_algorithm: CompressionAlgorithm::Gzip,
                    },
                    AppenderEntries::new(
                        vec![
                            directory_to_archive_path.clone(),
                            directory_to_archive_path.clone(),
                            file_to_archive_path.clone(),
                            file_to_archive_path.clone(),
                        ],
                        source,
                    )
                    .unwrap(),
                )
                .unwrap();

            let unpack_path = archive.unpack_gzip(&test_dir);

            assert_dir_eq!(
                &unpack_path,
                "* directory_to_archive/
                 ** file_to_archive.txt"
            );
        }

        #[test]
        fn compute_uncompressed_size_of_its_paths() {
            let test_dir = "compute_uncompressed_size_of_its_paths";

            let immutable_trio_file_size = 777;
            let ledger_file_size = 6666;
            let volatile_file_size = 99;

            let cardano_db = DummyCardanoDbBuilder::new(test_dir)
                .with_immutables(&[1, 2, 3])
                .set_immutable_trio_file_size(immutable_trio_file_size)
                .with_legacy_ledger_snapshots(&[437, 537, 637, 737])
                .set_ledger_file_size(ledger_file_size)
                .with_volatile_files(&["blocks-0.dat", "blocks-1.dat", "blocks-2.dat"])
                .set_volatile_file_size(volatile_file_size)
                .build();

            let appender_entries = AppenderEntries::new(
                vec![
                    PathBuf::from(IMMUTABLE_DIR),
                    PathBuf::from(LEDGER_DIR).join("437"),
                    PathBuf::from(LEDGER_DIR).join("537"),
                    PathBuf::from(VOLATILE_DIR).join("blocks-0.dat"),
                ],
                cardano_db.get_dir().clone(),
            )
            .unwrap();

            let entries_size = appender_entries.compute_uncompressed_data_size().unwrap();
            let expected_total_size =
                (immutable_trio_file_size * 3) + (2 * ledger_file_size) + volatile_file_size;
            assert_eq!(expected_total_size, entries_size);
        }
    }

    mod appender_file {
        use super::*;

        #[test]
        fn appending_file_to_tar() {
            let test_dir = temp_dir_create!();
            let file_to_archive = create_file(&test_dir, "test_file.txt");

            let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
            let archive = file_archiver
                .archive(
                    ArchiveParameters {
                        archive_name_without_extension: "archive".to_string(),
                        target_directory: test_dir.clone(),
                        compression_algorithm: CompressionAlgorithm::Gzip,
                    },
                    AppenderFile::append_at_archive_root(test_dir.join(&file_to_archive)).unwrap(),
                )
                .unwrap();

            let unpack_path = archive.unpack_gzip(&test_dir);

            assert!(unpack_path.join(file_to_archive).exists());
        }

        #[test]
        fn return_error_if_file_does_not_exist() {
            let target_file_path = PathBuf::from("non_existent_file.txt");
            assert!(AppenderFile::append_at_archive_root(target_file_path).is_err());
        }

        #[test]
        fn return_error_if_input_is_not_a_file() {
            let test_dir = temp_dir_create!();
            assert!(AppenderFile::append_at_archive_root(test_dir).is_err());
        }

        #[test]
        fn compute_uncompressed_size() {
            let test_dir = temp_dir_create!();

            let file_path = test_dir.join("file.txt");
            let file = File::create(&file_path).unwrap();
            file.set_len(777).unwrap();

            let appender_file = AppenderFile::append_at_archive_root(file_path).unwrap();

            let entries_size = appender_file.compute_uncompressed_data_size().unwrap();
            assert_eq!(777, entries_size);
        }
    }

    mod appender_dir_all {
        use super::*;

        #[test]
        fn compute_uncompressed_size() {
            let test_dir = "appender_dir_all_compute_size";

            let immutable_trio_file_size = 777;
            let ledger_file_size = 6666;
            let volatile_file_size = 99;

            let cardano_db = DummyCardanoDbBuilder::new(test_dir)
                .with_immutables(&[1, 2])
                .set_immutable_trio_file_size(immutable_trio_file_size)
                .with_legacy_ledger_snapshots(&[437, 537, 637])
                .set_ledger_file_size(ledger_file_size)
                .with_volatile_files(&["blocks-0.dat"])
                .set_volatile_file_size(volatile_file_size)
                .build();

            let appender_dir_all = AppenderDirAll::new(cardano_db.get_dir().clone());

            let entries_size = appender_dir_all.compute_uncompressed_data_size().unwrap();
            let expected_total_size =
                (immutable_trio_file_size * 2) + (3 * ledger_file_size) + volatile_file_size;
            assert_eq!(expected_total_size, entries_size);
        }
    }

    mod appender_data {
        use flate2::read::GzDecoder;
        use serde::Deserialize;

        use super::*;

        #[derive(Debug, PartialEq, Serialize, Deserialize)]
        struct TestStruct {
            field1: String,
            field2: i32,
        }

        #[test]
        fn append_serializable_json() {
            let test_dir = temp_dir_create!();
            let object = TestStruct {
                field1: "test".to_string(),
                field2: 42,
            };
            let location_in_archive = PathBuf::from("folder").join("test.json");

            let data_appender =
                AppenderData::from_json(location_in_archive.clone(), &object).unwrap();
            let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
            let archive = file_archiver
                .archive(
                    ArchiveParameters {
                        archive_name_without_extension: "archive".to_string(),
                        target_directory: test_dir.clone(),
                        compression_algorithm: CompressionAlgorithm::Gzip,
                    },
                    data_appender,
                )
                .unwrap();

            let unpack_path = archive.unpack_gzip(&test_dir);
            let unpacked_file_path = unpack_path.join(&location_in_archive);

            assert!(unpacked_file_path.exists());

            let deserialized_object: TestStruct =
                serde_json::from_reader(File::open(unpacked_file_path).unwrap()).unwrap();
            assert_eq!(object, deserialized_object);
        }

        #[test]
        fn appended_entry_has_read_write_permissions_and_time_metadata() {
            let test_dir = temp_dir_create!();
            let object = TestStruct {
                field1: "test".to_string(),
                field2: 42,
            };
            let location_in_archive = PathBuf::from("folder").join("test.json");
            let start_time_stamp = chrono::Utc::now().timestamp() as u64;

            let data_appender =
                AppenderData::from_json(location_in_archive.clone(), &object).unwrap();
            let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
            let archive = file_archiver
                .archive(
                    ArchiveParameters {
                        archive_name_without_extension: "archive".to_string(),
                        target_directory: test_dir.clone(),
                        compression_algorithm: CompressionAlgorithm::Gzip,
                    },
                    data_appender,
                )
                .unwrap();

            let archive_file = File::open(archive.get_file_path()).unwrap();
            let mut archive = tar::Archive::new(GzDecoder::new(archive_file));
            let mut archive_entries = archive.entries().unwrap();
            let appended_entry = archive_entries.next().unwrap().unwrap();

            assert_eq!(
                READ_WRITE_PERMISSION,
                appended_entry.header().mode().unwrap()
            );
            let mtime = appended_entry.header().mtime().unwrap();
            assert!(
                mtime >= start_time_stamp,
                "entry mtime should be greater than or equal to the timestamp taken before the \
                archive creation:\n {mtime} < {start_time_stamp}"
            );
        }

        #[test]
        fn compute_uncompressed_size() {
            let object = TestStruct {
                field1: "test".to_string(),
                field2: 42,
            };

            let data_appender =
                AppenderData::from_json(PathBuf::from("whatever.json"), &object).unwrap();

            let expected_size = serde_json::to_vec(&object).unwrap().len() as u64;
            let entry_size = data_appender.compute_uncompressed_data_size().unwrap();
            assert_eq!(expected_size, entry_size);
        }
    }

    mod chain_appender {
        use super::*;

        #[test]
        fn chain_non_overlapping_appenders() {
            let test_dir = temp_dir_create!();
            let file_to_archive = create_file(&test_dir, "test_file.txt");
            let json_location_in_archive = PathBuf::from("folder").join("test.json");

            let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
            let archive = file_archiver
                .archive(
                    ArchiveParameters {
                        archive_name_without_extension: "archive".to_string(),
                        target_directory: test_dir.clone(),
                        compression_algorithm: CompressionAlgorithm::Gzip,
                    },
                    ChainAppender::new(
                        AppenderFile::append_at_archive_root(test_dir.join(&file_to_archive))
                            .unwrap(),
                        AppenderData::from_json(json_location_in_archive.clone(), &"test").unwrap(),
                    ),
                )
                .unwrap();

            let unpack_path = archive.unpack_gzip(&test_dir);

            assert!(unpack_path.join(file_to_archive).exists());
            assert!(unpack_path.join(json_location_in_archive).exists());
        }

        #[test]
        fn chain_overlapping_appenders_data_from_right_appender_overwrites_left_appender_data() {
            let test_dir = temp_dir_create!();
            let json_location_in_archive = PathBuf::from("test.json");

            let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
            let archive = file_archiver
                .archive(
                    ArchiveParameters {
                        archive_name_without_extension: "archive".to_string(),
                        target_directory: test_dir.clone(),
                        compression_algorithm: CompressionAlgorithm::Gzip,
                    },
                    ChainAppender::new(
                        AppenderData::from_json(
                            json_location_in_archive.clone(),
                            &"will be overwritten",
                        )
                        .unwrap(),
                        AppenderData::from_json(json_location_in_archive.clone(), &"test").unwrap(),
                    ),
                )
                .unwrap();

            let unpack_path = archive.unpack_gzip(&test_dir);
            let unpacked_json_path = unpack_path.join(&json_location_in_archive);

            let deserialized_object: String =
                serde_json::from_reader(File::open(&unpacked_json_path).unwrap()).unwrap();
            assert_eq!("test", deserialized_object);
        }

        #[test]
        fn compute_non_overlapping_uncompressed_size() {
            let left_appender =
                AppenderData::from_json(PathBuf::from("whatever1.json"), &"foo").unwrap();
            let right_appender =
                AppenderData::from_json(PathBuf::from("whatever2.json"), &"bar").unwrap();

            let expected_size = left_appender.compute_uncompressed_data_size().unwrap()
                + right_appender.compute_uncompressed_data_size().unwrap();

            let chain_appender = left_appender.chain(right_appender);
            let size = chain_appender.compute_uncompressed_data_size().unwrap();
            assert_eq!(expected_size, size);
        }

        #[test]
        fn compute_uncompressed_size_cant_discriminate_overlaps_and_returns_aggregated_appenders_sizes()
        {
            let overlapping_path = PathBuf::from("whatever.json");
            let left_appender =
                AppenderData::from_json(overlapping_path.clone(), &"overwritten data").unwrap();
            let right_appender =
                AppenderData::from_json(overlapping_path.clone(), &"final data").unwrap();

            let expected_size = left_appender.compute_uncompressed_data_size().unwrap()
                + right_appender.compute_uncompressed_data_size().unwrap();

            let chain_appender = left_appender.chain(right_appender);
            let size = chain_appender.compute_uncompressed_data_size().unwrap();
            assert_eq!(expected_size, size);
        }
    }
}