mithril_aggregator/artifact_builder/
cardano_immutable_files_full.rs1use anyhow::Context;
2use async_trait::async_trait;
3use mithril_common::entities::{FileUri, ImmutableFileNumber};
4use semver::Version;
5use slog::{debug, warn, Logger};
6use std::sync::Arc;
7use thiserror::Error;
8
9use mithril_common::logging::LoggerExtensions;
10use mithril_common::{
11 entities::{CardanoDbBeacon, Certificate, ProtocolMessagePartKey, Snapshot},
12 CardanoNetwork, StdResult,
13};
14
15use super::ArtifactBuilder;
16use crate::{services::Snapshotter, tools::file_archiver::FileArchive, FileUploader};
17
/// Errors specific to [`CardanoImmutableFilesFullArtifactBuilder`].
#[derive(Debug, Error)]
pub enum CardanoImmutableFilesFullArtifactError {
    /// The certificate's protocol message has no `SnapshotDigest` part; the payload is the
    /// beacon for which the artifact was being computed.
    #[error("Missing protocol message for beacon: '{0}'.")]
    MissingProtocolMessage(CardanoDbBeacon),
}
25
/// Builds the [`Snapshot`] artifact of a Cardano database: it creates an immutables archive and
/// an ancillary archive through the snapshotter, uploads both and describes the result.
pub struct CardanoImmutableFilesFullArtifactBuilder {
    // Network written in the produced `Snapshot` and used as prefix of the archive file names.
    cardano_network: CardanoNetwork,
    // Cardano node version written (as a string) in the produced `Snapshot`.
    cardano_node_version: Version,
    // Creates the immutables and ancillary archives.
    snapshotter: Arc<dyn Snapshotter>,
    // Uploads the archives and yields their remote location.
    snapshot_uploader: Arc<dyn FileUploader>,
    // Logger derived from the one given to `new` with this builder as component name.
    logger: Logger,
}
34
35impl CardanoImmutableFilesFullArtifactBuilder {
36 pub fn new(
38 cardano_network: CardanoNetwork,
39 cardano_node_version: &Version,
40 snapshotter: Arc<dyn Snapshotter>,
41 snapshot_uploader: Arc<dyn FileUploader>,
42 logger: Logger,
43 ) -> Self {
44 Self {
45 cardano_network,
46 cardano_node_version: cardano_node_version.clone(),
47 snapshotter,
48 snapshot_uploader,
49 logger: logger.new_with_component_name::<Self>(),
50 }
51 }
52
53 async fn create_immutables_snapshot_archive(
54 &self,
55 base_file_name_without_extension: &str,
56 ) -> StdResult<FileArchive> {
57 debug!(self.logger, ">> create_immutables_snapshot_archive");
58
59 let snapshotter = self.snapshotter.clone();
60 let snapshot_name = base_file_name_without_extension.to_owned();
61 let ongoing_snapshot = snapshotter
62 .snapshot_all_completed_immutables(&snapshot_name)
63 .await?;
64
65 debug!(
66 self.logger,
67 " > Immutables snapshot created: '{ongoing_snapshot:?}'"
68 );
69
70 Ok(ongoing_snapshot)
71 }
72
73 async fn create_ancillary_snapshot_archive(
74 &self,
75 immutable_file_number: ImmutableFileNumber,
76 base_file_name_without_extension: &str,
77 ) -> StdResult<FileArchive> {
78 debug!(self.logger, ">> create_ancillary_snapshot_archive");
79
80 let snapshotter = self.snapshotter.clone();
81 let snapshot_name = format!("{base_file_name_without_extension}.ancillary");
82 let ongoing_snapshot = snapshotter
83 .snapshot_ancillary(immutable_file_number, &snapshot_name)
84 .await?;
85
86 debug!(
87 self.logger,
88 " > Ancillary snapshot created: '{ongoing_snapshot:?}'"
89 );
90
91 Ok(ongoing_snapshot)
92 }
93
94 async fn upload_snapshot_archive(
95 &self,
96 ongoing_snapshot: &FileArchive,
97 ) -> StdResult<Vec<FileUri>> {
98 debug!(self.logger, ">> upload_snapshot_archive");
99 let location = self
100 .snapshot_uploader
101 .upload(ongoing_snapshot.get_file_path())
102 .await;
103
104 if let Err(error) = tokio::fs::remove_file(ongoing_snapshot.get_file_path()).await {
105 warn!(
106 self.logger, " > Post upload ongoing snapshot file removal failure";
107 "error" => error
108 );
109 }
110
111 Ok(vec![location?])
112 }
113
114 fn create_snapshot(
115 &self,
116 beacon: CardanoDbBeacon,
117 ongoing_immutables_snapshot: &FileArchive,
118 ongoing_ancillary_snapshot: &FileArchive,
119 snapshot_digest: String,
120 immutables_remote_locations: Vec<String>,
121 ancillary_remote_locations: Vec<String>,
122 ) -> StdResult<Snapshot> {
123 debug!(self.logger, ">> create_snapshot");
124
125 let snapshot = Snapshot {
126 digest: snapshot_digest,
127 network: self.cardano_network.into(),
128 beacon,
129 size: ongoing_immutables_snapshot.get_archive_size(),
130 ancillary_size: Some(ongoing_ancillary_snapshot.get_archive_size()),
131 locations: immutables_remote_locations,
132 ancillary_locations: Some(ancillary_remote_locations),
133 compression_algorithm: ongoing_immutables_snapshot.get_compression_algorithm(),
134 cardano_node_version: self.cardano_node_version.to_string(),
135 };
136
137 Ok(snapshot)
138 }
139
140 fn get_base_file_name_without_extension(
141 &self,
142 beacon: &CardanoDbBeacon,
143 snapshot_digest: &str,
144 ) -> String {
145 let snapshot_name = format!(
146 "{}-e{}-i{}.{}",
147 self.cardano_network, *beacon.epoch, beacon.immutable_file_number, snapshot_digest,
148 );
149 snapshot_name
150 }
151}
152
153#[async_trait]
154impl ArtifactBuilder<CardanoDbBeacon, Snapshot> for CardanoImmutableFilesFullArtifactBuilder {
155 async fn compute_artifact(
156 &self,
157 beacon: CardanoDbBeacon,
158 certificate: &Certificate,
159 ) -> StdResult<Snapshot> {
160 let snapshot_digest = certificate
161 .protocol_message
162 .get_message_part(&ProtocolMessagePartKey::SnapshotDigest)
163 .ok_or_else(|| {
164 CardanoImmutableFilesFullArtifactError::MissingProtocolMessage(beacon.clone())
165 })?
166 .to_owned();
167
168 let base_file_name_without_extension =
169 self.get_base_file_name_without_extension(&beacon, &snapshot_digest);
170 let ongoing_immutables_snapshot = self
171 .create_immutables_snapshot_archive(&base_file_name_without_extension)
172 .await
173 .with_context(|| {
174 "Cardano Immutable Files Full Artifact Builder can not create immutables snapshot archive"
175 })?;
176 let ongoing_ancillary_snapshot = self
177 .create_ancillary_snapshot_archive(beacon.immutable_file_number, &base_file_name_without_extension)
178 .await
179 .with_context(|| {
180 "Cardano Immutable Files Full Artifact Builder can not create ancillary snapshot archive"
181 })?;
182 let immutables_locations = self
183 .upload_snapshot_archive(&ongoing_immutables_snapshot)
184 .await
185 .with_context(|| {
186 format!("Cardano Immutable Files Full Artifact Builder can not upload immutables snapshot archive to path: '{:?}'", ongoing_immutables_snapshot.get_file_path())
187 })?;
188 let ancillary_locations = self
189 .upload_snapshot_archive(&ongoing_ancillary_snapshot)
190 .await
191 .with_context(|| {
192 format!("Cardano Immutable Files Full Artifact Builder can not upload ancillary snapshot archive to path: '{:?}'", ongoing_ancillary_snapshot.get_file_path())
193 })?;
194
195 let snapshot = self.create_snapshot(
196 beacon,
197 &ongoing_immutables_snapshot,
198 &ongoing_ancillary_snapshot,
199 snapshot_digest,
200 immutables_locations.into_iter().map(Into::into).collect(),
201 ancillary_locations.into_iter().map(Into::into).collect(),
202 )?;
203
204 Ok(snapshot)
205 }
206}
207
#[cfg(test)]
mod tests {
    use anyhow::anyhow;
    use tempfile::NamedTempFile;

