// mithril_aggregator/commands/serve_command.rs

1use std::{
2    collections::HashMap,
3    net::IpAddr,
4    path::{Path, PathBuf},
5    sync::Arc,
6    time::Duration,
7};
8
9use anyhow::{Context, anyhow};
10use chrono::TimeDelta;
11use clap::Parser;
12
13use config::{ConfigBuilder, Map, Source, Value, builder::DefaultState};
14
15use slog::{Logger, crit, debug, info, warn};
16use tokio::task::JoinSet;
17
18use mithril_cli_helper::{
19    register_config_value, register_config_value_bool, register_config_value_option,
20};
21use mithril_common::StdResult;
22use mithril_doc::{Documenter, DocumenterDefault, StructDoc};
23use mithril_metric::MetricsServer;
24
25use crate::{
26    DefaultConfiguration, ServeCommandConfiguration, dependency_injection::DependenciesBuilder,
27    tools::VacuumTracker,
28};
29
/// Minimum delay between two automatic database vacuum operations performed
/// at server startup (see `ServeCommand::perform_database_vacuum_if_needed`).
const VACUUM_MINIMUM_INTERVAL: TimeDelta = TimeDelta::weeks(1);
31
/// Server runtime mode
//
// NOTE(review): the `///` doc comments on this struct and its fields double as
// the clap-generated `--help` text, so they are user-facing output — keep any
// edit to them worded for end users.
#[derive(Parser, Debug, Clone)]
pub struct ServeCommand {
    /// Server listening IP
    #[clap(long)]
    server_ip: Option<String>,

    /// Server TCP port
    #[clap(long)]
    server_port: Option<u16>,

    /// Directory to store snapshot
    /// Defaults to work folder
    #[clap(long)]
    snapshot_directory: Option<PathBuf>,

    /// Disable immutables digests cache.
    #[clap(long)]
    disable_digests_cache: bool,

    /// If set the existing immutables digests cache will be reset.
    ///
    /// Will be ignored if set in conjunction with `--disable-digests-cache`.
    #[clap(long)]
    reset_digests_cache: bool,

    /// If set no error is returned in case of unparsable block and an error log is written instead.
    ///
    /// Will be ignored on (pre)production networks.
    #[clap(long)]
    allow_unparsable_block: bool,

    /// Enable metrics HTTP server (Prometheus endpoint on /metrics).
    #[clap(long)]
    enable_metrics_server: bool,

    /// Metrics HTTP server IP.
    #[clap(long)]
    metrics_server_ip: Option<String>,

    /// Metrics HTTP server listening port.
    #[clap(long)]
    metrics_server_port: Option<u16>,

