mithril_aggregator/http_server/routes/
statistics_routes.rs1use warp::Filter;
2
3use crate::http_server::routes::middlewares;
4use crate::http_server::routes::router::RouterState;
5
6pub fn routes(
7 router_state: &RouterState,
8) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
9 post_statistics(router_state)
10 .or(post_cardano_database_immutable_files_restored(router_state))
11 .or(post_cardano_database_ancillary_files_restored(router_state))
12 .or(post_cardano_database_complete_restoration(router_state))
13 .or(post_cardano_database_partial_restoration(router_state))
14}
15
16fn post_statistics(
18 router_state: &RouterState,
19) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
20 warp::path!("statistics" / "snapshot")
21 .and(warp::post())
22 .and(warp::body::json())
23 .and(middlewares::with_logger(router_state))
24 .and(middlewares::with_event_transmitter(router_state))
25 .and(middlewares::with_metrics_service(router_state))
26 .and_then(handlers::post_snapshot_statistics)
27}
28
29fn post_cardano_database_immutable_files_restored(
31 router_state: &RouterState,
32) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
33 warp::path!("statistics" / "cardano-database" / "immutable-files-restored")
34 .and(warp::post())
35 .and(warp::body::json())
36 .and(middlewares::with_logger(router_state))
37 .and(middlewares::with_metrics_service(router_state))
38 .and_then(handlers::post_cardano_database_immutable_files_restored)
39}
40
41fn post_cardano_database_ancillary_files_restored(
43 router_state: &RouterState,
44) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
45 warp::path!("statistics" / "cardano-database" / "ancillary-files-restored")
46 .and(warp::post())
47 .and(middlewares::with_logger(router_state))
48 .and(middlewares::with_metrics_service(router_state))
49 .and_then(handlers::post_cardano_database_ancillary_files_restored)
50}
51
52fn post_cardano_database_complete_restoration(
54 router_state: &RouterState,
55) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
56 warp::path!("statistics" / "cardano-database" / "complete-restoration")
57 .and(warp::post())
58 .and(middlewares::with_logger(router_state))
59 .and(middlewares::with_event_transmitter(router_state))
60 .and(middlewares::with_metrics_service(router_state))
61 .and_then(handlers::post_cardano_database_complete_restoration)
62}
63
64fn post_cardano_database_partial_restoration(
66 router_state: &RouterState,
67) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
68 warp::path!("statistics" / "cardano-database" / "partial-restoration")
69 .and(warp::post())
70 .and(middlewares::with_logger(router_state))
71 .and(middlewares::with_event_transmitter(router_state))
72 .and(middlewares::with_metrics_service(router_state))
73 .and_then(handlers::post_cardano_database_partial_restoration)
74}
75
76mod handlers {
77 use slog::warn;
78 use std::{convert::Infallible, sync::Arc};
79 use warp::http::StatusCode;
80
81 use mithril_common::messages::{
82 CardanoDatabaseImmutableFilesRestoredMessage, SnapshotDownloadMessage,
83 };
84
85 use crate::event_store::{EventMessage, TransmitterService};
86 use crate::http_server::routes::reply;
87 use crate::MetricsService;
88
89 pub async fn post_snapshot_statistics(
90 snapshot_download_message: SnapshotDownloadMessage,
91 logger: slog::Logger,
92 event_transmitter: Arc<TransmitterService<EventMessage>>,
93 metrics_service: Arc<MetricsService>,
94 ) -> Result<impl warp::Reply, Infallible> {
95 metrics_service
96 .get_cardano_immutable_files_full_total_restoration_since_startup()
97 .increment();
98
99 let headers: Vec<(&str, &str)> = Vec::new();
100
101 let message = EventMessage::new(
102 "HTTP::statistics",
103 "snapshot_downloaded",
104 &snapshot_download_message,
105 headers,
106 );
107
108 match event_transmitter.try_send(message.clone()) {
109 Err(e) => {
110 warn!(logger, "Event message error"; "error" => ?e);
111 Ok(reply::internal_server_error(e))
112 }
113 Ok(_) => Ok(reply::empty(StatusCode::CREATED)),
114 }
115 }
116
117 pub async fn post_cardano_database_immutable_files_restored(
118 message: CardanoDatabaseImmutableFilesRestoredMessage,
119 _logger: slog::Logger,
120 metrics_service: Arc<MetricsService>,
121 ) -> Result<impl warp::Reply, Infallible> {
122 metrics_service
123 .get_cardano_database_immutable_files_restored_since_startup()
124 .increment_by(message.nb_immutable_files);
125
126 Ok(reply::empty(StatusCode::CREATED))
127 }
128
129 pub async fn post_cardano_database_ancillary_files_restored(
130 _logger: slog::Logger,
131 metrics_service: Arc<MetricsService>,
132 ) -> Result<impl warp::Reply, Infallible> {
133 metrics_service
134 .get_cardano_database_ancillary_files_restored_since_startup()
135 .increment();
136
137 Ok(reply::empty(StatusCode::CREATED))
138 }
139
140 pub async fn post_cardano_database_complete_restoration(
141 logger: slog::Logger,
142 event_transmitter: Arc<TransmitterService<EventMessage>>,
143 metrics_service: Arc<MetricsService>,
144 ) -> Result<impl warp::Reply, Infallible> {
145 metrics_service
146 .get_cardano_database_complete_restoration_since_startup()
147 .increment();
148
149 let headers: Vec<(&str, &str)> = Vec::new();
150 let message = EventMessage::new(
151 "HTTP::statistics",
152 "cardano_database_restoration",
153 &"complete".to_string(),
154 headers,
155 );
156
157 if let Err(e) = event_transmitter.try_send(message.clone()) {
158 warn!(logger, "Event message error"; "error" => ?e);
159 return Ok(reply::internal_server_error(e));
160 };
161
162 Ok(reply::empty(StatusCode::CREATED))
163 }
164
165 pub async fn post_cardano_database_partial_restoration(
166 logger: slog::Logger,
167 event_transmitter: Arc<TransmitterService<EventMessage>>,
168 metrics_service: Arc<MetricsService>,
169 ) -> Result<impl warp::Reply, Infallible> {
170 metrics_service
171 .get_cardano_database_partial_restoration_since_startup()
172 .increment();
173
174 let headers: Vec<(&str, &str)> = Vec::new();
175 let message = EventMessage::new(
176 "HTTP::statistics",
177 "cardano_database_restoration",
178 &"partial".to_string(),
179 headers,
180 );
181
182 if let Err(e) = event_transmitter.try_send(message.clone()) {
183 warn!(logger, "Event message error"; "error" => ?e);
184 return Ok(reply::internal_server_error(e));
185 };
186
187 Ok(reply::empty(StatusCode::CREATED))
188 }
189}
190
#[cfg(test)]
mod tests {
    use super::*;

