mithril_aggregator/services/snapshotter/
compressed_archive_snapshotter.rs

1use anyhow::{anyhow, Context};
2use async_trait::async_trait;
3use slog::{debug, Logger};
4use std::fs;
5use std::path::PathBuf;
6use std::sync::Arc;
7
8use mithril_common::digesters::{
9    immutable_trio_names, ImmutableFile, LedgerFile, IMMUTABLE_DIR, LEDGER_DIR,
10};
11use mithril_common::entities::{AncillaryFilesManifest, CompressionAlgorithm, ImmutableFileNumber};
12use mithril_common::logging::LoggerExtensions;
13use mithril_common::StdResult;
14
15use crate::dependency_injection::DependenciesBuilderError;
16use crate::tools::file_archiver::appender::{AppenderData, AppenderEntries, TarAppender};
17use crate::tools::file_archiver::{ArchiveParameters, FileArchive, FileArchiver};
18
19use super::{ancillary_signer::AncillarySigner, Snapshotter};
20
/// Snapshotter that packs files of a database directory into compressed archives.
///
/// Archives are produced by the shared [`FileArchiver`] and written to the ongoing snapshot
/// directory, using the configured compression algorithm.
pub struct CompressedArchiveSnapshotter {
    /// DB directory to snapshot
    ///
    /// Paths of the files to archive are expressed relatively to this directory.
    db_directory: PathBuf,

    /// Directory to store ongoing snapshot
    ///
    /// Used as the target directory of every archive created by this snapshotter.
    ongoing_snapshot_directory: PathBuf,

    /// Compression algorithm to use
    compression_algorithm: CompressionAlgorithm,

    /// Archiver that builds the archive files
    file_archiver: Arc<FileArchiver>,

    /// Signer that computes the signature of the ancillary files manifest
    ancillary_signer: Arc<dyn AncillarySigner>,

