mithril_aggregator/artifact_builder/
cardano_immutable_files_full.rs1use anyhow::Context;
2use async_trait::async_trait;
3use mithril_common::entities::{FileUri, ImmutableFileNumber};
4use semver::Version;
5use slog::{Logger, debug, warn};
6use std::sync::Arc;
7use thiserror::Error;
8
9use mithril_common::logging::LoggerExtensions;
10use mithril_common::{
11 CardanoNetwork, StdResult,
12 entities::{CardanoDbBeacon, Certificate, ProtocolMessagePartKey, Snapshot},
13};
14
15use super::ArtifactBuilder;
16use crate::{FileUploader, services::Snapshotter, tools::file_archiver::FileArchive};
17
18#[derive(Debug, Error)]
20pub enum CardanoImmutableFilesFullArtifactError {
21 #[error("Missing protocol message for beacon: '{0}'.")]
23 MissingProtocolMessage(CardanoDbBeacon),
24}
25
26pub struct CardanoImmutableFilesFullArtifactBuilder {
28 cardano_network: CardanoNetwork,
29 cardano_node_version: Version,
30 snapshotter: Arc<dyn Snapshotter>,
31 snapshot_uploader: Arc<dyn FileUploader>,
32 logger: Logger,
33}
34
35impl CardanoImmutableFilesFullArtifactBuilder {
36 pub fn new(
38 cardano_network: CardanoNetwork,
39 cardano_node_version: &Version,
40 snapshotter: Arc<dyn Snapshotter>,
41 snapshot_uploader: Arc<dyn FileUploader>,
42 logger: Logger,
43 ) -> Self {
44 Self {
45 cardano_network,
46 cardano_node_version: cardano_node_version.clone(),
47 snapshotter,
48 snapshot_uploader,
49 logger: logger.new_with_component_name::<Self>(),
50 }
51 }
52
53 async fn create_immutables_snapshot_archive(
54 &self,
55 base_file_name_without_extension: &str,
56 ) -> StdResult<FileArchive> {
57 debug!(self.logger, ">> create_immutables_snapshot_archive");
58
59 let snapshotter = self.snapshotter.clone();
60 let snapshot_name = base_file_name_without_extension.to_owned();
61 let ongoing_snapshot =
62 snapshotter.snapshot_all_completed_immutables(&snapshot_name).await?;
63
64 debug!(
65 self.logger,
66 " > Immutables snapshot created: '{ongoing_snapshot:?}'"
67 );
68
69 Ok(ongoing_snapshot)
70 }
71
72 async fn create_ancillary_snapshot_archive(
73 &self,
74 immutable_file_number: ImmutableFileNumber,
75 base_file_name_without_extension: &str,
76 ) -> StdResult<FileArchive> {
77 debug!(self.logger, ">> create_ancillary_snapshot_archive");
78
79 let snapshotter = self.snapshotter.clone();
80 let snapshot_name = format!("{base_file_name_without_extension}.ancillary");
81 let ongoing_snapshot = snapshotter
82 .snapshot_ancillary(immutable_file_number, &snapshot_name)
83 .await?;
84
85 debug!(
86 self.logger,
87 " > Ancillary snapshot created: '{ongoing_snapshot:?}'"
88 );
89
90 Ok(ongoing_snapshot)
91 }
92
93 async fn upload_snapshot_archive(
94 &self,
95 ongoing_snapshot: &FileArchive,
96 ) -> StdResult<Vec<FileUri>> {
97 debug!(self.logger, ">> upload_snapshot_archive");
98 let location = self.snapshot_uploader.upload(ongoing_snapshot.get_file_path()).await;
99
100 if let Err(error) = tokio::fs::remove_file(ongoing_snapshot.get_file_path()).await {
101 warn!(
102 self.logger, " > Post upload ongoing snapshot file removal failure";
103 "error" => error
104 );
105 }
106
107 Ok(vec![location?])
108 }
109
110 fn create_snapshot(
111 &self,
112 beacon: CardanoDbBeacon,
113 ongoing_immutables_snapshot: &FileArchive,
114 ongoing_ancillary_snapshot: &FileArchive,
115 snapshot_digest: String,
116 immutables_remote_locations: Vec<String>,
117 ancillary_remote_locations: Vec<String>,
118 ) -> StdResult<Snapshot> {
119 debug!(self.logger, ">> create_snapshot");
120
121 let snapshot = Snapshot {
122 digest: snapshot_digest,
123 network: self.cardano_network.into(),
124 beacon,
125 size: ongoing_immutables_snapshot.get_archive_size(),
126 ancillary_size: Some(ongoing_ancillary_snapshot.get_archive_size()),
127 locations: immutables_remote_locations,
128 ancillary_locations: Some(ancillary_remote_locations),
129 compression_algorithm: ongoing_immutables_snapshot.get_compression_algorithm(),
130 cardano_node_version: self.cardano_node_version.to_string(),
131 };
132
133 Ok(snapshot)
134 }
135
136 fn get_base_file_name_without_extension(
137 &self,
138 beacon: &CardanoDbBeacon,
139 snapshot_digest: &str,
140 ) -> String {
141 let snapshot_name = format!(
142 "{}-e{}-i{}.{}",
143 self.cardano_network, *beacon.epoch, beacon.immutable_file_number, snapshot_digest,
144 );
145 snapshot_name
146 }
147}
148
149#[async_trait]
150impl ArtifactBuilder<CardanoDbBeacon, Snapshot> for CardanoImmutableFilesFullArtifactBuilder {
151 async fn compute_artifact(
152 &self,
153 beacon: CardanoDbBeacon,
154 certificate: &Certificate,
155 ) -> StdResult<Snapshot> {
156 let snapshot_digest = certificate
157 .protocol_message
158 .get_message_part(&ProtocolMessagePartKey::SnapshotDigest)
159 .ok_or_else(|| {
160 CardanoImmutableFilesFullArtifactError::MissingProtocolMessage(beacon.clone())
161 })?
162 .to_owned();
163
164 let base_file_name_without_extension =
165 self.get_base_file_name_without_extension(&beacon, &snapshot_digest);
166 let ongoing_immutables_snapshot = self
167 .create_immutables_snapshot_archive(&base_file_name_without_extension)
168 .await
169 .with_context(|| {
170 "Cardano Immutable Files Full Artifact Builder can not create immutables snapshot archive"
171 })?;
172 let ongoing_ancillary_snapshot = self
173 .create_ancillary_snapshot_archive(beacon.immutable_file_number, &base_file_name_without_extension)
174 .await
175 .with_context(|| {
176 "Cardano Immutable Files Full Artifact Builder can not create ancillary snapshot archive"
177 })?;
178 let immutables_locations = self
179 .upload_snapshot_archive(&ongoing_immutables_snapshot)
180 .await
181 .with_context(|| {
182 format!("Cardano Immutable Files Full Artifact Builder can not upload immutables snapshot archive to path: '{:?}'", ongoing_immutables_snapshot.get_file_path())
183 })?;
184 let ancillary_locations = self
185 .upload_snapshot_archive(&ongoing_ancillary_snapshot)
186 .await
187 .with_context(|| {
188 format!("Cardano Immutable Files Full Artifact Builder can not upload ancillary snapshot archive to path: '{:?}'", ongoing_ancillary_snapshot.get_file_path())
189 })?;
190
191 let snapshot = self.create_snapshot(
192 beacon,
193 &ongoing_immutables_snapshot,
194 &ongoing_ancillary_snapshot,
195 snapshot_digest,
196 immutables_locations.into_iter().map(Into::into).collect(),
197 ancillary_locations.into_iter().map(Into::into).collect(),
198 )?;
199
200 Ok(snapshot)
201 }
202}
203
#[cfg(test)]
mod tests {
    use anyhow::anyhow;
    use tempfile::NamedTempFile;

