1use anyhow::{anyhow, Context};
2use async_trait::async_trait;
3use slog::{debug, Logger};
4use std::fs;
5use std::path::PathBuf;
6use std::sync::Arc;
7
8use mithril_common::digesters::{
9 immutable_trio_names, ImmutableFile, LedgerFile, IMMUTABLE_DIR, LEDGER_DIR,
10};
11use mithril_common::entities::{AncillaryFilesManifest, CompressionAlgorithm, ImmutableFileNumber};
12use mithril_common::logging::LoggerExtensions;
13use mithril_common::StdResult;
14
15use crate::dependency_injection::DependenciesBuilderError;
16use crate::tools::file_archiver::appender::{AppenderData, AppenderEntries, TarAppender};
17use crate::tools::file_archiver::{ArchiveParameters, FileArchive, FileArchiver};
18
19use super::{ancillary_signer::AncillarySigner, Snapshotter};
20
21pub struct CompressedArchiveSnapshotter {
23 db_directory: PathBuf,
25
26 ongoing_snapshot_directory: PathBuf,
28
29 compression_algorithm: CompressionAlgorithm,
31
32 file_archiver: Arc<FileArchiver>,
33
34 ancillary_signer: Arc<dyn AncillarySigner>,
35
36 logger: Logger,
37}
38
39#[async_trait]
40impl Snapshotter for CompressedArchiveSnapshotter {
41 async fn snapshot_all_completed_immutables(
42 &self,
43 archive_name_without_extension: &str,
44 ) -> StdResult<FileArchive> {
45 debug!(
46 self.logger,
47 "Snapshotting all completed immutables into archive: '{archive_name_without_extension}'"
48 );
49
50 let paths_to_include = ImmutableFile::list_completed_in_dir(&self.db_directory)
51 .with_context(|| {
52 format!(
53 "Can not list completed immutables in database directory: '{}'",
54 self.db_directory.display()
55 )
56 })?
57 .into_iter()
58 .map(|immutable_file: ImmutableFile| {
59 PathBuf::from(IMMUTABLE_DIR).join(immutable_file.filename)
60 })
61 .collect();
62 let appender = AppenderEntries::new(paths_to_include, self.db_directory.clone())?;
63 self.snapshot(archive_name_without_extension, appender)
64 .await
65 }
66
67 async fn snapshot_ancillary(
68 &self,
69 immutable_file_number: ImmutableFileNumber,
70 archive_name_without_extension: &str,
71 ) -> StdResult<FileArchive> {
72 debug!(
73 self.logger,
74 "Snapshotting ancillary archive: '{archive_name_without_extension}'"
75 );
76
77 let paths_to_include =
78 self.get_files_and_directories_for_ancillary_snapshot(immutable_file_number)?;
79 let signed_manifest = self
80 .build_and_sign_ancillary_manifest(paths_to_include.clone())
81 .await?;
82 let appender = AppenderEntries::new(paths_to_include, self.db_directory.clone())?.chain(
83 AppenderData::from_json(PathBuf::from("ancillary_manifest.json"), &signed_manifest)?,
84 );
85 self.snapshot(archive_name_without_extension, appender)
86 .await
87 }
88
89 async fn snapshot_immutable_trio(
90 &self,
91 immutable_file_number: ImmutableFileNumber,
92 archive_name_without_extension: &str,
93 ) -> StdResult<FileArchive> {
94 debug!(
95 self.logger,
96 "Snapshotting immutable trio {immutable_file_number} into archive '{archive_name_without_extension}'"
97 );
98
99 let files_to_archive = immutable_trio_names(immutable_file_number)
100 .iter()
101 .map(|filename| PathBuf::from(IMMUTABLE_DIR).join(filename))
102 .collect();
103 let appender = AppenderEntries::new(files_to_archive, self.db_directory.clone())?;
104
105 self.snapshot(archive_name_without_extension, appender)
106 .await
107 }
108
109 fn compression_algorithm(&self) -> CompressionAlgorithm {
110 self.compression_algorithm
111 }
112}
113
114impl CompressedArchiveSnapshotter {
115 pub fn new(
117 db_directory: PathBuf,
118 ongoing_snapshot_directory: PathBuf,
119 compression_algorithm: CompressionAlgorithm,
120 file_archiver: Arc<FileArchiver>,
121 ancillary_signer: Arc<dyn AncillarySigner>,
122 logger: Logger,
123 ) -> StdResult<CompressedArchiveSnapshotter> {
124 if ongoing_snapshot_directory.exists() {
125 fs::remove_dir_all(&ongoing_snapshot_directory).with_context(|| {
126 format!(
127 "Can not remove snapshotter directory: '{}'.",
128 ongoing_snapshot_directory.display()
129 )
130 })?;
131 }
132
133 fs::create_dir(&ongoing_snapshot_directory).map_err(|e| {
134 DependenciesBuilderError::Initialization {
135 message: format!(
136 "Can not create snapshotter directory: '{}'.",
137 ongoing_snapshot_directory.display()
138 ),
139 error: Some(e.into()),
140 }
141 })?;
142
143 Ok(Self {
144 db_directory,
145 ongoing_snapshot_directory,
146 compression_algorithm,
147 file_archiver,
148 ancillary_signer,
149 logger: logger.new_with_component_name::<Self>(),
150 })
151 }
152
153 async fn snapshot<T: TarAppender + 'static>(
154 &self,
155 name_without_extension: &str,
156 appender: T,
157 ) -> StdResult<FileArchive> {
158 let file_archiver = self.file_archiver.clone();
159 let parameters = ArchiveParameters {
160 archive_name_without_extension: name_without_extension.to_string(),
161 target_directory: self.ongoing_snapshot_directory.clone(),
162 compression_algorithm: self.compression_algorithm,
163 };
164
165 let file_archive = tokio::task::spawn_blocking(move || -> StdResult<FileArchive> {
167 file_archiver.archive(parameters, appender)
168 })
169 .await??;
170
171 Ok(file_archive)
172 }
173
174 fn get_files_and_directories_for_ancillary_snapshot(
179 &self,
180 immutable_file_number: u64,
181 ) -> StdResult<Vec<PathBuf>> {
182 let next_immutable_file_number = immutable_file_number + 1;
183 let mut files_to_snapshot: Vec<PathBuf> = immutable_trio_names(next_immutable_file_number)
184 .into_iter()
185 .map(|filename| PathBuf::from(IMMUTABLE_DIR).join(filename))
186 .collect();
187
188 let db_ledger_dir = self.db_directory.join(LEDGER_DIR);
189 let ledger_files = LedgerFile::list_all_in_dir(&db_ledger_dir)?;
190 let last_ledger = ledger_files.last().ok_or(anyhow!(
191 "No ledger file found in directory: `{}`",
192 db_ledger_dir.display()
193 ))?;
194 files_to_snapshot.push(PathBuf::from(LEDGER_DIR).join(&last_ledger.filename));
195
196 Ok(files_to_snapshot)
197 }
198
199 async fn build_and_sign_ancillary_manifest(
200 &self,
201 paths_to_include: Vec<PathBuf>,
202 ) -> StdResult<AncillaryFilesManifest> {
203 let manifest =
204 AncillaryFilesManifest::from_paths(&self.db_directory, paths_to_include).await?;
205 let signature = self
206 .ancillary_signer
207 .compute_ancillary_manifest_signature(&manifest)
208 .await?;
209 let signed_manifest = AncillaryFilesManifest {
210 signature: Some(signature),
211 ..manifest
212 };
213
214 Ok(signed_manifest)
215 }
216}
217
#[cfg(test)]
mod tests {
    use std::fs::File;
    use std::path::Path;
    use std::sync::Arc;