    /// Logger of this component
    logger: Logger,
}
38
39#[async_trait]
40impl Snapshotter for CompressedArchiveSnapshotter {
41    async fn snapshot_all_completed_immutables(
42        &self,
43        archive_name_without_extension: &str,
44    ) -> StdResult<FileArchive> {
45        debug!(
46            self.logger,
47            "Snapshotting all completed immutables into archive: '{archive_name_without_extension}'"
48        );
49
50        let paths_to_include = ImmutableFile::list_completed_in_dir(&self.db_directory)
51            .with_context(|| {
52                format!(
53                    "Can not list completed immutables in database directory: '{}'",
54                    self.db_directory.display()
55                )
56            })?
57            .into_iter()
58            .map(|immutable_file: ImmutableFile| {
59                PathBuf::from(IMMUTABLE_DIR).join(immutable_file.filename)
60            })
61            .collect();
62        let appender = AppenderEntries::new(paths_to_include, self.db_directory.clone())?;
63        self.snapshot(archive_name_without_extension, appender)
64            .await
65    }
66
67    async fn snapshot_ancillary(
68        &self,
69        immutable_file_number: ImmutableFileNumber,
70        archive_name_without_extension: &str,
71    ) -> StdResult<FileArchive> {
72        debug!(
73            self.logger,
74            "Snapshotting ancillary archive: '{archive_name_without_extension}'"
75        );
76
77        let paths_to_include =
78            self.get_files_and_directories_for_ancillary_snapshot(immutable_file_number)?;
79        let signed_manifest = self
80            .build_and_sign_ancillary_manifest(paths_to_include.clone())
81            .await?;
82        let appender = AppenderEntries::new(paths_to_include, self.db_directory.clone())?.chain(
83            AppenderData::from_json(PathBuf::from("ancillary_manifest.json"), &signed_manifest)?,
84        );
85        self.snapshot(archive_name_without_extension, appender)
86            .await
87    }
88
89    async fn snapshot_immutable_trio(
90        &self,
91        immutable_file_number: ImmutableFileNumber,
92        archive_name_without_extension: &str,
93    ) -> StdResult<FileArchive> {
94        debug!(
95            self.logger,
96            "Snapshotting immutable trio {immutable_file_number} into archive '{archive_name_without_extension}'"
97        );
98
99        let files_to_archive = immutable_trio_names(immutable_file_number)
100            .iter()
101            .map(|filename| PathBuf::from(IMMUTABLE_DIR).join(filename))
102            .collect();
103        let appender = AppenderEntries::new(files_to_archive, self.db_directory.clone())?;
104
105        self.snapshot(archive_name_without_extension, appender)
106            .await
107    }
108
109    fn compression_algorithm(&self) -> CompressionAlgorithm {
110        self.compression_algorithm
111    }
112}
113
114impl CompressedArchiveSnapshotter {
115    /// Snapshotter factory
116    pub fn new(
117        db_directory: PathBuf,
118        ongoing_snapshot_directory: PathBuf,
119        compression_algorithm: CompressionAlgorithm,
120        file_archiver: Arc<FileArchiver>,
121        ancillary_signer: Arc<dyn AncillarySigner>,
122        logger: Logger,
123    ) -> StdResult<CompressedArchiveSnapshotter> {
124        if ongoing_snapshot_directory.exists() {
125            fs::remove_dir_all(&ongoing_snapshot_directory).with_context(|| {
126                format!(
127                    "Can not remove snapshotter directory: '{}'.",
128                    ongoing_snapshot_directory.display()
129                )
130            })?;
131        }
132
133        fs::create_dir(&ongoing_snapshot_directory).map_err(|e| {
134            DependenciesBuilderError::Initialization {
135                message: format!(
136                    "Can not create snapshotter directory: '{}'.",
137                    ongoing_snapshot_directory.display()
138                ),
139                error: Some(e.into()),
140            }
141        })?;
142
143        Ok(Self {
144            db_directory,
145            ongoing_snapshot_directory,
146            compression_algorithm,
147            file_archiver,
148            ancillary_signer,
149            logger: logger.new_with_component_name::<Self>(),
150        })
151    }
152
153    async fn snapshot<T: TarAppender + 'static>(
154        &self,
155        name_without_extension: &str,
156        appender: T,
157    ) -> StdResult<FileArchive> {
158        let file_archiver = self.file_archiver.clone();
159        let parameters = ArchiveParameters {
160            archive_name_without_extension: name_without_extension.to_string(),
161            target_directory: self.ongoing_snapshot_directory.clone(),
162            compression_algorithm: self.compression_algorithm,
163        };
164
165        // spawn a separate thread to prevent blocking
166        let file_archive = tokio::task::spawn_blocking(move || -> StdResult<FileArchive> {
167            file_archiver.archive(parameters, appender)
168        })
169        .await??;
170
171        Ok(file_archive)
172    }
173
174    /// Returns the list of files and directories to include in ancillary snapshot.
175    ///
176    /// The immutable file included in the ancillary archive corresponds to the last one (and not finalized yet)
177    /// when the immutable file number given to the function corresponds to the penultimate.
178    fn get_files_and_directories_for_ancillary_snapshot(
179        &self,
180        immutable_file_number: u64,
181    ) -> StdResult<Vec<PathBuf>> {
182        let next_immutable_file_number = immutable_file_number + 1;
183        let mut files_to_snapshot: Vec<PathBuf> = immutable_trio_names(next_immutable_file_number)
184            .into_iter()
185            .map(|filename| PathBuf::from(IMMUTABLE_DIR).join(filename))
186            .collect();
187
188        let db_ledger_dir = self.db_directory.join(LEDGER_DIR);
189        let ledger_files = LedgerFile::list_all_in_dir(&db_ledger_dir)?;
190        let last_ledger = ledger_files.last().ok_or(anyhow!(
191            "No ledger file found in directory: `{}`",
192            db_ledger_dir.display()
193        ))?;
194        files_to_snapshot.push(PathBuf::from(LEDGER_DIR).join(&last_ledger.filename));
195
196        Ok(files_to_snapshot)
197    }
198
199    async fn build_and_sign_ancillary_manifest(
200        &self,
201        paths_to_include: Vec<PathBuf>,
202    ) -> StdResult<AncillaryFilesManifest> {
203        let manifest =
204            AncillaryFilesManifest::from_paths(&self.db_directory, paths_to_include).await?;
205        let signature = self
206            .ancillary_signer
207            .compute_ancillary_manifest_signature(&manifest)
208            .await?;
209        let signed_manifest = AncillaryFilesManifest {
210            signature: Some(signature),
211            ..manifest
212        };
213
214        Ok(signed_manifest)
215    }
216}
217
// Unit tests of the `CompressedArchiveSnapshotter`: construction side effects on the ongoing
// snapshot directory, and content of the archives produced by each snapshot method.
#[cfg(test)]
mod tests {
    use std::fs::File;
    use std::path::Path;
    use std::sync::Arc;

    use mithril_common::digesters::DummyCardanoDbBuilder;
    use mithril_common::temp_dir_create;
    use mithril_common::test_utils::assert_equivalent;

    use crate::services::ancillary_signer::MockAncillarySigner;
    use crate::services::snapshotter::test_tools::*;
    use crate::test_tools::TestLogger;

