mithril_aggregator/event_store/database/
repository.rs

1//! Migration module
2//!
3use anyhow::anyhow;
4use mithril_common::StdResult;
5use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
6
7use std::sync::Arc;
8
9use crate::event_store::database::query::InsertEventQuery;
10use crate::event_store::{event::Event, EventMessage};
11/// The EventPersister is the adapter to persist EventMessage turning them into
12/// Event.
13pub struct EventPersister {
14    connection: Arc<SqliteConnection>,
15}
16
17impl EventPersister {
18    /// Instantiate an EventPersister
19    pub fn new(connection: Arc<SqliteConnection>) -> Self {
20        Self { connection }
21    }
22
23    /// Save an EventMessage in the database.
24    pub fn persist(&self, message: EventMessage) -> StdResult<Event> {
25        let log_message = message.clone();
26        let mut rows = self.connection.fetch(InsertEventQuery::one(message)?)?;
27
28        rows.next().ok_or(anyhow!(
29            "No record from the database after saving event message {log_message:?}"
30        ))
31    }
32}
33
34#[cfg(test)]
35mod tests {
36    use super::*;
37    use crate::event_store::database::test_helper::event_store_db_connection;
38    use mithril_common::StdResult;
39
40    #[test]
41    fn can_persist_event() -> StdResult<()> {
42        let connection = Arc::new(event_store_db_connection().unwrap());
43
44        let persister = EventPersister::new(connection);
45        let message = EventMessage::new("source", "action", &"content".to_string(), Vec::new());
46
47        let _event = persister.persist(message)?;
48        Ok(())
49    }
50
51    #[test]
52    fn migration_executed_running_database() -> StdResult<()> {
53        let connection = Arc::new(event_store_db_connection().unwrap());
54
55        let persister = EventPersister::new(connection);
56        let message = EventMessage::new("source", "action", &"content".to_string(), Vec::new());
57
58        let _event = persister.persist(message)?;
59        Ok(())
60    }
61
62    mod metrics_per_day_view {
63
64        use std::time::Duration;
65
66        use crate::{
67            event_store::database::test_helper::event_store_db_connection, services::UsageReporter,
68        };
69        use chrono::{DateTime, Utc};
70
71        use mithril_common::StdResult;
72
73        use serde::{Deserialize, Serialize};
74        use sqlite::ConnectionThreadSafe;
75
76        use super::*;
77
78        fn get_all_metrics(
79            connection: Arc<ConnectionThreadSafe>,
80        ) -> StdResult<Vec<(String, String, i64)>> {
81            let query = "select date, counter_name, value from metrics_per_day";
82            let mut statement = connection.prepare(query)?;
83            let mut result = Vec::new();
84            while let Ok(sqlite::State::Row) = statement.next() {
85                result.push((
86                    statement.read::<String, _>("date")?,
87                    statement.read::<String, _>("counter_name")?,
88                    statement.read::<i64, _>("value")?,
89                ));
90            }
91
92            Ok(result)
93        }
94
95        fn get_all_metrics_by_origin(
96            connection: Arc<ConnectionThreadSafe>,
97        ) -> StdResult<Vec<(String, String, String, i64)>> {
98            let query = "select date, counter_name, origin, value from metrics_per_day_and_origin";
99            let mut statement = connection.prepare(query)?;
100            let mut result = Vec::new();
101            while let Ok(sqlite::State::Row) = statement.next() {
102                result.push((
103                    statement.read::<String, _>("date")?,
104                    statement.read::<String, _>("counter_name")?,
105                    statement
106                        .read::<Option<String>, _>("origin")?
107                        .unwrap_or_default(),
108                    statement.read::<i64, _>("value")?,
109                ));
110            }
111
112            Ok(result)
113        }
114
115        /// Insert a metric event in the database.
116        /// date format is "%Y-%m-%d %H:%M:%S %z", example: "2015-09-05 23:56:04 +0000"
117        fn insert_metric_event_with_origin(
118            persister: &EventPersister,
119            date: &str,
120            metric_name: &str,
121            origin: &str,
122            value: i64,
123        ) {
124            let metric_date =
125                DateTime::parse_from_str(&format!("{date} +0000"), "%Y-%m-%d %H:%M:%S %z").unwrap();
126
127            let message = UsageReporter::create_metrics_event_message(
128                metric_name.to_string(),
129                value,
130                Duration::from_secs(5),
131                origin.to_string(),
132                metric_date.into(),
133            );
134
135            let _event = persister.persist(message).unwrap();
136        }
137
138        /// Insert a metric event with the old format (without origin) in the database.
139        fn insert_metric_event(
140            persister: &EventPersister,
141            date: &str,
142            metric_name: &str,
143            value: i64,
144        ) {
145            #[derive(Serialize, Deserialize)]
146            struct OldMetricEventMessage {
147                name: String,
148                value: i64,
149                period: Duration,
150                date: DateTime<Utc>,
151            }
152
153            let metric_date =
154                DateTime::parse_from_str(&format!("{date} +0000"), "%Y-%m-%d %H:%M:%S %z").unwrap();
155
156            let message = EventMessage::new(
157                "Metrics",
158                metric_name,
159                &OldMetricEventMessage {
160                    name: metric_name.to_string(),
161                    value,
162                    period: Duration::from_secs(5),
163                    date: metric_date.into(),
164                },
165                vec![],
166            );
167
168            let _event = persister.persist(message).unwrap();
169        }
170
171        #[test]
172        fn retrieved_inserted_event() {
173            let connection = Arc::new(event_store_db_connection().unwrap());
174            let persister = EventPersister::new(connection.clone());
175            insert_metric_event_with_origin(
176                &persister,
177                "2024-10-29 23:56:04",
178                "metric_1",
179                "ORIGIN",
180                15,
181            );
182
183            let result = get_all_metrics(connection).unwrap();
184
185            assert!(result.contains(&("2024-10-29".to_string(), "metric_1".to_string(), 15)));
186        }
187
188        #[test]
189        fn sum_metric_per_day() {
190            let connection = Arc::new(event_store_db_connection().unwrap());
191            let persister = EventPersister::new(connection.clone());
192            insert_metric_event_with_origin(
193                &persister,
194                "2024-10-29 21:00:00",
195                "metric_1",
196                "ORIGIN_A",
197                15,
198            );
199            insert_metric_event_with_origin(
200                &persister,
201                "2024-10-29 22:00:00",
202                "metric_1",
203                "ORIGIN_B",
204                60,
205            );
206            insert_metric_event_with_origin(
207                &persister,
208                "2024-10-29 23:00:00",
209                "metric_2",
210                "ORIGIN",
211                100,
212            );
213            insert_metric_event_with_origin(
214                &persister,
215                "2024-10-30 17:00:00",
216                "metric_1",
217                "ORIGIN_A",
218                12,
219            );
220            insert_metric_event_with_origin(
221                &persister,
222                "2024-10-30 18:00:00",
223                "metric_1",
224                "ORIGIN_B",
225                4,
226            );
227
228            let result = get_all_metrics(connection).unwrap();
229
230            assert!(result.contains(&("2024-10-29".to_string(), "metric_1".to_string(), 75)));
231            assert!(result.contains(&("2024-10-29".to_string(), "metric_2".to_string(), 100)));
232            assert!(result.contains(&("2024-10-30".to_string(), "metric_1".to_string(), 16)));
233        }
234
235        #[test]
236        fn sum_metric_per_day_and_origin() {
237            fn tuple_with_str(t: &(String, String, String, i64)) -> (&str, &str, &str, i64) {
238                (t.0.as_str(), t.1.as_str(), t.2.as_str(), t.3)
239            }
240
241            let connection = Arc::new(event_store_db_connection().unwrap());
242            let persister = EventPersister::new(connection.clone());
243            insert_metric_event_with_origin(
244                &persister,
245                "2024-10-29 21:00:00",
246                "metric_1",
247                "ORIGIN_A",
248                15,
249            );
250            insert_metric_event_with_origin(
251                &persister,
252                "2024-10-29 22:00:00",
253                "metric_1",
254                "ORIGIN_B",
255                60,
256            );
257            insert_metric_event_with_origin(
258                &persister,
259                "2024-10-29 23:00:00",
260                "metric_2",
261                "ORIGIN",
262                100,
263            );
264            insert_metric_event_with_origin(
265                &persister,
266                "2024-10-30 17:00:00",
267                "metric_1",
268                "ORIGIN_A",
269                12,
270            );
271            insert_metric_event_with_origin(
272                &persister,
273                "2024-10-30 18:00:00",
274                "metric_1",
275                "ORIGIN_B",
276                4,
277            );
278            insert_metric_event_with_origin(
279                &persister,
280                "2024-10-30 17:00:00",
281                "metric_1",
282                "ORIGIN_A",
283                15,
284            );
285            insert_metric_event_with_origin(
286                &persister,
287                "2024-10-30 18:00:00",
288                "metric_1",
289                "ORIGIN_B",
290                3,
291            );
292
293            let result = get_all_metrics_by_origin(connection).unwrap();
294            let result: Vec<_> = result.iter().map(tuple_with_str).collect();
295
296            assert!(result.contains(&("2024-10-29", "metric_1", "ORIGIN_A", 15)));
297            assert!(result.contains(&("2024-10-29", "metric_1", "ORIGIN_B", 60)));
298            assert!(result.contains(&("2024-10-29", "metric_2", "ORIGIN", 100)));
299            assert!(result.contains(&("2024-10-30", "metric_1", "ORIGIN_A", 27)));
300            assert!(result.contains(&("2024-10-30", "metric_1", "ORIGIN_B", 7)));
301        }
302
303        #[test]
304        fn sum_metric_per_day_and_origin_on_old_event() {
305            fn tuple_with_str(t: &(String, String, String, i64)) -> (&str, &str, &str, i64) {
306                (t.0.as_str(), t.1.as_str(), t.2.as_str(), t.3)
307            }
308
309            let connection = Arc::new(event_store_db_connection().unwrap());
310
311            let persister = EventPersister::new(connection.clone());
312            insert_metric_event(&persister, "2024-10-29 21:00:00", "metric_1", 15);
313            insert_metric_event(&persister, "2024-10-29 22:00:00", "metric_1", 60);
314            insert_metric_event(&persister, "2024-10-29 23:00:00", "metric_2", 100);
315            insert_metric_event(&persister, "2024-10-30 17:00:00", "metric_1", 12);
316            insert_metric_event(&persister, "2024-10-30 18:00:00", "metric_1", 4);
317            insert_metric_event(&persister, "2024-10-30 17:00:00", "metric_1", 15);
318            insert_metric_event(&persister, "2024-10-30 18:00:00", "metric_1", 3);
319
320            let result = get_all_metrics_by_origin(connection).unwrap();
321            let result: Vec<_> = result.iter().map(tuple_with_str).collect();
322
323            assert!(result.contains(&("2024-10-29", "metric_1", "", 75)));
324            assert!(result.contains(&("2024-10-29", "metric_2", "", 100)));
325            assert!(result.contains(&("2024-10-30", "metric_1", "", 34)));
326        }
327
328        #[test]
329        fn sum_metric_per_day_and_origin_with_old_and_new_format() {
330            fn tuple_with_str(t: &(String, String, String, i64)) -> (&str, &str, &str, i64) {
331                (t.0.as_str(), t.1.as_str(), t.2.as_str(), t.3)
332            }
333
334            let connection = Arc::new(event_store_db_connection().unwrap());
335            let persister = EventPersister::new(connection.clone());
336            insert_metric_event_with_origin(
337                &persister,
338                "2024-10-29 21:00:00",
339                "metric_1",
340                "ORIGIN_A",
341                15,
342            );
343            insert_metric_event_with_origin(
344                &persister,
345                "2024-10-29 22:00:00",
346                "metric_1",
347                "ORIGIN_B",
348                60,
349            );
350            insert_metric_event_with_origin(
351                &persister,
352                "2024-10-29 23:00:00",
353                "metric_1",
354                "ORIGIN_B",
355                20,
356            );
357            insert_metric_event(&persister, "2024-10-29 22:00:00", "metric_1", 23);
358            insert_metric_event(&persister, "2024-10-29 23:00:00", "metric_1", 31);
359
360            let result = get_all_metrics_by_origin(connection).unwrap();
361            let result: Vec<_> = result.iter().map(tuple_with_str).collect();
362
363            assert!(result.contains(&("2024-10-29", "metric_1", "ORIGIN_A", 15)));
364            assert!(result.contains(&("2024-10-29", "metric_1", "ORIGIN_B", 80)));
365            assert!(result.contains(&("2024-10-29", "metric_1", "", 54)));
366        }
367    }
368
369    mod signer_registration_summary {
370        use std::sync::Arc;
371
372        use crate::event_store::database::test_helper::event_store_db_connection;
373        use mithril_common::entities::{SignerWithStake, Stake};
374        use mithril_common::{test_utils::fake_data, StdResult};
375        use sqlite::ConnectionThreadSafe;
376
377        use super::{EventMessage, EventPersister};
378
379        /// Insert a signer registration event in the database.
380        fn insert_registration_event(
381            persister: &EventPersister,
382            epoch: &str,
383            party_id: &str,
384            stake: Stake,
385            signer_node_version: &str,
386        ) {
387            let signers = fake_data::signers_with_stakes(1);
388            let signer = SignerWithStake {
389                party_id: party_id.to_string(),
390                stake,
391                ..signers[0].clone()
392            };
393
394            let message = EventMessage::signer_registration(
395                "Test",
396                &signer,
397                Some(signer_node_version.to_string()),
398                epoch,
399            );
400
401            let _event = persister.persist(message).unwrap();
402        }
403
404        #[derive(PartialEq)]
405        struct StakeSignerVersion {
406            epoch: i64,
407            version: String,
408            total_epoch_stakes: i64,
409            stakes_version: i64,
410            stakes_ratio: String,
411            pool_count: i64,
412        }
413        impl StakeSignerVersion {
414            fn new(
415                epoch: i64,
416                version: &str,
417                total_epoch_stakes: i64,
418                stakes_version: i64,
419                stakes_ratio: &str,
420                pool_count: i64,
421            ) -> Self {
422                Self {
423                    epoch,
424                    version: version.to_string(),
425                    total_epoch_stakes,
426                    stakes_version,
427                    stakes_ratio: stakes_ratio.to_string(),
428                    pool_count,
429                }
430            }
431        }
432
433        fn get_all_registrations(
434            connection: Arc<ConnectionThreadSafe>,
435        ) -> StdResult<Vec<StakeSignerVersion>> {
436            let query = "select
437                    epoch,
438                    version,
439                    total_epoch_stakes,
440                    stakes_version,
441                    stakes_ratio,
442                    pool_count 
443                from signer_registration_summary;";
444            let mut statement = connection.prepare(query)?;
445            let mut result = Vec::new();
446            while let Ok(sqlite::State::Row) = statement.next() {
447                result.push(StakeSignerVersion::new(
448                    statement.read::<i64, _>("epoch")?,
449                    &statement.read::<String, _>("version")?,
450                    statement.read::<i64, _>("total_epoch_stakes")?,
451                    statement.read::<i64, _>("stakes_version")?,
452                    &statement.read::<String, _>("stakes_ratio")?,
453                    statement.read::<i64, _>("pool_count")?,
454                ));
455            }
456
457            Ok(result)
458        }
459
460        #[test]
461        fn retrieved_node_version() {
462            let connection = Arc::new(event_store_db_connection().unwrap());
463            let persister = EventPersister::new(connection.clone());
464
465            insert_registration_event(&persister, "3", "A", 15, "0.2.234");
466            insert_registration_event(&persister, "4", "A", 15, "15.24.32");
467            insert_registration_event(&persister, "5", "A", 15, "0.4.789+ef0c28a");
468
469            let result = get_all_registrations(connection).unwrap();
470
471            assert!(result.contains(&StakeSignerVersion::new(3, "0.2.234", 15, 15, "100 %", 1)));
472            assert!(result.contains(&StakeSignerVersion::new(4, "15.24.32", 15, 15, "100 %", 1)));
473            assert!(result.contains(&StakeSignerVersion::new(5, "0.4.789", 15, 15, "100 %", 1)));
474        }
475
476        #[test]
477        fn retrieved_total_by_epoch() {
478            let connection = Arc::new(event_store_db_connection().unwrap());
479            let persister = EventPersister::new(connection.clone());
480
481            insert_registration_event(&persister, "8", "A", 20, "1.0.2");
482            insert_registration_event(&persister, "8", "B", 15, "1.0.2");
483            insert_registration_event(&persister, "9", "A", 56, "1.0.2");
484            insert_registration_event(&persister, "9", "B", 31, "1.0.2");
485            let result = get_all_registrations(connection).unwrap();
486
487            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.2", 35, 35, "100 %", 2)));
488            assert!(result.contains(&StakeSignerVersion::new(9, "1.0.2", 87, 87, "100 %", 2)));
489        }
490
491        #[test]
492        fn retrieved_percentage_per_version() {
493            let connection = Arc::new(event_store_db_connection().unwrap());
494            let persister = EventPersister::new(connection.clone());
495
496            insert_registration_event(&persister, "8", "A", 90, "1.0.2");
497            insert_registration_event(&persister, "8", "B", 30, "1.0.2");
498            insert_registration_event(&persister, "8", "C", 80, "1.0.4");
499            let result = get_all_registrations(connection).unwrap();
500
501            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.2", 200, 120, "60 %", 2)));
502            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.4", 200, 80, "40 %", 1)));
503        }
504
505        #[test]
506        fn retrieved_percentage_per_epoch() {
507            let connection = Arc::new(event_store_db_connection().unwrap());
508            let persister = EventPersister::new(connection.clone());
509
510            insert_registration_event(&persister, "8", "A", 6, "1.0.2");
511            insert_registration_event(&persister, "8", "B", 4, "1.0.4");
512            insert_registration_event(&persister, "9", "A", 28, "1.0.2");
513            insert_registration_event(&persister, "9", "B", 12, "1.0.4");
514            let result = get_all_registrations(connection).unwrap();
515
516            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.2", 10, 6, "60 %", 1)));
517            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.4", 10, 4, "40 %", 1)));
518            assert!(result.contains(&StakeSignerVersion::new(9, "1.0.2", 40, 28, "70 %", 1)));
519            assert!(result.contains(&StakeSignerVersion::new(9, "1.0.4", 40, 12, "30 %", 1)));
520        }
521
522        #[test]
523        fn with_multi_registrations_for_an_epoch_only_the_last_recorded_one_is_retained() {
524            let connection = Arc::new(event_store_db_connection().unwrap());
525            let persister = EventPersister::new(connection.clone());
526
527            insert_registration_event(&persister, "8", "A", 6, "1.0.2");
528            insert_registration_event(&persister, "8", "A", 8, "1.0.2");
529            insert_registration_event(&persister, "8", "A", 10, "1.0.4");
530            insert_registration_event(&persister, "8", "A", 7, "1.0.3");
531
532            let result = get_all_registrations(connection).unwrap();
533
534            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.3", 7, 7, "100 %", 1)));
535            assert!(result.len() == 1);
536        }
537    }
538}