mithril_aggregator/dependency_injection/builder/support/
observability.rs

1use slog::Logger;
2use std::sync::Arc;
3use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
4
5use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
6use crate::event_store::{EventMessage, EventStore, TransmitterService};
7use crate::get_dependency;
8use crate::services::UsageReporter;
9use crate::MetricsService;
10impl DependenciesBuilder {
11    /// Return a copy of the root logger.
12    pub fn root_logger(&self) -> Logger {
13        self.root_logger.clone()
14    }
15
16    /// Create a [UsageReporter] instance.
17    pub async fn create_usage_reporter(&mut self) -> Result<UsageReporter> {
18        let usage_reporter = UsageReporter::new(
19            self.get_event_transmitter().await?,
20            self.get_metrics_service().await?,
21            self.root_logger(),
22        );
23
24        Ok(usage_reporter)
25    }
26
27    /// Create a [MetricsService] instance.
28    async fn build_metrics_service(&self) -> Result<Arc<MetricsService>> {
29        let metrics_service = MetricsService::new(self.root_logger())?;
30
31        Ok(Arc::new(metrics_service))
32    }
33
34    /// [MetricsService] service
35    pub async fn get_metrics_service(&mut self) -> Result<Arc<MetricsService>> {
36        get_dependency!(self.metrics_service)
37    }
38
39    /// Create dependencies for the [EventStore] task.
40    pub async fn create_event_store(&mut self) -> Result<EventStore> {
41        let event_store = EventStore::new(
42            self.get_event_transmitter_receiver().await?,
43            self.get_event_store_sqlite_connection().await?,
44            self.root_logger(),
45        );
46
47        Ok(event_store)
48    }
49
50    async fn build_event_transmitter_channel(
51        &mut self,
52    ) -> Result<(
53        UnboundedReceiver<EventMessage>,
54        UnboundedSender<EventMessage>,
55    )> {
56        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<EventMessage>();
57
58        Ok((rx, tx))
59    }
60
61    /// Return the EventMessage channel sender.
62    pub async fn get_event_transmitter_sender(&mut self) -> Result<UnboundedSender<EventMessage>> {
63        if let (_, None) = self.event_transmitter_channel {
64            let (rx, tx) = self.build_event_transmitter_channel().await?;
65            self.event_transmitter_channel = (Some(rx), Some(tx));
66        }
67
68        Ok(self
69            .event_transmitter_channel
70            .1
71            .as_ref()
72            .cloned()
73            .expect("Transmitter<EventMessage> should be set."))
74    }
75
76    /// Return the channel receiver setup for the [EventStore]. Since this
77    /// receiver is not clonable, it must be called only once.
78    pub async fn get_event_transmitter_receiver(
79        &mut self,
80    ) -> Result<UnboundedReceiver<EventMessage>> {
81        if let (_, None) = self.event_transmitter_channel {
82            let (rx, tx) = self.build_event_transmitter_channel().await?;
83            self.event_transmitter_channel = (Some(rx), Some(tx));
84        }
85        let mut receiver: Option<UnboundedReceiver<EventMessage>> = None;
86        std::mem::swap(&mut self.event_transmitter_channel.0, &mut receiver);
87
88        receiver.ok_or_else(|| {
89            DependenciesBuilderError::InconsistentState(
90                "Receiver<EventMessage> has already been given and is not clonable.".to_string(),
91            )
92        })
93    }
94
95    async fn build_event_transmitter(&mut self) -> Result<Arc<TransmitterService<EventMessage>>> {
96        let sender = self.get_event_transmitter_sender().await?;
97        let event_transmitter = Arc::new(TransmitterService::new(sender, self.root_logger()));
98
99        Ok(event_transmitter)
100    }
101
102    /// [TransmitterService] service
103    pub async fn get_event_transmitter(&mut self) -> Result<Arc<TransmitterService<EventMessage>>> {
104        get_dependency!(self.event_transmitter)
105    }
106}