mithril_aggregator/artifact_builder/
cardano_immutable_files_full.rs1use anyhow::Context;
2use async_trait::async_trait;
3use mithril_common::entities::{FileUri, ImmutableFileNumber};
4use semver::Version;
5use slog::{Logger, debug, warn};
6use std::sync::Arc;
7use thiserror::Error;
8
9use mithril_common::logging::LoggerExtensions;
10use mithril_common::{
11 CardanoNetwork, StdResult,
12 entities::{CardanoDbBeacon, Certificate, ProtocolMessagePartKey, Snapshot},
13};
14
15use super::ArtifactBuilder;
16use crate::{FileUploader, services::Snapshotter, tools::file_archiver::FileArchive};
17
18#[derive(Debug, Error)]
20pub enum CardanoImmutableFilesFullArtifactError {
21 #[error("Missing protocol message for beacon: '{0}'.")]
23 MissingProtocolMessage(CardanoDbBeacon),
24}
25
26pub struct CardanoImmutableFilesFullArtifactBuilder {
28 cardano_network: CardanoNetwork,
29 cardano_node_version: Version,
30 snapshotter: Arc<dyn Snapshotter>,
31 snapshot_uploader: Arc<dyn FileUploader>,
32 logger: Logger,
33}
34
35impl CardanoImmutableFilesFullArtifactBuilder {
36 pub fn new(
38 cardano_network: CardanoNetwork,
39 cardano_node_version: &Version,
40 snapshotter: Arc<dyn Snapshotter>,
41 snapshot_uploader: Arc<dyn FileUploader>,
42 logger: Logger,
43 ) -> Self {
44 Self {
45 cardano_network,
46 cardano_node_version: cardano_node_version.clone(),
47 snapshotter,
48 snapshot_uploader,
49 logger: logger.new_with_component_name::<Self>(),
50 }
51 }
52
53 async fn create_immutables_snapshot_archive(
54 &self,
55 base_file_name_without_extension: &str,
56 ) -> StdResult<FileArchive> {
57 debug!(self.logger, ">> create_immutables_snapshot_archive");
58
59 let snapshotter = self.snapshotter.clone();
60 let snapshot_name = base_file_name_without_extension.to_owned();
61 let ongoing_snapshot =
62 snapshotter.snapshot_all_completed_immutables(&snapshot_name).await?;
63
64 debug!(
65 self.logger,
66 " > Immutables snapshot created: '{ongoing_snapshot:?}'"
67 );
68
69 Ok(ongoing_snapshot)
70 }
71
72 async fn create_ancillary_snapshot_archive(
73 &self,
74 immutable_file_number: ImmutableFileNumber,
75 base_file_name_without_extension: &str,
76 ) -> StdResult<FileArchive> {
77 debug!(self.logger, ">> create_ancillary_snapshot_archive");
78
79 let snapshotter = self.snapshotter.clone();
80 let snapshot_name = format!("{base_file_name_without_extension}.ancillary");
81 let ongoing_snapshot = snapshotter
82 .snapshot_ancillary(immutable_file_number, &snapshot_name)
83 .await?;
84
85 debug!(
86 self.logger,
87 " > Ancillary snapshot created: '{ongoing_snapshot:?}'"
88 );
89
90 Ok(ongoing_snapshot)
91 }
92
93 async fn upload_snapshot_archive(
94 &self,
95 ongoing_snapshot: &FileArchive,
96 ) -> StdResult<Vec<FileUri>> {
97 debug!(self.logger, ">> upload_snapshot_archive");
98 let location = self.snapshot_uploader.upload(ongoing_snapshot.get_file_path()).await;
99
100 if let Err(error) = tokio::fs::remove_file(ongoing_snapshot.get_file_path()).await {
101 warn!(
102 self.logger, " > Post upload ongoing snapshot file removal failure";
103 "error" => error
104 );
105 }
106
107 Ok(vec![location?])
108 }
109
110 fn create_snapshot(
111 &self,
112 beacon: CardanoDbBeacon,
113 ongoing_immutables_snapshot: &FileArchive,
114 ongoing_ancillary_snapshot: &FileArchive,
115 snapshot_digest: String,
116 immutables_remote_locations: Vec<String>,
117 ancillary_remote_locations: Vec<String>,
118 ) -> StdResult<Snapshot> {
119 debug!(self.logger, ">> create_snapshot");
120
121 let snapshot = Snapshot {
122 digest: snapshot_digest,
123 network: self.cardano_network.into(),
124 beacon,
125 size: ongoing_immutables_snapshot.get_archive_size(),
126 ancillary_size: Some(ongoing_ancillary_snapshot.get_archive_size()),
127 locations: immutables_remote_locations,
128 ancillary_locations: Some(ancillary_remote_locations),
129 compression_algorithm: ongoing_immutables_snapshot.get_compression_algorithm(),
130 cardano_node_version: self.cardano_node_version.to_string(),
131 };
132
133 Ok(snapshot)
134 }
135
136 fn get_base_file_name_without_extension(
137 &self,
138 beacon: &CardanoDbBeacon,
139 snapshot_digest: &str,
140 ) -> String {
141 let snapshot_name = format!(
142 "{}-e{}-i{}.{}",
143 self.cardano_network, *beacon.epoch, beacon.immutable_file_number, snapshot_digest,
144 );
145 snapshot_name
146 }
147}
148
149#[async_trait]
150impl ArtifactBuilder<CardanoDbBeacon, Snapshot> for CardanoImmutableFilesFullArtifactBuilder {
151 async fn compute_artifact(
152 &self,
153 beacon: CardanoDbBeacon,
154 certificate: &Certificate,
155 ) -> StdResult<Snapshot> {
156 let snapshot_digest = certificate
157 .protocol_message
158 .get_message_part(&ProtocolMessagePartKey::SnapshotDigest)
159 .ok_or_else(|| {
160 CardanoImmutableFilesFullArtifactError::MissingProtocolMessage(beacon.clone())
161 })?
162 .to_owned();
163
164 let base_file_name_without_extension =
165 self.get_base_file_name_without_extension(&beacon, &snapshot_digest);
166 let ongoing_immutables_snapshot = self
167 .create_immutables_snapshot_archive(&base_file_name_without_extension)
168 .await
169 .with_context(|| {
170 "Cardano Immutable Files Full Artifact Builder can not create immutables snapshot archive"
171 })?;
172 let ongoing_ancillary_snapshot = self
173 .create_ancillary_snapshot_archive(beacon.immutable_file_number, &base_file_name_without_extension)
174 .await
175 .with_context(|| {
176 "Cardano Immutable Files Full Artifact Builder can not create ancillary snapshot archive"
177 })?;
178 let immutables_locations = self
179 .upload_snapshot_archive(&ongoing_immutables_snapshot)
180 .await
181 .with_context(|| {
182 format!("Cardano Immutable Files Full Artifact Builder can not upload immutables snapshot archive to path: '{:?}'", ongoing_immutables_snapshot.get_file_path())
183 })?;
184 let ancillary_locations = self
185 .upload_snapshot_archive(&ongoing_ancillary_snapshot)
186 .await
187 .with_context(|| {
188 format!("Cardano Immutable Files Full Artifact Builder can not upload ancillary snapshot archive to path: '{:?}'", ongoing_ancillary_snapshot.get_file_path())
189 })?;
190
191 let snapshot = self.create_snapshot(
192 beacon,
193 &ongoing_immutables_snapshot,
194 &ongoing_ancillary_snapshot,
195 snapshot_digest,
196 immutables_locations.into_iter().map(Into::into).collect(),
197 ancillary_locations.into_iter().map(Into::into).collect(),
198 )?;
199
200 Ok(snapshot)
201 }
202}
203
#[cfg(test)]
mod tests {
    use anyhow::anyhow;
    use tempfile::NamedTempFile;

