mithril_aggregator/services/
usage_reporter.rs

1use std::{collections::HashMap, sync::Arc, time::Duration};
2
3use crate::metrics::MetricLabelValueMap;
4use crate::{
5    event_store::{EventMessage, TransmitterService},
6    MetricsService,
7};
8use chrono::{DateTime, Utc};
9use mithril_common::logging::LoggerExtensions;
10use serde::{Deserialize, Serialize};
11use slog::{info, Logger};
12
/// Message sent to the event store to report a metric value.
///
/// It is built by `UsageReporter::create_metrics_event_message`, which passes it as the
/// content of an `EventMessage`.
#[derive(Serialize, Deserialize)]
struct MetricEventMessage {
    /// Name of the metric.
    name: String,
    /// Value of the metric: when produced by the reporter this is the change of the
    /// counter since the previous report, not its absolute value.
    value: i64,
    /// Period of time during which the metric was collected.
    ///
    /// The reporter fills it with the interval it was asked to run at.
    period: Duration,
    /// Identify the origin of the metric.
    ///
    /// The reporter reads it from the `origin_tag` label of the metric and uses an empty
    /// string when that label is missing.
    origin: String,
    /// Identify the client type that collected the metric.
    ///
    /// The reporter reads it from the `client_type` label of the metric and uses an empty
    /// string when that label is missing.
    client_type: String,
    /// Date at which the metric was collected.
    ///
    /// All the messages of a same report share the same date.
    date: DateTime<Utc>,
}
/// Reporter of usage metrics of the application.
///
/// It remembers the metrics exported at the previous report so that each report only
/// carries what changed since then.
pub struct UsageReporter {
    /// Service used to send the metric events.
    transmitter_service: Arc<TransmitterService<EventMessage>>,
    /// Service from which the current metric values are exported.
    metrics_service: Arc<MetricsService>,
    /// Metrics exported at the previous report, used as the baseline of the next one.
    ///
    /// The outer key is the metric name; the inner key presumably identifies one set of
    /// label values of that metric (to be confirmed against `MetricsService`).
    last_reported_metrics: HashMap<String, HashMap<String, MetricLabelValueMap>>,
    /// Logger tagged with this component's name.
    logger: Logger,
}
36
37impl UsageReporter {
38    /// Create a new UsageReporter.
39    pub fn new(
40        transmitter_service: Arc<TransmitterService<EventMessage>>,
41        metrics_service: Arc<MetricsService>,
42        logger: Logger,
43    ) -> Self {
44        Self {
45            transmitter_service,
46            metrics_service,
47            last_reported_metrics: HashMap::new(),
48            logger: logger.new_with_component_name::<Self>(),
49        }
50    }
51
52    fn compute_metrics_delta(
53        metrics_before: Option<&HashMap<String, MetricLabelValueMap>>,
54        metrics_after: &HashMap<String, MetricLabelValueMap>,
55    ) -> HashMap<String, MetricLabelValueMap> {
56        metrics_after
57            .iter()
58            .filter_map(|(labels_values, metric)| {
59                let last_value = metrics_before.and_then(|m| m.get(labels_values));
60                let delta = match last_value {
61                    Some(last) => (metric.counter as i64) - (last.counter as i64),
62                    None => metric.counter as i64,
63                };
64                if delta != 0 {
65                    let mut delta_metric = metric.clone();
66                    delta_metric.counter = delta as i32;
67                    Some((labels_values.clone(), delta_metric))
68                } else {
69                    None
70                }
71            })
72            .collect()
73    }
74
75    fn compute_metrics_delta_with_label(
76        metrics_before: &HashMap<String, HashMap<String, MetricLabelValueMap>>,
77        metrics_after: &HashMap<String, HashMap<String, MetricLabelValueMap>>,
78    ) -> HashMap<String, HashMap<String, MetricLabelValueMap>> {
79        metrics_after
80            .iter()
81            .map(|(name, label_values)| {
82                (
83                    name.clone(),
84                    UsageReporter::compute_metrics_delta(metrics_before.get(name), label_values),
85                )
86            })
87            .filter(|(_name, value)| !value.is_empty())
88            .collect()
89    }
90
91    fn send_metrics(&mut self, duration: &Duration) {
92        let metrics = self.metrics_service.export_metrics_map();
93        let delta = Self::compute_metrics_delta_with_label(&self.last_reported_metrics, &metrics);
94        let date = Utc::now();
95
96        self.last_reported_metrics = metrics;
97
98        for (name, metrics_map) in delta {
99            for metric_map in metrics_map.values() {
100                let message = Self::create_metrics_event_message(
101                    name.clone(),
102                    metric_map.counter as i64,
103                    *duration,
104                    metric_map
105                        .label_value_map
106                        .get("origin_tag")
107                        .cloned()
108                        .unwrap_or_default(),
109                    metric_map
110                        .label_value_map
111                        .get("client_type")
112                        .cloned()
113                        .unwrap_or_default(),
114                    date,
115                );
116                self.transmitter_service.send(message);
117            }
118        }
119    }
120
121    /// Start a loop that send event about metrics at the given time interval.
122    pub async fn run_forever(&mut self, run_interval: Duration) {
123        let mut interval = tokio::time::interval(run_interval);
124
125        loop {
126            interval.tick().await;
127            self.send_metrics(&run_interval);
128
129            info!(
130                self.logger,
131                "Metrics sent, Sleeping for {} seconds",
132                run_interval.as_secs()
133            );
134        }
135    }
136
137    /// Create a new EventMessage for a metrics.
138    pub fn create_metrics_event_message(
139        name: String,
140        value: i64,
141        period: Duration,
142        origin: String,
143        client_type: String,
144        date: DateTime<Utc>,
145    ) -> EventMessage {
146        EventMessage::new(
147            "Metrics",
148            &name.clone(),
149            &MetricEventMessage {
150                name,
151                value,
152                period,
153                origin,
154                client_type,
155                date,
156            },
157            vec![],
158        )
159    }
160}
161
#[cfg(test)]
mod tests {
    use mithril_metric::MetricCollector;
    use tokio::sync::mpsc::UnboundedReceiver;

