mithril_aggregator/tools/file_archiver/
api.rs

1use anyhow::{anyhow, Context};
2use flate2::{read::GzDecoder, write::GzEncoder, Compression};
3use slog::{info, warn, Logger};
4use std::{
5    fs,
6    fs::File,
7    io::{Read, Seek, SeekFrom},
8    path::{Path, PathBuf},
9};
10use tar::{Archive, Entry, EntryType};
11use zstd::{Decoder, Encoder};
12
13use mithril_common::entities::CompressionAlgorithm;
14use mithril_common::logging::LoggerExtensions;
15use mithril_common::StdResult;
16
17use crate::tools::file_size;
18use crate::ZstandardCompressionParameters;
19
20use super::appender::TarAppender;
21use super::{ArchiveParameters, FileArchive};
22
23/// Tool to archive files and directories.
24pub struct FileArchiver {
25    zstandard_compression_parameter: ZstandardCompressionParameters,
26    // Temporary directory to  the unpacked archive for verification
27    verification_temp_dir: PathBuf,
28    logger: Logger,
29}
30
31impl FileArchiver {
32    /// Constructs a new `FileArchiver`.
33    pub fn new(
34        zstandard_compression_parameter: ZstandardCompressionParameters,
35        verification_temp_dir: PathBuf,
36        logger: Logger,
37    ) -> Self {
38        Self {
39            zstandard_compression_parameter,
40            verification_temp_dir,
41            logger: logger.new_with_component_name::<Self>(),
42        }
43    }
44
45    #[cfg(test)]
46    pub fn new_for_test(verification_temp_dir: PathBuf) -> Self {
47        use crate::test_tools::TestLogger;
48        Self {
49            zstandard_compression_parameter: ZstandardCompressionParameters::default(),
50            verification_temp_dir,
51            logger: TestLogger::stdout(),
52        }
53    }
54
55    /// Archive the content of a directory.
56    pub fn archive<T: TarAppender>(
57        &self,
58        parameters: ArchiveParameters,
59        appender: T,
60    ) -> StdResult<FileArchive> {
61        fs::create_dir_all(&parameters.target_directory).with_context(|| {
62            format!(
63                "FileArchiver can not create archive directory: '{}'",
64                parameters.target_directory.display()
65            )
66        })?;
67
68        let target_path = parameters.target_path();
69        let temporary_archive_path = parameters.temporary_archive_path();
70
71        let temporary_file_archive = self
72            .create_and_verify_archive(&temporary_archive_path, appender, parameters.compression_algorithm)
73            .inspect_err(|_err| {
74                if temporary_archive_path.exists() {
75                    if let Err(remove_error) = fs::remove_file(&temporary_archive_path) {
76                        warn!(
77                            self.logger, " > Post FileArchiver.archive failure, could not remove temporary archive";
78                            "archive_path" => temporary_archive_path.display(),
79                            "error" => remove_error
80                        );
81                    }
82                }
83            })
84            .with_context(|| {
85                format!(
86                    "FileArchiver can not create and verify archive: '{}'",
87                    target_path.display()
88                )
89            })?;
90
91        fs::rename(&temporary_archive_path, &target_path).with_context(|| {
92            format!(
93                "FileArchiver can not rename temporary archive: '{}' to final archive: '{}'",
94                temporary_archive_path.display(),
95                target_path.display()
96            )
97        })?;
98
99        Ok(FileArchive {
100            filepath: target_path,
101            ..temporary_file_archive
102        })
103    }
104
105    fn create_and_verify_archive<T: TarAppender>(
106        &self,
107        archive_path: &Path,
108        appender: T,
109        compression_algorithm: CompressionAlgorithm,
110    ) -> StdResult<FileArchive> {
111        let file_archive = self
112            .create_archive(archive_path, appender, compression_algorithm)
113            .with_context(|| {
114                format!(
115                    "FileArchiver can not create archive with path: '{}''",
116                    archive_path.display()
117                )
118            })?;
119        self.verify_archive(&file_archive).with_context(|| {
120            format!(
121                "FileArchiver can not verify archive with path: '{}''",
122                archive_path.display()
123            )
124        })?;
125
126        Ok(file_archive)
127    }
128
129    fn create_archive<T: TarAppender>(
130        &self,
131        archive_path: &Path,
132        appender: T,
133        compression_algorithm: CompressionAlgorithm,
134    ) -> StdResult<FileArchive> {
135        info!(
136            self.logger,
137            "Archiving content to archive: '{}'",
138            archive_path.display()
139        );
140
141        let tar_file = File::create(archive_path).with_context(|| {
142            format!("Error while creating the archive with path: {archive_path:?}")
143        })?;
144
145        match compression_algorithm {
146            CompressionAlgorithm::Gzip => {
147                let enc = GzEncoder::new(tar_file, Compression::default());
148                let mut tar = tar::Builder::new(enc);
149
150                appender
151                    .append(&mut tar)
152                    .with_context(|| "GzEncoder Builder failed to append content")?;
153
154                let mut gz = tar
155                    .into_inner()
156                    .with_context(|| "GzEncoder Builder can not write the archive")?;
157                gz.try_finish()
158                    .with_context(|| "GzEncoder can not finish the output stream after writing")?;
159            }
160            CompressionAlgorithm::Zstandard => {
161                let mut enc = Encoder::new(tar_file, self.zstandard_compression_parameter.level)?;
162                enc.multithread(self.zstandard_compression_parameter.number_of_workers)
163                    .with_context(|| "ZstandardEncoder can not set the number of workers")?;
164                let mut tar = tar::Builder::new(enc);
165
166                appender
167                    .append(&mut tar)
168                    .with_context(|| "ZstandardEncoder Builder failed to append content")?;
169
170                let zstd = tar
171                    .into_inner()
172                    .with_context(|| "ZstandardEncoder Builder can not write the archive")?;
173                zstd.finish().with_context(|| {
174                    "ZstandardEncoder can not finish the output stream after writing"
175                })?;
176            }
177        }
178
179        let uncompressed_size = appender.compute_uncompressed_data_size().with_context(|| {
180            format!(
181                "FileArchiver can not get the size of the uncompressed data to archive: '{}'",
182                archive_path.display()
183            )
184        })?;
185        let archive_filesize =
186            file_size::compute_size_of_path(archive_path).with_context(|| {
187                format!(
188                    "FileArchiver can not get file size of archive with path: '{}'",
189                    archive_path.display()
190                )
191            })?;
192
193        Ok(FileArchive {
194            filepath: archive_path.to_path_buf(),
195            archive_filesize,
196            uncompressed_size,
197            compression_algorithm,
198        })
199    }
200
201    /// Verify the integrity of the archive.
202    fn verify_archive(&self, archive: &FileArchive) -> StdResult<()> {
203        info!(
204            self.logger,
205            "Verifying archive: {}",
206            archive.filepath.display()
207        );
208
209        let mut archive_file_tar = File::open(&archive.filepath).with_context(|| {
210            format!(
211                "Verify archive error: can not open archive: '{}'",
212                archive.filepath.display()
213            )
214        })?;
215        archive_file_tar.seek(SeekFrom::Start(0))?;
216
217        let mut tar_archive: Archive<Box<dyn Read>> = match archive.compression_algorithm {
218            CompressionAlgorithm::Gzip => {
219                let archive_decoder = GzDecoder::new(archive_file_tar);
220                Archive::new(Box::new(archive_decoder))
221            }
222            CompressionAlgorithm::Zstandard => {
223                let archive_decoder = Decoder::new(archive_file_tar)?;
224                Archive::new(Box::new(archive_decoder))
225            }
226        };
227
228        let unpack_temp_dir = self
229            .verification_temp_dir
230            // Add the archive name to the directory to allow two verifications at the same time
231            .join(archive.filepath.file_name().ok_or(anyhow!(
232                "Verify archive error: Could not append archive name to temp directory: archive `{}`",
233                archive.filepath.display(),
234            ))?);
235
236        fs::create_dir_all(&unpack_temp_dir).with_context(|| {
237            format!(
238                "Verify archive error: Could not create directory `{}`",
239                unpack_temp_dir.display(),
240            )
241        })?;
242
243        let unpack_temp_file = &unpack_temp_dir.join("unpack.tmp");
244
245        let verify_result = {
246            let mut result = Ok(());
247            for e in tar_archive.entries()? {
248                match e {
249                    Err(e) => {
250                        result = Err(anyhow!(e).context("Verify archive error: invalid entry"));
251                        break;
252                    }
253                    Ok(entry) => Self::unpack_and_delete_file_from_entry(entry, unpack_temp_file)?,
254                };
255            }
256            result
257        };
258
259        // Always remove the temp directory
260        fs::remove_dir_all(&unpack_temp_dir).with_context(|| {
261            format!(
262                "Verify archive error: Could not remove directory `{}`",
263                unpack_temp_dir.display()
264            )
265        })?;
266
267        verify_result
268    }
269
270    // Helper to unpack and delete a file from en entry, for archive verification purpose
271    fn unpack_and_delete_file_from_entry<R: Read>(
272        entry: Entry<R>,
273        unpack_file_path: &Path,
274    ) -> StdResult<()> {
275        if entry.header().entry_type() != EntryType::Directory {
276            let mut file = entry;
277            let _ = file
278                .unpack(unpack_file_path)
279                .with_context(|| "can't unpack entry")?;
280
281            fs::remove_file(unpack_file_path).with_context(|| {
282                format!(
283                    "can't remove temporary unpacked file, file path: `{}`",
284                    unpack_file_path.display()
285                )
286            })?;
287        }
288
289        Ok(())
290    }
291}
292
293#[cfg(test)]
294mod tests {
295    use std::fs::File;
296
297    use mithril_common::test_utils::assert_equivalent;
298
299    use crate::tools::file_archiver::appender::{AppenderDirAll, AppenderFile};
300    use crate::tools::file_archiver::test_tools::*;
301
302    use super::*;
303
304    fn list_remaining_files(test_dir: &Path) -> Vec<String> {
305        fs::read_dir(test_dir)
306            .unwrap()
307            .map(|f| f.unwrap().file_name().to_str().unwrap().to_owned())
308            .collect()
309    }
310
311    #[test]
312    fn should_create_a_valid_archive_with_gzip_compression() {
313        let test_dir = get_test_directory("should_create_a_valid_archive_with_gzip_compression");
314        let target_archive = test_dir.join("archive.tar.gz");
315        let archived_directory = test_dir.join(create_dir(&test_dir, "archived_directory"));
316        create_file(&archived_directory, "file_to_archive.txt");
317
318        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
319
320        let archive = file_archiver
321            .create_archive(
322                &target_archive,
323                AppenderDirAll::new(archived_directory),
324                CompressionAlgorithm::Gzip,
325            )
326            .expect("create_archive should not fail");
327        file_archiver
328            .verify_archive(&archive)
329            .expect("verify_archive should not fail");
330    }
331
332    #[test]
333    fn should_create_a_valid_archive_with_zstandard_compression() {
334        let test_dir =
335            get_test_directory("should_create_a_valid_archive_with_zstandard_compression");
336        let target_archive = test_dir.join("archive.tar.zst");
337        let archived_directory = test_dir.join(create_dir(&test_dir, "archived_directory"));
338        create_file(&archived_directory, "file_to_archive.txt");
339
340        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
341
342        let archive = file_archiver
343            .create_archive(
344                &target_archive,
345                AppenderDirAll::new(archived_directory),
346                CompressionAlgorithm::Zstandard,
347            )
348            .expect("create_archive should not fail");
349        file_archiver
350            .verify_archive(&archive)
351            .expect("verify_archive should not fail");
352    }
353
354    #[test]
355    fn should_delete_tmp_file_in_target_directory_if_archiving_fail() {
356        let test_dir =
357            get_test_directory("should_delete_tmp_file_in_target_directory_if_archiving_fail");
358        // Note: the archived directory does not exist in order to make the archive process fail
359        let archived_directory = test_dir.join("db");
360
361        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
362
363        // this file should not be deleted by the archive creation
364        File::create(test_dir.join("other-process.file")).unwrap();
365
366        let archive_params = ArchiveParameters {
367            archive_name_without_extension: "archive".to_string(),
368            target_directory: test_dir.clone(),
369            compression_algorithm: CompressionAlgorithm::Gzip,
370        };
371        let _ = file_archiver
372            .archive(archive_params, AppenderDirAll::new(archived_directory))
373            .expect_err("FileArchiver::archive should fail if the target path doesn't exist.");
374
375        let remaining_files: Vec<String> = list_remaining_files(&test_dir);
376        assert_eq!(vec!["other-process.file".to_string()], remaining_files);
377    }
378
379    #[test]
380    fn should_not_delete_an_already_existing_archive_with_same_name_if_archiving_fail() {
381        let test_dir = get_test_directory(
382            "should_not_delete_an_already_existing_archive_with_same_name_if_archiving_fail",
383        );
384        // Note: the archived directory does not exist in order to make the archive process fail
385        let archived_directory = test_dir.join("db");
386
387        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
388
389        // this file should not be deleted by the archive creation
390        create_file(&test_dir, "other-process.file");
391        create_file(&test_dir, "archive.tar.gz");
392        // an already existing temporary archive file should be deleted
393        create_file(&test_dir, "archive.tar.tmp");
394
395        let archive_params = ArchiveParameters {
396            archive_name_without_extension: "archive".to_string(),
397            target_directory: test_dir.clone(),
398            compression_algorithm: CompressionAlgorithm::Gzip,
399        };
400        let _ = file_archiver
401            .archive(archive_params, AppenderDirAll::new(archived_directory))
402            .expect_err("FileArchiver::archive should fail if the db is empty.");
403        let remaining_files: Vec<String> = list_remaining_files(&test_dir);
404
405        assert_equivalent(
406            vec![
407                "other-process.file".to_string(),
408                "archive.tar.gz".to_string(),
409            ],
410            remaining_files,
411        );
412    }
413
414    #[test]
415    fn overwrite_already_existing_archive_when_archiving_succeed() {
416        let test_dir =
417            get_test_directory("overwrite_already_existing_archive_when_archiving_succeed");
418        let archived_directory = test_dir.join(create_dir(&test_dir, "archived_directory"));
419
420        create_file(&archived_directory, "file_to_archive.txt");
421
422        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
423
424        let archive_params = ArchiveParameters {
425            archive_name_without_extension: "archive".to_string(),
426            target_directory: test_dir.clone(),
427            compression_algorithm: CompressionAlgorithm::Gzip,
428        };
429        let first_archive = file_archiver
430            .archive(
431                archive_params.clone(),
432                AppenderDirAll::new(archived_directory.clone()),
433            )
434            .unwrap();
435        let first_archive_size = first_archive.get_archive_size();
436
437        create_file(&archived_directory, "another_file_to_archive.txt");
438
439        let second_archive = file_archiver
440            .archive(archive_params, AppenderDirAll::new(archived_directory))
441            .unwrap();
442        let second_archive_size = second_archive.get_archive_size();
443
444        assert_ne!(first_archive_size, second_archive_size);
445
446        let unpack_path = second_archive.unpack_gzip(&test_dir);
447        assert!(unpack_path.join("another_file_to_archive.txt").exists());
448    }
449
450    #[test]
451    fn compute_size_of_uncompressed_data_and_archive() {
452        let test_dir = get_test_directory("compute_size_of_uncompressed_data_and_archive");
453
454        let file_path = test_dir.join("file.txt");
455        let file = File::create(&file_path).unwrap();
456        file.set_len(777).unwrap();
457
458        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
459
460        let archive_params = ArchiveParameters {
461            archive_name_without_extension: "archive".to_string(),
462            target_directory: test_dir.clone(),
463            compression_algorithm: CompressionAlgorithm::Gzip,
464        };
465        let archive = file_archiver
466            .archive(
467                archive_params.clone(),
468                AppenderFile::append_at_archive_root(file_path.clone()).unwrap(),
469            )
470            .unwrap();
471
472        let expected_archive_size = file_size::compute_size_of_path(&archive.filepath).unwrap();
473        assert_eq!(expected_archive_size, archive.get_archive_size(),);
474        assert_eq!(777, archive.get_uncompressed_size());
475    }
476}