mithril_aggregator/artifact_builder/cardano_database_artifacts/ancillary.rs

1use std::{path::Path, sync::Arc};
2
3use anyhow::{anyhow, Context};
4use async_trait::async_trait;
5use slog::{debug, error, warn, Logger};
6
7use mithril_common::{
8    entities::{AncillaryLocation, CardanoDbBeacon, CompressionAlgorithm},
9    logging::LoggerExtensions,
10    CardanoNetwork, StdResult,
11};
12
13use crate::{
14    file_uploaders::{GcpUploader, LocalUploader},
15    services::Snapshotter,
16    tools::file_archiver::FileArchive,
17    DumbUploader, FileUploader,
18};
19
20/// The [AncillaryFileUploader] trait allows identifying uploaders that return locations for ancillary archive files.
21#[cfg_attr(test, mockall::automock)]
22#[async_trait]
23pub trait AncillaryFileUploader: Send + Sync {
24    /// Uploads the archive at the given filepath and returns the location of the uploaded file.
25    async fn upload(
26        &self,
27        filepath: &Path,
28        compression_algorithm: Option<CompressionAlgorithm>,
29    ) -> StdResult<AncillaryLocation>;
30}
31
32#[async_trait]
33impl AncillaryFileUploader for DumbUploader {
34    async fn upload(
35        &self,
36        filepath: &Path,
37        compression_algorithm: Option<CompressionAlgorithm>,
38    ) -> StdResult<AncillaryLocation> {
39        let uri = FileUploader::upload(self, filepath).await?.into();
40
41        Ok(AncillaryLocation::CloudStorage {
42            uri,
43            compression_algorithm,
44        })
45    }
46}
47
48#[async_trait]
49impl AncillaryFileUploader for LocalUploader {
50    async fn upload(
51        &self,
52        filepath: &Path,
53        compression_algorithm: Option<CompressionAlgorithm>,
54    ) -> StdResult<AncillaryLocation> {
55        let uri = FileUploader::upload(self, filepath).await?.into();
56
57        Ok(AncillaryLocation::CloudStorage {
58            uri,
59            compression_algorithm,
60        })
61    }
62}
63
64#[async_trait]
65impl AncillaryFileUploader for GcpUploader {
66    async fn upload(
67        &self,
68        filepath: &Path,
69        compression_algorithm: Option<CompressionAlgorithm>,
70    ) -> StdResult<AncillaryLocation> {
71        let uri = FileUploader::upload(self, filepath).await?.into();
72
73        Ok(AncillaryLocation::CloudStorage {
74            uri,
75            compression_algorithm,
76        })
77    }
78}
79
80#[derive(Debug)]
81pub struct AncillaryUpload {
82    pub locations: Vec<AncillaryLocation>,
83    pub size: u64,
84}
85
86/// The [AncillaryArtifactBuilder] creates an ancillary archive from the cardano database directory (including ledger and volatile directories).
87/// The archive is uploaded with the provided uploaders.
88pub struct AncillaryArtifactBuilder {
89    uploaders: Vec<Arc<dyn AncillaryFileUploader>>,
90    snapshotter: Arc<dyn Snapshotter>,
91    cardano_network: CardanoNetwork,
92    logger: Logger,
93}
94
95impl AncillaryArtifactBuilder {
96    /// Creates a new [AncillaryArtifactBuilder].
97    pub fn new(
98        uploaders: Vec<Arc<dyn AncillaryFileUploader>>,
99        snapshotter: Arc<dyn Snapshotter>,
100        cardano_network: CardanoNetwork,
101        logger: Logger,
102    ) -> StdResult<Self> {
103        if uploaders.is_empty() {
104            return Err(anyhow!(
105                "At least one uploader is required to create an 'AncillaryArtifactBuilder'"
106            ));
107        }
108
109        Ok(Self {
110            uploaders,
111            logger: logger.new_with_component_name::<Self>(),
112            cardano_network,
113            snapshotter,
114        })
115    }
116
117    pub async fn upload(&self, beacon: &CardanoDbBeacon) -> StdResult<AncillaryUpload> {
118        let snapshot = self.create_ancillary_archive(beacon).await?;
119        let locations = self.upload_ancillary_archive(&snapshot).await?;
120
121        Ok(AncillaryUpload {
122            locations,
123            size: snapshot.get_uncompressed_size(),
124        })
125    }
126
127    /// Creates an archive for the Cardano database ancillary files for the given immutable file number.
128    async fn create_ancillary_archive(&self, beacon: &CardanoDbBeacon) -> StdResult<FileArchive> {
129        debug!(
130            self.logger,
131            "Creating ancillary archive for immutable file number: {}",
132            beacon.immutable_file_number
133        );
134
135        let archive_name = format!(
136            "{}-e{}-i{}.ancillary",
137            self.cardano_network, *beacon.epoch, beacon.immutable_file_number,
138        );
139
140        let snapshot = self
141            .snapshotter
142            .snapshot_ancillary(beacon.immutable_file_number, &archive_name)
143            .await
144            .with_context(|| {
145                format!(
146                    "Failed to create ancillary archive for immutable file number: {}",
147                    beacon.immutable_file_number
148                )
149            })?;
150
151        debug!(
152            self.logger,
153            "Ancillary archive created at path: {:?}",
154            snapshot.get_file_path()
155        );
156
157        Ok(snapshot)
158    }
159
160    /// Uploads the ancillary archive and returns the locations of the uploaded files.
161    async fn upload_ancillary_archive(
162        &self,
163        file_archive: &FileArchive,
164    ) -> StdResult<Vec<AncillaryLocation>> {
165        let archive_filepath = file_archive.get_file_path();
166        let mut locations = Vec::new();
167        for uploader in &self.uploaders {
168            let result = uploader
169                .upload(
170                    archive_filepath,
171                    Some(file_archive.get_compression_algorithm()),
172                )
173                .await;
174            match result {
175                Ok(location) => {
176                    locations.push(location);
177                }
178                Err(e) => {
179                    error!(
180                        self.logger,
181                        "Failed to upload ancillary archive";
182                        "error" => e.to_string()
183                    );
184                }
185            }
186        }
187
188        if let Err(error) = tokio::fs::remove_file(archive_filepath).await {
189            warn!(
190                self.logger, " > Post upload ancillary archive file removal failure";
191                "error" => error
192            );
193        }
194
195        if locations.is_empty() {
196            return Err(anyhow!(
197                "Failed to upload ancillary archive with all uploaders"
198            ));
199        }
200
201        Ok(locations)
202    }
203}
204
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use mithril_common::test_utils::{assert_equivalent, TempDir};

    use crate::services::{DumbSnapshotter, MockSnapshotter};
    use crate::test_tools::TestLogger;

    use super::*;

    /// Builds a mocked uploader whose single `upload` call fails.
    fn fake_uploader_returning_error() -> MockAncillaryFileUploader {
        let mut uploader = MockAncillaryFileUploader::new();
        uploader
            .expect_upload()
            .return_once(|_, _| Err(anyhow!("Failure while uploading...")));

        uploader
    }

    /// Builds a mocked uploader expecting exactly one upload of `archive_path` with
    /// `compression_algorithm`, answering with a cloud storage location at `location_uri`.
    fn fake_uploader(
        archive_path: &str,
        location_uri: &str,
        compression_algorithm: Option<CompressionAlgorithm>,
    ) -> MockAncillaryFileUploader {
        let uri = location_uri.to_string();
        let filepath = archive_path.to_string();
        let mut uploader = MockAncillaryFileUploader::new();
        uploader
            .expect_upload()
            .withf(move |path, algorithm| {
                path == Path::new(&filepath) && algorithm == &compression_algorithm
            })
            .times(1)
            .return_once(move |_, _| {
                Ok(AncillaryLocation::CloudStorage {
                    uri,
                    compression_algorithm,
                })
            });

        uploader
    }

    /// Writes a small placeholder file named `name` in `dir` and returns its path.
    fn create_fake_archive(dir: &Path, name: &str) -> PathBuf {
        let file_path = dir.join(name);
        std::fs::write(
            &file_path,
            "I swear, this is an archive, not a temporary test file.\n",
        )
        .unwrap();

        file_path
    }

    #[test]
    fn create_ancillary_builder_should_error_when_no_uploader() {
        let result = AncillaryArtifactBuilder::new(
            vec![],
            Arc::new(DumbSnapshotter::default()),
            CardanoNetwork::DevNet(123),
            TestLogger::stdout(),
        );

        assert!(result.is_err(), "Should return an error when no uploaders")
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_log_upload_errors() {
        let log_path = TempDir::create(
            "ancillary",
            "upload_ancillary_archive_should_log_upload_errors",
        )
        .join("test.log");

        let uploader = fake_uploader_returning_error();

        // Scoped so the builder (and its file logger) is dropped, flushing logs, before reading them.
        {
            let builder = AncillaryArtifactBuilder::new(
                vec![Arc::new(uploader)],
                Arc::new(DumbSnapshotter::default()),
                CardanoNetwork::DevNet(123),
                TestLogger::file(&log_path),
            )
            .unwrap();

            let _ = builder
                .upload_ancillary_archive(&FileArchive::dummy())
                .await;
        }

        let logs = std::fs::read_to_string(&log_path).unwrap();
        assert!(logs.contains("Failure while uploading..."));
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_error_when_no_location_is_returned() {
        let uploader = fake_uploader_returning_error();

        let builder = AncillaryArtifactBuilder::new(
            vec![Arc::new(uploader)],
            Arc::new(DumbSnapshotter::default()),
            CardanoNetwork::DevNet(123),
            TestLogger::stdout(),
        )
        .unwrap();

        let result = builder
            .upload_ancillary_archive(&FileArchive::dummy())
            .await;

        assert!(
            result.is_err(),
            "Should return an error when no location is returned"
        );
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_return_location_even_with_uploaders_errors() {
        let first_uploader = fake_uploader_returning_error();
        let second_uploader =
            fake_uploader("archive_path", "an_uri", Some(CompressionAlgorithm::Gzip));
        let third_uploader = fake_uploader_returning_error();

        let uploaders: Vec<Arc<dyn AncillaryFileUploader>> = vec![
            Arc::new(first_uploader),
            Arc::new(second_uploader),
            Arc::new(third_uploader),
        ];

        let builder = AncillaryArtifactBuilder::new(
            uploaders,
            Arc::new(DumbSnapshotter::default()),
            CardanoNetwork::DevNet(123),
            TestLogger::stdout(),
        )
        .unwrap();

        let locations = builder
            .upload_ancillary_archive(&FileArchive::new(
                PathBuf::from("archive_path"),
                0,
                0,
                CompressionAlgorithm::Gzip,
            ))
            .await
            .unwrap();

        assert_equivalent(
            locations,
            vec![AncillaryLocation::CloudStorage {
                uri: "an_uri".to_string(),
                compression_algorithm: Some(CompressionAlgorithm::Gzip),
            }],
        );
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_return_all_uploaders_returned_locations() {
        let first_uploader =
            fake_uploader("archive_path", "an_uri", Some(CompressionAlgorithm::Gzip));
        let second_uploader = fake_uploader(
            "archive_path",
            "another_uri",
            Some(CompressionAlgorithm::Gzip),
        );

        let uploaders: Vec<Arc<dyn AncillaryFileUploader>> =
            vec![Arc::new(first_uploader), Arc::new(second_uploader)];

        let builder = AncillaryArtifactBuilder::new(
            uploaders,
            Arc::new(DumbSnapshotter::new(CompressionAlgorithm::Gzip)),
            CardanoNetwork::DevNet(123),
            TestLogger::stdout(),
        )
        .unwrap();

        let locations = builder
            .upload_ancillary_archive(&FileArchive::new(
                PathBuf::from("archive_path"),
                0,
                0,
                CompressionAlgorithm::Gzip,
            ))
            .await
            .unwrap();

        assert_equivalent(
            locations,
            vec![
                AncillaryLocation::CloudStorage {
                    uri: "an_uri".to_string(),
                    compression_algorithm: Some(CompressionAlgorithm::Gzip),
                },
                AncillaryLocation::CloudStorage {
                    uri: "another_uri".to_string(),
                    compression_algorithm: Some(CompressionAlgorithm::Gzip),
                },
            ],
        );
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_remove_archive_after_upload() {
        let source_dir = TempDir::create(
            "ancillary",
            "upload_ancillary_archive_should_remove_archive_after_upload",
        );
        let archive_path = create_fake_archive(&source_dir, "ancillary.tar.gz");
        let archive = FileArchive::new(archive_path.clone(), 0, 0, CompressionAlgorithm::Gzip);
        let uploader = fake_uploader(
            archive_path.as_os_str().to_str().unwrap(),
            "an_uri",
            Some(CompressionAlgorithm::Gzip),
        );

        let builder = AncillaryArtifactBuilder::new(
            vec![Arc::new(uploader)],
            Arc::new(DumbSnapshotter::new(CompressionAlgorithm::Gzip)),
            CardanoNetwork::DevNet(123),
            TestLogger::stdout(),
        )
        .unwrap();

        assert!(archive_path.exists());

        builder.upload_ancillary_archive(&archive).await.unwrap();

        assert!(!archive_path.exists());
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_remove_archive_when_no_uploader_succeed() {
        let source_dir = TempDir::create(
            "ancillary",
            "upload_ancillary_archive_should_remove_archive_when_no_uploader_succeed",
        );
        let archive_path = create_fake_archive(&source_dir, "ancillary.tar.gz");
        let archive = FileArchive::new(archive_path.clone(), 0, 0, CompressionAlgorithm::Gzip);
        let uploader = fake_uploader_returning_error();

        let builder = AncillaryArtifactBuilder::new(
            vec![Arc::new(uploader)],
            Arc::new(DumbSnapshotter::default()),
            CardanoNetwork::DevNet(123),
            TestLogger::stdout(),
        )
        .unwrap();

        assert!(archive_path.exists());

        builder
            .upload_ancillary_archive(&archive)
            .await
            .unwrap_err();

        assert!(!archive_path.exists());
    }

    #[tokio::test]
    async fn upload_should_return_error_and_not_upload_when_archive_creation_fails() {
        let mut snapshotter = MockSnapshotter::new();
        snapshotter
            .expect_snapshot_ancillary()
            .returning(|_, _| Err(anyhow!("Failed to create archive")));

        let mut uploader = MockAncillaryFileUploader::new();
        uploader.expect_upload().never();

        let builder = AncillaryArtifactBuilder::new(
            vec![Arc::new(uploader)],
            Arc::new(snapshotter),
            CardanoNetwork::DevNet(123),
            TestLogger::stdout(),
        )
        .unwrap();

        builder
            .upload(&CardanoDbBeacon::new(99, 1))
            .await
            .expect_err("Should return an error when archive creation fails");
    }

    #[tokio::test]
    async fn should_compute_the_size_of_the_ancillary() {
        let mut snapshotter = MockSnapshotter::new();
        snapshotter.expect_snapshot_ancillary().returning(|_, _| {
            let expected_uncompressed_size = 123456;
            Ok(FileArchive::new(
                PathBuf::from("whatever.tar.gz"),
                0,
                expected_uncompressed_size,
                CompressionAlgorithm::Gzip,
            ))
        });
        let mut uploader = MockAncillaryFileUploader::new();
        uploader.expect_upload().returning(|_, _| {
            Ok(AncillaryLocation::CloudStorage {
                uri: "an_uri".to_string(),
                compression_algorithm: Some(CompressionAlgorithm::Gzip),
            })
        });

        let builder = AncillaryArtifactBuilder::new(
            vec![Arc::new(uploader)],
            Arc::new(snapshotter),
            CardanoNetwork::DevNet(123),
            TestLogger::stdout(),
        )
        .unwrap();

        let upload = builder.upload(&CardanoDbBeacon::new(99, 1)).await.unwrap();

        assert_eq!(123456, upload.size);
    }
}
528}