1use warp::Filter;
2
3use crate::http_server::routes::middlewares;
4use crate::http_server::routes::router::RouterState;
5
6pub fn routes(
7 router_state: &RouterState,
8) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
9 post_statistics(router_state)
10 .or(post_cardano_database_immutable_files_restored(router_state))
11 .or(post_cardano_database_ancillary_files_restored(router_state))
12 .or(post_cardano_database_complete_restoration(router_state))
13 .or(post_cardano_database_partial_restoration(router_state))
14}
15
16fn post_statistics(
18 router_state: &RouterState,
19) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
20 warp::path!("statistics" / "snapshot")
21 .and(warp::post())
22 .and(middlewares::with_client_metadata(router_state))
23 .and(warp::body::json())
24 .and(middlewares::with_logger(router_state))
25 .and(middlewares::with_event_transmitter(router_state))
26 .and(middlewares::with_metrics_service(router_state))
27 .and_then(handlers::post_snapshot_statistics)
28}
29
30fn post_cardano_database_immutable_files_restored(
32 router_state: &RouterState,
33) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
34 warp::path!("statistics" / "cardano-database" / "immutable-files-restored")
35 .and(warp::post())
36 .and(middlewares::with_client_metadata(router_state))
37 .and(warp::body::json())
38 .and(middlewares::with_logger(router_state))
39 .and(middlewares::with_metrics_service(router_state))
40 .and_then(handlers::post_cardano_database_immutable_files_restored)
41}
42
43fn post_cardano_database_ancillary_files_restored(
45 router_state: &RouterState,
46) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
47 warp::path!("statistics" / "cardano-database" / "ancillary-files-restored")
48 .and(warp::post())
49 .and(middlewares::with_client_metadata(router_state))
50 .and(middlewares::with_logger(router_state))
51 .and(middlewares::with_metrics_service(router_state))
52 .and_then(handlers::post_cardano_database_ancillary_files_restored)
53}
54
55fn post_cardano_database_complete_restoration(
57 router_state: &RouterState,
58) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
59 warp::path!("statistics" / "cardano-database" / "complete-restoration")
60 .and(warp::post())
61 .and(middlewares::with_client_metadata(router_state))
62 .and(middlewares::with_logger(router_state))
63 .and(middlewares::with_event_transmitter(router_state))
64 .and(middlewares::with_metrics_service(router_state))
65 .and_then(handlers::post_cardano_database_complete_restoration)
66}
67
68fn post_cardano_database_partial_restoration(
70 router_state: &RouterState,
71) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
72 warp::path!("statistics" / "cardano-database" / "partial-restoration")
73 .and(warp::post())
74 .and(middlewares::with_client_metadata(router_state))
75 .and(middlewares::with_logger(router_state))
76 .and(middlewares::with_event_transmitter(router_state))
77 .and(middlewares::with_metrics_service(router_state))
78 .and_then(handlers::post_cardano_database_partial_restoration)
79}
80
81mod handlers {
82 use slog::warn;
83 use std::{convert::Infallible, sync::Arc};
84 use warp::http::StatusCode;
85
86 use mithril_common::messages::{
87 CardanoDatabaseImmutableFilesRestoredMessage, SnapshotDownloadMessage,
88 };
89
90 use crate::MetricsService;
91 use crate::event_store::{EventMessage, TransmitterService};
92 use crate::http_server::routes::middlewares::ClientMetadata;
93 use crate::http_server::routes::reply;
94
95 pub async fn post_snapshot_statistics(
96 client_metadata: ClientMetadata,
97 snapshot_download_message: SnapshotDownloadMessage,
98 logger: slog::Logger,
99 event_transmitter: Arc<TransmitterService<EventMessage>>,
100 metrics_service: Arc<MetricsService>,
101 ) -> Result<impl warp::Reply, Infallible> {
102 metrics_service
103 .get_cardano_immutable_files_full_total_restoration_since_startup()
104 .increment(&[
105 client_metadata.origin_tag.as_deref().unwrap_or_default(),
106 client_metadata.client_type.as_deref().unwrap_or_default(),
107 ]);
108
109 let headers: Vec<(&str, &str)> = Vec::new();
110
111 let message = EventMessage::new(
112 "HTTP::statistics",
113 "snapshot_downloaded",
114 &snapshot_download_message,
115 headers,
116 );
117
118 match event_transmitter.try_send(message.clone()) {
119 Err(e) => {
120 warn!(logger, "Event message error"; "error" => ?e);
121 Ok(reply::internal_server_error(e))
122 }
123 Ok(_) => Ok(reply::empty(StatusCode::CREATED)),
124 }
125 }
126
127 pub async fn post_cardano_database_immutable_files_restored(
128 client_metadata: ClientMetadata,
129 message: CardanoDatabaseImmutableFilesRestoredMessage,
130 _logger: slog::Logger,
131 metrics_service: Arc<MetricsService>,
132 ) -> Result<impl warp::Reply, Infallible> {
133 metrics_service
134 .get_cardano_database_immutable_files_restored_since_startup()
135 .increment_by(
136 &[
137 client_metadata.origin_tag.as_deref().unwrap_or_default(),
138 client_metadata.client_type.as_deref().unwrap_or_default(),
139 ],
140 message.nb_immutable_files,
141 );
142
143 Ok(reply::empty(StatusCode::CREATED))
144 }
145
146 pub async fn post_cardano_database_ancillary_files_restored(
147 client_metadata: ClientMetadata,
148 _logger: slog::Logger,
149 metrics_service: Arc<MetricsService>,
150 ) -> Result<impl warp::Reply, Infallible> {
151 metrics_service
152 .get_cardano_database_ancillary_files_restored_since_startup()
153 .increment(&[
154 client_metadata.origin_tag.as_deref().unwrap_or_default(),
155 client_metadata.client_type.as_deref().unwrap_or_default(),
156 ]);
157
158 Ok(reply::empty(StatusCode::CREATED))
159 }
160
161 pub async fn post_cardano_database_complete_restoration(
162 client_metadata: ClientMetadata,
163 logger: slog::Logger,
164 event_transmitter: Arc<TransmitterService<EventMessage>>,
165 metrics_service: Arc<MetricsService>,
166 ) -> Result<impl warp::Reply, Infallible> {
167 metrics_service
168 .get_cardano_database_complete_restoration_since_startup()
169 .increment(&[
170 client_metadata.origin_tag.as_deref().unwrap_or_default(),
171 client_metadata.client_type.as_deref().unwrap_or_default(),
172 ]);
173
174 let headers: Vec<(&str, &str)> = Vec::new();
175 let message = EventMessage::new(
176 "HTTP::statistics",
177 "cardano_database_restoration",
178 &"complete".to_string(),
179 headers,
180 );
181
182 if let Err(e) = event_transmitter.try_send(message.clone()) {
183 warn!(logger, "Event message error"; "error" => ?e);
184 return Ok(reply::internal_server_error(e));
185 };
186
187 Ok(reply::empty(StatusCode::CREATED))
188 }
189
190 pub async fn post_cardano_database_partial_restoration(
191 client_metadata: ClientMetadata,
192 logger: slog::Logger,
193 event_transmitter: Arc<TransmitterService<EventMessage>>,
194 metrics_service: Arc<MetricsService>,
195 ) -> Result<impl warp::Reply, Infallible> {
196 metrics_service
197 .get_cardano_database_partial_restoration_since_startup()
198 .increment(&[
199 client_metadata.origin_tag.as_deref().unwrap_or_default(),
200 client_metadata.client_type.as_deref().unwrap_or_default(),
201 ]);
202
203 let headers: Vec<(&str, &str)> = Vec::new();
204 let message = EventMessage::new(
205 "HTTP::statistics",
206 "cardano_database_restoration",
207 &"partial".to_string(),
208 headers,
209 );
210
211 if let Err(e) = event_transmitter.try_send(message.clone()) {
212 warn!(logger, "Event message error"; "error" => ?e);
213 return Ok(reply::internal_server_error(e));
214 };
215
216 Ok(reply::empty(StatusCode::CREATED))
217 }
218}
219
#[cfg(test)]
mod tests {
    use serde_json::Value;
    use std::path::PathBuf;
    use std::sync::Arc;
    use tokio::sync::mpsc::UnboundedReceiver;
    use warp::{
        http::{Method, StatusCode},
        test::request,
    };

