mithril_aggregator/dependency_injection/builder/enablers/
misc.rs1use anyhow::{Context, anyhow};
8use reqwest::Url;
9use std::sync::Arc;
10use std::time::Duration;
11
12#[cfg(feature = "future_dmq")]
13use mithril_common::messages::RegisterSignatureMessageDmq;
14#[cfg(feature = "future_dmq")]
15use mithril_dmq::DmqConsumerPallas;
16use mithril_signed_entity_lock::SignedEntityTypeLock;
17
18use crate::database::repository::CertificateRepository;
19use crate::dependency_injection::{DependenciesBuilder, Result};
20use crate::get_dependency;
21#[cfg(feature = "future_dmq")]
22use crate::services::SignatureConsumerDmq;
23use crate::services::{
24 AggregatorHTTPClient, MessageService, MithrilMessageService, SequentialSignatureProcessor,
25 SignatureConsumer, SignatureConsumerNoop, SignatureProcessor,
26};
27impl DependenciesBuilder {
28 async fn build_signed_entity_type_lock(&mut self) -> Result<Arc<SignedEntityTypeLock>> {
29 let signed_entity_lock = Arc::new(SignedEntityTypeLock::default());
30 Ok(signed_entity_lock)
31 }
32
33 pub async fn get_signed_entity_type_lock(&mut self) -> Result<Arc<SignedEntityTypeLock>> {
35 get_dependency!(self.signed_entity_type_lock)
36 }
37
38 pub async fn build_message_service(&mut self) -> Result<Arc<dyn MessageService>> {
40 let certificate_repository = Arc::new(CertificateRepository::new(
41 self.get_sqlite_connection().await?,
42 ));
43 let signed_entity_storer = self.get_signed_entity_storer().await?;
44 let immutable_file_digest_mapper = self.get_immutable_file_digest_mapper().await?;
45 let epoch_service = self.get_epoch_service().await?;
46 let service = MithrilMessageService::new(
47 certificate_repository,
48 signed_entity_storer,
49 immutable_file_digest_mapper,
50 epoch_service,
51 );
52
53 Ok(Arc::new(service))
54 }
55
56 pub async fn get_message_service(&mut self) -> Result<Arc<dyn MessageService>> {
58 get_dependency!(self.message_service)
59 }
60
61 pub async fn build_leader_aggregator_client(&mut self) -> Result<Arc<AggregatorHTTPClient>> {
63 let leader_aggregator_endpoint = self.configuration.leader_aggregator_endpoint().ok_or(
64 anyhow!("Leader Aggregator endpoint is mandatory for follower Aggregator"),
65 )?;
66
67 let aggregator_client = AggregatorHTTPClient::new(
68 Url::parse(&leader_aggregator_endpoint).with_context(|| {
69 format!(
70 "Failed to parse leader aggregator endpoint: '{leader_aggregator_endpoint}'"
71 )
72 })?,
73 None,
74 self.get_api_version_provider().await?,
75 Some(Duration::from_secs(30)),
76 self.root_logger(),
77 );
78
79 Ok(Arc::new(aggregator_client))
80 }
81
82 pub async fn get_leader_aggregator_client(&mut self) -> Result<Arc<AggregatorHTTPClient>> {
84 get_dependency!(self.leader_aggregator_client)
85 }
86
87 pub async fn build_signature_consumer(&mut self) -> Result<Arc<dyn SignatureConsumer>> {
89 #[cfg(feature = "future_dmq")]
90 let signature_consumer = match self.configuration.dmq_node_socket_path() {
91 Some(dmq_node_socket_path) => {
92 let dmq_consumer = Arc::new(DmqConsumerPallas::<RegisterSignatureMessageDmq>::new(
93 dmq_node_socket_path,
94 self.configuration.get_network()?,
95 self.root_logger(),
96 ));
97 Arc::new(SignatureConsumerDmq::new(dmq_consumer)) as Arc<dyn SignatureConsumer>
98 }
99 _ => Arc::new(SignatureConsumerNoop) as Arc<dyn SignatureConsumer>,
100 };
101 #[cfg(not(feature = "future_dmq"))]
102 let signature_consumer = Arc::new(SignatureConsumerNoop) as Arc<dyn SignatureConsumer>;
103
104 Ok(signature_consumer)
105 }
106
107 pub async fn create_signature_processor(&mut self) -> Result<Arc<dyn SignatureProcessor>> {
109 let (_stop_tx, stop_rx) = self.get_stop_signal_channel().await?;
110 let signature_processor = SequentialSignatureProcessor::new(
111 self.build_signature_consumer().await?,
112 self.get_certifier_service().await?,
113 stop_rx,
114 self.root_logger(),
115 self.get_metrics_service().await?,
116 );
117
118 Ok(Arc::new(signature_processor))
119 }
120}