    use mithril_common::{entities::CompressionAlgorithm, test_utils::fake_data};

    use crate::{
        DumbUploader, file_uploaders::MockFileUploader, services::DumbSnapshotter,
        test_tools::TestLogger,
    };

    use super::*;

    /// Cardano node version given to every builder under test.
    const NODE_VERSION: &str = "1.0.0";

    /// Builds the artifact builder under test on the fake network with the given services.
    fn new_builder(
        snapshotter: Arc<dyn Snapshotter>,
        snapshot_uploader: Arc<dyn FileUploader>,
    ) -> CardanoImmutableFilesFullArtifactBuilder {
        CardanoImmutableFilesFullArtifactBuilder::new(
            fake_data::network(),
            &Version::parse(NODE_VERSION).unwrap(),
            snapshotter,
            snapshot_uploader,
            TestLogger::stdout(),
        )
    }

    /// Describes an archive located at `path`, with the sizes used across these tests.
    fn archive_at(path: &std::path::Path) -> FileArchive {
        FileArchive::new(
            path.to_path_buf(),
            7331,
            7331,
            CompressionAlgorithm::default(),
        )
    }

    #[tokio::test]
    async fn should_compute_valid_artifact() {
        let individual_snapshot_archive_size = 7331;
        let beacon = fake_data::beacon();
        let certificate = fake_data::certificate("certificate-123".to_string());
        let snapshot_digest = certificate
            .protocol_message
            .get_message_part(&ProtocolMessagePartKey::SnapshotDigest)
            .unwrap();

        let dumb_snapshot_uploader = Arc::new(DumbUploader::default());
        let builder = new_builder(
            Arc::new(
                DumbSnapshotter::new(CompressionAlgorithm::Zstandard)
                    .with_archive_size(individual_snapshot_archive_size),
            ),
            dumb_snapshot_uploader.clone(),
        );

        let artifact = builder
            .compute_artifact(beacon.clone(), &certificate)
            .await
            .unwrap();

        let last_uploads = dumb_snapshot_uploader.get_last_n_uploads(2).unwrap();
        let [ancillary_location, immutables_location, ..] = last_uploads.as_slice() else {
            panic!("Two snapshots should have been 'uploaded'");
        };
        let artifact_expected = Snapshot {
            digest: snapshot_digest.to_owned(),
            network: fake_data::network().into(),
            beacon,
            size: individual_snapshot_archive_size,
            ancillary_size: Some(individual_snapshot_archive_size),
            locations: vec![immutables_location.into()],
            ancillary_locations: Some(vec![ancillary_location.into()]),
            compression_algorithm: CompressionAlgorithm::Zstandard,
            cardano_node_version: NODE_VERSION.to_string(),
        };

        assert_eq!(artifact_expected, artifact);
    }

