mithril_aggregator/tools/file_archiver/
api.rs

1use anyhow::{Context, anyhow};
2use flate2::{Compression, read::GzDecoder, write::GzEncoder};
3use slog::{Logger, info, warn};
4use std::{
5    fs,
6    fs::File,
7    io::{Read, Seek, SeekFrom},
8    path::{Path, PathBuf},
9};
10use tar::{Archive, Entry, EntryType};
11use zstd::{Decoder, Encoder};
12
13use mithril_common::StdResult;
14use mithril_common::entities::CompressionAlgorithm;
15use mithril_common::logging::LoggerExtensions;
16
17use crate::ZstandardCompressionParameters;
18use crate::tools::file_size;
19
20use super::appender::TarAppender;
21use super::{ArchiveParameters, FileArchive};
22
/// Tool to archive files and directories.
pub struct FileArchiver {
    // Compression settings (level and number of workers) applied when producing Zstandard archives
    zstandard_compression_parameter: ZstandardCompressionParameters,
    // Temporary directory used to store the unpacked archive for verification
    verification_temp_dir: PathBuf,
    // Logger used to report archiving and verification steps, and cleanup failures
    logger: Logger,
}
30
31impl FileArchiver {
32    /// Constructs a new `FileArchiver`.
33    pub fn new(
34        zstandard_compression_parameter: ZstandardCompressionParameters,
35        verification_temp_dir: PathBuf,
36        logger: Logger,
37    ) -> Self {
38        Self {
39            zstandard_compression_parameter,
40            verification_temp_dir,
41            logger: logger.new_with_component_name::<Self>(),
42        }
43    }
44
45    #[cfg(test)]
46    pub fn new_for_test(verification_temp_dir: PathBuf) -> Self {
47        use crate::test::TestLogger;
48        Self {
49            zstandard_compression_parameter: ZstandardCompressionParameters::default(),
50            verification_temp_dir,
51            logger: TestLogger::stdout(),
52        }
53    }
54
55    /// Archive the content of a directory.
56    pub fn archive<T: TarAppender>(
57        &self,
58        parameters: ArchiveParameters,
59        appender: T,
60    ) -> StdResult<FileArchive> {
61        fs::create_dir_all(&parameters.target_directory).with_context(|| {
62            format!(
63                "FileArchiver can not create archive directory: '{}'",
64                parameters.target_directory.display()
65            )
66        })?;
67
68        let target_path = parameters.target_path();
69        let temporary_archive_path = parameters.temporary_archive_path();
70
71        let temporary_file_archive = self
72            .create_and_verify_archive(
73                &temporary_archive_path,
74                appender,
75                parameters.compression_algorithm,
76            )
77            .inspect_err(|_err| {
78                if temporary_archive_path.exists()
79                    && let Err(remove_error) = fs::remove_file(&temporary_archive_path)
80                {
81                    warn!(
82                        self.logger,
83                        " > Post FileArchiver.archive failure, could not remove temporary archive";
84                        "archive_path" => temporary_archive_path.display(),
85                        "error" => remove_error
86                    );
87                }
88            })
89            .with_context(|| {
90                format!(
91                    "FileArchiver can not create and verify archive: '{}'",
92                    target_path.display()
93                )
94            })?;
95
96        fs::rename(&temporary_archive_path, &target_path).with_context(|| {
97            format!(
98                "FileArchiver can not rename temporary archive: '{}' to final archive: '{}'",
99                temporary_archive_path.display(),
100                target_path.display()
101            )
102        })?;
103
104        Ok(FileArchive {
105            filepath: target_path,
106            ..temporary_file_archive
107        })
108    }
109
110    fn create_and_verify_archive<T: TarAppender>(
111        &self,
112        archive_path: &Path,
113        appender: T,
114        compression_algorithm: CompressionAlgorithm,
115    ) -> StdResult<FileArchive> {
116        let file_archive = self
117            .create_archive(archive_path, appender, compression_algorithm)
118            .with_context(|| {
119                format!(
120                    "FileArchiver can not create archive with path: '{}''",
121                    archive_path.display()
122                )
123            })?;
124        self.verify_archive(&file_archive).with_context(|| {
125            format!(
126                "FileArchiver can not verify archive with path: '{}''",
127                archive_path.display()
128            )
129        })?;
130
131        Ok(file_archive)
132    }
133
134    fn create_archive<T: TarAppender>(
135        &self,
136        archive_path: &Path,
137        appender: T,
138        compression_algorithm: CompressionAlgorithm,
139    ) -> StdResult<FileArchive> {
140        info!(
141            self.logger,
142            "Archiving content to archive: '{}'",
143            archive_path.display()
144        );
145
146        let tar_file = File::create(archive_path).with_context(|| {
147            format!("Error while creating the archive with path: {archive_path:?}")
148        })?;
149
150        match compression_algorithm {
151            CompressionAlgorithm::Gzip => {
152                let enc = GzEncoder::new(tar_file, Compression::default());
153                let mut tar = tar::Builder::new(enc);
154
155                appender
156                    .append(&mut tar)
157                    .with_context(|| "GzEncoder Builder failed to append content")?;
158
159                let mut gz = tar
160                    .into_inner()
161                    .with_context(|| "GzEncoder Builder can not write the archive")?;
162                gz.try_finish()
163                    .with_context(|| "GzEncoder can not finish the output stream after writing")?;
164            }
165            CompressionAlgorithm::Zstandard => {
166                let mut enc = Encoder::new(tar_file, self.zstandard_compression_parameter.level)?;
167                enc.multithread(self.zstandard_compression_parameter.number_of_workers)
168                    .with_context(|| "ZstandardEncoder can not set the number of workers")?;
169                let mut tar = tar::Builder::new(enc);
170
171                appender
172                    .append(&mut tar)
173                    .with_context(|| "ZstandardEncoder Builder failed to append content")?;
174
175                let zstd = tar
176                    .into_inner()
177                    .with_context(|| "ZstandardEncoder Builder can not write the archive")?;
178                zstd.finish().with_context(
179                    || "ZstandardEncoder can not finish the output stream after writing",
180                )?;
181            }
182        }
183
184        let uncompressed_size = appender.compute_uncompressed_data_size().with_context(|| {
185            format!(
186                "FileArchiver can not get the size of the uncompressed data to archive: '{}'",
187                archive_path.display()
188            )
189        })?;
190        let archive_filesize =
191            file_size::compute_size_of_path(archive_path).with_context(|| {
192                format!(
193                    "FileArchiver can not get file size of archive with path: '{}'",
194                    archive_path.display()
195                )
196            })?;
197
198        Ok(FileArchive {
199            filepath: archive_path.to_path_buf(),
200            archive_filesize,
201            uncompressed_size,
202            compression_algorithm,
203        })
204    }
205
206    /// Verify the integrity of the archive.
207    fn verify_archive(&self, archive: &FileArchive) -> StdResult<()> {
208        info!(
209            self.logger,
210            "Verifying archive: {}",
211            archive.filepath.display()
212        );
213
214        let mut archive_file_tar = File::open(&archive.filepath).with_context(|| {
215            format!(
216                "Verify archive error: can not open archive: '{}'",
217                archive.filepath.display()
218            )
219        })?;
220        archive_file_tar.seek(SeekFrom::Start(0))?;
221
222        let mut tar_archive: Archive<Box<dyn Read>> = match archive.compression_algorithm {
223            CompressionAlgorithm::Gzip => {
224                let archive_decoder = GzDecoder::new(archive_file_tar);
225                Archive::new(Box::new(archive_decoder))
226            }
227            CompressionAlgorithm::Zstandard => {
228                let archive_decoder = Decoder::new(archive_file_tar)?;
229                Archive::new(Box::new(archive_decoder))
230            }
231        };
232
233        let unpack_temp_dir = self
234            .verification_temp_dir
235            // Add the archive name to the directory to allow two verifications at the same time
236            .join(archive.filepath.file_name().ok_or(anyhow!(
237                "Verify archive error: Could not append archive name to temp directory: archive `{}`",
238                archive.filepath.display(),
239            ))?);
240
241        fs::create_dir_all(&unpack_temp_dir).with_context(|| {
242            format!(
243                "Verify archive error: Could not create directory `{}`",
244                unpack_temp_dir.display(),
245            )
246        })?;
247
248        let unpack_temp_file = &unpack_temp_dir.join("unpack.tmp");
249
250        let verify_result = {
251            let mut result = Ok(());
252            for e in tar_archive.entries()? {
253                match e {
254                    Err(e) => {
255                        result = Err(anyhow!(e).context("Verify archive error: invalid entry"));
256                        break;
257                    }
258                    Ok(entry) => Self::unpack_and_delete_file_from_entry(entry, unpack_temp_file)?,
259                };
260            }
261            result
262        };
263
264        // Always remove the temp directory
265        fs::remove_dir_all(&unpack_temp_dir).with_context(|| {
266            format!(
267                "Verify archive error: Could not remove directory `{}`",
268                unpack_temp_dir.display()
269            )
270        })?;
271
272        verify_result
273    }
274
275    // Helper to unpack and delete a file from en entry, for archive verification purpose
276    fn unpack_and_delete_file_from_entry<R: Read>(
277        entry: Entry<R>,
278        unpack_file_path: &Path,
279    ) -> StdResult<()> {
280        if entry.header().entry_type() != EntryType::Directory {
281            let mut file = entry;
282            let _ = file.unpack(unpack_file_path).with_context(|| "can't unpack entry")?;
283
284            fs::remove_file(unpack_file_path).with_context(|| {
285                format!(
286                    "can't remove temporary unpacked file, file path: `{}`",
287                    unpack_file_path.display()
288                )
289            })?;
290        }
291
292        Ok(())
293    }
294}
295
296#[cfg(test)]
297mod tests {
298    use std::fs::File;
299
300    use mithril_common::test::assert_equivalent;
301
302    use crate::tools::file_archiver::appender::{AppenderDirAll, AppenderFile};
303    use crate::tools::file_archiver::test_tools::*;
304
305    use super::*;
306
307    fn list_remaining_files(test_dir: &Path) -> Vec<String> {
308        fs::read_dir(test_dir)
309            .unwrap()
310            .map(|f| f.unwrap().file_name().to_str().unwrap().to_owned())
311            .collect()
312    }
313
314    #[test]
315    fn should_create_a_valid_archive_with_gzip_compression() {
316        let test_dir = get_test_directory("should_create_a_valid_archive_with_gzip_compression");
317        let target_archive = test_dir.join("archive.tar.gz");
318        let archived_directory = test_dir.join(create_dir(&test_dir, "archived_directory"));
319        create_file(&archived_directory, "file_to_archive.txt");
320
321        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
322
323        let archive = file_archiver
324            .create_archive(
325                &target_archive,
326                AppenderDirAll::new(archived_directory),
327                CompressionAlgorithm::Gzip,
328            )
329            .expect("create_archive should not fail");
330        file_archiver
331            .verify_archive(&archive)
332            .expect("verify_archive should not fail");
333    }
334
335    #[test]
336    fn should_create_a_valid_archive_with_zstandard_compression() {
337        let test_dir =
338            get_test_directory("should_create_a_valid_archive_with_zstandard_compression");
339        let target_archive = test_dir.join("archive.tar.zst");
340        let archived_directory = test_dir.join(create_dir(&test_dir, "archived_directory"));
341        create_file(&archived_directory, "file_to_archive.txt");
342
343        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
344
345        let archive = file_archiver
346            .create_archive(
347                &target_archive,
348                AppenderDirAll::new(archived_directory),
349                CompressionAlgorithm::Zstandard,
350            )
351            .expect("create_archive should not fail");
352        file_archiver
353            .verify_archive(&archive)
354            .expect("verify_archive should not fail");
355    }
356
357    #[test]
358    fn should_delete_tmp_file_in_target_directory_if_archiving_fail() {
359        let test_dir =
360            get_test_directory("should_delete_tmp_file_in_target_directory_if_archiving_fail");
361        // Note: the archived directory does not exist in order to make the archive process fail
362        let archived_directory = test_dir.join("db");
363
364        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
365
366        // this file should not be deleted by the archive creation
367        File::create(test_dir.join("other-process.file")).unwrap();
368
369        let archive_params = ArchiveParameters {
370            archive_name_without_extension: "archive".to_string(),
371            target_directory: test_dir.clone(),
372            compression_algorithm: CompressionAlgorithm::Gzip,
373        };
374        let _ = file_archiver
375            .archive(archive_params, AppenderDirAll::new(archived_directory))
376            .expect_err("FileArchiver::archive should fail if the target path doesn't exist.");
377
378        let remaining_files: Vec<String> = list_remaining_files(&test_dir);
379        assert_eq!(vec!["other-process.file".to_string()], remaining_files);
380    }
381
382    #[test]
383    fn should_not_delete_an_already_existing_archive_with_same_name_if_archiving_fail() {
384        let test_dir = get_test_directory(
385            "should_not_delete_an_already_existing_archive_with_same_name_if_archiving_fail",
386        );
387        // Note: the archived directory does not exist in order to make the archive process fail
388        let archived_directory = test_dir.join("db");
389
390        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
391
392        // this file should not be deleted by the archive creation
393        create_file(&test_dir, "other-process.file");
394        create_file(&test_dir, "archive.tar.gz");
395        // an already existing temporary archive file should be deleted
396        create_file(&test_dir, "archive.tar.tmp");
397
398        let archive_params = ArchiveParameters {
399            archive_name_without_extension: "archive".to_string(),
400            target_directory: test_dir.clone(),
401            compression_algorithm: CompressionAlgorithm::Gzip,
402        };
403        let _ = file_archiver
404            .archive(archive_params, AppenderDirAll::new(archived_directory))
405            .expect_err("FileArchiver::archive should fail if the db is empty.");
406        let remaining_files: Vec<String> = list_remaining_files(&test_dir);
407
408        assert_equivalent!(
409            vec!["other-process.file".to_string(), "archive.tar.gz".to_string()],
410            remaining_files,
411        );
412    }
413
414    #[test]
415    fn overwrite_already_existing_archive_when_archiving_succeed() {
416        let test_dir =
417            get_test_directory("overwrite_already_existing_archive_when_archiving_succeed");
418        let archived_directory = test_dir.join(create_dir(&test_dir, "archived_directory"));
419
420        create_file(&archived_directory, "file_to_archive.txt");
421
422        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
423
424        let archive_params = ArchiveParameters {
425            archive_name_without_extension: "archive".to_string(),
426            target_directory: test_dir.clone(),
427            compression_algorithm: CompressionAlgorithm::Gzip,
428        };
429        let first_archive = file_archiver
430            .archive(
431                archive_params.clone(),
432                AppenderDirAll::new(archived_directory.clone()),
433            )
434            .unwrap();
435        let first_archive_size = first_archive.get_archive_size();
436
437        create_file(&archived_directory, "another_file_to_archive.txt");
438
439        let second_archive = file_archiver
440            .archive(archive_params, AppenderDirAll::new(archived_directory))
441            .unwrap();
442        let second_archive_size = second_archive.get_archive_size();
443
444        assert_ne!(first_archive_size, second_archive_size);
445
446        let unpack_path = second_archive.unpack_gzip(&test_dir);
447        assert!(unpack_path.join("another_file_to_archive.txt").exists());
448    }
449
450    #[test]
451    fn compute_size_of_uncompressed_data_and_archive() {
452        let test_dir = get_test_directory("compute_size_of_uncompressed_data_and_archive");
453
454        let file_path = test_dir.join("file.txt");
455        let file = File::create(&file_path).unwrap();
456        file.set_len(777).unwrap();
457
458        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));
459
460        let archive_params = ArchiveParameters {
461            archive_name_without_extension: "archive".to_string(),
462            target_directory: test_dir.clone(),
463            compression_algorithm: CompressionAlgorithm::Gzip,
464        };
465        let archive = file_archiver
466            .archive(
467                archive_params.clone(),
468                AppenderFile::append_at_archive_root(file_path.clone()).unwrap(),
469            )
470            .unwrap();
471
472        let expected_archive_size = file_size::compute_size_of_path(&archive.filepath).unwrap();
473        assert_eq!(expected_archive_size, archive.get_archive_size(),);
474        assert_eq!(777, archive.get_uncompressed_size());
475    }
476}