1use anyhow::anyhow;
4use mithril_common::StdResult;
5use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
6
7use std::sync::Arc;
8
9use crate::event_store::database::query::InsertEventQuery;
10use crate::event_store::{event::Event, EventMessage};
/// Saves [`EventMessage`]s into the event store database.
///
/// All queries issued by [`EventPersister::persist`] go through the shared
/// connection held here.
pub struct EventPersister {
    /// Shared SQLite connection the insert queries are run against.
    connection: Arc<SqliteConnection>,
}
16
17impl EventPersister {
18 pub fn new(connection: Arc<SqliteConnection>) -> Self {
20 Self { connection }
21 }
22
23 pub fn persist(&self, message: EventMessage) -> StdResult<Event> {
25 let log_message = message.clone();
26 let mut rows = self.connection.fetch(InsertEventQuery::one(message)?)?;
27
28 rows.next().ok_or(anyhow!(
29 "No record from the database after saving event message {log_message:?}"
30 ))
31 }
32}
33
34#[cfg(test)]
35mod tests {
36 use super::*;
37 use crate::event_store::database::test_helper::event_store_db_connection;
38 use mithril_common::StdResult;
39
/// Persisting a simple message through the test helper connection succeeds.
#[test]
fn can_persist_event() -> StdResult<()> {
    let persister = EventPersister::new(Arc::new(event_store_db_connection().unwrap()));
    let event_message =
        EventMessage::new("source", "action", &"content".to_string(), Vec::new());

    // Only the outcome of the insert matters here; the returned event is discarded.
    persister.persist(event_message).map(|_event| ())
}
50
/// Persisting a message on the connection returned by the test helper succeeds.
///
/// NOTE(review): the body is identical to `can_persist_event`; presumably the intent is
/// to show the helper's database already has its migrations applied — confirm.
#[test]
fn migration_executed_running_database() -> StdResult<()> {
    let db_connection = event_store_db_connection().unwrap();
    let persister = EventPersister::new(Arc::new(db_connection));

    let content = "content".to_string();
    let event_message = EventMessage::new("source", "action", &content, Vec::new());

    persister.persist(event_message)?;
    Ok(())
}
61
62 mod metrics_per_day_view {
63 use std::time::Duration;
64
65 use crate::{
66 event_store::database::test_helper::event_store_db_connection, services::UsageReporter,
67 };
68 use chrono::DateTime;
69
70 use mithril_common::StdResult;
71
72 use sqlite::ConnectionThreadSafe;
73
74 use super::*;
75
76 fn get_all_metrics(
77 connection: Arc<ConnectionThreadSafe>,
78 ) -> StdResult<Vec<(String, String, i64)>> {
79 let query = "select date, counter_name, value from metrics_per_day";
80 let mut statement = connection.prepare(query)?;
81 let mut result = Vec::new();
82 while let Ok(sqlite::State::Row) = statement.next() {
83 result.push((
84 statement.read::<String, _>("date")?,
85 statement.read::<String, _>("counter_name")?,
86 statement.read::<i64, _>("value")?,
87 ));
88 }
89
90 Ok(result)
91 }
92
93 fn insert_metric_event(
96 persister: &EventPersister,
97 date: &str,
98 metric_name: &str,
99 value: i64,
100 ) {
101 let metric_date =
102 DateTime::parse_from_str(&format!("{date} +0000"), "%Y-%m-%d %H:%M:%S %z").unwrap();
103
104 let message = UsageReporter::create_metrics_event_message(
105 metric_name.to_string(),
106 value,
107 Duration::from_secs(5),
108 metric_date.into(),
109 );
110
111 let _event = persister.persist(message).unwrap();
112 }
113
/// A single persisted metric is listed under its calendar day.
#[test]
fn retrieved_inserted_event() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));
    insert_metric_event(&persister, "2024-10-29 23:56:04", "metric_1", 15);

    let metrics = get_all_metrics(connection).unwrap();

    // The time of day is dropped: only the date part is expected in the row.
    let expected = ("2024-10-29".to_string(), "metric_1".to_string(), 15);
    assert!(metrics.contains(&expected));
}
124
/// Values of the same metric recorded on the same day are summed; other days and other
/// metrics are reported separately.
#[test]
fn sum_metric_per_day() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    let samples = [
        ("2024-10-29 21:00:00", "metric_1", 15),
        ("2024-10-29 22:00:00", "metric_1", 60),
        ("2024-10-29 23:00:00", "metric_2", 100),
        ("2024-10-30 17:00:00", "metric_1", 12),
        ("2024-10-30 18:00:00", "metric_1", 4),
    ];
    for (date, metric_name, value) in samples {
        insert_metric_event(&persister, date, metric_name, value);
    }

    let metrics = get_all_metrics(connection).unwrap();
    let has_row = |day: &str, metric_name: &str, total: i64| {
        metrics.contains(&(day.to_string(), metric_name.to_string(), total))
    };

    assert!(has_row("2024-10-29", "metric_1", 75));
    assert!(has_row("2024-10-29", "metric_2", 100));
    assert!(has_row("2024-10-30", "metric_1", 16));
}
141 }
142
143 mod signer_registration_summary {
144 use std::sync::Arc;
145
146 use crate::event_store::database::test_helper::event_store_db_connection;
147 use mithril_common::entities::{SignerWithStake, Stake};
148 use mithril_common::{test_utils::fake_data, StdResult};
149 use sqlite::ConnectionThreadSafe;
150
151 use super::{EventMessage, EventPersister};
152
153 fn insert_registration_event(
155 persister: &EventPersister,
156 epoch: &str,
157 party_id: &str,
158 stake: Stake,
159 signer_node_version: &str,
160 ) {
161 let signers = fake_data::signers_with_stakes(1);
162 let signer = SignerWithStake {
163 party_id: party_id.to_string(),
164 stake,
165 ..signers[0].clone()
166 };
167
168 let message = EventMessage::signer_registration(
169 "Test",
170 &signer,
171 Some(signer_node_version.to_string()),
172 epoch,
173 );
174
175 let _event = persister.persist(message).unwrap();
176 }
177
/// One row read from `signer_registration_summary`, used as the comparison value in the
/// assertions of this test module.
#[derive(PartialEq)]
struct StakeSignerVersion {
    epoch: i64,
    version: String,
    total_epoch_stakes: i64,
    stakes_version: i64,
    stakes_ratio: String,
    pool_count: i64,
}

impl StakeSignerVersion {
    /// Builds a row, copying the borrowed `version` and `stakes_ratio` into owned strings.
    fn new(
        epoch: i64,
        version: &str,
        total_epoch_stakes: i64,
        stakes_version: i64,
        stakes_ratio: &str,
        pool_count: i64,
    ) -> Self {
        let version = String::from(version);
        let stakes_ratio = String::from(stakes_ratio);

        Self {
            epoch,
            version,
            total_epoch_stakes,
            stakes_version,
            stakes_ratio,
            pool_count,
        }
    }
}
206
207 fn get_all_registrations(
208 connection: Arc<ConnectionThreadSafe>,
209 ) -> StdResult<Vec<StakeSignerVersion>> {
210 let query = "select
211 epoch,
212 version,
213 total_epoch_stakes,
214 stakes_version,
215 stakes_ratio,
216 pool_count
217 from signer_registration_summary;";
218 let mut statement = connection.prepare(query)?;
219 let mut result = Vec::new();
220 while let Ok(sqlite::State::Row) = statement.next() {
221 result.push(StakeSignerVersion::new(
222 statement.read::<i64, _>("epoch")?,
223 &statement.read::<String, _>("version")?,
224 statement.read::<i64, _>("total_epoch_stakes")?,
225 statement.read::<i64, _>("stakes_version")?,
226 &statement.read::<String, _>("stakes_ratio")?,
227 statement.read::<i64, _>("pool_count")?,
228 ));
229 }
230
231 Ok(result)
232 }
233
/// The summary reports the registered node version; a `+…` suffix, as in
/// `0.4.789+ef0c28a`, is not part of the reported version.
#[test]
fn retrieved_node_version() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    let registrations = [
        ("3", "0.2.234"),
        ("4", "15.24.32"),
        ("5", "0.4.789+ef0c28a"),
    ];
    for (epoch, node_version) in registrations {
        insert_registration_event(&persister, epoch, "A", 15, node_version);
    }

    let summary = get_all_registrations(connection).unwrap();
    // Every epoch has a single pool holding the whole stake of 15.
    let single_pool_row = |epoch: i64, version: &str| {
        StakeSignerVersion::new(epoch, version, 15, 15, "100 %", 1)
    };

    assert!(summary.contains(&single_pool_row(3, "0.2.234")));
    assert!(summary.contains(&single_pool_row(4, "15.24.32")));
    assert!(summary.contains(&single_pool_row(5, "0.4.789")));
}
249
/// Stakes and pool counts are totalled separately for each epoch.
#[test]
fn retrieved_total_by_epoch() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    let registrations = [("8", "A", 20), ("8", "B", 15), ("9", "A", 56), ("9", "B", 31)];
    for (epoch, party_id, stake) in registrations {
        insert_registration_event(&persister, epoch, party_id, stake, "1.0.2");
    }

    let summary = get_all_registrations(connection).unwrap();
    // A single version is registered, so it owns the whole stake of its epoch.
    let full_share_row = |epoch: i64, stakes: i64| {
        StakeSignerVersion::new(epoch, "1.0.2", stakes, stakes, "100 %", 2)
    };

    assert!(summary.contains(&full_share_row(8, 35)));
    assert!(summary.contains(&full_share_row(9, 87)));
}
264
/// Within one epoch, each version gets its share of the total stake and its pool count.
#[test]
fn retrieved_percentage_per_version() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    let registrations = [("A", 90, "1.0.2"), ("B", 30, "1.0.2"), ("C", 80, "1.0.4")];
    for (party_id, stake, node_version) in registrations {
        insert_registration_event(&persister, "8", party_id, stake, node_version);
    }

    let summary = get_all_registrations(connection).unwrap();
    // Total stake for epoch 8 is 90 + 30 + 80 = 200.
    let expected_for_1_0_2 = StakeSignerVersion::new(8, "1.0.2", 200, 120, "60 %", 2);
    let expected_for_1_0_4 = StakeSignerVersion::new(8, "1.0.4", 200, 80, "40 %", 1);

    assert!(summary.contains(&expected_for_1_0_2));
    assert!(summary.contains(&expected_for_1_0_4));
}
278
/// Version shares are computed against the total stake of their own epoch only.
#[test]
fn retrieved_percentage_per_epoch() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(Arc::clone(&connection));

    let registrations = [
        ("8", "A", 6, "1.0.2"),
        ("8", "B", 4, "1.0.4"),
        ("9", "A", 28, "1.0.2"),
        ("9", "B", 12, "1.0.4"),
    ];
    for (epoch, party_id, stake, node_version) in registrations {
        insert_registration_event(&persister, epoch, party_id, stake, node_version);
    }

    let summary = get_all_registrations(connection).unwrap();
    // Each (epoch, version) pair is backed by exactly one pool.
    let has_row = |epoch: i64, version: &str, total: i64, stakes: i64, ratio: &str| {
        summary.contains(&StakeSignerVersion::new(epoch, version, total, stakes, ratio, 1))
    };

    assert!(has_row(8, "1.0.2", 10, 6, "60 %"));
    assert!(has_row(8, "1.0.4", 10, 4, "40 %"));
    assert!(has_row(9, "1.0.2", 40, 28, "70 %"));
    assert!(has_row(9, "1.0.4", 40, 12, "30 %"));
}
295
/// When the same party registers several times in one epoch, only its last recorded
/// registration appears in the summary.
#[test]
fn with_multi_registrations_for_an_epoch_only_the_last_recorded_one_is_retained() {
    let connection = Arc::new(event_store_db_connection().unwrap());
    let persister = EventPersister::new(connection.clone());

    insert_registration_event(&persister, "8", "A", 6, "1.0.2");
    insert_registration_event(&persister, "8", "A", 8, "1.0.2");
    insert_registration_event(&persister, "8", "A", 10, "1.0.4");
    insert_registration_event(&persister, "8", "A", 7, "1.0.3");

    let result = get_all_registrations(connection).unwrap();

    assert!(result.contains(&StakeSignerVersion::new(8, "1.0.3", 7, 7, "100 %", 1)));
    // `assert_eq!` reports the actual row count on failure, unlike `assert!(len == 1)`.
    assert_eq!(result.len(), 1);
}
311 }
312}