mithril_aggregator/services/
upkeep.rs1use std::sync::Arc;
9
10use anyhow::Context;
11use async_trait::async_trait;
12use mithril_common::entities::Epoch;
13use mithril_common::logging::LoggerExtensions;
14use mithril_common::StdResult;
15use mithril_persistence::sqlite::{
16 SqliteCleaner, SqliteCleaningTask, SqliteConnection, SqliteConnectionPool,
17};
18use mithril_signed_entity_lock::SignedEntityTypeLock;
19use slog::{info, Logger};
20
21#[cfg_attr(test, mockall::automock)]
23#[async_trait]
24pub trait UpkeepService: Send + Sync {
25 async fn run(&self, epoch: Epoch) -> StdResult<()>;
27
28 async fn vacuum(&self) -> StdResult<()>;
30}
31
32#[cfg_attr(test, mockall::automock)]
34#[async_trait]
35pub trait EpochPruningTask: Send + Sync {
36 fn pruned_data(&self) -> &'static str;
38
39 async fn prune(&self, current_epoch: Epoch) -> StdResult<()>;
41}
42
43pub struct AggregatorUpkeepService {
48 main_db_connection: Arc<SqliteConnection>,
49 cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
50 event_store_connection: Arc<SqliteConnection>,
51 signed_entity_type_lock: Arc<SignedEntityTypeLock>,
52 pruning_tasks: Vec<Arc<dyn EpochPruningTask>>,
53 logger: Logger,
54}
55
56impl AggregatorUpkeepService {
57 pub fn new(
59 main_db_connection: Arc<SqliteConnection>,
60 cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
61 event_store_connection: Arc<SqliteConnection>,
62 signed_entity_type_lock: Arc<SignedEntityTypeLock>,
63 pruning_tasks: Vec<Arc<dyn EpochPruningTask>>,
64 logger: Logger,
65 ) -> Self {
66 Self {
67 main_db_connection,
68 cardano_tx_connection_pool,
69 event_store_connection,
70 signed_entity_type_lock,
71 pruning_tasks,
72 logger: logger.new_with_component_name::<Self>(),
73 }
74 }
75
76 async fn execute_pruning_tasks(&self, current_epoch: Epoch) -> StdResult<()> {
77 for task in &self.pruning_tasks {
78 info!(
79 self.logger, "Pruning stale data";
80 "pruned_data" => task.pruned_data(), "current_epoch" => ?current_epoch
81 );
82 task.prune(current_epoch).await?;
83 }
84
85 Ok(())
86 }
87
88 async fn upkeep_all_databases(&self) -> StdResult<()> {
89 if self.signed_entity_type_lock.has_locked_entities().await {
90 info!(
91 self.logger,
92 "Some entities are locked - Skipping database upkeep"
93 );
94 return Ok(());
95 }
96
97 let main_db_connection = self.main_db_connection.clone();
98 let cardano_tx_db_connection_pool = self.cardano_tx_connection_pool.clone();
99 let event_store_connection = self.event_store_connection.clone();
100 let db_upkeep_logger = self.logger.clone();
101
102 let db_upkeep_thread = tokio::task::spawn_blocking(move || -> StdResult<()> {
104 info!(db_upkeep_logger, "Cleaning main database");
105 SqliteCleaner::new(&main_db_connection)
106 .with_logger(db_upkeep_logger.clone())
107 .with_tasks(&[SqliteCleaningTask::WalCheckpointTruncate])
108 .run()?;
109
110 info!(db_upkeep_logger, "Cleaning cardano transactions database");
111 let cardano_tx_db_connection = cardano_tx_db_connection_pool.connection()?;
112 SqliteCleaner::new(&cardano_tx_db_connection)
113 .with_logger(db_upkeep_logger.clone())
114 .with_tasks(&[SqliteCleaningTask::WalCheckpointTruncate])
115 .run()?;
116
117 info!(db_upkeep_logger, "Cleaning event database");
118 SqliteCleaner::new(&event_store_connection)
119 .with_logger(db_upkeep_logger.clone())
120 .with_tasks(&[SqliteCleaningTask::WalCheckpointTruncate])
121 .run()?;
122
123 Ok(())
124 });
125
126 db_upkeep_thread
127 .await
128 .with_context(|| "Database Upkeep thread crashed")?
129 }
130
131 async fn vacuum_main_database(&self) -> StdResult<()> {
132 if self.signed_entity_type_lock.has_locked_entities().await {
133 info!(
134 self.logger,
135 "Some entities are locked - Skipping main database vacuum"
136 );
137 return Ok(());
138 }
139
140 let main_db_connection = self.main_db_connection.clone();
141 let db_upkeep_logger = self.logger.clone();
142
143 let db_upkeep_thread = tokio::task::spawn_blocking(move || -> StdResult<()> {
145 info!(db_upkeep_logger, "Vacuum main database");
146 SqliteCleaner::new(&main_db_connection)
147 .with_logger(db_upkeep_logger.clone())
148 .with_tasks(&[SqliteCleaningTask::Vacuum])
149 .run()?;
150
151 Ok(())
152 });
153
154 db_upkeep_thread
155 .await
156 .with_context(|| "Database Upkeep thread crashed")?
157 }
158}
159
160#[async_trait]
161impl UpkeepService for AggregatorUpkeepService {
162 async fn run(&self, current_epoch: Epoch) -> StdResult<()> {
163 info!(self.logger, "Start upkeep of the application");
164
165 self.execute_pruning_tasks(current_epoch)
166 .await
167 .with_context(|| "Pruning tasks failed")?;
168
169 self.upkeep_all_databases()
170 .await
171 .with_context(|| "Database upkeep failed")?;
172
173 info!(self.logger, "Upkeep finished");
174 Ok(())
175 }
176
177 async fn vacuum(&self) -> StdResult<()> {
178 info!(self.logger, "Start database vacuum");
179
180 self.vacuum_main_database()
181 .await
182 .with_context(|| "Vacuuming main database failed")?;
183
184 info!(self.logger, "Vacuum finished");
185
186 Ok(())
187 }
188}
189
#[cfg(test)]
mod tests {
    use mithril_common::{entities::SignedEntityTypeDiscriminants, temp_dir_create};
    use mockall::predicate::eq;

