mithril_aggregator/services/
usage_reporter.rs

1use std::{collections::HashMap, sync::Arc, time::Duration};
2
3use crate::{
4    event_store::{EventMessage, TransmitterService},
5    MetricsService,
6};
7use chrono::{DateTime, Utc};
8use mithril_common::logging::LoggerExtensions;
9use serde::{Deserialize, Serialize};
10use slog::{info, Logger};
11
/// Message sent to the event store to report a metric value.
///
/// It is the content of the events built by `UsageReporter::create_metrics_event_message`.
#[derive(Serialize, Deserialize)]
struct MetricEventMessage {
    /// Name of the metric.
    name: String,
    /// Value of the metric; the reporter fills it with the change since its previous report.
    value: i64,
    /// Period of time during which the metric was collected.
    period: Duration,
    /// Identify the origin of the metric (the label under which the value was recorded).
    origin: String,
    /// Date at which the metric was collected.
    date: DateTime<Utc>,
}
/// Reporter of usage metrics of the application.
///
/// It remembers the metric values seen at the previous report so that only the change since
/// then is transmitted.
pub struct UsageReporter {
    /// Service used to send the metric events.
    transmitter_service: Arc<TransmitterService<EventMessage>>,
    /// Service from which the current metric values are exported.
    metrics_service: Arc<MetricsService>,
    /// Metric values exported at the last report: metric name, then origin, then value.
    last_reported_metrics: HashMap<String, HashMap<String, u32>>,
    /// Logger scoped to this component.
    logger: Logger,
}
33
34impl UsageReporter {
35    /// Create a new UsageReporter.
36    pub fn new(
37        transmitter_service: Arc<TransmitterService<EventMessage>>,
38        metrics_service: Arc<MetricsService>,
39        logger: Logger,
40    ) -> Self {
41        Self {
42            transmitter_service,
43            metrics_service,
44            last_reported_metrics: HashMap::new(),
45            logger: logger.new_with_component_name::<Self>(),
46        }
47    }
48
49    fn compute_metrics_delta(
50        metrics_before: Option<&HashMap<String, u32>>,
51        metrics_after: &HashMap<String, u32>,
52    ) -> HashMap<String, i64> {
53        metrics_after
54            .iter()
55            .map(|(name, value)| {
56                let last_value = metrics_before.and_then(|m| m.get(name)).unwrap_or(&0);
57                let delta: i64 = (*value as i64) - (*last_value as i64);
58                (name.clone(), delta)
59            })
60            .filter(|(_name, value)| *value != 0)
61            .collect()
62    }
63
64    fn compute_metrics_delta_with_label(
65        metrics_before: &HashMap<String, HashMap<String, u32>>,
66        metrics_after: &HashMap<String, HashMap<String, u32>>,
67    ) -> HashMap<String, HashMap<String, i64>> {
68        metrics_after
69            .iter()
70            .map(|(name, label_values)| {
71                (
72                    name.clone(),
73                    UsageReporter::compute_metrics_delta(metrics_before.get(name), label_values),
74                )
75            })
76            .filter(|(_name, value)| !value.is_empty())
77            .collect()
78    }
79
80    fn send_metrics(&mut self, duration: &Duration) {
81        let metrics = self.metrics_service.export_metrics_map();
82        let delta = Self::compute_metrics_delta_with_label(&self.last_reported_metrics, &metrics);
83        let date = Utc::now();
84
85        self.last_reported_metrics = metrics;
86
87        for (name, origin_values) in delta {
88            for (origin, value) in origin_values {
89                let message = Self::create_metrics_event_message(
90                    name.clone(),
91                    value,
92                    *duration,
93                    origin,
94                    date,
95                );
96                self.transmitter_service.send(message);
97            }
98        }
99    }
100
101    /// Start a loop that send event about metrics at the given time interval.
102    pub async fn run_forever(&mut self, run_interval: Duration) {
103        let mut interval = tokio::time::interval(run_interval);
104
105        loop {
106            interval.tick().await;
107            self.send_metrics(&run_interval);
108
109            info!(
110                self.logger,
111                "Metrics sent, Sleeping for {} seconds",
112                run_interval.as_secs()
113            );
114        }
115    }
116
117    /// Create a new EventMessage for a metrics.
118    pub fn create_metrics_event_message(
119        name: String,
120        value: i64,
121        period: Duration,
122        origin: String,
123        date: DateTime<Utc>,
124    ) -> EventMessage {
125        EventMessage::new(
126            "Metrics",
127            &name.clone(),
128            &MetricEventMessage {
129                name,
130                value,
131                period,
132                origin,
133                date,
134            },
135            vec![],
136        )
137    }
138}
139
140#[cfg(test)]
141mod tests {
142    use mithril_metric::MetricCollector;
143    use tokio::sync::mpsc::UnboundedReceiver;
144
145    use super::UsageReporter;
146    use super::*;
147    use crate::test_tools::TestLogger;
148
149    fn build_usage_reporter() -> (
150        UsageReporter,
151        Arc<MetricsService>,
152        UnboundedReceiver<EventMessage>,
153    ) {
154        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<EventMessage>();
155        let metrics_service = Arc::new(MetricsService::new(TestLogger::stdout()).unwrap());
156        let transmitter_service = Arc::new(TransmitterService::new(tx, TestLogger::stdout()));
157        let usage_reporter = UsageReporter::new(
158            transmitter_service.clone(),
159            metrics_service.clone(),
160            TestLogger::stdout(),
161        );
162        (usage_reporter, metrics_service, rx)
163    }
164
165    fn received_messages(
166        rx: &mut tokio::sync::mpsc::UnboundedReceiver<EventMessage>,
167    ) -> Vec<EventMessage> {
168        let mut received_messages: Vec<EventMessage> = Vec::new();
169        while let Ok(message) = rx.try_recv() {
170            received_messages.push(message);
171        }
172        received_messages
173    }
174
175    fn extract_metric_value(message: &EventMessage) -> (String, i64) {
176        let metric_delta: MetricEventMessage =
177            serde_json::from_value(message.content.clone()).unwrap();
178        (message.action.clone(), metric_delta.value)
179    }
180
181    #[test]
182    fn when_no_metrics_no_message_sent() {
183        let (mut usage_reporter, _metrics_service, mut rx) = build_usage_reporter();
184
185        usage_reporter.send_metrics(&Duration::from_secs(10));
186
187        let received_messages = received_messages(&mut rx);
188        assert_eq!(0, received_messages.len());
189    }
190
191    #[test]
192    fn verify_event_content_on_a_metric() {
193        let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();
194
195        let metric = metrics_service.get_certificate_total_produced_since_startup();
196        metric.increment_by(3);
197        usage_reporter.send_metrics(&Duration::from_secs(10));
198
199        let received_messages = received_messages(&mut rx);
200        let message = &received_messages[0];
201        assert_eq!(message.source, "Metrics");
202        assert_eq!(message.action, metric.name());
203        let message_content: MetricEventMessage =
204            serde_json::from_value(message.content.clone()).unwrap();
205        assert_eq!(3, message_content.value);
206        assert_eq!(Duration::from_secs(10), message_content.period);
207        assert_eq!("".to_string(), message_content.origin);
208    }
209
210    #[test]
211    fn verify_event_content_on_a_metric_with_label() {
212        let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();
213
214        let metric = metrics_service.get_certificate_detail_total_served_since_startup();
215        metric.increment_by(&["ORIGIN_A"], 3);
216        usage_reporter.send_metrics(&Duration::from_secs(10));
217
218        let received_messages = received_messages(&mut rx);
219        let message = &received_messages[0];
220        assert_eq!(message.source, "Metrics");
221        assert_eq!(message.action, metric.name());
222        let message_content: MetricEventMessage =
223            serde_json::from_value(message.content.clone()).unwrap();
224        assert_eq!(3, message_content.value);
225        assert_eq!(Duration::from_secs(10), message_content.period);
226        assert_eq!("ORIGIN_A", message_content.origin);
227    }
228
229    #[test]
230    fn send_one_message_per_origin() {
231        let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();
232
233        let metric = metrics_service.get_certificate_detail_total_served_since_startup();
234        metric.increment_by(&["ORIGIN_A"], 3);
235        metric.increment_by(&["ORIGIN_B"], 7);
236        usage_reporter.send_metrics(&Duration::from_secs(10));
237
238        let received_messages = received_messages(&mut rx);
239        assert_eq!(2, received_messages.len());
240        let received_messages: HashMap<_, _> = received_messages
241            .iter()
242            .map(|message| {
243                let event: MetricEventMessage =
244                    serde_json::from_value(message.content.clone()).unwrap();
245                (event.origin, event.value)
246            })
247            .collect();
248        assert_eq!(Some(&3), received_messages.get("ORIGIN_A"));
249        assert_eq!(Some(&7), received_messages.get("ORIGIN_B"));
250    }
251
252    #[test]
253    fn send_one_message_for_each_non_zero_value_metrics() {
254        let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();
255
256        let metric_1 = metrics_service.get_certificate_total_produced_since_startup();
257        let metric_2 = metrics_service.get_certificate_detail_total_served_since_startup();
258        metric_1.increment();
259        metric_2.increment(&["ORIGIN"]);
260
261        usage_reporter.send_metrics(&Duration::from_secs(10));
262
263        let received_messages = received_messages(&mut rx);
264        let metric_values: Vec<(String, i64)> =
265            received_messages.iter().map(extract_metric_value).collect();
266
267        assert!(metric_values.contains(&(metric_1.name(), 1)));
268        assert!(metric_values.contains(&(metric_2.name(), 1)));
269    }
270
271    #[test]
272    fn resend_only_delta_since_last_send() {
273        let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();
274
275        let metric_1 = metrics_service.get_certificate_total_produced_since_startup();
276        let metric_2 = metrics_service.get_certificate_detail_total_served_since_startup();
277
278        {
279            metric_1.increment_by(12);
280            metric_2.increment_by(&["ORIGIN"], 5);
281            usage_reporter.send_metrics(&Duration::from_secs(10));
282
283            let received_messages = received_messages(&mut rx);
284            assert_eq!(2, received_messages.len());
285        }
286        {
287            metric_2.increment_by(&["ORIGIN"], 20);
288            metric_2.increment_by(&["ORIGIN"], 33);
289            usage_reporter.send_metrics(&Duration::from_secs(10));
290
291            let received_messages = received_messages(&mut rx);
292            assert_eq!(1, received_messages.len());
293
294            assert_eq!(
295                (metric_2.name(), 53),
296                extract_metric_value(&received_messages[0])
297            );
298        }
299        {
300            metric_2.increment_by(&["ORIGIN"], 15);
301            usage_reporter.send_metrics(&Duration::from_secs(10));
302
303            let received_messages = received_messages(&mut rx);
304            assert_eq!(1, received_messages.len());
305
306            assert_eq!(
307                (metric_2.name(), 15),
308                extract_metric_value(&received_messages[0])
309            );
310        }
311    }
312
313    mod metric_delta {
314        use super::*;
315
316        fn build_hashmap<T: Copy>(
317            values: &[(&str, &[(&str, T)])],
318        ) -> HashMap<String, HashMap<String, T>> {
319            values
320                .iter()
321                .map(|(name, origin_values)| {
322                    let value_per_label: HashMap<String, T> = origin_values
323                        .iter()
324                        .map(|(origin, value)| (origin.to_string(), *value))
325                        .collect();
326                    (name.to_string(), value_per_label)
327                })
328                .collect()
329        }
330
331        #[test]
332        fn should_not_contain_metric_that_not_change() {
333            let metrics_before = build_hashmap(&[("metric_a", &[("origin_1", 1)])]);
334
335            let metrics_after = build_hashmap(&[("metric_a", &[("origin_1", 1)])]);
336
337            let expected = build_hashmap(&[]);
338
339            assert_eq!(
340                expected,
341                UsageReporter::compute_metrics_delta_with_label(&metrics_before, &metrics_after)
342            );
343        }
344
345        #[test]
346        fn should_contain_the_difference_of_an_increased_metric() {
347            let metrics_before = build_hashmap(&[("metric_a", &[("origin_1", 1)])]);
348
349            let metrics_after = build_hashmap(&[("metric_a", &[("origin_1", 5)])]);
350
351            let expected = build_hashmap(&[("metric_a", &[("origin_1", 4)])]);
352
353            assert_eq!(
354                expected,
355                UsageReporter::compute_metrics_delta_with_label(&metrics_before, &metrics_after)
356            );
357        }
358
359        #[test]
360        fn should_contain_the_difference_of_a_decreased_metric() {
361            let metrics_before = build_hashmap(&[("metric_a", &[("origin_1", 5)])]);
362
363            let metrics_after = build_hashmap(&[("metric_a", &[("origin_1", 2)])]);
364
365            let expected = build_hashmap(&[("metric_a", &[("origin_1", -3)])]);
366
367            assert_eq!(
368                expected,
369                UsageReporter::compute_metrics_delta_with_label(&metrics_before, &metrics_after)
370            );
371        }
372
373        #[test]
374        fn should_contain_new_value_of_a_metric_not_present_before() {
375            let metrics_before = build_hashmap(&[]);
376
377            let metrics_after = build_hashmap(&[("metric_a", &[("origin_1", 5)])]);
378
379            let expected = build_hashmap(&[("metric_a", &[("origin_1", 5)])]);
380
381            assert_eq!(
382                expected,
383                UsageReporter::compute_metrics_delta_with_label(&metrics_before, &metrics_after)
384            );
385        }
386
387        #[test]
388        fn should_not_panic_with_a_large_delta() {
389            let metrics_at_0 = build_hashmap(&[("metric_a", &[("origin_1", 0)])]);
390            let metrics_at_max = build_hashmap(&[("metric_a", &[("origin_1", u32::MAX)])]);
391
392            assert_eq!(
393                build_hashmap(&[("metric_a", &[("origin_1", u32::MAX as i64)])]),
394                UsageReporter::compute_metrics_delta_with_label(&metrics_at_0, &metrics_at_max)
395            );
396            assert_eq!(
397                build_hashmap(&[("metric_a", &[("origin_1", -(u32::MAX as i64))])]),
398                UsageReporter::compute_metrics_delta_with_label(&metrics_at_max, &metrics_at_0)
399            );
400        }
401
402        #[test]
403        fn should_contain_only_the_difference_for_origin_on_which_value_change() {
404            let metrics_before = build_hashmap(&[(
405                "metric_a",
406                &[("origin_1", 1), ("origin_2", 3), ("origin_3", 7)],
407            )]);
408
409            let metrics_after = build_hashmap(&[(
410                "metric_a",
411                &[("origin_1", 5), ("origin_2", 3), ("origin_3", 9)],
412            )]);
413
414            let expected = build_hashmap(&[("metric_a", &[("origin_1", 4), ("origin_3", 2)])]);
415
416            assert_eq!(
417                expected,
418                UsageReporter::compute_metrics_delta_with_label(&metrics_before, &metrics_after)
419            );
420        }
421    }
422}