    use mithril_api_spec::APISpec;
    use mithril_common::messages::{
        CardanoDatabaseImmutableFilesRestoredMessage, SnapshotDownloadMessage,
    };
    use mithril_common::{MITHRIL_CLIENT_TYPE_HEADER, MITHRIL_ORIGIN_TAG_HEADER, temp_dir};

    use crate::ServeCommandDependenciesContainer;
    use crate::event_store::EventMessage;
    use crate::{
        ServeCommandConfiguration, dependency_injection::DependenciesBuilder,
        initialize_dependencies,
    };

    use super::*;

    /// Serves the statistics routes behind a CORS policy allowing any origin, the
    /// `content-type` header and the `GET`, `POST` and `OPTIONS` methods.
    fn setup_router(
        state: RouterState,
    ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
        let allowed_methods = vec![Method::GET, Method::POST, Method::OPTIONS];
        let cors_policy = warp::cors()
            .allow_any_origin()
            .allow_headers(vec!["content-type"])
            .allow_methods(allowed_methods);
        let statistics_routes = routes(&state).with(cors_policy);

        warp::any().and(statistics_routes)
    }

    /// Builds the serve dependencies from a sample configuration using `snapshot_directory`,
    /// and returns them together with the receiving end of the event transmitter.
    async fn setup_dependencies(
        snapshot_directory: PathBuf,
    ) -> (
        Arc<ServeCommandDependenciesContainer>,
        UnboundedReceiver<EventMessage>,
    ) {
        let configuration = Arc::new(ServeCommandConfiguration::new_sample(snapshot_directory));
        let mut builder = DependenciesBuilder::new_with_stdout_logger(configuration);
        let event_receiver = builder.get_event_transmitter_receiver().await.unwrap();
        let container = builder.build_serve_dependencies_container().await.unwrap();

        (Arc::new(container), event_receiver)
    }

