mithril_aggregator/file_uploaders/
interface.rs

1use async_trait::async_trait;
2use mithril_common::{entities::FileUri, StdResult};
3use std::{path::Path, time::Duration};
4
/// Policy for retrying file uploads.
///
/// Consumed by `FileUploader::upload`, which keeps calling the single-shot upload until it
/// succeeds or `attempts` tries have been made.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileUploadRetryPolicy {
    /// Total number of attempts to upload a file, the first try included.
    ///
    /// `FileUploader::upload` always tries at least once, so `0` behaves like `1`.
    pub attempts: usize,
    /// Delay between two attempts.
    pub delay_between_attempts: Duration,
}
13
14impl FileUploadRetryPolicy {
15    /// Create a policy that never retries.
16    pub fn never() -> Self {
17        Self {
18            attempts: 1,
19            delay_between_attempts: Duration::from_secs(0),
20        }
21    }
22}
23
24impl Default for FileUploadRetryPolicy {
25    /// Create a default retry policy.
26    fn default() -> Self {
27        Self {
28            attempts: 3,
29            delay_between_attempts: Duration::from_secs(5),
30        }
31    }
32}
33
34/// FileUploader represents a file uploader interactor.
35/// It retries the upload operation according to the retry policy.
36#[cfg_attr(test, mockall::automock)]
37#[async_trait]
38pub trait FileUploader: Sync + Send {
39    /// Try to upload once.
40    async fn upload_without_retry(&self, filepath: &Path) -> StdResult<FileUri>;
41
42    /// Get the retry policy for this uploader.
43    fn retry_policy(&self) -> FileUploadRetryPolicy {
44        FileUploadRetryPolicy::never()
45    }
46
47    /// Upload a file with retries according to the retry policy.
48    async fn upload(&self, filepath: &Path) -> StdResult<FileUri> {
49        let retry_policy = self.retry_policy();
50
51        let mut nb_attempts = 0;
52        loop {
53            nb_attempts += 1;
54            match self.upload_without_retry(filepath).await {
55                Ok(result) => return Ok(result),
56                Err(_) if nb_attempts >= retry_policy.attempts => {
57                    return Err(anyhow::anyhow!(
58                        "Upload failed after {} attempts",
59                        nb_attempts
60                    ));
61                }
62                _ => tokio::time::sleep(retry_policy.delay_between_attempts).await,
63            }
64        }
65    }
66}
67
#[cfg(test)]
mod tests {
    use std::{path::PathBuf, sync::Mutex, time::Instant};

    use super::*;
    use anyhow::anyhow;
    use mockall::{mock, predicate::eq};

    /// Path handed to the uploader in every test.
    const SOURCE_FILE: &str = "file_to_upload";

    /// Uri returned by the mocked uploader on success.
    fn uploaded_uri() -> FileUri {
        FileUri("file_uploaded".to_string())
    }

    /// Error returned by the mocked uploader on failure.
    fn upload_failure() -> anyhow::Error {
        anyhow!("Failure while uploading...")
    }

    /// Retry policy with the given number of attempts and no waiting in between.
    fn policy_without_delay(attempts: usize) -> FileUploadRetryPolicy {
        FileUploadRetryPolicy {
            attempts,
            delay_between_attempts: Duration::ZERO,
        }
    }

    // Mock that only overrides the single-shot upload, so the retry policy provided by the
    // trait stays in effect.
    mock! {
        TestFileUploaderWithDefaultRetryPolicy {
        }
        #[async_trait]
        impl FileUploader for TestFileUploaderWithDefaultRetryPolicy {
            async fn upload_without_retry(&self, filepath: &Path) -> StdResult<FileUri>;
        }
    }

    // Mock that also lets a test choose the retry policy.
    mock! {
        TestFileUploader {
        }

        #[async_trait]
        impl FileUploader for TestFileUploader {
            async fn upload_without_retry(&self, filepath: &Path) -> StdResult<FileUri>;
            fn retry_policy(&self) -> FileUploadRetryPolicy;
        }
    }

