mithril_signer/services/
upkeep_service.rs1use std::sync::Arc;
9
10use anyhow::Context;
11use async_trait::async_trait;
12use slog::{Logger, info};
13
14use mithril_common::StdResult;
15use mithril_common::entities::Epoch;
16use mithril_common::logging::LoggerExtensions;
17use mithril_persistence::sqlite::{
18 SqliteCleaner, SqliteCleaningTask, SqliteConnection, SqliteConnectionPool,
19};
20use mithril_signed_entity_lock::SignedEntityTypeLock;
21
22#[cfg_attr(test, mockall::automock)]
24#[async_trait]
25pub trait UpkeepService: Send + Sync {
26 async fn run(&self, current_epoch: Epoch) -> StdResult<()>;
28}
29
30#[cfg_attr(test, mockall::automock)]
32#[async_trait]
33pub trait EpochPruningTask: Send + Sync {
34 fn pruned_data(&self) -> &'static str;
36
37 async fn prune(&self, current_epoch: Epoch) -> StdResult<()>;
39}
40
41pub struct SignerUpkeepService {
46 main_db_connection: Arc<SqliteConnection>,
47 cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
48 signed_entity_type_lock: Arc<SignedEntityTypeLock>,
49 pruning_tasks: Vec<Arc<dyn EpochPruningTask>>,
50 logger: Logger,
51}
52
53impl SignerUpkeepService {
54 pub fn new(
56 main_db_connection: Arc<SqliteConnection>,
57 cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
58 signed_entity_type_lock: Arc<SignedEntityTypeLock>,
59 pruning_tasks: Vec<Arc<dyn EpochPruningTask>>,
60 logger: Logger,
61 ) -> Self {
62 Self {
63 main_db_connection,
64 cardano_tx_connection_pool,
65 signed_entity_type_lock,
66 pruning_tasks,
67 logger: logger.new_with_component_name::<Self>(),
68 }
69 }
70
71 async fn execute_pruning_tasks(&self, current_epoch: Epoch) -> StdResult<()> {
72 for task in &self.pruning_tasks {
73 info!(
74 self.logger, "Pruning stale data";
75 "pruned_data" => task.pruned_data(), "current_epoch" => ?current_epoch
76 );
77 task.prune(current_epoch).await?;
78 }
79
80 Ok(())
81 }
82
83 async fn upkeep_all_databases(&self) -> StdResult<()> {
84 if self.signed_entity_type_lock.has_locked_entities().await {
85 info!(
86 self.logger,
87 "Some entities are locked - Skipping database upkeep"
88 );
89 return Ok(());
90 }
91
92 let main_db_connection = self.main_db_connection.clone();
93 let cardano_tx_db_connection_pool = self.cardano_tx_connection_pool.clone();
94 let db_upkeep_logger = self.logger.clone();
95
96 let db_upkeep_thread = tokio::task::spawn_blocking(move || -> StdResult<()> {
98 info!(db_upkeep_logger, "Cleaning main database");
99 SqliteCleaner::new(&main_db_connection)
100 .with_logger(db_upkeep_logger.clone())
101 .with_tasks(&[
102 SqliteCleaningTask::Vacuum,
103 SqliteCleaningTask::WalCheckpointTruncate,
104 ])
105 .run()?;
106
107 info!(db_upkeep_logger, "Cleaning cardano transactions database");
108 let cardano_tx_db_connection = cardano_tx_db_connection_pool.connection()?;
109 SqliteCleaner::new(&cardano_tx_db_connection)
110 .with_logger(db_upkeep_logger.clone())
111 .with_tasks(&[SqliteCleaningTask::WalCheckpointTruncate])
112 .run()?;
113
114 Ok(())
115 });
116
117 db_upkeep_thread
118 .await
119 .with_context(|| "Database Upkeep thread crashed")?
120 }
121}
122
123#[async_trait]
124impl UpkeepService for SignerUpkeepService {
125 async fn run(&self, current_epoch: Epoch) -> StdResult<()> {
126 info!(self.logger, "Start upkeep of the application");
127
128 self.execute_pruning_tasks(current_epoch)
129 .await
130 .with_context(|| "Pruning tasks failed")?;
131
132 self.upkeep_all_databases()
133 .await
134 .with_context(|| "Database upkeep failed")?;
135
136 info!(self.logger, "Upkeep finished");
137 Ok(())
138 }
139}
140
141#[cfg(test)]
142mod tests {
143 use mockall::predicate::eq;
144
145 use mithril_common::entities::SignedEntityTypeDiscriminants;
146 use mithril_common::temp_dir_create;
147 use mithril_common::test::TempDir;
148
149 use crate::database::test_helper::{
150 cardano_tx_db_connection_builder, main_db_connection, main_db_file_connection,
151 };
152 use crate::test::TestLogger;
153
154 use super::*;
155
156 fn mock_epoch_pruning_task(
157 mock_config: impl FnOnce(&mut MockEpochPruningTask),
158 ) -> Arc<dyn EpochPruningTask> {
159 let mut task_mock = MockEpochPruningTask::new();
160 task_mock.expect_pruned_data().return_const("mock_data");
161 mock_config(&mut task_mock);
162 Arc::new(task_mock)
163 }
164
165 #[tokio::test]
166 async fn test_cleanup_database() {
167 let (logger, log_inspector) = TestLogger::memory();
168 let (main_db_path, ctx_db_path) = {
169 let db_dir = TempDir::create("signer_upkeep", "test_cleanup_database");
170 (db_dir.join("main.db"), db_dir.join("cardano_tx.db"))
171 };
172
173 let main_db_connection = main_db_file_connection(&main_db_path).unwrap();
174 let cardano_tx_conn_builder = cardano_tx_db_connection_builder(&ctx_db_path);
175
176 let service = SignerUpkeepService::new(
177 Arc::new(main_db_connection),
178 Arc::new(cardano_tx_conn_builder.build_pool(1).unwrap()),
179 Arc::new(SignedEntityTypeLock::default()),
180 vec![],
181 logger,
182 );
183
184 service.run(Epoch(13)).await.expect("Upkeep service failed");
185
186 assert_eq!(
187 log_inspector
188 .search_logs(SqliteCleaningTask::Vacuum.log_message())
189 .len(),
190 1,
191 "Should have run only once since only the main database has a `Vacuum` cleanup"
192 );
193 assert_eq!(
194 log_inspector
195 .search_logs(SqliteCleaningTask::WalCheckpointTruncate.log_message())
196 .len(),
197 2,
198 "Should have run twice since the two databases have a `WalCheckpointTruncate` cleanup"
199 );
200 }
201
202 #[tokio::test]
203 async fn test_doesnt_cleanup_db_if_any_entity_is_locked() {
204 let db_dir = temp_dir_create!();
205 let (logger, log_inspector) = TestLogger::memory();
206
207 let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
208 signed_entity_type_lock
209 .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
210 .await;
211
212 let service = SignerUpkeepService::new(
213 Arc::new(main_db_connection().unwrap()),
214 Arc::new(
215 cardano_tx_db_connection_builder(&db_dir.join("cardano_tx.db"))
216 .build_pool(1)
217 .unwrap(),
218 ),
219 signed_entity_type_lock.clone(),
220 vec![],
221 logger,
222 );
223
224 service.run(Epoch(13)).await.expect("Upkeep service failed");
225
226 assert!(
227 log_inspector
228 .search_logs(SqliteCleaningTask::Vacuum.log_message())
229 .is_empty()
230 );
231 assert!(
232 log_inspector
233 .search_logs(SqliteCleaningTask::WalCheckpointTruncate.log_message())
234 .is_empty()
235 );
236 }
237
238 #[tokio::test]
239 async fn test_execute_all_pruning_tasks() {
240 let db_dir = temp_dir_create!();
241 let task1 = mock_epoch_pruning_task(|mock| {
242 mock.expect_prune().once().with(eq(Epoch(14))).returning(|_| Ok(()));
243 });
244 let task2 = mock_epoch_pruning_task(|mock| {
245 mock.expect_prune().once().with(eq(Epoch(14))).returning(|_| Ok(()));
246 });
247
248 let service = SignerUpkeepService::new(
249 Arc::new(main_db_connection().unwrap()),
250 cardano_tx_db_connection_builder(&db_dir.join("cardano_tx.db"))
251 .build_pool(1)
252 .map(Arc::new)
253 .unwrap(),
254 Arc::new(SignedEntityTypeLock::default()),
255 vec![task1, task2],
256 TestLogger::stdout(),
257 );
258
259 service.run(Epoch(14)).await.expect("Upkeep service failed");
260 }
261}