    #[tokio::test]
    async fn post_statistics_ok() {
        let method = Method::POST.as_str();
        let path = "/statistics/snapshot";
        let (dependencies, mut event_receiver) = setup_dependencies(temp_dir!()).await;
        let payload = SnapshotDownloadMessage::dummy();
        let router = setup_router(RouterState::new_with_dummy_config(dependencies));

        let response = request()
            .method(method)
            .json(&payload)
            .path(path)
            .reply(&router)
            .await;

        let conformity = APISpec::verify_conformity(
            APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
            method,
            path,
            "application/json",
            &payload,
            &response,
            &StatusCode::CREATED,
        );

        // An event must have been transmitted by the handler.
        let _ = event_receiver.try_recv().unwrap();
        conformity.unwrap();
    }

    #[tokio::test]
    async fn test_post_statistics_increments_cardano_db_total_restoration_since_startup_metric() {
        let method = Method::POST.as_str();
        let path = "/statistics/snapshot";
        let dependencies = Arc::new(initialize_dependencies!().await);
        let counter = dependencies
            .metrics_service
            .get_cardano_immutable_files_full_total_restoration_since_startup();
        let value_before_request = counter.get(&["TEST", "CLI"]);
        let router = setup_router(RouterState::new_with_origin_tag_white_list(
            dependencies.clone(),
            &["TEST"],
        ));

        request()
            .method(method)
            .json(&SnapshotDownloadMessage::dummy())
            .path(path)
            .header(MITHRIL_ORIGIN_TAG_HEADER, "TEST")
            .header(MITHRIL_CLIENT_TYPE_HEADER, "CLI")
            .reply(&router)
            .await;

        let value_after_request = counter.get(&["TEST", "CLI"]);
        assert_eq!(value_before_request + 1, value_after_request);
    }

    mod post_cardano_database_immutable_files_restored {
        use super::*;

        const HTTP_METHOD: Method = Method::POST;
        const PATH: &str = "/statistics/cardano-database/immutable-files-restored";

        #[tokio::test]
        async fn conform_to_open_api_when_created() {
            let payload = CardanoDatabaseImmutableFilesRestoredMessage::dummy();
            let dependencies = Arc::new(initialize_dependencies!().await);
            let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

            let response = request()
                .method(HTTP_METHOD.as_str())
                .json(&payload)
                .path(PATH)
                .reply(&router)
                .await;

            APISpec::verify_conformity(
                APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
                HTTP_METHOD.as_str(),
                PATH,
                "application/json",
                &payload,
                &response,
                &StatusCode::CREATED,
            )
            .unwrap();
        }

