mithril_aggregator/tools/file_archiver/api.rs

use anyhow::{Context, anyhow};
use flate2::{Compression, read::GzDecoder, write::GzEncoder};
use slog::{Logger, info, warn};
use std::{
    fs,
    fs::File,
    io::{Read, Seek, SeekFrom},
    path::{Path, PathBuf},
};
use tar::{Archive, Entry, EntryType};
use zstd::{Decoder, Encoder};

use mithril_common::StdResult;
use mithril_common::entities::CompressionAlgorithm;
use mithril_common::logging::LoggerExtensions;

use crate::ZstandardCompressionParameters;
use crate::tools::file_size;

use super::appender::TarAppender;
use super::{ArchiveParameters, FileArchive};

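/// Tool that creates and verifies compressed tar archives (Gzip or Zstandard).
///
/// Archives are built at a temporary path, verified by unpacking every entry into
/// `verification_temp_dir`, and only then moved to their final location.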
pub struct FileArchiver {
    zstandard_compression_parameter: ZstandardCompressionParameters,
    verification_temp_dir: PathBuf,
    logger: Logger,
}

impl FileArchiver {
    pub fn new(
        zstandard_compression_parameter: ZstandardCompressionParameters,
        verification_temp_dir: PathBuf,
        logger: Logger,
    ) -> Self {
        Self {
            zstandard_compression_parameter,
            verification_temp_dir,
            logger: logger.new_with_component_name::<Self>(),
        }
    }

    #[cfg(test)]
    pub fn new_for_test(verification_temp_dir: PathBuf) -> Self {
        use crate::test::TestLogger;
        Self {
            zstandard_compression_parameter: ZstandardCompressionParameters::default(),
            verification_temp_dir,
            logger: TestLogger::stdout(),
        }
    }

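    /// Archive the content provided by the `appender` into a compressed tarball at the
    /// location described by `parameters`.
    ///
    /// The archive is first written and verified at a temporary path, then renamed to its
    /// final path; on failure the temporary file is removed.
    ///
    /// Minimal usage sketch (paths and the `logger` are illustrative; the calls mirror this
    /// module's tests):
    ///
    /// ```ignore
    /// let archiver = FileArchiver::new(
    ///     ZstandardCompressionParameters::default(),
    ///     PathBuf::from("/tmp/verification"),
    ///     logger,
    /// );
    /// let archive = archiver.archive(
    ///     ArchiveParameters {
    ///         archive_name_without_extension: "archive".to_string(),
    ///         target_directory: PathBuf::from("/tmp/archives"),
    ///         compression_algorithm: CompressionAlgorithm::Gzip,
    ///     },
    ///     AppenderDirAll::new(PathBuf::from("/path/to/db")),
    /// )?;
    /// println!("archived {} bytes", archive.get_archive_size());
    /// ```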
    pub fn archive<T: TarAppender>(
        &self,
        parameters: ArchiveParameters,
        appender: T,
    ) -> StdResult<FileArchive> {
        fs::create_dir_all(&parameters.target_directory).with_context(|| {
            format!(
                "FileArchiver can not create archive directory: '{}'",
                parameters.target_directory.display()
            )
        })?;

        let target_path = parameters.target_path();
        let temporary_archive_path = parameters.temporary_archive_path();

        let temporary_file_archive = self
            .create_and_verify_archive(
                &temporary_archive_path,
                appender,
                parameters.compression_algorithm,
            )
            .inspect_err(|_err| {
                if temporary_archive_path.exists()
                    && let Err(remove_error) = fs::remove_file(&temporary_archive_path)
                {
                    warn!(
                        self.logger,
                        " > Post FileArchiver.archive failure, could not remove temporary archive";
                        "archive_path" => temporary_archive_path.display(),
                        "error" => remove_error
                    );
                }
            })
            .with_context(|| {
                format!(
                    "FileArchiver can not create and verify archive: '{}'",
                    target_path.display()
                )
            })?;

        fs::rename(&temporary_archive_path, &target_path).with_context(|| {
            format!(
                "FileArchiver can not rename temporary archive: '{}' to final archive: '{}'",
                temporary_archive_path.display(),
                target_path.display()
            )
        })?;

        Ok(FileArchive {
            filepath: target_path,
            ..temporary_file_archive
        })
    }

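    /// Create the archive at `archive_path` then verify it, returning its metadata.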
    fn create_and_verify_archive<T: TarAppender>(
        &self,
        archive_path: &Path,
        appender: T,
        compression_algorithm: CompressionAlgorithm,
    ) -> StdResult<FileArchive> {
        let file_archive = self
            .create_archive(archive_path, appender, compression_algorithm)
            .with_context(|| {
                format!(
                    "FileArchiver can not create archive with path: '{}'",
                    archive_path.display()
                )
            })?;
        self.verify_archive(&file_archive).with_context(|| {
            format!(
                "FileArchiver can not verify archive with path: '{}'",
                archive_path.display()
            )
        })?;

        Ok(file_archive)
    }

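    /// Build the tar archive at `archive_path` with the given compression algorithm and
    /// record its compressed and uncompressed sizes.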
    fn create_archive<T: TarAppender>(
        &self,
        archive_path: &Path,
        appender: T,
        compression_algorithm: CompressionAlgorithm,
    ) -> StdResult<FileArchive> {
        info!(
            self.logger,
            "Archiving content to archive: '{}'",
            archive_path.display()
        );

        let tar_file = File::create(archive_path).with_context(|| {
            format!("Error while creating the archive with path: {archive_path:?}")
        })?;

        match compression_algorithm {
            CompressionAlgorithm::Gzip => {
                let enc = GzEncoder::new(tar_file, Compression::default());
                let mut tar = tar::Builder::new(enc);

                appender
                    .append(&mut tar)
                    .with_context(|| "GzEncoder Builder failed to append content")?;

                let mut gz = tar
                    .into_inner()
                    .with_context(|| "GzEncoder Builder can not write the archive")?;
                gz.try_finish()
                    .with_context(|| "GzEncoder can not finish the output stream after writing")?;
            }
            CompressionAlgorithm::Zstandard => {
                let mut enc = Encoder::new(tar_file, self.zstandard_compression_parameter.level)?;
                enc.multithread(self.zstandard_compression_parameter.number_of_workers)
                    .with_context(|| "ZstandardEncoder can not set the number of workers")?;
                let mut tar = tar::Builder::new(enc);

                appender
                    .append(&mut tar)
                    .with_context(|| "ZstandardEncoder Builder failed to append content")?;

                let zstd = tar
                    .into_inner()
                    .with_context(|| "ZstandardEncoder Builder can not write the archive")?;
                zstd.finish().with_context(
                    || "ZstandardEncoder can not finish the output stream after writing",
                )?;
            }
        }

        let uncompressed_size = appender.compute_uncompressed_data_size().with_context(|| {
            format!(
                "FileArchiver can not get the size of the uncompressed data to archive: '{}'",
                archive_path.display()
            )
        })?;
        let archive_filesize =
            file_size::compute_size_of_path(archive_path).with_context(|| {
                format!(
                    "FileArchiver can not get file size of archive with path: '{}'",
                    archive_path.display()
                )
            })?;

        Ok(FileArchive {
            filepath: archive_path.to_path_buf(),
            archive_filesize,
            uncompressed_size,
            compression_algorithm,
        })
    }

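    /// Verify that the archive can be fully read back: every entry is unpacked to a
    /// temporary file under `verification_temp_dir`, then deleted.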
    fn verify_archive(&self, archive: &FileArchive) -> StdResult<()> {
        info!(
            self.logger,
            "Verifying archive: {}",
            archive.filepath.display()
        );

        let mut archive_file_tar = File::open(&archive.filepath).with_context(|| {
            format!(
                "Verify archive error: can not open archive: '{}'",
                archive.filepath.display()
            )
        })?;
        archive_file_tar.seek(SeekFrom::Start(0))?;

        let mut tar_archive: Archive<Box<dyn Read>> = match archive.compression_algorithm {
            CompressionAlgorithm::Gzip => {
                let archive_decoder = GzDecoder::new(archive_file_tar);
                Archive::new(Box::new(archive_decoder))
            }
            CompressionAlgorithm::Zstandard => {
                let archive_decoder = Decoder::new(archive_file_tar)?;
                Archive::new(Box::new(archive_decoder))
            }
        };

        let unpack_temp_dir = self
            .verification_temp_dir
            .join(archive.filepath.file_name().ok_or(anyhow!(
                "Verify archive error: Could not append archive name to temp directory: archive `{}`",
                archive.filepath.display(),
            ))?);

        fs::create_dir_all(&unpack_temp_dir).with_context(|| {
            format!(
                "Verify archive error: Could not create directory `{}`",
                unpack_temp_dir.display(),
            )
        })?;

        let unpack_temp_file = &unpack_temp_dir.join("unpack.tmp");

        let verify_result = {
            let mut result = Ok(());
            for e in tar_archive.entries()? {
                match e {
                    Err(e) => {
                        result = Err(anyhow!(e).context("Verify archive error: invalid entry"));
                        break;
                    }
                    Ok(entry) => Self::unpack_and_delete_file_from_entry(entry, unpack_temp_file)?,
                };
            }
            result
        };

        fs::remove_dir_all(&unpack_temp_dir).with_context(|| {
            format!(
                "Verify archive error: Could not remove directory `{}`",
                unpack_temp_dir.display()
            )
        })?;

        verify_result
    }

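    /// Unpack a single non-directory entry to `unpack_file_path` and delete it right away,
    /// so verification never keeps more than one unpacked entry on disk.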
    fn unpack_and_delete_file_from_entry<R: Read>(
        entry: Entry<R>,
        unpack_file_path: &Path,
    ) -> StdResult<()> {
        if entry.header().entry_type() != EntryType::Directory {
            let mut file = entry;
            let _ = file.unpack(unpack_file_path).with_context(|| "can't unpack entry")?;

            fs::remove_file(unpack_file_path).with_context(|| {
                format!(
                    "can't remove temporary unpacked file, file path: `{}`",
                    unpack_file_path.display()
                )
            })?;
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::fs::File;

    use mithril_common::test::assert_equivalent;

    use crate::tools::file_archiver::appender::{AppenderDirAll, AppenderFile};
    use crate::tools::file_archiver::test_tools::*;

    use super::*;

    fn list_remaining_files(test_dir: &Path) -> Vec<String> {
        fs::read_dir(test_dir)
            .unwrap()
            .map(|f| f.unwrap().file_name().to_str().unwrap().to_owned())
            .collect()
    }

    #[test]
    fn should_create_a_valid_archive_with_gzip_compression() {
        let test_dir = get_test_directory("should_create_a_valid_archive_with_gzip_compression");
        let target_archive = test_dir.join("archive.tar.gz");
        let archived_directory = test_dir.join(create_dir(&test_dir, "archived_directory"));
        create_file(&archived_directory, "file_to_archive.txt");

        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));

        let archive = file_archiver
            .create_archive(
                &target_archive,
                AppenderDirAll::new(archived_directory),
                CompressionAlgorithm::Gzip,
            )
            .expect("create_archive should not fail");
        file_archiver
            .verify_archive(&archive)
            .expect("verify_archive should not fail");
    }

    #[test]
    fn should_create_a_valid_archive_with_zstandard_compression() {
        let test_dir =
            get_test_directory("should_create_a_valid_archive_with_zstandard_compression");
        let target_archive = test_dir.join("archive.tar.zst");
        let archived_directory = test_dir.join(create_dir(&test_dir, "archived_directory"));
        create_file(&archived_directory, "file_to_archive.txt");

        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));

        let archive = file_archiver
            .create_archive(
                &target_archive,
                AppenderDirAll::new(archived_directory),
                CompressionAlgorithm::Zstandard,
            )
            .expect("create_archive should not fail");
        file_archiver
            .verify_archive(&archive)
            .expect("verify_archive should not fail");
    }

    #[test]
    fn should_delete_tmp_file_in_target_directory_if_archiving_fail() {
        let test_dir =
            get_test_directory("should_delete_tmp_file_in_target_directory_if_archiving_fail");
        let archived_directory = test_dir.join("db");

        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));

        File::create(test_dir.join("other-process.file")).unwrap();

        let archive_params = ArchiveParameters {
            archive_name_without_extension: "archive".to_string(),
            target_directory: test_dir.clone(),
            compression_algorithm: CompressionAlgorithm::Gzip,
        };
        let _ = file_archiver
            .archive(archive_params, AppenderDirAll::new(archived_directory))
            .expect_err("FileArchiver::archive should fail if the target path doesn't exist.");

        let remaining_files: Vec<String> = list_remaining_files(&test_dir);
        assert_eq!(vec!["other-process.file".to_string()], remaining_files);
    }

    #[test]
    fn should_not_delete_an_already_existing_archive_with_same_name_if_archiving_fail() {
        let test_dir = get_test_directory(
            "should_not_delete_an_already_existing_archive_with_same_name_if_archiving_fail",
        );
        let archived_directory = test_dir.join("db");

        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));

        create_file(&test_dir, "other-process.file");
        create_file(&test_dir, "archive.tar.gz");
        create_file(&test_dir, "archive.tar.tmp");

        let archive_params = ArchiveParameters {
            archive_name_without_extension: "archive".to_string(),
            target_directory: test_dir.clone(),
            compression_algorithm: CompressionAlgorithm::Gzip,
        };
        let _ = file_archiver
            .archive(archive_params, AppenderDirAll::new(archived_directory))
            .expect_err("FileArchiver::archive should fail if the db is empty.");
        let remaining_files: Vec<String> = list_remaining_files(&test_dir);

        assert_equivalent!(
            vec!["other-process.file".to_string(), "archive.tar.gz".to_string()],
            remaining_files,
        );
    }

    #[test]
    fn overwrite_already_existing_archive_when_archiving_succeed() {
        let test_dir =
            get_test_directory("overwrite_already_existing_archive_when_archiving_succeed");
        let archived_directory = test_dir.join(create_dir(&test_dir, "archived_directory"));

        create_file(&archived_directory, "file_to_archive.txt");

        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));

        let archive_params = ArchiveParameters {
            archive_name_without_extension: "archive".to_string(),
            target_directory: test_dir.clone(),
            compression_algorithm: CompressionAlgorithm::Gzip,
        };
        let first_archive = file_archiver
            .archive(
                archive_params.clone(),
                AppenderDirAll::new(archived_directory.clone()),
            )
            .unwrap();
        let first_archive_size = first_archive.get_archive_size();

        create_file(&archived_directory, "another_file_to_archive.txt");

        let second_archive = file_archiver
            .archive(archive_params, AppenderDirAll::new(archived_directory))
            .unwrap();
        let second_archive_size = second_archive.get_archive_size();

        assert_ne!(first_archive_size, second_archive_size);

        let unpack_path = second_archive.unpack_gzip(&test_dir);
        assert!(unpack_path.join("another_file_to_archive.txt").exists());
    }

    #[test]
    fn compute_size_of_uncompressed_data_and_archive() {
        let test_dir = get_test_directory("compute_size_of_uncompressed_data_and_archive");

        let file_path = test_dir.join("file.txt");
        let file = File::create(&file_path).unwrap();
        file.set_len(777).unwrap();

        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));

        let archive_params = ArchiveParameters {
            archive_name_without_extension: "archive".to_string(),
            target_directory: test_dir.clone(),
            compression_algorithm: CompressionAlgorithm::Gzip,
        };
        let archive = file_archiver
            .archive(
                archive_params.clone(),
                AppenderFile::append_at_archive_root(file_path.clone()).unwrap(),
            )
            .unwrap();

        let expected_archive_size = file_size::compute_size_of_path(&archive.filepath).unwrap();
        assert_eq!(expected_archive_size, archive.get_archive_size());
        assert_eq!(777, archive.get_uncompressed_size());
    }
}
476}