mithril_aggregator/dependency_injection/builder/enablers/
misc.rs

//! Miscellaneous enablers
//!
//! This naming is not ideal; we should either:
//! - group these enablers into more logical categories
//! - redefine the actual categories so those miscellaneous enablers fit into them
6
7use anyhow::{Context, anyhow};
8use reqwest::Url;
9use std::sync::Arc;
10use std::time::Duration;
11
12#[cfg(feature = "future_dmq")]
13use mithril_common::messages::RegisterSignatureMessageDmq;
14#[cfg(feature = "future_dmq")]
15use mithril_dmq::DmqConsumerPallas;
16use mithril_signed_entity_lock::SignedEntityTypeLock;
17
18use crate::database::repository::CertificateRepository;
19use crate::dependency_injection::{DependenciesBuilder, Result};
20use crate::get_dependency;
21#[cfg(feature = "future_dmq")]
22use crate::services::SignatureConsumerDmq;
23use crate::services::{
24    AggregatorHTTPClient, MessageService, MithrilMessageService, SequentialSignatureProcessor,
25    SignatureConsumer, SignatureConsumerNoop, SignatureProcessor,
26};
27impl DependenciesBuilder {
28    async fn build_signed_entity_type_lock(&mut self) -> Result<Arc<SignedEntityTypeLock>> {
29        let signed_entity_lock = Arc::new(SignedEntityTypeLock::default());
30        Ok(signed_entity_lock)
31    }
32
33    /// Get the [SignedEntityTypeLock] instance
34    pub async fn get_signed_entity_type_lock(&mut self) -> Result<Arc<SignedEntityTypeLock>> {
35        get_dependency!(self.signed_entity_type_lock)
36    }
37
38    /// Builds HTTP message service
39    pub async fn build_message_service(&mut self) -> Result<Arc<dyn MessageService>> {
40        let certificate_repository = Arc::new(CertificateRepository::new(
41            self.get_sqlite_connection().await?,
42        ));
43        let signed_entity_storer = self.get_signed_entity_storer().await?;
44        let immutable_file_digest_mapper = self.get_immutable_file_digest_mapper().await?;
45        let epoch_service = self.get_epoch_service().await?;
46        let service = MithrilMessageService::new(
47            certificate_repository,
48            signed_entity_storer,
49            immutable_file_digest_mapper,
50            epoch_service,
51        );
52
53        Ok(Arc::new(service))
54    }
55
56    /// [MessageService] service
57    pub async fn get_message_service(&mut self) -> Result<Arc<dyn MessageService>> {
58        get_dependency!(self.message_service)
59    }
60
61    /// Builds an [AggregatorHTTPClient]
62    pub async fn build_leader_aggregator_client(&mut self) -> Result<Arc<AggregatorHTTPClient>> {
63        let leader_aggregator_endpoint = self.configuration.leader_aggregator_endpoint().ok_or(
64            anyhow!("Leader Aggregator endpoint is mandatory for follower Aggregator"),
65        )?;
66
67        let aggregator_client = AggregatorHTTPClient::new(
68            Url::parse(&leader_aggregator_endpoint).with_context(|| {
69                format!(
70                    "Failed to parse leader aggregator endpoint: '{leader_aggregator_endpoint}'"
71                )
72            })?,
73            None,
74            self.get_api_version_provider().await?,
75            Some(Duration::from_secs(30)),
76            self.root_logger(),
77        );
78
79        Ok(Arc::new(aggregator_client))
80    }
81
82    /// Returns a leader [AggregatorHTTPClient]
83    pub async fn get_leader_aggregator_client(&mut self) -> Result<Arc<AggregatorHTTPClient>> {
84        get_dependency!(self.leader_aggregator_client)
85    }
86
87    /// Builds a [SignatureConsumer]
88    pub async fn build_signature_consumer(&mut self) -> Result<Arc<dyn SignatureConsumer>> {
89        #[cfg(feature = "future_dmq")]
90        let signature_consumer = match self.configuration.dmq_node_socket_path() {
91            Some(dmq_node_socket_path) => {
92                let dmq_consumer = Arc::new(DmqConsumerPallas::<RegisterSignatureMessageDmq>::new(
93                    dmq_node_socket_path,
94                    self.configuration.get_network()?,
95                    self.root_logger(),
96                ));
97                Arc::new(SignatureConsumerDmq::new(dmq_consumer)) as Arc<dyn SignatureConsumer>
98            }
99            _ => Arc::new(SignatureConsumerNoop) as Arc<dyn SignatureConsumer>,
100        };
101        #[cfg(not(feature = "future_dmq"))]
102        let signature_consumer = Arc::new(SignatureConsumerNoop) as Arc<dyn SignatureConsumer>;
103
104        Ok(signature_consumer)
105    }
106
107    /// Builds a [SignatureProcessor]
108    pub async fn create_signature_processor(&mut self) -> Result<Arc<dyn SignatureProcessor>> {
109        let (_stop_tx, stop_rx) = self.get_stop_signal_channel().await?;
110        let signature_processor = SequentialSignatureProcessor::new(
111            self.build_signature_consumer().await?,
112            self.get_certifier_service().await?,
113            stop_rx,
114            self.root_logger(),
115            self.get_metrics_service().await?,
116        );
117
118        Ok(Arc::new(signature_processor))
119    }
120}