mithril_aggregator/event_store/
runner.rs1use anyhow::Context;
2use mithril_common::logging::LoggerExtensions;
3use mithril_common::StdResult;
4use mithril_persistence::sqlite::SqliteConnection;
5use slog::{debug, info, Logger};
6use std::sync::Arc;
7use tokio::sync::mpsc::UnboundedReceiver;
8
9use super::database::EventPersister;
10use super::EventMessage;
11
12pub struct EventStore {
14 receiver: UnboundedReceiver<EventMessage>,
15 connection: Arc<SqliteConnection>,
16 logger: Logger,
17}
18
19impl EventStore {
20 pub fn new(
22 receiver: UnboundedReceiver<EventMessage>,
23 connection: Arc<SqliteConnection>,
24 logger: Logger,
25 ) -> Self {
26 Self {
27 receiver,
28 connection,
29 logger: logger.new_with_component_name::<Self>(),
30 }
31 }
32
33 pub async fn run(&mut self) -> StdResult<()> {
37 let connection = self.connection.clone();
38 let persister = EventPersister::new(connection);
39 info!(
40 self.logger,
41 "Starting monitoring event loop to log messages."
42 );
43 loop {
44 if let Some(message) = self.receiver.recv().await {
45 debug!(self.logger, "Event received"; "event" => ?message);
46 let event = persister
47 .persist(message)
48 .with_context(|| "event persist failure")?;
49 debug!(self.logger, "Event ID={} created", event.event_id);
50 } else {
51 info!(self.logger, "No more events to proceed, quitting…");
52 break;
53 }
54 }
55
56 Ok(())
57 }
58}