mithril_aggregator/artifact_builder/cardano_database_artifacts/
ancillary.rs

1use std::{path::Path, sync::Arc};
2
3use anyhow::{Context, anyhow};
4use async_trait::async_trait;
5use slog::{Logger, debug, error, warn};
6
7use mithril_common::{
8    CardanoNetwork, StdResult,
9    entities::{AncillaryLocation, CardanoDbBeacon, CompressionAlgorithm},
10    logging::LoggerExtensions,
11};
12
13use crate::{
14    DumbUploader, FileUploader,
15    file_uploaders::{CloudUploader, LocalUploader},
16    services::Snapshotter,
17    tools::file_archiver::FileArchive,
18};
19
20/// The [AncillaryFileUploader] trait allows identifying uploaders that return locations for ancillary archive files.
21#[cfg_attr(test, mockall::automock)]
22#[async_trait]
23pub trait AncillaryFileUploader: Send + Sync {
24    /// Uploads the archive at the given filepath and returns the location of the uploaded file.
25    async fn upload(
26        &self,
27        filepath: &Path,
28        compression_algorithm: Option<CompressionAlgorithm>,
29    ) -> StdResult<AncillaryLocation>;
30}
31
32#[async_trait]
33impl AncillaryFileUploader for DumbUploader {
34    async fn upload(
35        &self,
36        filepath: &Path,
37        compression_algorithm: Option<CompressionAlgorithm>,
38    ) -> StdResult<AncillaryLocation> {
39        let uri = FileUploader::upload(self, filepath).await?.into();
40
41        Ok(AncillaryLocation::CloudStorage {
42            uri,
43            compression_algorithm,
44        })
45    }
46}
47
48#[async_trait]
49impl AncillaryFileUploader for LocalUploader {
50    async fn upload(
51        &self,
52        filepath: &Path,
53        compression_algorithm: Option<CompressionAlgorithm>,
54    ) -> StdResult<AncillaryLocation> {
55        let uri = FileUploader::upload(self, filepath).await?.into();
56
57        Ok(AncillaryLocation::CloudStorage {
58            uri,
59            compression_algorithm,
60        })
61    }
62}
63
64#[async_trait]
65impl AncillaryFileUploader for CloudUploader {
66    async fn upload(
67        &self,
68        filepath: &Path,
69        compression_algorithm: Option<CompressionAlgorithm>,
70    ) -> StdResult<AncillaryLocation> {
71        let uri = FileUploader::upload(self, filepath).await?.into();
72
73        Ok(AncillaryLocation::CloudStorage {
74            uri,
75            compression_algorithm,
76        })
77    }
78}
79
80#[derive(Debug)]
81pub struct AncillaryUpload {
82    pub locations: Vec<AncillaryLocation>,
83    pub size: u64,
84}
85
86/// The [AncillaryArtifactBuilder] creates an ancillary archive from the cardano database directory (including ledger and volatile directories).
87/// The archive is uploaded with the provided uploaders.
88pub struct AncillaryArtifactBuilder {
89    uploaders: Vec<Arc<dyn AncillaryFileUploader>>,
90    snapshotter: Arc<dyn Snapshotter>,
91    cardano_network: CardanoNetwork,
92    logger: Logger,
93}
94
95impl AncillaryArtifactBuilder {
96    /// Creates a new [AncillaryArtifactBuilder].
97    pub fn new(
98        uploaders: Vec<Arc<dyn AncillaryFileUploader>>,
99        snapshotter: Arc<dyn Snapshotter>,
100        cardano_network: CardanoNetwork,
101        logger: Logger,
102    ) -> StdResult<Self> {
103        if uploaders.is_empty() {
104            return Err(anyhow!(
105                "At least one uploader is required to create an 'AncillaryArtifactBuilder'"
106            ));
107        }
108
109        Ok(Self {
110            uploaders,
111            logger: logger.new_with_component_name::<Self>(),
112            cardano_network,
113            snapshotter,
114        })
115    }
116
117    pub async fn upload(&self, beacon: &CardanoDbBeacon) -> StdResult<AncillaryUpload> {
118        let snapshot = self.create_ancillary_archive(beacon).await?;
119        let locations = self.upload_ancillary_archive(&snapshot).await?;
120
121        Ok(AncillaryUpload {
122            locations,
123            size: snapshot.get_uncompressed_size(),
124        })
125    }
126
127    /// Creates an archive for the Cardano database ancillary files for the given immutable file number.
128    async fn create_ancillary_archive(&self, beacon: &CardanoDbBeacon) -> StdResult<FileArchive> {
129        debug!(
130            self.logger,
131            "Creating ancillary archive for immutable file number: {}",
132            beacon.immutable_file_number
133        );
134
135        let archive_name = format!(
136            "{}-e{}-i{}.ancillary",
137            self.cardano_network, *beacon.epoch, beacon.immutable_file_number,
138        );
139
140        let snapshot = self
141            .snapshotter
142            .snapshot_ancillary(beacon.immutable_file_number, &archive_name)
143            .await
144            .with_context(|| {
145                format!(
146                    "Failed to create ancillary archive for immutable file number: {}",
147                    beacon.immutable_file_number
148                )
149            })?;
150
151        debug!(
152            self.logger,
153            "Ancillary archive created at path: {:?}",
154            snapshot.get_file_path()
155        );
156
157        Ok(snapshot)
158    }
159
160    /// Uploads the ancillary archive and returns the locations of the uploaded files.
161    async fn upload_ancillary_archive(
162        &self,
163        file_archive: &FileArchive,
164    ) -> StdResult<Vec<AncillaryLocation>> {
165        let archive_filepath = file_archive.get_file_path();
166        let mut locations = Vec::new();
167        for uploader in &self.uploaders {
168            let result = uploader
169                .upload(
170                    archive_filepath,
171                    Some(file_archive.get_compression_algorithm()),
172                )
173                .await;
174            match result {
175                Ok(location) => {
176                    locations.push(location);
177                }
178                Err(e) => {
179                    error!(
180                        self.logger,
181                        "Failed to upload ancillary archive";
182                        "error" => e.to_string()
183                    );
184                }
185            }
186        }
187
188        if let Err(error) = tokio::fs::remove_file(archive_filepath).await {
189            warn!(
190                self.logger, " > Post upload ancillary archive file removal failure";
191                "error" => error
192            );
193        }
194
195        if locations.is_empty() {
196            return Err(anyhow!(
197                "Failed to upload ancillary archive with all uploaders"
198            ));
199        }
200
201        Ok(locations)
202    }
203}
204
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use mithril_common::test_utils::{TempDir, assert_equivalent};

    use crate::services::{DumbSnapshotter, MockSnapshotter};
    use crate::test_tools::TestLogger;

    use super::*;

    /// Builds an [AncillaryArtifactBuilder] for the test devnet that logs to stdout,
    /// panicking if the construction fails.
    fn build_builder(
        uploaders: Vec<Arc<dyn AncillaryFileUploader>>,
        snapshotter: Arc<dyn Snapshotter>,
    ) -> AncillaryArtifactBuilder {
        AncillaryArtifactBuilder::new(
            uploaders,
            snapshotter,
            CardanoNetwork::DevNet(123),
            TestLogger::stdout(),
        )
        .unwrap()
    }

    /// Builds a [FileArchive] with zeroed sizes and Gzip compression located at `path`.
    fn gzip_archive(path: PathBuf) -> FileArchive {
        FileArchive::new(path, 0, 0, CompressionAlgorithm::Gzip)
    }

    /// Mock uploader whose upload fails.
    fn fake_uploader_returning_error() -> MockAncillaryFileUploader {
        let mut failing_uploader = MockAncillaryFileUploader::new();
        failing_uploader
            .expect_upload()
            .return_once(|_, _| Err(anyhow!("Failure while uploading...")));

        failing_uploader
    }

    /// Mock uploader expecting exactly one upload of `archive_path` with the given
    /// compression algorithm, answering with a cloud storage location at `location_uri`.
    fn fake_uploader(
        archive_path: &str,
        location_uri: &str,
        compression_algorithm: Option<CompressionAlgorithm>,
    ) -> MockAncillaryFileUploader {
        let expected_path = PathBuf::from(archive_path);
        let returned_uri = location_uri.to_string();

        let mut uploader = MockAncillaryFileUploader::new();
        uploader
            .expect_upload()
            .withf(move |path, algorithm| {
                path == expected_path.as_path() && *algorithm == compression_algorithm
            })
            .times(1)
            .return_once(move |_, _| {
                Ok(AncillaryLocation::CloudStorage {
                    uri: returned_uri,
                    compression_algorithm,
                })
            });

        uploader
    }

    /// Writes a small file named `name` in `dir` and returns its path.
    fn create_fake_archive(dir: &Path, name: &str) -> PathBuf {
        use std::fs::File;
        use std::io::Write;

        let archive_path = dir.join(name);
        let mut archive_file = File::create(&archive_path).unwrap();
        writeln!(
            archive_file,
            "I swear, this is an archive, not a temporary test file."
        )
        .unwrap();

        archive_path
    }

    #[test]
    fn create_ancillary_builder_should_error_when_no_uploader() {
        let no_uploaders: Vec<Arc<dyn AncillaryFileUploader>> = Vec::new();

        let result = AncillaryArtifactBuilder::new(
            no_uploaders,
            Arc::new(DumbSnapshotter::default()),
            CardanoNetwork::DevNet(123),
            TestLogger::stdout(),
        );

        assert!(result.is_err(), "Should return an error when no uploaders")
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_log_upload_errors() {
        let (logger, log_inspector) = TestLogger::memory();
        let builder = AncillaryArtifactBuilder::new(
            vec![Arc::new(fake_uploader_returning_error())],
            Arc::new(DumbSnapshotter::default()),
            CardanoNetwork::DevNet(123),
            logger,
        )
        .unwrap();

        // Only the emitted logs matter here, the upload outcome is ignored on purpose.
        let _ = builder.upload_ancillary_archive(&FileArchive::dummy()).await;

        assert!(log_inspector.contains_log("Failure while uploading..."));
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_error_when_no_location_is_returned() {
        let builder = build_builder(
            vec![Arc::new(fake_uploader_returning_error())],
            Arc::new(DumbSnapshotter::default()),
        );

        let result = builder.upload_ancillary_archive(&FileArchive::dummy()).await;

        assert!(
            result.is_err(),
            "Should return an error when no location is returned"
        );
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_return_location_even_with_uploaders_errors() {
        let uploaders: Vec<Arc<dyn AncillaryFileUploader>> = vec![
            Arc::new(fake_uploader_returning_error()),
            Arc::new(fake_uploader(
                "archive_path",
                "an_uri",
                Some(CompressionAlgorithm::Gzip),
            )),
            Arc::new(fake_uploader_returning_error()),
        ];
        let builder = build_builder(uploaders, Arc::new(DumbSnapshotter::default()));
        let archive = gzip_archive(PathBuf::from("archive_path"));

        let locations = builder.upload_ancillary_archive(&archive).await.unwrap();

        assert_equivalent(
            locations,
            vec![AncillaryLocation::CloudStorage {
                uri: "an_uri".to_string(),
                compression_algorithm: Some(CompressionAlgorithm::Gzip),
            }],
        );
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_return_all_uploaders_returned_locations() {
        let uploaders: Vec<Arc<dyn AncillaryFileUploader>> = vec![
            Arc::new(fake_uploader(
                "archive_path",
                "an_uri",
                Some(CompressionAlgorithm::Gzip),
            )),
            Arc::new(fake_uploader(
                "archive_path",
                "another_uri",
                Some(CompressionAlgorithm::Gzip),
            )),
        ];
        let builder = build_builder(
            uploaders,
            Arc::new(DumbSnapshotter::new(CompressionAlgorithm::Gzip)),
        );
        let archive = gzip_archive(PathBuf::from("archive_path"));

        let locations = builder.upload_ancillary_archive(&archive).await.unwrap();

        assert_equivalent(
            locations,
            vec![
                AncillaryLocation::CloudStorage {
                    uri: "an_uri".to_string(),
                    compression_algorithm: Some(CompressionAlgorithm::Gzip),
                },
                AncillaryLocation::CloudStorage {
                    uri: "another_uri".to_string(),
                    compression_algorithm: Some(CompressionAlgorithm::Gzip),
                },
            ],
        );
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_remove_archive_after_upload() {
        let source_dir = TempDir::create(
            "ancillary",
            "upload_ancillary_archive_should_remove_archive_after_upload",
        );
        let archive_path = create_fake_archive(&source_dir, "ancillary.tar.gz");
        let archive = gzip_archive(archive_path.clone());
        let uploader = fake_uploader(
            archive_path.to_str().unwrap(),
            "an_uri",
            Some(CompressionAlgorithm::Gzip),
        );
        let builder = build_builder(
            vec![Arc::new(uploader)],
            Arc::new(DumbSnapshotter::new(CompressionAlgorithm::Gzip)),
        );
        assert!(archive_path.exists());

        builder.upload_ancillary_archive(&archive).await.unwrap();

        assert!(!archive_path.exists());
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_remove_archive_when_no_uploader_succeed() {
        let source_dir = TempDir::create(
            "ancillary",
            "upload_ancillary_archive_should_remove_archive_when_no_uploader_succeed",
        );
        let archive_path = create_fake_archive(&source_dir, "ancillary.tar.gz");
        let archive = gzip_archive(archive_path.clone());
        let builder = build_builder(
            vec![Arc::new(fake_uploader_returning_error())],
            Arc::new(DumbSnapshotter::default()),
        );
        assert!(archive_path.exists());

        builder.upload_ancillary_archive(&archive).await.unwrap_err();

        assert!(!archive_path.exists());
    }

    #[tokio::test]
    async fn upload_should_return_error_and_not_upload_when_archive_creation_fails() {
        let mut failing_snapshotter = MockSnapshotter::new();
        failing_snapshotter
            .expect_snapshot_ancillary()
            .returning(|_, _| Err(anyhow!("Failed to create archive")));

        let mut untouched_uploader = MockAncillaryFileUploader::new();
        untouched_uploader.expect_upload().never();

        let builder = build_builder(
            vec![Arc::new(untouched_uploader)],
            Arc::new(failing_snapshotter),
        );

        builder
            .upload(&CardanoDbBeacon::new(99, 1))
            .await
            .expect_err("Should return an error when archive creation fails");
    }

    #[tokio::test]
    async fn should_compute_the_size_of_the_ancillary() {
        let mut snapshotter = MockSnapshotter::new();
        snapshotter.expect_snapshot_ancillary().returning(|_, _| {
            let expected_uncompressed_size = 123456;
            Ok(FileArchive::new(
                PathBuf::from("whatever.tar.gz"),
                0,
                expected_uncompressed_size,
                CompressionAlgorithm::Gzip,
            ))
        });

        let mut uploader = MockAncillaryFileUploader::new();
        uploader.expect_upload().returning(|_, _| {
            Ok(AncillaryLocation::CloudStorage {
                uri: "an_uri".to_string(),
                compression_algorithm: Some(CompressionAlgorithm::Gzip),
            })
        });

        let builder = build_builder(vec![Arc::new(uploader)], Arc::new(snapshotter));

        let upload = builder.upload(&CardanoDbBeacon::new(99, 1)).await.unwrap();

        assert_eq!(123456, upload.size);
    }
}