mithril_aggregator/dependency_injection/builder/enablers/
misc.rs

1//! Miscellaneous enablers
2//!
3//! This naming is not ideal, we should either:
4//! - group these enablers into more logical categories
5//! - redefine the actual categories so those miscellaneous enablers fit into them
6
7use anyhow::Context;
8use std::sync::Arc;
9use std::time::Duration;
10
11use mithril_aggregator_client::AggregatorHttpClient;
12use mithril_common::logging::LoggerExtensions;
13use mithril_common::messages::RegisterSignatureMessageDmq;
14use mithril_dmq::{
15    DmqConsumerClientDeduplicator, DmqConsumerClientPallas, SystemUnixTimestampProvider,
16};
17use mithril_signed_entity_lock::SignedEntityTypeLock;
18
19use crate::database::repository::CertificateRepository;
20use crate::dependency_injection::{DependenciesBuilder, Result};
21use crate::get_dependency;
22use crate::services::SignatureConsumerDmq;
23use crate::services::{
24    MessageService, MithrilMessageService, SequentialSignatureProcessor, SignatureConsumer,
25    SignatureConsumerNoop, SignatureProcessor,
26};
27
28impl DependenciesBuilder {
29    async fn build_signed_entity_type_lock(&mut self) -> Result<Arc<SignedEntityTypeLock>> {
30        let signed_entity_lock = Arc::new(SignedEntityTypeLock::default());
31        Ok(signed_entity_lock)
32    }
33
34    /// Get the [SignedEntityTypeLock] instance
35    pub async fn get_signed_entity_type_lock(&mut self) -> Result<Arc<SignedEntityTypeLock>> {
36        get_dependency!(self.signed_entity_type_lock)
37    }
38
39    /// Builds HTTP message service
40    pub async fn build_message_service(&mut self) -> Result<Arc<dyn MessageService>> {
41        let certificate_repository = Arc::new(CertificateRepository::new(
42            self.get_sqlite_connection().await?,
43        ));
44        let signed_entity_storer = self.get_signed_entity_storer().await?;
45        let epoch_settings_storer = self.get_epoch_settings_store().await?;
46        let immutable_file_digest_mapper = self.get_immutable_file_digest_mapper().await?;
47        let epoch_service = self.get_epoch_service().await?;
48        let service = MithrilMessageService::new(
49            certificate_repository,
50            signed_entity_storer,
51            epoch_settings_storer,
52            immutable_file_digest_mapper,
53            epoch_service,
54        );
55
56        Ok(Arc::new(service))
57    }
58
59    /// [MessageService] service
60    pub async fn get_message_service(&mut self) -> Result<Arc<dyn MessageService>> {
61        get_dependency!(self.message_service)
62    }
63
64    /// Builds an [AggregatorHttpClient]
65    pub async fn build_leader_aggregator_client(&mut self) -> Result<Arc<AggregatorHttpClient>> {
66        let leader_aggregator_endpoint = self
67            .configuration
68            .leader_aggregator_endpoint()
69            .with_context(|| "Leader Aggregator endpoint is mandatory for follower Aggregator")?;
70
71        let aggregator_client = AggregatorHttpClient::builder(&leader_aggregator_endpoint)
72            .with_api_version_provider(self.get_api_version_provider().await?)
73            .with_timeout(Duration::from_secs(30))
74            .with_logger(self.root_logger.new_with_name("LeaderAggregatorClient"))
75            .build()?;
76
77        Ok(Arc::new(aggregator_client))
78    }
79
80    /// Returns a leader [AggregatorHttpClient]
81    pub async fn get_leader_aggregator_client(&mut self) -> Result<Arc<AggregatorHttpClient>> {
82        get_dependency!(self.leader_aggregator_client)
83    }
84
85    /// Builds a [SignatureConsumer]
86    pub async fn build_signature_consumer(&mut self) -> Result<Arc<dyn SignatureConsumer>> {
87        let signature_consumer = match self.configuration.dmq_node_socket_path() {
88            Some(dmq_node_socket_path) => {
89                let dmq_consumer =
90                    Arc::new(DmqConsumerClientPallas::<RegisterSignatureMessageDmq>::new(
91                        dmq_node_socket_path,
92                        self.configuration.get_dmq_network()?,
93                        self.root_logger(),
94                    ));
95                let dmq_consumer_deduplicator =
96                    Arc::new(DmqConsumerClientDeduplicator::new_with_default_ttl(
97                        dmq_consumer,
98                        Arc::new(SystemUnixTimestampProvider),
99                    ));
100                Arc::new(SignatureConsumerDmq::new(dmq_consumer_deduplicator))
101                    as Arc<dyn SignatureConsumer>
102            }
103            _ => Arc::new(SignatureConsumerNoop) as Arc<dyn SignatureConsumer>,
104        };
105
106        Ok(signature_consumer)
107    }
108
109    /// Builds a [SignatureProcessor]
110    pub async fn create_signature_processor(&mut self) -> Result<Arc<dyn SignatureProcessor>> {
111        let (_stop_tx, stop_rx) = self.get_stop_signal_channel().await?;
112        let signature_processor = SequentialSignatureProcessor::new(
113            self.build_signature_consumer().await?,
114            self.get_certifier_service().await?,
115            stop_rx,
116            self.get_metrics_service().await?,
117            Duration::from_millis(self.configuration.signature_processor_wait_delay_on_error_ms()),
118            self.root_logger(),
119        );
120
121        Ok(Arc::new(signature_processor))
122    }
123}