    use super::*;

    /// List the names of the direct entries (files and directories) of `test_dir`.
    ///
    /// Panics if the directory can not be read or if a name is not valid UTF-8.
    fn list_files(test_dir: &Path) -> Vec<String> {
        fs::read_dir(test_dir)
            .unwrap()
            .map(|f| f.unwrap().file_name().to_str().unwrap().to_owned())
            .collect()
    }

    /// Build a snapshotter over `db_directory` that stores its archives in the
    /// `ongoing_snapshot` sub-directory of `test_directory`, with a signer mock that has no
    /// expectation configured.
    fn snapshotter_for_test(
        test_directory: &Path,
        db_directory: &Path,
        compression_algorithm: CompressionAlgorithm,
    ) -> CompressedArchiveSnapshotter {
        CompressedArchiveSnapshotter::new(
            db_directory.to_path_buf(),
            test_directory.join("ongoing_snapshot"),
            compression_algorithm,
            Arc::new(FileArchiver::new_for_test(
                test_directory.join("verification"),
            )),
            Arc::new(MockAncillarySigner::new()),
            TestLogger::stdout(),
        )
        .unwrap()
    }

    // The getter must return the algorithm given at construction.
    #[test]
    fn return_parametrized_compression_algorithm() {
        let test_dir = get_test_directory("return_parametrized_compression_algorithm");
        let snapshotter = snapshotter_for_test(
            &test_dir,
            Path::new("whatever"),
            CompressionAlgorithm::Zstandard,
        );

        assert_eq!(
            CompressionAlgorithm::Zstandard,
            snapshotter.compression_algorithm()
        );
    }

    // Constructing the snapshotter must create the ongoing snapshot directory.
    #[test]
    fn should_create_directory_if_does_not_exist() {
        let test_dir = get_test_directory("should_create_directory_if_does_not_exist");
        let ongoing_snapshot_directory = test_dir.join("ongoing_snapshot");
        let db_directory = test_dir.join("whatever");

        CompressedArchiveSnapshotter::new(
            db_directory,
            ongoing_snapshot_directory.clone(),
            CompressionAlgorithm::Gzip,
            Arc::new(FileArchiver::new_for_test(test_dir.join("verification"))),
            Arc::new(MockAncillarySigner::new()),
            TestLogger::stdout(),
        )
        .unwrap();

        assert!(ongoing_snapshot_directory.is_dir());
    }

    // Constructing the snapshotter must empty an already existing ongoing snapshot directory.
    #[test]
    fn should_clean_ongoing_snapshot_directory_if_already_exists() {
        let test_dir =
            get_test_directory("should_clean_ongoing_snapshot_directory_if_already_exists");
        let ongoing_snapshot_directory = test_dir.join("ongoing_snapshot");
        let db_directory = test_dir.join("whatever");

        fs::create_dir_all(&ongoing_snapshot_directory).unwrap();

        // Leftover file that must be gone once the snapshotter is created.
        File::create(ongoing_snapshot_directory.join("whatever.txt")).unwrap();

        CompressedArchiveSnapshotter::new(
            db_directory,
            ongoing_snapshot_directory.clone(),
            CompressionAlgorithm::Gzip,
            Arc::new(FileArchiver::new_for_test(test_dir.join("verification"))),
            Arc::new(MockAncillarySigner::new()),
            TestLogger::stdout(),
        )
        .unwrap();

        assert_eq!(0, fs::read_dir(ongoing_snapshot_directory).unwrap().count());
    }

    // The created archive must be located directly in the directory given at construction.
    #[tokio::test]
    async fn should_create_snapshots_in_its_ongoing_snapshot_directory() {
        let test_dir = temp_dir_create!();
        let pending_snapshot_directory = test_dir.join("pending_snapshot");
        let cardano_db =
            DummyCardanoDbBuilder::new("should_create_snapshots_in_its_ongoing_snapshot_directory")
                .with_immutables(&[1])
                .append_immutable_trio()
                .build();

        let snapshotter = CompressedArchiveSnapshotter::new(
            cardano_db.get_dir().clone(),
            pending_snapshot_directory.clone(),
            CompressionAlgorithm::Gzip,
            Arc::new(FileArchiver::new_for_test(test_dir.join("verification"))),
            Arc::new(MockAncillarySigner::new()),
            TestLogger::stdout(),
        )
        .unwrap();
        let snapshot = snapshotter
            .snapshot_all_completed_immutables("whatever")
            .await
            .unwrap();

        assert_eq!(
            pending_snapshot_directory,
            snapshot.get_file_path().parent().unwrap()
        );
    }