    use mithril_common::{entities::CompressionAlgorithm, test_utils::fake_data};

    use crate::{
        file_uploaders::MockFileUploader, services::DumbSnapshotter, test_tools::TestLogger,
        DumbUploader,
    };

    use super::*;

    /// Builds the artifact builder under test with the fake network, node version "1.0.0" and
    /// a stdout test logger; only the snapshotter and the uploader vary between tests.
    fn builder_with(
        snapshotter: Arc<dyn Snapshotter>,
        uploader: Arc<dyn FileUploader>,
    ) -> CardanoImmutableFilesFullArtifactBuilder {
        CardanoImmutableFilesFullArtifactBuilder::new(
            fake_data::network(),
            &Version::parse("1.0.0").unwrap(),
            snapshotter,
            uploader,
            TestLogger::stdout(),
        )
    }

    /// Describes the given temporary file as an archive, using the default compression
    /// algorithm.
    fn archive_for(file: &NamedTempFile) -> FileArchive {
        FileArchive::new(
            file.path().to_path_buf(),
            7331,
            7331,
            CompressionAlgorithm::default(),
        )
    }

    #[tokio::test]
    async fn should_compute_valid_artifact() {
        let archive_size = 7331;
        let beacon = fake_data::beacon();
        let certificate = fake_data::certificate("certificate-123".to_string());
        let expected_digest = certificate
            .protocol_message
            .get_message_part(&ProtocolMessagePartKey::SnapshotDigest)
            .unwrap();

        let snapshotter = Arc::new(
            DumbSnapshotter::new(CompressionAlgorithm::Zstandard).with_archive_size(archive_size),
        );
        let uploader = Arc::new(DumbUploader::default());
        let builder = builder_with(snapshotter, uploader.clone());

        let artifact = builder
            .compute_artifact(beacon.clone(), &certificate)
            .await
            .unwrap();

        let last_uploads = uploader.get_last_n_uploads(2).unwrap();
        let [ancillary_location, immutables_location, ..] = last_uploads.as_slice() else {
            panic!("Two snapshots should have been 'uploaded'");
        };
        let expected = Snapshot {
            digest: expected_digest.to_owned(),
            network: fake_data::network().into(),
            beacon,
            size: archive_size,
            ancillary_size: Some(archive_size),
            locations: vec![immutables_location.into()],
            ancillary_locations: Some(vec![ancillary_location.into()]),
            compression_algorithm: CompressionAlgorithm::Zstandard,
            cardano_node_version: "1.0.0".to_string(),
        };

        assert_eq!(expected, artifact);
    }

