mithril_signer/services/
upkeep_service.rs

//! ## Upkeep Service
//!
//! This service is responsible for the upkeep of the application.
//!
//! It is in charge of the following tasks:
//! * prune stale data by running the registered epoch pruning tasks
//! * free up space by executing vacuum and WAL checkpoint on the databases
7
8use std::sync::Arc;
9
10use anyhow::Context;
11use async_trait::async_trait;
12use slog::{info, Logger};
13
14use mithril_common::entities::Epoch;
15use mithril_common::logging::LoggerExtensions;
16use mithril_common::StdResult;
17use mithril_persistence::sqlite::{
18    SqliteCleaner, SqliteCleaningTask, SqliteConnection, SqliteConnectionPool,
19};
20use mithril_signed_entity_lock::SignedEntityTypeLock;
21
22/// Define the service responsible for the upkeep of the application.
23#[cfg_attr(test, mockall::automock)]
24#[async_trait]
25pub trait UpkeepService: Send + Sync {
26    /// Run the upkeep service.
27    async fn run(&self, current_epoch: Epoch) -> StdResult<()>;
28}
29
30/// Define the task responsible for pruning a datasource below a certain epoch threshold.
31#[cfg_attr(test, mockall::automock)]
32#[async_trait]
33pub trait EpochPruningTask: Send + Sync {
34    /// Get the name of the data that will be pruned.
35    fn pruned_data(&self) -> &'static str;
36
37    /// Prune the datasource based on the given current epoch.
38    async fn prune(&self, current_epoch: Epoch) -> StdResult<()>;
39}
40
41/// Implementation of the upkeep service for the signer.
42///
43/// To ensure that connections are cleaned up properly, it creates new connections itself
44/// instead of relying on a connection pool or a shared connection.
45pub struct SignerUpkeepService {
46    main_db_connection: Arc<SqliteConnection>,
47    cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
48    signed_entity_type_lock: Arc<SignedEntityTypeLock>,
49    pruning_tasks: Vec<Arc<dyn EpochPruningTask>>,
50    logger: Logger,
51}
52
53impl SignerUpkeepService {
54    /// Create a new instance of the aggregator upkeep service.
55    pub fn new(
56        main_db_connection: Arc<SqliteConnection>,
57        cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
58        signed_entity_type_lock: Arc<SignedEntityTypeLock>,
59        pruning_tasks: Vec<Arc<dyn EpochPruningTask>>,
60        logger: Logger,
61    ) -> Self {
62        Self {
63            main_db_connection,
64            cardano_tx_connection_pool,
65            signed_entity_type_lock,
66            pruning_tasks,
67            logger: logger.new_with_component_name::<Self>(),
68        }
69    }
70
71    async fn execute_pruning_tasks(&self, current_epoch: Epoch) -> StdResult<()> {
72        for task in &self.pruning_tasks {
73            info!(
74                self.logger, "Pruning stale data";
75                "pruned_data" => task.pruned_data(), "current_epoch" => ?current_epoch
76            );
77            task.prune(current_epoch).await?;
78        }
79
80        Ok(())
81    }
82
83    async fn upkeep_all_databases(&self) -> StdResult<()> {
84        if self.signed_entity_type_lock.has_locked_entities().await {
85            info!(
86                self.logger,
87                "Some entities are locked - Skipping database upkeep"
88            );
89            return Ok(());
90        }
91
92        let main_db_connection = self.main_db_connection.clone();
93        let cardano_tx_db_connection_pool = self.cardano_tx_connection_pool.clone();
94        let db_upkeep_logger = self.logger.clone();
95
96        // Run the database upkeep tasks in another thread to avoid blocking the tokio runtime
97        let db_upkeep_thread = tokio::task::spawn_blocking(move || -> StdResult<()> {
98            info!(db_upkeep_logger, "Cleaning main database");
99            SqliteCleaner::new(&main_db_connection)
100                .with_logger(db_upkeep_logger.clone())
101                .with_tasks(&[
102                    SqliteCleaningTask::Vacuum,
103                    SqliteCleaningTask::WalCheckpointTruncate,
104                ])
105                .run()?;
106
107            info!(db_upkeep_logger, "Cleaning cardano transactions database");
108            let cardano_tx_db_connection = cardano_tx_db_connection_pool.connection()?;
109            SqliteCleaner::new(&cardano_tx_db_connection)
110                .with_logger(db_upkeep_logger.clone())
111                .with_tasks(&[SqliteCleaningTask::WalCheckpointTruncate])
112                .run()?;
113
114            Ok(())
115        });
116
117        db_upkeep_thread
118            .await
119            .with_context(|| "Database Upkeep thread crashed")?
120    }
121}
122
123#[async_trait]
124impl UpkeepService for SignerUpkeepService {
125    async fn run(&self, current_epoch: Epoch) -> StdResult<()> {
126        info!(self.logger, "Start upkeep of the application");
127
128        self.execute_pruning_tasks(current_epoch)
129            .await
130            .with_context(|| "Pruning tasks failed")?;
131
132        self.upkeep_all_databases()
133            .await
134            .with_context(|| "Database upkeep failed")?;
135
136        info!(self.logger, "Upkeep finished");
137        Ok(())
138    }
139}
140
#[cfg(test)]
mod tests {
    use mockall::predicate::eq;

