mithril_aggregator/services/
usage_reporter.rs

1use std::{collections::HashMap, sync::Arc, time::Duration};
2
3use crate::{
4    event_store::{EventMessage, TransmitterService},
5    MetricsService,
6};
7use chrono::{DateTime, Utc};
8use mithril_common::logging::LoggerExtensions;
9use serde::{Deserialize, Serialize};
10use slog::{info, Logger};
11
12/// Message sent to the event store to report a metric value.
13#[derive(Serialize, Deserialize)]
14struct MetricEventMessage {
15    /// Name of the metric.
16    name: String,
17    /// Value of the metric.
18    value: i64,
19    /// Period of time during which the metric was collected.
20    period: Duration,
21    /// Date at which the metric was collected.
22    date: DateTime<Utc>,
23}
24/// Reporter of usage metrics of the application.
25pub struct UsageReporter {
26    transmitter_service: Arc<TransmitterService<EventMessage>>,
27    metrics_service: Arc<MetricsService>,
28    last_reported_metrics: HashMap<String, u32>,
29    logger: Logger,
30}
31
32impl UsageReporter {
33    /// Create a new UsageReporter.
34    pub fn new(
35        transmitter_service: Arc<TransmitterService<EventMessage>>,
36        metrics_service: Arc<MetricsService>,
37        logger: Logger,
38    ) -> Self {
39        Self {
40            transmitter_service,
41            metrics_service,
42            last_reported_metrics: HashMap::new(),
43            logger: logger.new_with_component_name::<Self>(),
44        }
45    }
46
47    fn compute_metrics_delta(
48        metrics_before: &HashMap<String, u32>,
49        metrics_after: &HashMap<String, u32>,
50    ) -> HashMap<String, i64> {
51        metrics_after
52            .iter()
53            .map(|(name, value)| {
54                let last_value = metrics_before.get(name).unwrap_or(&0);
55                let delta: i64 = (*value as i64) - (*last_value as i64);
56                (name.clone(), delta)
57            })
58            .filter(|(_name, value)| *value != 0)
59            .collect()
60    }
61
62    fn send_metrics(&mut self, duration: &Duration) {
63        let metrics = self.metrics_service.export_metrics_map();
64        let delta = Self::compute_metrics_delta(&self.last_reported_metrics, &metrics);
65        let date = Utc::now();
66
67        self.last_reported_metrics = metrics;
68
69        for (name, value) in delta {
70            let message = Self::create_metrics_event_message(name, value, *duration, date);
71            self.transmitter_service.send(message);
72        }
73    }
74
75    /// Start a loop that send event about metrics at the given time interval.
76    pub async fn run_forever(&mut self, run_interval: Duration) {
77        let mut interval = tokio::time::interval(run_interval);
78
79        loop {
80            interval.tick().await;
81            self.send_metrics(&run_interval);
82
83            info!(
84                self.logger,
85                "Metrics sent, Sleeping for {} seconds",
86                run_interval.as_secs()
87            );
88        }
89    }
90
91    /// Create a new EventMessage for a metrics.
92    pub fn create_metrics_event_message(
93        name: String,
94        value: i64,
95        period: Duration,
96        date: DateTime<Utc>,
97    ) -> EventMessage {
98        EventMessage::new(
99            "Metrics",
100            &name.clone(),
101            &MetricEventMessage {
102                name,
103                value,
104                period,
105                date,
106            },
107            vec![],
108        )
109    }
110}
111
112#[cfg(test)]
113mod tests {
114    use mithril_metric::MetricCollector;
115    use tokio::sync::mpsc::UnboundedReceiver;
116
117    use super::UsageReporter;
118    use super::*;
119    use crate::test_tools::TestLogger;
120
121    fn build_usage_reporter() -> (
122        UsageReporter,
123        Arc<MetricsService>,
124        UnboundedReceiver<EventMessage>,
125    ) {
126        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<EventMessage>();
127        let metrics_service = Arc::new(MetricsService::new(TestLogger::stdout()).unwrap());
128        let transmitter_service = Arc::new(TransmitterService::new(tx, TestLogger::stdout()));
129        let usage_reporter = UsageReporter::new(
130            transmitter_service.clone(),
131            metrics_service.clone(),
132            TestLogger::stdout(),
133        );
134        (usage_reporter, metrics_service, rx)
135    }
136
137    fn received_messages(
138        rx: &mut tokio::sync::mpsc::UnboundedReceiver<EventMessage>,
139    ) -> Vec<EventMessage> {
140        let mut received_messages: Vec<EventMessage> = Vec::new();
141        while let Ok(message) = rx.try_recv() {
142            received_messages.push(message);
143        }
144        received_messages
145    }
146
147    fn extract_metric_value(message: &EventMessage) -> (String, i64) {
148        let metric_delta: MetricEventMessage =
149            serde_json::from_value(message.content.clone()).unwrap();
150        (message.action.clone(), metric_delta.value)
151    }
152
153    #[test]
154    fn when_no_metrics_no_message_sent() {
155        let (mut usage_reporter, _metrics_service, mut rx) = build_usage_reporter();
156
157        usage_reporter.send_metrics(&Duration::from_secs(10));
158
159        let received_messages = received_messages(&mut rx);
160        assert_eq!(0, received_messages.len());
161    }
162
163    #[test]
164    fn verify_event_content_on_a_metric() {
165        let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();
166
167        let metric = metrics_service.get_certificate_total_produced_since_startup();
168        metric.increment_by(3);
169        usage_reporter.send_metrics(&Duration::from_secs(10));
170
171        let received_messages = received_messages(&mut rx);
172        let message = &received_messages[0];
173        assert_eq!(message.source, "Metrics");
174        assert_eq!(message.action, metric.name());
175        let message_content: MetricEventMessage =
176            serde_json::from_value(message.content.clone()).unwrap();
177        assert_eq!(3, message_content.value);
178        assert_eq!(Duration::from_secs(10), message_content.period);
179    }
180
181    #[test]
182    fn send_one_message_for_each_non_zero_value_metrics() {
183        let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();
184
185        let metric_1 = metrics_service.get_certificate_total_produced_since_startup();
186        let metric_2 = metrics_service.get_certificate_detail_total_served_since_startup();
187        metric_1.increment();
188        metric_2.increment();
189
190        usage_reporter.send_metrics(&Duration::from_secs(10));
191
192        let received_messages = received_messages(&mut rx);
193        let metric_values: Vec<(String, i64)> =
194            received_messages.iter().map(extract_metric_value).collect();
195
196        assert!(metric_values.contains(&(metric_1.name(), 1)));
197        assert!(metric_values.contains(&(metric_2.name(), 1)));
198    }
199
200    #[test]
201    fn resend_only_delta_since_last_send() {
202        let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();
203
204        let metric_1 = metrics_service.get_certificate_total_produced_since_startup();
205        let metric_2 = metrics_service.get_certificate_detail_total_served_since_startup();
206
207        {
208            metric_1.increment_by(12);
209            metric_2.increment_by(5);
210            usage_reporter.send_metrics(&Duration::from_secs(10));
211
212            let received_messages = received_messages(&mut rx);
213            assert_eq!(2, received_messages.len());
214        }
215        {
216            metric_2.increment_by(20);
217            metric_2.increment_by(33);
218            usage_reporter.send_metrics(&Duration::from_secs(10));
219
220            let received_messages = received_messages(&mut rx);
221            assert_eq!(1, received_messages.len());
222
223            assert_eq!(
224                (metric_2.name(), 53),
225                extract_metric_value(&received_messages[0])
226            );
227        }
228        {
229            metric_2.increment_by(15);
230            usage_reporter.send_metrics(&Duration::from_secs(10));
231
232            let received_messages = received_messages(&mut rx);
233            assert_eq!(1, received_messages.len());
234
235            assert_eq!(
236                (metric_2.name(), 15),
237                extract_metric_value(&received_messages[0])
238            );
239        }
240    }
241
242    mod metric_delta {
243        use super::*;
244
245        fn build_hashmap<T: Copy>(values: &[(&str, T)]) -> HashMap<String, T> {
246            values
247                .iter()
248                .map(|(name, value)| (name.to_string(), *value))
249                .collect()
250        }
251
252        #[test]
253        fn should_not_contain_metric_that_not_change() {
254            let metrics_before = build_hashmap(&[("a", 1)]);
255
256            let metrics_after = build_hashmap(&[("a", 1)]);
257
258            let expected = build_hashmap(&[]);
259
260            assert_eq!(
261                expected,
262                UsageReporter::compute_metrics_delta(&metrics_before, &metrics_after)
263            );
264        }
265
266        #[test]
267        fn should_contain_the_difference_of_an_increased_metric() {
268            let metrics_before = build_hashmap(&[("a", 1)]);
269
270            let metrics_after = build_hashmap(&[("a", 5)]);
271
272            let expected = build_hashmap(&[("a", 4)]);
273
274            assert_eq!(
275                expected,
276                UsageReporter::compute_metrics_delta(&metrics_before, &metrics_after)
277            );
278        }
279
280        #[test]
281        fn should_contain_the_difference_of_a_decreased_metric() {
282            let metrics_before = build_hashmap(&[("a", 5)]);
283
284            let metrics_after = build_hashmap(&[("a", 2)]);
285
286            let expected = build_hashmap(&[("a", -3)]);
287
288            assert_eq!(
289                expected,
290                UsageReporter::compute_metrics_delta(&metrics_before, &metrics_after)
291            );
292        }
293
294        #[test]
295        fn should_contain_new_value_of_a_metric_not_present_before() {
296            let metrics_before = build_hashmap(&[]);
297
298            let metrics_after = build_hashmap(&[("a", 5)]);
299
300            let expected = build_hashmap(&[("a", 5)]);
301
302            assert_eq!(
303                expected,
304                UsageReporter::compute_metrics_delta(&metrics_before, &metrics_after)
305            );
306        }
307
308        #[test]
309        fn should_not_panic_with_a_large_delta() {
310            let metrics_at_0 = build_hashmap(&[("a", 0)]);
311            let metrics_at_max = build_hashmap(&[("a", u32::MAX)]);
312
313            assert_eq!(
314                build_hashmap(&[("a", u32::MAX as i64)]),
315                UsageReporter::compute_metrics_delta(&metrics_at_0, &metrics_at_max)
316            );
317            assert_eq!(
318                build_hashmap(&[("a", -(u32::MAX as i64))]),
319                UsageReporter::compute_metrics_delta(&metrics_at_max, &metrics_at_0)
320            );
321        }
322    }
323}