mithril_aggregator/file_uploaders/gcp_uploader.rs

1use anyhow::{anyhow, Context};
2use async_trait::async_trait;
3use cloud_storage::{
4    bucket::Entity, bucket_access_control::Role, object_access_control::NewObjectAccessControl,
5    Client,
6};
7use slog::{info, Logger};
8use std::{
9    env,
10    fmt::Display,
11    path::{Path, PathBuf},
12    sync::Arc,
13};
14use tokio_util::codec::{BytesCodec, FramedRead};
15
16use mithril_common::{entities::FileUri, logging::LoggerExtensions, StdResult};
17
18use crate::FileUploader;
19
20use super::FileUploadRetryPolicy;
21
/// Location of an object inside a cloud storage bucket.
///
/// Thin wrapper around a [`PathBuf`]: remote locations are built by joining path
/// segments, and rendered as text through the [`Display`] implementation.
#[derive(Debug, Clone, PartialEq)]
pub struct CloudRemotePath(PathBuf);

impl CloudRemotePath {
    /// Build a remote path from a string.
    pub fn new(file_path: &str) -> Self {
        let inner: PathBuf = file_path.into();
        Self(inner)
    }

    /// Return a new remote path made of this one followed by `file_path`.
    pub fn join(&self, file_path: &str) -> Self {
        let joined = self.0.join(file_path);
        Self(joined)
    }
}

impl Display for CloudRemotePath {
    /// Write the path, replacing any non UTF-8 sequence (lossy conversion).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rendered = self.0.to_string_lossy();
        f.write_str(&rendered)
    }
}

