mithril_aggregator/artifact_builder/
cardano_immutable_files_full.rs

1use anyhow::Context;
2use async_trait::async_trait;
3use mithril_common::entities::{FileUri, ImmutableFileNumber};
4use semver::Version;
5use slog::{debug, warn, Logger};
6use std::sync::Arc;
7use thiserror::Error;
8
9use mithril_common::logging::LoggerExtensions;
10use mithril_common::{
11    entities::{CardanoDbBeacon, Certificate, ProtocolMessagePartKey, Snapshot},
12    CardanoNetwork, StdResult,
13};
14
15use super::ArtifactBuilder;
16use crate::{services::Snapshotter, tools::file_archiver::FileArchive, FileUploader};
17
/// [CardanoImmutableFilesFullArtifact] error
///
/// Errors specific to the computation of a Cardano immutable files full artifact.
#[derive(Debug, Error)]
pub enum CardanoImmutableFilesFullArtifactError {
    /// Protocol message part is missing
    ///
    /// Raised when the certificate's protocol message holds no
    /// [ProtocolMessagePartKey::SnapshotDigest] part; carries the beacon being processed.
    #[error("Missing protocol message for beacon: '{0}'.")]
    MissingProtocolMessage(CardanoDbBeacon),
}
25
/// A [CardanoImmutableFilesFullArtifact] builder
///
/// Creates the immutables and ancillary snapshot archives, uploads them and
/// describes the result as a [Snapshot].
pub struct CardanoImmutableFilesFullArtifactBuilder {
    // Network written in the produced snapshot and used as prefix of the archive names.
    cardano_network: CardanoNetwork,
    // Cardano node version reported (as a string) in the produced snapshot.
    cardano_node_version: Version,
    // Service creating the immutables and ancillary archives.
    snapshotter: Arc<dyn Snapshotter>,
    // Uploader used to publish the archives; it yields their remote location.
    snapshot_uploader: Arc<dyn FileUploader>,
    // Logger derived from the one given to `new`, tagged with this component name.
    logger: Logger,
}
34
35impl CardanoImmutableFilesFullArtifactBuilder {
36    /// CardanoImmutableFilesFull artifact builder factory
37    pub fn new(
38        cardano_network: CardanoNetwork,
39        cardano_node_version: &Version,
40        snapshotter: Arc<dyn Snapshotter>,
41        snapshot_uploader: Arc<dyn FileUploader>,
42        logger: Logger,
43    ) -> Self {
44        Self {
45            cardano_network,
46            cardano_node_version: cardano_node_version.clone(),
47            snapshotter,
48            snapshot_uploader,
49            logger: logger.new_with_component_name::<Self>(),
50        }
51    }
52
53    async fn create_immutables_snapshot_archive(
54        &self,
55        base_file_name_without_extension: &str,
56    ) -> StdResult<FileArchive> {
57        debug!(self.logger, ">> create_immutables_snapshot_archive");
58
59        let snapshotter = self.snapshotter.clone();
60        let snapshot_name = base_file_name_without_extension.to_owned();
61        let ongoing_snapshot = snapshotter
62            .snapshot_all_completed_immutables(&snapshot_name)
63            .await?;
64
65        debug!(
66            self.logger,
67            " > Immutables snapshot created: '{ongoing_snapshot:?}'"
68        );
69
70        Ok(ongoing_snapshot)
71    }
72
73    async fn create_ancillary_snapshot_archive(
74        &self,
75        immutable_file_number: ImmutableFileNumber,
76        base_file_name_without_extension: &str,
77    ) -> StdResult<FileArchive> {
78        debug!(self.logger, ">> create_ancillary_snapshot_archive");
79
80        let snapshotter = self.snapshotter.clone();
81        let snapshot_name = format!("{base_file_name_without_extension}.ancillary");
82        let ongoing_snapshot = snapshotter
83            .snapshot_ancillary(immutable_file_number, &snapshot_name)
84            .await?;
85
86        debug!(
87            self.logger,
88            " > Ancillary snapshot created: '{ongoing_snapshot:?}'"
89        );
90
91        Ok(ongoing_snapshot)
92    }
93
94    async fn upload_snapshot_archive(
95        &self,
96        ongoing_snapshot: &FileArchive,
97    ) -> StdResult<Vec<FileUri>> {
98        debug!(self.logger, ">> upload_snapshot_archive");
99        let location = self
100            .snapshot_uploader
101            .upload(ongoing_snapshot.get_file_path())
102            .await;
103
104        if let Err(error) = tokio::fs::remove_file(ongoing_snapshot.get_file_path()).await {
105            warn!(
106                self.logger, " > Post upload ongoing snapshot file removal failure";
107                "error" => error
108            );
109        }
110
111        Ok(vec![location?])
112    }
113
114    fn create_snapshot(
115        &self,
116        beacon: CardanoDbBeacon,
117        ongoing_immutables_snapshot: &FileArchive,
118        ongoing_ancillary_snapshot: &FileArchive,
119        snapshot_digest: String,
120        immutables_remote_locations: Vec<String>,
121        ancillary_remote_locations: Vec<String>,
122    ) -> StdResult<Snapshot> {
123        debug!(self.logger, ">> create_snapshot");
124
125        let snapshot = Snapshot {
126            digest: snapshot_digest,
127            network: self.cardano_network.into(),
128            beacon,
129            size: ongoing_immutables_snapshot.get_archive_size(),
130            ancillary_size: Some(ongoing_ancillary_snapshot.get_archive_size()),
131            locations: immutables_remote_locations,
132            ancillary_locations: Some(ancillary_remote_locations),
133            compression_algorithm: ongoing_immutables_snapshot.get_compression_algorithm(),
134            cardano_node_version: self.cardano_node_version.to_string(),
135        };
136
137        Ok(snapshot)
138    }
139
140    fn get_base_file_name_without_extension(
141        &self,
142        beacon: &CardanoDbBeacon,
143        snapshot_digest: &str,
144    ) -> String {
145        let snapshot_name = format!(
146            "{}-e{}-i{}.{}",
147            self.cardano_network, *beacon.epoch, beacon.immutable_file_number, snapshot_digest,
148        );
149        snapshot_name
150    }
151}
152
153#[async_trait]
154impl ArtifactBuilder<CardanoDbBeacon, Snapshot> for CardanoImmutableFilesFullArtifactBuilder {
155    async fn compute_artifact(
156        &self,
157        beacon: CardanoDbBeacon,
158        certificate: &Certificate,
159    ) -> StdResult<Snapshot> {
160        let snapshot_digest = certificate
161            .protocol_message
162            .get_message_part(&ProtocolMessagePartKey::SnapshotDigest)
163            .ok_or_else(|| {
164                CardanoImmutableFilesFullArtifactError::MissingProtocolMessage(beacon.clone())
165            })?
166            .to_owned();
167
168        let base_file_name_without_extension =
169            self.get_base_file_name_without_extension(&beacon, &snapshot_digest);
170        let ongoing_immutables_snapshot = self
171            .create_immutables_snapshot_archive(&base_file_name_without_extension)
172            .await
173            .with_context(|| {
174                "Cardano Immutable Files Full Artifact Builder can not create immutables snapshot archive"
175            })?;
176        let ongoing_ancillary_snapshot = self
177            .create_ancillary_snapshot_archive(beacon.immutable_file_number, &base_file_name_without_extension)
178            .await
179            .with_context(|| {
180                "Cardano Immutable Files Full Artifact Builder can not create ancillary snapshot archive"
181            })?;
182        let immutables_locations = self
183            .upload_snapshot_archive(&ongoing_immutables_snapshot)
184            .await
185            .with_context(|| {
186                format!("Cardano Immutable Files Full Artifact Builder can not upload immutables snapshot archive to path: '{:?}'", ongoing_immutables_snapshot.get_file_path())
187            })?;
188        let ancillary_locations = self
189            .upload_snapshot_archive(&ongoing_ancillary_snapshot)
190            .await
191            .with_context(|| {
192                format!("Cardano Immutable Files Full Artifact Builder can not upload ancillary snapshot archive to path: '{:?}'", ongoing_ancillary_snapshot.get_file_path())
193            })?;
194
195        let snapshot = self.create_snapshot(
196            beacon,
197            &ongoing_immutables_snapshot,
198            &ongoing_ancillary_snapshot,
199            snapshot_digest,
200            immutables_locations.into_iter().map(Into::into).collect(),
201            ancillary_locations.into_iter().map(Into::into).collect(),
202        )?;
203
204        Ok(snapshot)
205    }
206}
207
#[cfg(test)]
mod tests {
    use anyhow::anyhow;
    use tempfile::NamedTempFile;