    use super::*;
    use crate::test_tools::TestLogger;

    /// Build a `UsageReporter` wired to a fresh `MetricsService` and to an unbounded channel.
    ///
    /// Returns the reporter, the metrics service on which the tests change counters, and
    /// the receiving end of the channel given to the reporter's transmitter.
    fn build_usage_reporter() -> (
        UsageReporter,
        Arc<MetricsService>,
        UnboundedReceiver<EventMessage>,
    ) {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<EventMessage>();
        let metrics_service = Arc::new(MetricsService::new(TestLogger::stdout()).unwrap());
        let transmitter_service = Arc::new(TransmitterService::new(tx, TestLogger::stdout()));
        let usage_reporter = UsageReporter::new(
            transmitter_service.clone(),
            metrics_service.clone(),
            TestLogger::stdout(),
        );
        (usage_reporter, metrics_service, rx)
    }

    /// Drain, without waiting, every message currently available on the channel.
    fn received_messages(rx: &mut UnboundedReceiver<EventMessage>) -> Vec<EventMessage> {
        let mut received_messages: Vec<EventMessage> = Vec::new();
        while let Ok(message) = rx.try_recv() {
            received_messages.push(message);
        }
        received_messages
    }

    /// Extract the action of an event (the metric name) and the value carried by its content.
    fn extract_metric_value(message: &EventMessage) -> (String, i64) {
        let metric_delta: MetricEventMessage =
            serde_json::from_value(message.content.clone()).unwrap();
        (message.action.clone(), metric_delta.value)
    }

    #[test]
    fn when_no_metrics_no_message_sent() {
        let (mut usage_reporter, _metrics_service, mut rx) = build_usage_reporter();

        usage_reporter.send_metrics(&Duration::from_secs(10));

        let received_messages = received_messages(&mut rx);
        assert_eq!(0, received_messages.len());
    }

    #[test]
    fn verify_event_content_on_a_metric() {
        let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();

        let metric = metrics_service.get_certificate_total_produced_since_startup();
        metric.increment_by(3);
        usage_reporter.send_metrics(&Duration::from_secs(10));

        let received_messages = received_messages(&mut rx);
        // Checking the count first gives a clear failure instead of an index panic.
        assert_eq!(1, received_messages.len());
        let message = &received_messages[0];
        assert_eq!(message.source, "Metrics");
        assert_eq!(message.action, metric.name());
        let message_content: MetricEventMessage =
            serde_json::from_value(message.content.clone()).unwrap();
        assert_eq!(3, message_content.value);
        assert_eq!(Duration::from_secs(10), message_content.period);
        assert_eq!("", message_content.origin);
    }

    #[test]
    fn verify_event_content_on_a_metric_with_label() {
        let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();

        let metric = metrics_service.get_certificate_detail_total_served_since_startup();
        metric.increment_by(&["ORIGIN_A", "CLIENT_TYPE"], 3);
        usage_reporter.send_metrics(&Duration::from_secs(10));

        let received_messages = received_messages(&mut rx);
        // Checking the count first gives a clear failure instead of an index panic.
        assert_eq!(1, received_messages.len());
        let message = &received_messages[0];
        assert_eq!(message.source, "Metrics");
        assert_eq!(message.action, metric.name());
        let message_content: MetricEventMessage =
            serde_json::from_value(message.content.clone()).unwrap();
        assert_eq!(3, message_content.value);
        assert_eq!(Duration::from_secs(10), message_content.period);
        assert_eq!(
            "CLIENT_TYPE,ORIGIN_A",
            format!("{},{}", message_content.client_type, message_content.origin)
        );
    }

