mithril_aggregator/file_uploaders/
interface.rs1use async_trait::async_trait;
2use mithril_common::{entities::FileUri, StdResult};
3use std::{path::Path, time::Duration};
4
/// Policy describing how many times a file upload is attempted and how long
/// to wait between two consecutive attempts.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct FileUploadRetryPolicy {
    /// Total number of upload attempts (the first try included).
    pub attempts: usize,
    /// Pause observed after a failed attempt before trying again.
    pub delay_between_attempts: Duration,
}
13
14impl FileUploadRetryPolicy {
15 pub fn never() -> Self {
17 Self {
18 attempts: 1,
19 delay_between_attempts: Duration::from_secs(0),
20 }
21 }
22}
23
24impl Default for FileUploadRetryPolicy {
25 fn default() -> Self {
27 Self {
28 attempts: 3,
29 delay_between_attempts: Duration::from_secs(5),
30 }
31 }
32}
33
34#[cfg_attr(test, mockall::automock)]
37#[async_trait]
38pub trait FileUploader: Sync + Send {
39 async fn upload_without_retry(&self, filepath: &Path) -> StdResult<FileUri>;
41
42 fn retry_policy(&self) -> FileUploadRetryPolicy {
44 FileUploadRetryPolicy::never()
45 }
46
47 async fn upload(&self, filepath: &Path) -> StdResult<FileUri> {
49 let retry_policy = self.retry_policy();
50
51 let mut nb_attempts = 0;
52 loop {
53 nb_attempts += 1;
54 match self.upload_without_retry(filepath).await {
55 Ok(result) => return Ok(result),
56 Err(_) if nb_attempts >= retry_policy.attempts => {
57 return Err(anyhow::anyhow!(
58 "Upload failed after {} attempts",
59 nb_attempts
60 ));
61 }
62 _ => tokio::time::sleep(retry_policy.delay_between_attempts).await,
63 }
64 }
65 }
66}
67
#[cfg(test)]
mod tests {
    use std::{path::PathBuf, sync::Mutex, time::Instant};

    use super::*;
    use anyhow::anyhow;
    use mockall::{mock, predicate::eq};

    /// Path handed to every uploader under test.
    const FILE_TO_UPLOAD: &str = "file_to_upload";

    // Mock that only overrides the single-shot upload, so the trait's default
    // retry policy stays in effect.
    mock! {
        TestFileUploaderWithDefaultRetryPolicy {}

        #[async_trait]
        impl FileUploader for TestFileUploaderWithDefaultRetryPolicy {
            async fn upload_without_retry(&self, filepath: &Path) -> StdResult<FileUri>;
        }
    }

    // Mock that also lets each test choose the retry policy.
    mock! {
        TestFileUploader {}

        #[async_trait]
        impl FileUploader for TestFileUploader {
            async fn upload_without_retry(&self, filepath: &Path) -> StdResult<FileUri>;
            fn retry_policy(&self) -> FileUploadRetryPolicy;
        }
    }

    /// Uri returned by the mocks on a successful upload.
    fn uploaded_uri() -> FileUri {
        FileUri("file_uploaded".to_string())
    }

    /// Policy allowing `attempts` tries with no pause in between.
    fn retry_immediately(attempts: usize) -> FileUploadRetryPolicy {
        FileUploadRetryPolicy {
            attempts,
            delay_between_attempts: Duration::ZERO,
        }
    }

    #[tokio::test]
    async fn upload_return_the_result_of_upload_without_retry() {
        let mut mock_uploader = MockTestFileUploaderWithDefaultRetryPolicy::new();
        mock_uploader
            .expect_upload_without_retry()
            .with(eq(PathBuf::from(FILE_TO_UPLOAD)))
            .times(1)
            .returning(|_| Ok(uploaded_uri()));

        let uri = mock_uploader
            .upload(Path::new(FILE_TO_UPLOAD))
            .await
            .unwrap();

        assert_eq!(uploaded_uri(), uri);
    }

