mithril_aggregator/dependency_injection/builder/enablers/
misc.rs1use anyhow::Context;
8use std::sync::Arc;
9use std::time::Duration;
10
11use mithril_aggregator_client::AggregatorHttpClient;
12use mithril_common::logging::LoggerExtensions;
13use mithril_common::messages::RegisterSignatureMessageDmq;
14use mithril_dmq::DmqConsumerClientPallas;
15use mithril_signed_entity_lock::SignedEntityTypeLock;
16
17use crate::database::repository::CertificateRepository;
18use crate::dependency_injection::{DependenciesBuilder, Result};
19use crate::get_dependency;
20use crate::services::SignatureConsumerDmq;
21use crate::services::{
22 MessageService, MithrilMessageService, SequentialSignatureProcessor, SignatureConsumer,
23 SignatureConsumerNoop, SignatureProcessor,
24};
25
26impl DependenciesBuilder {
27 async fn build_signed_entity_type_lock(&mut self) -> Result<Arc<SignedEntityTypeLock>> {
28 let signed_entity_lock = Arc::new(SignedEntityTypeLock::default());
29 Ok(signed_entity_lock)
30 }
31
32 pub async fn get_signed_entity_type_lock(&mut self) -> Result<Arc<SignedEntityTypeLock>> {
34 get_dependency!(self.signed_entity_type_lock)
35 }
36
37 pub async fn build_message_service(&mut self) -> Result<Arc<dyn MessageService>> {
39 let certificate_repository = Arc::new(CertificateRepository::new(
40 self.get_sqlite_connection().await?,
41 ));
42 let signed_entity_storer = self.get_signed_entity_storer().await?;
43 let epoch_settings_storer = self.get_epoch_settings_store().await?;
44 let immutable_file_digest_mapper = self.get_immutable_file_digest_mapper().await?;
45 let epoch_service = self.get_epoch_service().await?;
46 let service = MithrilMessageService::new(
47 certificate_repository,
48 signed_entity_storer,
49 epoch_settings_storer,
50 immutable_file_digest_mapper,
51 epoch_service,
52 );
53
54 Ok(Arc::new(service))
55 }
56
57 pub async fn get_message_service(&mut self) -> Result<Arc<dyn MessageService>> {
59 get_dependency!(self.message_service)
60 }
61
62 pub async fn build_leader_aggregator_client(&mut self) -> Result<Arc<AggregatorHttpClient>> {
64 let leader_aggregator_endpoint = self
65 .configuration
66 .leader_aggregator_endpoint()
67 .with_context(|| "Leader Aggregator endpoint is mandatory for follower Aggregator")?;
68
69 let aggregator_client = AggregatorHttpClient::builder(&leader_aggregator_endpoint)
70 .with_api_version_provider(self.get_api_version_provider().await?)
71 .with_timeout(Duration::from_secs(30))
72 .with_logger(self.root_logger.new_with_name("LeaderAggregatorClient"))
73 .build()?;
74
75 Ok(Arc::new(aggregator_client))
76 }
77
78 pub async fn get_leader_aggregator_client(&mut self) -> Result<Arc<AggregatorHttpClient>> {
80 get_dependency!(self.leader_aggregator_client)
81 }
82
83 pub async fn build_signature_consumer(&mut self) -> Result<Arc<dyn SignatureConsumer>> {
85 let signature_consumer = match self.configuration.dmq_node_socket_path() {
86 Some(dmq_node_socket_path) => {
87 let dmq_consumer =
88 Arc::new(DmqConsumerClientPallas::<RegisterSignatureMessageDmq>::new(
89 dmq_node_socket_path,
90 self.configuration.get_dmq_network()?,
91 self.root_logger(),
92 ));
93 Arc::new(SignatureConsumerDmq::new(dmq_consumer)) as Arc<dyn SignatureConsumer>
94 }
95 _ => Arc::new(SignatureConsumerNoop) as Arc<dyn SignatureConsumer>,
96 };
97
98 Ok(signature_consumer)
99 }
100
101 pub async fn create_signature_processor(&mut self) -> Result<Arc<dyn SignatureProcessor>> {
103 let (_stop_tx, stop_rx) = self.get_stop_signal_channel().await?;
104 let signature_processor = SequentialSignatureProcessor::new(
105 self.build_signature_consumer().await?,
106 self.get_certifier_service().await?,
107 stop_rx,
108 self.get_metrics_service().await?,
109 Duration::from_millis(self.configuration.signature_processor_wait_delay_on_error_ms()),
110 self.root_logger(),
111 );
112
113 Ok(Arc::new(signature_processor))
114 }
115}