    use mithril_common::digesters::DummyCardanoDbBuilder;
    use mithril_common::temp_dir_create;
    use mithril_common::test_utils::assert_equivalent;

    use crate::services::ancillary_signer::MockAncillarySigner;
    use crate::services::snapshotter::test_tools::*;
    use crate::test_tools::TestLogger;

    use super::*;

    /// Names of the direct entries of `test_dir`, in the order yielded by the file system.
    fn list_files(test_dir: &Path) -> Vec<String> {
        let mut names = Vec::new();
        for entry in fs::read_dir(test_dir).unwrap() {
            let file_name = entry.unwrap().file_name();
            names.push(file_name.to_str().unwrap().to_owned());
        }
        names
    }

    /// Build a snapshotter working in `test_directory`, with a default mock signer.
    fn snapshotter_for_test(
        test_directory: &Path,
        db_directory: &Path,
        compression_algorithm: CompressionAlgorithm,
    ) -> CompressedArchiveSnapshotter {
        let file_archiver = FileArchiver::new_for_test(test_directory.join("verification"));

        CompressedArchiveSnapshotter::new(
            db_directory.to_path_buf(),
            test_directory.join("ongoing_snapshot"),
            compression_algorithm,
            Arc::new(file_archiver),
            Arc::new(MockAncillarySigner::new()),
            TestLogger::stdout(),
        )
        .unwrap()
    }

