// mithril_aggregator/dependency_injection/builder/support/observability.rs

1use slog::Logger;
2use std::sync::Arc;
3use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
4
5use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
6use crate::event_store::{EventMessage, EventStore, TransmitterService};
7use crate::services::UsageReporter;
8use crate::MetricsService;
9
10impl DependenciesBuilder {
11    /// Return a copy of the root logger.
12    pub fn root_logger(&self) -> Logger {
13        self.root_logger.clone()
14    }
15
16    /// Create a [UsageReporter] instance.
17    pub async fn create_usage_reporter(&mut self) -> Result<UsageReporter> {
18        let usage_reporter = UsageReporter::new(
19            self.get_event_transmitter().await?,
20            self.get_metrics_service().await?,
21            self.root_logger(),
22        );
23
24        Ok(usage_reporter)
25    }
26
27    /// Create a [MetricsService] instance.
28    async fn build_metrics_service(&self) -> Result<Arc<MetricsService>> {
29        let metrics_service = MetricsService::new(self.root_logger())?;
30
31        Ok(Arc::new(metrics_service))
32    }
33
34    /// [MetricsService] service
35    pub async fn get_metrics_service(&mut self) -> Result<Arc<MetricsService>> {
36        if self.metrics_service.is_none() {
37            self.metrics_service = Some(self.build_metrics_service().await?);
38        }
39
40        Ok(self.metrics_service.as_ref().cloned().unwrap())
41    }
42
43    /// Create dependencies for the [EventStore] task.
44    pub async fn create_event_store(&mut self) -> Result<EventStore> {
45        let event_store = EventStore::new(
46            self.get_event_transmitter_receiver().await?,
47            self.get_event_store_sqlite_connection().await?,
48            self.root_logger(),
49        );
50
51        Ok(event_store)
52    }
53
54    async fn build_event_transmitter_channel(
55        &mut self,
56    ) -> Result<(
57        UnboundedReceiver<EventMessage>,
58        UnboundedSender<EventMessage>,
59    )> {
60        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<EventMessage>();
61
62        Ok((rx, tx))
63    }
64
65    /// Return the EventMessage channel sender.
66    pub async fn get_event_transmitter_sender(&mut self) -> Result<UnboundedSender<EventMessage>> {
67        if let (_, None) = self.event_transmitter_channel {
68            let (rx, tx) = self.build_event_transmitter_channel().await?;
69            self.event_transmitter_channel = (Some(rx), Some(tx));
70        }
71
72        Ok(self
73            .event_transmitter_channel
74            .1
75            .as_ref()
76            .cloned()
77            .expect("Transmitter<EventMessage> should be set."))
78    }
79
80    /// Return the channel receiver setup for the [EventStore]. Since this
81    /// receiver is not clonable, it must be called only once.
82    pub async fn get_event_transmitter_receiver(
83        &mut self,
84    ) -> Result<UnboundedReceiver<EventMessage>> {
85        if let (_, None) = self.event_transmitter_channel {
86            let (rx, tx) = self.build_event_transmitter_channel().await?;
87            self.event_transmitter_channel = (Some(rx), Some(tx));
88        }
89        let mut receiver: Option<UnboundedReceiver<EventMessage>> = None;
90        std::mem::swap(&mut self.event_transmitter_channel.0, &mut receiver);
91
92        receiver.ok_or_else(|| {
93            DependenciesBuilderError::InconsistentState(
94                "Receiver<EventMessage> has already been given and is not clonable.".to_string(),
95            )
96        })
97    }
98
99    async fn build_event_transmitter(&mut self) -> Result<Arc<TransmitterService<EventMessage>>> {
100        let sender = self.get_event_transmitter_sender().await?;
101        let event_transmitter = Arc::new(TransmitterService::new(sender, self.root_logger()));
102
103        Ok(event_transmitter)
104    }
105
106    /// [TransmitterService] service
107    pub async fn get_event_transmitter(&mut self) -> Result<Arc<TransmitterService<EventMessage>>> {
108        if self.event_transmitter.is_none() {
109            self.event_transmitter = Some(self.build_event_transmitter().await?);
110        }
111
112        Ok(self.event_transmitter.as_ref().cloned().unwrap())
113    }
114}