mithril_aggregator/http_server/routes/
statistics_routes.rs

1use warp::Filter;
2
3use crate::http_server::routes::middlewares;
4use crate::http_server::routes::router::RouterState;
5
6pub fn routes(
7    router_state: &RouterState,
8) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
9    post_statistics(router_state)
10        .or(post_cardano_database_immutable_files_restored(router_state))
11        .or(post_cardano_database_ancillary_files_restored(router_state))
12        .or(post_cardano_database_complete_restoration(router_state))
13        .or(post_cardano_database_partial_restoration(router_state))
14}
15
16/// POST /statistics/snapshot
17fn post_statistics(
18    router_state: &RouterState,
19) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
20    warp::path!("statistics" / "snapshot")
21        .and(warp::post())
22        .and(warp::body::json())
23        .and(middlewares::with_logger(router_state))
24        .and(middlewares::with_event_transmitter(router_state))
25        .and(middlewares::with_metrics_service(router_state))
26        .and_then(handlers::post_snapshot_statistics)
27}
28
29/// POST /statistics/cardano-database/immutable-files-restored
30fn post_cardano_database_immutable_files_restored(
31    router_state: &RouterState,
32) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
33    warp::path!("statistics" / "cardano-database" / "immutable-files-restored")
34        .and(warp::post())
35        .and(warp::body::json())
36        .and(middlewares::with_logger(router_state))
37        .and(middlewares::with_metrics_service(router_state))
38        .and_then(handlers::post_cardano_database_immutable_files_restored)
39}
40
41/// POST /statistics/cardano-database/ancillary-files-restored
42fn post_cardano_database_ancillary_files_restored(
43    router_state: &RouterState,
44) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
45    warp::path!("statistics" / "cardano-database" / "ancillary-files-restored")
46        .and(warp::post())
47        .and(middlewares::with_logger(router_state))
48        .and(middlewares::with_metrics_service(router_state))
49        .and_then(handlers::post_cardano_database_ancillary_files_restored)
50}
51
52/// POST /statistics/cardano-database/complete-restoration
53fn post_cardano_database_complete_restoration(
54    router_state: &RouterState,
55) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
56    warp::path!("statistics" / "cardano-database" / "complete-restoration")
57        .and(warp::post())
58        .and(middlewares::with_logger(router_state))
59        .and(middlewares::with_event_transmitter(router_state))
60        .and(middlewares::with_metrics_service(router_state))
61        .and_then(handlers::post_cardano_database_complete_restoration)
62}
63
64/// POST /statistics/cardano-database/partial-restoration
65fn post_cardano_database_partial_restoration(
66    router_state: &RouterState,
67) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
68    warp::path!("statistics" / "cardano-database" / "partial-restoration")
69        .and(warp::post())
70        .and(middlewares::with_logger(router_state))
71        .and(middlewares::with_event_transmitter(router_state))
72        .and(middlewares::with_metrics_service(router_state))
73        .and_then(handlers::post_cardano_database_partial_restoration)
74}
75
76mod handlers {
77    use slog::warn;
78    use std::{convert::Infallible, sync::Arc};
79    use warp::http::StatusCode;
80
81    use mithril_common::messages::{
82        CardanoDatabaseImmutableFilesRestoredMessage, SnapshotDownloadMessage,
83    };
84
85    use crate::event_store::{EventMessage, TransmitterService};
86    use crate::http_server::routes::reply;
87    use crate::MetricsService;
88
89    pub async fn post_snapshot_statistics(
90        snapshot_download_message: SnapshotDownloadMessage,
91        logger: slog::Logger,
92        event_transmitter: Arc<TransmitterService<EventMessage>>,
93        metrics_service: Arc<MetricsService>,
94    ) -> Result<impl warp::Reply, Infallible> {
95        metrics_service
96            .get_cardano_immutable_files_full_total_restoration_since_startup()
97            .increment();
98
99        let headers: Vec<(&str, &str)> = Vec::new();
100
101        let message = EventMessage::new(
102            "HTTP::statistics",
103            "snapshot_downloaded",
104            &snapshot_download_message,
105            headers,
106        );
107
108        match event_transmitter.try_send(message.clone()) {
109            Err(e) => {
110                warn!(logger, "Event message error"; "error" => ?e);
111                Ok(reply::internal_server_error(e))
112            }
113            Ok(_) => Ok(reply::empty(StatusCode::CREATED)),
114        }
115    }
116
117    pub async fn post_cardano_database_immutable_files_restored(
118        message: CardanoDatabaseImmutableFilesRestoredMessage,
119        _logger: slog::Logger,
120        metrics_service: Arc<MetricsService>,
121    ) -> Result<impl warp::Reply, Infallible> {
122        metrics_service
123            .get_cardano_database_immutable_files_restored_since_startup()
124            .increment_by(message.nb_immutable_files);
125
126        Ok(reply::empty(StatusCode::CREATED))
127    }
128
129    pub async fn post_cardano_database_ancillary_files_restored(
130        _logger: slog::Logger,
131        metrics_service: Arc<MetricsService>,
132    ) -> Result<impl warp::Reply, Infallible> {
133        metrics_service
134            .get_cardano_database_ancillary_files_restored_since_startup()
135            .increment();
136
137        Ok(reply::empty(StatusCode::CREATED))
138    }
139
140    pub async fn post_cardano_database_complete_restoration(
141        logger: slog::Logger,
142        event_transmitter: Arc<TransmitterService<EventMessage>>,
143        metrics_service: Arc<MetricsService>,
144    ) -> Result<impl warp::Reply, Infallible> {
145        metrics_service
146            .get_cardano_database_complete_restoration_since_startup()
147            .increment();
148
149        let headers: Vec<(&str, &str)> = Vec::new();
150        let message = EventMessage::new(
151            "HTTP::statistics",
152            "cardano_database_restoration",
153            &"complete".to_string(),
154            headers,
155        );
156
157        if let Err(e) = event_transmitter.try_send(message.clone()) {
158            warn!(logger, "Event message error"; "error" => ?e);
159            return Ok(reply::internal_server_error(e));
160        };
161
162        Ok(reply::empty(StatusCode::CREATED))
163    }
164
165    pub async fn post_cardano_database_partial_restoration(
166        logger: slog::Logger,
167        event_transmitter: Arc<TransmitterService<EventMessage>>,
168        metrics_service: Arc<MetricsService>,
169    ) -> Result<impl warp::Reply, Infallible> {
170        metrics_service
171            .get_cardano_database_partial_restoration_since_startup()
172            .increment();
173
174        let headers: Vec<(&str, &str)> = Vec::new();
175        let message = EventMessage::new(
176            "HTTP::statistics",
177            "cardano_database_restoration",
178            &"partial".to_string(),
179            headers,
180        );
181
182        if let Err(e) = event_transmitter.try_send(message.clone()) {
183            warn!(logger, "Event message error"; "error" => ?e);
184            return Ok(reply::internal_server_error(e));
185        };
186
187        Ok(reply::empty(StatusCode::CREATED))
188    }
189}
190
191#[cfg(test)]
192mod tests {
193    use super::*;
194
195    use mithril_common::messages::{
196        CardanoDatabaseImmutableFilesRestoredMessage, SnapshotDownloadMessage,
197    };
198    use mithril_common::temp_dir;
199    use mithril_common::test_utils::apispec::APISpec;
200    use tokio::sync::mpsc::UnboundedReceiver;
201
202    use std::path::PathBuf;
203    use std::sync::Arc;
204    use warp::{
205        http::{Method, StatusCode},
206        test::request,
207    };
208
209    use crate::event_store::EventMessage;
210    use crate::DependencyContainer;
211    use crate::{
212        dependency_injection::DependenciesBuilder, initialize_dependencies, Configuration,
213    };
214
215    use serde_json::Value;
216
217    fn setup_router(
218        state: RouterState,
219    ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
220        let cors = warp::cors()
221            .allow_any_origin()
222            .allow_headers(vec!["content-type"])
223            .allow_methods(vec![Method::GET, Method::POST, Method::OPTIONS]);
224
225        warp::any().and(routes(&state).with(cors))
226    }
227
228    #[tokio::test]
229    async fn post_statistics_ok() {
230        let (dependency_manager, mut rx) = setup_dependencies(temp_dir!()).await;
231        let snapshot_download_message = SnapshotDownloadMessage::dummy();
232
233        let method = Method::POST.as_str();
234        let path = "/statistics/snapshot";
235
236        let response = request()
237            .method(method)
238            .json(&snapshot_download_message)
239            .path(path)
240            .reply(&setup_router(RouterState::new_with_dummy_config(
241                dependency_manager,
242            )))
243            .await;
244
245        let result = APISpec::verify_conformity(
246            APISpec::get_all_spec_files(),
247            method,
248            path,
249            "application/json",
250            &snapshot_download_message,
251            &response,
252            &StatusCode::CREATED,
253        );
254
255        let _ = rx.try_recv().unwrap();
256        result.unwrap();
257    }
258
259    #[tokio::test]
260    async fn test_post_statistics_increments_cardano_db_total_restoration_since_startup_metric() {
261        let method = Method::POST.as_str();
262        let path = "/statistics/snapshot";
263        let dependency_manager = Arc::new(initialize_dependencies!().await);
264        let initial_counter_value = dependency_manager
265            .metrics_service
266            .get_cardano_immutable_files_full_total_restoration_since_startup()
267            .get();
268
269        request()
270            .method(method)
271            .json(&SnapshotDownloadMessage::dummy())
272            .path(path)
273            .reply(&setup_router(RouterState::new_with_dummy_config(
274                dependency_manager.clone(),
275            )))
276            .await;
277
278        assert_eq!(
279            initial_counter_value + 1,
280            dependency_manager
281                .metrics_service
282                .get_cardano_immutable_files_full_total_restoration_since_startup()
283                .get()
284        );
285    }
286
287    mod post_cardano_database_immutable_files_restored {
288        use super::*;
289
290        const HTTP_METHOD: Method = Method::POST;
291        const PATH: &str = "/statistics/cardano-database/immutable-files-restored";
292
293        #[tokio::test]
294        async fn conform_to_open_api_when_created() {
295            let message = CardanoDatabaseImmutableFilesRestoredMessage::dummy();
296
297            let dependency_manager = Arc::new(initialize_dependencies!().await);
298            let response = request()
299                .method(HTTP_METHOD.as_str())
300                .json(&message)
301                .path(PATH)
302                .reply(&setup_router(RouterState::new_with_dummy_config(
303                    dependency_manager.clone(),
304                )))
305                .await;
306
307            let result = APISpec::verify_conformity(
308                APISpec::get_all_spec_files(),
309                HTTP_METHOD.as_str(),
310                PATH,
311                "application/json",
312                &message,
313                &response,
314                &StatusCode::CREATED,
315            );
316
317            result.unwrap();
318        }
319
320        #[tokio::test]
321        async fn increments_metric() {
322            let dependency_manager = Arc::new(initialize_dependencies!().await);
323            let metric_counter = dependency_manager
324                .metrics_service
325                .get_cardano_database_immutable_files_restored_since_startup();
326            let message = CardanoDatabaseImmutableFilesRestoredMessage {
327                nb_immutable_files: 3,
328            };
329
330            let initial_counter_value = metric_counter.get();
331
332            request()
333                .method(HTTP_METHOD.as_str())
334                .json(&message)
335                .path(PATH)
336                .reply(&setup_router(RouterState::new_with_dummy_config(
337                    dependency_manager.clone(),
338                )))
339                .await;
340
341            assert_eq!(initial_counter_value + 3, metric_counter.get());
342        }
343    }
344
345    mod post_cardano_database_ancillary_files_restored {
346        use super::*;
347
348        const HTTP_METHOD: Method = Method::POST;
349        const PATH: &str = "/statistics/cardano-database/ancillary-files-restored";
350
351        #[tokio::test]
352        async fn conform_to_open_api_when_created() {
353            let dependency_manager = Arc::new(initialize_dependencies!().await);
354            let response = request()
355                .method(HTTP_METHOD.as_str())
356                .json(&Value::Null)
357                .path(PATH)
358                .reply(&setup_router(RouterState::new_with_dummy_config(
359                    dependency_manager.clone(),
360                )))
361                .await;
362
363            let result = APISpec::verify_conformity(
364                APISpec::get_all_spec_files(),
365                HTTP_METHOD.as_str(),
366                PATH,
367                "application/json",
368                &Value::Null,
369                &response,
370                &StatusCode::CREATED,
371            );
372
373            result.unwrap();
374        }
375
376        #[tokio::test]
377        async fn increments_metric() {
378            let dependency_manager = Arc::new(initialize_dependencies!().await);
379            let metric_counter = dependency_manager
380                .metrics_service
381                .get_cardano_database_ancillary_files_restored_since_startup();
382
383            let initial_counter_value = metric_counter.get();
384
385            request()
386                .method(HTTP_METHOD.as_str())
387                .json(&Value::Null)
388                .path(PATH)
389                .reply(&setup_router(RouterState::new_with_dummy_config(
390                    dependency_manager.clone(),
391                )))
392                .await;
393
394            assert_eq!(initial_counter_value + 1, metric_counter.get());
395        }
396    }
397
398    async fn setup_dependencies(
399        snapshot_directory: PathBuf,
400    ) -> (Arc<DependencyContainer>, UnboundedReceiver<EventMessage>) {
401        let config = Configuration::new_sample(snapshot_directory);
402        let mut builder = DependenciesBuilder::new_with_stdout_logger(config);
403        let rx = builder.get_event_transmitter_receiver().await.unwrap();
404        let dependency_manager = Arc::new(builder.build_dependency_container().await.unwrap());
405        (dependency_manager, rx)
406    }
407
408    mod post_cardano_database_complete_restoration {
409        use super::*;
410
411        const HTTP_METHOD: Method = Method::POST;
412        const PATH: &str = "/statistics/cardano-database/complete-restoration";
413
414        #[tokio::test]
415        async fn conform_to_open_api_when_created() {
416            let (dependency_manager, _rx) = setup_dependencies(temp_dir!()).await;
417
418            let response = request()
419                .method(HTTP_METHOD.as_str())
420                .json(&Value::Null)
421                .path(PATH)
422                .reply(&setup_router(RouterState::new_with_dummy_config(
423                    dependency_manager.clone(),
424                )))
425                .await;
426
427            let result = APISpec::verify_conformity(
428                APISpec::get_all_spec_files(),
429                HTTP_METHOD.as_str(),
430                PATH,
431                "application/json",
432                &Value::Null,
433                &response,
434                &StatusCode::CREATED,
435            );
436
437            result.unwrap();
438        }
439
440        #[tokio::test]
441        async fn should_conform_to_openapi_when_server_error() {
442            let (dependency_manager, mut rx) = setup_dependencies(temp_dir!()).await;
443            rx.close();
444
445            let response = request()
446                .method(HTTP_METHOD.as_str())
447                .json(&Value::Null)
448                .path(PATH)
449                .reply(&setup_router(RouterState::new_with_dummy_config(
450                    dependency_manager.clone(),
451                )))
452                .await;
453
454            APISpec::verify_conformity(
455                APISpec::get_all_spec_files(),
456                HTTP_METHOD.as_str(),
457                PATH,
458                "application/json",
459                &Value::Null,
460                &response,
461                &StatusCode::INTERNAL_SERVER_ERROR,
462            )
463            .unwrap();
464        }
465
466        #[tokio::test]
467        async fn should_send_event() {
468            let (dependency_manager, mut rx) = setup_dependencies(temp_dir!()).await;
469
470            request()
471                .method(HTTP_METHOD.as_str())
472                .json(&Value::Null)
473                .path(PATH)
474                .reply(&setup_router(RouterState::new_with_dummy_config(
475                    dependency_manager.clone(),
476                )))
477                .await;
478
479            let message = rx.try_recv().unwrap();
480            assert_eq!("HTTP::statistics", message.source);
481            assert_eq!("cardano_database_restoration", message.action);
482            assert_eq!("complete", message.content);
483        }
484
485        #[tokio::test]
486        async fn increments_metric() {
487            let (dependency_manager, _rx) = setup_dependencies(temp_dir!()).await;
488            let metric_counter = dependency_manager
489                .metrics_service
490                .get_cardano_database_complete_restoration_since_startup();
491
492            let initial_counter_value = metric_counter.get();
493
494            request()
495                .method(HTTP_METHOD.as_str())
496                .json(&Value::Null)
497                .path(PATH)
498                .reply(&setup_router(RouterState::new_with_dummy_config(
499                    dependency_manager.clone(),
500                )))
501                .await;
502
503            assert_eq!(initial_counter_value + 1, metric_counter.get());
504        }
505    }
506
507    mod post_cardano_database_partial_restoration {
508        use super::*;
509
510        const HTTP_METHOD: Method = Method::POST;
511        const PATH: &str = "/statistics/cardano-database/partial-restoration";
512
513        #[tokio::test]
514        async fn conform_to_open_api_when_created() {
515            let (dependency_manager, _rx) = setup_dependencies(temp_dir!()).await;
516            let response = request()
517                .method(HTTP_METHOD.as_str())
518                .json(&Value::Null)
519                .path(PATH)
520                .reply(&setup_router(RouterState::new_with_dummy_config(
521                    dependency_manager.clone(),
522                )))
523                .await;
524
525            let result = APISpec::verify_conformity(
526                APISpec::get_all_spec_files(),
527                HTTP_METHOD.as_str(),
528                PATH,
529                "application/json",
530                &Value::Null,
531                &response,
532                &StatusCode::CREATED,
533            );
534
535            result.unwrap();
536        }
537
538        #[tokio::test]
539        async fn should_conform_to_openapi_when_server_error() {
540            let (dependency_manager, mut rx) = setup_dependencies(temp_dir!()).await;
541            rx.close();
542
543            let response = request()
544                .method(HTTP_METHOD.as_str())
545                .json(&Value::Null)
546                .path(PATH)
547                .reply(&setup_router(RouterState::new_with_dummy_config(
548                    dependency_manager.clone(),
549                )))
550                .await;
551
552            APISpec::verify_conformity(
553                APISpec::get_all_spec_files(),
554                HTTP_METHOD.as_str(),
555                PATH,
556                "application/json",
557                &Value::Null,
558                &response,
559                &StatusCode::INTERNAL_SERVER_ERROR,
560            )
561            .unwrap();
562        }
563
564        #[tokio::test]
565        async fn should_send_event() {
566            let (dependency_manager, mut rx) = setup_dependencies(temp_dir!()).await;
567
568            request()
569                .method(HTTP_METHOD.as_str())
570                .json(&Value::Null)
571                .path(PATH)
572                .reply(&setup_router(RouterState::new_with_dummy_config(
573                    dependency_manager.clone(),
574                )))
575                .await;
576
577            let message = rx.try_recv().unwrap();
578            assert_eq!("HTTP::statistics", message.source);
579            assert_eq!("cardano_database_restoration", message.action);
580            assert_eq!("partial", message.content);
581        }
582
583        #[tokio::test]
584        async fn increments_metric() {
585            let (dependency_manager, _rx) = setup_dependencies(temp_dir!()).await;
586            let metric_counter = dependency_manager
587                .metrics_service
588                .get_cardano_database_partial_restoration_since_startup();
589
590            let initial_counter_value = metric_counter.get();
591
592            request()
593                .method(HTTP_METHOD.as_str())
594                .json(&Value::Null)
595                .path(PATH)
596                .reply(&setup_router(RouterState::new_with_dummy_config(
597                    dependency_manager.clone(),
598                )))
599                .await;
600
601            assert_eq!(initial_counter_value + 1, metric_counter.get());
602        }
603    }
604}