mithril_aggregator/http_server/routes/
statistics_routes.rs

1use warp::Filter;
2
3use crate::http_server::routes::middlewares;
4use crate::http_server::routes::router::RouterState;
5
6pub fn routes(
7    router_state: &RouterState,
8) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
9    post_statistics(router_state)
10        .or(post_cardano_database_immutable_files_restored(router_state))
11        .or(post_cardano_database_ancillary_files_restored(router_state))
12        .or(post_cardano_database_complete_restoration(router_state))
13        .or(post_cardano_database_partial_restoration(router_state))
14}
15
16/// POST /statistics/snapshot
17fn post_statistics(
18    router_state: &RouterState,
19) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
20    warp::path!("statistics" / "snapshot")
21        .and(warp::post())
22        .and(middlewares::with_client_metadata(router_state))
23        .and(warp::body::json())
24        .and(middlewares::with_logger(router_state))
25        .and(middlewares::with_event_transmitter(router_state))
26        .and(middlewares::with_metrics_service(router_state))
27        .and_then(handlers::post_snapshot_statistics)
28}
29
30/// POST /statistics/cardano-database/immutable-files-restored
31fn post_cardano_database_immutable_files_restored(
32    router_state: &RouterState,
33) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
34    warp::path!("statistics" / "cardano-database" / "immutable-files-restored")
35        .and(warp::post())
36        .and(middlewares::with_client_metadata(router_state))
37        .and(warp::body::json())
38        .and(middlewares::with_logger(router_state))
39        .and(middlewares::with_metrics_service(router_state))
40        .and_then(handlers::post_cardano_database_immutable_files_restored)
41}
42
43/// POST /statistics/cardano-database/ancillary-files-restored
44fn post_cardano_database_ancillary_files_restored(
45    router_state: &RouterState,
46) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
47    warp::path!("statistics" / "cardano-database" / "ancillary-files-restored")
48        .and(warp::post())
49        .and(middlewares::with_client_metadata(router_state))
50        .and(middlewares::with_logger(router_state))
51        .and(middlewares::with_metrics_service(router_state))
52        .and_then(handlers::post_cardano_database_ancillary_files_restored)
53}
54
55/// POST /statistics/cardano-database/complete-restoration
56fn post_cardano_database_complete_restoration(
57    router_state: &RouterState,
58) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
59    warp::path!("statistics" / "cardano-database" / "complete-restoration")
60        .and(warp::post())
61        .and(middlewares::with_client_metadata(router_state))
62        .and(middlewares::with_logger(router_state))
63        .and(middlewares::with_event_transmitter(router_state))
64        .and(middlewares::with_metrics_service(router_state))
65        .and_then(handlers::post_cardano_database_complete_restoration)
66}
67
68/// POST /statistics/cardano-database/partial-restoration
69fn post_cardano_database_partial_restoration(
70    router_state: &RouterState,
71) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
72    warp::path!("statistics" / "cardano-database" / "partial-restoration")
73        .and(warp::post())
74        .and(middlewares::with_client_metadata(router_state))
75        .and(middlewares::with_logger(router_state))
76        .and(middlewares::with_event_transmitter(router_state))
77        .and(middlewares::with_metrics_service(router_state))
78        .and_then(handlers::post_cardano_database_partial_restoration)
79}
80
81mod handlers {
82    use slog::warn;
83    use std::{convert::Infallible, sync::Arc};
84    use warp::http::StatusCode;
85
86    use mithril_common::messages::{
87        CardanoDatabaseImmutableFilesRestoredMessage, SnapshotDownloadMessage,
88    };
89
90    use crate::MetricsService;
91    use crate::event_store::{EventMessage, TransmitterService};
92    use crate::http_server::routes::middlewares::ClientMetadata;
93    use crate::http_server::routes::reply;
94
95    pub async fn post_snapshot_statistics(
96        client_metadata: ClientMetadata,
97        snapshot_download_message: SnapshotDownloadMessage,
98        logger: slog::Logger,
99        event_transmitter: Arc<TransmitterService<EventMessage>>,
100        metrics_service: Arc<MetricsService>,
101    ) -> Result<impl warp::Reply, Infallible> {
102        metrics_service
103            .get_cardano_immutable_files_full_total_restoration_since_startup()
104            .increment(&[
105                client_metadata.origin_tag.as_deref().unwrap_or_default(),
106                client_metadata.client_type.as_deref().unwrap_or_default(),
107            ]);
108
109        let headers: Vec<(&str, &str)> = Vec::new();
110
111        let message = EventMessage::new(
112            "HTTP::statistics",
113            "snapshot_downloaded",
114            &snapshot_download_message,
115            headers,
116        );
117
118        match event_transmitter.try_send(message.clone()) {
119            Err(e) => {
120                warn!(logger, "Event message error"; "error" => ?e);
121                Ok(reply::internal_server_error(e))
122            }
123            Ok(_) => Ok(reply::empty(StatusCode::CREATED)),
124        }
125    }
126
127    pub async fn post_cardano_database_immutable_files_restored(
128        client_metadata: ClientMetadata,
129        message: CardanoDatabaseImmutableFilesRestoredMessage,
130        _logger: slog::Logger,
131        metrics_service: Arc<MetricsService>,
132    ) -> Result<impl warp::Reply, Infallible> {
133        metrics_service
134            .get_cardano_database_immutable_files_restored_since_startup()
135            .increment_by(
136                &[
137                    client_metadata.origin_tag.as_deref().unwrap_or_default(),
138                    client_metadata.client_type.as_deref().unwrap_or_default(),
139                ],
140                message.nb_immutable_files,
141            );
142
143        Ok(reply::empty(StatusCode::CREATED))
144    }
145
146    pub async fn post_cardano_database_ancillary_files_restored(
147        client_metadata: ClientMetadata,
148        _logger: slog::Logger,
149        metrics_service: Arc<MetricsService>,
150    ) -> Result<impl warp::Reply, Infallible> {
151        metrics_service
152            .get_cardano_database_ancillary_files_restored_since_startup()
153            .increment(&[
154                client_metadata.origin_tag.as_deref().unwrap_or_default(),
155                client_metadata.client_type.as_deref().unwrap_or_default(),
156            ]);
157
158        Ok(reply::empty(StatusCode::CREATED))
159    }
160
161    pub async fn post_cardano_database_complete_restoration(
162        client_metadata: ClientMetadata,
163        logger: slog::Logger,
164        event_transmitter: Arc<TransmitterService<EventMessage>>,
165        metrics_service: Arc<MetricsService>,
166    ) -> Result<impl warp::Reply, Infallible> {
167        metrics_service
168            .get_cardano_database_complete_restoration_since_startup()
169            .increment(&[
170                client_metadata.origin_tag.as_deref().unwrap_or_default(),
171                client_metadata.client_type.as_deref().unwrap_or_default(),
172            ]);
173
174        let headers: Vec<(&str, &str)> = Vec::new();
175        let message = EventMessage::new(
176            "HTTP::statistics",
177            "cardano_database_restoration",
178            &"complete".to_string(),
179            headers,
180        );
181
182        if let Err(e) = event_transmitter.try_send(message.clone()) {
183            warn!(logger, "Event message error"; "error" => ?e);
184            return Ok(reply::internal_server_error(e));
185        };
186
187        Ok(reply::empty(StatusCode::CREATED))
188    }
189
190    pub async fn post_cardano_database_partial_restoration(
191        client_metadata: ClientMetadata,
192        logger: slog::Logger,
193        event_transmitter: Arc<TransmitterService<EventMessage>>,
194        metrics_service: Arc<MetricsService>,
195    ) -> Result<impl warp::Reply, Infallible> {
196        metrics_service
197            .get_cardano_database_partial_restoration_since_startup()
198            .increment(&[
199                client_metadata.origin_tag.as_deref().unwrap_or_default(),
200                client_metadata.client_type.as_deref().unwrap_or_default(),
201            ]);
202
203        let headers: Vec<(&str, &str)> = Vec::new();
204        let message = EventMessage::new(
205            "HTTP::statistics",
206            "cardano_database_restoration",
207            &"partial".to_string(),
208            headers,
209        );
210
211        if let Err(e) = event_transmitter.try_send(message.clone()) {
212            warn!(logger, "Event message error"; "error" => ?e);
213            return Ok(reply::internal_server_error(e));
214        };
215
216        Ok(reply::empty(StatusCode::CREATED))
217    }
218}
219
220#[cfg(test)]
221mod tests {
222    use serde_json::Value;
223    use std::path::PathBuf;
224    use std::sync::Arc;
225    use tokio::sync::mpsc::UnboundedReceiver;
226    use warp::{
227        http::{Method, StatusCode},
228        test::request,
229    };
230
231    use mithril_api_spec::APISpec;
232    use mithril_common::messages::{
233        CardanoDatabaseImmutableFilesRestoredMessage, SnapshotDownloadMessage,
234    };
235    use mithril_common::{
236        MITHRIL_CLIENT_TYPE_HEADER, MITHRIL_ORIGIN_TAG_HEADER, temp_dir, test::double::Dummy,
237    };
238
239    use crate::ServeCommandDependenciesContainer;
240    use crate::event_store::EventMessage;
241    use crate::{
242        ServeCommandConfiguration, dependency_injection::DependenciesBuilder,
243        initialize_dependencies,
244    };
245
246    use super::*;
247
248    fn setup_router(
249        state: RouterState,
250    ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
251        let cors = warp::cors()
252            .allow_any_origin()
253            .allow_headers(vec!["content-type"])
254            .allow_methods(vec![Method::GET, Method::POST, Method::OPTIONS]);
255
256        warp::any().and(routes(&state).with(cors))
257    }
258
259    #[tokio::test]
260    async fn post_statistics_ok() {
261        let (dependency_manager, mut rx) = setup_dependencies(temp_dir!()).await;
262        let snapshot_download_message = SnapshotDownloadMessage::dummy();
263
264        let method = Method::POST.as_str();
265        let path = "/statistics/snapshot";
266
267        let response = request()
268            .method(method)
269            .json(&snapshot_download_message)
270            .path(path)
271            .reply(&setup_router(RouterState::new_with_dummy_config(
272                dependency_manager,
273            )))
274            .await;
275
276        let result = APISpec::verify_conformity(
277            APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
278            method,
279            path,
280            "application/json",
281            &snapshot_download_message,
282            &response,
283            &StatusCode::CREATED,
284        );
285
286        let _ = rx.try_recv().unwrap();
287        result.unwrap();
288    }
289
290    #[tokio::test]
291    async fn test_post_statistics_increments_cardano_db_total_restoration_since_startup_metric() {
292        let method = Method::POST.as_str();
293        let path = "/statistics/snapshot";
294        let dependency_manager = Arc::new(initialize_dependencies!().await);
295        let initial_counter_value = dependency_manager
296            .metrics_service
297            .get_cardano_immutable_files_full_total_restoration_since_startup()
298            .get(&["TEST", "CLI"]);
299
300        request()
301            .method(method)
302            .json(&SnapshotDownloadMessage::dummy())
303            .path(path)
304            .header(MITHRIL_ORIGIN_TAG_HEADER, "TEST")
305            .header(MITHRIL_CLIENT_TYPE_HEADER, "CLI")
306            .reply(&setup_router(RouterState::new_with_origin_tag_white_list(
307                dependency_manager.clone(),
308                &["TEST"],
309            )))
310            .await;
311
312        assert_eq!(
313            initial_counter_value + 1,
314            dependency_manager
315                .metrics_service
316                .get_cardano_immutable_files_full_total_restoration_since_startup()
317                .get(&["TEST", "CLI"])
318        );
319    }
320
321    mod post_cardano_database_immutable_files_restored {
322        use super::*;
323
324        const HTTP_METHOD: Method = Method::POST;
325        const PATH: &str = "/statistics/cardano-database/immutable-files-restored";
326
327        #[tokio::test]
328        async fn conform_to_open_api_when_created() {
329            let message = CardanoDatabaseImmutableFilesRestoredMessage::dummy();
330
331            let dependency_manager = Arc::new(initialize_dependencies!().await);
332            let response = request()
333                .method(HTTP_METHOD.as_str())
334                .json(&message)
335                .path(PATH)
336                .reply(&setup_router(RouterState::new_with_dummy_config(
337                    dependency_manager.clone(),
338                )))
339                .await;
340
341            let result = APISpec::verify_conformity(
342                APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
343                HTTP_METHOD.as_str(),
344                PATH,
345                "application/json",
346                &message,
347                &response,
348                &StatusCode::CREATED,
349            );
350
351            result.unwrap();
352        }
353
354        #[tokio::test]
355        async fn increments_metric() {
356            let dependency_manager = Arc::new(initialize_dependencies!().await);
357            let metric_counter = dependency_manager
358                .metrics_service
359                .get_cardano_database_immutable_files_restored_since_startup();
360            let message = CardanoDatabaseImmutableFilesRestoredMessage {
361                nb_immutable_files: 3,
362            };
363
364            let initial_counter_value = metric_counter.get(&["TEST", "CLI"]);
365
366            request()
367                .method(HTTP_METHOD.as_str())
368                .json(&message)
369                .path(PATH)
370                .header(MITHRIL_ORIGIN_TAG_HEADER, "TEST")
371                .header(MITHRIL_CLIENT_TYPE_HEADER, "CLI")
372                .reply(&setup_router(RouterState::new_with_origin_tag_white_list(
373                    dependency_manager.clone(),
374                    &["TEST"],
375                )))
376                .await;
377
378            assert_eq!(
379                initial_counter_value + 3,
380                metric_counter.get(&["TEST", "CLI"])
381            );
382        }
383    }
384
385    mod post_cardano_database_ancillary_files_restored {
386        use super::*;
387
388        const HTTP_METHOD: Method = Method::POST;
389        const PATH: &str = "/statistics/cardano-database/ancillary-files-restored";
390
391        #[tokio::test]
392        async fn conform_to_open_api_when_created() {
393            let dependency_manager = Arc::new(initialize_dependencies!().await);
394            let response = request()
395                .method(HTTP_METHOD.as_str())
396                .json(&Value::Null)
397                .path(PATH)
398                .reply(&setup_router(RouterState::new_with_dummy_config(
399                    dependency_manager.clone(),
400                )))
401                .await;
402
403            let result = APISpec::verify_conformity(
404                APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
405                HTTP_METHOD.as_str(),
406                PATH,
407                "application/json",
408                &Value::Null,
409                &response,
410                &StatusCode::CREATED,
411            );
412
413            result.unwrap();
414        }
415
416        #[tokio::test]
417        async fn increments_metric() {
418            let dependency_manager = Arc::new(initialize_dependencies!().await);
419            let metric_counter = dependency_manager
420                .metrics_service
421                .get_cardano_database_ancillary_files_restored_since_startup();
422
423            let initial_counter_value = metric_counter.get(&["TEST", "CLI"]);
424
425            request()
426                .method(HTTP_METHOD.as_str())
427                .json(&Value::Null)
428                .path(PATH)
429                .header(MITHRIL_ORIGIN_TAG_HEADER, "TEST")
430                .header(MITHRIL_CLIENT_TYPE_HEADER, "CLI")
431                .reply(&setup_router(RouterState::new_with_origin_tag_white_list(
432                    dependency_manager.clone(),
433                    &["TEST"],
434                )))
435                .await;
436
437            assert_eq!(
438                initial_counter_value + 1,
439                metric_counter.get(&["TEST", "CLI"])
440            );
441        }
442    }
443
444    async fn setup_dependencies(
445        snapshot_directory: PathBuf,
446    ) -> (
447        Arc<ServeCommandDependenciesContainer>,
448        UnboundedReceiver<EventMessage>,
449    ) {
450        let config = ServeCommandConfiguration::new_sample(snapshot_directory);
451        let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
452        let rx = builder.get_event_transmitter_receiver().await.unwrap();
453        let dependencies_manager =
454            Arc::new(builder.build_serve_dependencies_container().await.unwrap());
455        (dependencies_manager, rx)
456    }
457
458    mod post_cardano_database_complete_restoration {
459        use super::*;
460
461        const HTTP_METHOD: Method = Method::POST;
462        const PATH: &str = "/statistics/cardano-database/complete-restoration";
463
464        #[tokio::test]
465        async fn conform_to_open_api_when_created() {
466            let (dependency_manager, _rx) = setup_dependencies(temp_dir!()).await;
467
468            let response = request()
469                .method(HTTP_METHOD.as_str())
470                .json(&Value::Null)
471                .path(PATH)
472                .reply(&setup_router(RouterState::new_with_dummy_config(
473                    dependency_manager.clone(),
474                )))
475                .await;
476
477            let result = APISpec::verify_conformity(
478                APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
479                HTTP_METHOD.as_str(),
480                PATH,
481                "application/json",
482                &Value::Null,
483                &response,
484                &StatusCode::CREATED,
485            );
486
487            result.unwrap();
488        }
489
490        #[tokio::test]
491        async fn should_conform_to_openapi_when_server_error() {
492            let (dependency_manager, mut rx) = setup_dependencies(temp_dir!()).await;
493            rx.close();
494
495            let response = request()
496                .method(HTTP_METHOD.as_str())
497                .json(&Value::Null)
498                .path(PATH)
499                .reply(&setup_router(RouterState::new_with_dummy_config(
500                    dependency_manager.clone(),
501                )))
502                .await;
503
504            APISpec::verify_conformity(
505                APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
506                HTTP_METHOD.as_str(),
507                PATH,
508                "application/json",
509                &Value::Null,
510                &response,
511                &StatusCode::INTERNAL_SERVER_ERROR,
512            )
513            .unwrap();
514        }
515
516        #[tokio::test]
517        async fn should_send_event() {
518            let (dependency_manager, mut rx) = setup_dependencies(temp_dir!()).await;
519
520            request()
521                .method(HTTP_METHOD.as_str())
522                .json(&Value::Null)
523                .path(PATH)
524                .reply(&setup_router(RouterState::new_with_dummy_config(
525                    dependency_manager.clone(),
526                )))
527                .await;
528
529            let message = rx.try_recv().unwrap();
530            assert_eq!("HTTP::statistics", message.source);
531            assert_eq!("cardano_database_restoration", message.action);
532            assert_eq!("complete", message.content);
533        }
534
535        #[tokio::test]
536        async fn increments_metric() {
537            let (dependency_manager, _rx) = setup_dependencies(temp_dir!()).await;
538            let metric_counter = dependency_manager
539                .metrics_service
540                .get_cardano_database_complete_restoration_since_startup();
541
542            let initial_counter_value = metric_counter.get(&["TEST", "CLI"]);
543
544            request()
545                .method(HTTP_METHOD.as_str())
546                .json(&Value::Null)
547                .path(PATH)
548                .header(MITHRIL_ORIGIN_TAG_HEADER, "TEST")
549                .header(MITHRIL_CLIENT_TYPE_HEADER, "CLI")
550                .reply(&setup_router(RouterState::new_with_origin_tag_white_list(
551                    dependency_manager.clone(),
552                    &["TEST"],
553                )))
554                .await;
555
556            assert_eq!(
557                initial_counter_value + 1,
558                metric_counter.get(&["TEST", "CLI"])
559            );
560        }
561    }
562
563    mod post_cardano_database_partial_restoration {
564        use super::*;
565
566        const HTTP_METHOD: Method = Method::POST;
567        const PATH: &str = "/statistics/cardano-database/partial-restoration";
568
569        #[tokio::test]
570        async fn conform_to_open_api_when_created() {
571            let (dependency_manager, _rx) = setup_dependencies(temp_dir!()).await;
572            let response = request()
573                .method(HTTP_METHOD.as_str())
574                .json(&Value::Null)
575                .path(PATH)
576                .reply(&setup_router(RouterState::new_with_dummy_config(
577                    dependency_manager.clone(),
578                )))
579                .await;
580
581            let result = APISpec::verify_conformity(
582                APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
583                HTTP_METHOD.as_str(),
584                PATH,
585                "application/json",
586                &Value::Null,
587                &response,
588                &StatusCode::CREATED,
589            );
590
591            result.unwrap();
592        }
593
594        #[tokio::test]
595        async fn should_conform_to_openapi_when_server_error() {
596            let (dependency_manager, mut rx) = setup_dependencies(temp_dir!()).await;
597            rx.close();
598
599            let response = request()
600                .method(HTTP_METHOD.as_str())
601                .json(&Value::Null)
602                .path(PATH)
603                .reply(&setup_router(RouterState::new_with_dummy_config(
604                    dependency_manager.clone(),
605                )))
606                .await;
607
608            APISpec::verify_conformity(
609                APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
610                HTTP_METHOD.as_str(),
611                PATH,
612                "application/json",
613                &Value::Null,
614                &response,
615                &StatusCode::INTERNAL_SERVER_ERROR,
616            )
617            .unwrap();
618        }
619
620        #[tokio::test]
621        async fn should_send_event() {
622            let (dependency_manager, mut rx) = setup_dependencies(temp_dir!()).await;
623
624            request()
625                .method(HTTP_METHOD.as_str())
626                .json(&Value::Null)
627                .path(PATH)
628                .reply(&setup_router(RouterState::new_with_dummy_config(
629                    dependency_manager.clone(),
630                )))
631                .await;
632
633            let message = rx.try_recv().unwrap();
634            assert_eq!("HTTP::statistics", message.source);
635            assert_eq!("cardano_database_restoration", message.action);
636            assert_eq!("partial", message.content);
637        }
638
639        #[tokio::test]
640        async fn increments_metric() {
641            let (dependency_manager, _rx) = setup_dependencies(temp_dir!()).await;
642            let metric_counter = dependency_manager
643                .metrics_service
644                .get_cardano_database_partial_restoration_since_startup();
645
646            let initial_counter_value = metric_counter.get(&["TEST", "CLI"]);
647
648            request()
649                .method(HTTP_METHOD.as_str())
650                .json(&Value::Null)
651                .path(PATH)
652                .header(MITHRIL_ORIGIN_TAG_HEADER, "TEST")
653                .header(MITHRIL_CLIENT_TYPE_HEADER, "CLI")
654                .reply(&setup_router(RouterState::new_with_origin_tag_white_list(
655                    dependency_manager.clone(),
656                    &["TEST"],
657                )))
658                .await;
659
660            assert_eq!(
661                initial_counter_value + 1,
662                metric_counter.get(&["TEST", "CLI"])
663            );
664        }
665    }
666}