1use std::{path::Path, sync::Arc};
2
3use anyhow::{Context, anyhow};
4use async_trait::async_trait;
5use slog::{Logger, debug, error, warn};
6
7use mithril_common::{
8 CardanoNetwork, StdResult,
9 entities::{AncillaryLocation, CardanoDbBeacon, CompressionAlgorithm},
10 logging::LoggerExtensions,
11};
12
13use crate::{
14 DumbUploader, FileUploader,
15 file_uploaders::{CloudUploader, LocalUploader},
16 services::Snapshotter,
17 tools::file_archiver::FileArchive,
18};
19
20#[cfg_attr(test, mockall::automock)]
22#[async_trait]
23pub trait AncillaryFileUploader: Send + Sync {
24 async fn upload(
26 &self,
27 filepath: &Path,
28 compression_algorithm: Option<CompressionAlgorithm>,
29 ) -> StdResult<AncillaryLocation>;
30}
31
32#[async_trait]
33impl AncillaryFileUploader for DumbUploader {
34 async fn upload(
35 &self,
36 filepath: &Path,
37 compression_algorithm: Option<CompressionAlgorithm>,
38 ) -> StdResult<AncillaryLocation> {
39 let uri = FileUploader::upload(self, filepath).await?.into();
40
41 Ok(AncillaryLocation::CloudStorage {
42 uri,
43 compression_algorithm,
44 })
45 }
46}
47
48#[async_trait]
49impl AncillaryFileUploader for LocalUploader {
50 async fn upload(
51 &self,
52 filepath: &Path,
53 compression_algorithm: Option<CompressionAlgorithm>,
54 ) -> StdResult<AncillaryLocation> {
55 let uri = FileUploader::upload(self, filepath).await?.into();
56
57 Ok(AncillaryLocation::CloudStorage {
58 uri,
59 compression_algorithm,
60 })
61 }
62}
63
64#[async_trait]
65impl AncillaryFileUploader for CloudUploader {
66 async fn upload(
67 &self,
68 filepath: &Path,
69 compression_algorithm: Option<CompressionAlgorithm>,
70 ) -> StdResult<AncillaryLocation> {
71 let uri = FileUploader::upload(self, filepath).await?.into();
72
73 Ok(AncillaryLocation::CloudStorage {
74 uri,
75 compression_algorithm,
76 })
77 }
78}
79
80#[derive(Debug)]
81pub struct AncillaryUpload {
82 pub locations: Vec<AncillaryLocation>,
83 pub size: u64,
84}
85
86pub struct AncillaryArtifactBuilder {
89 uploaders: Vec<Arc<dyn AncillaryFileUploader>>,
90 snapshotter: Arc<dyn Snapshotter>,
91 cardano_network: CardanoNetwork,
92 logger: Logger,
93}
94
95impl AncillaryArtifactBuilder {
96 pub fn new(
98 uploaders: Vec<Arc<dyn AncillaryFileUploader>>,
99 snapshotter: Arc<dyn Snapshotter>,
100 cardano_network: CardanoNetwork,
101 logger: Logger,
102 ) -> StdResult<Self> {
103 if uploaders.is_empty() {
104 return Err(anyhow!(
105 "At least one uploader is required to create an 'AncillaryArtifactBuilder'"
106 ));
107 }
108
109 Ok(Self {
110 uploaders,
111 logger: logger.new_with_component_name::<Self>(),
112 cardano_network,
113 snapshotter,
114 })
115 }
116
117 pub async fn upload(&self, beacon: &CardanoDbBeacon) -> StdResult<AncillaryUpload> {
118 let snapshot = self.create_ancillary_archive(beacon).await?;
119 let locations = self.upload_ancillary_archive(&snapshot).await?;
120
121 Ok(AncillaryUpload {
122 locations,
123 size: snapshot.get_uncompressed_size(),
124 })
125 }
126
127 async fn create_ancillary_archive(&self, beacon: &CardanoDbBeacon) -> StdResult<FileArchive> {
129 debug!(
130 self.logger,
131 "Creating ancillary archive for immutable file number: {}",
132 beacon.immutable_file_number
133 );
134
135 let archive_name = format!(
136 "{}-e{}-i{}.ancillary",
137 self.cardano_network, *beacon.epoch, beacon.immutable_file_number,
138 );
139
140 let snapshot = self
141 .snapshotter
142 .snapshot_ancillary(beacon.immutable_file_number, &archive_name)
143 .await
144 .with_context(|| {
145 format!(
146 "Failed to create ancillary archive for immutable file number: {}",
147 beacon.immutable_file_number
148 )
149 })?;
150
151 debug!(
152 self.logger,
153 "Ancillary archive created at path: {:?}",
154 snapshot.get_file_path()
155 );
156
157 Ok(snapshot)
158 }
159
160 async fn upload_ancillary_archive(
162 &self,
163 file_archive: &FileArchive,
164 ) -> StdResult<Vec<AncillaryLocation>> {
165 let archive_filepath = file_archive.get_file_path();
166 let mut locations = Vec::new();
167 for uploader in &self.uploaders {
168 let result = uploader
169 .upload(
170 archive_filepath,
171 Some(file_archive.get_compression_algorithm()),
172 )
173 .await;
174 match result {
175 Ok(location) => {
176 locations.push(location);
177 }
178 Err(e) => {
179 error!(
180 self.logger,
181 "Failed to upload ancillary archive";
182 "error" => e.to_string()
183 );
184 }
185 }
186 }
187
188 if let Err(error) = tokio::fs::remove_file(archive_filepath).await {
189 warn!(
190 self.logger, " > Post upload ancillary archive file removal failure";
191 "error" => error
192 );
193 }
194
195 if locations.is_empty() {
196 return Err(anyhow!(
197 "Failed to upload ancillary archive with all uploaders"
198 ));
199 }
200
201 Ok(locations)
202 }
203}
204
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use mithril_common::test_utils::{TempDir, assert_equivalent};

    use crate::services::{DumbSnapshotter, MockSnapshotter};
    use crate::test_tools::TestLogger;

    use super::*;

    /// Builds an [`AncillaryArtifactBuilder`] on `CardanoNetwork::DevNet(123)`, panicking if
    /// the construction fails.
    fn new_builder(
        uploaders: Vec<Arc<dyn AncillaryFileUploader>>,
        snapshotter: Arc<dyn Snapshotter>,
        logger: Logger,
    ) -> AncillaryArtifactBuilder {
        AncillaryArtifactBuilder::new(uploaders, snapshotter, CardanoNetwork::DevNet(123), logger)
            .unwrap()
    }

    /// Mock uploader whose `upload` fails with a fixed error message.
    fn fake_uploader_returning_error() -> MockAncillaryFileUploader {
        let mut failing_uploader = MockAncillaryFileUploader::new();
        failing_uploader
            .expect_upload()
            .return_once(|_, _| Err(anyhow!("Failure while uploading...")));

        failing_uploader
    }

    /// Mock uploader expecting exactly one `upload` call with the given path and compression
    /// algorithm, answering with a `CloudStorage` location pointing to `location_uri`.
    fn fake_uploader(
        archive_path: &str,
        location_uri: &str,
        compression_algorithm: Option<CompressionAlgorithm>,
    ) -> MockAncillaryFileUploader {
        let expected_path = PathBuf::from(archive_path);
        let returned_location = AncillaryLocation::CloudStorage {
            uri: location_uri.to_string(),
            compression_algorithm,
        };

        let mut uploader = MockAncillaryFileUploader::new();
        uploader
            .expect_upload()
            .withf(move |path, algorithm| {
                path == expected_path.as_path() && *algorithm == compression_algorithm
            })
            .times(1)
            .return_once(move |_, _| Ok(returned_location));

        uploader
    }

    /// Writes a small placeholder file named `name` in `dir` and returns its path.
    fn create_fake_archive(dir: &Path, name: &str) -> PathBuf {
        use std::fs::File;
        use std::io::Write;

        let archive_path = dir.join(name);
        let mut archive_file = File::create(&archive_path).unwrap();
        writeln!(
            archive_file,
            "I swear, this is an archive, not a temporary test file."
        )
        .unwrap();

        archive_path
    }

    #[test]
    fn create_ancillary_builder_should_error_when_no_uploader() {
        let no_uploaders = Vec::new();

        let result = AncillaryArtifactBuilder::new(
            no_uploaders,
            Arc::new(DumbSnapshotter::default()),
            CardanoNetwork::DevNet(123),
            TestLogger::stdout(),
        );

        assert!(result.is_err(), "Should return an error when no uploaders");
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_log_upload_errors() {
        let (logger, log_inspector) = TestLogger::memory();
        let builder = new_builder(
            vec![Arc::new(fake_uploader_returning_error())],
            Arc::new(DumbSnapshotter::default()),
            logger,
        );

        // Only the emitted log matters here, the result is deliberately ignored.
        let _ = builder.upload_ancillary_archive(&FileArchive::dummy()).await;

        assert!(log_inspector.contains_log("Failure while uploading..."));
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_error_when_no_location_is_returned() {
        let builder = new_builder(
            vec![Arc::new(fake_uploader_returning_error())],
            Arc::new(DumbSnapshotter::default()),
            TestLogger::stdout(),
        );

        let result = builder.upload_ancillary_archive(&FileArchive::dummy()).await;

        assert!(
            result.is_err(),
            "Should return an error when no location is returned"
        );
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_return_location_even_with_uploaders_errors() {
        // The only working uploader sits between two failing ones.
        let uploaders: Vec<Arc<dyn AncillaryFileUploader>> = vec![
            Arc::new(fake_uploader_returning_error()),
            Arc::new(fake_uploader(
                "archive_path",
                "an_uri",
                Some(CompressionAlgorithm::Gzip),
            )),
            Arc::new(fake_uploader_returning_error()),
        ];
        let builder = new_builder(
            uploaders,
            Arc::new(DumbSnapshotter::default()),
            TestLogger::stdout(),
        );
        let archive = FileArchive::new(
            PathBuf::from("archive_path"),
            0,
            0,
            CompressionAlgorithm::Gzip,
        );

        let locations = builder.upload_ancillary_archive(&archive).await.unwrap();

        let expected_locations = vec![AncillaryLocation::CloudStorage {
            uri: "an_uri".to_string(),
            compression_algorithm: Some(CompressionAlgorithm::Gzip),
        }];
        assert_equivalent(locations, expected_locations);
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_return_all_uploaders_returned_locations() {
        let uploaders: Vec<Arc<dyn AncillaryFileUploader>> = vec![
            Arc::new(fake_uploader(
                "archive_path",
                "an_uri",
                Some(CompressionAlgorithm::Gzip),
            )),
            Arc::new(fake_uploader(
                "archive_path",
                "another_uri",
                Some(CompressionAlgorithm::Gzip),
            )),
        ];
        let builder = new_builder(
            uploaders,
            Arc::new(DumbSnapshotter::new(CompressionAlgorithm::Gzip)),
            TestLogger::stdout(),
        );
        let archive = FileArchive::new(
            PathBuf::from("archive_path"),
            0,
            0,
            CompressionAlgorithm::Gzip,
        );

        let locations = builder.upload_ancillary_archive(&archive).await.unwrap();

        let expected_locations = vec![
            AncillaryLocation::CloudStorage {
                uri: "an_uri".to_string(),
                compression_algorithm: Some(CompressionAlgorithm::Gzip),
            },
            AncillaryLocation::CloudStorage {
                uri: "another_uri".to_string(),
                compression_algorithm: Some(CompressionAlgorithm::Gzip),
            },
        ];
        assert_equivalent(locations, expected_locations);
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_remove_archive_after_upload() {
        let source_dir = TempDir::create(
            "ancillary",
            "upload_ancillary_archive_should_remove_archive_after_upload",
        );
        let archive_path = create_fake_archive(&source_dir, "ancillary.tar.gz");
        let uploader = fake_uploader(
            archive_path.to_str().unwrap(),
            "an_uri",
            Some(CompressionAlgorithm::Gzip),
        );
        let builder = new_builder(
            vec![Arc::new(uploader)],
            Arc::new(DumbSnapshotter::new(CompressionAlgorithm::Gzip)),
            TestLogger::stdout(),
        );
        let archive = FileArchive::new(archive_path.clone(), 0, 0, CompressionAlgorithm::Gzip);
        assert!(archive_path.exists());

        builder.upload_ancillary_archive(&archive).await.unwrap();

        assert!(!archive_path.exists());
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_remove_archive_when_no_uploader_succeed() {
        let source_dir = TempDir::create(
            "ancillary",
            "upload_ancillary_archive_should_remove_archive_when_no_uploader_succeed",
        );
        let archive_path = create_fake_archive(&source_dir, "ancillary.tar.gz");
        let builder = new_builder(
            vec![Arc::new(fake_uploader_returning_error())],
            Arc::new(DumbSnapshotter::default()),
            TestLogger::stdout(),
        );
        let archive = FileArchive::new(archive_path.clone(), 0, 0, CompressionAlgorithm::Gzip);
        assert!(archive_path.exists());

        builder.upload_ancillary_archive(&archive).await.unwrap_err();

        assert!(!archive_path.exists());
    }

    #[tokio::test]
    async fn upload_should_return_error_and_not_upload_when_archive_creation_fails() {
        let mut failing_snapshotter = MockSnapshotter::new();
        failing_snapshotter
            .expect_snapshot_ancillary()
            .returning(|_, _| Err(anyhow!("Failed to create archive")));

        // The uploader must never be reached when the archive cannot be created.
        let mut untouched_uploader = MockAncillaryFileUploader::new();
        untouched_uploader.expect_upload().never();

        let builder = new_builder(
            vec![Arc::new(untouched_uploader)],
            Arc::new(failing_snapshotter),
            TestLogger::stdout(),
        );

        builder
            .upload(&CardanoDbBeacon::new(99, 1))
            .await
            .expect_err("Should return an error when archive creation fails");
    }

    #[tokio::test]
    async fn should_compute_the_size_of_the_ancillary() {
        let mut snapshotter = MockSnapshotter::new();
        snapshotter.expect_snapshot_ancillary().returning(|_, _| {
            let archive = FileArchive::new(
                PathBuf::from("whatever.tar.gz"),
                0,
                123456,
                CompressionAlgorithm::Gzip,
            );
            Ok(archive)
        });

        let mut uploader = MockAncillaryFileUploader::new();
        uploader.expect_upload().returning(|_, _| {
            let location = AncillaryLocation::CloudStorage {
                uri: "an_uri".to_string(),
                compression_algorithm: Some(CompressionAlgorithm::Gzip),
            };
            Ok(location)
        });

        let builder = new_builder(
            vec![Arc::new(uploader)],
            Arc::new(snapshotter),
            TestLogger::stdout(),
        );

        let upload = builder.upload(&CardanoDbBeacon::new(99, 1)).await.unwrap();

        assert_eq!(123456, upload.size);
    }
}