    use mithril_common::{entities::CompressionAlgorithm, test_utils::fake_data};

    use crate::{
        file_uploaders::MockFileUploader, services::DumbSnapshotter, test_tools::TestLogger,
        DumbUploader,
    };

    use super::*;

    /// Cardano node version given to every builder created by these tests.
    const CARDANO_NODE_VERSION: &str = "1.0.0";

    /// Create a builder on the fake network with the given snapshotter and uploader.
    fn build_artifact_builder(
        snapshotter: Arc<dyn Snapshotter>,
        snapshot_uploader: Arc<dyn FileUploader>,
    ) -> CardanoImmutableFilesFullArtifactBuilder {
        CardanoImmutableFilesFullArtifactBuilder::new(
            fake_data::network(),
            &Version::parse(CARDANO_NODE_VERSION).unwrap(),
            snapshotter,
            snapshot_uploader,
            TestLogger::stdout(),
        )
    }

    /// Create an archive backed by a real temporary file.
    ///
    /// The temp file handle is returned too: it must stay alive for the duration of
    /// the test, and its path is used to check whether the file was removed.
    fn create_temp_archive() -> (NamedTempFile, FileArchive) {
        let file = NamedTempFile::new().unwrap();
        let archive = FileArchive::new(
            file.path().to_path_buf(),
            7331,
            7331,
            CompressionAlgorithm::default(),
        );

        (file, archive)
    }

