mithril_aggregator/commands/
serve_command.rs1use std::{
2 collections::HashMap,
3 net::IpAddr,
4 path::{Path, PathBuf},
5 sync::Arc,
6 time::Duration,
7};
8
9use anyhow::{Context, anyhow};
10use chrono::TimeDelta;
11use clap::Parser;
12
13use config::{ConfigBuilder, Map, Source, Value, builder::DefaultState};
14
15use slog::{Logger, crit, debug, info, warn};
16use tokio::task::JoinSet;
17
18use mithril_cli_helper::{
19 register_config_value, register_config_value_bool, register_config_value_option,
20};
21use mithril_common::StdResult;
22use mithril_doc::{Documenter, DocumenterDefault, StructDoc};
23use mithril_metric::MetricsServer;
24
25use crate::{
26 DefaultConfiguration, ServeCommandConfiguration, dependency_injection::DependenciesBuilder,
27 tools::VacuumTracker,
28};
29
/// Minimum interval (one week) handed to the [`VacuumTracker`] when `execute` checks whether
/// a database vacuum must be performed.
const VACUUM_MINIMUM_INTERVAL: TimeDelta = TimeDelta::weeks(1);
31
/// Arguments of the `serve` command.
// Every field below is forwarded to the `register_config_value*` macros by the `Source`
// implementation of this type, so that command line values take part in the configuration.
//
// NOTE(review): `///` comments on a clap `Parser` are expected to surface in `--help`;
// confirm the wording. The descriptions of `snapshot_directory`, `disable_digests_cache`,
// `reset_digests_cache`, `allow_unparsable_block` and `leader_aggregator_endpoint` are inferred
// from the argument names only — their consumers are not visible in this file.
#[derive(Parser, Debug, Clone)]
pub struct ServeCommand {
    /// IP address the HTTP server listens on.
    #[clap(long)]
    server_ip: Option<String>,

    /// TCP port the HTTP server listens on.
    #[clap(long)]
    server_port: Option<u16>,

    /// Directory used for snapshots.
    #[clap(long)]
    snapshot_directory: Option<PathBuf>,

    /// Disable the digests cache.
    #[clap(long)]
    disable_digests_cache: bool,

    /// Reset the digests cache.
    #[clap(long)]
    reset_digests_cache: bool,

    /// Allow unparsable blocks.
    #[clap(long)]
    allow_unparsable_block: bool,

    /// Enable the metrics server.
    #[clap(long)]
    enable_metrics_server: bool,

    /// IP address of the metrics server.
    #[clap(long)]
    metrics_server_ip: Option<String>,

    /// Listening port of the metrics server.
    #[clap(long)]
    metrics_server_port: Option<u16>,