    #[test]
    fn return_parametrized_compression_algorithm() {
        let test_dir = get_test_directory("return_parametrized_compression_algorithm");
        let expected_algorithm = CompressionAlgorithm::Zstandard;

        let snapshotter =
            snapshotter_for_test(&test_dir, Path::new("whatever"), expected_algorithm);

        assert_eq!(expected_algorithm, snapshotter.compression_algorithm());
    }

    #[test]
    fn should_create_directory_if_does_not_exist() {
        let test_dir = get_test_directory("should_create_directory_if_does_not_exist");
        let snapshot_dir = test_dir.join("ongoing_snapshot");
        let db_dir = test_dir.join("whatever");

        CompressedArchiveSnapshotter::new(
            db_dir,
            snapshot_dir.clone(),
            CompressionAlgorithm::Gzip,
            Arc::new(FileArchiver::new_for_test(test_dir.join("verification"))),
            Arc::new(MockAncillarySigner::new()),
            TestLogger::stdout(),
        )
        .unwrap();

        assert!(snapshot_dir.is_dir());
    }

    #[test]
    fn should_clean_ongoing_snapshot_directory_if_already_exists() {
        let test_dir =
            get_test_directory("should_clean_ongoing_snapshot_directory_if_already_exists");
        let snapshot_dir = test_dir.join("ongoing_snapshot");
        let db_dir = test_dir.join("whatever");

        // Pre-populate the directory with a file that the constructor is expected to wipe.
        fs::create_dir_all(&snapshot_dir).unwrap();
        File::create(snapshot_dir.join("whatever.txt")).unwrap();

        CompressedArchiveSnapshotter::new(
            db_dir,
            snapshot_dir.clone(),
            CompressionAlgorithm::Gzip,
            Arc::new(FileArchiver::new_for_test(test_dir.join("verification"))),
            Arc::new(MockAncillarySigner::new()),
            TestLogger::stdout(),
        )
        .unwrap();

        let remaining_entries = fs::read_dir(snapshot_dir).unwrap().count();
        assert_eq!(0, remaining_entries);
    }

    #[tokio::test]
    async fn should_create_snapshots_in_its_ongoing_snapshot_directory() {
        let test_dir = temp_dir_create!();
        let pending_snapshot_directory = test_dir.join("pending_snapshot");
        let cardano_db =
            DummyCardanoDbBuilder::new("should_create_snapshots_in_its_ongoing_snapshot_directory")
                .with_immutables(&[1])
                .append_immutable_trio()
                .build();
        let file_archiver = FileArchiver::new_for_test(test_dir.join("verification"));

        let snapshotter = CompressedArchiveSnapshotter::new(
            cardano_db.get_dir().clone(),
            pending_snapshot_directory.clone(),
            CompressionAlgorithm::Gzip,
            Arc::new(file_archiver),
            Arc::new(MockAncillarySigner::new()),
            TestLogger::stdout(),
        )
        .unwrap();
        let archive = snapshotter
            .snapshot_all_completed_immutables("whatever")
            .await
            .unwrap();

        assert_eq!(
            pending_snapshot_directory,
            archive.get_file_path().parent().unwrap()
        );
    }

    mod snapshot_all_completed_immutables {
        use super::*;

