mithril_aggregator/http_server/routes/
statistics_routes.rs

1use warp::Filter;
2
3use crate::http_server::routes::middlewares;
4use crate::http_server::routes::router::RouterState;
5
6pub fn routes(
7    router_state: &RouterState,
8) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
9    post_statistics(router_state)
10        .or(post_cardano_database_immutable_files_restored(router_state))
11        .or(post_cardano_database_ancillary_files_restored(router_state))
12        .or(post_cardano_database_complete_restoration(router_state))
13        .or(post_cardano_database_partial_restoration(router_state))
14}
15
16/// POST /statistics/snapshot
17fn post_statistics(
18    router_state: &RouterState,
19) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
20    warp::path!("statistics" / "snapshot")
21        .and(warp::post())
22        .and(middlewares::with_client_metadata(router_state))
23        .and(warp::body::json())
24        .and(middlewares::with_logger(router_state))
25        .and(middlewares::with_event_transmitter(router_state))
26        .and(middlewares::with_metrics_service(router_state))
27        .and_then(handlers::post_snapshot_statistics)
28}
29
30/// POST /statistics/cardano-database/immutable-files-restored
31fn post_cardano_database_immutable_files_restored(
32    router_state: &RouterState,
33) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
34    warp::path!("statistics" / "cardano-database" / "immutable-files-restored")
35        .and(warp::post())
36        .and(middlewares::with_client_metadata(router_state))
37        .and(warp::body::json())
38        .and(middlewares::with_logger(router_state))
39        .and(middlewares::with_metrics_service(router_state))
40        .and_then(handlers::post_cardano_database_immutable_files_restored)
41}
42
43/// POST /statistics/cardano-database/ancillary-files-restored
44fn post_cardano_database_ancillary_files_restored(
45    router_state: &RouterState,
46) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
47    warp::path!("statistics" / "cardano-database" / "ancillary-files-restored")
48        .and(warp::post())
49        .and(middlewares::with_client_metadata(router_state))
50        .and(middlewares::with_logger(router_state))
51        .and(middlewares::with_metrics_service(router_state))
52        .and_then(handlers::post_cardano_database_ancillary_files_restored)
53}
54
55/// POST /statistics/cardano-database/complete-restoration
56fn post_cardano_database_complete_restoration(
57    router_state: &RouterState,
58) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
59    warp::path!("statistics" / "cardano-database" / "complete-restoration")
60        .and(warp::post())
61        .and(middlewares::with_client_metadata(router_state))
62        .and(middlewares::with_logger(router_state))
63        .and(middlewares::with_event_transmitter(router_state))
64        .and(middlewares::with_metrics_service(router_state))
65        .and_then(handlers::post_cardano_database_complete_restoration)
66}
67
68/// POST /statistics/cardano-database/partial-restoration
69fn post_cardano_database_partial_restoration(
70    router_state: &RouterState,
71) -> impl Filter<Extract = (impl warp::Reply + use<>,), Error = warp::Rejection> + Clone + use<> {
72    warp::path!("statistics" / "cardano-database" / "partial-restoration")
73        .and(warp::post())
74        .and(middlewares::with_client_metadata(router_state))
75        .and(middlewares::with_logger(router_state))
76        .and(middlewares::with_event_transmitter(router_state))
77        .and(middlewares::with_metrics_service(router_state))
78        .and_then(handlers::post_cardano_database_partial_restoration)
79}
80
81mod handlers {
82    use slog::warn;
83    use std::{convert::Infallible, sync::Arc};
84    use warp::http::StatusCode;
85
86    use mithril_common::messages::{
87        CardanoDatabaseImmutableFilesRestoredMessage, SnapshotDownloadMessage,
88    };
89
90    use crate::MetricsService;
91    use crate::event_store::{EventMessage, TransmitterService};
92    use crate::http_server::routes::middlewares::ClientMetadata;
93    use crate::http_server::routes::reply;
94
95    pub async fn post_snapshot_statistics(
96        client_metadata: ClientMetadata,
97        snapshot_download_message: SnapshotDownloadMessage,
98        logger: slog::Logger,
99        event_transmitter: Arc<TransmitterService<EventMessage>>,
100        metrics_service: Arc<MetricsService>,
101    ) -> Result<impl warp::Reply, Infallible> {
102        metrics_service
103            .get_cardano_immutable_files_full_total_restoration_since_startup()
104            .increment(&[
105                client_metadata.origin_tag.as_deref().unwrap_or_default(),
106                client_metadata.client_type.as_deref().unwrap_or_default(),
107            ]);
108
109        let headers: Vec<(&str, &str)> = Vec::new();
110
111        let message = EventMessage::new(
112            "HTTP::statistics",
113            "snapshot_downloaded",
114            &snapshot_download_message,
115            headers,
116        );
117
118        match event_transmitter.try_send(message.clone()) {
119            Err(e) => {
120                warn!(logger, "Event message error"; "error" => ?e);
121                Ok(reply::internal_server_error(e))
122            }
123            Ok(_) => Ok(reply::empty(StatusCode::CREATED)),
124        }
125    }
126
127    pub async fn post_cardano_database_immutable_files_restored(
128        client_metadata: ClientMetadata,
129        message: CardanoDatabaseImmutableFilesRestoredMessage,
130        _logger: slog::Logger,
131        metrics_service: Arc<MetricsService>,
132    ) -> Result<impl warp::Reply, Infallible> {
133        metrics_service
134            .get_cardano_database_immutable_files_restored_since_startup()
135            .increment_by(
136                &[
137                    client_metadata.origin_tag.as_deref().unwrap_or_default(),
138                    client_metadata.client_type.as_deref().unwrap_or_default(),
139                ],
140                message.nb_immutable_files,
141            );
142
143        Ok(reply::empty(StatusCode::CREATED))
144    }
145
146    pub async fn post_cardano_database_ancillary_files_restored(
147        client_metadata: ClientMetadata,
148        _logger: slog::Logger,
149        metrics_service: Arc<MetricsService>,
150    ) -> Result<impl warp::Reply, Infallible> {
151        metrics_service
152            .get_cardano_database_ancillary_files_restored_since_startup()
153            .increment(&[
154                client_metadata.origin_tag.as_deref().unwrap_or_default(),
155                client_metadata.client_type.as_deref().unwrap_or_default(),
156            ]);
157
158        Ok(reply::empty(StatusCode::CREATED))
159    }
160
161    pub async fn post_cardano_database_complete_restoration(
162        client_metadata: ClientMetadata,
163        logger: slog::Logger,
164        event_transmitter: Arc<TransmitterService<EventMessage>>,
165        metrics_service: Arc<MetricsService>,
166    ) -> Result<impl warp::Reply, Infallible> {
167        metrics_service
168            .get_cardano_database_complete_restoration_since_startup()
169            .increment(&[
170                client_metadata.origin_tag.as_deref().unwrap_or_default(),
171                client_metadata.client_type.as_deref().unwrap_or_default(),
172            ]);
173
174        let headers: Vec<(&str, &str)> = Vec::new();
175        let message = EventMessage::new(
176            "HTTP::statistics",
177            "cardano_database_restoration",
178            &"complete".to_string(),
179            headers,
180        );
181
182        if let Err(e) = event_transmitter.try_send(message.clone()) {
183            warn!(logger, "Event message error"; "error" => ?e);
184            return Ok(reply::internal_server_error(e));
185        };
186
187        Ok(reply::empty(StatusCode::CREATED))
188    }
189
190    pub async fn post_cardano_database_partial_restoration(
191        client_metadata: ClientMetadata,
192        logger: slog::Logger,
193        event_transmitter: Arc<TransmitterService<EventMessage>>,
194        metrics_service: Arc<MetricsService>,
195    ) -> Result<impl warp::Reply, Infallible> {
196        metrics_service
197            .get_cardano_database_partial_restoration_since_startup()
198            .increment(&[
199                client_metadata.origin_tag.as_deref().unwrap_or_default(),
200                client_metadata.client_type.as_deref().unwrap_or_default(),
201            ]);
202
203        let headers: Vec<(&str, &str)> = Vec::new();
204        let message = EventMessage::new(
205            "HTTP::statistics",
206            "cardano_database_restoration",
207            &"partial".to_string(),
208            headers,
209        );
210
211        if let Err(e) = event_transmitter.try_send(message.clone()) {
212            warn!(logger, "Event message error"; "error" => ?e);
213            return Ok(reply::internal_server_error(e));
214        };
215
216        Ok(reply::empty(StatusCode::CREATED))
217    }
218}
219
220#[cfg(test)]
221mod tests {
222    use serde_json::Value;
223    use std::path::PathBuf;
224    use std::sync::Arc;
225    use tokio::sync::mpsc::UnboundedReceiver;
226    use warp::{
227        http::{Method, StatusCode},
228        test::request,
229    };
230
231    use mithril_api_spec::APISpec;
232    use mithril_common::messages::{
233        CardanoDatabaseImmutableFilesRestoredMessage, SnapshotDownloadMessage,
234    };
235    use mithril_common::{MITHRIL_CLIENT_TYPE_HEADER, MITHRIL_ORIGIN_TAG_HEADER, temp_dir};
236
237    use crate::ServeCommandDependenciesContainer;
238    use crate::event_store::EventMessage;
239    use crate::{
240        ServeCommandConfiguration, dependency_injection::DependenciesBuilder,
241        initialize_dependencies,
242    };
243
244    use super::*;
245
246    fn setup_router(
247        state: RouterState,
248    ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
249        let cors = warp::cors()
250            .allow_any_origin()
251            .allow_headers(vec!["content-type"])
252            .allow_methods(vec![Method::GET, Method::POST, Method::OPTIONS]);
253
254        warp::any().and(routes(&state).with(cors))
255    }
256
257    #[tokio::test]
258    async fn post_statistics_ok() {
259        let (dependency_manager, mut rx) = setup_dependencies(temp_dir!()).await;
260        let snapshot_download_message = SnapshotDownloadMessage::dummy();
261
262        let method = Method::POST.as_str();
263        let path = "/statistics/snapshot";
264
265        let response = request()
266            .method(method)
267            .json(&snapshot_download_message)
268            .path(path)
269            .reply(&setup_router(RouterState::new_with_dummy_config(
270                dependency_manager,
271            )))
272            .await;
273
274        let result = APISpec::verify_conformity(
275            APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
276            method,
277            path,
278            "application/json",
279            &snapshot_download_message,
280            &response,
281            &StatusCode::CREATED,
282        );
283
284        let _ = rx.try_recv().unwrap();
285        result.unwrap();
286    }
287
288    #[tokio::test]
289    async fn test_post_statistics_increments_cardano_db_total_restoration_since_startup_metric() {
290        let method = Method::POST.as_str();
291        let path = "/statistics/snapshot";
292        let dependency_manager = Arc::new(initialize_dependencies!().await);
293        let initial_counter_value = dependency_manager
294            .metrics_service
295            .get_cardano_immutable_files_full_total_restoration_since_startup()
296            .get(&["TEST", "CLI"]);
297
298        request()
299            .method(method)
300            .json(&SnapshotDownloadMessage::dummy())
301            .path(path)
302            .header(MITHRIL_ORIGIN_TAG_HEADER, "TEST")
303            .header(MITHRIL_CLIENT_TYPE_HEADER, "CLI")
304            .reply(&setup_router(RouterState::new_with_origin_tag_white_list(
305                dependency_manager.clone(),
306                &["TEST"],
307            )))
308            .await;
309
310        assert_eq!(
311            initial_counter_value + 1,
312            dependency_manager
313                .metrics_service
314                .get_cardano_immutable_files_full_total_restoration_since_startup()
315                .get(&["TEST", "CLI"])
316        );
317    }
318
319    mod post_cardano_database_immutable_files_restored {
320        use super::*;
321
322        const HTTP_METHOD: Method = Method::POST;
323        const PATH: &str = "/statistics/cardano-database/immutable-files-restored";
324
325        #[tokio::test]
326        async fn conform_to_open_api_when_created() {
327            let message = CardanoDatabaseImmutableFilesRestoredMessage::dummy();
328
329            let dependency_manager = Arc::new(initialize_dependencies!().await);
330            let response = request()
331                .method(HTTP_METHOD.as_str())
332                .json(&message)
333                .path(PATH)
334                .reply(&setup_router(RouterState::new_with_dummy_config(
335                    dependency_manager.clone(),
336                )))
337                .await;
338
339            let result = APISpec::verify_conformity(
340                APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
341                HTTP_METHOD.as_str(),
342                PATH,
343                "application/json",
344                &message,
345                &response,
346                &StatusCode::CREATED,
347            );
348
349            result.unwrap();
350        }
351
352        #[tokio::test]
353        async fn increments_metric() {
354            let dependency_manager = Arc::new(initialize_dependencies!().await);
355            let metric_counter = dependency_manager
356                .metrics_service
357                .get_cardano_database_immutable_files_restored_since_startup();
358            let message = CardanoDatabaseImmutableFilesRestoredMessage {
359                nb_immutable_files: 3,
360            };
361
362            let initial_counter_value = metric_counter.get(&["TEST", "CLI"]);
363
364            request()
365                .method(HTTP_METHOD.as_str())
366                .json(&message)
367                .path(PATH)
368                .header(MITHRIL_ORIGIN_TAG_HEADER, "TEST")
369                .header(MITHRIL_CLIENT_TYPE_HEADER, "CLI")
370                .reply(&setup_router(RouterState::new_with_origin_tag_white_list(
371                    dependency_manager.clone(),
372                    &["TEST"],
373                )))
374                .await;
375
376            assert_eq!(
377                initial_counter_value + 3,
378                metric_counter.get(&["TEST", "CLI"])
379            );
380        }
381    }
382
383    mod post_cardano_database_ancillary_files_restored {
384        use super::*;
385
386        const HTTP_METHOD: Method = Method::POST;
387        const PATH: &str = "/statistics/cardano-database/ancillary-files-restored";
388
389        #[tokio::test]
390        async fn conform_to_open_api_when_created() {
391            let dependency_manager = Arc::new(initialize_dependencies!().await);
392            let response = request()
393                .method(HTTP_METHOD.as_str())
394                .json(&Value::Null)
395                .path(PATH)
396                .reply(&setup_router(RouterState::new_with_dummy_config(
397                    dependency_manager.clone(),
398                )))
399                .await;
400
401            let result = APISpec::verify_conformity(
402                APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
403                HTTP_METHOD.as_str(),
404                PATH,
405                "application/json",
406                &Value::Null,
407                &response,
408                &StatusCode::CREATED,
409            );
410
411            result.unwrap();
412        }
413
414        #[tokio::test]
415        async fn increments_metric() {
416            let dependency_manager = Arc::new(initialize_dependencies!().await);
417            let metric_counter = dependency_manager
418                .metrics_service
419                .get_cardano_database_ancillary_files_restored_since_startup();
420
421            let initial_counter_value = metric_counter.get(&["TEST", "CLI"]);
422
423            request()
424                .method(HTTP_METHOD.as_str())
425                .json(&Value::Null)
426                .path(PATH)
427                .header(MITHRIL_ORIGIN_TAG_HEADER, "TEST")
428                .header(MITHRIL_CLIENT_TYPE_HEADER, "CLI")
429                .reply(&setup_router(RouterState::new_with_origin_tag_white_list(
430                    dependency_manager.clone(),
431                    &["TEST"],
432                )))
433                .await;
434
435            assert_eq!(
436                initial_counter_value + 1,
437                metric_counter.get(&["TEST", "CLI"])
438            );
439        }
440    }
441
442    async fn setup_dependencies(
443        snapshot_directory: PathBuf,
444    ) -> (
445        Arc<ServeCommandDependenciesContainer>,
446        UnboundedReceiver<EventMessage>,
447    ) {
448        let config = ServeCommandConfiguration::new_sample(snapshot_directory);
449        let mut builder = DependenciesBuilder::new_with_stdout_logger(Arc::new(config));
450        let rx = builder.get_event_transmitter_receiver().await.unwrap();
451        let dependencies_manager =
452            Arc::new(builder.build_serve_dependencies_container().await.unwrap());
453        (dependencies_manager, rx)
454    }
455
456    mod post_cardano_database_complete_restoration {
457        use super::*;
458
459        const HTTP_METHOD: Method = Method::POST;
460        const PATH: &str = "/statistics/cardano-database/complete-restoration";
461
462        #[tokio::test]
463        async fn conform_to_open_api_when_created() {
464            let (dependency_manager, _rx) = setup_dependencies(temp_dir!()).await;
465
466            let response = request()
467                .method(HTTP_METHOD.as_str())
468                .json(&Value::Null)
469                .path(PATH)
470                .reply(&setup_router(RouterState::new_with_dummy_config(
471                    dependency_manager.clone(),
472                )))
473                .await;
474
475            let result = APISpec::verify_conformity(
476                APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
477                HTTP_METHOD.as_str(),
478                PATH,
479                "application/json",
480                &Value::Null,
481                &response,
482                &StatusCode::CREATED,
483            );
484
485            result.unwrap();
486        }
487
488        #[tokio::test]
489        async fn should_conform_to_openapi_when_server_error() {
490            let (dependency_manager, mut rx) = setup_dependencies(temp_dir!()).await;
491            rx.close();
492
493            let response = request()
494                .method(HTTP_METHOD.as_str())
495                .json(&Value::Null)
496                .path(PATH)
497                .reply(&setup_router(RouterState::new_with_dummy_config(
498                    dependency_manager.clone(),
499                )))
500                .await;
501
502            APISpec::verify_conformity(
503                APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
504                HTTP_METHOD.as_str(),
505                PATH,
506                "application/json",
507                &Value::Null,
508                &response,
509                &StatusCode::INTERNAL_SERVER_ERROR,
510            )
511            .unwrap();
512        }
513
514        #[tokio::test]
515        async fn should_send_event() {
516            let (dependency_manager, mut rx) = setup_dependencies(temp_dir!()).await;
517
518            request()
519                .method(HTTP_METHOD.as_str())
520                .json(&Value::Null)
521                .path(PATH)
522                .reply(&setup_router(RouterState::new_with_dummy_config(
523                    dependency_manager.clone(),
524                )))
525                .await;
526
527            let message = rx.try_recv().unwrap();
528            assert_eq!("HTTP::statistics", message.source);
529            assert_eq!("cardano_database_restoration", message.action);
530            assert_eq!("complete", message.content);
531        }
532
533        #[tokio::test]
534        async fn increments_metric() {
535            let (dependency_manager, _rx) = setup_dependencies(temp_dir!()).await;
536            let metric_counter = dependency_manager
537                .metrics_service
538                .get_cardano_database_complete_restoration_since_startup();
539
540            let initial_counter_value = metric_counter.get(&["TEST", "CLI"]);
541
542            request()
543                .method(HTTP_METHOD.as_str())
544                .json(&Value::Null)
545                .path(PATH)
546                .header(MITHRIL_ORIGIN_TAG_HEADER, "TEST")
547                .header(MITHRIL_CLIENT_TYPE_HEADER, "CLI")
548                .reply(&setup_router(RouterState::new_with_origin_tag_white_list(
549                    dependency_manager.clone(),
550                    &["TEST"],
551                )))
552                .await;
553
554            assert_eq!(
555                initial_counter_value + 1,
556                metric_counter.get(&["TEST", "CLI"])
557            );
558        }
559    }
560
561    mod post_cardano_database_partial_restoration {
562        use super::*;
563
564        const HTTP_METHOD: Method = Method::POST;
565        const PATH: &str = "/statistics/cardano-database/partial-restoration";
566
567        #[tokio::test]
568        async fn conform_to_open_api_when_created() {
569            let (dependency_manager, _rx) = setup_dependencies(temp_dir!()).await;
570            let response = request()
571                .method(HTTP_METHOD.as_str())
572                .json(&Value::Null)
573                .path(PATH)
574                .reply(&setup_router(RouterState::new_with_dummy_config(
575                    dependency_manager.clone(),
576                )))
577                .await;
578
579            let result = APISpec::verify_conformity(
580                APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
581                HTTP_METHOD.as_str(),
582                PATH,
583                "application/json",
584                &Value::Null,
585                &response,
586                &StatusCode::CREATED,
587            );
588
589            result.unwrap();
590        }
591
592        #[tokio::test]
593        async fn should_conform_to_openapi_when_server_error() {
594            let (dependency_manager, mut rx) = setup_dependencies(temp_dir!()).await;
595            rx.close();
596
597            let response = request()
598                .method(HTTP_METHOD.as_str())
599                .json(&Value::Null)
600                .path(PATH)
601                .reply(&setup_router(RouterState::new_with_dummy_config(
602                    dependency_manager.clone(),
603                )))
604                .await;
605
606            APISpec::verify_conformity(
607                APISpec::get_default_spec_file_from(crate::http_server::API_SPEC_LOCATION),
608                HTTP_METHOD.as_str(),
609                PATH,
610                "application/json",
611                &Value::Null,
612                &response,
613                &StatusCode::INTERNAL_SERVER_ERROR,
614            )
615            .unwrap();
616        }
617
618        #[tokio::test]
619        async fn should_send_event() {
620            let (dependency_manager, mut rx) = setup_dependencies(temp_dir!()).await;
621
622            request()
623                .method(HTTP_METHOD.as_str())
624                .json(&Value::Null)
625                .path(PATH)
626                .reply(&setup_router(RouterState::new_with_dummy_config(
627                    dependency_manager.clone(),
628                )))
629                .await;
630
631            let message = rx.try_recv().unwrap();
632            assert_eq!("HTTP::statistics", message.source);
633            assert_eq!("cardano_database_restoration", message.action);
634            assert_eq!("partial", message.content);
635        }
636
637        #[tokio::test]
638        async fn increments_metric() {
639            let (dependency_manager, _rx) = setup_dependencies(temp_dir!()).await;
640            let metric_counter = dependency_manager
641                .metrics_service
642                .get_cardano_database_partial_restoration_since_startup();
643
644            let initial_counter_value = metric_counter.get(&["TEST", "CLI"]);
645
646            request()
647                .method(HTTP_METHOD.as_str())
648                .json(&Value::Null)
649                .path(PATH)
650                .header(MITHRIL_ORIGIN_TAG_HEADER, "TEST")
651                .header(MITHRIL_CLIENT_TYPE_HEADER, "CLI")
652                .reply(&setup_router(RouterState::new_with_origin_tag_white_list(
653                    dependency_manager.clone(),
654                    &["TEST"],
655                )))
656                .await;
657
658            assert_eq!(
659                initial_counter_value + 1,
660                metric_counter.get(&["TEST", "CLI"])
661            );
662        }
663    }
664}