mithril_signer/services/
upkeep_service.rs

1//! ## Upkeep Service
2//!
3//! This service is responsible for the upkeep of the application.
4//!
5//! It is in charge of the following tasks:
6//! * free up space by executing vacuum and WAL checkpoint on the database
7
8use std::sync::Arc;
9
10use anyhow::Context;
11use async_trait::async_trait;
12use slog::{Logger, info};
13
14use mithril_common::StdResult;
15use mithril_common::entities::Epoch;
16use mithril_common::logging::LoggerExtensions;
17use mithril_persistence::sqlite::{
18    SqliteCleaner, SqliteCleaningTask, SqliteConnection, SqliteConnectionPool,
19};
20use mithril_signed_entity_lock::SignedEntityTypeLock;
21
22/// Define the service responsible for the upkeep of the application.
23#[cfg_attr(test, mockall::automock)]
24#[async_trait]
25pub trait UpkeepService: Send + Sync {
26    /// Run the upkeep service.
27    async fn run(&self, current_epoch: Epoch) -> StdResult<()>;
28}
29
30/// Define the task responsible for pruning a datasource below a certain epoch threshold.
31#[cfg_attr(test, mockall::automock)]
32#[async_trait]
33pub trait EpochPruningTask: Send + Sync {
34    /// Get the name of the data that will be pruned.
35    fn pruned_data(&self) -> &'static str;
36
37    /// Prune the datasource based on the given current epoch.
38    async fn prune(&self, current_epoch: Epoch) -> StdResult<()>;
39}
40
41/// Implementation of the upkeep service for the signer.
42///
43/// To ensure that connections are cleaned up properly, it creates new connections itself
44/// instead of relying on a connection pool or a shared connection.
45pub struct SignerUpkeepService {
46    main_db_connection: Arc<SqliteConnection>,
47    cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
48    signed_entity_type_lock: Arc<SignedEntityTypeLock>,
49    pruning_tasks: Vec<Arc<dyn EpochPruningTask>>,
50    logger: Logger,
51}
52
53impl SignerUpkeepService {
54    /// Create a new instance of the aggregator upkeep service.
55    pub fn new(
56        main_db_connection: Arc<SqliteConnection>,
57        cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
58        signed_entity_type_lock: Arc<SignedEntityTypeLock>,
59        pruning_tasks: Vec<Arc<dyn EpochPruningTask>>,
60        logger: Logger,
61    ) -> Self {
62        Self {
63            main_db_connection,
64            cardano_tx_connection_pool,
65            signed_entity_type_lock,
66            pruning_tasks,
67            logger: logger.new_with_component_name::<Self>(),
68        }
69    }
70
71    async fn execute_pruning_tasks(&self, current_epoch: Epoch) -> StdResult<()> {
72        for task in &self.pruning_tasks {
73            info!(
74                self.logger, "Pruning stale data";
75                "pruned_data" => task.pruned_data(), "current_epoch" => ?current_epoch
76            );
77            task.prune(current_epoch).await?;
78        }
79
80        Ok(())
81    }
82
83    async fn upkeep_all_databases(&self) -> StdResult<()> {
84        if self.signed_entity_type_lock.has_locked_entities().await {
85            info!(
86                self.logger,
87                "Some entities are locked - Skipping database upkeep"
88            );
89            return Ok(());
90        }
91
92        let main_db_connection = self.main_db_connection.clone();
93        let cardano_tx_db_connection_pool = self.cardano_tx_connection_pool.clone();
94        let db_upkeep_logger = self.logger.clone();
95
96        // Run the database upkeep tasks in another thread to avoid blocking the tokio runtime
97        let db_upkeep_thread = tokio::task::spawn_blocking(move || -> StdResult<()> {
98            info!(db_upkeep_logger, "Cleaning main database");
99            SqliteCleaner::new(&main_db_connection)
100                .with_logger(db_upkeep_logger.clone())
101                .with_tasks(&[
102                    SqliteCleaningTask::Vacuum,
103                    SqliteCleaningTask::WalCheckpointTruncate,
104                ])
105                .run()?;
106
107            info!(db_upkeep_logger, "Cleaning cardano transactions database");
108            let cardano_tx_db_connection = cardano_tx_db_connection_pool.connection()?;
109            SqliteCleaner::new(&cardano_tx_db_connection)
110                .with_logger(db_upkeep_logger.clone())
111                .with_tasks(&[SqliteCleaningTask::WalCheckpointTruncate])
112                .run()?;
113
114            Ok(())
115        });
116
117        db_upkeep_thread
118            .await
119            .with_context(|| "Database Upkeep thread crashed")?
120    }
121}
122
123#[async_trait]
124impl UpkeepService for SignerUpkeepService {
125    async fn run(&self, current_epoch: Epoch) -> StdResult<()> {
126        info!(self.logger, "Start upkeep of the application");
127
128        self.execute_pruning_tasks(current_epoch)
129            .await
130            .with_context(|| "Pruning tasks failed")?;
131
132        self.upkeep_all_databases()
133            .await
134            .with_context(|| "Database upkeep failed")?;
135
136        info!(self.logger, "Upkeep finished");
137        Ok(())
138    }
139}
140
141#[cfg(test)]
142mod tests {
143    use mockall::predicate::eq;
144
145    use mithril_common::entities::SignedEntityTypeDiscriminants;
146    use mithril_common::temp_dir_create;
147    use mithril_common::test::TempDir;
148
149    use crate::database::test_helper::{
150        cardano_tx_db_connection_builder, main_db_connection, main_db_file_connection,
151    };
152    use crate::test::TestLogger;
153
154    use super::*;
155
156    fn mock_epoch_pruning_task(
157        mock_config: impl FnOnce(&mut MockEpochPruningTask),
158    ) -> Arc<dyn EpochPruningTask> {
159        let mut task_mock = MockEpochPruningTask::new();
160        task_mock.expect_pruned_data().return_const("mock_data");
161        mock_config(&mut task_mock);
162        Arc::new(task_mock)
163    }
164
165    #[tokio::test]
166    async fn test_cleanup_database() {
167        let (logger, log_inspector) = TestLogger::memory();
168        let (main_db_path, ctx_db_path) = {
169            let db_dir = TempDir::create("signer_upkeep", "test_cleanup_database");
170            (db_dir.join("main.db"), db_dir.join("cardano_tx.db"))
171        };
172
173        let main_db_connection = main_db_file_connection(&main_db_path).unwrap();
174        let cardano_tx_conn_builder = cardano_tx_db_connection_builder(&ctx_db_path);
175
176        let service = SignerUpkeepService::new(
177            Arc::new(main_db_connection),
178            Arc::new(cardano_tx_conn_builder.build_pool(1).unwrap()),
179            Arc::new(SignedEntityTypeLock::default()),
180            vec![],
181            logger,
182        );
183
184        service.run(Epoch(13)).await.expect("Upkeep service failed");
185
186        assert_eq!(
187            log_inspector
188                .search_logs(SqliteCleaningTask::Vacuum.log_message())
189                .len(),
190            1,
191            "Should have run only once since only the main database has a `Vacuum` cleanup"
192        );
193        assert_eq!(
194            log_inspector
195                .search_logs(SqliteCleaningTask::WalCheckpointTruncate.log_message())
196                .len(),
197            2,
198            "Should have run twice since the two databases have a `WalCheckpointTruncate` cleanup"
199        );
200    }
201
202    #[tokio::test]
203    async fn test_doesnt_cleanup_db_if_any_entity_is_locked() {
204        let db_dir = temp_dir_create!();
205        let (logger, log_inspector) = TestLogger::memory();
206
207        let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
208        signed_entity_type_lock
209            .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
210            .await;
211
212        let service = SignerUpkeepService::new(
213            Arc::new(main_db_connection().unwrap()),
214            Arc::new(
215                cardano_tx_db_connection_builder(&db_dir.join("cardano_tx.db"))
216                    .build_pool(1)
217                    .unwrap(),
218            ),
219            signed_entity_type_lock.clone(),
220            vec![],
221            logger,
222        );
223
224        service.run(Epoch(13)).await.expect("Upkeep service failed");
225
226        assert!(
227            log_inspector
228                .search_logs(SqliteCleaningTask::Vacuum.log_message())
229                .is_empty()
230        );
231        assert!(
232            log_inspector
233                .search_logs(SqliteCleaningTask::WalCheckpointTruncate.log_message())
234                .is_empty()
235        );
236    }
237
238    #[tokio::test]
239    async fn test_execute_all_pruning_tasks() {
240        let db_dir = temp_dir_create!();
241        let task1 = mock_epoch_pruning_task(|mock| {
242            mock.expect_prune().once().with(eq(Epoch(14))).returning(|_| Ok(()));
243        });
244        let task2 = mock_epoch_pruning_task(|mock| {
245            mock.expect_prune().once().with(eq(Epoch(14))).returning(|_| Ok(()));
246        });
247
248        let service = SignerUpkeepService::new(
249            Arc::new(main_db_connection().unwrap()),
250            cardano_tx_db_connection_builder(&db_dir.join("cardano_tx.db"))
251                .build_pool(1)
252                .map(Arc::new)
253                .unwrap(),
254            Arc::new(SignedEntityTypeLock::default()),
255            vec![task1, task2],
256            TestLogger::stdout(),
257        );
258
259        service.run(Epoch(14)).await.expect("Upkeep service failed");
260    }
261}