    #[tokio::test]
    async fn snapshot_archive_name_include_beacon_and_network_values() {
        let network = fake_data::network();
        let beacon = CardanoDbBeacon::new(20, 145);
        let digest = "test+digest";
        let builder = builder_with(
            Arc::new(DumbSnapshotter::default()),
            Arc::new(DumbUploader::default()),
        );

        let name = builder.get_base_file_name_without_extension(&beacon, digest);

        assert_eq!(
            format!(
                "{}-e{}-i{}.{digest}",
                network, *beacon.epoch, beacon.immutable_file_number,
            ),
            name
        );
    }

    #[tokio::test]
    async fn remove_snapshot_archive_after_upload() {
        let file = NamedTempFile::new().unwrap();
        let archive = archive_for(&file);
        let builder = builder_with(
            Arc::new(DumbSnapshotter::default()),
            Arc::new(DumbUploader::default()),
        );

        builder
            .upload_snapshot_archive(&archive)
            .await
            .expect("Snapshot upload should not fail");

        assert!(
            !file.path().exists(),
            "Ongoing snapshot file should have been removed after upload"
        );
    }

    #[tokio::test]
    async fn remove_snapshot_archive_after_upload_even_if_an_error_occurred() {
        let file = NamedTempFile::new().unwrap();
        let archive = archive_for(&file);
        let mut failing_uploader = MockFileUploader::new();
        failing_uploader
            .expect_upload()
            .return_once(|_| Err(anyhow!("an error")))
            .once();
        let builder = builder_with(
            Arc::new(DumbSnapshotter::default()),
            Arc::new(failing_uploader),
        );

        builder
            .upload_snapshot_archive(&archive)
            .await
            .expect_err("Snapshot upload should have failed");

        assert!(
            !file.path().exists(),
            "Ongoing snapshot file should have been removed even after upload failure"
        );
    }
}