mithril_aggregator/services/upkeep.rs

//! ## Upkeep Service
//!
//! This service is responsible for the upkeep of the application.
//!
//! It is in charge of the following tasks:
//! * free up space by executing vacuum and WAL checkpoint on the database
//! * prune stale data by running the registered epoch pruning tasks

use std::sync::Arc;

use anyhow::Context;
use async_trait::async_trait;
use mithril_common::entities::Epoch;
use mithril_common::logging::LoggerExtensions;
use mithril_common::StdResult;
use mithril_persistence::sqlite::{
    SqliteCleaner, SqliteCleaningTask, SqliteConnection, SqliteConnectionPool,
};
use mithril_signed_entity_lock::SignedEntityTypeLock;
use slog::{info, Logger};

/// Define the service responsible for the upkeep of the application.
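///
/// # Example
///
/// A minimal sketch of how a caller could drive this service; the `on_new_epoch`
/// function and the `upkeep_service` handle are hypothetical and only illustrate the
/// intended call pattern.
///
/// ```ignore
/// async fn on_new_epoch(upkeep_service: Arc<dyn UpkeepService>, epoch: Epoch) -> StdResult<()> {
///     // Prune stale data and checkpoint the databases once the new epoch starts.
///     upkeep_service.run(epoch).await?;
///     Ok(())
/// }
/// ```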
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait UpkeepService: Send + Sync {
    /// Run the upkeep service.
    async fn run(&self, epoch: Epoch) -> StdResult<()>;

    /// Vacuum database.
    async fn vacuum(&self) -> StdResult<()>;
}

/// Define the task responsible for pruning a datasource below a certain epoch threshold.
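///
/// # Example
///
/// A sketch of a hypothetical implementor; the `StaleRecordStore` type and its
/// `delete_records_below` method are placeholders, not part of this crate.
///
/// ```ignore
/// struct StaleRecordPruner {
///     store: Arc<StaleRecordStore>,
///     // Number of epochs of data to keep.
///     retention: u64,
/// }
///
/// #[async_trait]
/// impl EpochPruningTask for StaleRecordPruner {
///     fn pruned_data(&self) -> &'static str {
///         "stale_records"
///     }
///
///     async fn prune(&self, current_epoch: Epoch) -> StdResult<()> {
///         // Delete everything strictly below the retention threshold.
///         let threshold = Epoch(current_epoch.0.saturating_sub(self.retention));
///         self.store.delete_records_below(threshold).await
///     }
/// }
/// ```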
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait EpochPruningTask: Send + Sync {
    /// Get the name of the data that will be pruned.
    fn pruned_data(&self) -> &'static str;

    /// Prune the datasource based on the given current epoch.
    async fn prune(&self, current_epoch: Epoch) -> StdResult<()>;
}

/// Implementation of the upkeep service for the aggregator.
///
/// It runs the database maintenance tasks on the connections it is given, and executes
/// the blocking SQLite work on a dedicated thread so that the tokio runtime is not blocked.
pub struct AggregatorUpkeepService {
    main_db_connection: Arc<SqliteConnection>,
    cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
    event_store_connection: Arc<SqliteConnection>,
    signed_entity_type_lock: Arc<SignedEntityTypeLock>,
    pruning_tasks: Vec<Arc<dyn EpochPruningTask>>,
    logger: Logger,
}

impl AggregatorUpkeepService {
    /// Create a new instance of the aggregator upkeep service.
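    ///
    /// # Example
    ///
    /// A construction sketch; the dependency values are assumed to come from the
    /// aggregator's dependency container and are only placeholders here.
    ///
    /// ```ignore
    /// let upkeep_service = AggregatorUpkeepService::new(
    ///     main_db_connection,         // Arc<SqliteConnection>
    ///     cardano_tx_connection_pool, // Arc<SqliteConnectionPool>
    ///     event_store_connection,     // Arc<SqliteConnection>
    ///     Arc::new(SignedEntityTypeLock::default()),
    ///     vec![],                     // epoch pruning tasks
    ///     logger,
    /// );
    /// ```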
    pub fn new(
        main_db_connection: Arc<SqliteConnection>,
        cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
        event_store_connection: Arc<SqliteConnection>,
        signed_entity_type_lock: Arc<SignedEntityTypeLock>,
        pruning_tasks: Vec<Arc<dyn EpochPruningTask>>,
        logger: Logger,
    ) -> Self {
        Self {
            main_db_connection,
            cardano_tx_connection_pool,
            event_store_connection,
            signed_entity_type_lock,
            pruning_tasks,
            logger: logger.new_with_component_name::<Self>(),
        }
    }

    async fn execute_pruning_tasks(&self, current_epoch: Epoch) -> StdResult<()> {
        for task in &self.pruning_tasks {
            info!(
                self.logger, "Pruning stale data";
                "pruned_data" => task.pruned_data(), "current_epoch" => ?current_epoch
            );
            task.prune(current_epoch).await?;
        }

        Ok(())
    }

    async fn upkeep_all_databases(&self) -> StdResult<()> {
        if self.signed_entity_type_lock.has_locked_entities().await {
            info!(
                self.logger,
                "Some entities are locked - Skipping database upkeep"
            );
            return Ok(());
        }

        let main_db_connection = self.main_db_connection.clone();
        let cardano_tx_db_connection_pool = self.cardano_tx_connection_pool.clone();
        let event_store_connection = self.event_store_connection.clone();
        let db_upkeep_logger = self.logger.clone();

        // Run the database upkeep tasks in another thread to avoid blocking the tokio runtime
        let db_upkeep_thread = tokio::task::spawn_blocking(move || -> StdResult<()> {
            info!(db_upkeep_logger, "Cleaning main database");
            SqliteCleaner::new(&main_db_connection)
                .with_logger(db_upkeep_logger.clone())
                .with_tasks(&[SqliteCleaningTask::WalCheckpointTruncate])
                .run()?;

            info!(db_upkeep_logger, "Cleaning cardano transactions database");
            let cardano_tx_db_connection = cardano_tx_db_connection_pool.connection()?;
            SqliteCleaner::new(&cardano_tx_db_connection)
                .with_logger(db_upkeep_logger.clone())
                .with_tasks(&[SqliteCleaningTask::WalCheckpointTruncate])
                .run()?;

            info!(db_upkeep_logger, "Cleaning event database");
            SqliteCleaner::new(&event_store_connection)
                .with_logger(db_upkeep_logger.clone())
                .with_tasks(&[SqliteCleaningTask::WalCheckpointTruncate])
                .run()?;

            Ok(())
        });

        db_upkeep_thread
            .await
            .with_context(|| "Database Upkeep thread crashed")?
    }

    async fn vacuum_main_database(&self) -> StdResult<()> {
        if self.signed_entity_type_lock.has_locked_entities().await {
            info!(
                self.logger,
                "Some entities are locked - Skipping main database vacuum"
            );
            return Ok(());
        }

        let main_db_connection = self.main_db_connection.clone();
        let db_upkeep_logger = self.logger.clone();

        // Run the database vacuum in another thread to avoid blocking the tokio runtime
        let db_upkeep_thread = tokio::task::spawn_blocking(move || -> StdResult<()> {
            info!(db_upkeep_logger, "Vacuum main database");
            SqliteCleaner::new(&main_db_connection)
                .with_logger(db_upkeep_logger.clone())
                .with_tasks(&[SqliteCleaningTask::Vacuum])
                .run()?;

            Ok(())
        });

        db_upkeep_thread
            .await
            .with_context(|| "Database Upkeep thread crashed")?
    }
}

#[async_trait]
impl UpkeepService for AggregatorUpkeepService {
    async fn run(&self, current_epoch: Epoch) -> StdResult<()> {
        info!(self.logger, "Start upkeep of the application");

        self.execute_pruning_tasks(current_epoch)
            .await
            .with_context(|| "Pruning tasks failed")?;

        self.upkeep_all_databases()
            .await
            .with_context(|| "Database upkeep failed")?;

        info!(self.logger, "Upkeep finished");
        Ok(())
    }

    async fn vacuum(&self) -> StdResult<()> {
        info!(self.logger, "Start database vacuum");

        self.vacuum_main_database()
            .await
            .with_context(|| "Vacuuming main database failed")?;

        info!(self.logger, "Vacuum finished");

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use mithril_common::{entities::SignedEntityTypeDiscriminants, temp_dir_create};
    use mockall::predicate::eq;

    use crate::database::test_helper::{
        cardano_tx_db_connection, cardano_tx_db_file_connection, main_db_connection,
        main_db_file_connection,
    };
    use crate::event_store::database::test_helper::{
        event_store_db_connection, event_store_db_file_connection,
    };
    use crate::test_tools::TestLogger;

    use super::*;

    fn mock_epoch_pruning_task(
        mock_config: impl FnOnce(&mut MockEpochPruningTask),
    ) -> Arc<dyn EpochPruningTask> {
        let mut task_mock = MockEpochPruningTask::new();
        task_mock.expect_pruned_data().return_const("mock_data");
        mock_config(&mut task_mock);
        Arc::new(task_mock)
    }

    fn default_upkeep_service() -> AggregatorUpkeepService {
        AggregatorUpkeepService::new(
            Arc::new(main_db_connection().unwrap()),
            Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()),
            Arc::new(event_store_db_connection().unwrap()),
            Arc::new(SignedEntityTypeLock::default()),
            vec![],
            TestLogger::stdout(),
        )
    }

    #[tokio::test]
    async fn test_cleanup_database() {
        let (main_db_path, ctx_db_path, event_store_db_path, log_path) = {
            let db_dir = temp_dir_create!();
            (
                db_dir.join("main.db"),
                db_dir.join("cardano_tx.db"),
                db_dir.join("event_store.db"),
                db_dir.join("upkeep.log"),
            )
        };

        let main_db_connection = main_db_file_connection(&main_db_path).unwrap();
        let cardano_tx_connection = cardano_tx_db_file_connection(&ctx_db_path).unwrap();
        let event_store_connection = event_store_db_file_connection(&event_store_db_path).unwrap();

        // Separate block to force log flushing by dropping the service that owns the logger
        {
            let service = AggregatorUpkeepService::new(
                Arc::new(main_db_connection),
                Arc::new(SqliteConnectionPool::build_from_connection(
                    cardano_tx_connection,
                )),
                Arc::new(event_store_connection),
                Arc::new(SignedEntityTypeLock::default()),
                vec![],
                TestLogger::file(&log_path),
            );

            service.run(Epoch(5)).await.expect("Upkeep service failed");
        }

        let logs = std::fs::read_to_string(&log_path).unwrap();

        assert_eq!(
            logs.matches(SqliteCleaningTask::WalCheckpointTruncate.log_message())
                .count(),
            3,
            "Should have run three times since the three databases have a `WalCheckpointTruncate` cleanup"
        );
        assert_eq!(
            logs.matches(SqliteCleaningTask::Vacuum.log_message())
                .count(),
            0,
            "Upkeep operation should not include Vacuum tasks"
        );
    }

    #[tokio::test]
    async fn test_doesnt_cleanup_db_if_any_entity_is_locked() {
        let log_path = temp_dir_create!().join("upkeep.log");

        let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
        signed_entity_type_lock
            .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
            .await;

        // Separate block to force log flushing by dropping the service that owns the logger
        {
            let service = AggregatorUpkeepService {
                signed_entity_type_lock: signed_entity_type_lock.clone(),
                logger: TestLogger::file(&log_path),
                ..default_upkeep_service()
            };
            service.run(Epoch(5)).await.expect("Upkeep service failed");
        }

        let logs = std::fs::read_to_string(&log_path).unwrap();

        assert_eq!(
            logs.matches(SqliteCleaningTask::WalCheckpointTruncate.log_message())
                .count(),
            0,
        );
    }

    #[tokio::test]
    async fn test_execute_all_pruning_tasks() {
        let task1 = mock_epoch_pruning_task(|mock| {
            mock.expect_prune()
                .once()
                .with(eq(Epoch(14)))
                .returning(|_| Ok(()));
        });
        let task2 = mock_epoch_pruning_task(|mock| {
            mock.expect_prune()
                .once()
                .with(eq(Epoch(14)))
                .returning(|_| Ok(()));
        });

        let service = AggregatorUpkeepService {
            pruning_tasks: vec![task1, task2],
            ..default_upkeep_service()
        };

        service.run(Epoch(14)).await.expect("Upkeep service failed");
    }

    #[tokio::test]
    async fn test_doesnt_vacuum_db_if_any_entity_is_locked() {
        let log_path = temp_dir_create!().join("vacuum.log");

        let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
        signed_entity_type_lock
            .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
            .await;

        // Separate block to force log flushing by dropping the service that owns the logger
        {
            let service = AggregatorUpkeepService {
                signed_entity_type_lock: signed_entity_type_lock.clone(),
                logger: TestLogger::file(&log_path),
                ..default_upkeep_service()
            };
            service.vacuum().await.expect("Vacuum failed");
        }

        let logs = std::fs::read_to_string(&log_path).unwrap();

        assert_eq!(
            logs.matches(SqliteCleaningTask::Vacuum.log_message())
                .count(),
            0,
        );
    }

    #[tokio::test]
    async fn test_vacuum_database() {
        let log_path = temp_dir_create!().join("vacuum.log");

        // Separate block to force log flushing by dropping the service that owns the logger
        {
            let service = AggregatorUpkeepService {
                logger: TestLogger::file(&log_path),
                ..default_upkeep_service()
            };
            service.vacuum().await.expect("Vacuum failed");
        }

        let logs = std::fs::read_to_string(&log_path).unwrap();

        assert_eq!(
            logs.matches(SqliteCleaningTask::Vacuum.log_message())
                .count(),
            1,
        );
    }
}