    #[test]
    fn send_one_message_per_origin() {
        let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();

        let metric = metrics_service.get_certificate_detail_total_served_since_startup();
        metric.increment_by(&["ORIGIN_A", "CLIENT_TYPE"], 3);
        metric.increment_by(&["ORIGIN_B", "CLIENT_TYPE"], 7);
        usage_reporter.send_metrics(&Duration::from_secs(10));

        let received_messages = received_messages(&mut rx);
        assert_eq!(2, received_messages.len());
        // Index the values by "client_type,origin" since the sending order is not fixed.
        let received_messages: HashMap<_, _> = received_messages
            .iter()
            .map(|message| {
                let event: MetricEventMessage =
                    serde_json::from_value(message.content.clone()).unwrap();
                (
                    format!("{},{}", event.client_type, event.origin),
                    event.value,
                )
            })
            .collect();
        assert_eq!(Some(&3), received_messages.get("CLIENT_TYPE,ORIGIN_A"));
        assert_eq!(Some(&7), received_messages.get("CLIENT_TYPE,ORIGIN_B"));
    }

    #[test]
    fn send_one_message_for_each_non_zero_value_metrics() {
        let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();

        let metric_1 = metrics_service.get_certificate_total_produced_since_startup();
        let metric_2 = metrics_service.get_certificate_detail_total_served_since_startup();
        metric_1.increment();
        metric_2.increment(&["ORIGIN", "CLIENT_TYPE"]);

        usage_reporter.send_metrics(&Duration::from_secs(10));

        let received_messages = received_messages(&mut rx);
        let metric_values: Vec<(String, i64)> =
            received_messages.iter().map(extract_metric_value).collect();

        assert!(metric_values.contains(&(metric_1.name(), 1)));
        assert!(metric_values.contains(&(metric_2.name(), 1)));
    }

    #[test]
    fn resend_only_delta_since_last_send() {
        let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();

        let metric_1 = metrics_service.get_certificate_total_produced_since_startup();
        let metric_2 = metrics_service.get_certificate_detail_total_served_since_startup();

        // First report: both metrics changed.
        {
            metric_1.increment_by(12);
            metric_2.increment_by(&["ORIGIN", "CLIENT_TYPE"], 5);
            usage_reporter.send_metrics(&Duration::from_secs(10));

            let received_messages = received_messages(&mut rx);
            assert_eq!(2, received_messages.len());
        }
        // Second report: only `metric_2` changed, by the sum of its two increments.
        {
            metric_2.increment_by(&["ORIGIN", "CLIENT_TYPE"], 20);
            metric_2.increment_by(&["ORIGIN", "CLIENT_TYPE"], 33);
            usage_reporter.send_metrics(&Duration::from_secs(10));

            let received_messages = received_messages(&mut rx);
            assert_eq!(1, received_messages.len());

            assert_eq!(
                (metric_2.name(), 53),
                extract_metric_value(&received_messages[0])
            );
        }
        // Third report: the delta is relative to the second report, not to the first one.
        {
            metric_2.increment_by(&["ORIGIN", "CLIENT_TYPE"], 15);
            usage_reporter.send_metrics(&Duration::from_secs(10));

            let received_messages = received_messages(&mut rx);
            assert_eq!(1, received_messages.len());

            assert_eq!(
                (metric_2.name(), 15),
                extract_metric_value(&received_messages[0])
            );
        }
    }

    mod metric_delta {
        use super::*;

        /// Build a two-level map (first key, then second key) from slices of pairs.
        fn build_hashmap<T: Clone>(
            values: &[(&str, &[(&str, T)])],
        ) -> HashMap<String, HashMap<String, T>> {
            values
                .iter()
                .map(|(name, origin_values)| {
                    let value_per_label: HashMap<String, T> = origin_values
                        .iter()
                        .map(|(origin, value)| (origin.to_string(), value.clone()))
                        .collect();
                    (name.to_string(), value_per_label)
                })
                .collect()
        }

        /// Build a metric entry with the given counter and `origin` as its `origin_tag` label.
        fn build_metric_with_origin(origin: &str, value: i32) -> MetricLabelValueMap {
            let mut metric = MetricLabelValueMap::default();
            metric
                .label_value_map
                .insert("origin_tag".to_string(), origin.to_string());
            metric.counter = value;
            metric
        }