        #[tokio::test]
        async fn include_only_completed_immutables() {
            let test_dir = temp_dir_create!();
            let cardano_db = DummyCardanoDbBuilder::new(
                "snapshot_all_immutables_include_only_completed_immutables",
            )
            .with_immutables(&[1, 2, 3])
            .append_immutable_trio()
            .with_ledger_files(&["437"])
            .with_volatile_files(&["blocks-0.dat"])
            .with_non_immutables(&["random_file.txt", "00002.trap"])
            .build();
            let snapshotter =
                snapshotter_for_test(&test_dir, cardano_db.get_dir(), CompressionAlgorithm::Gzip);

            let archive = snapshotter
                .snapshot_all_completed_immutables("completed_immutables")
                .await
                .unwrap();

            let unpacked_root = archive.unpack_gzip(&test_dir);
            let root_entries = list_files(&unpacked_root);
            let immutable_entries = list_files(&unpacked_root.join(IMMUTABLE_DIR));

            // Only the immutable directory is archived, filled with the trios 1 to 3.
            assert_equivalent(vec![IMMUTABLE_DIR.to_string()], root_entries);
            assert_equivalent(
                Vec::from(
                    [
                        "00001.chunk",
                        "00001.primary",
                        "00001.secondary",
                        "00002.chunk",
                        "00002.primary",
                        "00002.secondary",
                        "00003.chunk",
                        "00003.primary",
                        "00003.secondary",
                    ]
                    .map(String::from),
                ),
                immutable_entries,
            );
        }
    }

    mod snapshot_immutable_trio {
        use super::*;

        #[tokio::test]
        async fn include_only_immutable_trio() {
            let test_dir = get_test_directory("include_only_immutable_trio");
            let cardano_db = DummyCardanoDbBuilder::new("include_only_immutable_trio")
                .with_immutables(&[1, 2, 3])
                .with_ledger_files(&["437"])
                .with_volatile_files(&["blocks-0.dat"])
                .with_non_immutables(&["random_file.txt", "00002.trap"])
                .build();
            let snapshotter =
                snapshotter_for_test(&test_dir, cardano_db.get_dir(), CompressionAlgorithm::Gzip);

            let archive = snapshotter
                .snapshot_immutable_trio(2, "immutable-2")
                .await
                .unwrap();

            let unpacked_root = archive.unpack_gzip(&test_dir);
            let root_entries = list_files(&unpacked_root);
            let immutable_entries = list_files(&unpacked_root.join(IMMUTABLE_DIR));

            // Only the immutable directory is archived, holding nothing but the requested trio.
            assert_equivalent(vec![IMMUTABLE_DIR.to_string()], root_entries);
            assert_equivalent(
                Vec::from(["00002.chunk", "00002.primary", "00002.secondary"].map(String::from)),
                immutable_entries,
            );
        }
    }

    mod snapshot_ancillary {
        use mithril_common::digesters::VOLATILE_DIR;
        use mithril_common::test_utils::fake_keys;

        use super::*;

        #[tokio::test]
        async fn create_archive_should_embed_only_last_ledger_and_last_immutables() {
            let test_dir = temp_dir_create!();
            let cardano_db = DummyCardanoDbBuilder::new(
                "create_archive_should_embed_only_last_ledger_and_last_immutables",
            )
            .with_immutables(&[1, 2, 3])
            .with_ledger_files(&["437", "537", "637", "737", "9not_included"])
            .with_volatile_files(&["blocks-0.dat", "blocks-1.dat", "blocks-2.dat"])
            .build();
            // Extra directory that must not end up in the archive.
            fs::create_dir(cardano_db.get_dir().join("whatever")).unwrap();

            let db_directory = cardano_db.get_dir();
            let signer = MockAncillarySigner::that_succeeds_with_signature(
                fake_keys::signable_manifest_signature()[0],
            );
            let snapshotter = CompressedArchiveSnapshotter {
                ancillary_signer: Arc::new(signer),
                ..snapshotter_for_test(&test_dir, db_directory, CompressionAlgorithm::Gzip)
            };

            let archive = snapshotter
                .snapshot_ancillary(2, "ancillary")
                .await
                .unwrap();

            let unpacked_root = archive.unpack_gzip(&test_dir);

            // Immutables: only the trio following the requested immutable file number.
            let unpacked_immutable_dir = unpacked_root.join(IMMUTABLE_DIR);
            for filename in ["00003.chunk", "00003.primary", "00003.secondary"] {
                assert!(unpacked_immutable_dir.join(filename).exists());
            }
            assert_eq!(3, list_files(&unpacked_immutable_dir).len());

            // Ledger: only the "737" file.
            let unpacked_ledger_dir = unpacked_root.join(LEDGER_DIR);
            assert!(unpacked_ledger_dir.join("737").exists());
            assert_eq!(1, list_files(&unpacked_ledger_dir).len());

            // Neither the volatile directory nor the unrelated one is archived.
            for excluded_dir in [VOLATILE_DIR, "whatever"] {
                assert!(!unpacked_root.join(excluded_dir).exists());
            }
        }