    use mithril_common::messages::{
        CardanoDatabaseImmutableFilesRestoredMessage, SnapshotDownloadMessage,
    };
    use mithril_common::temp_dir;
    use mithril_common::test_utils::apispec::APISpec;
    use tokio::sync::mpsc::UnboundedReceiver;

    use std::path::PathBuf;
    use std::sync::Arc;
    use warp::{
        http::{Method, StatusCode},
        test::request,
    };

    use crate::event_store::EventMessage;
    use crate::DependencyContainer;
    use crate::{
        dependency_injection::DependenciesBuilder, initialize_dependencies, Configuration,
    };

    use serde_json::Value;

    /// Wraps the statistics routes with the CORS policy used by these tests.
    fn setup_router(
        state: RouterState,
    ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
        let allowed_methods = vec![Method::GET, Method::POST, Method::OPTIONS];
        let cors = warp::cors()
            .allow_any_origin()
            .allow_headers(vec!["content-type"])
            .allow_methods(allowed_methods);
        let statistics_routes = routes(&state).with(cors);

        warp::any().and(statistics_routes)
    }

    /// Builds a dependency container from a sample configuration and returns it with
    /// the receiving end of the event transmitter channel.
    async fn setup_dependencies(
        snapshot_directory: PathBuf,
    ) -> (Arc<DependencyContainer>, UnboundedReceiver<EventMessage>) {
        let configuration = Configuration::new_sample(snapshot_directory);
        let mut builder = DependenciesBuilder::new_with_stdout_logger(configuration);
        let event_receiver = builder.get_event_transmitter_receiver().await.unwrap();
        let container = builder.build_dependency_container().await.unwrap();

        (Arc::new(container), event_receiver)
    }

    #[tokio::test]
    async fn post_statistics_ok() {
        let method = Method::POST.as_str();
        let path = "/statistics/snapshot";
        let (dependencies, mut event_receiver) = setup_dependencies(temp_dir!()).await;
        let payload = SnapshotDownloadMessage::dummy();
        let router = setup_router(RouterState::new_with_dummy_config(dependencies));

        let response = request()
            .method(method)
            .path(path)
            .json(&payload)
            .reply(&router)
            .await;

        let conformity = APISpec::verify_conformity(
            APISpec::get_all_spec_files(),
            method,
            path,
            "application/json",
            &payload,
            &response,
            &StatusCode::CREATED,
        );

        // An event must be waiting on the channel once the request has been served.
        let _ = event_receiver.try_recv().unwrap();
        conformity.unwrap();
    }