    /// Endpoint of the leader aggregator.
    #[clap(long)]
    leader_aggregator_endpoint: Option<String>,
}
84
85impl Source for ServeCommand {
86 fn clone_into_box(&self) -> Box<dyn Source + Send + Sync> {
87 Box::new(self.clone())
88 }
89
90 fn collect(&self) -> Result<Map<String, Value>, config::ConfigError> {
91 let mut result = Map::new();
92 let namespace = "clap arguments".to_string();
93
94 register_config_value_option!(result, &namespace, self.server_ip);
95 register_config_value_option!(result, &namespace, self.server_port);
96 register_config_value_option!(
97 result,
98 &namespace,
99 self.snapshot_directory,
100 |v: PathBuf| format!("{}", v.to_string_lossy())
101 );
102 register_config_value_bool!(result, &namespace, self.disable_digests_cache);
103 register_config_value_bool!(result, &namespace, self.reset_digests_cache);
104 register_config_value_bool!(result, &namespace, self.allow_unparsable_block);
105 register_config_value_bool!(result, &namespace, self.enable_metrics_server);
106 register_config_value_option!(result, &namespace, self.metrics_server_ip);
107 register_config_value_option!(result, &namespace, self.metrics_server_port);
108 register_config_value_option!(
109 result,
110 &namespace,
111 self.leader_aggregator_endpoint,
112 |v: String| { Some(v) }
113 );
114
115 Ok(result)
116 }
117}
118
119impl ServeCommand {
120 pub async fn execute(
121 &self,
122 root_logger: Logger,
123 mut config_builder: ConfigBuilder<DefaultState>,
124 ) -> StdResult<()> {
125 config_builder = config_builder.add_source(self.clone());
126 let config: ServeCommandConfiguration = config_builder
127 .build()
128 .with_context(|| "configuration build error")?
129 .try_deserialize()
130 .with_context(|| "configuration deserialize error")?;
131 debug!(root_logger, "SERVE command"; "config" => format!("{config:?}"));
132 let mut dependencies_builder =
133 DependenciesBuilder::new(root_logger.clone(), Arc::new(config.clone()));
134
135 println!("Starting server...");
136 println!("Press Ctrl+C to stop");
137
138 let (stop_tx, stop_rx) = dependencies_builder
140 .get_stop_signal_channel()
141 .await
142 .with_context(|| "Dependencies Builder can not create stop signal channel")?;
143
144 let mut event_store = dependencies_builder
146 .create_event_store()
147 .await
148 .with_context(|| "Dependencies Builder can not create event store")?;
149 let event_store_thread = tokio::spawn(async move { event_store.run().await.unwrap() });
150
151 self.perform_database_vacuum_if_needed(
153 &config.data_stores_directory,
154 &mut dependencies_builder,
155 VACUUM_MINIMUM_INTERVAL,
156 &root_logger,
157 )
158 .await?;
159
160 let mut runtime = dependencies_builder
162 .create_aggregator_runner()
163 .await
164 .with_context(|| "Dependencies Builder can not create aggregator runner")?;
165 let mut join_set = JoinSet::new();
166 join_set.spawn(async move { runtime.run().await.map_err(|e| e.to_string()) });
167
168 let cardano_transactions_preloader = dependencies_builder
170 .create_cardano_transactions_preloader()
171 .await
172 .with_context(
173 || "Dependencies Builder can not create cardano transactions preloader",
174 )?;
175 let preload_task =
176 tokio::spawn(async move { cardano_transactions_preloader.preload().await });
177
178 let routes = dependencies_builder
180 .create_http_routes()
181 .await
182 .with_context(|| "Dependencies Builder can not create http routes")?;
183 let mut stop_rx_clone = stop_rx.clone();
184 join_set.spawn(async move {
185 let (_, server) = warp::serve(routes).bind_with_graceful_shutdown(
186 (
187 config.server_ip.clone().parse::<IpAddr>().unwrap(),
188 config.server_port,
189 ),
190 async move {
191 stop_rx_clone.changed().await.ok();
192 },
193 );
194 server.await;
195
196 Ok(())
197 });
198
199 let signature_processor = dependencies_builder
200 .create_signature_processor()
201 .await
202 .with_context(|| "Dependencies Builder can not create signature processor")?;
203 let signature_processor_clone = signature_processor.clone();
204 join_set.spawn(async move {
205 signature_processor_clone.run().await.map_err(|e| e.to_string())?;
206
207 Ok(())
208 });
209
210 if let Some(cexplorer_pools_url) = config.cexplorer_pools_url {
212 match dependencies_builder
213 .create_signer_importer(&cexplorer_pools_url)
214 .await
215 {
216 Ok(service) => {
217 join_set.spawn(async move {
218 tokio::time::sleep(Duration::from_secs(5)).await;
221 service
222 .run_forever(Duration::from_secs(
223 config.signer_importer_run_interval * 60,
225 ))
226 .await;
227 Ok(())
228 });
229 }
230 Err(error) => {
231 warn!(
232 root_logger, "Failed to build the `SignersImporter`";
233 "url_to_import" => cexplorer_pools_url,
234 "error" => ?error
235 );
236 }
237 }
238 }
239
240 let mut usage_reporter = dependencies_builder
241 .create_usage_reporter()
242 .await
243 .with_context(|| "Dependencies Builder can not create usage reporter")?;
244 join_set.spawn(async move {
245 let interval_duration =
246 Duration::from_secs(config.persist_usage_report_interval_in_seconds);
247 usage_reporter.run_forever(interval_duration).await;
248 Ok(())
249 });
250
251 let metrics_service = dependencies_builder
252 .get_metrics_service()
253 .await
254 .with_context(|| "Metrics service initialization error")?;
255 let stop_rx_clone = stop_rx.clone();
256 if config.enable_metrics_server {
257 let metrics_logger = root_logger.clone();
258 join_set.spawn(async move {
259 if let Err(serve_err) = MetricsServer::build(
260 &config.metrics_server_ip,
261 config.metrics_server_port,
262 metrics_service,
263 metrics_logger.clone(),
264 )
265 .serve(stop_rx_clone)
266 .await
267 {
268 warn!(
269 metrics_logger, "Could not start Metrics server";
270 "error" => ?serve_err, "ip" => &config.metrics_server_ip, "port" => &config.metrics_server_port
271 );
272 };
273
274 Ok(())
275 });
276 }
277
278 join_set.spawn(async { tokio::signal::ctrl_c().await.map_err(|e| e.to_string()) });
279 dependencies_builder.vanish().await;
280
281 if let Err(e) = join_set.join_next().await.unwrap()? {
282 crit!(root_logger, "A critical error occurred"; "error" => e);
283 }
284
285 join_set.shutdown().await;
287
288 stop_tx
290 .send(())
291 .map_err(|e| anyhow!("Stop signal could not be sent: {e:?}"))?;
292
293 if !preload_task.is_finished() {
294 preload_task.abort();
295 }
296
297 info!(root_logger, "Event store is finishing...");
298 event_store_thread.await.unwrap();
299 println!("Services stopped, exiting.");
300
301 Ok(())
302 }
303
304 async fn perform_database_vacuum_if_needed(
308 &self,
309 store_dir: &Path,
310 dependencies_builder: &mut DependenciesBuilder,
311 vacuum_min_interval: TimeDelta,
312 logger: &Logger,
313 ) -> StdResult<()> {
314 let vacuum_tracker = VacuumTracker::new(store_dir, vacuum_min_interval, logger.clone());
315 match vacuum_tracker.check_vacuum_needed() {
316 Ok((true, _)) => {
317 info!(logger, "Performing vacuum");
318
319 let upkeep = dependencies_builder
320 .get_upkeep_service()
321 .await
322 .with_context(|| "Dependencies Builder can not create upkeep")?;
323
324 upkeep
325 .vacuum()
326 .await
327 .with_context(|| "Upkeep service failed to vacuum database")?;
328
329 match vacuum_tracker.update_last_vacuum_time() {
330 Ok(last_vacuum) => {
331 info!(logger, "Vacuum performed"; "last_vacuum" => last_vacuum.to_rfc3339());
332 }
333 Err(e) => {
334 warn!(logger, "Failed to update last vacuum time"; "error" => ?e);
335 }
336 }
337 }
338 Ok((false, last_vacuum)) => {
339 let time_display =
340 last_vacuum.map_or_else(|| "never".to_string(), |time| time.to_rfc3339());
341 info!(logger, "No vacuum needed"; "last_vacuum" => time_display);
342 }
343 Err(e) => {
344 warn!(logger, "Failed to check if vacuum is needed"; "error" => ?e);
345 }
346 }
347
348 Ok(())
349 }
350
351 pub fn extract_config(command_path: String) -> HashMap<String, StructDoc> {
352 HashMap::from([(
353 command_path,
354 ServeCommandConfiguration::extract().merge_struct_doc(&DefaultConfiguration::extract()),
355 )])
356 }
357}