mithril_aggregator/tools/file_archiver/
api.rs1use anyhow::{anyhow, Context};
2use flate2::{read::GzDecoder, write::GzEncoder, Compression};
3use slog::{info, warn, Logger};
4use std::{
5 fs,
6 fs::File,
7 io::{Read, Seek, SeekFrom},
8 path::{Path, PathBuf},
9};
10use tar::{Archive, Entry, EntryType};
11use zstd::{Decoder, Encoder};
12
13use mithril_common::entities::CompressionAlgorithm;
14use mithril_common::logging::LoggerExtensions;
15use mithril_common::StdResult;
16
17use crate::tools::file_size;
18use crate::ZstandardCompressionParameters;
19
20use super::appender::TarAppender;
21use super::{ArchiveParameters, FileArchive};
22
23pub struct FileArchiver {
25 zstandard_compression_parameter: ZstandardCompressionParameters,
26 verification_temp_dir: PathBuf,
28 logger: Logger,
29}
30
31impl FileArchiver {
32 pub fn new(
34 zstandard_compression_parameter: ZstandardCompressionParameters,
35 verification_temp_dir: PathBuf,
36 logger: Logger,
37 ) -> Self {
38 Self {
39 zstandard_compression_parameter,
40 verification_temp_dir,
41 logger: logger.new_with_component_name::<Self>(),
42 }
43 }
44
45 #[cfg(test)]
46 pub fn new_for_test(verification_temp_dir: PathBuf) -> Self {
47 use crate::test_tools::TestLogger;
48 Self {
49 zstandard_compression_parameter: ZstandardCompressionParameters::default(),
50 verification_temp_dir,
51 logger: TestLogger::stdout(),
52 }
53 }
54
55 pub fn archive<T: TarAppender>(
57 &self,
58 parameters: ArchiveParameters,
59 appender: T,
60 ) -> StdResult<FileArchive> {
61 fs::create_dir_all(¶meters.target_directory).with_context(|| {
62 format!(
63 "FileArchiver can not create archive directory: '{}'",
64 parameters.target_directory.display()
65 )
66 })?;
67
68 let target_path = parameters.target_path();
69 let temporary_archive_path = parameters.temporary_archive_path();
70
71 let temporary_file_archive = self
72 .create_and_verify_archive(&temporary_archive_path, appender, parameters.compression_algorithm)
73 .inspect_err(|_err| {
74 if temporary_archive_path.exists() {
75 if let Err(remove_error) = fs::remove_file(&temporary_archive_path) {
76 warn!(
77 self.logger, " > Post FileArchiver.archive failure, could not remove temporary archive";
78 "archive_path" => temporary_archive_path.display(),
79 "error" => remove_error
80 );
81 }
82 }
83 })
84 .with_context(|| {
85 format!(
86 "FileArchiver can not create and verify archive: '{}'",
87 target_path.display()
88 )
89 })?;
90
91 fs::rename(&temporary_archive_path, &target_path).with_context(|| {
92 format!(
93 "FileArchiver can not rename temporary archive: '{}' to final archive: '{}'",
94 temporary_archive_path.display(),
95 target_path.display()
96 )
97 })?;
98
99 Ok(FileArchive {
100 filepath: target_path,
101 ..temporary_file_archive
102 })
103 }
104
105 fn create_and_verify_archive<T: TarAppender>(
106 &self,
107 archive_path: &Path,
108 appender: T,
109 compression_algorithm: CompressionAlgorithm,
110 ) -> StdResult<FileArchive> {
111 let file_archive = self
112 .create_archive(archive_path, appender, compression_algorithm)
113 .with_context(|| {
114 format!(
115 "FileArchiver can not create archive with path: '{}''",
116 archive_path.display()
117 )
118 })?;
119 self.verify_archive(&file_archive).with_context(|| {
120 format!(
121 "FileArchiver can not verify archive with path: '{}''",
122 archive_path.display()
123 )
124 })?;
125
126 Ok(file_archive)
127 }
128
129 fn create_archive<T: TarAppender>(
130 &self,
131 archive_path: &Path,
132 appender: T,
133 compression_algorithm: CompressionAlgorithm,
134 ) -> StdResult<FileArchive> {
135 info!(
136 self.logger,
137 "Archiving content to archive: '{}'",
138 archive_path.display()
139 );
140
141 let tar_file = File::create(archive_path).with_context(|| {
142 format!("Error while creating the archive with path: {archive_path:?}")
143 })?;
144
145 match compression_algorithm {
146 CompressionAlgorithm::Gzip => {
147 let enc = GzEncoder::new(tar_file, Compression::default());
148 let mut tar = tar::Builder::new(enc);
149
150 appender
151 .append(&mut tar)
152 .with_context(|| "GzEncoder Builder failed to append content")?;
153
154 let mut gz = tar
155 .into_inner()
156 .with_context(|| "GzEncoder Builder can not write the archive")?;
157 gz.try_finish()
158 .with_context(|| "GzEncoder can not finish the output stream after writing")?;
159 }
160 CompressionAlgorithm::Zstandard => {
161 let mut enc = Encoder::new(tar_file, self.zstandard_compression_parameter.level)?;
162 enc.multithread(self.zstandard_compression_parameter.number_of_workers)
163 .with_context(|| "ZstandardEncoder can not set the number of workers")?;
164 let mut tar = tar::Builder::new(enc);
165
166 appender
167 .append(&mut tar)
168 .with_context(|| "ZstandardEncoder Builder failed to append content")?;
169
170 let zstd = tar
171 .into_inner()
172 .with_context(|| "ZstandardEncoder Builder can not write the archive")?;
173 zstd.finish().with_context(|| {
174 "ZstandardEncoder can not finish the output stream after writing"
175 })?;
176 }
177 }
178
179 let uncompressed_size = appender.compute_uncompressed_data_size().with_context(|| {
180 format!(
181 "FileArchiver can not get the size of the uncompressed data to archive: '{}'",
182 archive_path.display()
183 )
184 })?;
185 let archive_filesize =
186 file_size::compute_size_of_path(archive_path).with_context(|| {
187 format!(
188 "FileArchiver can not get file size of archive with path: '{}'",
189 archive_path.display()
190 )
191 })?;
192
193 Ok(FileArchive {
194 filepath: archive_path.to_path_buf(),
195 archive_filesize,
196 uncompressed_size,
197 compression_algorithm,
198 })
199 }
200
201 fn verify_archive(&self, archive: &FileArchive) -> StdResult<()> {
203 info!(
204 self.logger,
205 "Verifying archive: {}",
206 archive.filepath.display()
207 );
208
209 let mut archive_file_tar = File::open(&archive.filepath).with_context(|| {
210 format!(
211 "Verify archive error: can not open archive: '{}'",
212 archive.filepath.display()
213 )
214 })?;
215 archive_file_tar.seek(SeekFrom::Start(0))?;
216
217 let mut tar_archive: Archive<Box<dyn Read>> = match archive.compression_algorithm {
218 CompressionAlgorithm::Gzip => {
219 let archive_decoder = GzDecoder::new(archive_file_tar);
220 Archive::new(Box::new(archive_decoder))
221 }
222 CompressionAlgorithm::Zstandard => {
223 let archive_decoder = Decoder::new(archive_file_tar)?;
224 Archive::new(Box::new(archive_decoder))
225 }
226 };
227
228 let unpack_temp_dir = self
229 .verification_temp_dir
230 .join(archive.filepath.file_name().ok_or(anyhow!(
232 "Verify archive error: Could not append archive name to temp directory: archive `{}`",
233 archive.filepath.display(),
234 ))?);
235
236 fs::create_dir_all(&unpack_temp_dir).with_context(|| {
237 format!(
238 "Verify archive error: Could not create directory `{}`",
239 unpack_temp_dir.display(),
240 )
241 })?;
242
243 let unpack_temp_file = &unpack_temp_dir.join("unpack.tmp");
244
245 let verify_result = {
246 let mut result = Ok(());
247 for e in tar_archive.entries()? {
248 match e {
249 Err(e) => {
250 result = Err(anyhow!(e).context("Verify archive error: invalid entry"));
251 break;
252 }
253 Ok(entry) => Self::unpack_and_delete_file_from_entry(entry, unpack_temp_file)?,
254 };
255 }
256 result
257 };
258
259 fs::remove_dir_all(&unpack_temp_dir).with_context(|| {
261 format!(
262 "Verify archive error: Could not remove directory `{}`",
263 unpack_temp_dir.display()
264 )
265 })?;
266
267 verify_result
268 }
269
270 fn unpack_and_delete_file_from_entry<R: Read>(
272 entry: Entry<R>,
273 unpack_file_path: &Path,
274 ) -> StdResult<()> {
275 if entry.header().entry_type() != EntryType::Directory {
276 let mut file = entry;
277 let _ = file
278 .unpack(unpack_file_path)
279 .with_context(|| "can't unpack entry")?;
280
281 fs::remove_file(unpack_file_path).with_context(|| {
282 format!(
283 "can't remove temporary unpacked file, file path: `{}`",
284 unpack_file_path.display()
285 )
286 })?;
287 }
288
289 Ok(())
290 }
291}
292
#[cfg(test)]
mod tests {
    use std::fs::File;

