1use anyhow::anyhow;
4use mithril_common::StdResult;
5use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
6
7use std::sync::Arc;
8
9use crate::event_store::database::query::InsertEventQuery;
10use crate::event_store::{EventMessage, event::Event};
/// Persists [`EventMessage`]s in the event store database, turning them into [`Event`]s.
pub struct EventPersister {
    /// Shared SQLite connection on which the insert query is executed.
    connection: Arc<SqliteConnection>,
}
16
17impl EventPersister {
18 pub fn new(connection: Arc<SqliteConnection>) -> Self {
20 Self { connection }
21 }
22
23 pub fn persist(&self, message: EventMessage) -> StdResult<Event> {
25 let log_message = message.clone();
26 let mut rows = self.connection.fetch(InsertEventQuery::one(message)?)?;
27
28 rows.next().ok_or(anyhow!(
29 "No record from the database after saving event message {log_message:?}"
30 ))
31 }
32}
33
34#[cfg(test)]
35mod tests {
36 use super::*;
37 use crate::event_store::database::test_helper::event_store_db_connection;
38 use mithril_common::StdResult;
39
/// Persisting a simple message through a test event store connection succeeds.
#[test]
fn can_persist_event() -> StdResult<()> {
    let persister = EventPersister::new(Arc::new(event_store_db_connection().unwrap()));
    let content = "content".to_string();

    // Only success matters here: the returned event is discarded.
    persister
        .persist(EventMessage::new("source", "action", &content, Vec::new()))
        .map(|_event| ())
}
50
/// A message can be persisted on the connection returned by the test helper.
#[test]
fn migration_executed_running_database() -> StdResult<()> {
    let db = event_store_db_connection().unwrap();
    let persister = EventPersister::new(Arc::new(db));

    let payload = String::from("content");
    let message = EventMessage::new("source", "action", &payload, vec![]);
    let _persisted = persister.persist(message)?;

    Ok(())
}
61
62 mod metrics_per_day_view {
63
64 use std::time::Duration;
65
66 use crate::{
67 event_store::database::test_helper::event_store_db_connection, services::UsageReporter,
68 };
69 use chrono::{DateTime, Utc};
70
71 use mithril_common::StdResult;
72
73 use serde::{Deserialize, Serialize};
74 use sqlite::ConnectionThreadSafe;
75
76 use super::*;
77
78 fn get_all_metrics(
79 connection: Arc<ConnectionThreadSafe>,
80 ) -> StdResult<Vec<(String, String, i64)>> {
81 let query = "select date, counter_name, value from metrics_per_day";
82 let mut statement = connection.prepare(query)?;
83 let mut result = Vec::new();
84 while let Ok(sqlite::State::Row) = statement.next() {
85 result.push((
86 statement.read::<String, _>("date")?,
87 statement.read::<String, _>("counter_name")?,
88 statement.read::<i64, _>("value")?,
89 ));
90 }
91
92 Ok(result)
93 }
94
95 fn get_all_metrics_by_origin(
96 connection: Arc<ConnectionThreadSafe>,
97 ) -> StdResult<Vec<(String, String, String, i64)>> {
98 let query = "select date, counter_name, origin, value from metrics_per_day_and_origin";
99 let mut statement = connection.prepare(query)?;
100 let mut result = Vec::new();
101 while let Ok(sqlite::State::Row) = statement.next() {
102 result.push((
103 statement.read::<String, _>("date")?,
104 statement.read::<String, _>("counter_name")?,
105 statement.read::<Option<String>, _>("origin")?.unwrap_or_default(),
106 statement.read::<i64, _>("value")?,
107 ));
108 }
109
110 Ok(result)
111 }
112
113 fn get_all_metrics_by_client_type(
114 connection: Arc<ConnectionThreadSafe>,
115 ) -> StdResult<Vec<(String, String, String, i64)>> {
116 let query = "select date, counter_name, client_type, value from metrics_per_day_and_client_type";
117 let mut statement = connection.prepare(query)?;
118 let mut result = Vec::new();
119 while let Ok(sqlite::State::Row) = statement.next() {
120 result.push((
121 statement.read::<String, _>("date")?,
122 statement.read::<String, _>("counter_name")?,
123 statement
124 .read::<Option<String>, _>("client_type")?
125 .unwrap_or_default(),
126 statement.read::<i64, _>("value")?,
127 ));
128 }
129
130 Ok(result)
131 }
132
133 fn insert_metric_event_with_origin(
136 persister: &EventPersister,
137 date: &str,
138 metric_name: &str,
139 origin: &str,
140 value: i64,
141 ) {
142 let metric_date =
143 DateTime::parse_from_str(&format!("{date} +0000"), "%Y-%m-%d %H:%M:%S %z").unwrap();
144
145 let message = UsageReporter::create_metrics_event_message(
146 metric_name.to_string(),
147 value,
148 Duration::from_secs(5),
149 origin.to_string(),
150 "CLIENT_TYPE_A".to_string(),
151 metric_date.into(),
152 );
153
154 let _event = persister.persist(message).unwrap();
155 }
156
157 fn insert_metric_event_message(
158 persister: &EventPersister,
159 date: &str,
160 metric_name: &str,
161 origin: &str,
162 client_type: &str,
163 value: i64,
164 ) {
165 let metric_date =
166 DateTime::parse_from_str(&format!("{date} +0000"), "%Y-%m-%d %H:%M:%S %z").unwrap();
167
168 let message = UsageReporter::create_metrics_event_message(
169 metric_name.to_string(),
170 value,
171 Duration::from_secs(5),
172 origin.to_string(),
173 client_type.to_string(),
174 metric_date.into(),
175 );
176
177 let _event = persister.persist(message).unwrap();
178 }
179
180 fn insert_metric_event(
182 persister: &EventPersister,
183 date: &str,
184 metric_name: &str,
185 value: i64,
186 ) {
187 #[derive(Serialize, Deserialize)]
188 struct OldMetricEventMessage {
189 name: String,
190 value: i64,
191 period: Duration,
192 date: DateTime<Utc>,
193 }
194
195 let metric_date =
196 DateTime::parse_from_str(&format!("{date} +0000"), "%Y-%m-%d %H:%M:%S %z").unwrap();
197
198 let message = EventMessage::new(
199 "Metrics",
200 metric_name,
201 &OldMetricEventMessage {
202 name: metric_name.to_string(),
203 value,
204 period: Duration::from_secs(5),
205 date: metric_date.into(),
206 },
207 vec![],
208 );
209
210 let _event = persister.persist(message).unwrap();
211 }
212
/// A single persisted metric shows up in `metrics_per_day` under its calendar day.
#[test]
fn retrieved_inserted_event() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));
    insert_metric_event_with_origin(&persister, "2024-10-29 23:56:04", "metric_1", "ORIGIN", 15);

    let metrics = get_all_metrics(connection).unwrap();
    let expected = (String::from("2024-10-29"), String::from("metric_1"), 15);

    assert!(metrics.contains(&expected));
}
229
/// Values are summed per day and per metric name, whatever their origin.
#[test]
fn sum_metric_per_day() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    let events: [(&str, &str, &str, i64); 5] = [
        ("2024-10-29 21:00:00", "metric_1", "ORIGIN_A", 15),
        ("2024-10-29 22:00:00", "metric_1", "ORIGIN_B", 60),
        ("2024-10-29 23:00:00", "metric_2", "ORIGIN", 100),
        ("2024-10-30 17:00:00", "metric_1", "ORIGIN_A", 12),
        ("2024-10-30 18:00:00", "metric_1", "ORIGIN_B", 4),
    ];
    for (date, metric_name, origin, value) in events {
        insert_metric_event_with_origin(&persister, date, metric_name, origin, value);
    }

    let metrics = get_all_metrics(connection).unwrap();
    let row = |date: &str, name: &str, value: i64| (date.to_string(), name.to_string(), value);

    assert!(metrics.contains(&row("2024-10-29", "metric_1", 75)));
    assert!(metrics.contains(&row("2024-10-29", "metric_2", 100)));
    assert!(metrics.contains(&row("2024-10-30", "metric_1", 16)));
}
276
/// Values are summed per day, metric name and origin.
#[test]
fn sum_metric_per_day_and_origin() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    let events: [(&str, &str, &str, i64); 7] = [
        ("2024-10-29 21:00:00", "metric_1", "ORIGIN_A", 15),
        ("2024-10-29 22:00:00", "metric_1", "ORIGIN_B", 60),
        ("2024-10-29 23:00:00", "metric_2", "ORIGIN", 100),
        ("2024-10-30 17:00:00", "metric_1", "ORIGIN_A", 12),
        ("2024-10-30 18:00:00", "metric_1", "ORIGIN_B", 4),
        ("2024-10-30 17:00:00", "metric_1", "ORIGIN_A", 15),
        ("2024-10-30 18:00:00", "metric_1", "ORIGIN_B", 3),
    ];
    for (date, metric_name, origin, value) in events {
        insert_metric_event_with_origin(&persister, date, metric_name, origin, value);
    }

    let owned_rows = get_all_metrics_by_origin(connection).unwrap();
    // Borrow the rows as string slices so expectations can be written with literals.
    let rows: Vec<(&str, &str, &str, i64)> = owned_rows
        .iter()
        .map(|(date, name, origin, value)| (date.as_str(), name.as_str(), origin.as_str(), *value))
        .collect();

    assert!(rows.contains(&("2024-10-29", "metric_1", "ORIGIN_A", 15)));
    assert!(rows.contains(&("2024-10-29", "metric_1", "ORIGIN_B", 60)));
    assert!(rows.contains(&("2024-10-29", "metric_2", "ORIGIN", 100)));
    assert!(rows.contains(&("2024-10-30", "metric_1", "ORIGIN_A", 27)));
    assert!(rows.contains(&("2024-10-30", "metric_1", "ORIGIN_B", 7)));
}
344
/// A persisted metric is exposed by `metrics_per_day_and_client_type` with its client type.
#[test]
fn vue_metrics_per_day_and_client_type() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    let (date, metric_name, origin, client_type) =
        ("2024-10-29 21:00:00", "metric_1", "ORIGIN_A", "CLIENT_TYPE_A");
    insert_metric_event_message(&persister, date, metric_name, origin, client_type, 15);

    let owned_rows = get_all_metrics_by_client_type(connection).unwrap();
    // Borrow the rows as string slices so expectations can be written with literals.
    let rows: Vec<(&str, &str, &str, i64)> = owned_rows
        .iter()
        .map(|(date, name, kind, value)| (date.as_str(), name.as_str(), kind.as_str(), *value))
        .collect();

    assert!(rows.contains(&("2024-10-29", "metric_1", "CLIENT_TYPE_A", 15)));
}
368
/// Events in the older payload layout (no origin) are summed under an empty origin.
#[test]
fn sum_metric_per_day_and_origin_on_old_event() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    let old_events: [(&str, &str, i64); 7] = [
        ("2024-10-29 21:00:00", "metric_1", 15),
        ("2024-10-29 22:00:00", "metric_1", 60),
        ("2024-10-29 23:00:00", "metric_2", 100),
        ("2024-10-30 17:00:00", "metric_1", 12),
        ("2024-10-30 18:00:00", "metric_1", 4),
        ("2024-10-30 17:00:00", "metric_1", 15),
        ("2024-10-30 18:00:00", "metric_1", 3),
    ];
    for (date, metric_name, value) in old_events {
        insert_metric_event(&persister, date, metric_name, value);
    }

    let owned_rows = get_all_metrics_by_origin(connection).unwrap();
    // Borrow the rows as string slices so expectations can be written with literals.
    let rows: Vec<(&str, &str, &str, i64)> = owned_rows
        .iter()
        .map(|(date, name, origin, value)| (date.as_str(), name.as_str(), origin.as_str(), *value))
        .collect();

    assert!(rows.contains(&("2024-10-29", "metric_1", "", 75)));
    assert!(rows.contains(&("2024-10-29", "metric_2", "", 100)));
    assert!(rows.contains(&("2024-10-30", "metric_1", "", 34)));
}
393
/// Old-layout events (empty origin) and events with an origin are summed independently.
#[test]
fn sum_metric_per_day_and_origin_with_old_and_new_format() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    let events_with_origin: [(&str, &str, &str, i64); 3] = [
        ("2024-10-29 21:00:00", "metric_1", "ORIGIN_A", 15),
        ("2024-10-29 22:00:00", "metric_1", "ORIGIN_B", 60),
        ("2024-10-29 23:00:00", "metric_1", "ORIGIN_B", 20),
    ];
    for (date, metric_name, origin, value) in events_with_origin {
        insert_metric_event_with_origin(&persister, date, metric_name, origin, value);
    }

    let old_events: [(&str, &str, i64); 2] = [
        ("2024-10-29 22:00:00", "metric_1", 23),
        ("2024-10-29 23:00:00", "metric_1", 31),
    ];
    for (date, metric_name, value) in old_events {
        insert_metric_event(&persister, date, metric_name, value);
    }

    let owned_rows = get_all_metrics_by_origin(connection).unwrap();
    // Borrow the rows as string slices so expectations can be written with literals.
    let rows: Vec<(&str, &str, &str, i64)> = owned_rows
        .iter()
        .map(|(date, name, origin, value)| (date.as_str(), name.as_str(), origin.as_str(), *value))
        .collect();

    assert!(rows.contains(&("2024-10-29", "metric_1", "ORIGIN_A", 15)));
    assert!(rows.contains(&("2024-10-29", "metric_1", "ORIGIN_B", 80)));
    assert!(rows.contains(&("2024-10-29", "metric_1", "", 54)));
}
433 }
434
435 mod signer_registration_summary {
436 use std::sync::Arc;
437
438 use crate::event_store::database::test_helper::event_store_db_connection;
439 use mithril_common::entities::{SignerWithStake, Stake};
440 use mithril_common::{StdResult, test::double::fake_data};
441 use sqlite::ConnectionThreadSafe;
442
443 use super::{EventMessage, EventPersister};
444
445 fn insert_registration_event(
447 persister: &EventPersister,
448 epoch: &str,
449 party_id: &str,
450 stake: Stake,
451 signer_node_version: &str,
452 ) {
453 let signers = fake_data::signers_with_stakes(1);
454 let signer = SignerWithStake {
455 party_id: party_id.to_string(),
456 stake,
457 ..signers[0].clone()
458 };
459
460 let message = EventMessage::signer_registration(
461 "Test",
462 &signer,
463 Some(signer_node_version.to_string()),
464 epoch,
465 );
466
467 let _event = persister.persist(message).unwrap();
468 }
469
/// One row of `signer_registration_summary`, as read by the tests.
///
/// `Debug` is derived so that rows can be printed and used with `assert_eq!`.
#[derive(Debug, PartialEq, Eq)]
struct StakeSignerVersion {
    epoch: i64,
    version: String,
    total_epoch_stakes: i64,
    stakes_version: i64,
    stakes_ratio: String,
    pool_count: i64,
}

impl StakeSignerVersion {
    /// Builds a row from borrowed strings, copying them into owned fields.
    fn new(
        epoch: i64,
        version: &str,
        total_epoch_stakes: i64,
        stakes_version: i64,
        stakes_ratio: &str,
        pool_count: i64,
    ) -> Self {
        Self {
            epoch,
            version: version.to_string(),
            total_epoch_stakes,
            stakes_version,
            stakes_ratio: stakes_ratio.to_string(),
            pool_count,
        }
    }
}
498
499 fn get_all_registrations(
500 connection: Arc<ConnectionThreadSafe>,
501 ) -> StdResult<Vec<StakeSignerVersion>> {
502 let query = "select
503 epoch,
504 version,
505 total_epoch_stakes,
506 stakes_version,
507 stakes_ratio,
508 pool_count
509 from signer_registration_summary;";
510 let mut statement = connection.prepare(query)?;
511 let mut result = Vec::new();
512 while let Ok(sqlite::State::Row) = statement.next() {
513 result.push(StakeSignerVersion::new(
514 statement.read::<i64, _>("epoch")?,
515 &statement.read::<String, _>("version")?,
516 statement.read::<i64, _>("total_epoch_stakes")?,
517 statement.read::<i64, _>("stakes_version")?,
518 &statement.read::<String, _>("stakes_ratio")?,
519 statement.read::<i64, _>("pool_count")?,
520 ));
521 }
522
523 Ok(result)
524 }
525
/// The summary reports the node version of each registration; the expectation for
/// `0.4.789+ef0c28a` is `0.4.789`, i.e. without the `+…` suffix.
#[test]
fn retrieved_node_version() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    let registrations: [(&str, &str, Stake, &str); 3] = [
        ("3", "A", 15, "0.2.234"),
        ("4", "A", 15, "15.24.32"),
        ("5", "A", 15, "0.4.789+ef0c28a"),
    ];
    for (epoch, party_id, stake, node_version) in registrations {
        insert_registration_event(&persister, epoch, party_id, stake, node_version);
    }

    let summary = get_all_registrations(connection).unwrap();

    let epoch_3 = StakeSignerVersion::new(3, "0.2.234", 15, 15, "100 %", 1);
    let epoch_4 = StakeSignerVersion::new(4, "15.24.32", 15, 15, "100 %", 1);
    let epoch_5 = StakeSignerVersion::new(5, "0.4.789", 15, 15, "100 %", 1);
    assert!(summary.contains(&epoch_3));
    assert!(summary.contains(&epoch_4));
    assert!(summary.contains(&epoch_5));
}
541
/// Stakes of all signers registered for an epoch are added up in that epoch's total.
#[test]
fn retrieved_total_by_epoch() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    let registrations: [(&str, &str, Stake, &str); 4] = [
        ("8", "A", 20, "1.0.2"),
        ("8", "B", 15, "1.0.2"),
        ("9", "A", 56, "1.0.2"),
        ("9", "B", 31, "1.0.2"),
    ];
    for (epoch, party_id, stake, node_version) in registrations {
        insert_registration_event(&persister, epoch, party_id, stake, node_version);
    }

    let summary = get_all_registrations(connection).unwrap();

    let epoch_8 = StakeSignerVersion::new(8, "1.0.2", 35, 35, "100 %", 2);
    let epoch_9 = StakeSignerVersion::new(9, "1.0.2", 87, 87, "100 %", 2);
    assert!(summary.contains(&epoch_8));
    assert!(summary.contains(&epoch_9));
}
556
/// Within one epoch, each version gets its share of the epoch's total stake.
#[test]
fn retrieved_percentage_per_version() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    let registrations: [(&str, &str, Stake, &str); 3] = [
        ("8", "A", 90, "1.0.2"),
        ("8", "B", 30, "1.0.2"),
        ("8", "C", 80, "1.0.4"),
    ];
    for (epoch, party_id, stake, node_version) in registrations {
        insert_registration_event(&persister, epoch, party_id, stake, node_version);
    }

    let summary = get_all_registrations(connection).unwrap();

    let version_1_0_2 = StakeSignerVersion::new(8, "1.0.2", 200, 120, "60 %", 2);
    let version_1_0_4 = StakeSignerVersion::new(8, "1.0.4", 200, 80, "40 %", 1);
    assert!(summary.contains(&version_1_0_2));
    assert!(summary.contains(&version_1_0_4));
}
570
/// Version ratios are computed independently for each epoch.
#[test]
fn retrieved_percentage_per_epoch() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    let registrations: [(&str, &str, Stake, &str); 4] = [
        ("8", "A", 6, "1.0.2"),
        ("8", "B", 4, "1.0.4"),
        ("9", "A", 28, "1.0.2"),
        ("9", "B", 12, "1.0.4"),
    ];
    for (epoch, party_id, stake, node_version) in registrations {
        insert_registration_event(&persister, epoch, party_id, stake, node_version);
    }

    let summary = get_all_registrations(connection).unwrap();

    let epoch_8_v2 = StakeSignerVersion::new(8, "1.0.2", 10, 6, "60 %", 1);
    let epoch_8_v4 = StakeSignerVersion::new(8, "1.0.4", 10, 4, "40 %", 1);
    let epoch_9_v2 = StakeSignerVersion::new(9, "1.0.2", 40, 28, "70 %", 1);
    let epoch_9_v4 = StakeSignerVersion::new(9, "1.0.4", 40, 12, "30 %", 1);
    assert!(summary.contains(&epoch_8_v2));
    assert!(summary.contains(&epoch_8_v4));
    assert!(summary.contains(&epoch_9_v2));
    assert!(summary.contains(&epoch_9_v4));
}
587
/// When the same party registers several times for one epoch, only the most recently
/// recorded registration appears in the summary.
#[test]
fn with_multi_registrations_for_an_epoch_only_the_last_recorded_one_is_retained() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(connection.clone());

    insert_registration_event(&persister, "8", "A", 6, "1.0.2");
    insert_registration_event(&persister, "8", "A", 8, "1.0.2");
    insert_registration_event(&persister, "8", "A", 10, "1.0.4");
    insert_registration_event(&persister, "8", "A", 7, "1.0.3");

    let result = get_all_registrations(connection).unwrap();

    assert!(result.contains(&StakeSignerVersion::new(8, "1.0.3", 7, 7, "100 %", 1)));
    // `assert_eq!` reports the actual row count on failure, unlike `assert!(len == 1)`.
    assert_eq!(result.len(), 1);
}
603 }
604}