    /// Leader aggregator endpoint
    ///
    /// This is the endpoint of the aggregator that will be used to fetch the latest epoch settings
    /// and store the signer registrations when the aggregator is running in a follower mode.
    /// If this is not set, the aggregator will run in a leader mode.
    #[clap(long)]
    leader_aggregator_endpoint: Option<String>,
}
84
85impl Source for ServeCommand {
86    fn clone_into_box(&self) -> Box<dyn Source + Send + Sync> {
87        Box::new(self.clone())
88    }
89
90    fn collect(&self) -> Result<Map<String, Value>, config::ConfigError> {
91        let mut result = Map::new();
92        let namespace = "clap arguments".to_string();
93
94        register_config_value_option!(result, &namespace, self.server_ip);
95        register_config_value_option!(result, &namespace, self.server_port);
96        register_config_value_option!(
97            result,
98            &namespace,
99            self.snapshot_directory,
100            |v: PathBuf| format!("{}", v.to_string_lossy())
101        );
102        register_config_value_bool!(result, &namespace, self.disable_digests_cache);
103        register_config_value_bool!(result, &namespace, self.reset_digests_cache);
104        register_config_value_bool!(result, &namespace, self.allow_unparsable_block);
105        register_config_value_bool!(result, &namespace, self.enable_metrics_server);
106        register_config_value_option!(result, &namespace, self.metrics_server_ip);
107        register_config_value_option!(result, &namespace, self.metrics_server_port);
108        register_config_value_option!(
109            result,
110            &namespace,
111            self.leader_aggregator_endpoint,
112            |v: String| { Some(v) }
113        );
114
115        Ok(result)
116    }
117}
118
impl ServeCommand {
    /// Build the configuration and run the aggregator server until shutdown.
    ///
    /// The clap arguments (`self`) are added as the last configuration source,
    /// so command-line values override the other sources in `config_builder`.
    /// The method then wires the dependencies and spawns the long-running
    /// services (event store, aggregator runtime, HTTP server, signature
    /// processor, optional signers importer, usage reporter, optional metrics
    /// server). It returns once the first task of the join set completes —
    /// the set includes a Ctrl+C listener — and the remaining services have
    /// been stopped.
    pub async fn execute(
        &self,
        root_logger: Logger,
        mut config_builder: ConfigBuilder<DefaultState>,
    ) -> StdResult<()> {
        // Command-line arguments take precedence over earlier config sources.
        config_builder = config_builder.add_source(self.clone());
        let config: ServeCommandConfiguration = config_builder
            .build()
            .with_context(|| "configuration build error")?
            .try_deserialize()
            .with_context(|| "configuration deserialize error")?;
        debug!(root_logger, "SERVE command"; "config" => format!("{config:?}"));
        let mut dependencies_builder =
            DependenciesBuilder::new(root_logger.clone(), Arc::new(config.clone()));

        println!("Starting server...");
        println!("Press Ctrl+C to stop");

        // Create the stop signal channel: `stop_rx` clones are handed to the
        // tasks that support graceful shutdown (HTTP server, metrics server),
        // and `stop_tx` fires during the shutdown sequence below.
        let (stop_tx, stop_rx) = dependencies_builder
            .get_stop_signal_channel()
            .await
            .with_context(|| "Dependencies Builder can not create stop signal channel")?;

        // Start the monitoring thread.
        // NOTE(review): the `unwrap` turns an event store run error into a
        // panic that only surfaces when the handle is awaited at the end of
        // this method — confirm this late-failure mode is intended.
        let mut event_store = dependencies_builder
            .create_event_store()
            .await
            .with_context(|| "Dependencies Builder can not create event store")?;
        let event_store_thread = tokio::spawn(async move { event_store.run().await.unwrap() });

        // Start the database vacuum operation, if needed (rate-limited to at
        // most once per `VACUUM_MINIMUM_INTERVAL`).
        self.perform_database_vacuum_if_needed(
            &config.data_stores_directory,
            &mut dependencies_builder,
            VACUUM_MINIMUM_INTERVAL,
            &root_logger,
        )
        .await?;

        // Start the aggregator runtime. Every task spawned on `join_set`
        // yields a `Result<(), String>`; the first task to finish (success or
        // failure) triggers the shutdown sequence below.
        let mut runtime = dependencies_builder
            .create_aggregator_runner()
            .await
            .with_context(|| "Dependencies Builder can not create aggregator runner")?;
        let mut join_set = JoinSet::new();
        join_set.spawn(async move { runtime.run().await.map_err(|e| e.to_string()) });

        // Start the cardano transactions preloader. Kept out of `join_set` on
        // purpose: its (expected) completion must not trigger a shutdown.
        let cardano_transactions_preloader = dependencies_builder
            .create_cardano_transactions_preloader()
            .await
            .with_context(
                || "Dependencies Builder can not create cardano transactions preloader",
            )?;
        let preload_task =
            tokio::spawn(async move { cardano_transactions_preloader.preload().await });

        // Start the HTTP server; it shuts down gracefully when the stop
        // channel changes.
        // NOTE(review): an invalid `server_ip` makes `parse().unwrap()` panic
        // inside the spawned task — confirm the value is validated upstream.
        let routes = dependencies_builder
            .create_http_routes()
            .await
            .with_context(|| "Dependencies Builder can not create http routes")?;
        let mut stop_rx_clone = stop_rx.clone();
        join_set.spawn(async move {
            let (_, server) = warp::serve(routes).bind_with_graceful_shutdown(
                (
                    config.server_ip.clone().parse::<IpAddr>().unwrap(),
                    config.server_port,
                ),
                async move {
                    stop_rx_clone.changed().await.ok();
                },
            );
            server.await;

            Ok(())
        });

        // Start the signature processor.
        let signature_processor = dependencies_builder
            .create_signature_processor()
            .await
            .with_context(|| "Dependencies Builder can not create signature processor")?;
        let signature_processor_clone = signature_processor.clone();
        join_set.spawn(async move {
            signature_processor_clone.run().await.map_err(|e| e.to_string())?;

            Ok(())
        });

        // Create a SignersImporter only if the `cexplorer_pools_url` is provided in the config.
        // A build failure is only logged: the server still starts without it.
        if let Some(cexplorer_pools_url) = config.cexplorer_pools_url {
            match dependencies_builder
                .create_signer_importer(&cexplorer_pools_url)
                .await
            {
                Ok(service) => {
                    join_set.spawn(async move {
                        // Wait 5s to let the other services the time to start before running
                        // the first import.
                        tokio::time::sleep(Duration::from_secs(5)).await;
                        service
                            .run_forever(Duration::from_secs(
                                // Import interval are in minutes
                                config.signer_importer_run_interval * 60,
                            ))
                            .await;
                        Ok(())
                    });
                }
                Err(error) => {
                    warn!(
                        root_logger, "Failed to build the `SignersImporter`";
                        "url_to_import" => cexplorer_pools_url,
                        "error" => ?error
                    );
                }
            }
        }

        // Start the usage reporter; it persists usage reports on a fixed
        // interval taken from the configuration (in seconds).
        let mut usage_reporter = dependencies_builder
            .create_usage_reporter()
            .await
            .with_context(|| "Dependencies Builder can not create usage reporter")?;
        join_set.spawn(async move {
            let interval_duration =
                Duration::from_secs(config.persist_usage_report_interval_in_seconds);
            usage_reporter.run_forever(interval_duration).await;
            Ok(())
        });

        // Start the metrics HTTP server when enabled. A serve error is logged
        // but does not bring the aggregator down (the task still returns Ok).
        let metrics_service = dependencies_builder
            .get_metrics_service()
            .await
            .with_context(|| "Metrics service initialization error")?;
        let stop_rx_clone = stop_rx.clone();
        if config.enable_metrics_server {
            let metrics_logger = root_logger.clone();
            join_set.spawn(async move {
                if let Err(serve_err) = MetricsServer::build(
                    &config.metrics_server_ip,
                    config.metrics_server_port,
                    metrics_service,
                    metrics_logger.clone(),
                )
                .serve(stop_rx_clone)
                .await
                {
                    warn!(
                        metrics_logger, "Could not start Metrics server";
                        "error" => ?serve_err, "ip" => &config.metrics_server_ip, "port" => &config.metrics_server_port
                    );
                };

                Ok(())
            });
        }

        // Ctrl+C joins the race: whichever task finishes first ends the serve.
        join_set.spawn(async { tokio::signal::ctrl_c().await.map_err(|e| e.to_string()) });
        // NOTE(review): `vanish` semantics live in DependenciesBuilder —
        // presumably it releases resources the builder still holds; confirm.
        dependencies_builder.vanish().await;

        // `unwrap`: the set is never empty here (at least the Ctrl+C task);
        // `?` propagates a JoinError (panicked/cancelled task).
        if let Err(e) = join_set.join_next().await.unwrap()? {
            crit!(root_logger, "A critical error occurred"; "error" => e);
        }

        // Stop servers
        join_set.shutdown().await;

        // Send the stop signal to any remaining `stop_rx` listeners
        // (presumably including the event store awaited below — confirm).
        stop_tx
            .send(())
            .map_err(|e| anyhow!("Stop signal could not be sent: {e:?}"))?;

        // The preload is best-effort: abort it if it is still running.
        if !preload_task.is_finished() {
            preload_task.abort();
        }

        info!(root_logger, "Event store is finishing...");
        event_store_thread.await.unwrap();
        println!("Services stopped, exiting.");

        Ok(())
    }

