1use anyhow::anyhow;
4use mithril_common::StdResult;
5use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
6
7use std::sync::Arc;
8
9use crate::event_store::database::query::InsertEventQuery;
10use crate::event_store::{event::Event, EventMessage};
11pub struct EventPersister {
14 connection: Arc<SqliteConnection>,
15}
16
17impl EventPersister {
18 pub fn new(connection: Arc<SqliteConnection>) -> Self {
20 Self { connection }
21 }
22
23 pub fn persist(&self, message: EventMessage) -> StdResult<Event> {
25 let log_message = message.clone();
26 let mut rows = self.connection.fetch(InsertEventQuery::one(message)?)?;
27
28 rows.next().ok_or(anyhow!(
29 "No record from the database after saving event message {log_message:?}"
30 ))
31 }
32}
33
34#[cfg(test)]
35mod tests {
36 use super::*;
37 use crate::event_store::database::test_helper::event_store_db_connection;
38 use mithril_common::StdResult;
39
/// Persisting a freshly built message on a test connection succeeds.
#[test]
fn can_persist_event() -> StdResult<()> {
    let persister = EventPersister::new(Arc::new(event_store_db_connection().unwrap()));

    let content = "content".to_string();
    let message = EventMessage::new("source", "action", &content, Vec::new());

    // Only the success of the insertion matters; the returned event is dropped.
    persister.persist(message).map(|_event| ())
}
50
/// Persisting a message succeeds on a connection obtained from the test helper.
///
/// NOTE(review): the body is identical to `can_persist_event`; presumably the
/// point is that the helper connection already has its schema in place —
/// confirm against `event_store_db_connection`.
#[test]
fn migration_executed_running_database() -> StdResult<()> {
    let db = event_store_db_connection().unwrap();
    let persister = EventPersister::new(Arc::new(db));

    let payload = String::from("content");
    let message = EventMessage::new("source", "action", &payload, Vec::new());
    persister.persist(message)?;

    Ok(())
}
61
62 mod metrics_per_day_view {
63
64 use std::time::Duration;
65
66 use crate::{
67 event_store::database::test_helper::event_store_db_connection, services::UsageReporter,
68 };
69 use chrono::{DateTime, Utc};
70
71 use mithril_common::StdResult;
72
73 use serde::{Deserialize, Serialize};
74 use sqlite::ConnectionThreadSafe;
75
76 use super::*;
77
78 fn get_all_metrics(
79 connection: Arc<ConnectionThreadSafe>,
80 ) -> StdResult<Vec<(String, String, i64)>> {
81 let query = "select date, counter_name, value from metrics_per_day";
82 let mut statement = connection.prepare(query)?;
83 let mut result = Vec::new();
84 while let Ok(sqlite::State::Row) = statement.next() {
85 result.push((
86 statement.read::<String, _>("date")?,
87 statement.read::<String, _>("counter_name")?,
88 statement.read::<i64, _>("value")?,
89 ));
90 }
91
92 Ok(result)
93 }
94
95 fn get_all_metrics_by_origin(
96 connection: Arc<ConnectionThreadSafe>,
97 ) -> StdResult<Vec<(String, String, String, i64)>> {
98 let query = "select date, counter_name, origin, value from metrics_per_day_and_origin";
99 let mut statement = connection.prepare(query)?;
100 let mut result = Vec::new();
101 while let Ok(sqlite::State::Row) = statement.next() {
102 result.push((
103 statement.read::<String, _>("date")?,
104 statement.read::<String, _>("counter_name")?,
105 statement
106 .read::<Option<String>, _>("origin")?
107 .unwrap_or_default(),
108 statement.read::<i64, _>("value")?,
109 ));
110 }
111
112 Ok(result)
113 }
114
115 fn insert_metric_event_with_origin(
118 persister: &EventPersister,
119 date: &str,
120 metric_name: &str,
121 origin: &str,
122 value: i64,
123 ) {
124 let metric_date =
125 DateTime::parse_from_str(&format!("{date} +0000"), "%Y-%m-%d %H:%M:%S %z").unwrap();
126
127 let message = UsageReporter::create_metrics_event_message(
128 metric_name.to_string(),
129 value,
130 Duration::from_secs(5),
131 origin.to_string(),
132 metric_date.into(),
133 );
134
135 let _event = persister.persist(message).unwrap();
136 }
137
138 fn insert_metric_event(
140 persister: &EventPersister,
141 date: &str,
142 metric_name: &str,
143 value: i64,
144 ) {
145 #[derive(Serialize, Deserialize)]
146 struct OldMetricEventMessage {
147 name: String,
148 value: i64,
149 period: Duration,
150 date: DateTime<Utc>,
151 }
152
153 let metric_date =
154 DateTime::parse_from_str(&format!("{date} +0000"), "%Y-%m-%d %H:%M:%S %z").unwrap();
155
156 let message = EventMessage::new(
157 "Metrics",
158 metric_name,
159 &OldMetricEventMessage {
160 name: metric_name.to_string(),
161 value,
162 period: Duration::from_secs(5),
163 date: metric_date.into(),
164 },
165 vec![],
166 );
167
168 let _event = persister.persist(message).unwrap();
169 }
170
/// A single persisted metric is listed under its calendar day.
#[test]
fn retrieved_inserted_event() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));
    insert_metric_event_with_origin(
        &persister,
        "2024-10-29 23:56:04",
        "metric_1",
        "ORIGIN",
        15,
    );

    let metrics = get_all_metrics(connection).unwrap();
    let expected = ("2024-10-29".to_string(), "metric_1".to_string(), 15);

    assert!(metrics.contains(&expected));
}
187
/// Values are summed per day and counter name, whatever their origin.
#[test]
fn sum_metric_per_day() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    // (date, metric name, origin, value)
    let events = [
        ("2024-10-29 21:00:00", "metric_1", "ORIGIN_A", 15),
        ("2024-10-29 22:00:00", "metric_1", "ORIGIN_B", 60),
        ("2024-10-29 23:00:00", "metric_2", "ORIGIN", 100),
        ("2024-10-30 17:00:00", "metric_1", "ORIGIN_A", 12),
        ("2024-10-30 18:00:00", "metric_1", "ORIGIN_B", 4),
    ];
    for (date, metric_name, origin, value) in events {
        insert_metric_event_with_origin(&persister, date, metric_name, origin, value);
    }

    let metrics = get_all_metrics(connection).unwrap();

    // (day, metric name, expected total)
    let expected_totals = [
        ("2024-10-29", "metric_1", 75),
        ("2024-10-29", "metric_2", 100),
        ("2024-10-30", "metric_1", 16),
    ];
    for (day, metric_name, total) in expected_totals {
        assert!(metrics.contains(&(day.to_string(), metric_name.to_string(), total)));
    }
}
234
/// Values are summed per day, counter name and origin.
#[test]
fn sum_metric_per_day_and_origin() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    // (date, metric name, origin, value)
    let events = [
        ("2024-10-29 21:00:00", "metric_1", "ORIGIN_A", 15),
        ("2024-10-29 22:00:00", "metric_1", "ORIGIN_B", 60),
        ("2024-10-29 23:00:00", "metric_2", "ORIGIN", 100),
        ("2024-10-30 17:00:00", "metric_1", "ORIGIN_A", 12),
        ("2024-10-30 18:00:00", "metric_1", "ORIGIN_B", 4),
        ("2024-10-30 17:00:00", "metric_1", "ORIGIN_A", 15),
        ("2024-10-30 18:00:00", "metric_1", "ORIGIN_B", 3),
    ];
    for (date, metric_name, origin, value) in events {
        insert_metric_event_with_origin(&persister, date, metric_name, origin, value);
    }

    let rows = get_all_metrics_by_origin(connection).unwrap();
    // Borrow the owned strings so expectations can be written as string literals.
    let totals: Vec<_> = rows
        .iter()
        .map(|(day, name, origin, value)| (day.as_str(), name.as_str(), origin.as_str(), *value))
        .collect();

    let expected_totals = [
        ("2024-10-29", "metric_1", "ORIGIN_A", 15),
        ("2024-10-29", "metric_1", "ORIGIN_B", 60),
        ("2024-10-29", "metric_2", "ORIGIN", 100),
        ("2024-10-30", "metric_1", "ORIGIN_A", 27),
        ("2024-10-30", "metric_1", "ORIGIN_B", 7),
    ];
    for expected in expected_totals {
        assert!(totals.contains(&expected));
    }
}
302
/// Events without an origin are summed per day and counter name under an
/// empty origin.
#[test]
fn sum_metric_per_day_and_origin_on_old_event() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    // (date, metric name, value)
    let events = [
        ("2024-10-29 21:00:00", "metric_1", 15),
        ("2024-10-29 22:00:00", "metric_1", 60),
        ("2024-10-29 23:00:00", "metric_2", 100),
        ("2024-10-30 17:00:00", "metric_1", 12),
        ("2024-10-30 18:00:00", "metric_1", 4),
        ("2024-10-30 17:00:00", "metric_1", 15),
        ("2024-10-30 18:00:00", "metric_1", 3),
    ];
    for (date, metric_name, value) in events {
        insert_metric_event(&persister, date, metric_name, value);
    }

    let rows = get_all_metrics_by_origin(connection).unwrap();
    // Borrow the owned strings so expectations can be written as string literals.
    let totals: Vec<_> = rows
        .iter()
        .map(|(day, name, origin, value)| (day.as_str(), name.as_str(), origin.as_str(), *value))
        .collect();

    let expected_totals = [
        ("2024-10-29", "metric_1", "", 75),
        ("2024-10-29", "metric_2", "", 100),
        ("2024-10-30", "metric_1", "", 34),
    ];
    for expected in expected_totals {
        assert!(totals.contains(&expected));
    }
}
327
/// Events with and without an origin coexist: the former are summed per
/// origin, the latter under an empty origin.
#[test]
fn sum_metric_per_day_and_origin_with_old_and_new_format() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    // Events carrying an origin: (date, metric name, origin, value)
    let events_with_origin = [
        ("2024-10-29 21:00:00", "metric_1", "ORIGIN_A", 15),
        ("2024-10-29 22:00:00", "metric_1", "ORIGIN_B", 60),
        ("2024-10-29 23:00:00", "metric_1", "ORIGIN_B", 20),
    ];
    for (date, metric_name, origin, value) in events_with_origin {
        insert_metric_event_with_origin(&persister, date, metric_name, origin, value);
    }
    // Events without an origin: (date, metric name, value)
    let events_without_origin = [
        ("2024-10-29 22:00:00", "metric_1", 23),
        ("2024-10-29 23:00:00", "metric_1", 31),
    ];
    for (date, metric_name, value) in events_without_origin {
        insert_metric_event(&persister, date, metric_name, value);
    }

    let rows = get_all_metrics_by_origin(connection).unwrap();
    // Borrow the owned strings so expectations can be written as string literals.
    let totals: Vec<_> = rows
        .iter()
        .map(|(day, name, origin, value)| (day.as_str(), name.as_str(), origin.as_str(), *value))
        .collect();

    let expected_totals = [
        ("2024-10-29", "metric_1", "ORIGIN_A", 15),
        ("2024-10-29", "metric_1", "ORIGIN_B", 80),
        ("2024-10-29", "metric_1", "", 54),
    ];
    for expected in expected_totals {
        assert!(totals.contains(&expected));
    }
}
367 }
368
369 mod signer_registration_summary {
370 use std::sync::Arc;
371
372 use crate::event_store::database::test_helper::event_store_db_connection;
373 use mithril_common::entities::{SignerWithStake, Stake};
374 use mithril_common::{test_utils::fake_data, StdResult};
375 use sqlite::ConnectionThreadSafe;
376
377 use super::{EventMessage, EventPersister};
378
379 fn insert_registration_event(
381 persister: &EventPersister,
382 epoch: &str,
383 party_id: &str,
384 stake: Stake,
385 signer_node_version: &str,
386 ) {
387 let signers = fake_data::signers_with_stakes(1);
388 let signer = SignerWithStake {
389 party_id: party_id.to_string(),
390 stake,
391 ..signers[0].clone()
392 };
393
394 let message = EventMessage::signer_registration(
395 "Test",
396 &signer,
397 Some(signer_node_version.to_string()),
398 epoch,
399 );
400
401 let _event = persister.persist(message).unwrap();
402 }
403
/// One row read from `signer_registration_summary`, compared field by field
/// in the assertions of this module.
#[derive(PartialEq)]
struct StakeSignerVersion {
    epoch: i64,
    version: String,
    total_epoch_stakes: i64,
    stakes_version: i64,
    stakes_ratio: String,
    pool_count: i64,
}

impl StakeSignerVersion {
    /// Builds a row, copying the borrowed text arguments into owned strings.
    fn new(
        epoch: i64,
        version: &str,
        total_epoch_stakes: i64,
        stakes_version: i64,
        stakes_ratio: &str,
        pool_count: i64,
    ) -> Self {
        let version = String::from(version);
        let stakes_ratio = String::from(stakes_ratio);

        Self {
            epoch,
            version,
            total_epoch_stakes,
            stakes_version,
            stakes_ratio,
            pool_count,
        }
    }
}
432
433 fn get_all_registrations(
434 connection: Arc<ConnectionThreadSafe>,
435 ) -> StdResult<Vec<StakeSignerVersion>> {
436 let query = "select
437 epoch,
438 version,
439 total_epoch_stakes,
440 stakes_version,
441 stakes_ratio,
442 pool_count
443 from signer_registration_summary;";
444 let mut statement = connection.prepare(query)?;
445 let mut result = Vec::new();
446 while let Ok(sqlite::State::Row) = statement.next() {
447 result.push(StakeSignerVersion::new(
448 statement.read::<i64, _>("epoch")?,
449 &statement.read::<String, _>("version")?,
450 statement.read::<i64, _>("total_epoch_stakes")?,
451 statement.read::<i64, _>("stakes_version")?,
452 &statement.read::<String, _>("stakes_ratio")?,
453 statement.read::<i64, _>("pool_count")?,
454 ));
455 }
456
457 Ok(result)
458 }
459
/// The summary exposes the registered node version; a `+…` suffix such as
/// `+ef0c28a` is not part of the reported version.
#[test]
fn retrieved_node_version() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    // (epoch, party id, stake, node version)
    let registrations = [
        ("3", "A", 15, "0.2.234"),
        ("4", "A", 15, "15.24.32"),
        ("5", "A", 15, "0.4.789+ef0c28a"),
    ];
    for (epoch, party_id, stake, node_version) in registrations {
        insert_registration_event(&persister, epoch, party_id, stake, node_version);
    }

    let summary = get_all_registrations(connection).unwrap();

    let expected_rows = [
        StakeSignerVersion::new(3, "0.2.234", 15, 15, "100 %", 1),
        StakeSignerVersion::new(4, "15.24.32", 15, 15, "100 %", 1),
        StakeSignerVersion::new(5, "0.4.789", 15, 15, "100 %", 1),
    ];
    for expected in &expected_rows {
        assert!(summary.contains(expected));
    }
}
475
/// Stakes of all signers of an epoch are added up, epoch by epoch.
#[test]
fn retrieved_total_by_epoch() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    // (epoch, party id, stake, node version)
    let registrations = [
        ("8", "A", 20, "1.0.2"),
        ("8", "B", 15, "1.0.2"),
        ("9", "A", 56, "1.0.2"),
        ("9", "B", 31, "1.0.2"),
    ];
    for (epoch, party_id, stake, node_version) in registrations {
        insert_registration_event(&persister, epoch, party_id, stake, node_version);
    }

    let summary = get_all_registrations(connection).unwrap();

    let expected_rows = [
        StakeSignerVersion::new(8, "1.0.2", 35, 35, "100 %", 2),
        StakeSignerVersion::new(9, "1.0.2", 87, 87, "100 %", 2),
    ];
    for expected in &expected_rows {
        assert!(summary.contains(expected));
    }
}
490
/// Within one epoch, each version gets its share of the total epoch stake
/// and the count of pools running it.
#[test]
fn retrieved_percentage_per_version() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    // (epoch, party id, stake, node version)
    let registrations = [
        ("8", "A", 90, "1.0.2"),
        ("8", "B", 30, "1.0.2"),
        ("8", "C", 80, "1.0.4"),
    ];
    for (epoch, party_id, stake, node_version) in registrations {
        insert_registration_event(&persister, epoch, party_id, stake, node_version);
    }

    let summary = get_all_registrations(connection).unwrap();

    let expected_rows = [
        StakeSignerVersion::new(8, "1.0.2", 200, 120, "60 %", 2),
        StakeSignerVersion::new(8, "1.0.4", 200, 80, "40 %", 1),
    ];
    for expected in &expected_rows {
        assert!(summary.contains(expected));
    }
}
504
/// Version shares are computed against the total stake of their own epoch.
#[test]
fn retrieved_percentage_per_epoch() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    // (epoch, party id, stake, node version)
    let registrations = [
        ("8", "A", 6, "1.0.2"),
        ("8", "B", 4, "1.0.4"),
        ("9", "A", 28, "1.0.2"),
        ("9", "B", 12, "1.0.4"),
    ];
    for (epoch, party_id, stake, node_version) in registrations {
        insert_registration_event(&persister, epoch, party_id, stake, node_version);
    }

    let summary = get_all_registrations(connection).unwrap();

    let expected_rows = [
        StakeSignerVersion::new(8, "1.0.2", 10, 6, "60 %", 1),
        StakeSignerVersion::new(8, "1.0.4", 10, 4, "40 %", 1),
        StakeSignerVersion::new(9, "1.0.2", 40, 28, "70 %", 1),
        StakeSignerVersion::new(9, "1.0.4", 40, 12, "30 %", 1),
    ];
    for expected in &expected_rows {
        assert!(summary.contains(expected));
    }
}
521
/// When the same party registers several times in an epoch, only the most
/// recently recorded registration appears in the summary.
#[test]
fn with_multi_registrations_for_an_epoch_only_the_last_recorded_one_is_retained() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    // Insertion order matters here: the final entry is the one expected to win.
    let registrations = [
        ("8", "A", 6, "1.0.2"),
        ("8", "A", 8, "1.0.2"),
        ("8", "A", 10, "1.0.4"),
        ("8", "A", 7, "1.0.3"),
    ];
    for (epoch, party_id, stake, node_version) in registrations {
        insert_registration_event(&persister, epoch, party_id, stake, node_version);
    }

    let summary = get_all_registrations(connection).unwrap();
    let last_registration = StakeSignerVersion::new(8, "1.0.3", 7, 7, "100 %", 1);

    assert!(summary.contains(&last_registration));
    assert!(summary.len() == 1);
}
537 }
538}