        #[tokio::test]
        async fn increments_metric() {
            let dependencies = Arc::new(initialize_dependencies!().await);
            let counter = dependencies
                .metrics_service
                .get_cardano_database_immutable_files_restored_since_startup();
            let payload = CardanoDatabaseImmutableFilesRestoredMessage {
                nb_immutable_files: 3,
            };
            let value_before_request = counter.get(&["TEST", "CLI"]);
            let router = setup_router(RouterState::new_with_origin_tag_white_list(
                dependencies.clone(),
                &["TEST"],
            ));

            request()
                .method(HTTP_METHOD.as_str())
                .json(&payload)
                .path(PATH)
                .header(MITHRIL_ORIGIN_TAG_HEADER, "TEST")
                .header(MITHRIL_CLIENT_TYPE_HEADER, "CLI")
                .reply(&router)
                .await;

            // The counter grows by the number of immutable files declared in the message.
            let value_after_request = counter.get(&["TEST", "CLI"]);
            assert_eq!(value_before_request + 3, value_after_request);
        }
    }

    mod post_cardano_database_ancillary_files_restored {
        use super::*;

        const HTTP_METHOD: Method = Method::POST;
        const PATH: &str = "/statistics/cardano-database/ancillary-files-restored";

        #[tokio::test]
        async fn conform_to_open_api_when_created() {
            let dependencies = Arc::new(initialize_dependencies!().await);
            let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

            let response = request()
                .method(HTTP_METHOD.as_str())
                .json(&Value::Null)
                .path(PATH)
                .reply(&router)
                .await;

            APISpec::verify_conformity(
                APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
                HTTP_METHOD.as_str(),
                PATH,
                "application/json",
                &Value::Null,
                &response,
                &StatusCode::CREATED,
            )
            .unwrap();
        }

        #[tokio::test]
        async fn increments_metric() {
            let dependencies = Arc::new(initialize_dependencies!().await);
            let counter = dependencies
                .metrics_service
                .get_cardano_database_ancillary_files_restored_since_startup();
            let value_before_request = counter.get(&["TEST", "CLI"]);
            let router = setup_router(RouterState::new_with_origin_tag_white_list(
                dependencies.clone(),
                &["TEST"],
            ));

            request()
                .method(HTTP_METHOD.as_str())
                .json(&Value::Null)
                .path(PATH)
                .header(MITHRIL_ORIGIN_TAG_HEADER, "TEST")
                .header(MITHRIL_CLIENT_TYPE_HEADER, "CLI")
                .reply(&router)
                .await;

            let value_after_request = counter.get(&["TEST", "CLI"]);
            assert_eq!(value_before_request + 1, value_after_request);
        }
    }

    mod post_cardano_database_complete_restoration {
        use super::*;

        const HTTP_METHOD: Method = Method::POST;
        const PATH: &str = "/statistics/cardano-database/complete-restoration";

        #[tokio::test]
        async fn conform_to_open_api_when_created() {
            // The receiver stays bound until the end of the test, so it is not dropped.
            let (dependencies, _event_receiver) = setup_dependencies(temp_dir!()).await;
            let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

            let response = request()
                .method(HTTP_METHOD.as_str())
                .json(&Value::Null)
                .path(PATH)
                .reply(&router)
                .await;

            APISpec::verify_conformity(
                APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
                HTTP_METHOD.as_str(),
                PATH,
                "application/json",
                &Value::Null,
                &response,
                &StatusCode::CREATED,
            )
            .unwrap();
        }

        #[tokio::test]
        async fn should_conform_to_openapi_when_server_error() {
            let (dependencies, mut event_receiver) = setup_dependencies(temp_dir!()).await;
            // Closing the receiver is what triggers the server error expected below.
            event_receiver.close();
            let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

            let response = request()
                .method(HTTP_METHOD.as_str())
                .json(&Value::Null)
                .path(PATH)
                .reply(&router)
                .await;

            let conformity = APISpec::verify_conformity(
                APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
                HTTP_METHOD.as_str(),
                PATH,
                "application/json",
                &Value::Null,
                &response,
                &StatusCode::INTERNAL_SERVER_ERROR,
            );

            conformity.unwrap();
        }

