//! mithril_aggregator/commands/serve_command.rs

1use std::{
2    net::IpAddr,
3    path::{Path, PathBuf},
4    time::Duration,
5};
6
7use anyhow::{anyhow, Context};
8use chrono::TimeDelta;
9use clap::Parser;
10
11use config::{builder::DefaultState, ConfigBuilder, Map, Source, Value};
12
13use slog::{crit, debug, info, warn, Logger};
14use tokio::{sync::oneshot, task::JoinSet};
15
16use mithril_cli_helper::{
17    register_config_value, register_config_value_bool, register_config_value_option,
18};
19use mithril_common::StdResult;
20use mithril_metric::MetricsServer;
21
22use crate::{dependency_injection::DependenciesBuilder, tools::VacuumTracker, Configuration};
23
/// Minimum delay that must have elapsed since the last database vacuum before
/// a new one is performed at startup (see `perform_database_vacuum_if_needed`).
const VACUUM_MINIMUM_INTERVAL: TimeDelta = TimeDelta::weeks(1);
25
/// Server runtime mode: CLI arguments for the `serve` command.
///
/// Each field doubles as a configuration override: the [Source] implementation
/// below feeds the provided values into the configuration builder, where they
/// take precedence over file/environment sources.
#[derive(Parser, Debug, Clone)]
pub struct ServeCommand {
    /// Server listening IP
    #[clap(long)]
    server_ip: Option<String>,

    /// Server TCP port
    #[clap(long)]
    server_port: Option<u16>,

    /// Directory to store snapshot
    /// Defaults to work folder
    #[clap(long)]
    snapshot_directory: Option<PathBuf>,

    /// Disable immutables digests cache.
    #[clap(long)]
    disable_digests_cache: bool,

    /// If set the existing immutables digests cache will be reset.
    ///
    /// Will be ignored if set in conjunction with `--disable-digests-cache`.
    #[clap(long)]
    reset_digests_cache: bool,

    /// If set no error is returned in case of unparsable block and an error log is written instead.
    ///
    /// Will be ignored on (pre)production networks.
    #[clap(long)]
    allow_unparsable_block: bool,

    /// Enable metrics HTTP server (Prometheus endpoint on /metrics).
    #[clap(long)]
    enable_metrics_server: bool,

    /// Metrics HTTP server IP.
    #[clap(long)]
    metrics_server_ip: Option<String>,

    /// Metrics HTTP server listening port.
    #[clap(long)]
    metrics_server_port: Option<u16>,

    /// Leader aggregator endpoint
    ///
    /// This is the endpoint of the aggregator that will be used to fetch the latest epoch settings
    /// and store the signer registrations when the aggregator is running in a follower mode.
    /// If this is not set, the aggregator will run in a leader mode.
    #[clap(long)]
    leader_aggregator_endpoint: Option<String>,
}
78
impl Source for ServeCommand {
    /// Required by [Source] so the config builder can own a clone of this command
    /// as a boxed trait object.
    fn clone_into_box(&self) -> Box<dyn Source + Send + Sync> {
        Box::new(self.clone())
    }

