mithril_aggregator/dependency_injection/builder/enablers/
misc.rs

1//! Miscellaneous enablers
2//!
3//! This naming is not ideal, we should either:
4//! - group these enablers into more logical categories
5//! - redefine the actual categories so those miscellaneous enablers fit into them
6
7use anyhow::Context;
8use std::sync::Arc;
9use std::time::Duration;
10
11use mithril_aggregator_client::AggregatorHttpClient;
12use mithril_common::logging::LoggerExtensions;
13use mithril_common::messages::RegisterSignatureMessageDmq;
14use mithril_dmq::DmqConsumerClientPallas;
15use mithril_signed_entity_lock::SignedEntityTypeLock;
16
17use crate::database::repository::CertificateRepository;
18use crate::dependency_injection::{DependenciesBuilder, Result};
19use crate::get_dependency;
20use crate::services::SignatureConsumerDmq;
21use crate::services::{
22    MessageService, MithrilMessageService, SequentialSignatureProcessor, SignatureConsumer,
23    SignatureConsumerNoop, SignatureProcessor,
24};
25
26impl DependenciesBuilder {
27    async fn build_signed_entity_type_lock(&mut self) -> Result<Arc<SignedEntityTypeLock>> {
28        let signed_entity_lock = Arc::new(SignedEntityTypeLock::default());
29        Ok(signed_entity_lock)
30    }
31
32    /// Get the [SignedEntityTypeLock] instance
33    pub async fn get_signed_entity_type_lock(&mut self) -> Result<Arc<SignedEntityTypeLock>> {
34        get_dependency!(self.signed_entity_type_lock)
35    }
36
37    /// Builds HTTP message service
38    pub async fn build_message_service(&mut self) -> Result<Arc<dyn MessageService>> {
39        let certificate_repository = Arc::new(CertificateRepository::new(
40            self.get_sqlite_connection().await?,
41        ));
42        let signed_entity_storer = self.get_signed_entity_storer().await?;
43        let epoch_settings_storer = self.get_epoch_settings_store().await?;
44        let immutable_file_digest_mapper = self.get_immutable_file_digest_mapper().await?;
45        let epoch_service = self.get_epoch_service().await?;
46        let service = MithrilMessageService::new(
47            certificate_repository,
48            signed_entity_storer,
49            epoch_settings_storer,
50            immutable_file_digest_mapper,
51            epoch_service,
52        );
53
54        Ok(Arc::new(service))
55    }
56
57    /// [MessageService] service
58    pub async fn get_message_service(&mut self) -> Result<Arc<dyn MessageService>> {
59        get_dependency!(self.message_service)
60    }
61
62    /// Builds an [AggregatorHttpClient]
63    pub async fn build_leader_aggregator_client(&mut self) -> Result<Arc<AggregatorHttpClient>> {
64        let leader_aggregator_endpoint = self
65            .configuration
66            .leader_aggregator_endpoint()
67            .with_context(|| "Leader Aggregator endpoint is mandatory for follower Aggregator")?;
68
69        let aggregator_client = AggregatorHttpClient::builder(&leader_aggregator_endpoint)
70            .with_api_version_provider(self.get_api_version_provider().await?)
71            .with_timeout(Duration::from_secs(30))
72            .with_logger(self.root_logger.new_with_name("LeaderAggregatorClient"))
73            .build()?;
74
75        Ok(Arc::new(aggregator_client))
76    }
77
78    /// Returns a leader [AggregatorHttpClient]
79    pub async fn get_leader_aggregator_client(&mut self) -> Result<Arc<AggregatorHttpClient>> {
80        get_dependency!(self.leader_aggregator_client)
81    }
82
83    /// Builds a [SignatureConsumer]
84    pub async fn build_signature_consumer(&mut self) -> Result<Arc<dyn SignatureConsumer>> {
85        let signature_consumer = match self.configuration.dmq_node_socket_path() {
86            Some(dmq_node_socket_path) => {
87                let dmq_consumer =
88                    Arc::new(DmqConsumerClientPallas::<RegisterSignatureMessageDmq>::new(
89                        dmq_node_socket_path,
90                        self.configuration.get_dmq_network()?,
91                        self.root_logger(),
92                    ));
93                Arc::new(SignatureConsumerDmq::new(dmq_consumer)) as Arc<dyn SignatureConsumer>
94            }
95            _ => Arc::new(SignatureConsumerNoop) as Arc<dyn SignatureConsumer>,
96        };
97
98        Ok(signature_consumer)
99    }
100
101    /// Builds a [SignatureProcessor]
102    pub async fn create_signature_processor(&mut self) -> Result<Arc<dyn SignatureProcessor>> {
103        let (_stop_tx, stop_rx) = self.get_stop_signal_channel().await?;
104        let signature_processor = SequentialSignatureProcessor::new(
105            self.build_signature_consumer().await?,
106            self.get_certifier_service().await?,
107            stop_rx,
108            self.get_metrics_service().await?,
109            Duration::from_millis(self.configuration.signature_processor_wait_delay_on_error_ms()),
110            self.root_logger(),
111        );
112
113        Ok(Arc::new(signature_processor))
114    }
115}