    mod snapshot_all_completed_immutables {
        use super::*;

        // The archive must contain the trios 1 to 3 only: no ledger, volatile or unrelated
        // file, and not the trio added by `append_immutable_trio` (presumably the one still
        // in progress - confirm against `DummyCardanoDbBuilder`).
        #[tokio::test]
        async fn include_only_completed_immutables() {
            let test_dir = temp_dir_create!();
            let cardano_db = DummyCardanoDbBuilder::new(
                "snapshot_all_immutables_include_only_completed_immutables",
            )
            .with_immutables(&[1, 2, 3])
            .append_immutable_trio()
            .with_ledger_files(&["437"])
            .with_volatile_files(&["blocks-0.dat"])
            .with_non_immutables(&["random_file.txt", "00002.trap"])
            .build();

            let snapshotter =
                snapshotter_for_test(&test_dir, cardano_db.get_dir(), CompressionAlgorithm::Gzip);

            let snapshot = snapshotter
                .snapshot_all_completed_immutables("completed_immutables")
                .await
                .unwrap();

            let unpack_dir = snapshot.unpack_gzip(&test_dir);
            let unpacked_files = list_files(&unpack_dir);
            let unpacked_immutable_files = list_files(&unpack_dir.join(IMMUTABLE_DIR));

            // Only the immutable directory is expected at the root of the archive.
            assert_equivalent(vec![IMMUTABLE_DIR.to_string()], unpacked_files);
            assert_equivalent(
                vec![
                    "00001.chunk".to_string(),
                    "00001.primary".to_string(),
                    "00001.secondary".to_string(),
                    "00002.chunk".to_string(),
                    "00002.primary".to_string(),
                    "00002.secondary".to_string(),
                    "00003.chunk".to_string(),
                    "00003.primary".to_string(),
                    "00003.secondary".to_string(),
                ],
                unpacked_immutable_files,
            );
        }
    }

    mod snapshot_immutable_trio {
        use super::*;

        // The archive must contain the three files of the requested trio (number 2) and
        // nothing else.
        #[tokio::test]
        async fn include_only_immutable_trio() {
            let test_dir = get_test_directory("include_only_immutable_trio");
            let cardano_db = DummyCardanoDbBuilder::new("include_only_immutable_trio")
                .with_immutables(&[1, 2, 3])
                .with_ledger_files(&["437"])
                .with_volatile_files(&["blocks-0.dat"])
                .with_non_immutables(&["random_file.txt", "00002.trap"])
                .build();

            let snapshotter =
                snapshotter_for_test(&test_dir, cardano_db.get_dir(), CompressionAlgorithm::Gzip);

            let snapshot = snapshotter
                .snapshot_immutable_trio(2, "immutable-2")
                .await
                .unwrap();

            let unpack_dir = snapshot.unpack_gzip(&test_dir);
            let unpacked_files = list_files(&unpack_dir);
            let unpacked_immutable_files = list_files(&unpack_dir.join(IMMUTABLE_DIR));

            // Only the immutable directory is expected at the root of the archive.
            assert_equivalent(vec![IMMUTABLE_DIR.to_string()], unpacked_files);
            assert_equivalent(
                vec![
                    "00002.chunk".to_string(),
                    "00002.primary".to_string(),
                    "00002.secondary".to_string(),
                ],
                unpacked_immutable_files,
            );
        }
    }

    mod snapshot_ancillary {
        use mithril_common::digesters::VOLATILE_DIR;
        use mithril_common::test_utils::fake_keys;

        use super::*;