    use crate::database::test_helper::{
        cardano_tx_db_connection, cardano_tx_db_file_connection, main_db_connection,
        main_db_file_connection,
    };
    use crate::event_store::database::test_helper::{
        event_store_db_connection, event_store_db_file_connection,
    };
    use crate::test_tools::TestLogger;

    use super::*;

    /// Build a mocked pruning task whose `pruned_data` always returns `"mock_data"`;
    /// `mock_config` lets each test add its own expectations.
    fn mock_epoch_pruning_task(
        mock_config: impl FnOnce(&mut MockEpochPruningTask),
    ) -> Arc<dyn EpochPruningTask> {
        let mut task_mock = MockEpochPruningTask::new();
        task_mock.expect_pruned_data().return_const("mock_data");
        mock_config(&mut task_mock);
        Arc::new(task_mock)
    }

    /// Service built from the connection test helpers, with a default (unlocked) lock, no
    /// pruning task and a stdout logger. Tests override individual fields with the struct
    /// update syntax.
    fn default_upkeep_service() -> AggregatorUpkeepService {
        AggregatorUpkeepService::new(
            Arc::new(main_db_connection().unwrap()),
            Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()),
            Arc::new(event_store_db_connection().unwrap()),
            Arc::new(SignedEntityTypeLock::default()),
            vec![],
            TestLogger::stdout(),
        )
    }

    #[tokio::test]
    async fn test_cleanup_database() {
        let (main_db_path, ctx_db_path, event_store_db_path, log_path) = {
            let db_dir = temp_dir_create!();
            (
                db_dir.join("main.db"),
                db_dir.join("cardano_tx.db"),
                db_dir.join("event_store.db"),
                db_dir.join("upkeep.log"),
            )
        };

        let main_db_connection = main_db_file_connection(&main_db_path).unwrap();
        let cardano_tx_connection = cardano_tx_db_file_connection(&ctx_db_path).unwrap();
        let event_store_connection = event_store_db_file_connection(&event_store_db_path).unwrap();

        // Scope the service so that it, and the logger it owns, is dropped before the log
        // file is read back (presumably what flushes the logs to disk).
        {
            let service = AggregatorUpkeepService::new(
                Arc::new(main_db_connection),
                Arc::new(SqliteConnectionPool::build_from_connection(
                    cardano_tx_connection,
                )),
                Arc::new(event_store_connection),
                Arc::new(SignedEntityTypeLock::default()),
                vec![],
                TestLogger::file(&log_path),
            );

            service.run(Epoch(5)).await.expect("Upkeep service failed");
        }

        let logs = std::fs::read_to_string(&log_path).unwrap();

        assert_eq!(
            logs.matches(SqliteCleaningTask::WalCheckpointTruncate.log_message())
                .count(),
            3,
            "Should have run three times since the three databases have a `WalCheckpointTruncate` cleanup"
        );
        assert_eq!(
            logs.matches(SqliteCleaningTask::Vacuum.log_message())
                .count(),
            0,
            "Upkeep operation should not include Vacuum tasks"
        );
    }

    #[tokio::test]
    async fn test_doesnt_cleanup_db_if_any_entity_is_locked() {
        let log_path = temp_dir_create!().join("upkeep.log");

        let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
        signed_entity_type_lock
            .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
            .await;

        // Scope the service so that it, and the logger it owns, is dropped before the log
        // file is read back (presumably what flushes the logs to disk).
        {
            let service = AggregatorUpkeepService {
                signed_entity_type_lock: signed_entity_type_lock.clone(),
                logger: TestLogger::file(&log_path),
                ..default_upkeep_service()
            };
            service.run(Epoch(5)).await.expect("Upkeep service failed");
        }

        let logs = std::fs::read_to_string(&log_path).unwrap();

        assert_eq!(
            logs.matches(SqliteCleaningTask::WalCheckpointTruncate.log_message())
                .count(),
            0,
        );
    }

    #[tokio::test]
    async fn test_execute_all_pruning_tasks() {
        let task1 = mock_epoch_pruning_task(|mock| {
            mock.expect_prune()
                .once()
                .with(eq(Epoch(14)))
                .returning(|_| Ok(()));
        });
        let task2 = mock_epoch_pruning_task(|mock| {
            mock.expect_prune()
                .once()
                .with(eq(Epoch(14)))
                .returning(|_| Ok(()));
        });

        let service = AggregatorUpkeepService {
            pruning_tasks: vec![task1, task2],
            ..default_upkeep_service()
        };

        service.run(Epoch(14)).await.expect("Upkeep service failed");
    }

    #[tokio::test]
    async fn test_doesnt_vacuum_db_if_any_entity_is_locked() {
        let log_path = temp_dir_create!().join("vacuum.log");

        let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
        signed_entity_type_lock
            .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
            .await;

        // Scope the service so that it, and the logger it owns, is dropped before the log
        // file is read back (presumably what flushes the logs to disk).
        {
            let service = AggregatorUpkeepService {
                signed_entity_type_lock: signed_entity_type_lock.clone(),
                logger: TestLogger::file(&log_path),
                ..default_upkeep_service()
            };
            service.vacuum().await.expect("Vacuum failed");
        }

        let logs = std::fs::read_to_string(&log_path).unwrap();

        assert_eq!(
            logs.matches(SqliteCleaningTask::Vacuum.log_message())
                .count(),
            0,
        );
    }

    #[tokio::test]
    async fn test_vacuum_database() {
        let log_path = temp_dir_create!().join("vacuum.log");

        // The service keeps the default, unlocked `SignedEntityTypeLock`, so the vacuum is
        // expected to run. No separate lock is created here: one that is never handed to the
        // service has no effect on it.

        // Scope the service so that it, and the logger it owns, is dropped before the log
        // file is read back (presumably what flushes the logs to disk).
        {
            let service = AggregatorUpkeepService {
                logger: TestLogger::file(&log_path),
                ..default_upkeep_service()
            };
            service.vacuum().await.expect("Vacuum failed");
        }

        let logs = std::fs::read_to_string(&log_path).unwrap();

        assert_eq!(
            logs.matches(SqliteCleaningTask::Vacuum.log_message())
                .count(),
            1,
        );
    }
}