mithril_aggregator/http_server/routes/artifact_routes/
snapshot.rs

1use crate::http_server::routes::middlewares;
2use crate::http_server::routes::router::RouterState;
3use warp::Filter;
4
5pub fn routes(
6    router_state: &RouterState,
7) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
8    artifact_cardano_full_immutable_snapshots(router_state)
9        .or(artifact_cardano_full_immutable_snapshot_by_id(router_state))
10        .or(serve_snapshots_dir(router_state))
11        .or(snapshot_download(router_state))
12}
13
14/// GET /artifact/snapshots
15fn artifact_cardano_full_immutable_snapshots(
16    router_state: &RouterState,
17) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
18    warp::path!("artifact" / "snapshots")
19        .and(warp::get())
20        .and(middlewares::with_logger(router_state))
21        .and(middlewares::with_http_message_service(router_state))
22        .and_then(handlers::list_artifacts)
23}
24
25/// GET /artifact/snapshot/:id
26fn artifact_cardano_full_immutable_snapshot_by_id(
27    dependency_manager: &RouterState,
28) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
29    warp::path!("artifact" / "snapshot" / String)
30        .and(warp::get())
31        .and(middlewares::with_client_metadata(dependency_manager))
32        .and(middlewares::with_logger(dependency_manager))
33        .and(middlewares::with_http_message_service(dependency_manager))
34        .and(middlewares::with_metrics_service(dependency_manager))
35        .and_then(handlers::get_artifact_by_signed_entity_id)
36}
37
38/// GET /artifact/snapshots/{digest}/download
39fn snapshot_download(
40    router_state: &RouterState,
41) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
42    warp::path!("artifact" / "snapshot" / String / "download")
43        .and(warp::get().or(warp::head()).unify())
44        .and(middlewares::with_logger(router_state))
45        .and(middlewares::extract_config(router_state, |config| {
46            config.server_url.clone()
47        }))
48        .and(middlewares::with_signed_entity_service(router_state))
49        .and_then(handlers::snapshot_download)
50}
51
52fn serve_snapshots_dir(
53    router_state: &RouterState,
54) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
55    warp::path(crate::http_server::SNAPSHOT_DOWNLOAD_PATH)
56        .and(warp::fs::dir(
57            router_state.configuration.snapshot_directory.clone(),
58        ))
59        .and(middlewares::with_logger(router_state))
60        .and(middlewares::with_signed_entity_service(router_state))
61        .and(middlewares::extract_config(router_state, |config| {
62            config.allow_http_serve_directory
63        }))
64        .and_then(handlers::ensure_downloaded_file_is_a_snapshot)
65}
66
67mod handlers {
68    use slog::{Logger, debug, warn};
69    use std::convert::Infallible;
70    use std::str::FromStr;
71    use std::sync::Arc;
72    use warp::http::{StatusCode, Uri};
73
74    use mithril_common::StdResult;
75
76    use crate::http_server::routes::middlewares::ClientMetadata;
77    use crate::http_server::routes::reply;
78    use crate::services::{MessageService, SignedEntityService};
79    use crate::tools::url_sanitizer::SanitizedUrlWithTrailingSlash;
80    use crate::{MetricsService, unwrap_to_internal_server_error};
81
82    pub const LIST_MAX_ITEMS: usize = 20;
83
84    /// List Snapshot artifacts
85    pub async fn list_artifacts(
86        logger: Logger,
87        http_message_service: Arc<dyn MessageService>,
88    ) -> Result<impl warp::Reply, Infallible> {
89        match http_message_service.get_snapshot_list_message(LIST_MAX_ITEMS).await {
90            Ok(message) => Ok(reply::json(&message, StatusCode::OK)),
91            Err(err) => {
92                warn!(logger,"list_artifacts_snapshot"; "error" => ?err);
93                Ok(reply::server_error(err))
94            }
95        }
96    }
97
98    /// Get Artifact by signed entity id
99    pub async fn get_artifact_by_signed_entity_id(
100        signed_entity_id: String,
101        client_metadata: ClientMetadata,
102        logger: Logger,
103        http_message_service: Arc<dyn MessageService>,
104        metrics_service: Arc<MetricsService>,
105    ) -> Result<impl warp::Reply, Infallible> {
106        metrics_service
107            .get_artifact_detail_cardano_immutable_files_full_total_served_since_startup()
108            .increment(&[
109                client_metadata.origin_tag.as_deref().unwrap_or_default(),
110                client_metadata.client_type.as_deref().unwrap_or_default(),
111            ]);
112
113        match http_message_service.get_snapshot_message(&signed_entity_id).await {
114            Ok(Some(signed_entity)) => Ok(reply::json(&signed_entity, StatusCode::OK)),
115            Ok(None) => {
116                warn!(logger, "snapshot_details::not_found");
117                Ok(reply::empty(StatusCode::NOT_FOUND))
118            }
119            Err(err) => {
120                warn!(logger,"snapshot_details::error"; "error" => ?err);
121                Ok(reply::server_error(err))
122            }
123        }
124    }
125
126    /// Download a file if and only if it's a snapshot archive
127    pub async fn ensure_downloaded_file_is_a_snapshot(
128        reply: warp::fs::File,
129        logger: Logger,
130        signed_entity_service: Arc<dyn SignedEntityService>,
131        allow_http_serve_directory: bool,
132    ) -> Result<impl warp::Reply, Infallible> {
133        let filepath = reply.path().to_path_buf();
134        debug!(
135            logger,
136            ">> ensure_downloaded_file_is_a_snapshot / file: `{}`",
137            filepath.display()
138        );
139
140        if !allow_http_serve_directory {
141            warn!(logger, "ensure_downloaded_file_is_a_snapshot::error"; "error" => "http serve directory is disabled");
142            return Ok(reply::empty(StatusCode::FORBIDDEN));
143        }
144
145        match crate::tools::extract_digest_from_path(&filepath) {
146            Ok(digest) => match signed_entity_service.get_signed_snapshot_by_id(&digest).await {
147                Ok(Some(_)) => Ok(reply::add_content_disposition_header(reply, &filepath)),
148                _ => Ok(reply::empty(StatusCode::NOT_FOUND)),
149            },
150            Err(err) => {
151                warn!(logger,"ensure_downloaded_file_is_a_snapshot::error"; "error" => ?err);
152                Ok(reply::empty(StatusCode::NOT_FOUND))
153            }
154        }
155    }
156
157    /// Snapshot download
158    pub async fn snapshot_download(
159        digest: String,
160        logger: Logger,
161        server_url: SanitizedUrlWithTrailingSlash,
162        signed_entity_service: Arc<dyn SignedEntityService>,
163    ) -> Result<impl warp::Reply, Infallible> {
164        match signed_entity_service.get_signed_snapshot_by_id(&digest).await {
165            Ok(Some(signed_entity)) => {
166                let snapshot = signed_entity.artifact;
167                let filename = format!(
168                    "{}-e{}-i{}.{}.{}",
169                    snapshot.network,
170                    snapshot.beacon.epoch,
171                    snapshot.beacon.immutable_file_number,
172                    snapshot.digest,
173                    snapshot.compression_algorithm.tar_file_extension()
174                );
175                let snapshot_uri = unwrap_to_internal_server_error!(
176                    absolute_snapshot_uri(&server_url, &filename),
177                    logger => "snapshot_download::error"
178                );
179
180                Ok(Box::new(warp::redirect::found(snapshot_uri)) as Box<dyn warp::Reply>)
181            }
182            Ok(None) => {
183                warn!(logger, "snapshot_download::not_found");
184                Ok(reply::empty(StatusCode::NOT_FOUND))
185            }
186            Err(err) => {
187                warn!(logger,"snapshot_download::error"; "error" => ?err);
188                Ok(reply::server_error(err))
189            }
190        }
191    }
192
193    fn absolute_snapshot_uri(
194        server_url: &SanitizedUrlWithTrailingSlash,
195        filename: &str,
196    ) -> StdResult<Uri> {
197        // warp requires an `Uri` from the `http` crate, but it does not have a 'join' method
198        let sanitized_url = server_url.join(&format!("snapshot_download/{filename}"))?;
199        let snapshot_uri = Uri::from_str(sanitized_url.as_str())?;
200        Ok(snapshot_uri)
201    }
202}
203
204#[cfg(test)]
205mod tests {
206    use serde_json::Value::Null;
207    use std::sync::Arc;
208    use warp::{
209        http::{Method, StatusCode},
210        test::request,
211    };
212
213    use mithril_api_spec::APISpec;
214    use mithril_common::{
215        MITHRIL_CLIENT_TYPE_HEADER, MITHRIL_ORIGIN_TAG_HEADER,
216        entities::{CardanoDbBeacon, SignedEntityType, Snapshot},
217        messages::{SnapshotListItemMessage, SnapshotMessage},
218        test_utils::fake_data,
219    };
220    use mithril_persistence::sqlite::HydrationError;
221
222    use crate::{
223        http_server::routes::{artifact_routes::test_utils::*, router::RouterConfig},
224        initialize_dependencies,
225        services::{MockMessageService, MockSignedEntityService},
226        tools::url_sanitizer::SanitizedUrlWithTrailingSlash,
227    };
228
229    use super::*;
230
231    fn setup_router(
232        state: RouterState,
233    ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
234        let cors = warp::cors()
235            .allow_any_origin()
236            .allow_headers(vec!["content-type"])
237            .allow_methods(vec![Method::GET, Method::POST, Method::OPTIONS]);
238
239        warp::any().and(routes(&state).with(cors))
240    }
241
242    #[tokio::test]
243    async fn test_snapshots_get_ok() {
244        let mut mock_http_message_service = MockMessageService::new();
245        mock_http_message_service
246            .expect_get_snapshot_list_message()
247            .return_once(|_| Ok(vec![SnapshotListItemMessage::dummy()]))
248            .once();
249        let mut dependency_manager = initialize_dependencies!().await;
250        dependency_manager.message_service = Arc::new(mock_http_message_service);
251
252        let method = Method::GET.as_str();
253        let path = "/artifact/snapshots";
254
255        let response = request()
256            .method(method)
257            .path(path)
258            .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
259                dependency_manager,
260            ))))
261            .await;
262
263        APISpec::verify_conformity(
264            APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
265            method,
266            path,
267            "application/json",
268            &Null,
269            &response,
270            &StatusCode::OK,
271        )
272        .unwrap();
273    }
274
275    #[tokio::test]
276    async fn test_snapshots_get_ko() {
277        let mut mock_http_message_service = MockMessageService::new();
278        mock_http_message_service
279            .expect_get_snapshot_list_message()
280            .return_once(|_| Err(HydrationError::InvalidData("invalid data".to_string()).into()))
281            .once();
282        let mut dependency_manager = initialize_dependencies!().await;
283        dependency_manager.message_service = Arc::new(mock_http_message_service);
284
285        let method = Method::GET.as_str();
286        let path = "/artifact/snapshots";
287
288        let response = request()
289            .method(method)
290            .path(path)
291            .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
292                dependency_manager,
293            ))))
294            .await;
295
296        APISpec::verify_conformity(
297            APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
298            method,
299            path,
300            "application/json",
301            &Null,
302            &response,
303            &StatusCode::INTERNAL_SERVER_ERROR,
304        )
305        .unwrap();
306    }
307
308    #[tokio::test]
309    async fn test_snapshot_digest_increments_artifact_detail_total_served_since_startup_metric() {
310        let method = Method::GET.as_str();
311        let path = "/artifact/snapshot/{digest}";
312        let dependency_manager = Arc::new(initialize_dependencies!().await);
313        let initial_counter_value = dependency_manager
314            .metrics_service
315            .get_artifact_detail_cardano_immutable_files_full_total_served_since_startup()
316            .get(&["TEST", "CLI"]);
317
318        request()
319            .method(method)
320            .path(path)
321            .header(MITHRIL_ORIGIN_TAG_HEADER, "TEST")
322            .header(MITHRIL_CLIENT_TYPE_HEADER, "CLI")
323            .reply(&setup_router(RouterState::new_with_origin_tag_white_list(
324                dependency_manager.clone(),
325                &["TEST"],
326            )))
327            .await;
328
329        assert_eq!(
330            initial_counter_value + 1,
331            dependency_manager
332                .metrics_service
333                .get_artifact_detail_cardano_immutable_files_full_total_served_since_startup()
334                .get(&["TEST", "CLI"])
335        );
336    }
337
338    #[tokio::test]
339    async fn test_snapshot_digest_get_ok() {
340        let mut mock_http_message_service = MockMessageService::new();
341        mock_http_message_service
342            .expect_get_snapshot_message()
343            .return_once(|_| Ok(Some(SnapshotMessage::dummy())))
344            .once();
345        let mut dependency_manager = initialize_dependencies!().await;
346        dependency_manager.message_service = Arc::new(mock_http_message_service);
347
348        let method = Method::GET.as_str();
349        let path = "/artifact/snapshot/{digest}";
350
351        let response = request()
352            .method(method)
353            .path(path)
354            .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
355                dependency_manager,
356            ))))
357            .await;
358
359        APISpec::verify_conformity(
360            APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
361            method,
362            path,
363            "application/json",
364            &Null,
365            &response,
366            &StatusCode::OK,
367        )
368        .unwrap();
369    }
370
371    #[tokio::test]
372    async fn test_snapshot_digest_returns_404_not_found_when_no_snapshot() {
373        let mut mock_http_message_service = MockMessageService::new();
374        mock_http_message_service
375            .expect_get_snapshot_message()
376            .return_once(|_| Ok(None))
377            .once();
378        let mut dependency_manager = initialize_dependencies!().await;
379        dependency_manager.message_service = Arc::new(mock_http_message_service);
380
381        let method = Method::GET.as_str();
382        let path = "/artifact/snapshot/{digest}";
383
384        let response = request()
385            .method(method)
386            .path(path)
387            .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
388                dependency_manager,
389            ))))
390            .await;
391
392        APISpec::verify_conformity(
393            APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
394            method,
395            path,
396            "application/json",
397            &Null,
398            &response,
399            &StatusCode::NOT_FOUND,
400        )
401        .unwrap();
402    }
403
404    #[tokio::test]
405    async fn test_snapshot_digest_get_ko() {
406        let mut mock_http_message_service = MockMessageService::new();
407        mock_http_message_service
408            .expect_get_snapshot_message()
409            .return_once(|_| Err(HydrationError::InvalidData("invalid data".to_string()).into()))
410            .once();
411        let mut dependency_manager = initialize_dependencies!().await;
412        dependency_manager.message_service = Arc::new(mock_http_message_service);
413
414        let method = Method::GET.as_str();
415        let path = "/artifact/snapshot/{digest}";
416
417        let response = request()
418            .method(method)
419            .path(path)
420            .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
421                dependency_manager,
422            ))))
423            .await;
424
425        APISpec::verify_conformity(
426            APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
427            method,
428            path,
429            "application/json",
430            &Null,
431            &response,
432            &StatusCode::INTERNAL_SERVER_ERROR,
433        )
434        .unwrap();
435    }
436
437    #[tokio::test]
438    async fn test_snapshot_local_download_returns_302_found_when_the_snapshot_exists() {
439        let network = "devnet";
440        let signed_entity = create_signed_entity(
441            SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default()),
442            Snapshot {
443                beacon: CardanoDbBeacon::new(1, 10),
444                ..fake_data::snapshots(1)[0].clone()
445            },
446        );
447        let mut mock_signed_entity_service = MockSignedEntityService::new();
448        mock_signed_entity_service
449            .expect_get_signed_snapshot_by_id()
450            .return_once(|_| Ok(Some(signed_entity)))
451            .once();
452        let mut dependency_manager = initialize_dependencies!().await;
453        dependency_manager.signed_entity_service = Arc::new(mock_signed_entity_service);
454
455        let method = Method::GET.as_str();
456        let path = "/artifact/snapshot/{digest}/download";
457
458        let response = request()
459            .method(method)
460            .path(path)
461            .reply(&setup_router(RouterState::new(
462                Arc::new(dependency_manager),
463                RouterConfig {
464                    server_url: SanitizedUrlWithTrailingSlash::parse("https://1.2.3.4:5552/")
465                        .unwrap(),
466                    ..RouterConfig::dummy()
467                },
468            )))
469            .await;
470
471        assert_eq!(response.status(), StatusCode::FOUND);
472        let location = std::str::from_utf8(response.headers()["location"].as_bytes())
473            .unwrap()
474            .to_string();
475        assert!(
476            location.contains(&format!("https://1.2.3.4:5552/snapshot_download/{network}")),
477            "Expected value 'https://1.2.3.4:5552/snapshot_download/testnet' not found in {location}",
478        );
479    }
480
481    #[tokio::test]
482    async fn test_snapshot_download_returns_404_not_found_when_no_snapshot() {
483        let mut mock_signed_entity_service = MockSignedEntityService::new();
484        mock_signed_entity_service
485            .expect_get_signed_snapshot_by_id()
486            .return_once(|_| Ok(None))
487            .once();
488        let mut dependency_manager = initialize_dependencies!().await;
489        dependency_manager.signed_entity_service = Arc::new(mock_signed_entity_service);
490
491        let method = Method::GET.as_str();
492        let path = "/artifact/snapshot/{digest}/download";
493
494        let response = request()
495            .method(method)
496            .path(path)
497            .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
498                dependency_manager,
499            ))))
500            .await;
501
502        APISpec::verify_conformity(
503            APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
504            method,
505            path,
506            "application/gzip",
507            &Null,
508            &response,
509            &StatusCode::NOT_FOUND,
510        )
511        .unwrap();
512    }
513
514    #[tokio::test]
515    async fn test_snapshot_download_get_ko() {
516        let mut mock_signed_entity_service = MockSignedEntityService::new();
517        mock_signed_entity_service
518            .expect_get_signed_snapshot_by_id()
519            .return_once(|_| Err(HydrationError::InvalidData("invalid data".to_string()).into()))
520            .once();
521        let mut dependency_manager = initialize_dependencies!().await;
522        dependency_manager.signed_entity_service = Arc::new(mock_signed_entity_service);
523
524        let method = Method::GET.as_str();
525        let path = "/artifact/snapshot/{digest}/download";
526
527        let response = request()
528            .method(method)
529            .path(path)
530            .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
531                dependency_manager,
532            ))))
533            .await;
534
535        APISpec::verify_conformity(
536            APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
537            method,
538            path,
539            "application/json",
540            &Null,
541            &response,
542            &StatusCode::INTERNAL_SERVER_ERROR,
543        )
544        .unwrap();
545    }
546}