// mithril_aggregator/services/usage_reporter.rs
use std::{collections::HashMap, sync::Arc, time::Duration};
2
3use crate::{
4 event_store::{EventMessage, TransmitterService},
5 MetricsService,
6};
7use chrono::{DateTime, Utc};
8use mithril_common::logging::LoggerExtensions;
9use serde::{Deserialize, Serialize};
10use slog::{info, Logger};
11
/// Content of the event emitted for a single metric.
///
/// It is handed to `EventMessage::new` as the message content by
/// `UsageReporter::create_metrics_event_message`, and the tests deserialize it back from
/// `EventMessage::content`.
#[derive(Serialize, Deserialize)]
struct MetricEventMessage {
    /// Name of the metric.
    name: String,
    /// Change of the metric value since the previous report (negative when it decreased).
    value: i64,
    /// Reporting period given to `send_metrics`.
    period: Duration,
    /// Time at which the report was built.
    date: DateTime<Utc>,
}
/// Reports the changes of the metrics as events sent through a `TransmitterService`.
pub struct UsageReporter {
    /// Service used to send the metric events.
    transmitter_service: Arc<TransmitterService<EventMessage>>,
    /// Service the current metric values are exported from.
    metrics_service: Arc<MetricsService>,
    /// Metric values exported by the previous report; baseline for the next delta.
    last_reported_metrics: HashMap<String, u32>,
    /// Logger derived from the one given to `new`, tagged with this component's name.
    logger: Logger,
}
31
32impl UsageReporter {
33 pub fn new(
35 transmitter_service: Arc<TransmitterService<EventMessage>>,
36 metrics_service: Arc<MetricsService>,
37 logger: Logger,
38 ) -> Self {
39 Self {
40 transmitter_service,
41 metrics_service,
42 last_reported_metrics: HashMap::new(),
43 logger: logger.new_with_component_name::<Self>(),
44 }
45 }
46
/// Compute, for each metric of `metrics_after`, how much it changed since `metrics_before`.
///
/// A metric missing from `metrics_before` is treated as previously being `0`. Metrics whose
/// value did not change are left out of the result, and so are metrics that only exist in
/// `metrics_before`. Values are widened to `i64` before subtracting so a delta can be
/// negative and cannot overflow.
fn compute_metrics_delta(
    metrics_before: &HashMap<String, u32>,
    metrics_after: &HashMap<String, u32>,
) -> HashMap<String, i64> {
    let mut delta_by_name = HashMap::new();

    for (metric_name, current) in metrics_after {
        let previous = metrics_before.get(metric_name).copied().unwrap_or(0);
        let difference = i64::from(*current) - i64::from(previous);

        if difference != 0 {
            delta_by_name.insert(metric_name.clone(), difference);
        }
    }

    delta_by_name
}
61
62 fn send_metrics(&mut self, duration: &Duration) {
63 let metrics = self.metrics_service.export_metrics_map();
64 let delta = Self::compute_metrics_delta(&self.last_reported_metrics, &metrics);
65 let date = Utc::now();
66
67 self.last_reported_metrics = metrics;
68
69 for (name, value) in delta {
70 let message = Self::create_metrics_event_message(name, value, *duration, date);
71 self.transmitter_service.send(message);
72 }
73 }
74
75 pub async fn run_forever(&mut self, run_interval: Duration) {
77 let mut interval = tokio::time::interval(run_interval);
78
79 loop {
80 interval.tick().await;
81 self.send_metrics(&run_interval);
82
83 info!(
84 self.logger,
85 "Metrics sent, Sleeping for {} seconds",
86 run_interval.as_secs()
87 );
88 }
89 }
90
91 pub fn create_metrics_event_message(
93 name: String,
94 value: i64,
95 period: Duration,
96 date: DateTime<Utc>,
97 ) -> EventMessage {
98 EventMessage::new(
99 "Metrics",
100 &name.clone(),
101 &MetricEventMessage {
102 name,
103 value,
104 period,
105 date,
106 },
107 vec![],
108 )
109 }
110}
111
112#[cfg(test)]
113mod tests {
114 use mithril_metric::MetricCollector;
115 use tokio::sync::mpsc::UnboundedReceiver;
116
117 use super::UsageReporter;
118 use super::*;
119 use crate::test_tools::TestLogger;
120
121 fn build_usage_reporter() -> (
122 UsageReporter,
123 Arc<MetricsService>,
124 UnboundedReceiver<EventMessage>,
125 ) {
126 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<EventMessage>();
127 let metrics_service = Arc::new(MetricsService::new(TestLogger::stdout()).unwrap());
128 let transmitter_service = Arc::new(TransmitterService::new(tx, TestLogger::stdout()));
129 let usage_reporter = UsageReporter::new(
130 transmitter_service.clone(),
131 metrics_service.clone(),
132 TestLogger::stdout(),
133 );
134 (usage_reporter, metrics_service, rx)
135 }
136
137 fn received_messages(
138 rx: &mut tokio::sync::mpsc::UnboundedReceiver<EventMessage>,
139 ) -> Vec<EventMessage> {
140 let mut received_messages: Vec<EventMessage> = Vec::new();
141 while let Ok(message) = rx.try_recv() {
142 received_messages.push(message);
143 }
144 received_messages
145 }
146
147 fn extract_metric_value(message: &EventMessage) -> (String, i64) {
148 let metric_delta: MetricEventMessage =
149 serde_json::from_value(message.content.clone()).unwrap();
150 (message.action.clone(), metric_delta.value)
151 }
152
/// Without any metric change, a report must not emit any message.
#[test]
fn when_no_metrics_no_message_sent() {
    let (mut reporter, _metrics_service, mut receiver) = build_usage_reporter();

    reporter.send_metrics(&Duration::from_secs(10));

    let messages = received_messages(&mut receiver);
    assert_eq!(0, messages.len());
}
162
/// The event of an incremented metric carries the expected source, action, value and period.
#[test]
fn verify_event_content_on_a_metric() {
    let period = Duration::from_secs(10);
    let (mut reporter, metrics_service, mut receiver) = build_usage_reporter();

    let counter = metrics_service.get_certificate_total_produced_since_startup();
    counter.increment_by(3);
    reporter.send_metrics(&period);

    let messages = received_messages(&mut receiver);
    let first = &messages[0];
    assert_eq!(first.source, "Metrics");
    assert_eq!(first.action, counter.name());

    let content: MetricEventMessage = serde_json::from_value(first.content.clone()).unwrap();
    assert_eq!(3, content.value);
    assert_eq!(period, content.period);
}
180
/// Each metric that was incremented must be reported with its own message.
#[test]
fn send_one_message_for_each_non_zero_value_metrics() {
    let (mut reporter, metrics_service, mut receiver) = build_usage_reporter();

    let metric_1 = metrics_service.get_certificate_total_produced_since_startup();
    metric_1.increment();
    let metric_2 = metrics_service.get_certificate_detail_total_served_since_startup();
    metric_2.increment();

    reporter.send_metrics(&Duration::from_secs(10));

    let reported: Vec<(String, i64)> = received_messages(&mut receiver)
        .iter()
        .map(extract_metric_value)
        .collect();

    assert!(reported.contains(&(metric_1.name(), 1)));
    assert!(reported.contains(&(metric_2.name(), 1)));
}
199
/// Successive reports only contain what changed since the previous report.
#[test]
fn resend_only_delta_since_last_send() {
    let period = Duration::from_secs(10);
    let (mut reporter, metrics_service, mut receiver) = build_usage_reporter();

    let metric_1 = metrics_service.get_certificate_total_produced_since_startup();
    let metric_2 = metrics_service.get_certificate_detail_total_served_since_startup();

    // First report: both metrics changed.
    metric_1.increment_by(12);
    metric_2.increment_by(5);
    reporter.send_metrics(&period);

    let first_batch = received_messages(&mut receiver);
    assert_eq!(2, first_batch.len());

    // Second report: only the second metric changed, in two steps that are reported at once.
    metric_2.increment_by(20);
    metric_2.increment_by(33);
    reporter.send_metrics(&period);

    let second_batch = received_messages(&mut receiver);
    assert_eq!(1, second_batch.len());
    assert_eq!(
        (metric_2.name(), 53),
        extract_metric_value(&second_batch[0])
    );

    // Third report: again only the second metric changed.
    metric_2.increment_by(15);
    reporter.send_metrics(&period);

    let third_batch = received_messages(&mut receiver);
    assert_eq!(1, third_batch.len());
    assert_eq!(
        (metric_2.name(), 15),
        extract_metric_value(&third_batch[0])
    );
}
241
242 mod metric_delta {
243 use super::*;
244
245 fn build_hashmap<T: Copy>(values: &[(&str, T)]) -> HashMap<String, T> {
246 values
247 .iter()
248 .map(|(name, value)| (name.to_string(), *value))
249 .collect()
250 }
251
252 #[test]
253 fn should_not_contain_metric_that_not_change() {
254 let metrics_before = build_hashmap(&[("a", 1)]);
255
256 let metrics_after = build_hashmap(&[("a", 1)]);
257
258 let expected = build_hashmap(&[]);
259
260 assert_eq!(
261 expected,
262 UsageReporter::compute_metrics_delta(&metrics_before, &metrics_after)
263 );
264 }
265
266 #[test]
267 fn should_contain_the_difference_of_an_increased_metric() {
268 let metrics_before = build_hashmap(&[("a", 1)]);
269
270 let metrics_after = build_hashmap(&[("a", 5)]);
271
272 let expected = build_hashmap(&[("a", 4)]);
273
274 assert_eq!(
275 expected,
276 UsageReporter::compute_metrics_delta(&metrics_before, &metrics_after)
277 );
278 }
279
280 #[test]
281 fn should_contain_the_difference_of_a_decreased_metric() {
282 let metrics_before = build_hashmap(&[("a", 5)]);
283
284 let metrics_after = build_hashmap(&[("a", 2)]);
285
286 let expected = build_hashmap(&[("a", -3)]);
287
288 assert_eq!(
289 expected,
290 UsageReporter::compute_metrics_delta(&metrics_before, &metrics_after)
291 );
292 }
293
294 #[test]
295 fn should_contain_new_value_of_a_metric_not_present_before() {
296 let metrics_before = build_hashmap(&[]);
297
298 let metrics_after = build_hashmap(&[("a", 5)]);
299
300 let expected = build_hashmap(&[("a", 5)]);
301
302 assert_eq!(
303 expected,
304 UsageReporter::compute_metrics_delta(&metrics_before, &metrics_after)
305 );
306 }
307
308 #[test]
309 fn should_not_panic_with_a_large_delta() {
310 let metrics_at_0 = build_hashmap(&[("a", 0)]);
311 let metrics_at_max = build_hashmap(&[("a", u32::MAX)]);
312
313 assert_eq!(
314 build_hashmap(&[("a", u32::MAX as i64)]),
315 UsageReporter::compute_metrics_delta(&metrics_at_0, &metrics_at_max)
316 );
317 assert_eq!(
318 build_hashmap(&[("a", -(u32::MAX as i64))]),
319 UsageReporter::compute_metrics_delta(&metrics_at_max, &metrics_at_0)
320 );
321 }
322 }
323}