mithril_aggregator/file_uploaders/
interface.rs

1use async_trait::async_trait;
2use mithril_common::{StdResult, entities::FileUri};
3use std::{path::Path, time::Duration};
4
/// Policy describing how many times a file upload is attempted and how long to wait
/// between two consecutive attempts.
///
/// Both fields have total equality, so the policy derives `Eq` in addition to `PartialEq`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct FileUploadRetryPolicy {
    /// Total number of upload attempts (the first try included).
    pub attempts: usize,
    /// Pause observed between two consecutive attempts.
    pub delay_between_attempts: Duration,
}
13
14impl FileUploadRetryPolicy {
15    /// Create a policy that never retries.
16    pub fn never() -> Self {
17        Self {
18            attempts: 1,
19            delay_between_attempts: Duration::from_secs(0),
20        }
21    }
22}
23
24impl Default for FileUploadRetryPolicy {
25    /// Create a default retry policy.
26    fn default() -> Self {
27        Self {
28            attempts: 3,
29            delay_between_attempts: Duration::from_secs(5),
30        }
31    }
32}
33
34/// FileUploader represents a file uploader interactor.
35/// It retries the upload operation according to the retry policy.
36#[cfg_attr(test, mockall::automock)]
37#[async_trait]
38pub trait FileUploader: Sync + Send {
39    /// Try to upload once.
40    async fn upload_without_retry(&self, filepath: &Path) -> StdResult<FileUri>;
41
42    /// Get the retry policy for this uploader.
43    fn retry_policy(&self) -> FileUploadRetryPolicy {
44        FileUploadRetryPolicy::never()
45    }
46
47    /// Upload a file with retries according to the retry policy.
48    async fn upload(&self, filepath: &Path) -> StdResult<FileUri> {
49        let retry_policy = self.retry_policy();
50
51        let mut nb_attempts = 0;
52        loop {
53            nb_attempts += 1;
54            match self.upload_without_retry(filepath).await {
55                Ok(result) => return Ok(result),
56                Err(e) if nb_attempts >= retry_policy.attempts => {
57                    return Err(anyhow::anyhow!(e).context(format!(
58                        "Upload failed after {nb_attempts} attempts. Uploaded file path: {}",
59                        filepath.display()
60                    )));
61                }
62                _ => tokio::time::sleep(retry_policy.delay_between_attempts).await,
63            }
64        }
65    }
66}
67
#[cfg(test)]
mod tests {
    use std::{path::PathBuf, sync::Mutex, time::Instant};

    use super::*;
    use anyhow::anyhow;
    use mockall::{mock, predicate::eq};

    // Mock that only stubs the single-shot upload, so the trait's default retry policy applies.
    mock! {
        TestFileUploaderWithDefaultRetryPolicy {
        }
        #[async_trait]
        impl FileUploader for TestFileUploaderWithDefaultRetryPolicy {
            async fn upload_without_retry(&self, filepath: &Path) -> StdResult<FileUri>;
        }
    }

    // Mock that stubs both the single-shot upload and the retry policy.
    mock! {
        TestFileUploader {
        }

        #[async_trait]
        impl FileUploader for TestFileUploader {
            async fn upload_without_retry(&self, filepath: &Path) -> StdResult<FileUri>;
            fn retry_policy(&self) -> FileUploadRetryPolicy;
        }
    }

    #[tokio::test]
    async fn upload_return_the_result_of_upload_without_retry() {
        let expected_uri = FileUri("file_uploaded".to_string());
        let mut mock_uploader = MockTestFileUploaderWithDefaultRetryPolicy::new();
        mock_uploader
            .expect_upload_without_retry()
            .with(eq(PathBuf::from("file_to_upload")))
            .times(1)
            .returning(|_| Ok(FileUri("file_uploaded".to_string())));

        let returned_uri = mock_uploader.upload(Path::new("file_to_upload")).await.unwrap();

        assert_eq!(expected_uri, returned_uri);
    }

    #[tokio::test]
    async fn when_upload_fails_do_not_retry_by_default() {
        let mut mock_uploader = MockTestFileUploaderWithDefaultRetryPolicy::new();
        // `times(1)` makes the mock reject any second call, which proves no retry happened.
        mock_uploader
            .expect_upload_without_retry()
            .with(eq(PathBuf::from("file_to_upload")))
            .times(1)
            .returning(|_| Err(anyhow!("Failure while uploading...")));

        let outcome = mock_uploader.upload(Path::new("file_to_upload")).await;

        outcome.expect_err("Should fail on upload");
    }