    #[tokio::test]
    async fn test_post_statistics_increments_cardano_db_total_restoration_since_startup_metric() {
        let dependencies = Arc::new(initialize_dependencies!().await);
        let counter_before = dependencies
            .metrics_service
            .get_cardano_immutable_files_full_total_restoration_since_startup()
            .get();
        let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

        request()
            .method(Method::POST.as_str())
            .path("/statistics/snapshot")
            .json(&SnapshotDownloadMessage::dummy())
            .reply(&router)
            .await;

        let counter_after = dependencies
            .metrics_service
            .get_cardano_immutable_files_full_total_restoration_since_startup()
            .get();
        assert_eq!(counter_before + 1, counter_after);
    }

    mod post_cardano_database_immutable_files_restored {
        use super::*;

        const HTTP_METHOD: Method = Method::POST;
        const PATH: &str = "/statistics/cardano-database/immutable-files-restored";

        #[tokio::test]
        async fn conform_to_open_api_when_created() {
            let dependencies = Arc::new(initialize_dependencies!().await);
            let payload = CardanoDatabaseImmutableFilesRestoredMessage::dummy();
            let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

            let response = request()
                .method(HTTP_METHOD.as_str())
                .path(PATH)
                .json(&payload)
                .reply(&router)
                .await;

            APISpec::verify_conformity(
                APISpec::get_all_spec_files(),
                HTTP_METHOD.as_str(),
                PATH,
                "application/json",
                &payload,
                &response,
                &StatusCode::CREATED,
            )
            .unwrap();
        }

        #[tokio::test]
        async fn increments_metric() {
            let dependencies = Arc::new(initialize_dependencies!().await);
            let counter = dependencies
                .metrics_service
                .get_cardano_database_immutable_files_restored_since_startup();
            let payload = CardanoDatabaseImmutableFilesRestoredMessage {
                nb_immutable_files: 3,
            };
            let counter_before = counter.get();
            let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

            request()
                .method(HTTP_METHOD.as_str())
                .path(PATH)
                .json(&payload)
                .reply(&router)
                .await;

            // The counter grows by the number of files announced in the payload.
            assert_eq!(counter_before + 3, counter.get());
        }
    }

    mod post_cardano_database_ancillary_files_restored {
        use super::*;

        const HTTP_METHOD: Method = Method::POST;
        const PATH: &str = "/statistics/cardano-database/ancillary-files-restored";

        #[tokio::test]
        async fn conform_to_open_api_when_created() {
            let dependencies = Arc::new(initialize_dependencies!().await);
            let no_content = Value::Null;
            let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

            let response = request()
                .method(HTTP_METHOD.as_str())
                .path(PATH)
                .json(&no_content)
                .reply(&router)
                .await;

            APISpec::verify_conformity(
                APISpec::get_all_spec_files(),
                HTTP_METHOD.as_str(),
                PATH,
                "application/json",
                &no_content,
                &response,
                &StatusCode::CREATED,
            )
            .unwrap();
        }

        #[tokio::test]
        async fn increments_metric() {
            let dependencies = Arc::new(initialize_dependencies!().await);
            let counter = dependencies
                .metrics_service
                .get_cardano_database_ancillary_files_restored_since_startup();
            let counter_before = counter.get();
            let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

            request()
                .method(HTTP_METHOD.as_str())
                .path(PATH)
                .json(&Value::Null)
                .reply(&router)
                .await;

            assert_eq!(counter_before + 1, counter.get());
        }
    }

    mod post_cardano_database_complete_restoration {
        use super::*;

        const HTTP_METHOD: Method = Method::POST;
        const PATH: &str = "/statistics/cardano-database/complete-restoration";

        #[tokio::test]
        async fn conform_to_open_api_when_created() {
            // The receiver is kept bound so it is not dropped before the request is served.
            let (dependencies, _event_receiver) = setup_dependencies(temp_dir!()).await;
            let no_content = Value::Null;
            let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

            let response = request()
                .method(HTTP_METHOD.as_str())
                .path(PATH)
                .json(&no_content)
                .reply(&router)
                .await;

            APISpec::verify_conformity(
                APISpec::get_all_spec_files(),
                HTTP_METHOD.as_str(),
                PATH,
                "application/json",
                &no_content,
                &response,
                &StatusCode::CREATED,
            )
            .unwrap();
        }