    use mithril_common::{entities::CompressionAlgorithm, test::double::fake_data};

    use crate::{
        DumbUploader, file_uploaders::MockFileUploader, services::DumbSnapshotter, test::TestLogger,
    };

    use super::*;

    /// Builds the builder under test on the fake network, with node version
    /// `1.0.0` and a stdout test logger.
    fn build_artifact_builder(
        snapshotter: Arc<dyn Snapshotter>,
        uploader: Arc<dyn FileUploader>,
    ) -> CardanoImmutableFilesFullArtifactBuilder {
        CardanoImmutableFilesFullArtifactBuilder::new(
            fake_data::network(),
            &Version::parse("1.0.0").unwrap(),
            snapshotter,
            uploader,
            TestLogger::stdout(),
        )
    }

    /// Describes the given temporary file as a [FileArchive].
    fn archive_backed_by(file: &NamedTempFile) -> FileArchive {
        FileArchive::new(
            file.path().to_path_buf(),
            7331,
            7331,
            CompressionAlgorithm::default(),
        )
    }

    #[tokio::test]
    async fn should_compute_valid_artifact() {
        let archive_size = 7331;
        let beacon = fake_data::beacon();
        let certificate = fake_data::certificate("certificate-123".to_string());
        let expected_digest = certificate
            .protocol_message
            .get_message_part(&ProtocolMessagePartKey::SnapshotDigest)
            .unwrap();

        let snapshotter = Arc::new(
            DumbSnapshotter::new(CompressionAlgorithm::Zstandard).with_archive_size(archive_size),
        );
        let uploader = Arc::new(DumbUploader::default());
        let builder = build_artifact_builder(snapshotter.clone(), uploader.clone());

        let artifact = builder.compute_artifact(beacon.clone(), &certificate).await.unwrap();

        let last_uploads = uploader.get_last_n_uploads(2).unwrap();
        let [ancillary_location, immutables_location, ..] = last_uploads.as_slice() else {
            panic!("Two snapshots should have been 'uploaded'");
        };
        let artifact_expected = Snapshot {
            digest: expected_digest.to_owned(),
            network: fake_data::network().into(),
            beacon,
            size: archive_size,
            ancillary_size: Some(archive_size),
            locations: vec![immutables_location.into()],
            ancillary_locations: Some(vec![ancillary_location.into()]),
            compression_algorithm: CompressionAlgorithm::Zstandard,
            cardano_node_version: "1.0.0".to_string(),
        };

        assert_eq!(artifact_expected, artifact);
    }

