mithril_aggregator/event_store/database/
repository.rs

1//! Migration module
2//!
3use anyhow::anyhow;
4use mithril_common::StdResult;
5use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
6
7use std::sync::Arc;
8
9use crate::event_store::database::query::InsertEventQuery;
10use crate::event_store::{EventMessage, event::Event};
11/// The EventPersister is the adapter to persist EventMessage turning them into
12/// Event.
13pub struct EventPersister {
14    connection: Arc<SqliteConnection>,
15}
16
17impl EventPersister {
18    /// Instantiate an EventPersister
19    pub fn new(connection: Arc<SqliteConnection>) -> Self {
20        Self { connection }
21    }
22
23    /// Save an EventMessage in the database.
24    pub fn persist(&self, message: EventMessage) -> StdResult<Event> {
25        let log_message = message.clone();
26        let mut rows = self.connection.fetch(InsertEventQuery::one(message)?)?;
27
28        rows.next().ok_or(anyhow!(
29            "No record from the database after saving event message {log_message:?}"
30        ))
31    }
32}
33
34#[cfg(test)]
35mod tests {
36    use super::*;
37    use crate::event_store::database::test_helper::event_store_db_connection;
38    use mithril_common::StdResult;
39
40    #[test]
41    fn can_persist_event() -> StdResult<()> {
42        let connection = Arc::new(event_store_db_connection().unwrap());
43
44        let persister = EventPersister::new(connection);
45        let message = EventMessage::new("source", "action", &"content".to_string(), Vec::new());
46
47        let _event = persister.persist(message)?;
48        Ok(())
49    }
50
51    #[test]
52    fn migration_executed_running_database() -> StdResult<()> {
53        let connection = Arc::new(event_store_db_connection().unwrap());
54
55        let persister = EventPersister::new(connection);
56        let message = EventMessage::new("source", "action", &"content".to_string(), Vec::new());
57
58        let _event = persister.persist(message)?;
59        Ok(())
60    }
61
62    mod metrics_per_day_view {
63
64        use std::time::Duration;
65
66        use crate::{
67            event_store::database::test_helper::event_store_db_connection, services::UsageReporter,
68        };
69        use chrono::{DateTime, Utc};
70
71        use mithril_common::StdResult;
72
73        use serde::{Deserialize, Serialize};
74        use sqlite::ConnectionThreadSafe;
75
76        use super::*;
77
78        fn get_all_metrics(
79            connection: Arc<ConnectionThreadSafe>,
80        ) -> StdResult<Vec<(String, String, i64)>> {
81            let query = "select date, counter_name, value from metrics_per_day";
82            let mut statement = connection.prepare(query)?;
83            let mut result = Vec::new();
84            while let Ok(sqlite::State::Row) = statement.next() {
85                result.push((
86                    statement.read::<String, _>("date")?,
87                    statement.read::<String, _>("counter_name")?,
88                    statement.read::<i64, _>("value")?,
89                ));
90            }
91
92            Ok(result)
93        }
94
95        fn get_all_metrics_by_origin(
96            connection: Arc<ConnectionThreadSafe>,
97        ) -> StdResult<Vec<(String, String, String, i64)>> {
98            let query = "select date, counter_name, origin, value from metrics_per_day_and_origin";
99            let mut statement = connection.prepare(query)?;
100            let mut result = Vec::new();
101            while let Ok(sqlite::State::Row) = statement.next() {
102                result.push((
103                    statement.read::<String, _>("date")?,
104                    statement.read::<String, _>("counter_name")?,
105                    statement.read::<Option<String>, _>("origin")?.unwrap_or_default(),
106                    statement.read::<i64, _>("value")?,
107                ));
108            }
109
110            Ok(result)
111        }
112
113        fn get_all_metrics_by_client_type(
114            connection: Arc<ConnectionThreadSafe>,
115        ) -> StdResult<Vec<(String, String, String, i64)>> {
116            let query = "select date, counter_name, client_type, value from metrics_per_day_and_client_type";
117            let mut statement = connection.prepare(query)?;
118            let mut result = Vec::new();
119            while let Ok(sqlite::State::Row) = statement.next() {
120                result.push((
121                    statement.read::<String, _>("date")?,
122                    statement.read::<String, _>("counter_name")?,
123                    statement
124                        .read::<Option<String>, _>("client_type")?
125                        .unwrap_or_default(),
126                    statement.read::<i64, _>("value")?,
127                ));
128            }
129
130            Ok(result)
131        }
132
133        /// Insert a metric event in the database.
134        /// date format is "%Y-%m-%d %H:%M:%S %z", example: "2015-09-05 23:56:04 +0000"
135        fn insert_metric_event_with_origin(
136            persister: &EventPersister,
137            date: &str,
138            metric_name: &str,
139            origin: &str,
140            value: i64,
141        ) {
142            let metric_date =
143                DateTime::parse_from_str(&format!("{date} +0000"), "%Y-%m-%d %H:%M:%S %z").unwrap();
144
145            let message = UsageReporter::create_metrics_event_message(
146                metric_name.to_string(),
147                value,
148                Duration::from_secs(5),
149                origin.to_string(),
150                "CLIENT_TYPE_A".to_string(),
151                metric_date.into(),
152            );
153
154            let _event = persister.persist(message).unwrap();
155        }
156
157        fn insert_metric_event_message(
158            persister: &EventPersister,
159            date: &str,
160            metric_name: &str,
161            origin: &str,
162            client_type: &str,
163            value: i64,
164        ) {
165            let metric_date =
166                DateTime::parse_from_str(&format!("{date} +0000"), "%Y-%m-%d %H:%M:%S %z").unwrap();
167
168            let message = UsageReporter::create_metrics_event_message(
169                metric_name.to_string(),
170                value,
171                Duration::from_secs(5),
172                origin.to_string(),
173                client_type.to_string(),
174                metric_date.into(),
175            );
176
177            let _event = persister.persist(message).unwrap();
178        }
179
180        /// Insert a metric event with the old format (without origin) in the database.
181        fn insert_metric_event(
182            persister: &EventPersister,
183            date: &str,
184            metric_name: &str,
185            value: i64,
186        ) {
187            #[derive(Serialize, Deserialize)]
188            struct OldMetricEventMessage {
189                name: String,
190                value: i64,
191                period: Duration,
192                date: DateTime<Utc>,
193            }
194
195            let metric_date =
196                DateTime::parse_from_str(&format!("{date} +0000"), "%Y-%m-%d %H:%M:%S %z").unwrap();
197
198            let message = EventMessage::new(
199                "Metrics",
200                metric_name,
201                &OldMetricEventMessage {
202                    name: metric_name.to_string(),
203                    value,
204                    period: Duration::from_secs(5),
205                    date: metric_date.into(),
206                },
207                vec![],
208            );
209
210            let _event = persister.persist(message).unwrap();
211        }
212
213        #[test]
214        fn retrieved_inserted_event() {
215            let connection = Arc::new(event_store_db_connection().unwrap());
216            let persister = EventPersister::new(connection.clone());
217            insert_metric_event_with_origin(
218                &persister,
219                "2024-10-29 23:56:04",
220                "metric_1",
221                "ORIGIN",
222                15,
223            );
224
225            let result = get_all_metrics(connection).unwrap();
226
227            assert!(result.contains(&("2024-10-29".to_string(), "metric_1".to_string(), 15)));
228        }
229
230        #[test]
231        fn sum_metric_per_day() {
232            let connection = Arc::new(event_store_db_connection().unwrap());
233            let persister = EventPersister::new(connection.clone());
234            insert_metric_event_with_origin(
235                &persister,
236                "2024-10-29 21:00:00",
237                "metric_1",
238                "ORIGIN_A",
239                15,
240            );
241            insert_metric_event_with_origin(
242                &persister,
243                "2024-10-29 22:00:00",
244                "metric_1",
245                "ORIGIN_B",
246                60,
247            );
248            insert_metric_event_with_origin(
249                &persister,
250                "2024-10-29 23:00:00",
251                "metric_2",
252                "ORIGIN",
253                100,
254            );
255            insert_metric_event_with_origin(
256                &persister,
257                "2024-10-30 17:00:00",
258                "metric_1",
259                "ORIGIN_A",
260                12,
261            );
262            insert_metric_event_with_origin(
263                &persister,
264                "2024-10-30 18:00:00",
265                "metric_1",
266                "ORIGIN_B",
267                4,
268            );
269
270            let result = get_all_metrics(connection).unwrap();
271
272            assert!(result.contains(&("2024-10-29".to_string(), "metric_1".to_string(), 75)));
273            assert!(result.contains(&("2024-10-29".to_string(), "metric_2".to_string(), 100)));
274            assert!(result.contains(&("2024-10-30".to_string(), "metric_1".to_string(), 16)));
275        }
276
277        #[test]
278        fn sum_metric_per_day_and_origin() {
279            fn tuple_with_str(t: &(String, String, String, i64)) -> (&str, &str, &str, i64) {
280                (t.0.as_str(), t.1.as_str(), t.2.as_str(), t.3)
281            }
282
283            let connection = Arc::new(event_store_db_connection().unwrap());
284            let persister = EventPersister::new(connection.clone());
285            insert_metric_event_with_origin(
286                &persister,
287                "2024-10-29 21:00:00",
288                "metric_1",
289                "ORIGIN_A",
290                15,
291            );
292            insert_metric_event_with_origin(
293                &persister,
294                "2024-10-29 22:00:00",
295                "metric_1",
296                "ORIGIN_B",
297                60,
298            );
299            insert_metric_event_with_origin(
300                &persister,
301                "2024-10-29 23:00:00",
302                "metric_2",
303                "ORIGIN",
304                100,
305            );
306            insert_metric_event_with_origin(
307                &persister,
308                "2024-10-30 17:00:00",
309                "metric_1",
310                "ORIGIN_A",
311                12,
312            );
313            insert_metric_event_with_origin(
314                &persister,
315                "2024-10-30 18:00:00",
316                "metric_1",
317                "ORIGIN_B",
318                4,
319            );
320            insert_metric_event_with_origin(
321                &persister,
322                "2024-10-30 17:00:00",
323                "metric_1",
324                "ORIGIN_A",
325                15,
326            );
327            insert_metric_event_with_origin(
328                &persister,
329                "2024-10-30 18:00:00",
330                "metric_1",
331                "ORIGIN_B",
332                3,
333            );
334
335            let result = get_all_metrics_by_origin(connection).unwrap();
336            let result: Vec<_> = result.iter().map(tuple_with_str).collect();
337
338            assert!(result.contains(&("2024-10-29", "metric_1", "ORIGIN_A", 15)));
339            assert!(result.contains(&("2024-10-29", "metric_1", "ORIGIN_B", 60)));
340            assert!(result.contains(&("2024-10-29", "metric_2", "ORIGIN", 100)));
341            assert!(result.contains(&("2024-10-30", "metric_1", "ORIGIN_A", 27)));
342            assert!(result.contains(&("2024-10-30", "metric_1", "ORIGIN_B", 7)));
343        }
344
345        #[test]
346        fn vue_metrics_per_day_and_client_type() {
347            fn tuple_with_str(t: &(String, String, String, i64)) -> (&str, &str, &str, i64) {
348                (t.0.as_str(), t.1.as_str(), t.2.as_str(), t.3)
349            }
350
351            let connection = Arc::new(event_store_db_connection().unwrap());
352            let persister = EventPersister::new(connection.clone());
353
354            insert_metric_event_message(
355                &persister,
356                "2024-10-29 21:00:00",
357                "metric_1",
358                "ORIGIN_A",
359                "CLIENT_TYPE_A",
360                15,
361            );
362
363            let result = get_all_metrics_by_client_type(connection).unwrap();
364            let result: Vec<_> = result.iter().map(tuple_with_str).collect();
365
366            assert!(result.contains(&("2024-10-29", "metric_1", "CLIENT_TYPE_A", 15)));
367        }
368
369        #[test]
370        fn sum_metric_per_day_and_origin_on_old_event() {
371            fn tuple_with_str(t: &(String, String, String, i64)) -> (&str, &str, &str, i64) {
372                (t.0.as_str(), t.1.as_str(), t.2.as_str(), t.3)
373            }
374
375            let connection = Arc::new(event_store_db_connection().unwrap());
376
377            let persister = EventPersister::new(connection.clone());
378            insert_metric_event(&persister, "2024-10-29 21:00:00", "metric_1", 15);
379            insert_metric_event(&persister, "2024-10-29 22:00:00", "metric_1", 60);
380            insert_metric_event(&persister, "2024-10-29 23:00:00", "metric_2", 100);
381            insert_metric_event(&persister, "2024-10-30 17:00:00", "metric_1", 12);
382            insert_metric_event(&persister, "2024-10-30 18:00:00", "metric_1", 4);
383            insert_metric_event(&persister, "2024-10-30 17:00:00", "metric_1", 15);
384            insert_metric_event(&persister, "2024-10-30 18:00:00", "metric_1", 3);
385
386            let result = get_all_metrics_by_origin(connection).unwrap();
387            let result: Vec<_> = result.iter().map(tuple_with_str).collect();
388
389            assert!(result.contains(&("2024-10-29", "metric_1", "", 75)));
390            assert!(result.contains(&("2024-10-29", "metric_2", "", 100)));
391            assert!(result.contains(&("2024-10-30", "metric_1", "", 34)));
392        }
393
394        #[test]
395        fn sum_metric_per_day_and_origin_with_old_and_new_format() {
396            fn tuple_with_str(t: &(String, String, String, i64)) -> (&str, &str, &str, i64) {
397                (t.0.as_str(), t.1.as_str(), t.2.as_str(), t.3)
398            }
399
400            let connection = Arc::new(event_store_db_connection().unwrap());
401            let persister = EventPersister::new(connection.clone());
402            insert_metric_event_with_origin(
403                &persister,
404                "2024-10-29 21:00:00",
405                "metric_1",
406                "ORIGIN_A",
407                15,
408            );
409            insert_metric_event_with_origin(
410                &persister,
411                "2024-10-29 22:00:00",
412                "metric_1",
413                "ORIGIN_B",
414                60,
415            );
416            insert_metric_event_with_origin(
417                &persister,
418                "2024-10-29 23:00:00",
419                "metric_1",
420                "ORIGIN_B",
421                20,
422            );
423            insert_metric_event(&persister, "2024-10-29 22:00:00", "metric_1", 23);
424            insert_metric_event(&persister, "2024-10-29 23:00:00", "metric_1", 31);
425
426            let result = get_all_metrics_by_origin(connection).unwrap();
427            let result: Vec<_> = result.iter().map(tuple_with_str).collect();
428
429            assert!(result.contains(&("2024-10-29", "metric_1", "ORIGIN_A", 15)));
430            assert!(result.contains(&("2024-10-29", "metric_1", "ORIGIN_B", 80)));
431            assert!(result.contains(&("2024-10-29", "metric_1", "", 54)));
432        }
433    }
434
435    mod signer_registration_summary {
436        use std::sync::Arc;
437
438        use crate::event_store::database::test_helper::event_store_db_connection;
439        use mithril_common::entities::{SignerWithStake, Stake};
440        use mithril_common::{StdResult, test::double::fake_data};
441        use sqlite::ConnectionThreadSafe;
442
443        use super::{EventMessage, EventPersister};
444
445        /// Insert a signer registration event in the database.
446        fn insert_registration_event(
447            persister: &EventPersister,
448            epoch: &str,
449            party_id: &str,
450            stake: Stake,
451            signer_node_version: &str,
452        ) {
453            let signers = fake_data::signers_with_stakes(1);
454            let signer = SignerWithStake {
455                party_id: party_id.to_string(),
456                stake,
457                ..signers[0].clone()
458            };
459
460            let message = EventMessage::signer_registration(
461                "Test",
462                &signer,
463                Some(signer_node_version.to_string()),
464                epoch,
465            );
466
467            let _event = persister.persist(message).unwrap();
468        }
469
470        #[derive(PartialEq)]
471        struct StakeSignerVersion {
472            epoch: i64,
473            version: String,
474            total_epoch_stakes: i64,
475            stakes_version: i64,
476            stakes_ratio: String,
477            pool_count: i64,
478        }
479        impl StakeSignerVersion {
480            fn new(
481                epoch: i64,
482                version: &str,
483                total_epoch_stakes: i64,
484                stakes_version: i64,
485                stakes_ratio: &str,
486                pool_count: i64,
487            ) -> Self {
488                Self {
489                    epoch,
490                    version: version.to_string(),
491                    total_epoch_stakes,
492                    stakes_version,
493                    stakes_ratio: stakes_ratio.to_string(),
494                    pool_count,
495                }
496            }
497        }
498
499        fn get_all_registrations(
500            connection: Arc<ConnectionThreadSafe>,
501        ) -> StdResult<Vec<StakeSignerVersion>> {
502            let query = "select
503                    epoch,
504                    version,
505                    total_epoch_stakes,
506                    stakes_version,
507                    stakes_ratio,
508                    pool_count 
509                from signer_registration_summary;";
510            let mut statement = connection.prepare(query)?;
511            let mut result = Vec::new();
512            while let Ok(sqlite::State::Row) = statement.next() {
513                result.push(StakeSignerVersion::new(
514                    statement.read::<i64, _>("epoch")?,
515                    &statement.read::<String, _>("version")?,
516                    statement.read::<i64, _>("total_epoch_stakes")?,
517                    statement.read::<i64, _>("stakes_version")?,
518                    &statement.read::<String, _>("stakes_ratio")?,
519                    statement.read::<i64, _>("pool_count")?,
520                ));
521            }
522
523            Ok(result)
524        }
525
526        #[test]
527        fn retrieved_node_version() {
528            let connection = Arc::new(event_store_db_connection().unwrap());
529            let persister = EventPersister::new(connection.clone());
530
531            insert_registration_event(&persister, "3", "A", 15, "0.2.234");
532            insert_registration_event(&persister, "4", "A", 15, "15.24.32");
533            insert_registration_event(&persister, "5", "A", 15, "0.4.789+ef0c28a");
534
535            let result = get_all_registrations(connection).unwrap();
536
537            assert!(result.contains(&StakeSignerVersion::new(3, "0.2.234", 15, 15, "100 %", 1)));
538            assert!(result.contains(&StakeSignerVersion::new(4, "15.24.32", 15, 15, "100 %", 1)));
539            assert!(result.contains(&StakeSignerVersion::new(5, "0.4.789", 15, 15, "100 %", 1)));
540        }
541
542        #[test]
543        fn retrieved_total_by_epoch() {
544            let connection = Arc::new(event_store_db_connection().unwrap());
545            let persister = EventPersister::new(connection.clone());
546
547            insert_registration_event(&persister, "8", "A", 20, "1.0.2");
548            insert_registration_event(&persister, "8", "B", 15, "1.0.2");
549            insert_registration_event(&persister, "9", "A", 56, "1.0.2");
550            insert_registration_event(&persister, "9", "B", 31, "1.0.2");
551            let result = get_all_registrations(connection).unwrap();
552
553            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.2", 35, 35, "100 %", 2)));
554            assert!(result.contains(&StakeSignerVersion::new(9, "1.0.2", 87, 87, "100 %", 2)));
555        }
556
557        #[test]
558        fn retrieved_percentage_per_version() {
559            let connection = Arc::new(event_store_db_connection().unwrap());
560            let persister = EventPersister::new(connection.clone());
561
562            insert_registration_event(&persister, "8", "A", 90, "1.0.2");
563            insert_registration_event(&persister, "8", "B", 30, "1.0.2");
564            insert_registration_event(&persister, "8", "C", 80, "1.0.4");
565            let result = get_all_registrations(connection).unwrap();
566
567            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.2", 200, 120, "60 %", 2)));
568            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.4", 200, 80, "40 %", 1)));
569        }
570
571        #[test]
572        fn retrieved_percentage_per_epoch() {
573            let connection = Arc::new(event_store_db_connection().unwrap());
574            let persister = EventPersister::new(connection.clone());
575
576            insert_registration_event(&persister, "8", "A", 6, "1.0.2");
577            insert_registration_event(&persister, "8", "B", 4, "1.0.4");
578            insert_registration_event(&persister, "9", "A", 28, "1.0.2");
579            insert_registration_event(&persister, "9", "B", 12, "1.0.4");
580            let result = get_all_registrations(connection).unwrap();
581
582            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.2", 10, 6, "60 %", 1)));
583            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.4", 10, 4, "40 %", 1)));
584            assert!(result.contains(&StakeSignerVersion::new(9, "1.0.2", 40, 28, "70 %", 1)));
585            assert!(result.contains(&StakeSignerVersion::new(9, "1.0.4", 40, 12, "30 %", 1)));
586        }
587
588        #[test]
589        fn with_multi_registrations_for_an_epoch_only_the_last_recorded_one_is_retained() {
590            let connection = Arc::new(event_store_db_connection().unwrap());
591            let persister = EventPersister::new(connection.clone());
592
593            insert_registration_event(&persister, "8", "A", 6, "1.0.2");
594            insert_registration_event(&persister, "8", "A", 8, "1.0.2");
595            insert_registration_event(&persister, "8", "A", 10, "1.0.4");
596            insert_registration_event(&persister, "8", "A", 7, "1.0.3");
597
598            let result = get_all_registrations(connection).unwrap();
599
600            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.3", 7, 7, "100 %", 1)));
601            assert!(result.len() == 1);
602        }
603    }
604}