        #[tokio::test]
        async fn create_archive_fail_if_manifest_signing_fail() {
            let test_dir = temp_dir_create!();
            let cardano_db =
                DummyCardanoDbBuilder::new("create_archive_fail_if_manifest_signing_fail")
                    .with_immutables(&[1, 2])
                    .with_ledger_files(&["737"])
                    .build();

            let db_directory = cardano_db.get_dir();
            let failing_signer =
                MockAncillarySigner::that_fails_with_message("MockAncillarySigner failed");
            let snapshotter = CompressedArchiveSnapshotter {
                ancillary_signer: Arc::new(failing_signer),
                ..snapshotter_for_test(&test_dir, db_directory, CompressionAlgorithm::Gzip)
            };

            let err = snapshotter
                .snapshot_ancillary(1, "ancillary")
                .await
                .expect_err("Must fail if manifest signing fails");

            let raised_by_signer = err.to_string().contains("MockAncillarySigner failed");
            assert!(
                raised_by_signer,
                "Expected error message to be raised by the mock ancillary signer, but got: '{err:?}'",
            );
        }

        #[tokio::test]
        async fn create_archive_generate_sign_and_include_manifest_file() {
            let test_dir = temp_dir_create!();
            let cardano_db = DummyCardanoDbBuilder::new(
                "create_archive_generate_sign_and_include_manifest_file",
            )
            .with_immutables(&[1, 2, 3])
            .with_ledger_files(&["321", "737"])
            .with_non_immutables(&["not_to_include.txt"])
            .build();
            File::create(cardano_db.get_dir().join("not_to_include_as_well.txt")).unwrap();

            let db_directory = cardano_db.get_dir();
            let signer = MockAncillarySigner::that_succeeds_with_signature(
                fake_keys::signable_manifest_signature()[0],
            );
            let snapshotter = CompressedArchiveSnapshotter {
                ancillary_signer: Arc::new(signer),
                ..snapshotter_for_test(&test_dir, db_directory, CompressionAlgorithm::Gzip)
            };

            let archive = snapshotter
                .snapshot_ancillary(2, "ancillary")
                .await
                .unwrap();
            let unpacked_root = archive.unpack_gzip(test_dir);
            let manifest_path = unpacked_root.join("ancillary_manifest.json");

            assert!(manifest_path.exists());

            let manifest: AncillaryFilesManifest =
                serde_json::from_reader(File::open(&manifest_path).unwrap()).unwrap();

            // The manifest lists exactly the archived files, in this order.
            let expected_paths = vec![
                PathBuf::from(IMMUTABLE_DIR).join("00003.chunk"),
                PathBuf::from(IMMUTABLE_DIR).join("00003.primary"),
                PathBuf::from(IMMUTABLE_DIR).join("00003.secondary"),
                PathBuf::from(LEDGER_DIR).join("737"),
            ];
            assert_eq!(
                expected_paths.iter().collect::<Vec<_>>(),
                manifest.data.keys().collect::<Vec<_>>()
            );
            assert_eq!(
                Some(
                    fake_keys::signable_manifest_signature()[0]
                        .try_into()
                        .unwrap()
                ),
                manifest.signature
            );
        }
    }
}