// mithril_aggregator/http_server/routes/artifact_routes/snapshot.rs
use crate::http_server::routes::middlewares;
2use crate::http_server::routes::router::RouterState;
3use warp::Filter;
4
5pub fn routes(
6 router_state: &RouterState,
7) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
8 artifact_cardano_full_immutable_snapshots(router_state)
9 .or(artifact_cardano_full_immutable_snapshot_by_id(router_state))
10 .or(serve_snapshots_dir(router_state))
11 .or(snapshot_download(router_state))
12}
13
14fn artifact_cardano_full_immutable_snapshots(
16 router_state: &RouterState,
17) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
18 warp::path!("artifact" / "snapshots")
19 .and(warp::get())
20 .and(middlewares::with_logger(router_state))
21 .and(middlewares::with_http_message_service(router_state))
22 .and_then(handlers::list_artifacts)
23}
24
25fn artifact_cardano_full_immutable_snapshot_by_id(
27 dependency_manager: &RouterState,
28) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
29 warp::path!("artifact" / "snapshot" / String)
30 .and(warp::get())
31 .and(middlewares::with_logger(dependency_manager))
32 .and(middlewares::with_http_message_service(dependency_manager))
33 .and(middlewares::with_metrics_service(dependency_manager))
34 .and_then(handlers::get_artifact_by_signed_entity_id)
35}
36
37fn snapshot_download(
39 router_state: &RouterState,
40) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
41 warp::path!("artifact" / "snapshot" / String / "download")
42 .and(warp::get().or(warp::head()).unify())
43 .and(middlewares::with_logger(router_state))
44 .and(middlewares::extract_config(router_state, |config| {
45 config.server_url.clone()
46 }))
47 .and(middlewares::with_signed_entity_service(router_state))
48 .and_then(handlers::snapshot_download)
49}
50
51fn serve_snapshots_dir(
52 router_state: &RouterState,
53) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
54 warp::path(crate::http_server::SNAPSHOT_DOWNLOAD_PATH)
55 .and(warp::fs::dir(
56 router_state.configuration.snapshot_directory.clone(),
57 ))
58 .and(middlewares::with_logger(router_state))
59 .and(middlewares::with_signed_entity_service(router_state))
60 .and(middlewares::extract_config(router_state, |config| {
61 config.allow_http_serve_directory
62 }))
63 .and_then(handlers::ensure_downloaded_file_is_a_snapshot)
64}
65
66mod handlers {
67 use slog::{debug, warn, Logger};
68 use std::convert::Infallible;
69 use std::str::FromStr;
70 use std::sync::Arc;
71 use warp::http::{StatusCode, Uri};
72
73 use mithril_common::StdResult;
74
75 use crate::http_server::routes::reply;
76 use crate::services::{MessageService, SignedEntityService};
77 use crate::tools::url_sanitizer::SanitizedUrlWithTrailingSlash;
78 use crate::{unwrap_to_internal_server_error, MetricsService};
79
/// Value passed by `list_artifacts` to `MessageService::get_snapshot_list_message`;
/// presumably the maximum number of snapshots returned in a listing (confirm against the
/// message service implementation).
pub const LIST_MAX_ITEMS: usize = 20;
81
82 pub async fn list_artifacts(
84 logger: Logger,
85 http_message_service: Arc<dyn MessageService>,
86 ) -> Result<impl warp::Reply, Infallible> {
87 match http_message_service
88 .get_snapshot_list_message(LIST_MAX_ITEMS)
89 .await
90 {
91 Ok(message) => Ok(reply::json(&message, StatusCode::OK)),
92 Err(err) => {
93 warn!(logger,"list_artifacts_snapshot"; "error" => ?err);
94 Ok(reply::server_error(err))
95 }
96 }
97 }
98
99 pub async fn get_artifact_by_signed_entity_id(
101 signed_entity_id: String,
102 logger: Logger,
103 http_message_service: Arc<dyn MessageService>,
104 metrics_service: Arc<MetricsService>,
105 ) -> Result<impl warp::Reply, Infallible> {
106 metrics_service
107 .get_artifact_detail_cardano_immutable_files_full_total_served_since_startup()
108 .increment();
109
110 match http_message_service
111 .get_snapshot_message(&signed_entity_id)
112 .await
113 {
114 Ok(Some(signed_entity)) => Ok(reply::json(&signed_entity, StatusCode::OK)),
115 Ok(None) => {
116 warn!(logger, "snapshot_details::not_found");
117 Ok(reply::empty(StatusCode::NOT_FOUND))
118 }
119 Err(err) => {
120 warn!(logger,"snapshot_details::error"; "error" => ?err);
121 Ok(reply::server_error(err))
122 }
123 }
124 }
125
126 pub async fn ensure_downloaded_file_is_a_snapshot(
128 reply: warp::fs::File,
129 logger: Logger,
130 signed_entity_service: Arc<dyn SignedEntityService>,
131 allow_http_serve_directory: bool,
132 ) -> Result<impl warp::Reply, Infallible> {
133 let filepath = reply.path().to_path_buf();
134 debug!(
135 logger,
136 ">> ensure_downloaded_file_is_a_snapshot / file: `{}`",
137 filepath.display()
138 );
139
140 if !allow_http_serve_directory {
141 warn!(logger, "ensure_downloaded_file_is_a_snapshot::error"; "error" => "http serve directory is disabled");
142 return Ok(reply::empty(StatusCode::FORBIDDEN));
143 }
144
145 match crate::tools::extract_digest_from_path(&filepath) {
146 Ok(digest) => match signed_entity_service
147 .get_signed_snapshot_by_id(&digest)
148 .await
149 {
150 Ok(Some(_)) => Ok(reply::add_content_disposition_header(reply, &filepath)),
151 _ => Ok(reply::empty(StatusCode::NOT_FOUND)),
152 },
153 Err(err) => {
154 warn!(logger,"ensure_downloaded_file_is_a_snapshot::error"; "error" => ?err);
155 Ok(reply::empty(StatusCode::NOT_FOUND))
156 }
157 }
158 }
159
160 pub async fn snapshot_download(
162 digest: String,
163 logger: Logger,
164 server_url: SanitizedUrlWithTrailingSlash,
165 signed_entity_service: Arc<dyn SignedEntityService>,
166 ) -> Result<impl warp::Reply, Infallible> {
167 match signed_entity_service
168 .get_signed_snapshot_by_id(&digest)
169 .await
170 {
171 Ok(Some(signed_entity)) => {
172 let snapshot = signed_entity.artifact;
173 let filename = format!(
174 "{}-e{}-i{}.{}.{}",
175 snapshot.network,
176 snapshot.beacon.epoch,
177 snapshot.beacon.immutable_file_number,
178 snapshot.digest,
179 snapshot.compression_algorithm.tar_file_extension()
180 );
181 let snapshot_uri = unwrap_to_internal_server_error!(
182 absolute_snapshot_uri(&server_url, &filename),
183 logger => "snapshot_download::error"
184 );
185
186 Ok(Box::new(warp::redirect::found(snapshot_uri)) as Box<dyn warp::Reply>)
187 }
188 Ok(None) => {
189 warn!(logger, "snapshot_download::not_found");
190 Ok(reply::empty(StatusCode::NOT_FOUND))
191 }
192 Err(err) => {
193 warn!(logger,"snapshot_download::error"; "error" => ?err);
194 Ok(reply::server_error(err))
195 }
196 }
197 }
198
199 fn absolute_snapshot_uri(
200 server_url: &SanitizedUrlWithTrailingSlash,
201 filename: &str,
202 ) -> StdResult<Uri> {
203 let sanitized_url = server_url.join(&format!("snapshot_download/{filename}"))?;
205 let snapshot_uri = Uri::from_str(sanitized_url.as_str())?;
206 Ok(snapshot_uri)
207 }
208}
209
210#[cfg(test)]
211mod tests {
212 use super::*;
213 use crate::http_server::routes::artifact_routes::test_utils::*;
214 use crate::http_server::routes::router::RouterConfig;
215 use crate::tools::url_sanitizer::SanitizedUrlWithTrailingSlash;
216 use crate::{
217 initialize_dependencies,
218 services::{MockMessageService, MockSignedEntityService},
219 };
220 use mithril_common::messages::{SnapshotListItemMessage, SnapshotMessage};
221 use mithril_common::{
222 entities::{CardanoDbBeacon, SignedEntityType, Snapshot},
223 test_utils::{apispec::APISpec, fake_data},
224 };
225 use mithril_persistence::sqlite::HydrationError;
226 use serde_json::Value::Null;
227 use std::sync::Arc;
228 use warp::{
229 http::{Method, StatusCode},
230 test::request,
231 };
232
233 fn setup_router(
234 state: RouterState,
235 ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
236 let cors = warp::cors()
237 .allow_any_origin()
238 .allow_headers(vec!["content-type"])
239 .allow_methods(vec![Method::GET, Method::POST, Method::OPTIONS]);
240
241 warp::any().and(routes(&state).with(cors))
242 }
243
244 #[tokio::test]
245 async fn test_snapshots_get_ok() {
246 let mut mock_http_message_service = MockMessageService::new();
247 mock_http_message_service
248 .expect_get_snapshot_list_message()
249 .return_once(|_| Ok(vec![SnapshotListItemMessage::dummy()]))
250 .once();
251 let mut dependency_manager = initialize_dependencies!().await;
252 dependency_manager.message_service = Arc::new(mock_http_message_service);
253
254 let method = Method::GET.as_str();
255 let path = "/artifact/snapshots";
256
257 let response = request()
258 .method(method)
259 .path(path)
260 .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
261 dependency_manager,
262 ))))
263 .await;
264
265 APISpec::verify_conformity(
266 APISpec::get_all_spec_files(),
267 method,
268 path,
269 "application/json",
270 &Null,
271 &response,
272 &StatusCode::OK,
273 )
274 .unwrap();
275 }
276
277 #[tokio::test]
278 async fn test_snapshots_get_ko() {
279 let mut mock_http_message_service = MockMessageService::new();
280 mock_http_message_service
281 .expect_get_snapshot_list_message()
282 .return_once(|_| Err(HydrationError::InvalidData("invalid data".to_string()).into()))
283 .once();
284 let mut dependency_manager = initialize_dependencies!().await;
285 dependency_manager.message_service = Arc::new(mock_http_message_service);
286
287 let method = Method::GET.as_str();
288 let path = "/artifact/snapshots";
289
290 let response = request()
291 .method(method)
292 .path(path)
293 .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
294 dependency_manager,
295 ))))
296 .await;
297
298 APISpec::verify_conformity(
299 APISpec::get_all_spec_files(),
300 method,
301 path,
302 "application/json",
303 &Null,
304 &response,
305 &StatusCode::INTERNAL_SERVER_ERROR,
306 )
307 .unwrap();
308 }
309
310 #[tokio::test]
311 async fn test_snapshot_digest_increments_artifact_detail_total_served_since_startup_metric() {
312 let method = Method::GET.as_str();
313 let path = "/artifact/snapshot/{digest}";
314 let dependency_manager = Arc::new(initialize_dependencies!().await);
315 let initial_counter_value = dependency_manager
316 .metrics_service
317 .get_artifact_detail_cardano_immutable_files_full_total_served_since_startup()
318 .get();
319
320 request()
321 .method(method)
322 .path(path)
323 .reply(&setup_router(RouterState::new_with_dummy_config(
324 dependency_manager.clone(),
325 )))
326 .await;
327
328 assert_eq!(
329 initial_counter_value + 1,
330 dependency_manager
331 .metrics_service
332 .get_artifact_detail_cardano_immutable_files_full_total_served_since_startup()
333 .get()
334 );
335 }
336
337 #[tokio::test]
338 async fn test_snapshot_digest_get_ok() {
339 let mut mock_http_message_service = MockMessageService::new();
340 mock_http_message_service
341 .expect_get_snapshot_message()
342 .return_once(|_| Ok(Some(SnapshotMessage::dummy())))
343 .once();
344 let mut dependency_manager = initialize_dependencies!().await;
345 dependency_manager.message_service = Arc::new(mock_http_message_service);
346
347 let method = Method::GET.as_str();
348 let path = "/artifact/snapshot/{digest}";
349
350 let response = request()
351 .method(method)
352 .path(path)
353 .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
354 dependency_manager,
355 ))))
356 .await;
357
358 APISpec::verify_conformity(
359 APISpec::get_all_spec_files(),
360 method,
361 path,
362 "application/json",
363 &Null,
364 &response,
365 &StatusCode::OK,
366 )
367 .unwrap();
368 }
369
370 #[tokio::test]
371 async fn test_snapshot_digest_returns_404_not_found_when_no_snapshot() {
372 let mut mock_http_message_service = MockMessageService::new();
373 mock_http_message_service
374 .expect_get_snapshot_message()
375 .return_once(|_| Ok(None))
376 .once();
377 let mut dependency_manager = initialize_dependencies!().await;
378 dependency_manager.message_service = Arc::new(mock_http_message_service);
379
380 let method = Method::GET.as_str();
381 let path = "/artifact/snapshot/{digest}";
382
383 let response = request()
384 .method(method)
385 .path(path)
386 .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
387 dependency_manager,
388 ))))
389 .await;
390
391 APISpec::verify_conformity(
392 APISpec::get_all_spec_files(),
393 method,
394 path,
395 "application/json",
396 &Null,
397 &response,
398 &StatusCode::NOT_FOUND,
399 )
400 .unwrap();
401 }
402
403 #[tokio::test]
404 async fn test_snapshot_digest_get_ko() {
405 let mut mock_http_message_service = MockMessageService::new();
406 mock_http_message_service
407 .expect_get_snapshot_message()
408 .return_once(|_| Err(HydrationError::InvalidData("invalid data".to_string()).into()))
409 .once();
410 let mut dependency_manager = initialize_dependencies!().await;
411 dependency_manager.message_service = Arc::new(mock_http_message_service);
412
413 let method = Method::GET.as_str();
414 let path = "/artifact/snapshot/{digest}";
415
416 let response = request()
417 .method(method)
418 .path(path)
419 .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
420 dependency_manager,
421 ))))
422 .await;
423
424 APISpec::verify_conformity(
425 APISpec::get_all_spec_files(),
426 method,
427 path,
428 "application/json",
429 &Null,
430 &response,
431 &StatusCode::INTERNAL_SERVER_ERROR,
432 )
433 .unwrap();
434 }
435
436 #[tokio::test]
437 async fn test_snapshot_local_download_returns_302_found_when_the_snapshot_exists() {
438 let network = "devnet";
439 let signed_entity = create_signed_entity(
440 SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default()),
441 Snapshot {
442 beacon: CardanoDbBeacon::new(1, 10),
443 ..fake_data::snapshots(1)[0].clone()
444 },
445 );
446 let mut mock_signed_entity_service = MockSignedEntityService::new();
447 mock_signed_entity_service
448 .expect_get_signed_snapshot_by_id()
449 .return_once(|_| Ok(Some(signed_entity)))
450 .once();
451 let mut dependency_manager = initialize_dependencies!().await;
452 dependency_manager.signed_entity_service = Arc::new(mock_signed_entity_service);
453
454 let method = Method::GET.as_str();
455 let path = "/artifact/snapshot/{digest}/download";
456
457 let response = request()
458 .method(method)
459 .path(path)
460 .reply(&setup_router(RouterState::new(
461 Arc::new(dependency_manager),
462 RouterConfig {
463 server_url: SanitizedUrlWithTrailingSlash::parse("https://1.2.3.4:5552/")
464 .unwrap(),
465 ..RouterConfig::dummy()
466 },
467 )))
468 .await;
469
470 assert_eq!(response.status(), StatusCode::FOUND);
471 let location = std::str::from_utf8(response.headers()["location"].as_bytes())
472 .unwrap()
473 .to_string();
474 assert!(
475 location.contains(&format!("https://1.2.3.4:5552/snapshot_download/{network}")),
476 "Expected value 'https://1.2.3.4:5552/snapshot_download/testnet' not found in {location}",
477 );
478 }
479
480 #[tokio::test]
481 async fn test_snapshot_download_returns_404_not_found_when_no_snapshot() {
482 let mut mock_signed_entity_service = MockSignedEntityService::new();
483 mock_signed_entity_service
484 .expect_get_signed_snapshot_by_id()
485 .return_once(|_| Ok(None))
486 .once();
487 let mut dependency_manager = initialize_dependencies!().await;
488 dependency_manager.signed_entity_service = Arc::new(mock_signed_entity_service);
489
490 let method = Method::GET.as_str();
491 let path = "/artifact/snapshot/{digest}/download";
492
493 let response = request()
494 .method(method)
495 .path(path)
496 .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
497 dependency_manager,
498 ))))
499 .await;
500
501 APISpec::verify_conformity(
502 APISpec::get_all_spec_files(),
503 method,
504 path,
505 "application/gzip",
506 &Null,
507 &response,
508 &StatusCode::NOT_FOUND,
509 )
510 .unwrap();
511 }
512
513 #[tokio::test]
514 async fn test_snapshot_download_get_ko() {
515 let mut mock_signed_entity_service = MockSignedEntityService::new();
516 mock_signed_entity_service
517 .expect_get_signed_snapshot_by_id()
518 .return_once(|_| Err(HydrationError::InvalidData("invalid data".to_string()).into()))
519 .once();
520 let mut dependency_manager = initialize_dependencies!().await;
521 dependency_manager.signed_entity_service = Arc::new(mock_signed_entity_service);
522
523 let method = Method::GET.as_str();
524 let path = "/artifact/snapshot/{digest}/download";
525
526 let response = request()
527 .method(method)
528 .path(path)
529 .reply(&setup_router(RouterState::new_with_dummy_config(Arc::new(
530 dependency_manager,
531 ))))
532 .await;
533
534 APISpec::verify_conformity(
535 APISpec::get_all_spec_files(),
536 method,
537 path,
538 "application/json",
539 &Null,
540 &response,
541 &StatusCode::INTERNAL_SERVER_ERROR,
542 )
543 .unwrap();
544 }
545}