// mithril_aggregator/commands/serve_command.rs
use std::{
    net::IpAddr,
    path::{Path, PathBuf},
    time::Duration,
};

use anyhow::{anyhow, Context};
use chrono::TimeDelta;
use clap::Parser;

use config::{builder::DefaultState, ConfigBuilder, Map, Source, Value};

use slog::{crit, debug, info, warn, Logger};
use tokio::{sync::oneshot, task::JoinSet};

use mithril_cli_helper::{
    register_config_value, register_config_value_bool, register_config_value_option,
};
use mithril_common::StdResult;
use mithril_metric::MetricsServer;

use crate::{dependency_injection::DependenciesBuilder, tools::VacuumTracker, Configuration};
23
24const VACUUM_MINIMUM_INTERVAL: TimeDelta = TimeDelta::weeks(1);
25
26#[derive(Parser, Debug, Clone)]
28pub struct ServeCommand {
29 #[clap(long)]
31 server_ip: Option<String>,
32
33 #[clap(long)]
35 server_port: Option<u16>,
36
37 #[clap(long)]
40 snapshot_directory: Option<PathBuf>,
41
42 #[clap(long)]
44 disable_digests_cache: bool,
45
46 #[clap(long)]
50 reset_digests_cache: bool,
51
52 #[clap(long)]
56 allow_unparsable_block: bool,
57
58 #[clap(long)]
60 enable_metrics_server: bool,
61
62 #[clap(long)]
64 metrics_server_ip: Option<String>,
65
66 #[clap(long)]
68 metrics_server_port: Option<u16>,
69
70 #[clap(long)]
76 leader_aggregator_endpoint: Option<String>,
77}
78
79impl Source for ServeCommand {
80 fn clone_into_box(&self) -> Box<dyn Source + Send + Sync> {
81 Box::new(self.clone())
82 }
83
84 fn collect(&self) -> Result<Map<String, Value>, config::ConfigError> {
85 let mut result = Map::new();
86 let namespace = "clap arguments".to_string();
87
88 register_config_value_option!(result, &namespace, self.server_ip);
89 register_config_value_option!(result, &namespace, self.server_port);
90 register_config_value_option!(
91 result,
92 &namespace,
93 self.snapshot_directory,
94 |v: PathBuf| format!("{}", v.to_string_lossy())
95 );
96 register_config_value_bool!(result, &namespace, self.disable_digests_cache);
97 register_config_value_bool!(result, &namespace, self.reset_digests_cache);
98 register_config_value_bool!(result, &namespace, self.allow_unparsable_block);
99 register_config_value_bool!(result, &namespace, self.enable_metrics_server);
100 register_config_value_option!(result, &namespace, self.metrics_server_ip);
101 register_config_value_option!(result, &namespace, self.metrics_server_port);
102 register_config_value_option!(
103 result,
104 &namespace,
105 self.leader_aggregator_endpoint,
106 |v: String| { Some(v) }
107 );
108
109 Ok(result)
110 }
111}
112
113impl ServeCommand {
114 pub async fn execute(
115 &self,
116 root_logger: Logger,
117 mut config_builder: ConfigBuilder<DefaultState>,
118 ) -> StdResult<()> {
119 config_builder = config_builder.add_source(self.clone());
120 let config: Configuration = config_builder
121 .build()
122 .with_context(|| "configuration build error")?
123 .try_deserialize()
124 .with_context(|| "configuration deserialize error")?;
125 debug!(root_logger, "SERVE command"; "config" => format!("{config:?}"));
126 let mut dependencies_builder =
127 DependenciesBuilder::new(root_logger.clone(), config.clone());
128
129 println!("Starting server...");
131 println!("Press Ctrl+C to stop");
132
133 let mut event_store = dependencies_builder
135 .create_event_store()
136 .await
137 .with_context(|| "Dependencies Builder can not create event store")?;
138 let event_store_thread = tokio::spawn(async move { event_store.run().await.unwrap() });
139
140 self.perform_database_vacuum_if_needed(
142 &config.data_stores_directory,
143 &mut dependencies_builder,
144 VACUUM_MINIMUM_INTERVAL,
145 &root_logger,
146 )
147 .await?;
148
149 let mut runtime = dependencies_builder
151 .create_aggregator_runner()
152 .await
153 .with_context(|| "Dependencies Builder can not create aggregator runner")?;
154 let mut join_set = JoinSet::new();
155 join_set.spawn(async move { runtime.run().await.map_err(|e| e.to_string()) });
156
157 let cardano_transactions_preloader = dependencies_builder
159 .create_cardano_transactions_preloader()
160 .await
161 .with_context(|| {
162 "Dependencies Builder can not create cardano transactions preloader"
163 })?;
164 let preload_task =
165 tokio::spawn(async move { cardano_transactions_preloader.preload().await });
166
167 let (shutdown_tx, shutdown_rx) = oneshot::channel();
169 let routes = dependencies_builder
170 .create_http_routes()
171 .await
172 .with_context(|| "Dependencies Builder can not create http routes")?;
173 join_set.spawn(async move {
174 let (_, server) = warp::serve(routes).bind_with_graceful_shutdown(
175 (
176 config.server_ip.clone().parse::<IpAddr>().unwrap(),
177 config.server_port,
178 ),
179 async {
180 shutdown_rx.await.ok();
181 },
182 );
183 server.await;
184
185 Ok(())
186 });
187
188 if let Some(cexplorer_pools_url) = config.cexplorer_pools_url {
190 match dependencies_builder
191 .create_signer_importer(&cexplorer_pools_url)
192 .await
193 {
194 Ok(service) => {
195 join_set.spawn(async move {
196 tokio::time::sleep(Duration::from_secs(5)).await;
199 service
200 .run_forever(Duration::from_secs(
201 config.signer_importer_run_interval * 60,
203 ))
204 .await;
205 Ok(())
206 });
207 }
208 Err(error) => {
209 warn!(
210 root_logger, "Failed to build the `SignersImporter`";
211 "url_to_import" => cexplorer_pools_url,
212 "error" => ?error
213 );
214 }
215 }
216 }
217
218 let mut usage_reporter = dependencies_builder
219 .create_usage_reporter()
220 .await
221 .with_context(|| "Dependencies Builder can not create usage reporter")?;
222 join_set.spawn(async move {
223 let interval_duration =
224 Duration::from_secs(config.persist_usage_report_interval_in_seconds);
225 usage_reporter.run_forever(interval_duration).await;
226 Ok(())
227 });
228
229 let metrics_service = dependencies_builder
230 .get_metrics_service()
231 .await
232 .with_context(|| "Metrics service initialization error")?;
233 let (metrics_server_shutdown_tx, metrics_server_shutdown_rx) = oneshot::channel();
234 if config.enable_metrics_server {
235 let metrics_logger = root_logger.clone();
236 join_set.spawn(async move {
237 let _ = MetricsServer::new(
238 &config.metrics_server_ip,
239 config.metrics_server_port,
240 metrics_service,
241 metrics_logger.clone(),
242 )
243 .start(metrics_server_shutdown_rx)
244 .await
245 .map_err(|e| anyhow!(e));
246
247 Ok(())
248 });
249 }
250
251 join_set.spawn(async { tokio::signal::ctrl_c().await.map_err(|e| e.to_string()) });
252 dependencies_builder.vanish().await;
253
254 if let Err(e) = join_set.join_next().await.unwrap()? {
255 crit!(root_logger, "A critical error occurred"; "error" => e);
256 }
257
258 metrics_server_shutdown_tx
259 .send(())
260 .map_err(|e| anyhow!("Metrics server shutdown signal could not be sent: {e:?}"))?;
261
262 join_set.shutdown().await;
264 let _ = shutdown_tx.send(());
265
266 if !preload_task.is_finished() {
267 preload_task.abort();
268 }
269
270 info!(root_logger, "Event store is finishing...");
271 event_store_thread.await.unwrap();
272 println!("Services stopped, exiting.");
273
274 Ok(())
275 }
276
277 async fn perform_database_vacuum_if_needed(
281 &self,
282 store_dir: &Path,
283 dependencies_builder: &mut DependenciesBuilder,
284 vacuum_min_interval: TimeDelta,
285 logger: &Logger,
286 ) -> StdResult<()> {
287 let vacuum_tracker = VacuumTracker::new(store_dir, vacuum_min_interval, logger.clone());
288 match vacuum_tracker.check_vacuum_needed() {
289 Ok((true, _)) => {
290 info!(logger, "Performing vacuum");
291
292 let upkeep = dependencies_builder
293 .get_upkeep_service()
294 .await
295 .with_context(|| "Dependencies Builder can not create upkeep")?;
296
297 upkeep
298 .vacuum()
299 .await
300 .with_context(|| "Upkeep service failed to vacuum database")?;
301
302 match vacuum_tracker.update_last_vacuum_time() {
303 Ok(last_vacuum) => {
304 info!(logger, "Vacuum performed"; "last_vacuum" => last_vacuum.to_rfc3339());
305 }
306 Err(e) => {
307 warn!(logger, "Failed to update last vacuum time"; "error" => ?e);
308 }
309 }
310 }
311 Ok((false, last_vacuum)) => {
312 let time_display =
313 last_vacuum.map_or_else(|| "never".to_string(), |time| time.to_rfc3339());
314 info!(logger, "No vacuum needed"; "last_vacuum" => time_display);
315 }
316 Err(e) => {
317 warn!(logger, "Failed to check if vacuum is needed"; "error" => ?e);
318 }
319 }
320
321 Ok(())
322 }
323}