mithril_aggregator/dependency_injection/builder/support/
observability.rs1use slog::Logger;
2use std::sync::Arc;
3use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
4
5use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
6use crate::event_store::{EventMessage, EventStore, TransmitterService};
7use crate::services::UsageReporter;
8use crate::MetricsService;
9
10impl DependenciesBuilder {
11 pub fn root_logger(&self) -> Logger {
13 self.root_logger.clone()
14 }
15
16 pub async fn create_usage_reporter(&mut self) -> Result<UsageReporter> {
18 let usage_reporter = UsageReporter::new(
19 self.get_event_transmitter().await?,
20 self.get_metrics_service().await?,
21 self.root_logger(),
22 );
23
24 Ok(usage_reporter)
25 }
26
27 async fn build_metrics_service(&self) -> Result<Arc<MetricsService>> {
29 let metrics_service = MetricsService::new(self.root_logger())?;
30
31 Ok(Arc::new(metrics_service))
32 }
33
34 pub async fn get_metrics_service(&mut self) -> Result<Arc<MetricsService>> {
36 if self.metrics_service.is_none() {
37 self.metrics_service = Some(self.build_metrics_service().await?);
38 }
39
40 Ok(self.metrics_service.as_ref().cloned().unwrap())
41 }
42
43 pub async fn create_event_store(&mut self) -> Result<EventStore> {
45 let event_store = EventStore::new(
46 self.get_event_transmitter_receiver().await?,
47 self.get_event_store_sqlite_connection().await?,
48 self.root_logger(),
49 );
50
51 Ok(event_store)
52 }
53
54 async fn build_event_transmitter_channel(
55 &mut self,
56 ) -> Result<(
57 UnboundedReceiver<EventMessage>,
58 UnboundedSender<EventMessage>,
59 )> {
60 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<EventMessage>();
61
62 Ok((rx, tx))
63 }
64
65 pub async fn get_event_transmitter_sender(&mut self) -> Result<UnboundedSender<EventMessage>> {
67 if let (_, None) = self.event_transmitter_channel {
68 let (rx, tx) = self.build_event_transmitter_channel().await?;
69 self.event_transmitter_channel = (Some(rx), Some(tx));
70 }
71
72 Ok(self
73 .event_transmitter_channel
74 .1
75 .as_ref()
76 .cloned()
77 .expect("Transmitter<EventMessage> should be set."))
78 }
79
80 pub async fn get_event_transmitter_receiver(
83 &mut self,
84 ) -> Result<UnboundedReceiver<EventMessage>> {
85 if let (_, None) = self.event_transmitter_channel {
86 let (rx, tx) = self.build_event_transmitter_channel().await?;
87 self.event_transmitter_channel = (Some(rx), Some(tx));
88 }
89 let mut receiver: Option<UnboundedReceiver<EventMessage>> = None;
90 std::mem::swap(&mut self.event_transmitter_channel.0, &mut receiver);
91
92 receiver.ok_or_else(|| {
93 DependenciesBuilderError::InconsistentState(
94 "Receiver<EventMessage> has already been given and is not clonable.".to_string(),
95 )
96 })
97 }
98
99 async fn build_event_transmitter(&mut self) -> Result<Arc<TransmitterService<EventMessage>>> {
100 let sender = self.get_event_transmitter_sender().await?;
101 let event_transmitter = Arc::new(TransmitterService::new(sender, self.root_logger()));
102
103 Ok(event_transmitter)
104 }
105
106 pub async fn get_event_transmitter(&mut self) -> Result<Arc<TransmitterService<EventMessage>>> {
108 if self.event_transmitter.is_none() {
109 self.event_transmitter = Some(self.build_event_transmitter().await?);
110 }
111
112 Ok(self.event_transmitter.as_ref().cloned().unwrap())
113 }
114}