        // Asking for the ancillary of immutable 2 must archive the trio 3 and the ledger file
        // "737" only: no other ledger file, no volatile file, no unrelated directory.
        #[tokio::test]
        async fn create_archive_should_embed_only_last_ledger_and_last_immutables() {
            let test_dir = temp_dir_create!();
            let cardano_db = DummyCardanoDbBuilder::new(
                "create_archive_should_embed_only_last_ledger_and_last_immutables",
            )
            .with_immutables(&[1, 2, 3])
            .with_ledger_files(&["437", "537", "637", "737", "9not_included"])
            .with_volatile_files(&["blocks-0.dat", "blocks-1.dat", "blocks-2.dat"])
            .build();
            fs::create_dir(cardano_db.get_dir().join("whatever")).unwrap();

            let db_directory = cardano_db.get_dir();

            // Same as the default test snapshotter, but with a signer that succeeds.
            let snapshotter = CompressedArchiveSnapshotter {
                ancillary_signer: Arc::new(MockAncillarySigner::that_succeeds_with_signature(
                    fake_keys::signable_manifest_signature()[0],
                )),
                ..snapshotter_for_test(&test_dir, db_directory, CompressionAlgorithm::Gzip)
            };

            let snapshot = snapshotter
                .snapshot_ancillary(2, "ancillary")
                .await
                .unwrap();

            let unpack_dir = snapshot.unpack_gzip(&test_dir);

            let expected_immutable_path = unpack_dir.join(IMMUTABLE_DIR);
            assert!(expected_immutable_path.join("00003.chunk").exists());
            assert!(expected_immutable_path.join("00003.primary").exists());
            assert!(expected_immutable_path.join("00003.secondary").exists());
            assert_eq!(3, list_files(&expected_immutable_path).len());

            // Only the last ledger file should be included
            let expected_ledger_path = unpack_dir.join(LEDGER_DIR);
            assert!(expected_ledger_path.join("737").exists());
            assert_eq!(1, list_files(&expected_ledger_path).len());

            let expected_volatile_path = unpack_dir.join(VOLATILE_DIR);
            assert!(!expected_volatile_path.exists());

            assert!(!unpack_dir.join("whatever").exists());
        }

        // A failure of the ancillary signer must be returned as the error of the snapshot.
        #[tokio::test]
        async fn create_archive_fail_if_manifest_signing_fail() {
            let test_dir = temp_dir_create!();
            let cardano_db =
                DummyCardanoDbBuilder::new("create_archive_fail_if_manifest_signing_fail")
                    .with_immutables(&[1, 2])
                    .with_ledger_files(&["737"])
                    .build();

            let db_directory = cardano_db.get_dir();

            // Same as the default test snapshotter, but with a signer that always fails.
            let snapshotter = CompressedArchiveSnapshotter {
                ancillary_signer: Arc::new(MockAncillarySigner::that_fails_with_message(
                    "MockAncillarySigner failed",
                )),
                ..snapshotter_for_test(&test_dir, db_directory, CompressionAlgorithm::Gzip)
            };

            let err = snapshotter
                .snapshot_ancillary(1, "ancillary")
                .await
                .expect_err("Must fail if manifest signing fails");
            assert!(
                err.to_string().contains("MockAncillarySigner failed"),
                "Expected error message to be raised by the mock ancillary signer, but got: '{err:?}'",
            );
        }

        // The archive must embed an `ancillary_manifest.json` file that lists exactly the
        // archived files and carries the signature returned by the signer.
        #[tokio::test]
        async fn create_archive_generate_sign_and_include_manifest_file() {
            let test_dir = temp_dir_create!();
            let cardano_db = DummyCardanoDbBuilder::new(
                "create_archive_generate_sign_and_include_manifest_file",
            )
            .with_immutables(&[1, 2, 3])
            .with_ledger_files(&["321", "737"])
            .with_non_immutables(&["not_to_include.txt"])
            .build();
            File::create(cardano_db.get_dir().join("not_to_include_as_well.txt")).unwrap();

            let db_directory = cardano_db.get_dir();

            // Same as the default test snapshotter, but with a signer that succeeds.
            let snapshotter = CompressedArchiveSnapshotter {
                ancillary_signer: Arc::new(MockAncillarySigner::that_succeeds_with_signature(
                    fake_keys::signable_manifest_signature()[0],
                )),
                ..snapshotter_for_test(&test_dir, db_directory, CompressionAlgorithm::Gzip)
            };

            let archive = snapshotter
                .snapshot_ancillary(2, "ancillary")
                .await
                .unwrap();
            let unpacked = archive.unpack_gzip(test_dir);
            let manifest_path = unpacked.join("ancillary_manifest.json");

            assert!(manifest_path.exists());

            let manifest = serde_json::from_reader::<_, AncillaryFilesManifest>(
                File::open(&manifest_path).unwrap(),
            )
            .unwrap();

            // Manifest keys are the archived paths, relative to the database directory.
            assert_eq!(
                vec![
                    &PathBuf::from(IMMUTABLE_DIR).join("00003.chunk"),
                    &PathBuf::from(IMMUTABLE_DIR).join("00003.primary"),
                    &PathBuf::from(IMMUTABLE_DIR).join("00003.secondary"),
                    &PathBuf::from(LEDGER_DIR).join("737"),
                ],
                manifest.data.keys().collect::<Vec<_>>()
            );
            assert_eq!(
                Some(
                    fake_keys::signable_manifest_signature()[0]
                        .try_into()
                        .unwrap()
                ),
                manifest.signature
            )
        }
    }
}