mithril_aggregator/artifact_builder/
cardano_immutable_files_full.rs

1use anyhow::Context;
2use async_trait::async_trait;
3use mithril_common::entities::{FileUri, ImmutableFileNumber};
4use semver::Version;
5use slog::{Logger, debug, warn};
6use std::sync::Arc;
7use thiserror::Error;
8
9use mithril_common::logging::LoggerExtensions;
10use mithril_common::{
11    CardanoNetwork, StdResult,
12    entities::{CardanoDbBeacon, Certificate, ProtocolMessagePartKey, Snapshot},
13};
14
15use super::ArtifactBuilder;
16use crate::{FileUploader, services::Snapshotter, tools::file_archiver::FileArchive};
17
18/// [CardanoImmutableFilesFullArtifact] error
19#[derive(Debug, Error)]
20pub enum CardanoImmutableFilesFullArtifactError {
21    /// Protocol message part is missing
22    #[error("Missing protocol message for beacon: '{0}'.")]
23    MissingProtocolMessage(CardanoDbBeacon),
24}
25
26/// A [CardanoImmutableFilesFullArtifact] builder
27pub struct CardanoImmutableFilesFullArtifactBuilder {
28    cardano_network: CardanoNetwork,
29    cardano_node_version: Version,
30    snapshotter: Arc<dyn Snapshotter>,
31    snapshot_uploader: Arc<dyn FileUploader>,
32    logger: Logger,
33}
34
35impl CardanoImmutableFilesFullArtifactBuilder {
36    /// CardanoImmutableFilesFull artifact builder factory
37    pub fn new(
38        cardano_network: CardanoNetwork,
39        cardano_node_version: &Version,
40        snapshotter: Arc<dyn Snapshotter>,
41        snapshot_uploader: Arc<dyn FileUploader>,
42        logger: Logger,
43    ) -> Self {
44        Self {
45            cardano_network,
46            cardano_node_version: cardano_node_version.clone(),
47            snapshotter,
48            snapshot_uploader,
49            logger: logger.new_with_component_name::<Self>(),
50        }
51    }
52
53    async fn create_immutables_snapshot_archive(
54        &self,
55        base_file_name_without_extension: &str,
56    ) -> StdResult<FileArchive> {
57        debug!(self.logger, ">> create_immutables_snapshot_archive");
58
59        let snapshotter = self.snapshotter.clone();
60        let snapshot_name = base_file_name_without_extension.to_owned();
61        let ongoing_snapshot =
62            snapshotter.snapshot_all_completed_immutables(&snapshot_name).await?;
63
64        debug!(
65            self.logger,
66            " > Immutables snapshot created: '{ongoing_snapshot:?}'"
67        );
68
69        Ok(ongoing_snapshot)
70    }
71
72    async fn create_ancillary_snapshot_archive(
73        &self,
74        immutable_file_number: ImmutableFileNumber,
75        base_file_name_without_extension: &str,
76    ) -> StdResult<FileArchive> {
77        debug!(self.logger, ">> create_ancillary_snapshot_archive");
78
79        let snapshotter = self.snapshotter.clone();
80        let snapshot_name = format!("{base_file_name_without_extension}.ancillary");
81        let ongoing_snapshot = snapshotter
82            .snapshot_ancillary(immutable_file_number, &snapshot_name)
83            .await?;
84
85        debug!(
86            self.logger,
87            " > Ancillary snapshot created: '{ongoing_snapshot:?}'"
88        );
89
90        Ok(ongoing_snapshot)
91    }
92
93    async fn upload_snapshot_archive(
94        &self,
95        ongoing_snapshot: &FileArchive,
96    ) -> StdResult<Vec<FileUri>> {
97        debug!(self.logger, ">> upload_snapshot_archive");
98        let location = self.snapshot_uploader.upload(ongoing_snapshot.get_file_path()).await;
99
100        if let Err(error) = tokio::fs::remove_file(ongoing_snapshot.get_file_path()).await {
101            warn!(
102                self.logger, " > Post upload ongoing snapshot file removal failure";
103                "error" => error
104            );
105        }
106
107        Ok(vec![location?])
108    }
109
110    fn create_snapshot(
111        &self,
112        beacon: CardanoDbBeacon,
113        ongoing_immutables_snapshot: &FileArchive,
114        ongoing_ancillary_snapshot: &FileArchive,
115        snapshot_digest: String,
116        immutables_remote_locations: Vec<String>,
117        ancillary_remote_locations: Vec<String>,
118    ) -> StdResult<Snapshot> {
119        debug!(self.logger, ">> create_snapshot");
120
121        let snapshot = Snapshot {
122            digest: snapshot_digest,
123            network: self.cardano_network.into(),
124            beacon,
125            size: ongoing_immutables_snapshot.get_archive_size(),
126            ancillary_size: Some(ongoing_ancillary_snapshot.get_archive_size()),
127            locations: immutables_remote_locations,
128            ancillary_locations: Some(ancillary_remote_locations),
129            compression_algorithm: ongoing_immutables_snapshot.get_compression_algorithm(),
130            cardano_node_version: self.cardano_node_version.to_string(),
131        };
132
133        Ok(snapshot)
134    }
135
136    fn get_base_file_name_without_extension(
137        &self,
138        beacon: &CardanoDbBeacon,
139        snapshot_digest: &str,
140    ) -> String {
141        let snapshot_name = format!(
142            "{}-e{}-i{}.{}",
143            self.cardano_network, *beacon.epoch, beacon.immutable_file_number, snapshot_digest,
144        );
145        snapshot_name
146    }
147}
148
149#[async_trait]
150impl ArtifactBuilder<CardanoDbBeacon, Snapshot> for CardanoImmutableFilesFullArtifactBuilder {
151    async fn compute_artifact(
152        &self,
153        beacon: CardanoDbBeacon,
154        certificate: &Certificate,
155    ) -> StdResult<Snapshot> {
156        let snapshot_digest = certificate
157            .protocol_message
158            .get_message_part(&ProtocolMessagePartKey::SnapshotDigest)
159            .ok_or_else(|| {
160                CardanoImmutableFilesFullArtifactError::MissingProtocolMessage(beacon.clone())
161            })?
162            .to_owned();
163
164        let base_file_name_without_extension =
165            self.get_base_file_name_without_extension(&beacon, &snapshot_digest);
166        let ongoing_immutables_snapshot = self
167            .create_immutables_snapshot_archive(&base_file_name_without_extension)
168            .await
169            .with_context(|| {
170                "Cardano Immutable Files Full Artifact Builder can not create immutables snapshot archive"
171            })?;
172        let ongoing_ancillary_snapshot = self
173            .create_ancillary_snapshot_archive(beacon.immutable_file_number, &base_file_name_without_extension)
174            .await
175            .with_context(|| {
176                "Cardano Immutable Files Full Artifact Builder can not create ancillary snapshot archive"
177            })?;
178        let immutables_locations = self
179            .upload_snapshot_archive(&ongoing_immutables_snapshot)
180            .await
181            .with_context(|| {
182                format!("Cardano Immutable Files Full Artifact Builder can not upload immutables snapshot archive to path: '{:?}'", ongoing_immutables_snapshot.get_file_path())
183            })?;
184        let ancillary_locations = self
185            .upload_snapshot_archive(&ongoing_ancillary_snapshot)
186            .await
187            .with_context(|| {
188                format!("Cardano Immutable Files Full Artifact Builder can not upload ancillary snapshot archive to path: '{:?}'", ongoing_ancillary_snapshot.get_file_path())
189            })?;
190
191        let snapshot = self.create_snapshot(
192            beacon,
193            &ongoing_immutables_snapshot,
194            &ongoing_ancillary_snapshot,
195            snapshot_digest,
196            immutables_locations.into_iter().map(Into::into).collect(),
197            ancillary_locations.into_iter().map(Into::into).collect(),
198        )?;
199
200        Ok(snapshot)
201    }
202}
203
204#[cfg(test)]
205mod tests {
206    use anyhow::anyhow;
207    use tempfile::NamedTempFile;
208
209    use mithril_common::{entities::CompressionAlgorithm, test_utils::fake_data};
210
211    use crate::{
212        DumbUploader, file_uploaders::MockFileUploader, services::DumbSnapshotter,
213        test_tools::TestLogger,
214    };
215
216    use super::*;
217
218    #[tokio::test]
219    async fn should_compute_valid_artifact() {
220        let beacon = fake_data::beacon();
221        let certificate = fake_data::certificate("certificate-123".to_string());
222        let snapshot_digest = certificate
223            .protocol_message
224            .get_message_part(&ProtocolMessagePartKey::SnapshotDigest)
225            .unwrap();
226        let individual_snapshot_archive_size = 7331;
227
228        let dumb_snapshotter = Arc::new(
229            DumbSnapshotter::new(CompressionAlgorithm::Zstandard)
230                .with_archive_size(individual_snapshot_archive_size),
231        );
232        let dumb_snapshot_uploader = Arc::new(DumbUploader::default());
233
234        let cardano_immutable_files_full_artifact_builder =
235            CardanoImmutableFilesFullArtifactBuilder::new(
236                fake_data::network(),
237                &Version::parse("1.0.0").unwrap(),
238                dumb_snapshotter.clone(),
239                dumb_snapshot_uploader.clone(),
240                TestLogger::stdout(),
241            );
242        let artifact = cardano_immutable_files_full_artifact_builder
243            .compute_artifact(beacon.clone(), &certificate)
244            .await
245            .unwrap();
246
247        let last_uploads = dumb_snapshot_uploader.get_last_n_uploads(2).unwrap();
248        let [ancillary_location, immutables_location, ..] = last_uploads.as_slice() else {
249            panic!("Two snapshots should have been 'uploaded'");
250        };
251        let artifact_expected = Snapshot {
252            digest: snapshot_digest.to_owned(),
253            network: fake_data::network().into(),
254            beacon,
255            size: individual_snapshot_archive_size,
256            ancillary_size: Some(individual_snapshot_archive_size),
257            locations: vec![immutables_location.into()],
258            ancillary_locations: Some(vec![ancillary_location.into()]),
259            compression_algorithm: CompressionAlgorithm::Zstandard,
260            cardano_node_version: "1.0.0".to_string(),
261        };
262
263        assert_eq!(artifact_expected, artifact);
264    }
265
266    #[tokio::test]
267    async fn snapshot_archive_name_include_beacon_and_network_values() {
268        let network = fake_data::network();
269        let beacon = CardanoDbBeacon::new(20, 145);
270        let digest = "test+digest";
271
272        let cardano_immutable_files_full_artifact_builder =
273            CardanoImmutableFilesFullArtifactBuilder::new(
274                network,
275                &Version::parse("1.0.0").unwrap(),
276                Arc::new(DumbSnapshotter::default()),
277                Arc::new(DumbUploader::default()),
278                TestLogger::stdout(),
279            );
280
281        let name = cardano_immutable_files_full_artifact_builder
282            .get_base_file_name_without_extension(&beacon, digest);
283
284        assert_eq!(
285            format!(
286                "{}-e{}-i{}.{digest}",
287                network, *beacon.epoch, beacon.immutable_file_number,
288            ),
289            name
290        );
291    }
292
293    #[tokio::test]
294    async fn remove_snapshot_archive_after_upload() {
295        let file = NamedTempFile::new().unwrap();
296        let file_path = file.path();
297        let snapshot = FileArchive::new(
298            file_path.to_path_buf(),
299            7331,
300            7331,
301            CompressionAlgorithm::default(),
302        );
303
304        let cardano_immutable_files_full_artifact_builder =
305            CardanoImmutableFilesFullArtifactBuilder::new(
306                fake_data::network(),
307                &Version::parse("1.0.0").unwrap(),
308                Arc::new(DumbSnapshotter::default()),
309                Arc::new(DumbUploader::default()),
310                TestLogger::stdout(),
311            );
312
313        cardano_immutable_files_full_artifact_builder
314            .upload_snapshot_archive(&snapshot)
315            .await
316            .expect("Snapshot upload should not fail");
317
318        assert!(
319            !file_path.exists(),
320            "Ongoing snapshot file should have been removed after upload"
321        );
322    }
323
324    #[tokio::test]
325    async fn remove_snapshot_archive_after_upload_even_if_an_error_occurred() {
326        let file = NamedTempFile::new().unwrap();
327        let file_path = file.path();
328        let snapshot = FileArchive::new(
329            file_path.to_path_buf(),
330            7331,
331            7331,
332            CompressionAlgorithm::default(),
333        );
334        let mut snapshot_uploader = MockFileUploader::new();
335        snapshot_uploader
336            .expect_upload()
337            .return_once(|_| Err(anyhow!("an error")))
338            .once();
339
340        let cardano_immutable_files_full_artifact_builder =
341            CardanoImmutableFilesFullArtifactBuilder::new(
342                fake_data::network(),
343                &Version::parse("1.0.0").unwrap(),
344                Arc::new(DumbSnapshotter::default()),
345                Arc::new(snapshot_uploader),
346                TestLogger::stdout(),
347            );
348
349        cardano_immutable_files_full_artifact_builder
350            .upload_snapshot_archive(&snapshot)
351            .await
352            .expect_err("Snapshot upload should have failed");
353
354        assert!(
355            !file_path.exists(),
356            "Ongoing snapshot file should have been removed even after upload failure"
357        );
358    }
359}