mithril_aggregator/event_store/database/
repository.rs

1//! Migration module
2//!
3use anyhow::anyhow;
4use mithril_common::StdResult;
5use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
6
7use std::sync::Arc;
8
9use crate::event_store::database::query::InsertEventQuery;
10use crate::event_store::{event::Event, EventMessage};
11/// The EventPersister is the adapter to persist EventMessage turning them into
12/// Event.
13pub struct EventPersister {
14    connection: Arc<SqliteConnection>,
15}
16
17impl EventPersister {
18    /// Instantiate an EventPersister
19    pub fn new(connection: Arc<SqliteConnection>) -> Self {
20        Self { connection }
21    }
22
23    /// Save an EventMessage in the database.
24    pub fn persist(&self, message: EventMessage) -> StdResult<Event> {
25        let log_message = message.clone();
26        let mut rows = self.connection.fetch(InsertEventQuery::one(message)?)?;
27
28        rows.next().ok_or(anyhow!(
29            "No record from the database after saving event message {log_message:?}"
30        ))
31    }
32}
33
34#[cfg(test)]
35mod tests {
36    use super::*;
37    use crate::event_store::database::test_helper::event_store_db_connection;
38    use mithril_common::StdResult;
39
40    #[test]
41    fn can_persist_event() -> StdResult<()> {
42        let connection = Arc::new(event_store_db_connection().unwrap());
43
44        let persister = EventPersister::new(connection);
45        let message = EventMessage::new("source", "action", &"content".to_string(), Vec::new());
46
47        let _event = persister.persist(message)?;
48        Ok(())
49    }
50
51    #[test]
52    fn migration_executed_running_database() -> StdResult<()> {
53        let connection = Arc::new(event_store_db_connection().unwrap());
54
55        let persister = EventPersister::new(connection);
56        let message = EventMessage::new("source", "action", &"content".to_string(), Vec::new());
57
58        let _event = persister.persist(message)?;
59        Ok(())
60    }
61
62    mod metrics_per_day_view {
63
64        use std::time::Duration;
65
66        use crate::{
67            event_store::database::test_helper::event_store_db_connection, services::UsageReporter,
68        };
69        use chrono::{DateTime, Utc};
70
71        use mithril_common::StdResult;
72
73        use serde::{Deserialize, Serialize};
74        use sqlite::ConnectionThreadSafe;
75
76        use super::*;
77
78        fn get_all_metrics(
79            connection: Arc<ConnectionThreadSafe>,
80        ) -> StdResult<Vec<(String, String, i64)>> {
81            let query = "select date, counter_name, value from metrics_per_day";
82            let mut statement = connection.prepare(query)?;
83            let mut result = Vec::new();
84            while let Ok(sqlite::State::Row) = statement.next() {
85                result.push((
86                    statement.read::<String, _>("date")?,
87                    statement.read::<String, _>("counter_name")?,
88                    statement.read::<i64, _>("value")?,
89                ));
90            }
91
92            Ok(result)
93        }
94
95        fn get_all_metrics_by_origin(
96            connection: Arc<ConnectionThreadSafe>,
97        ) -> StdResult<Vec<(String, String, String, i64)>> {
98            let query = "select date, counter_name, origin, value from metrics_per_day_and_origin";
99            let mut statement = connection.prepare(query)?;
100            let mut result = Vec::new();
101            while let Ok(sqlite::State::Row) = statement.next() {
102                result.push((
103                    statement.read::<String, _>("date")?,
104                    statement.read::<String, _>("counter_name")?,
105                    statement
106                        .read::<Option<String>, _>("origin")?
107                        .unwrap_or_default(),
108                    statement.read::<i64, _>("value")?,
109                ));
110            }
111
112            Ok(result)
113        }
114
115        fn get_all_metrics_by_client_type(
116            connection: Arc<ConnectionThreadSafe>,
117        ) -> StdResult<Vec<(String, String, String, i64)>> {
118            let query = "select date, counter_name, client_type, value from metrics_per_day_and_client_type";
119            let mut statement = connection.prepare(query)?;
120            let mut result = Vec::new();
121            while let Ok(sqlite::State::Row) = statement.next() {
122                result.push((
123                    statement.read::<String, _>("date")?,
124                    statement.read::<String, _>("counter_name")?,
125                    statement
126                        .read::<Option<String>, _>("client_type")?
127                        .unwrap_or_default(),
128                    statement.read::<i64, _>("value")?,
129                ));
130            }
131
132            Ok(result)
133        }
134
135        /// Insert a metric event in the database.
136        /// date format is "%Y-%m-%d %H:%M:%S %z", example: "2015-09-05 23:56:04 +0000"
137        fn insert_metric_event_with_origin(
138            persister: &EventPersister,
139            date: &str,
140            metric_name: &str,
141            origin: &str,
142            value: i64,
143        ) {
144            let metric_date =
145                DateTime::parse_from_str(&format!("{date} +0000"), "%Y-%m-%d %H:%M:%S %z").unwrap();
146
147            let message = UsageReporter::create_metrics_event_message(
148                metric_name.to_string(),
149                value,
150                Duration::from_secs(5),
151                origin.to_string(),
152                "CLIENT_TYPE_A".to_string(),
153                metric_date.into(),
154            );
155
156            let _event = persister.persist(message).unwrap();
157        }
158
159        fn insert_metric_event_message(
160            persister: &EventPersister,
161            date: &str,
162            metric_name: &str,
163            origin: &str,
164            client_type: &str,
165            value: i64,
166        ) {
167            let metric_date =
168                DateTime::parse_from_str(&format!("{date} +0000"), "%Y-%m-%d %H:%M:%S %z").unwrap();
169
170            let message = UsageReporter::create_metrics_event_message(
171                metric_name.to_string(),
172                value,
173                Duration::from_secs(5),
174                origin.to_string(),
175                client_type.to_string(),
176                metric_date.into(),
177            );
178
179            let _event = persister.persist(message).unwrap();
180        }
181
182        /// Insert a metric event with the old format (without origin) in the database.
183        fn insert_metric_event(
184            persister: &EventPersister,
185            date: &str,
186            metric_name: &str,
187            value: i64,
188        ) {
189            #[derive(Serialize, Deserialize)]
190            struct OldMetricEventMessage {
191                name: String,
192                value: i64,
193                period: Duration,
194                date: DateTime<Utc>,
195            }
196
197            let metric_date =
198                DateTime::parse_from_str(&format!("{date} +0000"), "%Y-%m-%d %H:%M:%S %z").unwrap();
199
200            let message = EventMessage::new(
201                "Metrics",
202                metric_name,
203                &OldMetricEventMessage {
204                    name: metric_name.to_string(),
205                    value,
206                    period: Duration::from_secs(5),
207                    date: metric_date.into(),
208                },
209                vec![],
210            );
211
212            let _event = persister.persist(message).unwrap();
213        }
214
215        #[test]
216        fn retrieved_inserted_event() {
217            let connection = Arc::new(event_store_db_connection().unwrap());
218            let persister = EventPersister::new(connection.clone());
219            insert_metric_event_with_origin(
220                &persister,
221                "2024-10-29 23:56:04",
222                "metric_1",
223                "ORIGIN",
224                15,
225            );
226
227            let result = get_all_metrics(connection).unwrap();
228
229            assert!(result.contains(&("2024-10-29".to_string(), "metric_1".to_string(), 15)));
230        }
231
232        #[test]
233        fn sum_metric_per_day() {
234            let connection = Arc::new(event_store_db_connection().unwrap());
235            let persister = EventPersister::new(connection.clone());
236            insert_metric_event_with_origin(
237                &persister,
238                "2024-10-29 21:00:00",
239                "metric_1",
240                "ORIGIN_A",
241                15,
242            );
243            insert_metric_event_with_origin(
244                &persister,
245                "2024-10-29 22:00:00",
246                "metric_1",
247                "ORIGIN_B",
248                60,
249            );
250            insert_metric_event_with_origin(
251                &persister,
252                "2024-10-29 23:00:00",
253                "metric_2",
254                "ORIGIN",
255                100,
256            );
257            insert_metric_event_with_origin(
258                &persister,
259                "2024-10-30 17:00:00",
260                "metric_1",
261                "ORIGIN_A",
262                12,
263            );
264            insert_metric_event_with_origin(
265                &persister,
266                "2024-10-30 18:00:00",
267                "metric_1",
268                "ORIGIN_B",
269                4,
270            );
271
272            let result = get_all_metrics(connection).unwrap();
273
274            assert!(result.contains(&("2024-10-29".to_string(), "metric_1".to_string(), 75)));
275            assert!(result.contains(&("2024-10-29".to_string(), "metric_2".to_string(), 100)));
276            assert!(result.contains(&("2024-10-30".to_string(), "metric_1".to_string(), 16)));
277        }
278
279        #[test]
280        fn sum_metric_per_day_and_origin() {
281            fn tuple_with_str(t: &(String, String, String, i64)) -> (&str, &str, &str, i64) {
282                (t.0.as_str(), t.1.as_str(), t.2.as_str(), t.3)
283            }
284
285            let connection = Arc::new(event_store_db_connection().unwrap());
286            let persister = EventPersister::new(connection.clone());
287            insert_metric_event_with_origin(
288                &persister,
289                "2024-10-29 21:00:00",
290                "metric_1",
291                "ORIGIN_A",
292                15,
293            );
294            insert_metric_event_with_origin(
295                &persister,
296                "2024-10-29 22:00:00",
297                "metric_1",
298                "ORIGIN_B",
299                60,
300            );
301            insert_metric_event_with_origin(
302                &persister,
303                "2024-10-29 23:00:00",
304                "metric_2",
305                "ORIGIN",
306                100,
307            );
308            insert_metric_event_with_origin(
309                &persister,
310                "2024-10-30 17:00:00",
311                "metric_1",
312                "ORIGIN_A",
313                12,
314            );
315            insert_metric_event_with_origin(
316                &persister,
317                "2024-10-30 18:00:00",
318                "metric_1",
319                "ORIGIN_B",
320                4,
321            );
322            insert_metric_event_with_origin(
323                &persister,
324                "2024-10-30 17:00:00",
325                "metric_1",
326                "ORIGIN_A",
327                15,
328            );
329            insert_metric_event_with_origin(
330                &persister,
331                "2024-10-30 18:00:00",
332                "metric_1",
333                "ORIGIN_B",
334                3,
335            );
336
337            let result = get_all_metrics_by_origin(connection).unwrap();
338            let result: Vec<_> = result.iter().map(tuple_with_str).collect();
339
340            assert!(result.contains(&("2024-10-29", "metric_1", "ORIGIN_A", 15)));
341            assert!(result.contains(&("2024-10-29", "metric_1", "ORIGIN_B", 60)));
342            assert!(result.contains(&("2024-10-29", "metric_2", "ORIGIN", 100)));
343            assert!(result.contains(&("2024-10-30", "metric_1", "ORIGIN_A", 27)));
344            assert!(result.contains(&("2024-10-30", "metric_1", "ORIGIN_B", 7)));
345        }
346
347        #[test]
348        fn vue_metrics_per_day_and_client_type() {
349            fn tuple_with_str(t: &(String, String, String, i64)) -> (&str, &str, &str, i64) {
350                (t.0.as_str(), t.1.as_str(), t.2.as_str(), t.3)
351            }
352
353            let connection = Arc::new(event_store_db_connection().unwrap());
354            let persister = EventPersister::new(connection.clone());
355
356            insert_metric_event_message(
357                &persister,
358                "2024-10-29 21:00:00",
359                "metric_1",
360                "ORIGIN_A",
361                "CLIENT_TYPE_A",
362                15,
363            );
364
365            let result = get_all_metrics_by_client_type(connection).unwrap();
366            let result: Vec<_> = result.iter().map(tuple_with_str).collect();
367
368            assert!(result.contains(&("2024-10-29", "metric_1", "CLIENT_TYPE_A", 15)));
369        }
370
371        #[test]
372        fn sum_metric_per_day_and_origin_on_old_event() {
373            fn tuple_with_str(t: &(String, String, String, i64)) -> (&str, &str, &str, i64) {
374                (t.0.as_str(), t.1.as_str(), t.2.as_str(), t.3)
375            }
376
377            let connection = Arc::new(event_store_db_connection().unwrap());
378
379            let persister = EventPersister::new(connection.clone());
380            insert_metric_event(&persister, "2024-10-29 21:00:00", "metric_1", 15);
381            insert_metric_event(&persister, "2024-10-29 22:00:00", "metric_1", 60);
382            insert_metric_event(&persister, "2024-10-29 23:00:00", "metric_2", 100);
383            insert_metric_event(&persister, "2024-10-30 17:00:00", "metric_1", 12);
384            insert_metric_event(&persister, "2024-10-30 18:00:00", "metric_1", 4);
385            insert_metric_event(&persister, "2024-10-30 17:00:00", "metric_1", 15);
386            insert_metric_event(&persister, "2024-10-30 18:00:00", "metric_1", 3);
387
388            let result = get_all_metrics_by_origin(connection).unwrap();
389            let result: Vec<_> = result.iter().map(tuple_with_str).collect();
390
391            assert!(result.contains(&("2024-10-29", "metric_1", "", 75)));
392            assert!(result.contains(&("2024-10-29", "metric_2", "", 100)));
393            assert!(result.contains(&("2024-10-30", "metric_1", "", 34)));
394        }
395
396        #[test]
397        fn sum_metric_per_day_and_origin_with_old_and_new_format() {
398            fn tuple_with_str(t: &(String, String, String, i64)) -> (&str, &str, &str, i64) {
399                (t.0.as_str(), t.1.as_str(), t.2.as_str(), t.3)
400            }
401
402            let connection = Arc::new(event_store_db_connection().unwrap());
403            let persister = EventPersister::new(connection.clone());
404            insert_metric_event_with_origin(
405                &persister,
406                "2024-10-29 21:00:00",
407                "metric_1",
408                "ORIGIN_A",
409                15,
410            );
411            insert_metric_event_with_origin(
412                &persister,
413                "2024-10-29 22:00:00",
414                "metric_1",
415                "ORIGIN_B",
416                60,
417            );
418            insert_metric_event_with_origin(
419                &persister,
420                "2024-10-29 23:00:00",
421                "metric_1",
422                "ORIGIN_B",
423                20,
424            );
425            insert_metric_event(&persister, "2024-10-29 22:00:00", "metric_1", 23);
426            insert_metric_event(&persister, "2024-10-29 23:00:00", "metric_1", 31);
427
428            let result = get_all_metrics_by_origin(connection).unwrap();
429            let result: Vec<_> = result.iter().map(tuple_with_str).collect();
430
431            assert!(result.contains(&("2024-10-29", "metric_1", "ORIGIN_A", 15)));
432            assert!(result.contains(&("2024-10-29", "metric_1", "ORIGIN_B", 80)));
433            assert!(result.contains(&("2024-10-29", "metric_1", "", 54)));
434        }
435    }
436
437    mod signer_registration_summary {
438        use std::sync::Arc;
439
440        use crate::event_store::database::test_helper::event_store_db_connection;
441        use mithril_common::entities::{SignerWithStake, Stake};
442        use mithril_common::{test_utils::fake_data, StdResult};
443        use sqlite::ConnectionThreadSafe;
444
445        use super::{EventMessage, EventPersister};
446
447        /// Insert a signer registration event in the database.
448        fn insert_registration_event(
449            persister: &EventPersister,
450            epoch: &str,
451            party_id: &str,
452            stake: Stake,
453            signer_node_version: &str,
454        ) {
455            let signers = fake_data::signers_with_stakes(1);
456            let signer = SignerWithStake {
457                party_id: party_id.to_string(),
458                stake,
459                ..signers[0].clone()
460            };
461
462            let message = EventMessage::signer_registration(
463                "Test",
464                &signer,
465                Some(signer_node_version.to_string()),
466                epoch,
467            );
468
469            let _event = persister.persist(message).unwrap();
470        }
471
472        #[derive(PartialEq)]
473        struct StakeSignerVersion {
474            epoch: i64,
475            version: String,
476            total_epoch_stakes: i64,
477            stakes_version: i64,
478            stakes_ratio: String,
479            pool_count: i64,
480        }
481        impl StakeSignerVersion {
482            fn new(
483                epoch: i64,
484                version: &str,
485                total_epoch_stakes: i64,
486                stakes_version: i64,
487                stakes_ratio: &str,
488                pool_count: i64,
489            ) -> Self {
490                Self {
491                    epoch,
492                    version: version.to_string(),
493                    total_epoch_stakes,
494                    stakes_version,
495                    stakes_ratio: stakes_ratio.to_string(),
496                    pool_count,
497                }
498            }
499        }
500
501        fn get_all_registrations(
502            connection: Arc<ConnectionThreadSafe>,
503        ) -> StdResult<Vec<StakeSignerVersion>> {
504            let query = "select
505                    epoch,
506                    version,
507                    total_epoch_stakes,
508                    stakes_version,
509                    stakes_ratio,
510                    pool_count 
511                from signer_registration_summary;";
512            let mut statement = connection.prepare(query)?;
513            let mut result = Vec::new();
514            while let Ok(sqlite::State::Row) = statement.next() {
515                result.push(StakeSignerVersion::new(
516                    statement.read::<i64, _>("epoch")?,
517                    &statement.read::<String, _>("version")?,
518                    statement.read::<i64, _>("total_epoch_stakes")?,
519                    statement.read::<i64, _>("stakes_version")?,
520                    &statement.read::<String, _>("stakes_ratio")?,
521                    statement.read::<i64, _>("pool_count")?,
522                ));
523            }
524
525            Ok(result)
526        }
527
528        #[test]
529        fn retrieved_node_version() {
530            let connection = Arc::new(event_store_db_connection().unwrap());
531            let persister = EventPersister::new(connection.clone());
532
533            insert_registration_event(&persister, "3", "A", 15, "0.2.234");
534            insert_registration_event(&persister, "4", "A", 15, "15.24.32");
535            insert_registration_event(&persister, "5", "A", 15, "0.4.789+ef0c28a");
536
537            let result = get_all_registrations(connection).unwrap();
538
539            assert!(result.contains(&StakeSignerVersion::new(3, "0.2.234", 15, 15, "100 %", 1)));
540            assert!(result.contains(&StakeSignerVersion::new(4, "15.24.32", 15, 15, "100 %", 1)));
541            assert!(result.contains(&StakeSignerVersion::new(5, "0.4.789", 15, 15, "100 %", 1)));
542        }
543
544        #[test]
545        fn retrieved_total_by_epoch() {
546            let connection = Arc::new(event_store_db_connection().unwrap());
547            let persister = EventPersister::new(connection.clone());
548
549            insert_registration_event(&persister, "8", "A", 20, "1.0.2");
550            insert_registration_event(&persister, "8", "B", 15, "1.0.2");
551            insert_registration_event(&persister, "9", "A", 56, "1.0.2");
552            insert_registration_event(&persister, "9", "B", 31, "1.0.2");
553            let result = get_all_registrations(connection).unwrap();
554
555            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.2", 35, 35, "100 %", 2)));
556            assert!(result.contains(&StakeSignerVersion::new(9, "1.0.2", 87, 87, "100 %", 2)));
557        }
558
559        #[test]
560        fn retrieved_percentage_per_version() {
561            let connection = Arc::new(event_store_db_connection().unwrap());
562            let persister = EventPersister::new(connection.clone());
563
564            insert_registration_event(&persister, "8", "A", 90, "1.0.2");
565            insert_registration_event(&persister, "8", "B", 30, "1.0.2");
566            insert_registration_event(&persister, "8", "C", 80, "1.0.4");
567            let result = get_all_registrations(connection).unwrap();
568
569            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.2", 200, 120, "60 %", 2)));
570            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.4", 200, 80, "40 %", 1)));
571        }
572
573        #[test]
574        fn retrieved_percentage_per_epoch() {
575            let connection = Arc::new(event_store_db_connection().unwrap());
576            let persister = EventPersister::new(connection.clone());
577
578            insert_registration_event(&persister, "8", "A", 6, "1.0.2");
579            insert_registration_event(&persister, "8", "B", 4, "1.0.4");
580            insert_registration_event(&persister, "9", "A", 28, "1.0.2");
581            insert_registration_event(&persister, "9", "B", 12, "1.0.4");
582            let result = get_all_registrations(connection).unwrap();
583
584            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.2", 10, 6, "60 %", 1)));
585            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.4", 10, 4, "40 %", 1)));
586            assert!(result.contains(&StakeSignerVersion::new(9, "1.0.2", 40, 28, "70 %", 1)));
587            assert!(result.contains(&StakeSignerVersion::new(9, "1.0.4", 40, 12, "30 %", 1)));
588        }
589
590        #[test]
591        fn with_multi_registrations_for_an_epoch_only_the_last_recorded_one_is_retained() {
592            let connection = Arc::new(event_store_db_connection().unwrap());
593            let persister = EventPersister::new(connection.clone());
594
595            insert_registration_event(&persister, "8", "A", 6, "1.0.2");
596            insert_registration_event(&persister, "8", "A", 8, "1.0.2");
597            insert_registration_event(&persister, "8", "A", 10, "1.0.4");
598            insert_registration_event(&persister, "8", "A", 7, "1.0.3");
599
600            let result = get_all_registrations(connection).unwrap();
601
602            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.3", 7, 7, "100 %", 1)));
603            assert!(result.len() == 1);
604        }
605    }
606}