1use anyhow::anyhow;
4use mithril_common::StdResult;
5use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
6
7use std::sync::Arc;
8
9use crate::event_store::database::query::InsertEventQuery;
10use crate::event_store::{event::Event, EventMessage};
11pub struct EventPersister {
14 connection: Arc<SqliteConnection>,
15}
16
17impl EventPersister {
18 pub fn new(connection: Arc<SqliteConnection>) -> Self {
20 Self { connection }
21 }
22
23 pub fn persist(&self, message: EventMessage) -> StdResult<Event> {
25 let log_message = message.clone();
26 let mut rows = self.connection.fetch(InsertEventQuery::one(message)?)?;
27
28 rows.next().ok_or(anyhow!(
29 "No record from the database after saving event message {log_message:?}"
30 ))
31 }
32}
33
34#[cfg(test)]
35mod tests {
36 use super::*;
37 use crate::event_store::database::test_helper::event_store_db_connection;
38 use mithril_common::StdResult;
39
/// Persisting a simple message on the test database succeeds.
#[test]
fn can_persist_event() -> StdResult<()> {
    let persister = EventPersister::new(Arc::new(event_store_db_connection().unwrap()));

    let content = "content".to_string();
    let message = EventMessage::new("source", "action", &content, Vec::new());

    // Only success matters here, the returned event itself is discarded.
    persister.persist(message).map(|_event| ())
}
50
/// An event can be persisted on the database returned by the test helper.
///
/// NOTE(review): presumably this relies on the helper having applied the
/// migrations before the insert runs; confirm against `test_helper`.
#[test]
fn migration_executed_running_database() -> StdResult<()> {
    let db = event_store_db_connection().unwrap();
    let persister = EventPersister::new(Arc::new(db));

    let content = String::from("content");
    persister.persist(EventMessage::new("source", "action", &content, Vec::new()))?;

    Ok(())
}
61
62 mod metrics_per_day_view {
63
64 use std::time::Duration;
65
66 use crate::{
67 event_store::database::test_helper::event_store_db_connection, services::UsageReporter,
68 };
69 use chrono::{DateTime, Utc};
70
71 use mithril_common::StdResult;
72
73 use serde::{Deserialize, Serialize};
74 use sqlite::ConnectionThreadSafe;
75
76 use super::*;
77
78 fn get_all_metrics(
79 connection: Arc<ConnectionThreadSafe>,
80 ) -> StdResult<Vec<(String, String, i64)>> {
81 let query = "select date, counter_name, value from metrics_per_day";
82 let mut statement = connection.prepare(query)?;
83 let mut result = Vec::new();
84 while let Ok(sqlite::State::Row) = statement.next() {
85 result.push((
86 statement.read::<String, _>("date")?,
87 statement.read::<String, _>("counter_name")?,
88 statement.read::<i64, _>("value")?,
89 ));
90 }
91
92 Ok(result)
93 }
94
95 fn get_all_metrics_by_origin(
96 connection: Arc<ConnectionThreadSafe>,
97 ) -> StdResult<Vec<(String, String, String, i64)>> {
98 let query = "select date, counter_name, origin, value from metrics_per_day_and_origin";
99 let mut statement = connection.prepare(query)?;
100 let mut result = Vec::new();
101 while let Ok(sqlite::State::Row) = statement.next() {
102 result.push((
103 statement.read::<String, _>("date")?,
104 statement.read::<String, _>("counter_name")?,
105 statement
106 .read::<Option<String>, _>("origin")?
107 .unwrap_or_default(),
108 statement.read::<i64, _>("value")?,
109 ));
110 }
111
112 Ok(result)
113 }
114
115 fn get_all_metrics_by_client_type(
116 connection: Arc<ConnectionThreadSafe>,
117 ) -> StdResult<Vec<(String, String, String, i64)>> {
118 let query = "select date, counter_name, client_type, value from metrics_per_day_and_client_type";
119 let mut statement = connection.prepare(query)?;
120 let mut result = Vec::new();
121 while let Ok(sqlite::State::Row) = statement.next() {
122 result.push((
123 statement.read::<String, _>("date")?,
124 statement.read::<String, _>("counter_name")?,
125 statement
126 .read::<Option<String>, _>("client_type")?
127 .unwrap_or_default(),
128 statement.read::<i64, _>("value")?,
129 ));
130 }
131
132 Ok(result)
133 }
134
135 fn insert_metric_event_with_origin(
138 persister: &EventPersister,
139 date: &str,
140 metric_name: &str,
141 origin: &str,
142 value: i64,
143 ) {
144 let metric_date =
145 DateTime::parse_from_str(&format!("{date} +0000"), "%Y-%m-%d %H:%M:%S %z").unwrap();
146
147 let message = UsageReporter::create_metrics_event_message(
148 metric_name.to_string(),
149 value,
150 Duration::from_secs(5),
151 origin.to_string(),
152 "CLIENT_TYPE_A".to_string(),
153 metric_date.into(),
154 );
155
156 let _event = persister.persist(message).unwrap();
157 }
158
159 fn insert_metric_event_message(
160 persister: &EventPersister,
161 date: &str,
162 metric_name: &str,
163 origin: &str,
164 client_type: &str,
165 value: i64,
166 ) {
167 let metric_date =
168 DateTime::parse_from_str(&format!("{date} +0000"), "%Y-%m-%d %H:%M:%S %z").unwrap();
169
170 let message = UsageReporter::create_metrics_event_message(
171 metric_name.to_string(),
172 value,
173 Duration::from_secs(5),
174 origin.to_string(),
175 client_type.to_string(),
176 metric_date.into(),
177 );
178
179 let _event = persister.persist(message).unwrap();
180 }
181
182 fn insert_metric_event(
184 persister: &EventPersister,
185 date: &str,
186 metric_name: &str,
187 value: i64,
188 ) {
189 #[derive(Serialize, Deserialize)]
190 struct OldMetricEventMessage {
191 name: String,
192 value: i64,
193 period: Duration,
194 date: DateTime<Utc>,
195 }
196
197 let metric_date =
198 DateTime::parse_from_str(&format!("{date} +0000"), "%Y-%m-%d %H:%M:%S %z").unwrap();
199
200 let message = EventMessage::new(
201 "Metrics",
202 metric_name,
203 &OldMetricEventMessage {
204 name: metric_name.to_string(),
205 value,
206 period: Duration::from_secs(5),
207 date: metric_date.into(),
208 },
209 vec![],
210 );
211
212 let _event = persister.persist(message).unwrap();
213 }
214
/// A single inserted metric shows up in `metrics_per_day` under its day.
#[test]
fn retrieved_inserted_event() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    insert_metric_event_with_origin(&persister, "2024-10-29 23:56:04", "metric_1", "ORIGIN", 15);

    let metrics = get_all_metrics(connection).unwrap();
    let expected = ("2024-10-29".to_string(), "metric_1".to_string(), 15);

    assert!(metrics.contains(&expected));
}
231
/// Values are summed per day and per counter, whatever their origin.
#[test]
fn sum_metric_per_day() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    // (date, metric name, origin, value)
    let events = [
        ("2024-10-29 21:00:00", "metric_1", "ORIGIN_A", 15),
        ("2024-10-29 22:00:00", "metric_1", "ORIGIN_B", 60),
        ("2024-10-29 23:00:00", "metric_2", "ORIGIN", 100),
        ("2024-10-30 17:00:00", "metric_1", "ORIGIN_A", 12),
        ("2024-10-30 18:00:00", "metric_1", "ORIGIN_B", 4),
    ];
    for (date, metric_name, origin, value) in events {
        insert_metric_event_with_origin(&persister, date, metric_name, origin, value);
    }

    let metrics = get_all_metrics(connection).unwrap();

    // (day, counter name, expected sum)
    for (day, counter, total) in [
        ("2024-10-29", "metric_1", 75),
        ("2024-10-29", "metric_2", 100),
        ("2024-10-30", "metric_1", 16),
    ] {
        assert!(metrics.contains(&(day.to_string(), counter.to_string(), total)));
    }
}
278
/// Values are summed per day, per counter and per origin.
#[test]
fn sum_metric_per_day_and_origin() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    // (date, metric name, origin, value)
    let events = [
        ("2024-10-29 21:00:00", "metric_1", "ORIGIN_A", 15),
        ("2024-10-29 22:00:00", "metric_1", "ORIGIN_B", 60),
        ("2024-10-29 23:00:00", "metric_2", "ORIGIN", 100),
        ("2024-10-30 17:00:00", "metric_1", "ORIGIN_A", 12),
        ("2024-10-30 18:00:00", "metric_1", "ORIGIN_B", 4),
        ("2024-10-30 17:00:00", "metric_1", "ORIGIN_A", 15),
        ("2024-10-30 18:00:00", "metric_1", "ORIGIN_B", 3),
    ];
    for (date, metric_name, origin, value) in events {
        insert_metric_event_with_origin(&persister, date, metric_name, origin, value);
    }

    let rows = get_all_metrics_by_origin(connection).unwrap();
    // Borrow the strings so that expectations can be written with literals.
    let rows: Vec<(&str, &str, &str, i64)> = rows
        .iter()
        .map(|(day, counter, origin, total)| {
            (day.as_str(), counter.as_str(), origin.as_str(), *total)
        })
        .collect();

    for expected in [
        ("2024-10-29", "metric_1", "ORIGIN_A", 15),
        ("2024-10-29", "metric_1", "ORIGIN_B", 60),
        ("2024-10-29", "metric_2", "ORIGIN", 100),
        ("2024-10-30", "metric_1", "ORIGIN_A", 27),
        ("2024-10-30", "metric_1", "ORIGIN_B", 7),
    ] {
        assert!(rows.contains(&expected));
    }
}
346
/// A metric inserted with a client type is exposed by
/// `metrics_per_day_and_client_type`.
#[test]
fn vue_metrics_per_day_and_client_type() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    insert_metric_event_message(
        &persister,
        "2024-10-29 21:00:00",
        "metric_1",
        "ORIGIN_A",
        "CLIENT_TYPE_A",
        15,
    );

    let rows = get_all_metrics_by_client_type(connection).unwrap();
    // Borrow the strings so that the expectation can be written with literals.
    let rows: Vec<(&str, &str, &str, i64)> = rows
        .iter()
        .map(|(day, counter, client_type, total)| {
            (day.as_str(), counter.as_str(), client_type.as_str(), *total)
        })
        .collect();

    let expected = ("2024-10-29", "metric_1", "CLIENT_TYPE_A", 15);
    assert!(rows.contains(&expected));
}
370
/// Events without origin are summed per day and counter under an empty origin.
#[test]
fn sum_metric_per_day_and_origin_on_old_event() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    // (date, metric name, value)
    let events = [
        ("2024-10-29 21:00:00", "metric_1", 15),
        ("2024-10-29 22:00:00", "metric_1", 60),
        ("2024-10-29 23:00:00", "metric_2", 100),
        ("2024-10-30 17:00:00", "metric_1", 12),
        ("2024-10-30 18:00:00", "metric_1", 4),
        ("2024-10-30 17:00:00", "metric_1", 15),
        ("2024-10-30 18:00:00", "metric_1", 3),
    ];
    for (date, metric_name, value) in events {
        insert_metric_event(&persister, date, metric_name, value);
    }

    let rows = get_all_metrics_by_origin(connection).unwrap();
    // Borrow the strings so that expectations can be written with literals.
    let rows: Vec<(&str, &str, &str, i64)> = rows
        .iter()
        .map(|(day, counter, origin, total)| {
            (day.as_str(), counter.as_str(), origin.as_str(), *total)
        })
        .collect();

    for expected in [
        ("2024-10-29", "metric_1", "", 75),
        ("2024-10-29", "metric_2", "", 100),
        ("2024-10-30", "metric_1", "", 34),
    ] {
        assert!(rows.contains(&expected));
    }
}
395
/// Events with and without origin coexist: the former are summed per origin,
/// the latter under an empty origin.
#[test]
fn sum_metric_per_day_and_origin_with_old_and_new_format() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    // Events carrying an origin: (date, metric name, origin, value)
    for (date, metric_name, origin, value) in [
        ("2024-10-29 21:00:00", "metric_1", "ORIGIN_A", 15),
        ("2024-10-29 22:00:00", "metric_1", "ORIGIN_B", 60),
        ("2024-10-29 23:00:00", "metric_1", "ORIGIN_B", 20),
    ] {
        insert_metric_event_with_origin(&persister, date, metric_name, origin, value);
    }
    // Events without origin: (date, metric name, value)
    for (date, metric_name, value) in [
        ("2024-10-29 22:00:00", "metric_1", 23),
        ("2024-10-29 23:00:00", "metric_1", 31),
    ] {
        insert_metric_event(&persister, date, metric_name, value);
    }

    let rows = get_all_metrics_by_origin(connection).unwrap();
    // Borrow the strings so that expectations can be written with literals.
    let rows: Vec<(&str, &str, &str, i64)> = rows
        .iter()
        .map(|(day, counter, origin, total)| {
            (day.as_str(), counter.as_str(), origin.as_str(), *total)
        })
        .collect();

    for expected in [
        ("2024-10-29", "metric_1", "ORIGIN_A", 15),
        ("2024-10-29", "metric_1", "ORIGIN_B", 80),
        ("2024-10-29", "metric_1", "", 54),
    ] {
        assert!(rows.contains(&expected));
    }
}
435 }
436
437 mod signer_registration_summary {
438 use std::sync::Arc;
439
440 use crate::event_store::database::test_helper::event_store_db_connection;
441 use mithril_common::entities::{SignerWithStake, Stake};
442 use mithril_common::{test_utils::fake_data, StdResult};
443 use sqlite::ConnectionThreadSafe;
444
445 use super::{EventMessage, EventPersister};
446
447 fn insert_registration_event(
449 persister: &EventPersister,
450 epoch: &str,
451 party_id: &str,
452 stake: Stake,
453 signer_node_version: &str,
454 ) {
455 let signers = fake_data::signers_with_stakes(1);
456 let signer = SignerWithStake {
457 party_id: party_id.to_string(),
458 stake,
459 ..signers[0].clone()
460 };
461
462 let message = EventMessage::signer_registration(
463 "Test",
464 &signer,
465 Some(signer_node_version.to_string()),
466 epoch,
467 );
468
469 let _event = persister.persist(message).unwrap();
470 }
471
/// One row read from `signer_registration_summary`.
///
/// `Debug` is derived so that rows can be printed in test diagnostics.
#[derive(Debug, PartialEq, Eq)]
struct StakeSignerVersion {
    epoch: i64,
    version: String,
    total_epoch_stakes: i64,
    stakes_version: i64,
    stakes_ratio: String,
    pool_count: i64,
}

impl StakeSignerVersion {
    /// Builds a row, copying the borrowed `version` and `stakes_ratio` into
    /// owned strings.
    fn new(
        epoch: i64,
        version: &str,
        total_epoch_stakes: i64,
        stakes_version: i64,
        stakes_ratio: &str,
        pool_count: i64,
    ) -> Self {
        Self {
            epoch,
            version: version.to_string(),
            total_epoch_stakes,
            stakes_version,
            stakes_ratio: stakes_ratio.to_string(),
            pool_count,
        }
    }
}
500
501 fn get_all_registrations(
502 connection: Arc<ConnectionThreadSafe>,
503 ) -> StdResult<Vec<StakeSignerVersion>> {
504 let query = "select
505 epoch,
506 version,
507 total_epoch_stakes,
508 stakes_version,
509 stakes_ratio,
510 pool_count
511 from signer_registration_summary;";
512 let mut statement = connection.prepare(query)?;
513 let mut result = Vec::new();
514 while let Ok(sqlite::State::Row) = statement.next() {
515 result.push(StakeSignerVersion::new(
516 statement.read::<i64, _>("epoch")?,
517 &statement.read::<String, _>("version")?,
518 statement.read::<i64, _>("total_epoch_stakes")?,
519 statement.read::<i64, _>("stakes_version")?,
520 &statement.read::<String, _>("stakes_ratio")?,
521 statement.read::<i64, _>("pool_count")?,
522 ));
523 }
524
525 Ok(result)
526 }
527
/// The summary exposes the registered node version; a `+...` suffix in the
/// registered version is expected to be absent from the summary.
#[test]
fn retrieved_node_version() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    // (epoch, party id, stake, node version)
    for (epoch, party_id, stake, node_version) in [
        ("3", "A", 15, "0.2.234"),
        ("4", "A", 15, "15.24.32"),
        ("5", "A", 15, "0.4.789+ef0c28a"),
    ] {
        insert_registration_event(&persister, epoch, party_id, stake, node_version);
    }

    let summary = get_all_registrations(connection).unwrap();

    for expected in [
        StakeSignerVersion::new(3, "0.2.234", 15, 15, "100 %", 1),
        StakeSignerVersion::new(4, "15.24.32", 15, 15, "100 %", 1),
        StakeSignerVersion::new(5, "0.4.789", 15, 15, "100 %", 1),
    ] {
        assert!(summary.contains(&expected));
    }
}
543
/// Stakes and pool counts are totalled independently for each epoch.
#[test]
fn retrieved_total_by_epoch() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    // (epoch, party id, stake, node version)
    for (epoch, party_id, stake, node_version) in [
        ("8", "A", 20, "1.0.2"),
        ("8", "B", 15, "1.0.2"),
        ("9", "A", 56, "1.0.2"),
        ("9", "B", 31, "1.0.2"),
    ] {
        insert_registration_event(&persister, epoch, party_id, stake, node_version);
    }

    let summary = get_all_registrations(connection).unwrap();

    for expected in [
        StakeSignerVersion::new(8, "1.0.2", 35, 35, "100 %", 2),
        StakeSignerVersion::new(9, "1.0.2", 87, 87, "100 %", 2),
    ] {
        assert!(summary.contains(&expected));
    }
}
558
/// Within one epoch, each version gets its share of the total stake.
#[test]
fn retrieved_percentage_per_version() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    // (epoch, party id, stake, node version)
    for (epoch, party_id, stake, node_version) in [
        ("8", "A", 90, "1.0.2"),
        ("8", "B", 30, "1.0.2"),
        ("8", "C", 80, "1.0.4"),
    ] {
        insert_registration_event(&persister, epoch, party_id, stake, node_version);
    }

    let summary = get_all_registrations(connection).unwrap();

    for expected in [
        StakeSignerVersion::new(8, "1.0.2", 200, 120, "60 %", 2),
        StakeSignerVersion::new(8, "1.0.4", 200, 80, "40 %", 1),
    ] {
        assert!(summary.contains(&expected));
    }
}
572
/// Stake ratios are computed against the total of their own epoch only.
#[test]
fn retrieved_percentage_per_epoch() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    // (epoch, party id, stake, node version)
    for (epoch, party_id, stake, node_version) in [
        ("8", "A", 6, "1.0.2"),
        ("8", "B", 4, "1.0.4"),
        ("9", "A", 28, "1.0.2"),
        ("9", "B", 12, "1.0.4"),
    ] {
        insert_registration_event(&persister, epoch, party_id, stake, node_version);
    }

    let summary = get_all_registrations(connection).unwrap();

    for expected in [
        StakeSignerVersion::new(8, "1.0.2", 10, 6, "60 %", 1),
        StakeSignerVersion::new(8, "1.0.4", 10, 4, "40 %", 1),
        StakeSignerVersion::new(9, "1.0.2", 40, 28, "70 %", 1),
        StakeSignerVersion::new(9, "1.0.4", 40, 12, "30 %", 1),
    ] {
        assert!(summary.contains(&expected));
    }
}
589
/// When the same party registers several times for one epoch, the summary
/// only reflects the registration that was recorded last.
#[test]
fn with_multi_registrations_for_an_epoch_only_the_last_recorded_one_is_retained() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(connection.clone());

    insert_registration_event(&persister, "8", "A", 6, "1.0.2");
    insert_registration_event(&persister, "8", "A", 8, "1.0.2");
    insert_registration_event(&persister, "8", "A", 10, "1.0.4");
    insert_registration_event(&persister, "8", "A", 7, "1.0.3");

    let result = get_all_registrations(connection).unwrap();

    assert!(result.contains(&StakeSignerVersion::new(8, "1.0.3", 7, 7, "100 %", 1)));
    // `assert_eq!` reports the actual number of rows when the check fails.
    assert_eq!(1, result.len());
}
605 }
606}