        #[tokio::test]
        async fn should_send_event() {
            let (dependencies, mut event_receiver) = setup_dependencies(temp_dir!()).await;
            let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

            request()
                .method(HTTP_METHOD.as_str())
                .json(&Value::Null)
                .path(PATH)
                .reply(&router)
                .await;

            let event = event_receiver.try_recv().unwrap();
            assert_eq!("HTTP::statistics", event.source);
            assert_eq!("cardano_database_restoration", event.action);
            assert_eq!("complete", event.content);
        }

        #[tokio::test]
        async fn increments_metric() {
            // The receiver stays bound until the end of the test, so it is not dropped.
            let (dependencies, _event_receiver) = setup_dependencies(temp_dir!()).await;
            let counter = dependencies
                .metrics_service
                .get_cardano_database_complete_restoration_since_startup();
            let value_before_request = counter.get(&["TEST", "CLI"]);
            let router = setup_router(RouterState::new_with_origin_tag_white_list(
                dependencies.clone(),
                &["TEST"],
            ));

            request()
                .method(HTTP_METHOD.as_str())
                .json(&Value::Null)
                .path(PATH)
                .header(MITHRIL_ORIGIN_TAG_HEADER, "TEST")
                .header(MITHRIL_CLIENT_TYPE_HEADER, "CLI")
                .reply(&router)
                .await;

            let value_after_request = counter.get(&["TEST", "CLI"]);
            assert_eq!(value_before_request + 1, value_after_request);
        }
    }

    mod post_cardano_database_partial_restoration {
        use super::*;

        const HTTP_METHOD: Method = Method::POST;
        const PATH: &str = "/statistics/cardano-database/partial-restoration";

        #[tokio::test]
        async fn conform_to_open_api_when_created() {
            // The receiver stays bound until the end of the test, so it is not dropped.
            let (dependencies, _event_receiver) = setup_dependencies(temp_dir!()).await;
            let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

            let response = request()
                .method(HTTP_METHOD.as_str())
                .json(&Value::Null)
                .path(PATH)
                .reply(&router)
                .await;

            APISpec::verify_conformity(
                APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
                HTTP_METHOD.as_str(),
                PATH,
                "application/json",
                &Value::Null,
                &response,
                &StatusCode::CREATED,
            )
            .unwrap();
        }

        #[tokio::test]
        async fn should_conform_to_openapi_when_server_error() {
            let (dependencies, mut event_receiver) = setup_dependencies(temp_dir!()).await;
            // Closing the receiver is what triggers the server error expected below.
            event_receiver.close();
            let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

            let response = request()
                .method(HTTP_METHOD.as_str())
                .json(&Value::Null)
                .path(PATH)
                .reply(&router)
                .await;

            let conformity = APISpec::verify_conformity(
                APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
                HTTP_METHOD.as_str(),
                PATH,
                "application/json",
                &Value::Null,
                &response,
                &StatusCode::INTERNAL_SERVER_ERROR,
            );

            conformity.unwrap();
        }

        #[tokio::test]
        async fn should_send_event() {
            let (dependencies, mut event_receiver) = setup_dependencies(temp_dir!()).await;
            let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

            request()
                .method(HTTP_METHOD.as_str())
                .json(&Value::Null)
                .path(PATH)
                .reply(&router)
                .await;

            let event = event_receiver.try_recv().unwrap();
            assert_eq!("HTTP::statistics", event.source);
            assert_eq!("cardano_database_restoration", event.action);
            assert_eq!("partial", event.content);
        }

        #[tokio::test]
        async fn increments_metric() {
            // The receiver stays bound until the end of the test, so it is not dropped.
            let (dependencies, _event_receiver) = setup_dependencies(temp_dir!()).await;
            let counter = dependencies
                .metrics_service
                .get_cardano_database_partial_restoration_since_startup();
            let value_before_request = counter.get(&["TEST", "CLI"]);
            let router = setup_router(RouterState::new_with_origin_tag_white_list(
                dependencies.clone(),
                &["TEST"],
            ));

            request()
                .method(HTTP_METHOD.as_str())
                .json(&Value::Null)
                .path(PATH)
                .header(MITHRIL_ORIGIN_TAG_HEADER, "TEST")
                .header(MITHRIL_CLIENT_TYPE_HEADER, "CLI")
                .reply(&router)
                .await;

            let value_after_request = counter.get(&["TEST", "CLI"]);
            assert_eq!(value_before_request + 1, value_after_request);
        }
    }
}