mithril_signer/services/
upkeep_service.rs1use std::sync::Arc;
9
10use anyhow::Context;
11use async_trait::async_trait;
12use slog::{info, Logger};
13
14use mithril_common::entities::Epoch;
15use mithril_common::logging::LoggerExtensions;
16use mithril_common::StdResult;
17use mithril_persistence::sqlite::{
18 SqliteCleaner, SqliteCleaningTask, SqliteConnection, SqliteConnectionPool,
19};
20use mithril_signed_entity_lock::SignedEntityTypeLock;
21
22#[cfg_attr(test, mockall::automock)]
24#[async_trait]
25pub trait UpkeepService: Send + Sync {
26 async fn run(&self, current_epoch: Epoch) -> StdResult<()>;
28}
29
30#[cfg_attr(test, mockall::automock)]
32#[async_trait]
33pub trait EpochPruningTask: Send + Sync {
34 fn pruned_data(&self) -> &'static str;
36
37 async fn prune(&self, current_epoch: Epoch) -> StdResult<()>;
39}
40
41pub struct SignerUpkeepService {
46 main_db_connection: Arc<SqliteConnection>,
47 cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
48 signed_entity_type_lock: Arc<SignedEntityTypeLock>,
49 pruning_tasks: Vec<Arc<dyn EpochPruningTask>>,
50 logger: Logger,
51}
52
53impl SignerUpkeepService {
54 pub fn new(
56 main_db_connection: Arc<SqliteConnection>,
57 cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
58 signed_entity_type_lock: Arc<SignedEntityTypeLock>,
59 pruning_tasks: Vec<Arc<dyn EpochPruningTask>>,
60 logger: Logger,
61 ) -> Self {
62 Self {
63 main_db_connection,
64 cardano_tx_connection_pool,
65 signed_entity_type_lock,
66 pruning_tasks,
67 logger: logger.new_with_component_name::<Self>(),
68 }
69 }
70
71 async fn execute_pruning_tasks(&self, current_epoch: Epoch) -> StdResult<()> {
72 for task in &self.pruning_tasks {
73 info!(
74 self.logger, "Pruning stale data";
75 "pruned_data" => task.pruned_data(), "current_epoch" => ?current_epoch
76 );
77 task.prune(current_epoch).await?;
78 }
79
80 Ok(())
81 }
82
83 async fn upkeep_all_databases(&self) -> StdResult<()> {
84 if self.signed_entity_type_lock.has_locked_entities().await {
85 info!(
86 self.logger,
87 "Some entities are locked - Skipping database upkeep"
88 );
89 return Ok(());
90 }
91
92 let main_db_connection = self.main_db_connection.clone();
93 let cardano_tx_db_connection_pool = self.cardano_tx_connection_pool.clone();
94 let db_upkeep_logger = self.logger.clone();
95
96 let db_upkeep_thread = tokio::task::spawn_blocking(move || -> StdResult<()> {
98 info!(db_upkeep_logger, "Cleaning main database");
99 SqliteCleaner::new(&main_db_connection)
100 .with_logger(db_upkeep_logger.clone())
101 .with_tasks(&[
102 SqliteCleaningTask::Vacuum,
103 SqliteCleaningTask::WalCheckpointTruncate,
104 ])
105 .run()?;
106
107 info!(db_upkeep_logger, "Cleaning cardano transactions database");
108 let cardano_tx_db_connection = cardano_tx_db_connection_pool.connection()?;
109 SqliteCleaner::new(&cardano_tx_db_connection)
110 .with_logger(db_upkeep_logger.clone())
111 .with_tasks(&[SqliteCleaningTask::WalCheckpointTruncate])
112 .run()?;
113
114 Ok(())
115 });
116
117 db_upkeep_thread
118 .await
119 .with_context(|| "Database Upkeep thread crashed")?
120 }
121}
122
123#[async_trait]
124impl UpkeepService for SignerUpkeepService {
125 async fn run(&self, current_epoch: Epoch) -> StdResult<()> {
126 info!(self.logger, "Start upkeep of the application");
127
128 self.execute_pruning_tasks(current_epoch)
129 .await
130 .with_context(|| "Pruning tasks failed")?;
131
132 self.upkeep_all_databases()
133 .await
134 .with_context(|| "Database upkeep failed")?;
135
136 info!(self.logger, "Upkeep finished");
137 Ok(())
138 }
139}
140
141#[cfg(test)]
142mod tests {
143 use mockall::predicate::eq;
144
145 use mithril_common::entities::SignedEntityTypeDiscriminants;
146 use mithril_common::test_utils::TempDir;
147
148 use crate::database::test_helper::{
149 cardano_tx_db_connection, cardano_tx_db_file_connection, main_db_connection,
150 main_db_file_connection,
151 };
152 use crate::test_tools::TestLogger;
153
154 use super::*;
155
156 fn mock_epoch_pruning_task(
157 mock_config: impl FnOnce(&mut MockEpochPruningTask),
158 ) -> Arc<dyn EpochPruningTask> {
159 let mut task_mock = MockEpochPruningTask::new();
160 task_mock.expect_pruned_data().return_const("mock_data");
161 mock_config(&mut task_mock);
162 Arc::new(task_mock)
163 }
164
165 #[tokio::test]
166 async fn test_cleanup_database() {
167 let (main_db_path, ctx_db_path, log_path) = {
168 let db_dir = TempDir::create("signer_upkeep", "test_cleanup_database");
169 (
170 db_dir.join("main.db"),
171 db_dir.join("cardano_tx.db"),
172 db_dir.join("upkeep.log"),
173 )
174 };
175
176 let main_db_connection = main_db_file_connection(&main_db_path).unwrap();
177 let cardano_tx_connection = cardano_tx_db_file_connection(&ctx_db_path).unwrap();
178
179 {
181 let service = SignerUpkeepService::new(
182 Arc::new(main_db_connection),
183 Arc::new(SqliteConnectionPool::build_from_connection(
184 cardano_tx_connection,
185 )),
186 Arc::new(SignedEntityTypeLock::default()),
187 vec![],
188 TestLogger::file(&log_path),
189 );
190
191 service.run(Epoch(13)).await.expect("Upkeep service failed");
192 }
193
194 let logs = std::fs::read_to_string(&log_path).unwrap();
195
196 assert_eq!(
197 logs.matches(SqliteCleaningTask::Vacuum.log_message())
198 .count(),
199 1,
200 "Should have run only once since only the main database has a `Vacuum` cleanup"
201 );
202 assert_eq!(
203 logs.matches(SqliteCleaningTask::WalCheckpointTruncate.log_message())
204 .count(),
205 2,
206 "Should have run twice since the two databases have a `WalCheckpointTruncate` cleanup"
207 );
208 }
209
210 #[tokio::test]
211 async fn test_doesnt_cleanup_db_if_any_entity_is_locked() {
212 let log_path = TempDir::create(
213 "signer_upkeep",
214 "test_doesnt_cleanup_db_if_any_entity_is_locked",
215 )
216 .join("upkeep.log");
217
218 let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
219 signed_entity_type_lock
220 .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
221 .await;
222
223 {
225 let service = SignerUpkeepService::new(
226 Arc::new(main_db_connection().unwrap()),
227 Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()),
228 signed_entity_type_lock.clone(),
229 vec![],
230 TestLogger::file(&log_path),
231 );
232
233 service.run(Epoch(13)).await.expect("Upkeep service failed");
234 }
235
236 let logs = std::fs::read_to_string(&log_path).unwrap();
237
238 assert_eq!(
239 logs.matches(SqliteCleaningTask::Vacuum.log_message())
240 .count(),
241 0,
242 );
243 assert_eq!(
244 logs.matches(SqliteCleaningTask::WalCheckpointTruncate.log_message())
245 .count(),
246 0,
247 );
248 }
249
250 #[tokio::test]
251 async fn test_execute_all_pruning_tasks() {
252 let task1 = mock_epoch_pruning_task(|mock| {
253 mock.expect_prune()
254 .once()
255 .with(eq(Epoch(14)))
256 .returning(|_| Ok(()));
257 });
258 let task2 = mock_epoch_pruning_task(|mock| {
259 mock.expect_prune()
260 .once()
261 .with(eq(Epoch(14)))
262 .returning(|_| Ok(()));
263 });
264
265 let service = SignerUpkeepService::new(
266 Arc::new(main_db_connection().unwrap()),
267 Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()),
268 Arc::new(SignedEntityTypeLock::default()),
269 vec![task1, task2],
270 TestLogger::stdout(),
271 );
272
273 service.run(Epoch(14)).await.expect("Upkeep service failed");
274 }
275}