// mithril_aggregator/dependency_injection/builder/enablers/misc.rs
use std::sync::Arc;
8use std::time::Duration;
9
10#[cfg(feature = "future_dmq")]
11use mithril_common::messages::RegisterSignatureMessageDmq;
12#[cfg(feature = "future_dmq")]
13use mithril_dmq::DmqConsumerPallas;
14use mithril_signed_entity_lock::SignedEntityTypeLock;
15
16use crate::database::repository::CertificateRepository;
17use crate::dependency_injection::{DependenciesBuilder, Result};
18use crate::get_dependency;
19#[cfg(feature = "future_dmq")]
20use crate::services::SignatureConsumerDmq;
21use crate::services::{
22 AggregatorClient, AggregatorHTTPClient, MessageService, MithrilMessageService,
23 SequentialSignatureProcessor, SignatureConsumer, SignatureConsumerNoop, SignatureProcessor,
24};
25impl DependenciesBuilder {
26 async fn build_signed_entity_type_lock(&mut self) -> Result<Arc<SignedEntityTypeLock>> {
27 let signed_entity_lock = Arc::new(SignedEntityTypeLock::default());
28 Ok(signed_entity_lock)
29 }
30
31 pub async fn get_signed_entity_type_lock(&mut self) -> Result<Arc<SignedEntityTypeLock>> {
33 get_dependency!(self.signed_entity_type_lock)
34 }
35
36 pub async fn build_message_service(&mut self) -> Result<Arc<dyn MessageService>> {
38 let certificate_repository = Arc::new(CertificateRepository::new(
39 self.get_sqlite_connection().await?,
40 ));
41 let signed_entity_storer = self.get_signed_entity_storer().await?;
42 let immutable_file_digest_mapper = self.get_immutable_file_digest_mapper().await?;
43 let epoch_service = self.get_epoch_service().await?;
44 let service = MithrilMessageService::new(
45 certificate_repository,
46 signed_entity_storer,
47 immutable_file_digest_mapper,
48 epoch_service,
49 );
50
51 Ok(Arc::new(service))
52 }
53
54 pub async fn get_message_service(&mut self) -> Result<Arc<dyn MessageService>> {
56 get_dependency!(self.message_service)
57 }
58
59 pub async fn build_leader_aggregator_client(&mut self) -> Result<Arc<dyn AggregatorClient>> {
61 let leader_aggregator_endpoint = self
62 .configuration
63 .leader_aggregator_endpoint()
64 .unwrap_or_default();
65 let aggregator_client = AggregatorHTTPClient::new(
66 leader_aggregator_endpoint,
67 None,
68 self.get_api_version_provider().await?,
69 Some(Duration::from_secs(30)),
70 self.root_logger(),
71 );
72
73 Ok(Arc::new(aggregator_client))
74 }
75
76 pub async fn get_leader_aggregator_client(&mut self) -> Result<Arc<dyn AggregatorClient>> {
78 get_dependency!(self.leader_aggregator_client)
79 }
80
81 pub async fn build_signature_consumer(&mut self) -> Result<Arc<dyn SignatureConsumer>> {
83 #[cfg(feature = "future_dmq")]
84 let signature_consumer = match self.configuration.dmq_node_socket_path() {
85 Some(dmq_node_socket_path) => {
86 let dmq_consumer = Arc::new(DmqConsumerPallas::<RegisterSignatureMessageDmq>::new(
87 dmq_node_socket_path,
88 self.configuration.get_network()?,
89 self.root_logger(),
90 ));
91 Arc::new(SignatureConsumerDmq::new(dmq_consumer)) as Arc<dyn SignatureConsumer>
92 }
93 _ => Arc::new(SignatureConsumerNoop) as Arc<dyn SignatureConsumer>,
94 };
95 #[cfg(not(feature = "future_dmq"))]
96 let signature_consumer = Arc::new(SignatureConsumerNoop) as Arc<dyn SignatureConsumer>;
97
98 Ok(signature_consumer)
99 }
100
101 pub async fn create_signature_processor(&mut self) -> Result<Arc<dyn SignatureProcessor>> {
103 let (_stop_tx, stop_rx) = self.get_stop_signal_channel().await?;
104 let signature_processor = SequentialSignatureProcessor::new(
105 self.build_signature_consumer().await?,
106 self.get_certifier_service().await?,
107 stop_rx,
108 self.root_logger(),
109 self.get_metrics_service().await?,
110 );
111
112 Ok(Arc::new(signature_processor))
113 }
114}