mithril_aggregator/dependency_injection/builder/support/
observability.rs1use slog::Logger;
2use std::sync::Arc;
3use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
4
5use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
6use crate::event_store::{EventMessage, EventStore, TransmitterService};
7use crate::get_dependency;
8use crate::services::UsageReporter;
9use crate::MetricsService;
10impl DependenciesBuilder {
11 pub fn root_logger(&self) -> Logger {
13 self.root_logger.clone()
14 }
15
16 pub async fn create_usage_reporter(&mut self) -> Result<UsageReporter> {
18 let usage_reporter = UsageReporter::new(
19 self.get_event_transmitter().await?,
20 self.get_metrics_service().await?,
21 self.root_logger(),
22 );
23
24 Ok(usage_reporter)
25 }
26
27 async fn build_metrics_service(&self) -> Result<Arc<MetricsService>> {
29 let metrics_service = MetricsService::new(self.root_logger())?;
30
31 Ok(Arc::new(metrics_service))
32 }
33
34 pub async fn get_metrics_service(&mut self) -> Result<Arc<MetricsService>> {
36 get_dependency!(self.metrics_service)
37 }
38
39 pub async fn create_event_store(&mut self) -> Result<EventStore> {
41 let event_store = EventStore::new(
42 self.get_event_transmitter_receiver().await?,
43 self.get_event_store_sqlite_connection().await?,
44 self.root_logger(),
45 );
46
47 Ok(event_store)
48 }
49
50 async fn build_event_transmitter_channel(
51 &mut self,
52 ) -> Result<(
53 UnboundedReceiver<EventMessage>,
54 UnboundedSender<EventMessage>,
55 )> {
56 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<EventMessage>();
57
58 Ok((rx, tx))
59 }
60
61 pub async fn get_event_transmitter_sender(&mut self) -> Result<UnboundedSender<EventMessage>> {
63 if let (_, None) = self.event_transmitter_channel {
64 let (rx, tx) = self.build_event_transmitter_channel().await?;
65 self.event_transmitter_channel = (Some(rx), Some(tx));
66 }
67
68 Ok(self
69 .event_transmitter_channel
70 .1
71 .as_ref()
72 .cloned()
73 .expect("Transmitter<EventMessage> should be set."))
74 }
75
76 pub async fn get_event_transmitter_receiver(
79 &mut self,
80 ) -> Result<UnboundedReceiver<EventMessage>> {
81 if let (_, None) = self.event_transmitter_channel {
82 let (rx, tx) = self.build_event_transmitter_channel().await?;
83 self.event_transmitter_channel = (Some(rx), Some(tx));
84 }
85 let mut receiver: Option<UnboundedReceiver<EventMessage>> = None;
86 std::mem::swap(&mut self.event_transmitter_channel.0, &mut receiver);
87
88 receiver.ok_or_else(|| {
89 DependenciesBuilderError::InconsistentState(
90 "Receiver<EventMessage> has already been given and is not clonable.".to_string(),
91 )
92 })
93 }
94
95 async fn build_event_transmitter(&mut self) -> Result<Arc<TransmitterService<EventMessage>>> {
96 let sender = self.get_event_transmitter_sender().await?;
97 let event_transmitter = Arc::new(TransmitterService::new(sender, self.root_logger()));
98
99 Ok(event_transmitter)
100 }
101
102 pub async fn get_event_transmitter(&mut self) -> Result<Arc<TransmitterService<EventMessage>>> {
104 get_dependency!(self.event_transmitter)
105 }
106}