1use std::{collections::HashMap, sync::Arc, time::Duration};
2
3use crate::{
4 event_store::{EventMessage, TransmitterService},
5 MetricsService,
6};
7use chrono::{DateTime, Utc};
8use mithril_common::logging::LoggerExtensions;
9use serde::{Deserialize, Serialize};
10use slog::{info, Logger};
11
/// Payload serialized as the content of a "Metrics" event message: one metric value for one
/// origin.
#[derive(Serialize, Deserialize)]
struct MetricEventMessage {
    /// Name of the metric.
    name: String,
    /// Reported value; `UsageReporter::send_metrics` fills it with the change of the metric
    /// since the previous report, so it may be negative.
    value: i64,
    /// Period passed to `send_metrics` (the run interval when driven by `run_forever`).
    period: Duration,
    /// Label value the metric value is attached to (empty for a metric reported without label,
    /// as the tests of this file show).
    origin: String,
    /// Date at which the report was built.
    date: DateTime<Utc>,
}
/// Reports the evolution of the metrics exposed by the [`MetricsService`] by sending one event
/// message per metric and origin whose value changed since the previous report.
pub struct UsageReporter {
    /// Service through which the metric event messages are sent.
    transmitter_service: Arc<TransmitterService<EventMessage>>,
    /// Source of the metric values, read through `export_metrics_map`.
    metrics_service: Arc<MetricsService>,
    /// Values exported at the previous report, keyed by metric name, then by origin.
    /// Empty until the first report.
    last_reported_metrics: HashMap<String, HashMap<String, u32>>,
    /// Logger derived from the one given to `new`, tagged with this component name.
    logger: Logger,
}
33
34impl UsageReporter {
35 pub fn new(
37 transmitter_service: Arc<TransmitterService<EventMessage>>,
38 metrics_service: Arc<MetricsService>,
39 logger: Logger,
40 ) -> Self {
41 Self {
42 transmitter_service,
43 metrics_service,
44 last_reported_metrics: HashMap::new(),
45 logger: logger.new_with_component_name::<Self>(),
46 }
47 }
48
/// Computes, for every entry of `metrics_after`, its change relative to `metrics_before`.
///
/// An entry missing from `metrics_before` (or a `None` baseline) is compared against `0`.
/// Entries whose value did not change are left out of the result, and entries that only exist
/// in `metrics_before` are not reported.
fn compute_metrics_delta(
    metrics_before: Option<&HashMap<String, u32>>,
    metrics_after: &HashMap<String, u32>,
) -> HashMap<String, i64> {
    let mut deltas = HashMap::new();

    for (metric_name, &current) in metrics_after {
        let previous = metrics_before
            .and_then(|baseline| baseline.get(metric_name))
            .copied()
            .unwrap_or(0);
        // Both `u32` values are widened to `i64` first, so the subtraction cannot overflow.
        let change = i64::from(current) - i64::from(previous);

        if change != 0 {
            deltas.insert(metric_name.clone(), change);
        }
    }

    deltas
}
63
64 fn compute_metrics_delta_with_label(
65 metrics_before: &HashMap<String, HashMap<String, u32>>,
66 metrics_after: &HashMap<String, HashMap<String, u32>>,
67 ) -> HashMap<String, HashMap<String, i64>> {
68 metrics_after
69 .iter()
70 .map(|(name, label_values)| {
71 (
72 name.clone(),
73 UsageReporter::compute_metrics_delta(metrics_before.get(name), label_values),
74 )
75 })
76 .filter(|(_name, value)| !value.is_empty())
77 .collect()
78 }
79
80 fn send_metrics(&mut self, duration: &Duration) {
81 let metrics = self.metrics_service.export_metrics_map();
82 let delta = Self::compute_metrics_delta_with_label(&self.last_reported_metrics, &metrics);
83 let date = Utc::now();
84
85 self.last_reported_metrics = metrics;
86
87 for (name, origin_values) in delta {
88 for (origin, value) in origin_values {
89 let message = Self::create_metrics_event_message(
90 name.clone(),
91 value,
92 *duration,
93 origin,
94 date,
95 );
96 self.transmitter_service.send(message);
97 }
98 }
99 }
100
101 pub async fn run_forever(&mut self, run_interval: Duration) {
103 let mut interval = tokio::time::interval(run_interval);
104
105 loop {
106 interval.tick().await;
107 self.send_metrics(&run_interval);
108
109 info!(
110 self.logger,
111 "Metrics sent, Sleeping for {} seconds",
112 run_interval.as_secs()
113 );
114 }
115 }
116
117 pub fn create_metrics_event_message(
119 name: String,
120 value: i64,
121 period: Duration,
122 origin: String,
123 date: DateTime<Utc>,
124 ) -> EventMessage {
125 EventMessage::new(
126 "Metrics",
127 &name.clone(),
128 &MetricEventMessage {
129 name,
130 value,
131 period,
132 origin,
133 date,
134 },
135 vec![],
136 )
137 }
138}
139
140#[cfg(test)]
141mod tests {
142 use mithril_metric::MetricCollector;
143 use tokio::sync::mpsc::UnboundedReceiver;
144
145 use super::UsageReporter;
146 use super::*;
147 use crate::test_tools::TestLogger;
148
149 fn build_usage_reporter() -> (
150 UsageReporter,
151 Arc<MetricsService>,
152 UnboundedReceiver<EventMessage>,
153 ) {
154 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<EventMessage>();
155 let metrics_service = Arc::new(MetricsService::new(TestLogger::stdout()).unwrap());
156 let transmitter_service = Arc::new(TransmitterService::new(tx, TestLogger::stdout()));
157 let usage_reporter = UsageReporter::new(
158 transmitter_service.clone(),
159 metrics_service.clone(),
160 TestLogger::stdout(),
161 );
162 (usage_reporter, metrics_service, rx)
163 }
164
165 fn received_messages(
166 rx: &mut tokio::sync::mpsc::UnboundedReceiver<EventMessage>,
167 ) -> Vec<EventMessage> {
168 let mut received_messages: Vec<EventMessage> = Vec::new();
169 while let Ok(message) = rx.try_recv() {
170 received_messages.push(message);
171 }
172 received_messages
173 }
174
175 fn extract_metric_value(message: &EventMessage) -> (String, i64) {
176 let metric_delta: MetricEventMessage =
177 serde_json::from_value(message.content.clone()).unwrap();
178 (message.action.clone(), metric_delta.value)
179 }
180
// Without any metric change, a report must not send anything.
#[test]
fn when_no_metrics_no_message_sent() {
    let (mut usage_reporter, _metrics_service, mut rx) = build_usage_reporter();
    let period = Duration::from_secs(10);

    usage_reporter.send_metrics(&period);

    let sent = received_messages(&mut rx);
    assert_eq!(0, sent.len());
}
190
// A metric without label is reported with an empty origin.
#[test]
fn verify_event_content_on_a_metric() {
    let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();
    let period = Duration::from_secs(10);

    let metric = metrics_service.get_certificate_total_produced_since_startup();
    metric.increment_by(3);
    usage_reporter.send_metrics(&period);

    let sent = received_messages(&mut rx);
    let first = &sent[0];
    assert_eq!(first.source, "Metrics");
    assert_eq!(first.action, metric.name());

    let payload = serde_json::from_value::<MetricEventMessage>(first.content.clone()).unwrap();
    assert_eq!(3, payload.value);
    assert_eq!(period, payload.period);
    assert_eq!(String::new(), payload.origin);
}
209
// A metric with a label is reported with the label value as origin.
#[test]
fn verify_event_content_on_a_metric_with_label() {
    let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();
    let period = Duration::from_secs(10);

    let metric = metrics_service.get_certificate_detail_total_served_since_startup();
    metric.increment_by(&["ORIGIN_A"], 3);
    usage_reporter.send_metrics(&period);

    let sent = received_messages(&mut rx);
    let first = &sent[0];
    assert_eq!(first.source, "Metrics");
    assert_eq!(first.action, metric.name());

    let payload = serde_json::from_value::<MetricEventMessage>(first.content.clone()).unwrap();
    assert_eq!(3, payload.value);
    assert_eq!(period, payload.period);
    assert_eq!("ORIGIN_A", payload.origin);
}
228
// Each origin of a labelled metric gets its own message carrying its own value.
#[test]
fn send_one_message_per_origin() {
    let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();

    let metric = metrics_service.get_certificate_detail_total_served_since_startup();
    metric.increment_by(&["ORIGIN_A"], 3);
    metric.increment_by(&["ORIGIN_B"], 7);
    usage_reporter.send_metrics(&Duration::from_secs(10));

    let sent = received_messages(&mut rx);
    assert_eq!(2, sent.len());

    let mut value_by_origin = HashMap::new();
    for message in &sent {
        let payload =
            serde_json::from_value::<MetricEventMessage>(message.content.clone()).unwrap();
        value_by_origin.insert(payload.origin, payload.value);
    }
    assert_eq!(Some(&3), value_by_origin.get("ORIGIN_A"));
    assert_eq!(Some(&7), value_by_origin.get("ORIGIN_B"));
}
251
// Both a plain metric and a labelled metric are reported when they changed.
#[test]
fn send_one_message_for_each_non_zero_value_metrics() {
    let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();

    let plain_metric = metrics_service.get_certificate_total_produced_since_startup();
    let labelled_metric = metrics_service.get_certificate_detail_total_served_since_startup();
    plain_metric.increment();
    labelled_metric.increment(&["ORIGIN"]);

    usage_reporter.send_metrics(&Duration::from_secs(10));

    let sent = received_messages(&mut rx);
    let reported: Vec<(String, i64)> = sent.iter().map(extract_metric_value).collect();

    assert!(reported.contains(&(plain_metric.name(), 1)));
    assert!(reported.contains(&(labelled_metric.name(), 1)));
}
270
// Successive reports only carry what changed since the previous report.
#[test]
fn resend_only_delta_since_last_send() {
    let (mut usage_reporter, metrics_service, mut rx) = build_usage_reporter();
    let period = Duration::from_secs(10);

    let metric_1 = metrics_service.get_certificate_total_produced_since_startup();
    let metric_2 = metrics_service.get_certificate_detail_total_served_since_startup();

    // First report: both metrics changed.
    metric_1.increment_by(12);
    metric_2.increment_by(&["ORIGIN"], 5);
    usage_reporter.send_metrics(&period);

    let first_batch = received_messages(&mut rx);
    assert_eq!(2, first_batch.len());

    // Second report: only the labelled metric changed, by the sum of its two increments.
    metric_2.increment_by(&["ORIGIN"], 20);
    metric_2.increment_by(&["ORIGIN"], 33);
    usage_reporter.send_metrics(&period);

    let second_batch = received_messages(&mut rx);
    assert_eq!(1, second_batch.len());
    assert_eq!(
        (metric_2.name(), 53),
        extract_metric_value(&second_batch[0])
    );

    // Third report: again only what was added since the second report.
    metric_2.increment_by(&["ORIGIN"], 15);
    usage_reporter.send_metrics(&period);

    let third_batch = received_messages(&mut rx);
    assert_eq!(1, third_batch.len());
    assert_eq!(
        (metric_2.name(), 15),
        extract_metric_value(&third_batch[0])
    );
}
312
313 mod metric_delta {
314 use super::*;
315
/// Builds a `metric name -> origin -> value` map from nested slices of pairs.
///
/// When a name or an origin is listed several times, the last occurrence wins.
fn build_hashmap<T: Copy>(
    values: &[(&str, &[(&str, T)])],
) -> HashMap<String, HashMap<String, T>> {
    let mut metrics = HashMap::new();

    for &(name, origin_values) in values {
        let mut value_per_label = HashMap::new();
        for &(origin, value) in origin_values {
            value_per_label.insert(origin.to_string(), value);
        }
        metrics.insert(name.to_string(), value_per_label);
    }

    metrics
}
330
// An unchanged metric must be absent from the delta.
#[test]
fn should_not_contain_metric_that_not_change() {
    let before = build_hashmap(&[("metric_a", &[("origin_1", 1)])]);
    let after = build_hashmap(&[("metric_a", &[("origin_1", 1)])]);

    let delta = UsageReporter::compute_metrics_delta_with_label(&before, &after);

    assert_eq!(build_hashmap(&[]), delta);
}
344
// An increased value is reported as a positive difference.
#[test]
fn should_contain_the_difference_of_an_increased_metric() {
    let before = build_hashmap(&[("metric_a", &[("origin_1", 1)])]);
    let after = build_hashmap(&[("metric_a", &[("origin_1", 5)])]);

    let delta = UsageReporter::compute_metrics_delta_with_label(&before, &after);

    assert_eq!(build_hashmap(&[("metric_a", &[("origin_1", 4)])]), delta);
}
358
// A decreased value is reported as a negative difference.
#[test]
fn should_contain_the_difference_of_a_decreased_metric() {
    let before = build_hashmap(&[("metric_a", &[("origin_1", 5)])]);
    let after = build_hashmap(&[("metric_a", &[("origin_1", 2)])]);

    let delta = UsageReporter::compute_metrics_delta_with_label(&before, &after);

    assert_eq!(build_hashmap(&[("metric_a", &[("origin_1", -3)])]), delta);
}
372
// A metric that did not exist before is reported with its full value.
#[test]
fn should_contain_new_value_of_a_metric_not_present_before() {
    let before = build_hashmap(&[]);
    let after = build_hashmap(&[("metric_a", &[("origin_1", 5)])]);

    let delta = UsageReporter::compute_metrics_delta_with_label(&before, &after);

    assert_eq!(build_hashmap(&[("metric_a", &[("origin_1", 5)])]), delta);
}
386
// The largest possible `u32` swing, in both directions, must be computed without panicking.
#[test]
fn should_not_panic_with_a_large_delta() {
    let metrics_at_0 = build_hashmap(&[("metric_a", &[("origin_1", 0)])]);
    let metrics_at_max = build_hashmap(&[("metric_a", &[("origin_1", u32::MAX)])]);
    let largest_delta = i64::from(u32::MAX);

    let increase = UsageReporter::compute_metrics_delta_with_label(&metrics_at_0, &metrics_at_max);
    assert_eq!(
        build_hashmap(&[("metric_a", &[("origin_1", largest_delta)])]),
        increase
    );

    let decrease = UsageReporter::compute_metrics_delta_with_label(&metrics_at_max, &metrics_at_0);
    assert_eq!(
        build_hashmap(&[("metric_a", &[("origin_1", -largest_delta)])]),
        decrease
    );
}
401
// Only the origins whose value changed appear in the delta of a metric.
#[test]
fn should_contain_only_the_difference_for_origin_on_which_value_change() {
    let before = build_hashmap(&[(
        "metric_a",
        &[("origin_1", 1), ("origin_2", 3), ("origin_3", 7)],
    )]);
    let after = build_hashmap(&[(
        "metric_a",
        &[("origin_1", 5), ("origin_2", 3), ("origin_3", 9)],
    )]);

    let delta = UsageReporter::compute_metrics_delta_with_label(&before, &after);

    // `origin_2` did not change, so it is left out.
    assert_eq!(
        build_hashmap(&[("metric_a", &[("origin_1", 4), ("origin_3", 2)])]),
        delta
    );
}
421 }
422}