mithril_aggregator/dependency_injection/builder/enablers/
misc.rs

//! Miscellaneous enablers
//!
//! This naming is not ideal; we should either:
//! - group these enablers into more logical categories
//! - redefine the actual categories so those miscellaneous enablers fit into them
6
7use anyhow::anyhow;
8use std::sync::Arc;
9use std::time::Duration;
10
11use mithril_aggregator_client::AggregatorHttpClient;
12use mithril_common::logging::LoggerExtensions;
13#[cfg(feature = "future_dmq")]
14use mithril_common::messages::RegisterSignatureMessageDmq;
15#[cfg(feature = "future_dmq")]
16use mithril_dmq::DmqConsumerClientPallas;
17use mithril_signed_entity_lock::SignedEntityTypeLock;
18
19use crate::database::repository::CertificateRepository;
20use crate::dependency_injection::{DependenciesBuilder, Result};
21use crate::get_dependency;
22#[cfg(feature = "future_dmq")]
23use crate::services::SignatureConsumerDmq;
24use crate::services::{
25    MessageService, MithrilMessageService, SequentialSignatureProcessor, SignatureConsumer,
26    SignatureConsumerNoop, SignatureProcessor,
27};
28
29impl DependenciesBuilder {
30    async fn build_signed_entity_type_lock(&mut self) -> Result<Arc<SignedEntityTypeLock>> {
31        let signed_entity_lock = Arc::new(SignedEntityTypeLock::default());
32        Ok(signed_entity_lock)
33    }
34
35    /// Get the [SignedEntityTypeLock] instance
36    pub async fn get_signed_entity_type_lock(&mut self) -> Result<Arc<SignedEntityTypeLock>> {
37        get_dependency!(self.signed_entity_type_lock)
38    }
39
40    /// Builds HTTP message service
41    pub async fn build_message_service(&mut self) -> Result<Arc<dyn MessageService>> {
42        let certificate_repository = Arc::new(CertificateRepository::new(
43            self.get_sqlite_connection().await?,
44        ));
45        let signed_entity_storer = self.get_signed_entity_storer().await?;
46        let epoch_settings_storer = self.get_epoch_settings_store().await?;
47        let immutable_file_digest_mapper = self.get_immutable_file_digest_mapper().await?;
48        let epoch_service = self.get_epoch_service().await?;
49        let service = MithrilMessageService::new(
50            certificate_repository,
51            signed_entity_storer,
52            epoch_settings_storer,
53            immutable_file_digest_mapper,
54            epoch_service,
55        );
56
57        Ok(Arc::new(service))
58    }
59
60    /// [MessageService] service
61    pub async fn get_message_service(&mut self) -> Result<Arc<dyn MessageService>> {
62        get_dependency!(self.message_service)
63    }
64
65    /// Builds an [AggregatorHttpClient]
66    pub async fn build_leader_aggregator_client(&mut self) -> Result<Arc<AggregatorHttpClient>> {
67        let leader_aggregator_endpoint = self.configuration.leader_aggregator_endpoint().ok_or(
68            anyhow!("Leader Aggregator endpoint is mandatory for follower Aggregator"),
69        )?;
70
71        let aggregator_client = AggregatorHttpClient::builder(&leader_aggregator_endpoint)
72            .with_api_version_provider(self.get_api_version_provider().await?)
73            .with_timeout(Duration::from_secs(30))
74            .with_logger(self.root_logger.new_with_name("LeaderAggregatorClient"))
75            .build()?;
76
77        Ok(Arc::new(aggregator_client))
78    }
79
80    /// Returns a leader [AggregatorHttpClient]
81    pub async fn get_leader_aggregator_client(&mut self) -> Result<Arc<AggregatorHttpClient>> {
82        get_dependency!(self.leader_aggregator_client)
83    }
84
85    /// Builds a [SignatureConsumer]
86    pub async fn build_signature_consumer(&mut self) -> Result<Arc<dyn SignatureConsumer>> {
87        #[cfg(feature = "future_dmq")]
88        let signature_consumer = match self.configuration.dmq_node_socket_path() {
89            Some(dmq_node_socket_path) => {
90                let dmq_consumer =
91                    Arc::new(DmqConsumerClientPallas::<RegisterSignatureMessageDmq>::new(
92                        dmq_node_socket_path,
93                        self.configuration.get_dmq_network()?,
94                        self.root_logger(),
95                    ));
96                Arc::new(SignatureConsumerDmq::new(dmq_consumer)) as Arc<dyn SignatureConsumer>
97            }
98            _ => Arc::new(SignatureConsumerNoop) as Arc<dyn SignatureConsumer>,
99        };
100        #[cfg(not(feature = "future_dmq"))]
101        let signature_consumer = Arc::new(SignatureConsumerNoop) as Arc<dyn SignatureConsumer>;
102
103        Ok(signature_consumer)
104    }
105
106    /// Builds a [SignatureProcessor]
107    pub async fn create_signature_processor(&mut self) -> Result<Arc<dyn SignatureProcessor>> {
108        let (_stop_tx, stop_rx) = self.get_stop_signal_channel().await?;
109        let signature_processor = SequentialSignatureProcessor::new(
110            self.build_signature_consumer().await?,
111            self.get_certifier_service().await?,
112            stop_rx,
113            self.root_logger(),
114            self.get_metrics_service().await?,
115        );
116
117        Ok(Arc::new(signature_processor))
118    }
119}