    #[tokio::test]
    async fn snapshot_archive_name_include_beacon_and_network_values() {
        let network = fake_data::network();
        let beacon = CardanoDbBeacon::new(20, 145);
        let digest = "test+digest";
        let builder = new_builder(
            Arc::new(DumbSnapshotter::default()),
            Arc::new(DumbUploader::default()),
        );

        let name = builder.get_base_file_name_without_extension(&beacon, digest);

        let expected_name = format!(
            "{}-e{}-i{}.{digest}",
            network, *beacon.epoch, beacon.immutable_file_number,
        );
        assert_eq!(expected_name, name);
    }

    #[tokio::test]
    async fn remove_snapshot_archive_after_upload() {
        let file = NamedTempFile::new().unwrap();
        let snapshot = archive_at(file.path());
        let builder = new_builder(
            Arc::new(DumbSnapshotter::default()),
            Arc::new(DumbUploader::default()),
        );

        builder
            .upload_snapshot_archive(&snapshot)
            .await
            .expect("Snapshot upload should not fail");

        assert!(
            !file.path().exists(),
            "Ongoing snapshot file should have been removed after upload"
        );
    }

    #[tokio::test]
    async fn remove_snapshot_archive_after_upload_even_if_an_error_occurred() {
        let file = NamedTempFile::new().unwrap();
        let snapshot = archive_at(file.path());
        let mut snapshot_uploader = MockFileUploader::new();
        snapshot_uploader
            .expect_upload()
            .return_once(|_| Err(anyhow!("an error")))
            .once();
        let builder = new_builder(
            Arc::new(DumbSnapshotter::default()),
            Arc::new(snapshot_uploader),
        );

        builder
            .upload_snapshot_archive(&snapshot)
            .await
            .expect_err("Snapshot upload should have failed");

        assert!(
            !file.path().exists(),
            "Ongoing snapshot file should have been removed even after upload failure"
        );
    }
}