mithril_aggregator/http_server/routes/artifact_routes/
snapshot.rs

1use crate::http_server::routes::middlewares;
2use crate::http_server::routes::router::RouterState;
3use warp::Filter;
4
5pub fn routes(
6    router_state: &RouterState,
7) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
8    artifact_cardano_full_immutable_snapshots(router_state)
9        .or(artifact_cardano_full_immutable_snapshot_by_id(router_state))
10        .or(serve_snapshots_dir(router_state))
11        .or(snapshot_download(router_state))
12}
13
14/// GET /artifact/snapshots
15fn artifact_cardano_full_immutable_snapshots(
16    router_state: &RouterState,
17) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
18    warp::path!("artifact" / "snapshots")
19        .and(warp::get())
20        .and(middlewares::with_logger(router_state))
21        .and(middlewares::with_http_message_service(router_state))
22        .and_then(handlers::list_artifacts)
23}
24
25/// GET /artifact/snapshot/:id
26fn artifact_cardano_full_immutable_snapshot_by_id(
27    dependency_manager: &RouterState,
28) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
29    warp::path!("artifact" / "snapshot" / String)
30        .and(warp::get())
31        .and(middlewares::with_logger(dependency_manager))
32        .and(middlewares::with_http_message_service(dependency_manager))
33        .and(middlewares::with_metrics_service(dependency_manager))
34        .and_then(handlers::get_artifact_by_signed_entity_id)
35}
36
37/// GET /artifact/snapshots/{digest}/download
38fn snapshot_download(
39    router_state: &RouterState,
40) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
41    warp::path!("artifact" / "snapshot" / String / "download")
42        .and(warp::get().or(warp::head()).unify())
43        .and(middlewares::with_logger(router_state))
44        .and(middlewares::extract_config(router_state, |config| {
45            config.server_url.clone()
46        }))
47        .and(middlewares::with_signed_entity_service(router_state))
48        .and_then(handlers::snapshot_download)
49}
50
51fn serve_snapshots_dir(
52    router_state: &RouterState,
53) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
54    warp::path(crate::http_server::SNAPSHOT_DOWNLOAD_PATH)
55        .and(warp::fs::dir(
56            router_state.configuration.snapshot_directory.clone(),
57        ))
58        .and(middlewares::with_logger(router_state))
59        .and(middlewares::with_signed_entity_service(router_state))
60        .and(middlewares::extract_config(router_state, |config| {
61            config.allow_http_serve_directory
62        }))
63        .and_then(handlers::ensure_downloaded_file_is_a_snapshot)
64}
65
66mod handlers {
67    use slog::{debug, warn, Logger};
68    use std::convert::Infallible;
69    use std::str::FromStr;
70    use std::sync::Arc;
71    use warp::http::{StatusCode, Uri};
72
73    use mithril_common::StdResult;
74
75    use crate::http_server::routes::reply;
76    use crate::services::{MessageService, SignedEntityService};
77    use crate::tools::url_sanitizer::SanitizedUrlWithTrailingSlash;
78    use crate::{unwrap_to_internal_server_error, MetricsService};
79
80    pub const LIST_MAX_ITEMS: usize = 20;
81
82    /// List Snapshot artifacts
83    pub async fn list_artifacts(
84        logger: Logger,
85        http_message_service: Arc<dyn MessageService>,
86    ) -> Result<impl warp::Reply, Infallible> {
87        match http_message_service
88            .get_snapshot_list_message(LIST_MAX_ITEMS)
89            .await
90        {
91            Ok(message) => Ok(reply::json(&message, StatusCode::OK)),
92            Err(err) => {
93                warn!(logger,"list_artifacts_snapshot"; "error" => ?err);
94                Ok(reply::server_error(err))
95            }
96        }
97    }
98
99    /// Get Artifact by signed entity id
100    pub async fn get_artifact_by_signed_entity_id(
101        signed_entity_id: String,
102        logger: Logger,
103        http_message_service: Arc<dyn MessageService>,
104        metrics_service: Arc<MetricsService>,
105    ) -> Result<impl warp::Reply, Infallible> {
106        metrics_service
107            .get_artifact_detail_cardano_immutable_files_full_total_served_since_startup()
108            .increment();
109
110        match http_message_service
111            .get_snapshot_message(&signed_entity_id)
112            .await
113        {
114            Ok(Some(signed_entity)) => Ok(reply::json(&signed_entity, StatusCode::OK)),
115            Ok(None) => {
116                warn!(logger, "snapshot_details::not_found");
117                Ok(reply::empty(StatusCode::NOT_FOUND))
118            }
119            Err(err) => {
120                warn!(logger,"snapshot_details::error"; "error" => ?err);
121                Ok(reply::server_error(err))
122            }
123        }
124    }
125
126    /// Download a file if and only if it's a snapshot archive
127    pub async fn ensure_downloaded_file_is_a_snapshot(
128        reply: warp::fs::File,
129        logger: Logger,
130        signed_entity_service: Arc<dyn SignedEntityService>,
131        allow_http_serve_directory: bool,
132    ) -> Result<impl warp::Reply, Infallible> {
133        let filepath = reply.path().to_path_buf();
134        debug!(
135            logger,
136            ">> ensure_downloaded_file_is_a_snapshot / file: `{}`",
137            filepath.display()
138        );
139
140        if !allow_http_serve_directory {
141            warn!(logger, "ensure_downloaded_file_is_a_snapshot::error"; "error" => "http serve directory is disabled");
142            return Ok(reply::empty(StatusCode::FORBIDDEN));
143        }
144
145        match crate::tools::extract_digest_from_path(&filepath) {
146            Ok(digest) => match signed_entity_service
147                .get_signed_snapshot_by_id(&digest)
148                .await
149            {
150                Ok(Some(_)) => Ok(reply::add_content_disposition_header(reply, &filepath)),
151                _ => Ok(reply::empty(StatusCode::NOT_FOUND)),
152            },
153            Err(err) => {
154                warn!(logger,"ensure_downloaded_file_is_a_snapshot::error"; "error" => ?err);
155                Ok(reply::empty(StatusCode::NOT_FOUND))
156            }
157        }
158    }
159
160    /// Snapshot download
161    pub async fn snapshot_download(
162        digest: String,
163        logger: Logger,
164        server_url: SanitizedUrlWithTrailingSlash,
165        signed_entity_service: Arc<dyn SignedEntityService>,
166    ) -> Result<impl warp::Reply, Infallible> {
167        match signed_entity_service
168            .get_signed_snapshot_by_id(&digest)
169            .await
170        {
171            Ok(Some(signed_entity)) => {
172                let snapshot = signed_entity.artifact;
173                let filename = format!(
174                    "{}-e{}-i{}.{}.{}",
175                    snapshot.network,
176                    snapshot.beacon.epoch,
177                    snapshot.beacon.immutable_file_number,
178                    snapshot.digest,
179                    snapshot.compression_algorithm.tar_file_extension()
180                );
181                let snapshot_uri = unwrap_to_internal_server_error!(
182                    absolute_snapshot_uri(&server_url, &filename),
183                    logger => "snapshot_download::error"
184                );
185
186                Ok(Box::new(warp::redirect::found(snapshot_uri)) as Box<dyn warp::Reply>)
187            }
188            Ok(None) => {
189                warn!(logger, "snapshot_download::not_found");
190                Ok(reply::empty(StatusCode::NOT_FOUND))
191            }
192            Err(err) => {
193                warn!(logger,"snapshot_download::error"; "error" => ?err);
194                Ok(reply::server_error(err))
195            }
196        }
197    }
198
199    fn absolute_snapshot_uri(
200        server_url: &SanitizedUrlWithTrailingSlash,
201        filename: &str,
202    ) -> StdResult<Uri> {
203        // warp requires an `Uri` from the `http` crate, but it does not have a 'join' method
204        let sanitized_url = server_url.join(&format!("snapshot_download/{filename}"))?;
205        let snapshot_uri = Uri::from_str(sanitized_url.as_str())?;
206        Ok(snapshot_uri)
207    }
208}
209
210#[cfg(test)]
211mod tests {
212    use super::*;
213    use crate::http_server::routes::artifact_routes::test_utils::*;
214    use crate::http_server::routes::router::RouterConfig;
215    use crate::tools::url_sanitizer::SanitizedUrlWithTrailingSlash;
216    use crate::{
217        initialize_dependencies,
218        services::{MockMessageService, MockSignedEntityService},
219    };
220    use mithril_common::messages::{SnapshotListItemMessage, SnapshotMessage};
221    use mithril_common::{
222        entities::{CardanoDbBeacon, SignedEntityType, Snapshot},
223        test_utils::{apispec::APISpec, fake_data},
224    };
225    use mithril_persistence::sqlite::HydrationError;
226    use serde_json::Value::Null;
227    use std::sync::Arc;
228    use warp::{
229        http::{Method, StatusCode},
230        test::request,
231    };
232
233    fn setup_router(
234        state: RouterState,
235    ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
236        let cors = warp::cors()
237            .allow_any_origin()
238            .allow_headers(vec!["content-type"])
239            .allow_methods(vec![Method::GET, Method::POST, Method::OPTIONS]);
240
241        warp::any().and(routes(&state).with(cors))
242    }
243
244    #[tokio::test]
245    async fn test_snapshots_get_ok() {
246        let mut mock_http_message_service = MockMessageService::new();
247        mock_http_message_service
248            .expect_get_snapshot_list_message()
249            .return_once(|_| Ok(vec![SnapshotListItemMessage::dummy()]))
250            .once();
251        let mut dependency_manager = initialize_dependencies!().await;
252        dependency_manager.message_service = Arc::new(mock_http_message_service);
253
254        let method = Method::GET.as_str();
255        let path = "/artifact/snapshots";
256
257        let response = request()
258            .method(method)
259            .path(path)
260            .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
261                dependency_manager,
262            ))))
263            .await;
264
265        APISpec::verify_conformity(
266            APISpec::get_all_spec_files(),
267            method,
268            path,
269            "application/json",
270            &Null,
271            &response,
272            &StatusCode::OK,
273        )
274        .unwrap();
275    }
276
277    #[tokio::test]
278    async fn test_snapshots_get_ko() {
279        let mut mock_http_message_service = MockMessageService::new();
280        mock_http_message_service
281            .expect_get_snapshot_list_message()
282            .return_once(|_| Err(HydrationError::InvalidData("invalid data".to_string()).into()))
283            .once();
284        let mut dependency_manager = initialize_dependencies!().await;
285        dependency_manager.message_service = Arc::new(mock_http_message_service);
286
287        let method = Method::GET.as_str();
288        let path = "/artifact/snapshots";
289
290        let response = request()
291            .method(method)
292            .path(path)
293            .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
294                dependency_manager,
295            ))))
296            .await;
297
298        APISpec::verify_conformity(
299            APISpec::get_all_spec_files(),
300            method,
301            path,
302            "application/json",
303            &Null,
304            &response,
305            &StatusCode::INTERNAL_SERVER_ERROR,
306        )
307        .unwrap();
308    }
309
310    #[tokio::test]
311    async fn test_snapshot_digest_increments_artifact_detail_total_served_since_startup_metric() {
312        let method = Method::GET.as_str();
313        let path = "/artifact/snapshot/{digest}";
314        let dependency_manager = Arc::new(initialize_dependencies!().await);
315        let initial_counter_value = dependency_manager
316            .metrics_service
317            .get_artifact_detail_cardano_immutable_files_full_total_served_since_startup()
318            .get();
319
320        request()
321            .method(method)
322            .path(path)
323            .reply(&setup_router(RouterState::new_with_dummy_config(
324                dependency_manager.clone(),
325            )))
326            .await;
327
328        assert_eq!(
329            initial_counter_value + 1,
330            dependency_manager
331                .metrics_service
332                .get_artifact_detail_cardano_immutable_files_full_total_served_since_startup()
333                .get()
334        );
335    }
336
337    #[tokio::test]
338    async fn test_snapshot_digest_get_ok() {
339        let mut mock_http_message_service = MockMessageService::new();
340        mock_http_message_service
341            .expect_get_snapshot_message()
342            .return_once(|_| Ok(Some(SnapshotMessage::dummy())))
343            .once();
344        let mut dependency_manager = initialize_dependencies!().await;
345        dependency_manager.message_service = Arc::new(mock_http_message_service);
346
347        let method = Method::GET.as_str();
348        let path = "/artifact/snapshot/{digest}";
349
350        let response = request()
351            .method(method)
352            .path(path)
353            .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
354                dependency_manager,
355            ))))
356            .await;
357
358        APISpec::verify_conformity(
359            APISpec::get_all_spec_files(),
360            method,
361            path,
362            "application/json",
363            &Null,
364            &response,
365            &StatusCode::OK,
366        )
367        .unwrap();
368    }
369
370    #[tokio::test]
371    async fn test_snapshot_digest_returns_404_not_found_when_no_snapshot() {
372        let mut mock_http_message_service = MockMessageService::new();
373        mock_http_message_service
374            .expect_get_snapshot_message()
375            .return_once(|_| Ok(None))
376            .once();
377        let mut dependency_manager = initialize_dependencies!().await;
378        dependency_manager.message_service = Arc::new(mock_http_message_service);
379
380        let method = Method::GET.as_str();
381        let path = "/artifact/snapshot/{digest}";
382
383        let response = request()
384            .method(method)
385            .path(path)
386            .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
387                dependency_manager,
388            ))))
389            .await;
390
391        APISpec::verify_conformity(
392            APISpec::get_all_spec_files(),
393            method,
394            path,
395            "application/json",
396            &Null,
397            &response,
398            &StatusCode::NOT_FOUND,
399        )
400        .unwrap();
401    }
402
403    #[tokio::test]
404    async fn test_snapshot_digest_get_ko() {
405        let mut mock_http_message_service = MockMessageService::new();
406        mock_http_message_service
407            .expect_get_snapshot_message()
408            .return_once(|_| Err(HydrationError::InvalidData("invalid data".to_string()).into()))
409            .once();
410        let mut dependency_manager = initialize_dependencies!().await;
411        dependency_manager.message_service = Arc::new(mock_http_message_service);
412
413        let method = Method::GET.as_str();
414        let path = "/artifact/snapshot/{digest}";
415
416        let response = request()
417            .method(method)
418            .path(path)
419            .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
420                dependency_manager,
421            ))))
422            .await;
423
424        APISpec::verify_conformity(
425            APISpec::get_all_spec_files(),
426            method,
427            path,
428            "application/json",
429            &Null,
430            &response,
431            &StatusCode::INTERNAL_SERVER_ERROR,
432        )
433        .unwrap();
434    }
435
436    #[tokio::test]
437    async fn test_snapshot_local_download_returns_302_found_when_the_snapshot_exists() {
438        let network = "devnet";
439        let signed_entity = create_signed_entity(
440            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default()),
441            Snapshot {
442                beacon: CardanoDbBeacon::new(1, 10),
443                ..fake_data::snapshots(1)[0].clone()
444            },
445        );
446        let mut mock_signed_entity_service = MockSignedEntityService::new();
447        mock_signed_entity_service
448            .expect_get_signed_snapshot_by_id()
449            .return_once(|_| Ok(Some(signed_entity)))
450            .once();
451        let mut dependency_manager = initialize_dependencies!().await;
452        dependency_manager.signed_entity_service = Arc::new(mock_signed_entity_service);
453
454        let method = Method::GET.as_str();
455        let path = "/artifact/snapshot/{digest}/download";
456
457        let response = request()
458            .method(method)
459            .path(path)
460            .reply(&setup_router(RouterState::new(
461                Arc::new(dependency_manager),
462                RouterConfig {
463                    server_url: SanitizedUrlWithTrailingSlash::parse("https://1.2.3.4:5552/")
464                        .unwrap(),
465                    ..RouterConfig::dummy()
466                },
467            )))
468            .await;
469
470        assert_eq!(response.status(), StatusCode::FOUND);
471        let location = std::str::from_utf8(response.headers()["location"].as_bytes())
472            .unwrap()
473            .to_string();
474        assert!(
475            location.contains(&format!("https://1.2.3.4:5552/snapshot_download/{network}")),
476            "Expected value 'https://1.2.3.4:5552/snapshot_download/testnet' not found in {location}",
477        );
478    }
479
480    #[tokio::test]
481    async fn test_snapshot_download_returns_404_not_found_when_no_snapshot() {
482        let mut mock_signed_entity_service = MockSignedEntityService::new();
483        mock_signed_entity_service
484            .expect_get_signed_snapshot_by_id()
485            .return_once(|_| Ok(None))
486            .once();
487        let mut dependency_manager = initialize_dependencies!().await;
488        dependency_manager.signed_entity_service = Arc::new(mock_signed_entity_service);
489
490        let method = Method::GET.as_str();
491        let path = "/artifact/snapshot/{digest}/download";
492
493        let response = request()
494            .method(method)
495            .path(path)
496            .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
497                dependency_manager,
498            ))))
499            .await;
500
501        APISpec::verify_conformity(
502            APISpec::get_all_spec_files(),
503            method,
504            path,
505            "application/gzip",
506            &Null,
507            &response,
508            &StatusCode::NOT_FOUND,
509        )
510        .unwrap();
511    }
512
513    #[tokio::test]
514    async fn test_snapshot_download_get_ko() {
515        let mut mock_signed_entity_service = MockSignedEntityService::new();
516        mock_signed_entity_service
517            .expect_get_signed_snapshot_by_id()
518            .return_once(|_| Err(HydrationError::InvalidData("invalid data".to_string()).into()))
519            .once();
520        let mut dependency_manager = initialize_dependencies!().await;
521        dependency_manager.signed_entity_service = Arc::new(mock_signed_entity_service);
522
523        let method = Method::GET.as_str();
524        let path = "/artifact/snapshot/{digest}/download";
525
526        let response = request()
527            .method(method)
528            .path(path)
529            .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
530                dependency_manager,
531            ))))
532            .await;
533
534        APISpec::verify_conformity(
535            APISpec::get_all_spec_files(),
536            method,
537            path,
538            "application/json",
539            &Null,
540            &response,
541            &StatusCode::INTERNAL_SERVER_ERROR,
542        )
543        .unwrap();
544    }
545}