    #[tokio::test]
    async fn should_compute_valid_artifact() {
        let beacon = fake_data::beacon();
        let certificate = fake_data::certificate("certificate-123".to_string());
        let snapshot_digest = certificate
            .protocol_message
            .get_message_part(&ProtocolMessagePartKey::SnapshotDigest)
            .unwrap();
        let individual_snapshot_archive_size = 7331;

        let dumb_snapshotter = Arc::new(
            DumbSnapshotter::new(CompressionAlgorithm::Zstandard)
                .with_archive_size(individual_snapshot_archive_size),
        );
        let dumb_snapshot_uploader = Arc::new(DumbUploader::default());

        let cardano_immutable_files_full_artifact_builder =
            build_artifact_builder(dumb_snapshotter.clone(), dumb_snapshot_uploader.clone());
        let artifact = cardano_immutable_files_full_artifact_builder
            .compute_artifact(beacon.clone(), &certificate)
            .await
            .unwrap();

        let last_uploads = dumb_snapshot_uploader.get_last_n_uploads(2).unwrap();
        let [ancillary_location, immutables_location, ..] = last_uploads.as_slice() else {
            panic!("Two snapshots should have been 'uploaded'");
        };
        let artifact_expected = Snapshot {
            digest: snapshot_digest.to_owned(),
            network: fake_data::network().into(),
            beacon,
            size: individual_snapshot_archive_size,
            ancillary_size: Some(individual_snapshot_archive_size),
            locations: vec![immutables_location.into()],
            ancillary_locations: Some(vec![ancillary_location.into()]),
            compression_algorithm: CompressionAlgorithm::Zstandard,
            cardano_node_version: CARDANO_NODE_VERSION.to_string(),
        };

        assert_eq!(artifact_expected, artifact);
    }

    // Nothing is awaited here: a plain synchronous test is enough.
    #[test]
    fn snapshot_archive_name_include_beacon_and_network_values() {
        let network = fake_data::network();
        let beacon = CardanoDbBeacon::new(20, 145);
        let digest = "test+digest";

        let cardano_immutable_files_full_artifact_builder = build_artifact_builder(
            Arc::new(DumbSnapshotter::default()),
            Arc::new(DumbUploader::default()),
        );

        let name = cardano_immutable_files_full_artifact_builder
            .get_base_file_name_without_extension(&beacon, digest);

        assert_eq!(
            format!(
                "{}-e{}-i{}.{digest}",
                network, *beacon.epoch, beacon.immutable_file_number,
            ),
            name
        );
    }

    #[tokio::test]
    async fn remove_snapshot_archive_after_upload() {
        let (file, snapshot) = create_temp_archive();

        let cardano_immutable_files_full_artifact_builder = build_artifact_builder(
            Arc::new(DumbSnapshotter::default()),
            Arc::new(DumbUploader::default()),
        );

        cardano_immutable_files_full_artifact_builder
            .upload_snapshot_archive(&snapshot)
            .await
            .expect("Snapshot upload should not fail");

        assert!(
            !file.path().exists(),
            "Ongoing snapshot file should have been removed after upload"
        );
    }

    #[tokio::test]
    async fn remove_snapshot_archive_after_upload_even_if_an_error_occurred() {
        let (file, snapshot) = create_temp_archive();
        let mut snapshot_uploader = MockFileUploader::new();
        snapshot_uploader
            .expect_upload()
            .return_once(|_| Err(anyhow!("an error")))
            .once();

        let cardano_immutable_files_full_artifact_builder = build_artifact_builder(
            Arc::new(DumbSnapshotter::default()),
            Arc::new(snapshot_uploader),
        );

        cardano_immutable_files_full_artifact_builder
            .upload_snapshot_archive(&snapshot)
            .await
            .expect_err("Snapshot upload should have failed");

        assert!(
            !file.path().exists(),
            "Ongoing snapshot file should have been removed even after upload failure"
        );
    }
}