    use mithril_common::test_utils::assert_equivalent;

    use crate::tools::file_archiver::appender::{AppenderDirAll, AppenderFile};
    use crate::tools::file_archiver::test_tools::*;

    use super::*;

    /// Returns the names of the entries located directly inside `test_dir`.
    fn list_remaining_files(test_dir: &Path) -> Vec<String> {
        fs::read_dir(test_dir)
            .unwrap()
            .map(|f| f.unwrap().file_name().to_str().unwrap().to_owned())
            .collect()
    }

    #[test]
    fn should_create_a_valid_archive_with_gzip_compression() {
        let test_dir = get_test_directory("should_create_a_valid_archive_with_gzip_compression");
        let target_archive = test_dir.join("archive.tar.gz");
        let archived_directory = test_dir.join(create_dir(&test_dir, "archived_directory"));
        create_file(&archived_directory, "file_to_archive.txt");

        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));

        let archive = file_archiver
            .create_archive(
                &target_archive,
                AppenderDirAll::new(archived_directory),
                CompressionAlgorithm::Gzip,
            )
            .expect("create_archive should not fail");
        file_archiver
            .verify_archive(&archive)
            .expect("verify_archive should not fail");
    }

    #[test]
    fn should_create_a_valid_archive_with_zstandard_compression() {
        let test_dir =
            get_test_directory("should_create_a_valid_archive_with_zstandard_compression");
        let target_archive = test_dir.join("archive.tar.zst");
        let archived_directory = test_dir.join(create_dir(&test_dir, "archived_directory"));
        create_file(&archived_directory, "file_to_archive.txt");

        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));

        let archive = file_archiver
            .create_archive(
                &target_archive,
                AppenderDirAll::new(archived_directory),
                CompressionAlgorithm::Zstandard,
            )
            .expect("create_archive should not fail");
        file_archiver
            .verify_archive(&archive)
            .expect("verify_archive should not fail");
    }

    #[test]
    fn verify_archive_should_remove_unpack_directory_when_an_entry_can_not_be_unpacked() {
        let test_dir = get_test_directory(
            "verify_archive_should_remove_unpack_directory_when_an_entry_can_not_be_unpacked",
        );
        let archive_path = test_dir.join("archive.tar.gz");

        // A hard link whose target does not exist makes `Entry::unpack` fail.
        {
            let encoder =
                GzEncoder::new(File::create(&archive_path).unwrap(), Compression::default());
            let mut builder = tar::Builder::new(encoder);
            let mut header = tar::Header::new_gnu();
            header.set_entry_type(EntryType::Link);
            header.set_size(0);
            header
                .set_link_name("this/link/target/does/not/exist")
                .unwrap();
            builder
                .append_data(&mut header, "broken_hard_link", std::io::empty())
                .unwrap();
            builder.into_inner().unwrap().finish().unwrap();
        }

        let verification_dir = test_dir.join("verification");
        let file_archiver = FileArchiver::new_for_test(verification_dir.clone());
        let archive = FileArchive {
            filepath: archive_path,
            archive_filesize: 0,
            uncompressed_size: 0,
            compression_algorithm: CompressionAlgorithm::Gzip,
        };

        file_archiver
            .verify_archive(&archive)
            .expect_err("verify_archive should fail when an entry can not be unpacked");

        assert!(!verification_dir.join("archive.tar.gz").exists());
    }

    #[test]
    fn should_delete_tmp_file_in_target_directory_if_archiving_fail() {
        let test_dir =
            get_test_directory("should_delete_tmp_file_in_target_directory_if_archiving_fail");
        // Never created: archiving it must fail.
        let archived_directory = test_dir.join("db");

        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));

        File::create(test_dir.join("other-process.file")).unwrap();

        let archive_params = ArchiveParameters {
            archive_name_without_extension: "archive".to_string(),
            target_directory: test_dir.clone(),
            compression_algorithm: CompressionAlgorithm::Gzip,
        };
        let _ = file_archiver
            .archive(archive_params, AppenderDirAll::new(archived_directory))
            .expect_err(
                "FileArchiver::archive should fail if the archived directory doesn't exist.",
            );

        let remaining_files: Vec<String> = list_remaining_files(&test_dir);
        assert_eq!(vec!["other-process.file".to_string()], remaining_files);
    }

    #[test]
    fn should_not_delete_an_already_existing_archive_with_same_name_if_archiving_fail() {
        let test_dir = get_test_directory(
            "should_not_delete_an_already_existing_archive_with_same_name_if_archiving_fail",
        );
        // Never created: archiving it must fail.
        let archived_directory = test_dir.join("db");

        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));

        create_file(&test_dir, "other-process.file");
        create_file(&test_dir, "archive.tar.gz");
        create_file(&test_dir, "archive.tar.tmp");

        let archive_params = ArchiveParameters {
            archive_name_without_extension: "archive".to_string(),
            target_directory: test_dir.clone(),
            compression_algorithm: CompressionAlgorithm::Gzip,
        };
        let _ = file_archiver
            .archive(archive_params, AppenderDirAll::new(archived_directory))
            .expect_err(
                "FileArchiver::archive should fail if the archived directory doesn't exist.",
            );
        let remaining_files: Vec<String> = list_remaining_files(&test_dir);

        assert_equivalent(
            vec![
                "other-process.file".to_string(),
                "archive.tar.gz".to_string(),
            ],
            remaining_files,
        );
    }

    #[test]
    fn overwrite_already_existing_archive_when_archiving_succeed() {
        let test_dir =
            get_test_directory("overwrite_already_existing_archive_when_archiving_succeed");
        let archived_directory = test_dir.join(create_dir(&test_dir, "archived_directory"));

        create_file(&archived_directory, "file_to_archive.txt");

        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));

        let archive_params = ArchiveParameters {
            archive_name_without_extension: "archive".to_string(),
            target_directory: test_dir.clone(),
            compression_algorithm: CompressionAlgorithm::Gzip,
        };
        let first_archive = file_archiver
            .archive(
                archive_params.clone(),
                AppenderDirAll::new(archived_directory.clone()),
            )
            .unwrap();
        let first_archive_size = first_archive.get_archive_size();

        create_file(&archived_directory, "another_file_to_archive.txt");

        let second_archive = file_archiver
            .archive(archive_params, AppenderDirAll::new(archived_directory))
            .unwrap();
        let second_archive_size = second_archive.get_archive_size();

        assert_ne!(first_archive_size, second_archive_size);

        let unpack_path = second_archive.unpack_gzip(&test_dir);
        assert!(unpack_path.join("another_file_to_archive.txt").exists());
    }

    #[test]
    fn compute_size_of_uncompressed_data_and_archive() {
        let test_dir = get_test_directory("compute_size_of_uncompressed_data_and_archive");

        let file_path = test_dir.join("file.txt");
        let file = File::create(&file_path).unwrap();
        file.set_len(777).unwrap();

        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));

        let archive_params = ArchiveParameters {
            archive_name_without_extension: "archive".to_string(),
            target_directory: test_dir.clone(),
            compression_algorithm: CompressionAlgorithm::Gzip,
        };
        let archive = file_archiver
            .archive(
                archive_params,
                AppenderFile::append_at_archive_root(file_path).unwrap(),
            )
            .unwrap();

        let expected_archive_size = file_size::compute_size_of_path(&archive.filepath).unwrap();
        assert_eq!(expected_archive_size, archive.get_archive_size());
        assert_eq!(777, archive.get_uncompressed_size());
    }
}