mithril_aggregator/dependency_injection/builder/enablers/
misc.rs1use anyhow::Context;
8use std::sync::Arc;
9use std::time::Duration;
10
11use mithril_aggregator_client::AggregatorHttpClient;
12use mithril_common::logging::LoggerExtensions;
13use mithril_common::messages::RegisterSignatureMessageDmq;
14use mithril_dmq::{
15 DmqConsumerClientDeduplicator, DmqConsumerClientPallas, SystemUnixTimestampProvider,
16};
17use mithril_signed_entity_lock::SignedEntityTypeLock;
18
19use crate::database::repository::CertificateRepository;
20use crate::dependency_injection::{DependenciesBuilder, Result};
21use crate::get_dependency;
22use crate::services::SignatureConsumerDmq;
23use crate::services::{
24 MessageService, MithrilMessageService, SequentialSignatureProcessor, SignatureConsumer,
25 SignatureConsumerNoop, SignatureProcessor,
26};
27
28impl DependenciesBuilder {
29 async fn build_signed_entity_type_lock(&mut self) -> Result<Arc<SignedEntityTypeLock>> {
30 let signed_entity_lock = Arc::new(SignedEntityTypeLock::default());
31 Ok(signed_entity_lock)
32 }
33
34 pub async fn get_signed_entity_type_lock(&mut self) -> Result<Arc<SignedEntityTypeLock>> {
36 get_dependency!(self.signed_entity_type_lock)
37 }
38
39 pub async fn build_message_service(&mut self) -> Result<Arc<dyn MessageService>> {
41 let certificate_repository = Arc::new(CertificateRepository::new(
42 self.get_sqlite_connection().await?,
43 ));
44 let signed_entity_storer = self.get_signed_entity_storer().await?;
45 let epoch_settings_storer = self.get_epoch_settings_store().await?;
46 let immutable_file_digest_mapper = self.get_immutable_file_digest_mapper().await?;
47 let epoch_service = self.get_epoch_service().await?;
48 let service = MithrilMessageService::new(
49 certificate_repository,
50 signed_entity_storer,
51 epoch_settings_storer,
52 immutable_file_digest_mapper,
53 epoch_service,
54 );
55
56 Ok(Arc::new(service))
57 }
58
59 pub async fn get_message_service(&mut self) -> Result<Arc<dyn MessageService>> {
61 get_dependency!(self.message_service)
62 }
63
64 pub async fn build_leader_aggregator_client(&mut self) -> Result<Arc<AggregatorHttpClient>> {
66 let leader_aggregator_endpoint = self
67 .configuration
68 .leader_aggregator_endpoint()
69 .with_context(|| "Leader Aggregator endpoint is mandatory for follower Aggregator")?;
70
71 let aggregator_client = AggregatorHttpClient::builder(&leader_aggregator_endpoint)
72 .with_api_version_provider(self.get_api_version_provider().await?)
73 .with_timeout(Duration::from_secs(30))
74 .with_logger(self.root_logger.new_with_name("LeaderAggregatorClient"))
75 .build()?;
76
77 Ok(Arc::new(aggregator_client))
78 }
79
80 pub async fn get_leader_aggregator_client(&mut self) -> Result<Arc<AggregatorHttpClient>> {
82 get_dependency!(self.leader_aggregator_client)
83 }
84
85 pub async fn build_signature_consumer(&mut self) -> Result<Arc<dyn SignatureConsumer>> {
87 let signature_consumer = match self.configuration.dmq_node_socket_path() {
88 Some(dmq_node_socket_path) => {
89 let dmq_consumer =
90 Arc::new(DmqConsumerClientPallas::<RegisterSignatureMessageDmq>::new(
91 dmq_node_socket_path,
92 self.configuration.get_dmq_network()?,
93 self.root_logger(),
94 ));
95 let dmq_consumer_deduplicator =
96 Arc::new(DmqConsumerClientDeduplicator::new_with_default_ttl(
97 dmq_consumer,
98 Arc::new(SystemUnixTimestampProvider),
99 ));
100 Arc::new(SignatureConsumerDmq::new(dmq_consumer_deduplicator))
101 as Arc<dyn SignatureConsumer>
102 }
103 _ => Arc::new(SignatureConsumerNoop) as Arc<dyn SignatureConsumer>,
104 };
105
106 Ok(signature_consumer)
107 }
108
109 pub async fn create_signature_processor(&mut self) -> Result<Arc<dyn SignatureProcessor>> {
111 let (_stop_tx, stop_rx) = self.get_stop_signal_channel().await?;
112 let signature_processor = SequentialSignatureProcessor::new(
113 self.build_signature_consumer().await?,
114 self.get_certifier_service().await?,
115 stop_rx,
116 self.get_metrics_service().await?,
117 Duration::from_millis(self.configuration.signature_processor_wait_delay_on_error_ms()),
118 self.root_logger(),
119 );
120
121 Ok(Arc::new(signature_processor))
122 }
123}