mithril_signer/services/
upkeep_service.rs

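//! ## Upkeep Service
//!
//! This service is responsible for the upkeep of the application.
//!
//! It is in charge of the following tasks:
//! * prune stale data from datasources, by running the registered epoch pruning tasks
//! * free up space by executing a vacuum on the main database and a WAL checkpoint on both the
//!   main and the cardano transactions databases (skipped while any signed entity type is locked)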
7
8use std::sync::Arc;
9
10use anyhow::Context;
11use async_trait::async_trait;
12use slog::{info, Logger};
13
14use mithril_common::entities::Epoch;
15use mithril_common::logging::LoggerExtensions;
16use mithril_common::StdResult;
17use mithril_persistence::sqlite::{
18    SqliteCleaner, SqliteCleaningTask, SqliteConnection, SqliteConnectionPool,
19};
20use mithril_signed_entity_lock::SignedEntityTypeLock;
21
/// Define the service responsible for the upkeep of the application.
///
/// Implementations are expected to be shareable across threads (`Send + Sync`).
// NOTE: `mockall::automock` is kept above `#[async_trait]` on purpose: per the mockall
// documentation, `automock` must be applied before `async_trait` for async traits.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait UpkeepService: Send + Sync {
    /// Run the upkeep service for the given `current_epoch`.
    async fn run(&self, current_epoch: Epoch) -> StdResult<()>;
}
29
/// Define the task responsible for pruning a datasource below a certain epoch threshold.
///
/// Implementations are expected to be shareable across threads (`Send + Sync`).
// NOTE: `mockall::automock` must stay above `#[async_trait]` (mockall requirement for async traits).
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait EpochPruningTask: Send + Sync {
    /// Get the name of the data that will be pruned.
    ///
    /// This is a static label, e.g. used to identify the task in log entries.
    fn pruned_data(&self) -> &'static str;

    /// Prune the datasource based on the given current epoch.
    ///
    /// How the pruning threshold is derived from `current_epoch` is left to the implementation.
    async fn prune(&self, current_epoch: Epoch) -> StdResult<()>;
}
40
41/// Implementation of the upkeep service for the signer.
42///
43/// To ensure that connections are cleaned up properly, it creates new connections itself
44/// instead of relying on a connection pool or a shared connection.
45pub struct SignerUpkeepService {
46    main_db_connection: Arc<SqliteConnection>,
47    cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
48    signed_entity_type_lock: Arc<SignedEntityTypeLock>,
49    pruning_tasks: Vec<Arc<dyn EpochPruningTask>>,
50    logger: Logger,
51}
52
53impl SignerUpkeepService {
54    /// Create a new instance of the aggregator upkeep service.
55    pub fn new(
56        main_db_connection: Arc<SqliteConnection>,
57        cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
58        signed_entity_type_lock: Arc<SignedEntityTypeLock>,
59        pruning_tasks: Vec<Arc<dyn EpochPruningTask>>,
60        logger: Logger,
61    ) -> Self {
62        Self {
63            main_db_connection,
64            cardano_tx_connection_pool,
65            signed_entity_type_lock,
66            pruning_tasks,
67            logger: logger.new_with_component_name::<Self>(),
68        }
69    }
70
71    async fn execute_pruning_tasks(&self, current_epoch: Epoch) -> StdResult<()> {
72        for task in &self.pruning_tasks {
73            info!(
74                self.logger, "Pruning stale data";
75                "pruned_data" => task.pruned_data(), "current_epoch" => ?current_epoch
76            );
77            task.prune(current_epoch).await?;
78        }
79
80        Ok(())
81    }
82
83    async fn upkeep_all_databases(&self) -> StdResult<()> {
84        if self.signed_entity_type_lock.has_locked_entities().await {
85            info!(
86                self.logger,
87                "Some entities are locked - Skipping database upkeep"
88            );
89            return Ok(());
90        }
91
92        let main_db_connection = self.main_db_connection.clone();
93        let cardano_tx_db_connection_pool = self.cardano_tx_connection_pool.clone();
94        let db_upkeep_logger = self.logger.clone();
95
96        // Run the database upkeep tasks in another thread to avoid blocking the tokio runtime
97        let db_upkeep_thread = tokio::task::spawn_blocking(move || -> StdResult<()> {
98            info!(db_upkeep_logger, "Cleaning main database");
99            SqliteCleaner::new(&main_db_connection)
100                .with_logger(db_upkeep_logger.clone())
101                .with_tasks(&[
102                    SqliteCleaningTask::Vacuum,
103                    SqliteCleaningTask::WalCheckpointTruncate,
104                ])
105                .run()?;
106
107            info!(db_upkeep_logger, "Cleaning cardano transactions database");
108            let cardano_tx_db_connection = cardano_tx_db_connection_pool.connection()?;
109            SqliteCleaner::new(&cardano_tx_db_connection)
110                .with_logger(db_upkeep_logger.clone())
111                .with_tasks(&[SqliteCleaningTask::WalCheckpointTruncate])
112                .run()?;
113
114            Ok(())
115        });
116
117        db_upkeep_thread
118            .await
119            .with_context(|| "Database Upkeep thread crashed")?
120    }
121}
122
123#[async_trait]
124impl UpkeepService for SignerUpkeepService {
125    async fn run(&self, current_epoch: Epoch) -> StdResult<()> {
126        info!(self.logger, "Start upkeep of the application");
127
128        self.execute_pruning_tasks(current_epoch)
129            .await
130            .with_context(|| "Pruning tasks failed")?;
131
132        self.upkeep_all_databases()
133            .await
134            .with_context(|| "Database upkeep failed")?;
135
136        info!(self.logger, "Upkeep finished");
137        Ok(())
138    }
139}
140
#[cfg(test)]
mod tests {
    use mockall::predicate::eq;

    use mithril_common::entities::SignedEntityTypeDiscriminants;
    use mithril_common::test_utils::TempDir;

    use crate::database::test_helper::{
        cardano_tx_db_connection, cardano_tx_db_file_connection, main_db_connection,
        main_db_file_connection,
    };
    use crate::test_tools::TestLogger;

    use super::*;

    /// Build a mocked [`EpochPruningTask`] whose `pruned_data` returns `"mock_data"`.
    ///
    /// `mock_config` receives the mock to register further expectations (e.g. on `prune`).
    fn mock_epoch_pruning_task(
        mock_config: impl FnOnce(&mut MockEpochPruningTask),
    ) -> Arc<dyn EpochPruningTask> {
        let mut task_mock = MockEpochPruningTask::new();
        task_mock.expect_pruned_data().return_const("mock_data");
        mock_config(&mut task_mock);
        Arc::new(task_mock)
    }

    /// With file-backed databases and no locked entity, the cleanup tasks are checked through
    /// the logs: `Vacuum` runs once (main database only) and `WalCheckpointTruncate` runs twice
    /// (main and cardano transactions databases).
    #[tokio::test]
    async fn test_cleanup_database() {
        let (main_db_path, ctx_db_path, log_path) = {
            let db_dir = TempDir::create("signer_upkeep", "test_cleanup_database");
            (
                db_dir.join("main.db"),
                db_dir.join("cardano_tx.db"),
                db_dir.join("upkeep.log"),
            )
        };

        let main_db_connection = main_db_file_connection(&main_db_path).unwrap();
        let cardano_tx_connection = cardano_tx_db_file_connection(&ctx_db_path).unwrap();

        // Separate block to force log flushing by dropping the service that owns the logger
        {
            let service = SignerUpkeepService::new(
                Arc::new(main_db_connection),
                Arc::new(SqliteConnectionPool::build_from_connection(
                    cardano_tx_connection,
                )),
                Arc::new(SignedEntityTypeLock::default()),
                vec![],
                TestLogger::file(&log_path),
            );

            service.run(Epoch(13)).await.expect("Upkeep service failed");
        }

        // Only read the log file once the service (and its logger) has been dropped above.
        let logs = std::fs::read_to_string(&log_path).unwrap();

        assert_eq!(
            logs.matches(SqliteCleaningTask::Vacuum.log_message())
                .count(),
            1,
            "Should have run only once since only the main database has a `Vacuum` cleanup"
        );
        assert_eq!(
            logs.matches(SqliteCleaningTask::WalCheckpointTruncate.log_message())
                .count(),
            2,
            "Should have run twice since the two databases have a `WalCheckpointTruncate` cleanup"
        );
    }

    /// When any signed entity type is locked, no database cleanup task must appear in the logs.
    #[tokio::test]
    async fn test_doesnt_cleanup_db_if_any_entity_is_locked() {
        let log_path = TempDir::create(
            "signer_upkeep",
            "test_doesnt_cleanup_db_if_any_entity_is_locked",
        )
        .join("upkeep.log");

        // Lock one entity type before running the upkeep so the database cleanup is skipped.
        let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
        signed_entity_type_lock
            .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
            .await;

        // Separate block to force log flushing by dropping the service that owns the logger
        {
            let service = SignerUpkeepService::new(
                Arc::new(main_db_connection().unwrap()),
                Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()),
                signed_entity_type_lock.clone(),
                vec![],
                TestLogger::file(&log_path),
            );

            service.run(Epoch(13)).await.expect("Upkeep service failed");
        }

        let logs = std::fs::read_to_string(&log_path).unwrap();

        assert_eq!(
            logs.matches(SqliteCleaningTask::Vacuum.log_message())
                .count(),
            0,
        );
        assert_eq!(
            logs.matches(SqliteCleaningTask::WalCheckpointTruncate.log_message())
                .count(),
            0,
        );
    }

    /// Every registered pruning task must be called exactly once with the epoch given to `run`
    /// (checked by the mocks' `once()` + `with(eq(..))` expectations when they are dropped).
    #[tokio::test]
    async fn test_execute_all_pruning_tasks() {
        let task1 = mock_epoch_pruning_task(|mock| {
            mock.expect_prune()
                .once()
                .with(eq(Epoch(14)))
                .returning(|_| Ok(()));
        });
        let task2 = mock_epoch_pruning_task(|mock| {
            mock.expect_prune()
                .once()
                .with(eq(Epoch(14)))
                .returning(|_| Ok(()));
        });

        let service = SignerUpkeepService::new(
            Arc::new(main_db_connection().unwrap()),
            Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()),
            Arc::new(SignedEntityTypeLock::default()),
            vec![task1, task2],
            TestLogger::stdout(),
        );

        service.run(Epoch(14)).await.expect("Upkeep service failed");
    }
}