    #[tokio::test]
    async fn should_retry_if_fail() {
        let generous_policy = FileUploadRetryPolicy {
            attempts: 50,
            delay_between_attempts: Duration::ZERO,
        };
        let mut mock_uploader = MockTestFileUploader::new();
        mock_uploader
            .expect_retry_policy()
            .returning(move || generous_policy.clone());

        // Expectations are declared in call order: two failures first, then one success.
        mock_uploader
            .expect_upload_without_retry()
            .with(eq(PathBuf::from("file_to_upload")))
            .times(2)
            .returning(|_| Err(anyhow!("Failure while uploading...")));
        mock_uploader
            .expect_upload_without_retry()
            .with(eq(PathBuf::from("file_to_upload")))
            .times(1)
            .returning(|_| Ok(FileUri("file_uploaded".to_string())));

        let returned_uri = mock_uploader.upload(Path::new("file_to_upload")).await.unwrap();

        assert_eq!(FileUri("file_uploaded".to_string()), returned_uri);
    }

    #[tokio::test]
    async fn should_recall_a_failing_inner_uploader_up_to_the_limit() {
        let bounded_policy = FileUploadRetryPolicy {
            attempts: 4,
            delay_between_attempts: Duration::ZERO,
        };
        let mut mock_uploader = MockTestFileUploader::new();
        mock_uploader
            .expect_retry_policy()
            .returning(move || bounded_policy.clone());

        // The number of expected calls matches the number of attempts allowed by the policy.
        mock_uploader
            .expect_upload_without_retry()
            .with(eq(PathBuf::from("file_to_upload")))
            .times(4)
            .returning(|_| Err(anyhow!("Failure while uploading...")));

        let outcome = mock_uploader.upload(Path::new("file_to_upload")).await;

        outcome.expect_err("An error should be returned when all retries are done");
    }

    #[tokio::test]
    async fn should_delay_between_retries() {
        /// Always-failing uploader asserting, on each call after the first one, that the time
        /// elapsed since the previous call lies within `[delay_ms, 2 * delay_ms)`.
        struct FileUploaderAssertDelay {
            delay_ms: u64,
            previous_attempt_started_at: Mutex<Option<Instant>>,
        }

        #[async_trait]
        impl FileUploader for FileUploaderAssertDelay {
            async fn upload_without_retry(&self, _filepath: &Path) -> StdResult<FileUri> {
                let mut previous_start_guard = self.previous_attempt_started_at.lock().unwrap();

                // NOTE(review): both bounds rely on wall-clock time; the upper one may be
                // exceeded on a heavily loaded machine — confirm this is acceptable for CI.
                if let Some(previous_start) = *previous_start_guard {
                    let elapsed = previous_start.elapsed();
                    let lower_bound = Duration::from_millis(self.delay_ms);
                    assert!(
                        elapsed >= lower_bound,
                        "duration should be greater than or equal to {}ms but was {}ms",
                        lower_bound.as_millis(),
                        elapsed.as_millis()
                    );
                    let upper_bound = Duration::from_millis(2 * self.delay_ms);
                    assert!(
                        elapsed < upper_bound,
                        "duration should be less than {}ms but was {}ms",
                        upper_bound.as_millis(),
                        elapsed.as_millis()
                    );
                }
                // Record when this attempt ran so that the next one can measure the gap.
                *previous_start_guard = Some(Instant::now());

                Err(anyhow!("Upload failed"))
            }

            fn retry_policy(&self) -> FileUploadRetryPolicy {
                let delay_between_attempts = Duration::from_millis(self.delay_ms);

                FileUploadRetryPolicy {
                    attempts: 4,
                    delay_between_attempts,
                }
            }
        }

        let delay_checking_uploader = FileUploaderAssertDelay {
            delay_ms: 50,
            previous_attempt_started_at: Mutex::new(None),
        };

        let outcome = delay_checking_uploader.upload(Path::new("file_to_upload")).await;

        outcome.expect_err("An error should be returned when all retries are done");
    }
}