mithril_aggregator/event_store/database/
repository.rs

1//! Migration module
2//!
3use anyhow::anyhow;
4use mithril_common::StdResult;
5use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
6
7use std::sync::Arc;
8
9use crate::event_store::database::query::InsertEventQuery;
10use crate::event_store::{event::Event, EventMessage};
11/// The EventPersister is the adapter to persist EventMessage turning them into
12/// Event.
13pub struct EventPersister {
14    connection: Arc<SqliteConnection>,
15}
16
17impl EventPersister {
18    /// Instantiate an EventPersister
19    pub fn new(connection: Arc<SqliteConnection>) -> Self {
20        Self { connection }
21    }
22
23    /// Save an EventMessage in the database.
24    pub fn persist(&self, message: EventMessage) -> StdResult<Event> {
25        let log_message = message.clone();
26        let mut rows = self.connection.fetch(InsertEventQuery::one(message)?)?;
27
28        rows.next().ok_or(anyhow!(
29            "No record from the database after saving event message {log_message:?}"
30        ))
31    }
32}
33
34#[cfg(test)]
35mod tests {
36    use super::*;
37    use crate::event_store::database::test_helper::event_store_db_connection;
38    use mithril_common::StdResult;
39
40    #[test]
41    fn can_persist_event() -> StdResult<()> {
42        let connection = Arc::new(event_store_db_connection().unwrap());
43
44        let persister = EventPersister::new(connection);
45        let message = EventMessage::new("source", "action", &"content".to_string(), Vec::new());
46
47        let _event = persister.persist(message)?;
48        Ok(())
49    }
50
51    #[test]
52    fn migration_executed_running_database() -> StdResult<()> {
53        let connection = Arc::new(event_store_db_connection().unwrap());
54
55        let persister = EventPersister::new(connection);
56        let message = EventMessage::new("source", "action", &"content".to_string(), Vec::new());
57
58        let _event = persister.persist(message)?;
59        Ok(())
60    }
61
62    mod metrics_per_day_view {
63        use std::time::Duration;
64
65        use crate::{
66            event_store::database::test_helper::event_store_db_connection, services::UsageReporter,
67        };
68        use chrono::DateTime;
69
70        use mithril_common::StdResult;
71
72        use sqlite::ConnectionThreadSafe;
73
74        use super::*;
75
76        fn get_all_metrics(
77            connection: Arc<ConnectionThreadSafe>,
78        ) -> StdResult<Vec<(String, String, i64)>> {
79            let query = "select date, counter_name, value from metrics_per_day";
80            let mut statement = connection.prepare(query)?;
81            let mut result = Vec::new();
82            while let Ok(sqlite::State::Row) = statement.next() {
83                result.push((
84                    statement.read::<String, _>("date")?,
85                    statement.read::<String, _>("counter_name")?,
86                    statement.read::<i64, _>("value")?,
87                ));
88            }
89
90            Ok(result)
91        }
92
93        /// Insert a metric event in the database.
94        /// date format is "%Y-%m-%d %H:%M:%S %z", example: "2015-09-05 23:56:04 +0000"
95        fn insert_metric_event(
96            persister: &EventPersister,
97            date: &str,
98            metric_name: &str,
99            value: i64,
100        ) {
101            let metric_date =
102                DateTime::parse_from_str(&format!("{date} +0000"), "%Y-%m-%d %H:%M:%S %z").unwrap();
103
104            let message = UsageReporter::create_metrics_event_message(
105                metric_name.to_string(),
106                value,
107                Duration::from_secs(5),
108                metric_date.into(),
109            );
110
111            let _event = persister.persist(message).unwrap();
112        }
113
114        #[test]
115        fn retrieved_inserted_event() {
116            let connection = Arc::new(event_store_db_connection().unwrap());
117            let persister = EventPersister::new(connection.clone());
118            insert_metric_event(&persister, "2024-10-29 23:56:04", "metric_1", 15);
119
120            let result = get_all_metrics(connection).unwrap();
121
122            assert!(result.contains(&("2024-10-29".to_string(), "metric_1".to_string(), 15)));
123        }
124
125        #[test]
126        fn sum_metric_per_day() {
127            let connection = Arc::new(event_store_db_connection().unwrap());
128            let persister = EventPersister::new(connection.clone());
129            insert_metric_event(&persister, "2024-10-29 21:00:00", "metric_1", 15);
130            insert_metric_event(&persister, "2024-10-29 22:00:00", "metric_1", 60);
131            insert_metric_event(&persister, "2024-10-29 23:00:00", "metric_2", 100);
132            insert_metric_event(&persister, "2024-10-30 17:00:00", "metric_1", 12);
133            insert_metric_event(&persister, "2024-10-30 18:00:00", "metric_1", 4);
134
135            let result = get_all_metrics(connection).unwrap();
136
137            assert!(result.contains(&("2024-10-29".to_string(), "metric_1".to_string(), 75)));
138            assert!(result.contains(&("2024-10-29".to_string(), "metric_2".to_string(), 100)));
139            assert!(result.contains(&("2024-10-30".to_string(), "metric_1".to_string(), 16)));
140        }
141    }
142
143    mod signer_registration_summary {
144        use std::sync::Arc;
145
146        use crate::event_store::database::test_helper::event_store_db_connection;
147        use mithril_common::entities::{SignerWithStake, Stake};
148        use mithril_common::{test_utils::fake_data, StdResult};
149        use sqlite::ConnectionThreadSafe;
150
151        use super::{EventMessage, EventPersister};
152
153        /// Insert a signer registration event in the database.
154        fn insert_registration_event(
155            persister: &EventPersister,
156            epoch: &str,
157            party_id: &str,
158            stake: Stake,
159            signer_node_version: &str,
160        ) {
161            let signers = fake_data::signers_with_stakes(1);
162            let signer = SignerWithStake {
163                party_id: party_id.to_string(),
164                stake,
165                ..signers[0].clone()
166            };
167
168            let message = EventMessage::signer_registration(
169                "Test",
170                &signer,
171                Some(signer_node_version.to_string()),
172                epoch,
173            );
174
175            let _event = persister.persist(message).unwrap();
176        }
177
178        #[derive(PartialEq)]
179        struct StakeSignerVersion {
180            epoch: i64,
181            version: String,
182            total_epoch_stakes: i64,
183            stakes_version: i64,
184            stakes_ratio: String,
185            pool_count: i64,
186        }
187        impl StakeSignerVersion {
188            fn new(
189                epoch: i64,
190                version: &str,
191                total_epoch_stakes: i64,
192                stakes_version: i64,
193                stakes_ratio: &str,
194                pool_count: i64,
195            ) -> Self {
196                Self {
197                    epoch,
198                    version: version.to_string(),
199                    total_epoch_stakes,
200                    stakes_version,
201                    stakes_ratio: stakes_ratio.to_string(),
202                    pool_count,
203                }
204            }
205        }
206
207        fn get_all_registrations(
208            connection: Arc<ConnectionThreadSafe>,
209        ) -> StdResult<Vec<StakeSignerVersion>> {
210            let query = "select
211                    epoch,
212                    version,
213                    total_epoch_stakes,
214                    stakes_version,
215                    stakes_ratio,
216                    pool_count 
217                from signer_registration_summary;";
218            let mut statement = connection.prepare(query)?;
219            let mut result = Vec::new();
220            while let Ok(sqlite::State::Row) = statement.next() {
221                result.push(StakeSignerVersion::new(
222                    statement.read::<i64, _>("epoch")?,
223                    &statement.read::<String, _>("version")?,
224                    statement.read::<i64, _>("total_epoch_stakes")?,
225                    statement.read::<i64, _>("stakes_version")?,
226                    &statement.read::<String, _>("stakes_ratio")?,
227                    statement.read::<i64, _>("pool_count")?,
228                ));
229            }
230
231            Ok(result)
232        }
233
234        #[test]
235        fn retrieved_node_version() {
236            let connection = Arc::new(event_store_db_connection().unwrap());
237            let persister = EventPersister::new(connection.clone());
238
239            insert_registration_event(&persister, "3", "A", 15, "0.2.234");
240            insert_registration_event(&persister, "4", "A", 15, "15.24.32");
241            insert_registration_event(&persister, "5", "A", 15, "0.4.789+ef0c28a");
242
243            let result = get_all_registrations(connection).unwrap();
244
245            assert!(result.contains(&StakeSignerVersion::new(3, "0.2.234", 15, 15, "100 %", 1)));
246            assert!(result.contains(&StakeSignerVersion::new(4, "15.24.32", 15, 15, "100 %", 1)));
247            assert!(result.contains(&StakeSignerVersion::new(5, "0.4.789", 15, 15, "100 %", 1)));
248        }
249
250        #[test]
251        fn retrieved_total_by_epoch() {
252            let connection = Arc::new(event_store_db_connection().unwrap());
253            let persister = EventPersister::new(connection.clone());
254
255            insert_registration_event(&persister, "8", "A", 20, "1.0.2");
256            insert_registration_event(&persister, "8", "B", 15, "1.0.2");
257            insert_registration_event(&persister, "9", "A", 56, "1.0.2");
258            insert_registration_event(&persister, "9", "B", 31, "1.0.2");
259            let result = get_all_registrations(connection).unwrap();
260
261            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.2", 35, 35, "100 %", 2)));
262            assert!(result.contains(&StakeSignerVersion::new(9, "1.0.2", 87, 87, "100 %", 2)));
263        }
264
265        #[test]
266        fn retrieved_percentage_per_version() {
267            let connection = Arc::new(event_store_db_connection().unwrap());
268            let persister = EventPersister::new(connection.clone());
269
270            insert_registration_event(&persister, "8", "A", 90, "1.0.2");
271            insert_registration_event(&persister, "8", "B", 30, "1.0.2");
272            insert_registration_event(&persister, "8", "C", 80, "1.0.4");
273            let result = get_all_registrations(connection).unwrap();
274
275            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.2", 200, 120, "60 %", 2)));
276            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.4", 200, 80, "40 %", 1)));
277        }
278
279        #[test]
280        fn retrieved_percentage_per_epoch() {
281            let connection = Arc::new(event_store_db_connection().unwrap());
282            let persister = EventPersister::new(connection.clone());
283
284            insert_registration_event(&persister, "8", "A", 6, "1.0.2");
285            insert_registration_event(&persister, "8", "B", 4, "1.0.4");
286            insert_registration_event(&persister, "9", "A", 28, "1.0.2");
287            insert_registration_event(&persister, "9", "B", 12, "1.0.4");
288            let result = get_all_registrations(connection).unwrap();
289
290            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.2", 10, 6, "60 %", 1)));
291            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.4", 10, 4, "40 %", 1)));
292            assert!(result.contains(&StakeSignerVersion::new(9, "1.0.2", 40, 28, "70 %", 1)));
293            assert!(result.contains(&StakeSignerVersion::new(9, "1.0.4", 40, 12, "30 %", 1)));
294        }
295
296        #[test]
297        fn with_multi_registrations_for_an_epoch_only_the_last_recorded_one_is_retained() {
298            let connection = Arc::new(event_store_db_connection().unwrap());
299            let persister = EventPersister::new(connection.clone());
300
301            insert_registration_event(&persister, "8", "A", 6, "1.0.2");
302            insert_registration_event(&persister, "8", "A", 8, "1.0.2");
303            insert_registration_event(&persister, "8", "A", 10, "1.0.4");
304            insert_registration_event(&persister, "8", "A", 7, "1.0.3");
305
306            let result = get_all_registrations(connection).unwrap();
307
308            assert!(result.contains(&StakeSignerVersion::new(8, "1.0.3", 7, 7, "100 %", 1)));
309            assert!(result.len() == 1);
310        }
311    }
312}