    use mithril_common::entities::SignedEntityTypeDiscriminants;
    use mithril_common::test_utils::TempDir;

    use crate::database::test_helper::{
        cardano_tx_db_connection, cardano_tx_db_file_connection, main_db_connection,
        main_db_file_connection,
    };
    use crate::test_tools::TestLogger;

    use super::*;

    /// Build a mocked pruning task whose `pruned_data` is always `"mock_data"`, letting the
    /// caller add its own expectations through `mock_config`.
    fn mock_epoch_pruning_task(
        mock_config: impl FnOnce(&mut MockEpochPruningTask),
    ) -> Arc<dyn EpochPruningTask> {
        let mut mocked_task = MockEpochPruningTask::new();
        mocked_task.expect_pruned_data().return_const("mock_data");
        mock_config(&mut mocked_task);

        Arc::new(mocked_task)
    }

    #[tokio::test]
    async fn test_cleanup_database() {
        let (logger, log_inspector) = TestLogger::memory();

        // Both databases are opened from paths located in a dedicated test directory.
        let (main_db_path, ctx_db_path) = {
            let db_dir = TempDir::create("signer_upkeep", "test_cleanup_database");
            (db_dir.join("main.db"), db_dir.join("cardano_tx.db"))
        };
        let main_db = Arc::new(main_db_file_connection(&main_db_path).unwrap());
        let cardano_tx_pool = Arc::new(SqliteConnectionPool::build_from_connection(
            cardano_tx_db_file_connection(&ctx_db_path).unwrap(),
        ));

        let service = SignerUpkeepService::new(
            main_db,
            cardano_tx_pool,
            Arc::new(SignedEntityTypeLock::default()),
            Vec::new(),
            logger,
        );

        service.run(Epoch(13)).await.expect("Upkeep service failed");

        let vacuum_runs = log_inspector
            .search_logs(SqliteCleaningTask::Vacuum.log_message())
            .len();
        let wal_checkpoint_runs = log_inspector
            .search_logs(SqliteCleaningTask::WalCheckpointTruncate.log_message())
            .len();

        assert_eq!(
            vacuum_runs, 1,
            "Should have run only once since only the main database has a `Vacuum` cleanup"
        );
        assert_eq!(
            wal_checkpoint_runs, 2,
            "Should have run twice since the two databases have a `WalCheckpointTruncate` cleanup"
        );
    }

    #[tokio::test]
    async fn test_doesnt_cleanup_db_if_any_entity_is_locked() {
        let (logger, log_inspector) = TestLogger::memory();

        // Lock one entity type before running the upkeep.
        let entity_lock = Arc::new(SignedEntityTypeLock::default());
        entity_lock
            .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
            .await;

        let service = SignerUpkeepService::new(
            Arc::new(main_db_connection().unwrap()),
            Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()),
            Arc::clone(&entity_lock),
            Vec::new(),
            logger,
        );

        service.run(Epoch(13)).await.expect("Upkeep service failed");

        // No cleaning task must have been logged.
        let vacuum_logs = log_inspector.search_logs(SqliteCleaningTask::Vacuum.log_message());
        assert!(vacuum_logs.is_empty());

        let wal_checkpoint_logs =
            log_inspector.search_logs(SqliteCleaningTask::WalCheckpointTruncate.log_message());
        assert!(wal_checkpoint_logs.is_empty());
    }

    #[tokio::test]
    async fn test_execute_all_pruning_tasks() {
        /// Expect exactly one `prune` call, made with epoch 14.
        fn expect_single_prune_at_epoch_14(mock: &mut MockEpochPruningTask) {
            mock.expect_prune()
                .once()
                .with(eq(Epoch(14)))
                .returning(|_| Ok(()));
        }

        let pruning_tasks = vec![
            mock_epoch_pruning_task(expect_single_prune_at_epoch_14),
            mock_epoch_pruning_task(expect_single_prune_at_epoch_14),
        ];

        let service = SignerUpkeepService::new(
            Arc::new(main_db_connection().unwrap()),
            Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()),
            Arc::new(SignedEntityTypeLock::default()),
            pruning_tasks,
            TestLogger::stdout(),
        );

        service.run(Epoch(14)).await.expect("Upkeep service failed");
    }
}
256}