    #[tokio::test]
    async fn when_upload_fails_do_not_retry_by_default() {
        let mut mock_uploader = MockTestFileUploaderWithDefaultRetryPolicy::new();
        // `times(1)` makes the mock fail the test if a second attempt happens.
        mock_uploader
            .expect_upload_without_retry()
            .with(eq(PathBuf::from(FILE_TO_UPLOAD)))
            .times(1)
            .returning(|_| Err(anyhow!("Failure while uploading...")));

        mock_uploader
            .upload(Path::new(FILE_TO_UPLOAD))
            .await
            .expect_err("Should fail on upload");
    }

    #[tokio::test]
    async fn should_retry_if_fail() {
        let mut mock_uploader = MockTestFileUploader::new();
        mock_uploader
            .expect_retry_policy()
            .returning(|| retry_immediately(50));

        // The first two attempts fail...
        mock_uploader
            .expect_upload_without_retry()
            .with(eq(PathBuf::from(FILE_TO_UPLOAD)))
            .times(2)
            .returning(|_| Err(anyhow!("Failure while uploading...")));
        // ...and the third one succeeds.
        mock_uploader
            .expect_upload_without_retry()
            .with(eq(PathBuf::from(FILE_TO_UPLOAD)))
            .times(1)
            .returning(|_| Ok(uploaded_uri()));

        let uri = mock_uploader
            .upload(Path::new(FILE_TO_UPLOAD))
            .await
            .unwrap();

        assert_eq!(uploaded_uri(), uri);
    }

    #[tokio::test]
    async fn should_recall_a_failing_inner_uploader_up_to_the_limit() {
        let mut mock_uploader = MockTestFileUploader::new();
        mock_uploader
            .expect_retry_policy()
            .returning(|| retry_immediately(4));

        // Exactly as many calls as the policy allows, all of them failing.
        mock_uploader
            .expect_upload_without_retry()
            .with(eq(PathBuf::from(FILE_TO_UPLOAD)))
            .times(4)
            .returning(|_| Err(anyhow!("Failure while uploading...")));

        mock_uploader
            .upload(Path::new(FILE_TO_UPLOAD))
            .await
            .expect_err("An error should be returned when all retries are done");
    }

    #[tokio::test]
    async fn should_delay_between_retries() {
        /// Always-failing uploader that checks the time elapsed since its
        /// previous invocation against the configured delay.
        struct FileUploaderAssertDelay {
            delay_ms: u64,
            last_attempt_start_time: Mutex<Option<Instant>>,
        }

        #[async_trait]
        impl FileUploader for FileUploaderAssertDelay {
            async fn upload_without_retry(&self, _filepath: &Path) -> StdResult<FileUri> {
                let mut previous_start = self.last_attempt_start_time.lock().unwrap();

                // Nothing to compare against on the very first attempt.
                if let Some(started_at) = *previous_start {
                    let elapsed = started_at.elapsed();

                    let lower_bound = Duration::from_millis(self.delay_ms);
                    assert!(
                        elapsed >= lower_bound,
                        "duration should be greater than or equal to {}ms but was {}ms",
                        lower_bound.as_millis(),
                        elapsed.as_millis()
                    );

                    let upper_bound = Duration::from_millis(2 * self.delay_ms);
                    assert!(
                        elapsed < upper_bound,
                        "duration should be less than {}ms but was {}ms",
                        upper_bound.as_millis(),
                        elapsed.as_millis()
                    );
                }
                *previous_start = Some(Instant::now());

                Err(anyhow::anyhow!("Upload failed"))
            }

            fn retry_policy(&self) -> FileUploadRetryPolicy {
                FileUploadRetryPolicy {
                    attempts: 4,
                    delay_between_attempts: Duration::from_millis(self.delay_ms),
                }
            }
        }

        let delayed_uploader = FileUploaderAssertDelay {
            delay_ms: 50,
            last_attempt_start_time: Mutex::new(None),
        };

        delayed_uploader
            .upload(Path::new(FILE_TO_UPLOAD))
            .await
            .expect_err("An error should be returned when all retries are done");
    }
}