1use std::{collections::HashMap, sync::Arc, time::Duration};
2
3use crate::metrics::MetricLabelValueMap;
4use crate::{
5 event_store::{EventMessage, TransmitterService},
6 MetricsService,
7};
8use chrono::{DateTime, Utc};
9use mithril_common::logging::LoggerExtensions;
10use serde::{Deserialize, Serialize};
11use slog::{info, Logger};
12
13#[derive(Serialize, Deserialize)]
15struct MetricEventMessage {
16 name: String,
18 value: i64,
20 period: Duration,
22 origin: String,
24 client_type: String,
26 date: DateTime<Utc>,
28}
29pub struct UsageReporter {
31 transmitter_service: Arc<TransmitterService<EventMessage>>,
32 metrics_service: Arc<MetricsService>,
33 last_reported_metrics: HashMap<String, HashMap<String, MetricLabelValueMap>>,
34 logger: Logger,
35}
36
37impl UsageReporter {
38 pub fn new(
40 transmitter_service: Arc<TransmitterService<EventMessage>>,
41 metrics_service: Arc<MetricsService>,
42 logger: Logger,
43 ) -> Self {
44 Self {
45 transmitter_service,
46 metrics_service,
47 last_reported_metrics: HashMap::new(),
48 logger: logger.new_with_component_name::<Self>(),
49 }
50 }
51
52 fn compute_metrics_delta(
53 metrics_before: Option<&HashMap<String, MetricLabelValueMap>>,
54 metrics_after: &HashMap<String, MetricLabelValueMap>,
55 ) -> HashMap<String, MetricLabelValueMap> {
56 metrics_after
57 .iter()
58 .filter_map(|(labels_values, metric)| {
59 let last_value = metrics_before.and_then(|m| m.get(labels_values));
60 let delta = match last_value {
61 Some(last) => (metric.counter as i64) - (last.counter as i64),
62 None => metric.counter as i64,
63 };
64 if delta != 0 {
65 let mut delta_metric = metric.clone();
66 delta_metric.counter = delta as i32;
67 Some((labels_values.clone(), delta_metric))
68 } else {
69 None
70 }
71 })
72 .collect()
73 }
74
75 fn compute_metrics_delta_with_label(
76 metrics_before: &HashMap<String, HashMap<String, MetricLabelValueMap>>,
77 metrics_after: &HashMap<String, HashMap<String, MetricLabelValueMap>>,
78 ) -> HashMap<String, HashMap<String, MetricLabelValueMap>> {
79 metrics_after
80 .iter()
81 .map(|(name, label_values)| {
82 (
83 name.clone(),
84 UsageReporter::compute_metrics_delta(metrics_before.get(name), label_values),
85 )
86 })
87 .filter(|(_name, value)| !value.is_empty())
88 .collect()
89 }
90
91 fn send_metrics(&mut self, duration: &Duration) {
92 let metrics = self.metrics_service.export_metrics_map();
93 let delta = Self::compute_metrics_delta_with_label(&self.last_reported_metrics, &metrics);
94 let date = Utc::now();
95
96 self.last_reported_metrics = metrics;
97
98 for (name, metrics_map) in delta {
99 for metric_map in metrics_map.values() {
100 let message = Self::create_metrics_event_message(
101 name.clone(),
102 metric_map.counter as i64,
103 *duration,
104 metric_map
105 .label_value_map
106 .get("origin_tag")
107 .cloned()
108 .unwrap_or_default(),
109 metric_map
110 .label_value_map
111 .get("client_type")
112 .cloned()
113 .unwrap_or_default(),
114 date,
115 );
116 self.transmitter_service.send(message);
117 }
118 }
119 }
120
121 pub async fn run_forever(&mut self, run_interval: Duration) {
123 let mut interval = tokio::time::interval(run_interval);
124
125 loop {
126 interval.tick().await;
127 self.send_metrics(&run_interval);
128
129 info!(
130 self.logger,
131 "Metrics sent, Sleeping for {} seconds",
132 run_interval.as_secs()
133 );
134 }
135 }
136
137 pub fn create_metrics_event_message(
139 name: String,
140 value: i64,
141 period: Duration,
142 origin: String,
143 client_type: String,
144 date: DateTime<Utc>,
145 ) -> EventMessage {
146 EventMessage::new(
147 "Metrics",
148 &name.clone(),
149 &MetricEventMessage {
150 name,
151 value,
152 period,
153 origin,
154 client_type,
155 date,
156 },
157 vec![],
158 )
159 }
160}
161
162#[cfg(test)]
163mod tests {
164 use mithril_metric::MetricCollector;
165 use tokio::sync::mpsc::UnboundedReceiver;
166
167 use super::UsageReporter;
168 use super::*;
169 use crate::test_tools::TestLogger;
170
171 fn build_usage_reporter() -> (
172 UsageReporter,
173 Arc<MetricsService>,
174 UnboundedReceiver<EventMessage>,
175 ) {
176 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<EventMessage>();
177 let metrics_service = Arc::new(MetricsService::new(TestLogger::stdout()).unwrap());
178 let transmitter_service = Arc::new(TransmitterService::new(tx, TestLogger::stdout()));
179 let usage_reporter = UsageReporter::new(
180 transmitter_service.clone(),
181 metrics_service.clone(),
182 TestLogger::stdout(),
183 );
184 (usage_reporter, metrics_service, rx)
185 }
186
187 fn received_messages(
188 rx: &mut tokio::sync::mpsc::UnboundedReceiver<EventMessage>,
189 ) -> Vec<EventMessage> {
190 let mut received_messages: Vec<EventMessage> = Vec::new();
191 while let Ok(message) = rx.try_recv() {
192 received_messages.push(message);
193 }
194 received_messages
195 }
196
197 fn extract_metric_value(message: &EventMessage) -> (String, i64) {
198 let metric_delta: MetricEventMessage =
199 serde_json::from_value(message.content.clone()).unwrap();
200 (message.action.clone(), metric_delta.value)
201 }
202
203 #[test]
204 fn when_no_metrics_no_message_sent() {
205 let (mut usage_reporter, _metrics_service, mut rx) = build_usage_reporter();
206
207 usage_reporter.send_metrics(&Duration::from_secs(10));
208
209 let received_messages = received_messages(&mut rx);
210 assert_eq!(0, received_messages.len());
211 }
212
213 #[test]
214 fn verify_event_content_on_a_metric() {
215 let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();
216
217 let metric = metrics_service.get_certificate_total_produced_since_startup();
218 metric.increment_by(3);
219 usage_reporter.send_metrics(&Duration::from_secs(10));
220
221 let received_messages = received_messages(&mut rx);
222 let message = &received_messages[0];
223 assert_eq!(message.source, "Metrics");
224 assert_eq!(message.action, metric.name());
225 let message_content: MetricEventMessage =
226 serde_json::from_value(message.content.clone()).unwrap();
227 assert_eq!(3, message_content.value);
228 assert_eq!(Duration::from_secs(10), message_content.period);
229 assert_eq!("".to_string(), message_content.origin);
230 }
231
232 #[test]
233 fn verify_event_content_on_a_metric_with_label() {
234 let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();
235
236 let metric = metrics_service.get_certificate_detail_total_served_since_startup();
237 metric.increment_by(&["ORIGIN_A", "CLIENT_TYPE"], 3);
238 usage_reporter.send_metrics(&Duration::from_secs(10));
239
240 let received_messages = received_messages(&mut rx);
241 let message = &received_messages[0];
242 assert_eq!(message.source, "Metrics");
243 assert_eq!(message.action, metric.name());
244 let message_content: MetricEventMessage =
245 serde_json::from_value(message.content.clone()).unwrap();
246 assert_eq!(3, message_content.value);
247 assert_eq!(Duration::from_secs(10), message_content.period);
248 assert_eq!(
249 "CLIENT_TYPE,ORIGIN_A",
250 format!("{},{}", message_content.client_type, message_content.origin)
251 );
252 }
253
254 #[test]
255 fn send_one_message_per_origin() {
256 let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();
257
258 let metric = metrics_service.get_certificate_detail_total_served_since_startup();
259 metric.increment_by(&["ORIGIN_A", "CLIENT_TYPE"], 3);
260 metric.increment_by(&["ORIGIN_B", "CLIENT_TYPE"], 7);
261 usage_reporter.send_metrics(&Duration::from_secs(10));
262
263 let received_messages = received_messages(&mut rx);
264 assert_eq!(2, received_messages.len());
265 let received_messages: HashMap<_, _> = received_messages
266 .iter()
267 .map(|message| {
268 let event: MetricEventMessage =
269 serde_json::from_value(message.content.clone()).unwrap();
270 (
271 format!("{},{}", event.client_type, event.origin),
272 event.value,
273 )
274 })
275 .collect();
276 assert_eq!(Some(&3), received_messages.get("CLIENT_TYPE,ORIGIN_A"));
277 assert_eq!(Some(&7), received_messages.get("CLIENT_TYPE,ORIGIN_B"));
278 }
279
280 #[test]
281 fn send_one_message_for_each_non_zero_value_metrics() {
282 let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();
283
284 let metric_1 = metrics_service.get_certificate_total_produced_since_startup();
285 let metric_2 = metrics_service.get_certificate_detail_total_served_since_startup();
286 metric_1.increment();
287 metric_2.increment(&["ORIGIN", "CLIENT_TYPE"]);
288
289 usage_reporter.send_metrics(&Duration::from_secs(10));
290
291 let received_messages = received_messages(&mut rx);
292 let metric_values: Vec<(String, i64)> =
293 received_messages.iter().map(extract_metric_value).collect();
294
295 assert!(metric_values.contains(&(metric_1.name(), 1)));
296 assert!(metric_values.contains(&(metric_2.name(), 1)));
297 }
298
299 #[test]
300 fn resend_only_delta_since_last_send() {
301 let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();
302
303 let metric_1 = metrics_service.get_certificate_total_produced_since_startup();
304 let metric_2 = metrics_service.get_certificate_detail_total_served_since_startup();
305
306 {
307 metric_1.increment_by(12);
308 metric_2.increment_by(&["ORIGIN", "CLIENT_TYPE"], 5);
309 usage_reporter.send_metrics(&Duration::from_secs(10));
310
311 let received_messages = received_messages(&mut rx);
312 assert_eq!(2, received_messages.len());
313 }
314 {
315 metric_2.increment_by(&["ORIGIN", "CLIENT_TYPE"], 20);
316 metric_2.increment_by(&["ORIGIN", "CLIENT_TYPE"], 33);
317 usage_reporter.send_metrics(&Duration::from_secs(10));
318
319 let received_messages = received_messages(&mut rx);
320 assert_eq!(1, received_messages.len());
321
322 assert_eq!(
323 (metric_2.name(), 53),
324 extract_metric_value(&received_messages[0])
325 );
326 }
327 {
328 metric_2.increment_by(&["ORIGIN", "CLIENT_TYPE"], 15);
329 usage_reporter.send_metrics(&Duration::from_secs(10));
330
331 let received_messages = received_messages(&mut rx);
332 assert_eq!(1, received_messages.len());
333
334 assert_eq!(
335 (metric_2.name(), 15),
336 extract_metric_value(&received_messages[0])
337 );
338 }
339 }
340
341 mod metric_delta {
342 use super::*;
343
344 fn build_hashmap<T: Clone>(
345 values: &[(&str, &[(&str, T)])],
346 ) -> HashMap<String, HashMap<String, T>> {
347 values
348 .iter()
349 .map(|(name, origin_values)| {
350 let value_per_label: HashMap<String, T> = origin_values
351 .iter()
352 .map(|(origin, value)| (origin.to_string(), value.clone()))
353 .collect();
354 (name.to_string(), value_per_label)
355 })
356 .collect()
357 }
358
359 fn build_metric_with_origin(origin: &str, value: i32) -> MetricLabelValueMap {
360 let mut metric = MetricLabelValueMap::default();
361 metric
362 .label_value_map
363 .insert("origin_tag".to_string(), origin.to_string());
364 metric.counter = value;
365 metric
366 }
367
368 #[test]
369 fn should_not_contain_metric_that_not_change() {
370 let metric1 = build_metric_with_origin("CLI", 1);
371 let metric2 = build_metric_with_origin("CLI", 1);
372
373 let metrics_before = build_hashmap(&[("metric_a", &[("CLI", metric1)])]);
374 let metrics_after = build_hashmap(&[("metric_a", &[("CLI", metric2)])]);
375 let expected = build_hashmap(&[]);
376
377 assert_eq!(
378 expected,
379 UsageReporter::compute_metrics_delta_with_label(&metrics_before, &metrics_after)
380 );
381 }
382
383 #[test]
384 fn should_contain_the_difference_of_an_increased_metric() {
385 let metric1 = build_metric_with_origin("CLI", 1);
386 let metric2 = build_metric_with_origin("CLI", 5);
387 let expected_metric = build_metric_with_origin("CLI", 4);
388
389 let metrics_before = build_hashmap(&[("metric_a", &[("CLI", metric1)])]);
390 let metrics_after = build_hashmap(&[("metric_a", &[("CLI", metric2)])]);
391 let expected = build_hashmap(&[("metric_a", &[("CLI", expected_metric)])]);
392
393 assert_eq!(
394 expected,
395 UsageReporter::compute_metrics_delta_with_label(&metrics_before, &metrics_after)
396 );
397 }
398
399 #[test]
400 fn should_contain_the_difference_of_a_decreased_metric() {
401 let metric1 = build_metric_with_origin("CLI", 5);
403 let metric2 = build_metric_with_origin("CLI", 2);
404 let expected_metric = build_metric_with_origin("CLI", -3);
405
406 let metrics_before = build_hashmap(&[("metric_a", &[("origin_1", metric1)])]);
407 let metrics_after = build_hashmap(&[("metric_a", &[("origin_1", metric2)])]);
408 let expected = build_hashmap(&[("metric_a", &[("origin_1", expected_metric)])]);
409
410 assert_eq!(
411 expected,
412 UsageReporter::compute_metrics_delta_with_label(&metrics_before, &metrics_after)
413 );
414 }
415
416 #[test]
417 fn should_contain_new_value_of_a_metric_not_present_before() {
418 let metric = build_metric_with_origin("CLI", 5);
419 let expected_metric = build_metric_with_origin("CLI", 5);
420
421 let metrics_before = build_hashmap(&[]);
422 let metrics_after = build_hashmap(&[("metric_a", &[("origin_1", metric)])]);
423 let expected = build_hashmap(&[("metric_a", &[("origin_1", expected_metric)])]);
424
425 assert_eq!(
426 expected,
427 UsageReporter::compute_metrics_delta_with_label(&metrics_before, &metrics_after)
428 );
429 }
430
431 #[test]
432 fn should_not_panic_with_a_large_delta() {
433 let metrics_at_0 = build_hashmap(&[(
434 "metric_a",
435 &[("origin_1", build_metric_with_origin("CLI", 0))],
436 )]);
437 let metrics_at_max = build_hashmap(&[(
438 "metric_a",
439 &[("origin_1", build_metric_with_origin("CLI", i32::MAX))],
440 )]);
441
442 assert_eq!(
443 build_hashmap(&[(
444 "metric_a",
445 &[("origin_1", build_metric_with_origin("CLI", i32::MAX))]
446 )]),
447 UsageReporter::compute_metrics_delta_with_label(&metrics_at_0, &metrics_at_max)
448 );
449
450 assert_eq!(
451 build_hashmap(&[(
452 "metric_a",
453 &[("origin_1", build_metric_with_origin("CLI", -(i32::MAX)))]
454 )]),
455 UsageReporter::compute_metrics_delta_with_label(&metrics_at_max, &metrics_at_0)
456 );
457 }
458
459 #[test]
460 fn should_contain_only_the_difference_for_origin_on_which_value_change() {
461 let metrics_before = build_hashmap(&[(
462 "metric_a",
463 &[
464 ("origin_1", build_metric_with_origin("origin_1", 1)),
465 ("origin_2", build_metric_with_origin("origin_2", 3)),
466 ("origin_3", build_metric_with_origin("origin_3", 7)),
467 ],
468 )]);
469
470 let metrics_after = build_hashmap(&[(
471 "metric_a",
472 &[
473 ("origin_1", build_metric_with_origin("origin_1", 5)),
474 ("origin_2", build_metric_with_origin("origin_2", 3)),
475 ("origin_3", build_metric_with_origin("origin_3", 9)),
476 ],
477 )]);
478
479 let expected = build_hashmap(&[(
480 "metric_a",
481 &[
482 ("origin_1", build_metric_with_origin("origin_1", 4)),
483 ("origin_3", build_metric_with_origin("origin_3", 2)),
484 ],
485 )]);
486
487 assert_eq!(
488 expected,
489 UsageReporter::compute_metrics_delta_with_label(&metrics_before, &metrics_after)
490 );
491 }
492 }
493}