    #[tokio::test]
    async fn upload_return_the_result_of_upload_without_retry() {
        let mut mock_uploader = MockTestFileUploaderWithDefaultRetryPolicy::new();
        mock_uploader
            .expect_upload_without_retry()
            .with(eq(PathBuf::from(SOURCE_FILE)))
            .times(1)
            .returning(|_| Ok(uploaded_uri()));

        let uri = mock_uploader
            .upload(Path::new(SOURCE_FILE))
            .await
            .unwrap();

        assert_eq!(uploaded_uri(), uri);
    }

    #[tokio::test]
    async fn when_upload_fails_do_not_retry_by_default() {
        let mut mock_uploader = MockTestFileUploaderWithDefaultRetryPolicy::new();
        // Exactly one call is expected: a second one would mean a retry happened.
        mock_uploader
            .expect_upload_without_retry()
            .with(eq(PathBuf::from(SOURCE_FILE)))
            .times(1)
            .returning(|_| Err(upload_failure()));

        let upload_result = mock_uploader.upload(Path::new(SOURCE_FILE)).await;

        upload_result.expect_err("Should fail on upload");
    }

    #[tokio::test]
    async fn should_retry_if_fail() {
        let mut mock_uploader = MockTestFileUploader::new();
        mock_uploader
            .expect_retry_policy()
            .returning(|| policy_without_delay(50));

        // Two failures first, then a success on the third call.
        mock_uploader
            .expect_upload_without_retry()
            .with(eq(PathBuf::from(SOURCE_FILE)))
            .times(2)
            .returning(|_| Err(upload_failure()));
        mock_uploader
            .expect_upload_without_retry()
            .with(eq(PathBuf::from(SOURCE_FILE)))
            .times(1)
            .returning(|_| Ok(uploaded_uri()));

        let uri = mock_uploader
            .upload(Path::new(SOURCE_FILE))
            .await
            .unwrap();

        assert_eq!(uploaded_uri(), uri);
    }

    #[tokio::test]
    async fn should_recall_a_failing_inner_uploader_up_to_the_limit() {
        let mut mock_uploader = MockTestFileUploader::new();
        mock_uploader
            .expect_retry_policy()
            .returning(|| policy_without_delay(4));

        // As many calls as the policy allows attempts, all of them failing.
        mock_uploader
            .expect_upload_without_retry()
            .with(eq(PathBuf::from(SOURCE_FILE)))
            .times(4)
            .returning(|_| Err(upload_failure()));

        let upload_result = mock_uploader.upload(Path::new(SOURCE_FILE)).await;

        upload_result.expect_err("An error should be returned when all retries are done");
    }

    #[tokio::test]
    async fn should_delay_between_retries() {
        /// Uploader that always fails and checks, on each call after the first, how much
        /// time went by since the previous call started.
        struct FileUploaderAssertDelay {
            delay_ms: u64,
            previous_attempt_started_at: Mutex<Option<Instant>>,
        }

        #[async_trait]
        impl FileUploader for FileUploaderAssertDelay {
            async fn upload_without_retry(&self, _filepath: &Path) -> StdResult<FileUri> {
                let mut previous_start = self.previous_attempt_started_at.lock().unwrap();

                if let Some(started_at) = *previous_start {
                    let elapsed = started_at.elapsed();
                    let min_delay = Duration::from_millis(self.delay_ms);
                    let max_delay = Duration::from_millis(2 * self.delay_ms);

                    assert!(
                        elapsed >= min_delay,
                        "duration should be greater than or equal to {}ms but was {}ms",
                        min_delay.as_millis(),
                        elapsed.as_millis()
                    );
                    assert!(
                        elapsed < max_delay,
                        "duration should be less than {}ms but was {}ms",
                        max_delay.as_millis(),
                        elapsed.as_millis()
                    );
                }

                // Remember when this attempt started so the next one can measure the gap.
                *previous_start = Some(Instant::now());

                Err(anyhow::anyhow!("Upload failed"))
            }

            fn retry_policy(&self) -> FileUploadRetryPolicy {
                FileUploadRetryPolicy {
                    attempts: 4,
                    delay_between_attempts: Duration::from_millis(self.delay_ms),
                }
            }
        }

        let delay_checking_uploader = FileUploaderAssertDelay {
            delay_ms: 50,
            previous_attempt_started_at: Mutex::new(None),
        };

        let upload_result = delay_checking_uploader
            .upload(Path::new(SOURCE_FILE))
            .await;

        upload_result.expect_err("An error should be returned when all retries are done");
    }
}
224}