        #[test]
        fn should_not_contain_metric_that_not_change() {
            let metric1 = build_metric_with_origin("CLI", 1);
            let metric2 = build_metric_with_origin("CLI", 1);

            let metrics_before = build_hashmap(&[("metric_a", &[("CLI", metric1)])]);
            let metrics_after = build_hashmap(&[("metric_a", &[("CLI", metric2)])]);
            let expected = build_hashmap(&[]);

            assert_eq!(
                expected,
                UsageReporter::compute_metrics_delta_with_label(&metrics_before, &metrics_after)
            );
        }

        #[test]
        fn should_contain_the_difference_of_an_increased_metric() {
            let metric1 = build_metric_with_origin("CLI", 1);
            let metric2 = build_metric_with_origin("CLI", 5);
            let expected_metric = build_metric_with_origin("CLI", 4);

            let metrics_before = build_hashmap(&[("metric_a", &[("CLI", metric1)])]);
            let metrics_after = build_hashmap(&[("metric_a", &[("CLI", metric2)])]);
            let expected = build_hashmap(&[("metric_a", &[("CLI", expected_metric)])]);

            assert_eq!(
                expected,
                UsageReporter::compute_metrics_delta_with_label(&metrics_before, &metrics_after)
            );
        }

        #[test]
        fn should_contain_the_difference_of_a_decreased_metric() {
            // TODO: confirm this test is still wanted. Metrics can only be incremented, so
            // a negative delta is not typical; the test pins that a decrease is currently
            // reported as a negative value.
            let metric1 = build_metric_with_origin("CLI", 5);
            let metric2 = build_metric_with_origin("CLI", 2);
            let expected_metric = build_metric_with_origin("CLI", -3);

            let metrics_before = build_hashmap(&[("metric_a", &[("origin_1", metric1)])]);
            let metrics_after = build_hashmap(&[("metric_a", &[("origin_1", metric2)])]);
            let expected = build_hashmap(&[("metric_a", &[("origin_1", expected_metric)])]);

            assert_eq!(
                expected,
                UsageReporter::compute_metrics_delta_with_label(&metrics_before, &metrics_after)
            );
        }

        #[test]
        fn should_contain_new_value_of_a_metric_not_present_before() {
            let metric = build_metric_with_origin("CLI", 5);
            let expected_metric = build_metric_with_origin("CLI", 5);

            let metrics_before = build_hashmap(&[]);
            let metrics_after = build_hashmap(&[("metric_a", &[("origin_1", metric)])]);
            let expected = build_hashmap(&[("metric_a", &[("origin_1", expected_metric)])]);

            assert_eq!(
                expected,
                UsageReporter::compute_metrics_delta_with_label(&metrics_before, &metrics_after)
            );
        }

        #[test]
        fn should_not_panic_with_a_large_delta() {
            let metrics_at_0 = build_hashmap(&[(
                "metric_a",
                &[("origin_1", build_metric_with_origin("CLI", 0))],
            )]);
            let metrics_at_max = build_hashmap(&[(
                "metric_a",
                &[("origin_1", build_metric_with_origin("CLI", i32::MAX))],
            )]);

            // Largest possible increase of a counter starting at zero.
            assert_eq!(
                build_hashmap(&[(
                    "metric_a",
                    &[("origin_1", build_metric_with_origin("CLI", i32::MAX))]
                )]),
                UsageReporter::compute_metrics_delta_with_label(&metrics_at_0, &metrics_at_max)
            );

            // Same magnitude in the other direction.
            assert_eq!(
                build_hashmap(&[(
                    "metric_a",
                    &[("origin_1", build_metric_with_origin("CLI", -(i32::MAX)))]
                )]),
                UsageReporter::compute_metrics_delta_with_label(&metrics_at_max, &metrics_at_0)
            );
        }

        #[test]
        fn should_contain_only_the_difference_for_origin_on_which_value_change() {
            let metrics_before = build_hashmap(&[(
                "metric_a",
                &[
                    ("origin_1", build_metric_with_origin("origin_1", 1)),
                    ("origin_2", build_metric_with_origin("origin_2", 3)),
                    ("origin_3", build_metric_with_origin("origin_3", 7)),
                ],
            )]);

            let metrics_after = build_hashmap(&[(
                "metric_a",
                &[
                    ("origin_1", build_metric_with_origin("origin_1", 5)),
                    ("origin_2", build_metric_with_origin("origin_2", 3)),
                    ("origin_3", build_metric_with_origin("origin_3", 9)),
                ],
            )]);

            // `origin_2` did not change and must therefore be absent from the delta.
            let expected = build_hashmap(&[(
                "metric_a",
                &[
                    ("origin_1", build_metric_with_origin("origin_1", 4)),
                    ("origin_3", build_metric_with_origin("origin_3", 2)),
                ],
            )]);

            assert_eq!(
                expected,
                UsageReporter::compute_metrics_delta_with_label(&metrics_before, &metrics_after)
            );
        }
    }
}