mithril_aggregator/dependency_injection/builder/enablers/
misc.rs1use anyhow::anyhow;
8use std::sync::Arc;
9use std::time::Duration;
10
11use mithril_aggregator_client::AggregatorHttpClient;
12use mithril_common::logging::LoggerExtensions;
13#[cfg(feature = "future_dmq")]
14use mithril_common::messages::RegisterSignatureMessageDmq;
15#[cfg(feature = "future_dmq")]
16use mithril_dmq::DmqConsumerClientPallas;
17use mithril_signed_entity_lock::SignedEntityTypeLock;
18
19use crate::database::repository::CertificateRepository;
20use crate::dependency_injection::{DependenciesBuilder, Result};
21use crate::get_dependency;
22#[cfg(feature = "future_dmq")]
23use crate::services::SignatureConsumerDmq;
24use crate::services::{
25 MessageService, MithrilMessageService, SequentialSignatureProcessor, SignatureConsumer,
26 SignatureConsumerNoop, SignatureProcessor,
27};
28
29impl DependenciesBuilder {
30 async fn build_signed_entity_type_lock(&mut self) -> Result<Arc<SignedEntityTypeLock>> {
31 let signed_entity_lock = Arc::new(SignedEntityTypeLock::default());
32 Ok(signed_entity_lock)
33 }
34
35 pub async fn get_signed_entity_type_lock(&mut self) -> Result<Arc<SignedEntityTypeLock>> {
37 get_dependency!(self.signed_entity_type_lock)
38 }
39
40 pub async fn build_message_service(&mut self) -> Result<Arc<dyn MessageService>> {
42 let certificate_repository = Arc::new(CertificateRepository::new(
43 self.get_sqlite_connection().await?,
44 ));
45 let signed_entity_storer = self.get_signed_entity_storer().await?;
46 let epoch_settings_storer = self.get_epoch_settings_store().await?;
47 let immutable_file_digest_mapper = self.get_immutable_file_digest_mapper().await?;
48 let epoch_service = self.get_epoch_service().await?;
49 let service = MithrilMessageService::new(
50 certificate_repository,
51 signed_entity_storer,
52 epoch_settings_storer,
53 immutable_file_digest_mapper,
54 epoch_service,
55 );
56
57 Ok(Arc::new(service))
58 }
59
60 pub async fn get_message_service(&mut self) -> Result<Arc<dyn MessageService>> {
62 get_dependency!(self.message_service)
63 }
64
65 pub async fn build_leader_aggregator_client(&mut self) -> Result<Arc<AggregatorHttpClient>> {
67 let leader_aggregator_endpoint = self.configuration.leader_aggregator_endpoint().ok_or(
68 anyhow!("Leader Aggregator endpoint is mandatory for follower Aggregator"),
69 )?;
70
71 let aggregator_client = AggregatorHttpClient::builder(&leader_aggregator_endpoint)
72 .with_api_version_provider(self.get_api_version_provider().await?)
73 .with_timeout(Duration::from_secs(30))
74 .with_logger(self.root_logger.new_with_name("LeaderAggregatorClient"))
75 .build()?;
76
77 Ok(Arc::new(aggregator_client))
78 }
79
80 pub async fn get_leader_aggregator_client(&mut self) -> Result<Arc<AggregatorHttpClient>> {
82 get_dependency!(self.leader_aggregator_client)
83 }
84
85 pub async fn build_signature_consumer(&mut self) -> Result<Arc<dyn SignatureConsumer>> {
87 #[cfg(feature = "future_dmq")]
88 let signature_consumer = match self.configuration.dmq_node_socket_path() {
89 Some(dmq_node_socket_path) => {
90 let dmq_consumer =
91 Arc::new(DmqConsumerClientPallas::<RegisterSignatureMessageDmq>::new(
92 dmq_node_socket_path,
93 self.configuration.get_dmq_network()?,
94 self.root_logger(),
95 ));
96 Arc::new(SignatureConsumerDmq::new(dmq_consumer)) as Arc<dyn SignatureConsumer>
97 }
98 _ => Arc::new(SignatureConsumerNoop) as Arc<dyn SignatureConsumer>,
99 };
100 #[cfg(not(feature = "future_dmq"))]
101 let signature_consumer = Arc::new(SignatureConsumerNoop) as Arc<dyn SignatureConsumer>;
102
103 Ok(signature_consumer)
104 }
105
106 pub async fn create_signature_processor(&mut self) -> Result<Arc<dyn SignatureProcessor>> {
108 let (_stop_tx, stop_rx) = self.get_stop_signal_channel().await?;
109 let signature_processor = SequentialSignatureProcessor::new(
110 self.build_signature_consumer().await?,
111 self.get_certifier_service().await?,
112 stop_rx,
113 self.root_logger(),
114 self.get_metrics_service().await?,
115 );
116
117 Ok(Arc::new(signature_processor))
118 }
119}