    #[tokio::test]
    async fn snapshot_archive_name_include_beacon_and_network_values() {
        let beacon = CardanoDbBeacon::new(20, 145);
        let digest = "test+digest";
        let builder = build_artifact_builder(
            Arc::new(DumbSnapshotter::default()),
            Arc::new(DumbUploader::default()),
        );

        let name = builder.get_base_file_name_without_extension(&beacon, digest);

        let expected_name = format!(
            "{}-e{}-i{}.{digest}",
            fake_data::network(),
            *beacon.epoch,
            beacon.immutable_file_number,
        );
        assert_eq!(expected_name, name);
    }

    #[tokio::test]
    async fn remove_snapshot_archive_after_upload() {
        let file = NamedTempFile::new().unwrap();
        let archive = archive_backed_by(&file);
        let builder = build_artifact_builder(
            Arc::new(DumbSnapshotter::default()),
            Arc::new(DumbUploader::default()),
        );

        builder
            .upload_snapshot_archive(&archive)
            .await
            .expect("Snapshot upload should not fail");

        assert!(
            !file.path().exists(),
            "Ongoing snapshot file should have been removed after upload"
        );
    }

    #[tokio::test]
    async fn remove_snapshot_archive_after_upload_even_if_an_error_occurred() {
        let file = NamedTempFile::new().unwrap();
        let archive = archive_backed_by(&file);
        // Uploader double whose single expected upload fails.
        let mut failing_uploader = MockFileUploader::new();
        failing_uploader
            .expect_upload()
            .return_once(|_| Err(anyhow!("an error")))
            .once();
        let builder = build_artifact_builder(
            Arc::new(DumbSnapshotter::default()),
            Arc::new(failing_uploader),
        );

        builder
            .upload_snapshot_archive(&archive)
            .await
            .expect_err("Snapshot upload should have failed");

        assert!(
            !file.path().exists(),
            "Ongoing snapshot file should have been removed even after upload failure"
        );
    }
}
358}