mithril_aggregator/file_uploaders/
gcp_uploader.rs1use anyhow::{anyhow, Context};
2use async_trait::async_trait;
3use cloud_storage::{
4 bucket::Entity, bucket_access_control::Role, object_access_control::NewObjectAccessControl,
5 Client,
6};
7use slog::{info, Logger};
8use std::{
9 env,
10 fmt::Display,
11 path::{Path, PathBuf},
12 sync::Arc,
13};
14use tokio_util::codec::{BytesCodec, FramedRead};
15
16use mithril_common::{entities::FileUri, logging::LoggerExtensions, StdResult};
17
18use crate::FileUploader;
19
20use super::FileUploadRetryPolicy;
21
/// Location of an object inside a cloud storage bucket.
///
/// Wraps a [`PathBuf`] so remote locations are composed with the same joining rules as
/// local paths.
#[derive(Debug, Clone, PartialEq)]
pub struct CloudRemotePath(PathBuf);

impl CloudRemotePath {
    /// Build a remote path from its textual form.
    pub fn new(file_path: &str) -> Self {
        let inner: PathBuf = file_path.into();
        CloudRemotePath(inner)
    }

    /// Return a new remote path made of `self` followed by `file_path`.
    ///
    /// Uses `PathBuf::push` semantics, so an absolute `file_path` replaces `self` entirely.
    pub fn join(&self, file_path: &str) -> Self {
        let mut joined = self.0.clone();
        joined.push(file_path);
        CloudRemotePath(joined)
    }
}
37
38impl Display for CloudRemotePath {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 write!(f, "{}", self.0.to_string_lossy())
41 }
42}
43
44impl From<&Path> for CloudRemotePath {
45 fn from(path: &Path) -> Self {
46 CloudRemotePath(path.to_path_buf())
47 }
48}
49
50fn get_file_name(file_path: &Path) -> StdResult<&str> {
51 file_path
52 .file_name()
53 .map(|s| s.to_str())
54 .ok_or(anyhow!("Could not convert file path to file name"))?
55 .ok_or(anyhow!("Could not find the final component of the path"))
56}
57
/// Remote storage operations needed by [`GcpUploader`].
///
/// In test builds `mockall::automock` generates a `MockCloudBackendUploader` from this
/// trait, so the upload workflow can be exercised without a real cloud backend.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait CloudBackendUploader: Send + Sync {
    /// Check whether a file exists at `remote_file_path`.
    ///
    /// Returns `Some` with the file location when it exists, `None` otherwise.
    async fn file_exists(&self, remote_file_path: &CloudRemotePath) -> StdResult<Option<FileUri>>;

    /// Upload the local file at `local_file_path` to `remote_file_path`.
    ///
    /// Returns the location of the uploaded file.
    async fn upload_file(
        &self,
        local_file_path: &Path,
        remote_file_path: &CloudRemotePath,
    ) -> StdResult<FileUri>;

    /// Make the file at `remote_file_path` publicly readable.
    ///
    /// The GCP implementation does this by granting read access to all users.
    async fn make_file_public(&self, remote_file_path: &CloudRemotePath) -> StdResult<()>;
}
75
76#[derive(Debug)]
78pub struct GcpBackendUploader {
79 bucket: String,
80 use_cdn_domain: bool,
81 client: Client,
82 logger: Logger,
83}
84
85impl GcpBackendUploader {
86 pub fn try_new(bucket: String, use_cdn_domain: bool, logger: Logger) -> StdResult<Self> {
88 if env::var("GOOGLE_APPLICATION_CREDENTIALS_JSON").is_err() {
89 return Err(anyhow!(
90 "Missing GOOGLE_APPLICATION_CREDENTIALS_JSON environment variable".to_string()
91 ));
92 };
93
94 Ok(Self {
95 bucket,
96 use_cdn_domain,
97 logger: logger.new_with_component_name::<Self>(),
98 client: Client::default(),
99 })
100 }
101
102 fn get_location(&self, remote_file_path: &CloudRemotePath) -> FileUri {
103 let mut uri = vec![];
104 if !self.use_cdn_domain {
105 uri.push("storage.googleapis.com");
106 }
107 uri.push(&self.bucket);
108 let file_path = remote_file_path.to_string();
109 uri.push(&file_path);
110
111 FileUri(format!("https://{}", uri.join("/")))
112 }
113}
114
115#[async_trait]
116impl CloudBackendUploader for GcpBackendUploader {
117 async fn file_exists(&self, remote_file_path: &CloudRemotePath) -> StdResult<Option<FileUri>> {
118 info!(self.logger, "Reading file metadata {remote_file_path}");
119 let file_uri = match self
120 .client
121 .object()
122 .read(&self.bucket, &remote_file_path.to_string())
123 .await
124 .with_context(|| "remote reading file metadata failure")
125 {
126 Ok(_) => {
127 info!(self.logger, "Found file metadata {remote_file_path}");
128
129 Some(self.get_location(remote_file_path))
130 }
131 Err(_) => {
132 info!(self.logger, "Missing file metadata {remote_file_path}");
133
134 None
135 }
136 };
137
138 Ok(file_uri)
139 }
140
141 async fn upload_file(
142 &self,
143 local_file_path: &Path,
144 remote_file_path: &CloudRemotePath,
145 ) -> StdResult<FileUri> {
146 info!(
147 self.logger,
148 "Uploading {} to {remote_file_path}",
149 local_file_path.display()
150 );
151 let file = tokio::fs::File::open(local_file_path).await.unwrap();
152 let stream = FramedRead::new(file, BytesCodec::new());
153 self.client
154 .object()
155 .create_streamed(
156 &self.bucket,
157 stream,
158 None,
159 &remote_file_path.to_string(),
160 "application/octet-stream",
161 )
162 .await
163 .with_context(|| "remote uploading failure")?;
164 info!(
165 self.logger,
166 "Uploaded {} to {remote_file_path}",
167 local_file_path.display()
168 );
169
170 Ok(self.get_location(remote_file_path))
171 }
172
173 async fn make_file_public(&self, remote_file_path: &CloudRemotePath) -> StdResult<()> {
174 let new_bucket_access_control = NewObjectAccessControl {
175 entity: Entity::AllUsers,
176 role: Role::Reader,
177 };
178 info!(
179 self.logger,
180 "Updating acl for {remote_file_path}: {new_bucket_access_control:?}"
181 );
182 self.client
183 .object_access_control()
184 .create(
185 &self.bucket,
186 &remote_file_path.to_string(),
187 &new_bucket_access_control,
188 )
189 .await
190 .with_context(|| "updating acl failure")?;
191
192 info!(self.logger, "Updated acl for {remote_file_path}");
193
194 Ok(())
195 }
196}
197
198pub struct GcpUploader {
200 cloud_backend_uploader: Arc<dyn CloudBackendUploader>,
201 remote_folder: CloudRemotePath,
202 allow_overwrite: bool,
203 retry_policy: FileUploadRetryPolicy,
204}
205
206impl GcpUploader {
207 pub fn new(
209 cloud_backend_uploader: Arc<dyn CloudBackendUploader>,
210 remote_folder: CloudRemotePath,
211 allow_overwrite: bool,
212 retry_policy: FileUploadRetryPolicy,
213 ) -> Self {
214 Self {
215 cloud_backend_uploader,
216 remote_folder,
217 allow_overwrite,
218 retry_policy,
219 }
220 }
221}
222
223#[async_trait]
224impl FileUploader for GcpUploader {
225 async fn upload_without_retry(&self, file_path: &Path) -> StdResult<FileUri> {
226 let remote_file_path = self.remote_folder.join(get_file_name(file_path)?);
227 if !self.allow_overwrite {
228 if let Some(file_uri) = self
229 .cloud_backend_uploader
230 .file_exists(&remote_file_path)
231 .await
232 .with_context(|| "checking if file exists in cloud")?
233 {
234 return Ok(file_uri);
235 }
236 }
237
238 let file_uri = self
239 .cloud_backend_uploader
240 .upload_file(file_path, &remote_file_path)
241 .await
242 .with_context(|| "uploading file to cloud")?;
243 self.cloud_backend_uploader
244 .make_file_public(&remote_file_path)
245 .await
246 .with_context(|| "making file public in cloud")?;
247
248 Ok(file_uri)
249 }
250
251 fn retry_policy(&self) -> FileUploadRetryPolicy {
252 self.retry_policy.clone()
253 }
254}
255
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::{file_uploaders::FileUploadRetryPolicy, test_tools::TestLogger};

    use super::*;

    // Tests of the `GcpUploader` upload workflow, driven through the mockall-generated
    // `MockCloudBackendUploader` instead of a real cloud backend.
    mod cloud_backend_uploader {
        use mockall::predicate::eq;

        use super::*;

        // Overwrite disabled and file absent remotely: the existence check is done, then
        // the file is uploaded and made public; the upload's URI is returned.
        #[tokio::test]
        async fn upload_public_file_succeeds_when_file_does_not_exist_remotely_and_without_overwriting_allowed(
        ) {
            let allow_overwrite = false;
            let local_file_path = Path::new("local_folder").join("snapshot.xxx.tar.gz");
            let remote_folder_path = CloudRemotePath::new("remote_folder");
            // Expected remote path: remote folder + local file name.
            let remote_file_path = remote_folder_path.join("snapshot.xxx.tar.gz");
            let expected_file_uri =
                FileUri("https://cloud-host/remote_folder/snapshot.xxx.tar.gz".to_string());
            let cloud_backend_uploader = {
                let mut mock_cloud_backend_uploader = MockCloudBackendUploader::new();
                mock_cloud_backend_uploader
                    .expect_file_exists()
                    .with(eq(remote_file_path.clone()))
                    .return_once(move |_| Ok(None))
                    .once();
                let expected_file_uri_clone = expected_file_uri.clone();
                mock_cloud_backend_uploader
                    .expect_upload_file()
                    .with(eq(local_file_path.clone()), eq(remote_file_path.clone()))
                    .return_once(move |_, _| Ok(expected_file_uri_clone))
                    .once();
                mock_cloud_backend_uploader
                    .expect_make_file_public()
                    .with(eq(remote_file_path.clone()))
                    .return_once(move |_| Ok(()))
                    .once();

                mock_cloud_backend_uploader
            };
            let file_uploader = GcpUploader::new(
                Arc::new(cloud_backend_uploader),
                remote_folder_path,
                allow_overwrite,
                FileUploadRetryPolicy::never(),
            );

            let file_uri = file_uploader.upload(&local_file_path).await.unwrap();

            assert_eq!(expected_file_uri, file_uri);
        }

        // Overwrite disabled and file already present remotely: the existing URI is
        // returned. Only `file_exists` has an expectation, so upload / make-public are
        // presumably not allowed to be called (mockall fails on unexpected calls).
        #[tokio::test]
        async fn upload_public_file_succeeds_when_file_exists_remotely_and_without_overwriting_allowed(
        ) {
            let allow_overwrite = false;
            let local_file_path = Path::new("local_folder").join("snapshot.xxx.tar.gz");
            let remote_folder_path = CloudRemotePath::new("remote_folder");
            let remote_file_path = remote_folder_path.join("snapshot.xxx.tar.gz");
            let expected_file_uri =
                FileUri("https://cloud-host/remote_folder/snapshot.xxx.tar.gz".to_string());
            let cloud_backend_uploader = {
                let mut mock_cloud_backend_uploader = MockCloudBackendUploader::new();
                let expected_file_uri_clone = expected_file_uri.clone();
                mock_cloud_backend_uploader
                    .expect_file_exists()
                    .with(eq(remote_file_path))
                    .return_once(move |_| Ok(Some(expected_file_uri_clone)))
                    .once();

                mock_cloud_backend_uploader
            };
            let file_uploader = GcpUploader::new(
                Arc::new(cloud_backend_uploader),
                remote_folder_path,
                allow_overwrite,
                FileUploadRetryPolicy::never(),
            );

            let file_uri = file_uploader.upload(&local_file_path).await.unwrap();

            assert_eq!(expected_file_uri, file_uri);
        }

        // Overwrite enabled: no existence check expectation is registered; the file is
        // uploaded and made public directly.
        #[tokio::test]
        async fn upload_public_file_succeeds_with_overwriting_allowed() {
            let allow_overwrite = true;
            let local_file_path = Path::new("local_folder").join("snapshot.xxx.tar.gz");
            let remote_folder_path = CloudRemotePath::new("remote_folder");
            let remote_file_path = remote_folder_path.join("snapshot.xxx.tar.gz");
            let expected_file_uri =
                FileUri("https://cloud-host/remote_folder/snapshot.xxx.tar.gz".to_string());
            let cloud_backend_uploader = {
                let mut mock_cloud_backend_uploader = MockCloudBackendUploader::new();
                let expected_file_uri_clone = expected_file_uri.clone();
                mock_cloud_backend_uploader
                    .expect_upload_file()
                    .with(eq(local_file_path.clone()), eq(remote_file_path.clone()))
                    .return_once(move |_, _| Ok(expected_file_uri_clone))
                    .once();
                mock_cloud_backend_uploader
                    .expect_make_file_public()
                    .with(eq(remote_file_path))
                    .return_once(move |_| Ok(()))
                    .once();

                mock_cloud_backend_uploader
            };
            let file_uploader = GcpUploader::new(
                Arc::new(cloud_backend_uploader),
                remote_folder_path,
                allow_overwrite,
                FileUploadRetryPolicy::never(),
            );

            let file_uri = file_uploader.upload(&local_file_path).await.unwrap();

            assert_eq!(expected_file_uri, file_uri);
        }

        // An error from the existence check is propagated as an upload error.
        #[tokio::test]
        async fn upload_public_file_fails_when_file_exists_fails_and_without_overwriting_allowed() {
            let allow_overwrite = false;
            let cloud_backend_uploader = {
                let mut mock_cloud_backend_uploader = MockCloudBackendUploader::new();
                mock_cloud_backend_uploader
                    .expect_file_exists()
                    .returning(|_| Err(anyhow!("file exists error")));

                mock_cloud_backend_uploader
            };
            let file_uploader = GcpUploader::new(
                Arc::new(cloud_backend_uploader),
                CloudRemotePath::new("remote_folder"),
                allow_overwrite,
                FileUploadRetryPolicy::never(),
            );

            file_uploader
                .upload(Path::new("whatever"))
                .await
                .expect_err("should have failed");
        }

        // An error from the backend upload is propagated as an upload error.
        #[tokio::test]
        async fn upload_public_file_fails_when_upload_fails() {
            let allow_overwrite = true;
            let cloud_backend_uploader = {
                let mut mock_cloud_backend_uploader = MockCloudBackendUploader::new();
                mock_cloud_backend_uploader
                    .expect_upload_file()
                    .return_once(move |_, _| Err(anyhow!("upload error")))
                    .once();

                mock_cloud_backend_uploader
            };
            let file_uploader = GcpUploader::new(
                Arc::new(cloud_backend_uploader),
                CloudRemotePath::new("remote_folder"),
                allow_overwrite,
                FileUploadRetryPolicy::never(),
            );

            file_uploader
                .upload(Path::new("whatever"))
                .await
                .expect_err("should have failed");
        }

        // An error while making the file public fails the upload even though the
        // backend upload itself succeeded.
        #[tokio::test]
        async fn upload_public_file_fails_when_make_public_fails() {
            let allow_overwrite = true;
            let cloud_backend_uploader = {
                let mut mock_cloud_backend_uploader = MockCloudBackendUploader::new();
                mock_cloud_backend_uploader
                    .expect_upload_file()
                    .returning(|_, _| Ok(FileUri("https://whatever".to_string())));
                mock_cloud_backend_uploader
                    .expect_make_file_public()
                    .returning(|_| Err(anyhow!("make public error")));

                mock_cloud_backend_uploader
            };
            let file_uploader = GcpUploader::new(
                Arc::new(cloud_backend_uploader),
                CloudRemotePath::new("remote_folder"),
                allow_overwrite,
                FileUploadRetryPolicy::never(),
            );

            file_uploader
                .upload(Path::new("whatever"))
                .await
                .expect_err("should have failed");
        }
    }

    // Tests of `GcpBackendUploader::get_location` URI formatting.
    mod gcp_backend_uploader {
        use super::*;

        #[tokio::test]
        async fn get_location_not_using_cdn_domain_return_google_api_uri() {
            // `try_new` fails without this variable; its value is not used by these tests.
            // It is process-wide, so it is shared with tests running in parallel.
            env::set_var("GOOGLE_APPLICATION_CREDENTIALS_JSON", "credentials");
            let use_cdn_domain = false;
            let gcp_file_uploader = GcpBackendUploader::try_new(
                "cdn.mithril.network".to_string(),
                use_cdn_domain,
                TestLogger::stdout(),
            )
            .unwrap();
            let remote_file_path =
                CloudRemotePath::new("remote_folder").join("snapshot.xxx.tar.gz");
            let expected_location =
                "https://storage.googleapis.com/cdn.mithril.network/remote_folder/snapshot.xxx.tar.gz"
                    .to_string();

            let location = gcp_file_uploader.get_location(&remote_file_path);

            assert_eq!(FileUri(expected_location), location);
        }

        #[tokio::test]
        async fn get_location_using_cdn_domain_return_cdn_in_uri() {
            // See the note on the environment variable in the previous test.
            env::set_var("GOOGLE_APPLICATION_CREDENTIALS_JSON", "credentials");
            let use_cdn_domain = true;
            let gcp_file_uploader = GcpBackendUploader::try_new(
                "cdn.mithril.network".to_string(),
                use_cdn_domain,
                TestLogger::stdout(),
            )
            .unwrap();
            let remote_file_path =
                CloudRemotePath::new("remote_folder").join("snapshot.xxx.tar.gz");
            // With the CDN domain, the bucket name is the host of the URI.
            let expected_location =
                "https://cdn.mithril.network/remote_folder/snapshot.xxx.tar.gz".to_string();

            let location = gcp_file_uploader.get_location(&remote_file_path);

            assert_eq!(FileUri(expected_location), location);
        }
    }

    // `GcpUploader` must expose, through the `FileUploader` trait object, the retry policy
    // it was built with.
    #[tokio::test]
    async fn retry_policy_from_file_uploader_trait_should_be_implemented() {
        let expected_policy = FileUploadRetryPolicy {
            attempts: 10,
            delay_between_attempts: Duration::from_millis(123),
        };

        let file_uploader: Box<dyn FileUploader> = Box::new(GcpUploader::new(
            Arc::new(MockCloudBackendUploader::new()),
            CloudRemotePath::new("remote_folder"),
            true,
            expected_policy.clone(),
        ));

        assert_eq!(expected_policy, file_uploader.retry_policy());
    }
}
517}