impl From<&Path> for CloudRemotePath {
    fn from(path: &Path) -> Self {
        Self(PathBuf::from(path))
    }
}
49
50fn get_file_name(file_path: &Path) -> StdResult<&str> {
51    file_path
52        .file_name()
53        .map(|s| s.to_str())
54        .ok_or(anyhow!("Could not convert file path to file name"))?
55        .ok_or(anyhow!("Could not find the final component of the path"))
56}
57
58/// CloudBackendUploader represents a cloud backend uploader
59#[cfg_attr(test, mockall::automock)]
60#[async_trait]
61pub trait CloudBackendUploader: Send + Sync {
62    /// Check if a file exists in the cloud backend
63    async fn file_exists(&self, remote_file_path: &CloudRemotePath) -> StdResult<Option<FileUri>>;
64
65    /// Upload a file to the cloud backend
66    async fn upload_file(
67        &self,
68        local_file_path: &Path,
69        remote_file_path: &CloudRemotePath,
70    ) -> StdResult<FileUri>;
71
72    /// Make a file public in the cloud backend
73    async fn make_file_public(&self, remote_file_path: &CloudRemotePath) -> StdResult<()>;
74}
75
76/// GcpBackendUploader represents a Google Cloud Platform file uploader
77#[derive(Debug)]
78pub struct GcpBackendUploader {
79    bucket: String,
80    use_cdn_domain: bool,
81    client: Client,
82    logger: Logger,
83}
84
85impl GcpBackendUploader {
86    /// GcpBackendUploader factory
87    pub fn try_new(bucket: String, use_cdn_domain: bool, logger: Logger) -> StdResult<Self> {
88        if env::var("GOOGLE_APPLICATION_CREDENTIALS_JSON").is_err() {
89            return Err(anyhow!(
90                "Missing GOOGLE_APPLICATION_CREDENTIALS_JSON environment variable".to_string()
91            ));
92        };
93
94        Ok(Self {
95            bucket,
96            use_cdn_domain,
97            logger: logger.new_with_component_name::<Self>(),
98            client: Client::default(),
99        })
100    }
101
102    fn get_location(&self, remote_file_path: &CloudRemotePath) -> FileUri {
103        let mut uri = vec![];
104        if !self.use_cdn_domain {
105            uri.push("storage.googleapis.com");
106        }
107        uri.push(&self.bucket);
108        let file_path = remote_file_path.to_string();
109        uri.push(&file_path);
110
111        FileUri(format!("https://{}", uri.join("/")))
112    }
113}
114
115#[async_trait]
116impl CloudBackendUploader for GcpBackendUploader {
117    async fn file_exists(&self, remote_file_path: &CloudRemotePath) -> StdResult<Option<FileUri>> {
118        info!(self.logger, "Reading file metadata {remote_file_path}");
119        let file_uri = match self
120            .client
121            .object()
122            .read(&self.bucket, &remote_file_path.to_string())
123            .await
124            .with_context(|| "remote reading file metadata failure")
125        {
126            Ok(_) => {
127                info!(self.logger, "Found file metadata {remote_file_path}");
128
129                Some(self.get_location(remote_file_path))
130            }
131            Err(_) => {
132                info!(self.logger, "Missing file metadata {remote_file_path}");
133
134                None
135            }
136        };
137
138        Ok(file_uri)
139    }
140
141    async fn upload_file(
142        &self,
143        local_file_path: &Path,
144        remote_file_path: &CloudRemotePath,
145    ) -> StdResult<FileUri> {
146        info!(
147            self.logger,
148            "Uploading {} to {remote_file_path}",
149            local_file_path.display()
150        );
151        let file = tokio::fs::File::open(local_file_path).await.unwrap();
152        let stream = FramedRead::new(file, BytesCodec::new());
153        self.client
154            .object()
155            .create_streamed(
156                &self.bucket,
157                stream,
158                None,
159                &remote_file_path.to_string(),
160                "application/octet-stream",
161            )
162            .await
163            .with_context(|| "remote uploading failure")?;
164        info!(
165            self.logger,
166            "Uploaded {} to {remote_file_path}",
167            local_file_path.display()
168        );
169
170        Ok(self.get_location(remote_file_path))
171    }
172
173    async fn make_file_public(&self, remote_file_path: &CloudRemotePath) -> StdResult<()> {
174        let new_bucket_access_control = NewObjectAccessControl {
175            entity: Entity::AllUsers,
176            role: Role::Reader,
177        };
178        info!(
179            self.logger,
180            "Updating acl for {remote_file_path}: {new_bucket_access_control:?}"
181        );
182        self.client
183            .object_access_control()
184            .create(
185                &self.bucket,
186                &remote_file_path.to_string(),
187                &new_bucket_access_control,
188            )
189            .await
190            .with_context(|| "updating acl failure")?;
191
192        info!(self.logger, "Updated acl for {remote_file_path}");
193
194        Ok(())
195    }
196}
197
198/// GcpUploader represents a Google Cloud Platform file uploader interactor
199pub struct GcpUploader {
200    cloud_backend_uploader: Arc<dyn CloudBackendUploader>,
201    remote_folder: CloudRemotePath,
202    allow_overwrite: bool,
203    retry_policy: FileUploadRetryPolicy,
204}
205
206impl GcpUploader {
207    /// GcpUploader factory
208    pub fn new(
209        cloud_backend_uploader: Arc<dyn CloudBackendUploader>,
210        remote_folder: CloudRemotePath,
211        allow_overwrite: bool,
212        retry_policy: FileUploadRetryPolicy,
213    ) -> Self {
214        Self {
215            cloud_backend_uploader,
216            remote_folder,
217            allow_overwrite,
218            retry_policy,
219        }
220    }
221}
222
223#[async_trait]
224impl FileUploader for GcpUploader {
225    async fn upload_without_retry(&self, file_path: &Path) -> StdResult<FileUri> {
226        let remote_file_path = self.remote_folder.join(get_file_name(file_path)?);
227        if !self.allow_overwrite {
228            if let Some(file_uri) = self
229                .cloud_backend_uploader
230                .file_exists(&remote_file_path)
231                .await
232                .with_context(|| "checking if file exists in cloud")?
233            {
234                return Ok(file_uri);
235            }
236        }
237
238        let file_uri = self
239            .cloud_backend_uploader
240            .upload_file(file_path, &remote_file_path)
241            .await
242            .with_context(|| "uploading file to cloud")?;
243        self.cloud_backend_uploader
244            .make_file_public(&remote_file_path)
245            .await
246            .with_context(|| "making file public in cloud")?;
247
248        Ok(file_uri)
249    }
250
251    fn retry_policy(&self) -> FileUploadRetryPolicy {
252        self.retry_policy.clone()
253    }
254}
255
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::{file_uploaders::FileUploadRetryPolicy, test_tools::TestLogger};

    use super::*;

    // Tests of the `GcpUploader` workflow with the cloud backend replaced by a
    // `MockCloudBackendUploader`: the mock expectations assert which backend calls are
    // made, with which arguments, and how many times.
    mod cloud_backend_uploader {
        use mockall::predicate::eq;

        use super::*;

        // Overwrite forbidden and file absent remotely: existence check, then upload, then
        // make public, each exactly once.
        #[tokio::test]
        async fn upload_public_file_succeeds_when_file_does_not_exist_remotely_and_without_overwriting_allowed(
        ) {
            let allow_overwrite = false;
            let local_file_path = Path::new("local_folder").join("snapshot.xxx.tar.gz");
            let remote_folder_path = CloudRemotePath::new("remote_folder");
            let remote_file_path = remote_folder_path.join("snapshot.xxx.tar.gz");
            let expected_file_uri =
                FileUri("https://cloud-host/remote_folder/snapshot.xxx.tar.gz".to_string());
            let cloud_backend_uploader = {
                let mut mock_cloud_backend_uploader = MockCloudBackendUploader::new();
                mock_cloud_backend_uploader
                    .expect_file_exists()
                    .with(eq(remote_file_path.clone()))
                    .return_once(move |_| Ok(None))
                    .once();
                let expected_file_uri_clone = expected_file_uri.clone();
                mock_cloud_backend_uploader
                    .expect_upload_file()
                    .with(eq(local_file_path.clone()), eq(remote_file_path.clone()))
                    .return_once(move |_, _| Ok(expected_file_uri_clone))
                    .once();
                mock_cloud_backend_uploader
                    .expect_make_file_public()
                    .with(eq(remote_file_path.clone()))
                    .return_once(move |_| Ok(()))
                    .once();

                mock_cloud_backend_uploader
            };
            let file_uploader = GcpUploader::new(
                Arc::new(cloud_backend_uploader),
                remote_folder_path,
                allow_overwrite,
                FileUploadRetryPolicy::never(),
            );

            let file_uri = file_uploader.upload(&local_file_path).await.unwrap();

            assert_eq!(expected_file_uri, file_uri);
        }

        // Overwrite forbidden and file already present remotely: the existing URI is
        // returned. No upload / make-public expectation is set, so such calls would fail
        // the test.
        #[tokio::test]
        async fn upload_public_file_succeeds_when_file_exists_remotely_and_without_overwriting_allowed(
        ) {
            let allow_overwrite = false;
            let local_file_path = Path::new("local_folder").join("snapshot.xxx.tar.gz");
            let remote_folder_path = CloudRemotePath::new("remote_folder");
            let remote_file_path = remote_folder_path.join("snapshot.xxx.tar.gz");
            let expected_file_uri =
                FileUri("https://cloud-host/remote_folder/snapshot.xxx.tar.gz".to_string());
            let cloud_backend_uploader = {
                let mut mock_cloud_backend_uploader = MockCloudBackendUploader::new();
                let expected_file_uri_clone = expected_file_uri.clone();
                mock_cloud_backend_uploader
                    .expect_file_exists()
                    .with(eq(remote_file_path))
                    .return_once(move |_| Ok(Some(expected_file_uri_clone)))
                    .once();

                mock_cloud_backend_uploader
            };
            let file_uploader = GcpUploader::new(
                Arc::new(cloud_backend_uploader),
                remote_folder_path,
                allow_overwrite,
                FileUploadRetryPolicy::never(),
            );

            let file_uri = file_uploader.upload(&local_file_path).await.unwrap();

            assert_eq!(expected_file_uri, file_uri);
        }

        // Overwrite allowed: the existence check is skipped (no expectation set for it) and
        // the file is uploaded then made public.
        #[tokio::test]
        async fn upload_public_file_succeeds_with_overwriting_allowed() {
            let allow_overwrite = true;
            let local_file_path = Path::new("local_folder").join("snapshot.xxx.tar.gz");
            let remote_folder_path = CloudRemotePath::new("remote_folder");
            let remote_file_path = remote_folder_path.join("snapshot.xxx.tar.gz");
            let expected_file_uri =
                FileUri("https://cloud-host/remote_folder/snapshot.xxx.tar.gz".to_string());
            let cloud_backend_uploader = {
                let mut mock_cloud_backend_uploader = MockCloudBackendUploader::new();
                let expected_file_uri_clone = expected_file_uri.clone();
                mock_cloud_backend_uploader
                    .expect_upload_file()
                    .with(eq(local_file_path.clone()), eq(remote_file_path.clone()))
                    .return_once(move |_, _| Ok(expected_file_uri_clone))
                    .once();
                mock_cloud_backend_uploader
                    .expect_make_file_public()
                    .with(eq(remote_file_path))
                    .return_once(move |_| Ok(()))
                    .once();

                mock_cloud_backend_uploader
            };
            let file_uploader = GcpUploader::new(
                Arc::new(cloud_backend_uploader),
                remote_folder_path,
                allow_overwrite,
                FileUploadRetryPolicy::never(),
            );

            let file_uri = file_uploader.upload(&local_file_path).await.unwrap();

            assert_eq!(expected_file_uri, file_uri);
        }

        // An error from the existence check is propagated. The mock has no `with`
        // predicate, so the local path only needs a valid file name.
        #[tokio::test]
        async fn upload_public_file_fails_when_file_exists_fails_and_without_overwriting_allowed() {
            let allow_overwrite = false;
            let cloud_backend_uploader = {
                let mut mock_cloud_backend_uploader = MockCloudBackendUploader::new();
                mock_cloud_backend_uploader
                    .expect_file_exists()
                    .returning(|_| Err(anyhow!("file exists error")));

                mock_cloud_backend_uploader
            };
            let file_uploader = GcpUploader::new(
                Arc::new(cloud_backend_uploader),
                CloudRemotePath::new("remote_folder"),
                allow_overwrite,
                FileUploadRetryPolicy::never(),
            );

            file_uploader
                .upload(Path::new("whatever"))
                .await
                .expect_err("should have failed");
        }

        // An upload error is propagated (make-public is never reached).
        #[tokio::test]
        async fn upload_public_file_fails_when_upload_fails() {
            let allow_overwrite = true;
            let cloud_backend_uploader = {
                let mut mock_cloud_backend_uploader = MockCloudBackendUploader::new();
                mock_cloud_backend_uploader
                    .expect_upload_file()
                    .return_once(move |_, _| Err(anyhow!("upload error")))
                    .once();

                mock_cloud_backend_uploader
            };
            let file_uploader = GcpUploader::new(
                Arc::new(cloud_backend_uploader),
                CloudRemotePath::new("remote_folder"),
                allow_overwrite,
                FileUploadRetryPolicy::never(),
            );

            file_uploader
                .upload(Path::new("whatever"))
                .await
                .expect_err("should have failed");
        }

        // A make-public error is propagated even though the upload itself succeeded.
        #[tokio::test]
        async fn upload_public_file_fails_when_make_public_fails() {
            let allow_overwrite = true;
            let cloud_backend_uploader = {
                let mut mock_cloud_backend_uploader = MockCloudBackendUploader::new();
                mock_cloud_backend_uploader
                    .expect_upload_file()
                    .returning(|_, _| Ok(FileUri("https://whatever".to_string())));
                mock_cloud_backend_uploader
                    .expect_make_file_public()
                    .returning(|_| Err(anyhow!("make public error")));

                mock_cloud_backend_uploader
            };
            let file_uploader = GcpUploader::new(
                Arc::new(cloud_backend_uploader),
                CloudRemotePath::new("remote_folder"),
                allow_overwrite,
                FileUploadRetryPolicy::never(),
            );

            file_uploader
                .upload(Path::new("whatever"))
                .await
                .expect_err("should have failed");
        }
    }

    // Tests of the URI built by `GcpBackendUploader::get_location`.
    mod gcp_backend_uploader {
        use super::*;

        // Without CDN domain the URI is served from `storage.googleapis.com/<bucket>/...`.
        #[tokio::test]
        async fn get_location_not_using_cdn_domain_return_google_api_uri() {
            // `try_new` only checks that this variable is set, so a placeholder value is
            // enough. NOTE(review): `env::set_var` mutates process-wide state shared with
            // concurrently running tests.
            env::set_var("GOOGLE_APPLICATION_CREDENTIALS_JSON", "credentials");
            let use_cdn_domain = false;
            let gcp_file_uploader = GcpBackendUploader::try_new(
                "cdn.mithril.network".to_string(),
                use_cdn_domain,
                TestLogger::stdout(),
            )
            .unwrap();
            let remote_file_path =
                CloudRemotePath::new("remote_folder").join("snapshot.xxx.tar.gz");
            let expected_location =
                "https://storage.googleapis.com/cdn.mithril.network/remote_folder/snapshot.xxx.tar.gz"
                    .to_string();

            let location = gcp_file_uploader.get_location(&remote_file_path);

            assert_eq!(FileUri(expected_location), location);
        }

        // With CDN domain the bucket name itself is used as the host.
        #[tokio::test]
        async fn get_location_using_cdn_domain_return_cdn_in_uri() {
            env::set_var("GOOGLE_APPLICATION_CREDENTIALS_JSON", "credentials");
            let use_cdn_domain = true;
            let gcp_file_uploader = GcpBackendUploader::try_new(
                "cdn.mithril.network".to_string(),
                use_cdn_domain,
                TestLogger::stdout(),
            )
            .unwrap();
            let remote_file_path =
                CloudRemotePath::new("remote_folder").join("snapshot.xxx.tar.gz");
            let expected_location =
                "https://cdn.mithril.network/remote_folder/snapshot.xxx.tar.gz".to_string();

            let location = gcp_file_uploader.get_location(&remote_file_path);

            assert_eq!(FileUri(expected_location), location);
        }
    }

    // The policy given to `GcpUploader::new` must be the one exposed through the
    // `FileUploader` trait object.
    #[tokio::test]
    async fn retry_policy_from_file_uploader_trait_should_be_implemented() {
        let expected_policy = FileUploadRetryPolicy {
            attempts: 10,
            delay_between_attempts: Duration::from_millis(123),
        };

        let file_uploader: Box<dyn FileUploader> = Box::new(GcpUploader::new(
            Arc::new(MockCloudBackendUploader::new()),
            CloudRemotePath::new("remote_folder"),
            true,
            expected_policy.clone(),
        ));

        assert_eq!(expected_policy, file_uploader.retry_policy());
    }
}