        #[tokio::test]
        async fn should_conform_to_openapi_when_server_error() {
            let (dependencies, mut event_receiver) = setup_dependencies(temp_dir!()).await;
            // Closing the receiving end up front is how this test provokes the error reply.
            event_receiver.close();
            let no_content = Value::Null;
            let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

            let response = request()
                .method(HTTP_METHOD.as_str())
                .path(PATH)
                .json(&no_content)
                .reply(&router)
                .await;

            let conformity = APISpec::verify_conformity(
                APISpec::get_all_spec_files(),
                HTTP_METHOD.as_str(),
                PATH,
                "application/json",
                &no_content,
                &response,
                &StatusCode::INTERNAL_SERVER_ERROR,
            );
            conformity.unwrap();
        }

        #[tokio::test]
        async fn should_send_event() {
            let (dependencies, mut event_receiver) = setup_dependencies(temp_dir!()).await;
            let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

            request()
                .method(HTTP_METHOD.as_str())
                .path(PATH)
                .json(&Value::Null)
                .reply(&router)
                .await;

            let event = event_receiver.try_recv().unwrap();
            assert_eq!("HTTP::statistics", event.source);
            assert_eq!("cardano_database_restoration", event.action);
            assert_eq!("complete", event.content);
        }

        #[tokio::test]
        async fn increments_metric() {
            // The receiver is kept bound so it is not dropped before the request is served.
            let (dependencies, _event_receiver) = setup_dependencies(temp_dir!()).await;
            let counter = dependencies
                .metrics_service
                .get_cardano_database_complete_restoration_since_startup();
            let counter_before = counter.get();
            let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

            request()
                .method(HTTP_METHOD.as_str())
                .path(PATH)
                .json(&Value::Null)
                .reply(&router)
                .await;

            assert_eq!(counter_before + 1, counter.get());
        }
    }

    mod post_cardano_database_partial_restoration {
        use super::*;

        const HTTP_METHOD: Method = Method::POST;
        const PATH: &str = "/statistics/cardano-database/partial-restoration";

        #[tokio::test]
        async fn conform_to_open_api_when_created() {
            // The receiver is kept bound so it is not dropped before the request is served.
            let (dependencies, _event_receiver) = setup_dependencies(temp_dir!()).await;
            let no_content = Value::Null;
            let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

            let response = request()
                .method(HTTP_METHOD.as_str())
                .path(PATH)
                .json(&no_content)
                .reply(&router)
                .await;

            APISpec::verify_conformity(
                APISpec::get_all_spec_files(),
                HTTP_METHOD.as_str(),
                PATH,
                "application/json",
                &no_content,
                &response,
                &StatusCode::CREATED,
            )
            .unwrap();
        }

        #[tokio::test]
        async fn should_conform_to_openapi_when_server_error() {
            let (dependencies, mut event_receiver) = setup_dependencies(temp_dir!()).await;
            // Closing the receiving end up front is how this test provokes the error reply.
            event_receiver.close();
            let no_content = Value::Null;
            let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

            let response = request()
                .method(HTTP_METHOD.as_str())
                .path(PATH)
                .json(&no_content)
                .reply(&router)
                .await;

            let conformity = APISpec::verify_conformity(
                APISpec::get_all_spec_files(),
                HTTP_METHOD.as_str(),
                PATH,
                "application/json",
                &no_content,
                &response,
                &StatusCode::INTERNAL_SERVER_ERROR,
            );
            conformity.unwrap();
        }

        #[tokio::test]
        async fn should_send_event() {
            let (dependencies, mut event_receiver) = setup_dependencies(temp_dir!()).await;
            let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

            request()
                .method(HTTP_METHOD.as_str())
                .path(PATH)
                .json(&Value::Null)
                .reply(&router)
                .await;

            let event = event_receiver.try_recv().unwrap();
            assert_eq!("HTTP::statistics", event.source);
            assert_eq!("cardano_database_restoration", event.action);
            assert_eq!("partial", event.content);
        }

        #[tokio::test]
        async fn increments_metric() {
            // The receiver is kept bound so it is not dropped before the request is served.
            let (dependencies, _event_receiver) = setup_dependencies(temp_dir!()).await;
            let counter = dependencies
                .metrics_service
                .get_cardano_database_partial_restoration_since_startup();
            let counter_before = counter.get();
            let router = setup_router(RouterState::new_with_dummy_config(dependencies.clone()));

            request()
                .method(HTTP_METHOD.as_str())
                .path(PATH)
                .json(&Value::Null)
                .reply(&router)
                .await;

            assert_eq!(counter_before + 1, counter.get());
        }
    }
}