1use std::{path::Path, sync::Arc};
2
3use anyhow::{anyhow, Context};
4use async_trait::async_trait;
5use slog::{debug, error, warn, Logger};
6
7use mithril_common::{
8 entities::{AncillaryLocation, CardanoDbBeacon, CompressionAlgorithm},
9 logging::LoggerExtensions,
10 CardanoNetwork, StdResult,
11};
12
13use crate::{
14 file_uploaders::{GcpUploader, LocalUploader},
15 services::Snapshotter,
16 tools::file_archiver::FileArchive,
17 DumbUploader, FileUploader,
18};
19
20#[cfg_attr(test, mockall::automock)]
22#[async_trait]
23pub trait AncillaryFileUploader: Send + Sync {
24 async fn upload(
26 &self,
27 filepath: &Path,
28 compression_algorithm: Option<CompressionAlgorithm>,
29 ) -> StdResult<AncillaryLocation>;
30}
31
32#[async_trait]
33impl AncillaryFileUploader for DumbUploader {
34 async fn upload(
35 &self,
36 filepath: &Path,
37 compression_algorithm: Option<CompressionAlgorithm>,
38 ) -> StdResult<AncillaryLocation> {
39 let uri = FileUploader::upload(self, filepath).await?.into();
40
41 Ok(AncillaryLocation::CloudStorage {
42 uri,
43 compression_algorithm,
44 })
45 }
46}
47
48#[async_trait]
49impl AncillaryFileUploader for LocalUploader {
50 async fn upload(
51 &self,
52 filepath: &Path,
53 compression_algorithm: Option<CompressionAlgorithm>,
54 ) -> StdResult<AncillaryLocation> {
55 let uri = FileUploader::upload(self, filepath).await?.into();
56
57 Ok(AncillaryLocation::CloudStorage {
58 uri,
59 compression_algorithm,
60 })
61 }
62}
63
64#[async_trait]
65impl AncillaryFileUploader for GcpUploader {
66 async fn upload(
67 &self,
68 filepath: &Path,
69 compression_algorithm: Option<CompressionAlgorithm>,
70 ) -> StdResult<AncillaryLocation> {
71 let uri = FileUploader::upload(self, filepath).await?.into();
72
73 Ok(AncillaryLocation::CloudStorage {
74 uri,
75 compression_algorithm,
76 })
77 }
78}
79
80#[derive(Debug)]
81pub struct AncillaryUpload {
82 pub locations: Vec<AncillaryLocation>,
83 pub size: u64,
84}
85
86pub struct AncillaryArtifactBuilder {
89 uploaders: Vec<Arc<dyn AncillaryFileUploader>>,
90 snapshotter: Arc<dyn Snapshotter>,
91 cardano_network: CardanoNetwork,
92 logger: Logger,
93}
94
95impl AncillaryArtifactBuilder {
96 pub fn new(
98 uploaders: Vec<Arc<dyn AncillaryFileUploader>>,
99 snapshotter: Arc<dyn Snapshotter>,
100 cardano_network: CardanoNetwork,
101 logger: Logger,
102 ) -> StdResult<Self> {
103 if uploaders.is_empty() {
104 return Err(anyhow!(
105 "At least one uploader is required to create an 'AncillaryArtifactBuilder'"
106 ));
107 }
108
109 Ok(Self {
110 uploaders,
111 logger: logger.new_with_component_name::<Self>(),
112 cardano_network,
113 snapshotter,
114 })
115 }
116
117 pub async fn upload(&self, beacon: &CardanoDbBeacon) -> StdResult<AncillaryUpload> {
118 let snapshot = self.create_ancillary_archive(beacon).await?;
119 let locations = self.upload_ancillary_archive(&snapshot).await?;
120
121 Ok(AncillaryUpload {
122 locations,
123 size: snapshot.get_uncompressed_size(),
124 })
125 }
126
127 async fn create_ancillary_archive(&self, beacon: &CardanoDbBeacon) -> StdResult<FileArchive> {
129 debug!(
130 self.logger,
131 "Creating ancillary archive for immutable file number: {}",
132 beacon.immutable_file_number
133 );
134
135 let archive_name = format!(
136 "{}-e{}-i{}.ancillary",
137 self.cardano_network, *beacon.epoch, beacon.immutable_file_number,
138 );
139
140 let snapshot = self
141 .snapshotter
142 .snapshot_ancillary(beacon.immutable_file_number, &archive_name)
143 .await
144 .with_context(|| {
145 format!(
146 "Failed to create ancillary archive for immutable file number: {}",
147 beacon.immutable_file_number
148 )
149 })?;
150
151 debug!(
152 self.logger,
153 "Ancillary archive created at path: {:?}",
154 snapshot.get_file_path()
155 );
156
157 Ok(snapshot)
158 }
159
160 async fn upload_ancillary_archive(
162 &self,
163 file_archive: &FileArchive,
164 ) -> StdResult<Vec<AncillaryLocation>> {
165 let archive_filepath = file_archive.get_file_path();
166 let mut locations = Vec::new();
167 for uploader in &self.uploaders {
168 let result = uploader
169 .upload(
170 archive_filepath,
171 Some(file_archive.get_compression_algorithm()),
172 )
173 .await;
174 match result {
175 Ok(location) => {
176 locations.push(location);
177 }
178 Err(e) => {
179 error!(
180 self.logger,
181 "Failed to upload ancillary archive";
182 "error" => e.to_string()
183 );
184 }
185 }
186 }
187
188 if let Err(error) = tokio::fs::remove_file(archive_filepath).await {
189 warn!(
190 self.logger, " > Post upload ancillary archive file removal failure";
191 "error" => error
192 );
193 }
194
195 if locations.is_empty() {
196 return Err(anyhow!(
197 "Failed to upload ancillary archive with all uploaders"
198 ));
199 }
200
201 Ok(locations)
202 }
203}
204
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use mithril_common::test_utils::{assert_equivalent, TempDir};

    use crate::services::{DumbSnapshotter, MockSnapshotter};
    use crate::test_tools::TestLogger;

    use super::*;

    /// Mocked uploader whose `upload` call fails.
    fn fake_uploader_returning_error() -> MockAncillaryFileUploader {
        let mut failing_uploader = MockAncillaryFileUploader::new();
        failing_uploader
            .expect_upload()
            .return_once(|_, _| Err(anyhow!("Failure while uploading...")));

        failing_uploader
    }

    /// Mocked uploader expecting exactly one `upload` call with the given archive
    /// path and compression algorithm, answering with a cloud storage location
    /// built from `location_uri`.
    fn fake_uploader(
        archive_path: &str,
        location_uri: &str,
        compression_algorithm: Option<CompressionAlgorithm>,
    ) -> MockAncillaryFileUploader {
        let expected_path = PathBuf::from(archive_path);
        let returned_uri = location_uri.to_string();

        let mut uploader = MockAncillaryFileUploader::new();
        uploader
            .expect_upload()
            .withf(move |path, algorithm| {
                path == expected_path.as_path() && *algorithm == compression_algorithm
            })
            .times(1)
            .return_once(move |_, _| {
                Ok(AncillaryLocation::CloudStorage {
                    uri: returned_uri,
                    compression_algorithm,
                })
            });

        uploader
    }

    /// Writes a small file named `name` in `dir` and returns its path.
    fn create_fake_archive(dir: &Path, name: &str) -> PathBuf {
        let file_path = dir.join(name);
        std::fs::write(
            &file_path,
            "I swear, this is an archive, not a temporary test file.\n",
        )
        .unwrap();

        file_path
    }

    #[test]
    fn create_ancillary_builder_should_error_when_no_uploader() {
        let creation_result = AncillaryArtifactBuilder::new(
            vec![],
            Arc::new(DumbSnapshotter::default()),
            CardanoNetwork::DevNet(123),
            TestLogger::stdout(),
        );

        assert!(
            creation_result.is_err(),
            "Should return an error when no uploaders"
        )
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_log_upload_errors() {
        let log_path = TempDir::create(
            "ancillary",
            "upload_ancillary_archive_should_log_upload_errors",
        )
        .join("test.log");

        // The builder (and its logger) lives in its own scope so that it is
        // dropped before the log file is read back.
        {
            let builder = AncillaryArtifactBuilder::new(
                vec![Arc::new(fake_uploader_returning_error())],
                Arc::new(DumbSnapshotter::default()),
                CardanoNetwork::DevNet(123),
                TestLogger::file(&log_path),
            )
            .unwrap();

            let _ = builder
                .upload_ancillary_archive(&FileArchive::dummy())
                .await;
        }

        let log_content = std::fs::read_to_string(&log_path).unwrap();
        assert!(log_content.contains("Failure while uploading..."));
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_error_when_no_location_is_returned() {
        let builder = AncillaryArtifactBuilder::new(
            vec![Arc::new(fake_uploader_returning_error())],
            Arc::new(DumbSnapshotter::default()),
            CardanoNetwork::DevNet(123),
            TestLogger::stdout(),
        )
        .unwrap();

        let upload_result = builder
            .upload_ancillary_archive(&FileArchive::dummy())
            .await;

        assert!(
            upload_result.is_err(),
            "Should return an error when no location is returned"
        );
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_return_location_even_with_uploaders_errors() {
        let uploaders: Vec<Arc<dyn AncillaryFileUploader>> = vec![
            Arc::new(fake_uploader_returning_error()),
            Arc::new(fake_uploader(
                "archive_path",
                "an_uri",
                Some(CompressionAlgorithm::Gzip),
            )),
            Arc::new(fake_uploader_returning_error()),
        ];

        let builder = AncillaryArtifactBuilder::new(
            uploaders,
            Arc::new(DumbSnapshotter::default()),
            CardanoNetwork::DevNet(123),
            TestLogger::stdout(),
        )
        .unwrap();

        let archive = FileArchive::new(
            PathBuf::from("archive_path"),
            0,
            0,
            CompressionAlgorithm::Gzip,
        );
        let locations = builder.upload_ancillary_archive(&archive).await.unwrap();

        assert_equivalent(
            locations,
            vec![AncillaryLocation::CloudStorage {
                uri: "an_uri".to_string(),
                compression_algorithm: Some(CompressionAlgorithm::Gzip),
            }],
        );
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_return_all_uploaders_returned_locations() {
        let uploaders: Vec<Arc<dyn AncillaryFileUploader>> = vec![
            Arc::new(fake_uploader(
                "archive_path",
                "an_uri",
                Some(CompressionAlgorithm::Gzip),
            )),
            Arc::new(fake_uploader(
                "archive_path",
                "another_uri",
                Some(CompressionAlgorithm::Gzip),
            )),
        ];

        let builder = AncillaryArtifactBuilder::new(
            uploaders,
            Arc::new(DumbSnapshotter::new(CompressionAlgorithm::Gzip)),
            CardanoNetwork::DevNet(123),
            TestLogger::stdout(),
        )
        .unwrap();

        let archive = FileArchive::new(
            PathBuf::from("archive_path"),
            0,
            0,
            CompressionAlgorithm::Gzip,
        );
        let locations = builder.upload_ancillary_archive(&archive).await.unwrap();

        assert_equivalent(
            locations,
            vec![
                AncillaryLocation::CloudStorage {
                    uri: "an_uri".to_string(),
                    compression_algorithm: Some(CompressionAlgorithm::Gzip),
                },
                AncillaryLocation::CloudStorage {
                    uri: "another_uri".to_string(),
                    compression_algorithm: Some(CompressionAlgorithm::Gzip),
                },
            ],
        );
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_remove_archive_after_upload() {
        let source_dir = TempDir::create(
            "ancillary",
            "upload_ancillary_archive_should_remove_archive_after_upload",
        );
        let archive_path = create_fake_archive(&source_dir, "ancillary.tar.gz");
        let archive = FileArchive::new(archive_path.clone(), 0, 0, CompressionAlgorithm::Gzip);

        let builder = AncillaryArtifactBuilder::new(
            vec![Arc::new(fake_uploader(
                archive_path.to_str().unwrap(),
                "an_uri",
                Some(CompressionAlgorithm::Gzip),
            ))],
            Arc::new(DumbSnapshotter::new(CompressionAlgorithm::Gzip)),
            CardanoNetwork::DevNet(123),
            TestLogger::stdout(),
        )
        .unwrap();

        assert!(archive_path.exists());

        builder.upload_ancillary_archive(&archive).await.unwrap();

        assert!(!archive_path.exists());
    }

    #[tokio::test]
    async fn upload_ancillary_archive_should_remove_archive_when_no_uploader_succeed() {
        let source_dir = TempDir::create(
            "ancillary",
            "upload_ancillary_archive_should_remove_archive_when_no_uploader_succeed",
        );
        let archive_path = create_fake_archive(&source_dir, "ancillary.tar.gz");
        let archive = FileArchive::new(archive_path.clone(), 0, 0, CompressionAlgorithm::Gzip);

        let builder = AncillaryArtifactBuilder::new(
            vec![Arc::new(fake_uploader_returning_error())],
            Arc::new(DumbSnapshotter::default()),
            CardanoNetwork::DevNet(123),
            TestLogger::stdout(),
        )
        .unwrap();

        assert!(archive_path.exists());

        builder
            .upload_ancillary_archive(&archive)
            .await
            .unwrap_err();

        assert!(!archive_path.exists());
    }

    #[tokio::test]
    async fn upload_should_return_error_and_not_upload_when_archive_creation_fails() {
        let mut failing_snapshotter = MockSnapshotter::new();
        failing_snapshotter
            .expect_snapshot_ancillary()
            .returning(|_, _| Err(anyhow!("Failed to create archive")));

        let mut never_called_uploader = MockAncillaryFileUploader::new();
        never_called_uploader.expect_upload().never();

        let builder = AncillaryArtifactBuilder::new(
            vec![Arc::new(never_called_uploader)],
            Arc::new(failing_snapshotter),
            CardanoNetwork::DevNet(123),
            TestLogger::stdout(),
        )
        .unwrap();

        builder
            .upload(&CardanoDbBeacon::new(99, 1))
            .await
            .expect_err("Should return an error when archive creation fails");
    }

    #[tokio::test]
    async fn should_compute_the_size_of_the_ancillary() {
        let mut snapshotter = MockSnapshotter::new();
        snapshotter.expect_snapshot_ancillary().returning(|_, _| {
            let expected_uncompressed_size = 123456;
            Ok(FileArchive::new(
                PathBuf::from("whatever.tar.gz"),
                0,
                expected_uncompressed_size,
                CompressionAlgorithm::Gzip,
            ))
        });

        let mut succeeding_uploader = MockAncillaryFileUploader::new();
        succeeding_uploader.expect_upload().returning(|_, _| {
            Ok(AncillaryLocation::CloudStorage {
                uri: "an_uri".to_string(),
                compression_algorithm: Some(CompressionAlgorithm::Gzip),
            })
        });

        let builder = AncillaryArtifactBuilder::new(
            vec![Arc::new(succeeding_uploader)],
            Arc::new(snapshotter),
            CardanoNetwork::DevNet(123),
            TestLogger::stdout(),
        )
        .unwrap();

        let ancillary_upload = builder.upload(&CardanoDbBeacon::new(99, 1)).await.unwrap();

        assert_eq!(123456, ancillary_upload.size);
    }
}