mithril_aggregator/event_store/
runner.rs

1use anyhow::Context;
2use mithril_common::logging::LoggerExtensions;
3use mithril_common::StdResult;
4use mithril_persistence::sqlite::SqliteConnection;
5use slog::{debug, info, Logger};
6use std::sync::Arc;
7use tokio::sync::mpsc::UnboundedReceiver;
8
9use super::database::EventPersister;
10use super::EventMessage;
11
12/// EventMessage receiver service.
13pub struct EventStore {
14    receiver: UnboundedReceiver<EventMessage>,
15    connection: Arc<SqliteConnection>,
16    logger: Logger,
17}
18
19impl EventStore {
20    /// Instantiate the EventMessage receiver service.
21    pub fn new(
22        receiver: UnboundedReceiver<EventMessage>,
23        connection: Arc<SqliteConnection>,
24        logger: Logger,
25    ) -> Self {
26        Self {
27            receiver,
28            connection,
29            logger: logger.new_with_component_name::<Self>(),
30        }
31    }
32
33    /// Launch the service. It runs until all the transmitters are gone and all
34    /// messages have been processed. This means this service shall be waited
35    /// upon completion to ensure all events are properly saved in the database.
36    pub async fn run(&mut self) -> StdResult<()> {
37        let connection = self.connection.clone();
38        let persister = EventPersister::new(connection);
39        info!(
40            self.logger,
41            "Starting monitoring event loop to log messages."
42        );
43        loop {
44            if let Some(message) = self.receiver.recv().await {
45                debug!(self.logger, "Event received"; "event" => ?message);
46                let event = persister
47                    .persist(message)
48                    .with_context(|| "event persist failure")?;
49                debug!(self.logger, "Event ID={} created", event.event_id);
50            } else {
51                info!(self.logger, "No more events to proceed, quitting…");
52                break;
53            }
54        }
55
56        Ok(())
57    }
58}