    /// This function checks if a database vacuum is needed and performs it if necessary.
    ///
    /// A vacuum is considered needed when the [VacuumTracker] stored under
    /// `store_dir` reports that more than `vacuum_min_interval` elapsed since
    /// the last recorded vacuum. The vacuum itself is delegated to the upkeep
    /// service obtained from `dependencies_builder`.
    ///
    /// Errors from [VacuumTracker] operations are logged but not propagated as errors;
    /// only failures of the upkeep service itself are returned.
    async fn perform_database_vacuum_if_needed(
        &self,
        store_dir: &Path,
        dependencies_builder: &mut DependenciesBuilder,
        vacuum_min_interval: TimeDelta,
        logger: &Logger,
    ) -> StdResult<()> {
        let vacuum_tracker = VacuumTracker::new(store_dir, vacuum_min_interval, logger.clone());
        match vacuum_tracker.check_vacuum_needed() {
            Ok((true, _)) => {
                info!(logger, "Performing vacuum");

                let upkeep = dependencies_builder
                    .get_upkeep_service()
                    .await
                    .with_context(|| "Dependencies Builder can not create upkeep")?;

                upkeep
                    .vacuum()
                    .await
                    .with_context(|| "Upkeep service failed to vacuum database")?;

                // Record the vacuum time; a failure here only degrades the
                // rate limiting, so it is logged rather than propagated.
                match vacuum_tracker.update_last_vacuum_time() {
                    Ok(last_vacuum) => {
                        info!(logger, "Vacuum performed"; "last_vacuum" => last_vacuum.to_rfc3339());
                    }
                    Err(e) => {
                        warn!(logger, "Failed to update last vacuum time"; "error" => ?e);
                    }
                }
            }
            Ok((false, last_vacuum)) => {
                let time_display =
                    last_vacuum.map_or_else(|| "never".to_string(), |time| time.to_rfc3339());
                info!(logger, "No vacuum needed"; "last_vacuum" => time_display);
            }
            Err(e) => {
                warn!(logger, "Failed to check if vacuum is needed"; "error" => ?e);
            }
        }

        Ok(())
    }

    /// Extract the documented serve configuration (merged with the default
    /// configuration values), keyed by `command_path`, for documentation
    /// generation.
    pub fn extract_config(command_path: String) -> HashMap<String, StructDoc> {
        HashMap::from([(
            command_path,
            ServeCommandConfiguration::extract().merge_struct_doc(&DefaultConfiguration::extract()),
        )])
    }
}
357}