    /// Convert the CLI arguments into a configuration key/value map.
    ///
    /// The `register_config_value*` macros presumably derive the configuration
    /// key from the field identifier (e.g. `self.server_ip` -> `server_ip`) —
    /// TODO confirm against the macro definitions in `mithril_cli_helper`.
    /// `_option!` variants skip `None` values, `_bool!` variants register flags.
    fn collect(&self) -> Result<Map<String, Value>, config::ConfigError> {
        let mut result = Map::new();
        // Origin label reported by the `config` crate in error messages.
        let namespace = "clap arguments".to_string();

        register_config_value_option!(result, &namespace, self.server_ip);
        register_config_value_option!(result, &namespace, self.server_port);
        // PathBuf is not directly a config `Value`: map it to its lossy string form.
        register_config_value_option!(
            result,
            &namespace,
            self.snapshot_directory,
            |v: PathBuf| format!("{}", v.to_string_lossy())
        );
        register_config_value_bool!(result, &namespace, self.disable_digests_cache);
        register_config_value_bool!(result, &namespace, self.reset_digests_cache);
        register_config_value_bool!(result, &namespace, self.allow_unparsable_block);
        register_config_value_bool!(result, &namespace, self.enable_metrics_server);
        register_config_value_option!(result, &namespace, self.metrics_server_ip);
        register_config_value_option!(result, &namespace, self.metrics_server_port);
        // The target config field is an `Option<String>`, hence the `Some(v)` mapping
        // — NOTE(review): assumed from the mapper shape; confirm against `Configuration`.
        register_config_value_option!(
            result,
            &namespace,
            self.leader_aggregator_endpoint,
            |v: String| { Some(v) }
        );

        Ok(result)
    }
}
112
113impl ServeCommand {
114    pub async fn execute(
115        &self,
116        root_logger: Logger,
117        mut config_builder: ConfigBuilder<DefaultState>,
118    ) -> StdResult<()> {
119        config_builder = config_builder.add_source(self.clone());
120        let config: Configuration = config_builder
121            .build()
122            .with_context(|| "configuration build error")?
123            .try_deserialize()
124            .with_context(|| "configuration deserialize error")?;
125        debug!(root_logger, "SERVE command"; "config" => format!("{config:?}"));
126        let mut dependencies_builder =
127            DependenciesBuilder::new(root_logger.clone(), config.clone());
128
129        // start servers
130        println!("Starting server...");
131        println!("Press Ctrl+C to stop");
132
133        // start the monitoring thread
134        let mut event_store = dependencies_builder
135            .create_event_store()
136            .await
137            .with_context(|| "Dependencies Builder can not create event store")?;
138        let event_store_thread = tokio::spawn(async move { event_store.run().await.unwrap() });
139
140        // start the database vacuum operation, if needed
141        self.perform_database_vacuum_if_needed(
142            &config.data_stores_directory,
143            &mut dependencies_builder,
144            VACUUM_MINIMUM_INTERVAL,
145            &root_logger,
146        )
147        .await?;
148
149        // start the aggregator runtime
150        let mut runtime = dependencies_builder
151            .create_aggregator_runner()
152            .await
153            .with_context(|| "Dependencies Builder can not create aggregator runner")?;
154        let mut join_set = JoinSet::new();
155        join_set.spawn(async move { runtime.run().await.map_err(|e| e.to_string()) });
156
157        // start the cardano transactions preloader
158        let cardano_transactions_preloader = dependencies_builder
159            .create_cardano_transactions_preloader()
160            .await
161            .with_context(|| {
162                "Dependencies Builder can not create cardano transactions preloader"
163            })?;
164        let preload_task =
165            tokio::spawn(async move { cardano_transactions_preloader.preload().await });
166
167        // start the HTTP server
168        let (shutdown_tx, shutdown_rx) = oneshot::channel();
169        let routes = dependencies_builder
170            .create_http_routes()
171            .await
172            .with_context(|| "Dependencies Builder can not create http routes")?;
173        join_set.spawn(async move {
174            let (_, server) = warp::serve(routes).bind_with_graceful_shutdown(
175                (
176                    config.server_ip.clone().parse::<IpAddr>().unwrap(),
177                    config.server_port,
178                ),
179                async {
180                    shutdown_rx.await.ok();
181                },
182            );
183            server.await;
184
185            Ok(())
186        });
187
188        // Create a SignersImporter only if the `cexplorer_pools_url` is provided in the config.
189        if let Some(cexplorer_pools_url) = config.cexplorer_pools_url {
190            match dependencies_builder
191                .create_signer_importer(&cexplorer_pools_url)
192                .await
193            {
194                Ok(service) => {
195                    join_set.spawn(async move {
196                        // Wait 5s to let the other services the time to start before running
197                        // the first import.
198                        tokio::time::sleep(Duration::from_secs(5)).await;
199                        service
200                            .run_forever(Duration::from_secs(
201                                // Import interval are in minutes
202                                config.signer_importer_run_interval * 60,
203                            ))
204                            .await;
205                        Ok(())
206                    });
207                }
208                Err(error) => {
209                    warn!(
210                        root_logger, "Failed to build the `SignersImporter`";
211                        "url_to_import" => cexplorer_pools_url,
212                        "error" => ?error
213                    );
214                }
215            }
216        }
217
218        let mut usage_reporter = dependencies_builder
219            .create_usage_reporter()
220            .await
221            .with_context(|| "Dependencies Builder can not create usage reporter")?;
222        join_set.spawn(async move {
223            let interval_duration =
224                Duration::from_secs(config.persist_usage_report_interval_in_seconds);
225            usage_reporter.run_forever(interval_duration).await;
226            Ok(())
227        });
228
229        let metrics_service = dependencies_builder
230            .get_metrics_service()
231            .await
232            .with_context(|| "Metrics service initialization error")?;
233        let (metrics_server_shutdown_tx, metrics_server_shutdown_rx) = oneshot::channel();
234        if config.enable_metrics_server {
235            let metrics_logger = root_logger.clone();
236            join_set.spawn(async move {
237                let _ = MetricsServer::new(
238                    &config.metrics_server_ip,
239                    config.metrics_server_port,
240                    metrics_service,
241                    metrics_logger.clone(),
242                )
243                .start(metrics_server_shutdown_rx)
244                .await
245                .map_err(|e| anyhow!(e));
246
247                Ok(())
248            });
249        }
250
251        join_set.spawn(async { tokio::signal::ctrl_c().await.map_err(|e| e.to_string()) });
252        dependencies_builder.vanish().await;
253
254        if let Err(e) = join_set.join_next().await.unwrap()? {
255            crit!(root_logger, "A critical error occurred"; "error" => e);
256        }
257
258        metrics_server_shutdown_tx
259            .send(())
260            .map_err(|e| anyhow!("Metrics server shutdown signal could not be sent: {e:?}"))?;
261
262        // stop servers
263        join_set.shutdown().await;
264        let _ = shutdown_tx.send(());
265
266        if !preload_task.is_finished() {
267            preload_task.abort();
268        }
269
270        info!(root_logger, "Event store is finishing...");
271        event_store_thread.await.unwrap();
272        println!("Services stopped, exiting.");
273
274        Ok(())
275    }
276
277    /// This function checks if a database vacuum is needed and performs it if necessary.
278    ///
279    /// Errors from [VacuumTracker] operations are logged but not propagated as errors.
280    async fn perform_database_vacuum_if_needed(
281        &self,
282        store_dir: &Path,
283        dependencies_builder: &mut DependenciesBuilder,
284        vacuum_min_interval: TimeDelta,
285        logger: &Logger,
286    ) -> StdResult<()> {
287        let vacuum_tracker = VacuumTracker::new(store_dir, vacuum_min_interval, logger.clone());
288        match vacuum_tracker.check_vacuum_needed() {
289            Ok((true, _)) => {
290                info!(logger, "Performing vacuum");
291
292                let upkeep = dependencies_builder
293                    .get_upkeep_service()
294                    .await
295                    .with_context(|| "Dependencies Builder can not create upkeep")?;
296
297                upkeep
298                    .vacuum()
299                    .await
300                    .with_context(|| "Upkeep service failed to vacuum database")?;
301
302                match vacuum_tracker.update_last_vacuum_time() {
303                    Ok(last_vacuum) => {
304                        info!(logger, "Vacuum performed"; "last_vacuum" => last_vacuum.to_rfc3339());
305                    }
306                    Err(e) => {
307                        warn!(logger, "Failed to update last vacuum time"; "error" => ?e);
308                    }
309                }
310            }
311            Ok((false, last_vacuum)) => {
312                let time_display =
313                    last_vacuum.map_or_else(|| "never".to_string(), |time| time.to_rfc3339());
314                info!(logger, "No vacuum needed"; "last_vacuum" => time_display);
315            }
316            Err(e) => {
317                warn!(logger, "Failed to check if vacuum is needed"; "error" => ?e);
318            }
319        }
320
321        Ok(())
322    }
323}