mithril_dmq/publisher/server/
pallas.rs

1use std::{fs, path::PathBuf};
2
3use anyhow::{Context, anyhow};
4use pallas_network::{
5    facades::DmqServer,
6    miniprotocols::{
7        localmsgsubmission::DmqMsgValidationError,
8        localtxsubmission::{Request, Response},
9    },
10};
11use tokio::{
12    net::UnixListener,
13    select,
14    sync::{Mutex, MutexGuard, mpsc::UnboundedSender, watch::Receiver},
15};
16
17use slog::{Logger, debug, error, info, warn};
18
19use mithril_common::{CardanoNetwork, StdResult, logging::LoggerExtensions};
20
21use crate::{DmqMessage, DmqPublisherServer};
22
/// A DMQ server implementation for messages publication to a DMQ node.
///
/// The server listens on a Unix socket, handles message submissions from a DMQ publisher
/// client and forwards every accepted message to the registered transmitters.
pub struct DmqPublisherServerPallas {
    /// Path of the Unix socket the server binds to.
    socket: PathBuf,
    /// Cardano network whose magic id is used when accepting a client connection.
    network: CardanoNetwork,
    /// Cached server: `None` until one is created, and reset to `None` when it is dropped.
    server: Mutex<Option<DmqServer>>,
    /// Channels that receive a copy of every accepted DMQ message.
    transmitters: Mutex<Vec<UnboundedSender<DmqMessage>>>,
    /// Watch channel used to ask the `run` loop to stop.
    stop_rx: Receiver<()>,
    /// Logger used by this server.
    logger: Logger,
}
32
33impl DmqPublisherServerPallas {
34    /// Creates a new instance of [DmqPublisherServerPallas].
35    pub fn new(
36        socket: PathBuf,
37        network: CardanoNetwork,
38        stop_rx: Receiver<()>,
39        logger: Logger,
40    ) -> Self {
41        Self {
42            socket,
43            network,
44            server: Mutex::new(None),
45            transmitters: Mutex::new(Vec::new()),
46            stop_rx,
47            logger: logger.new_with_component_name::<Self>(),
48        }
49    }
50
51    /// Creates and returns a new `DmqServer` connected to the specified socket.
52    async fn new_server(&self) -> StdResult<DmqServer> {
53        info!(
54            self.logger,
55            "Creating a new DMQ publisher server";
56            "socket" => ?self.socket,
57            "network" => ?self.network
58        );
59        let magic = self.network.magic_id();
60        if self.socket.exists() {
61            fs::remove_file(self.socket.clone())?;
62        }
63        let listener = UnixListener::bind(&self.socket)
64            .map_err(|err| anyhow!(err))
65            .with_context(|| {
66                format!(
67                    "DmqPublisherServerPallas failed to bind Unix socket at {}",
68                    self.socket.display()
69                )
70            })?;
71
72        DmqServer::accept(&listener, magic)
73            .await
74            .map_err(|err| anyhow!(err))
75            .with_context(|| "DmqPublisherServerPallas failed to create a new server")
76    }
77
78    /// Gets the cached `DmqServer`, creating a new one if it does not exist.
79    async fn get_server(&self) -> StdResult<MutexGuard<'_, Option<DmqServer>>> {
80        {
81            // Run this in a separate block to avoid dead lock on the Mutex
82            let server_lock = self.server.lock().await;
83            if server_lock.as_ref().is_some() {
84                return Ok(server_lock);
85            }
86        }
87
88        let mut server_lock = self.server.lock().await;
89        *server_lock = Some(self.new_server().await?);
90
91        Ok(server_lock)
92    }
93
94    /// Drops the current `DmqServer`, if it exists.
95    async fn drop_server(&self) -> StdResult<()> {
96        debug!(
97            self.logger,
98            "Drop existing DMQ publisher server";
99            "socket" => ?self.socket,
100            "network" => ?self.network
101        );
102        let mut server_lock = self.server.try_lock()?;
103        if let Some(server) = server_lock.take() {
104            server.abort().await;
105        }
106
107        Ok(())
108    }
109
110    /// Registers a transmitter for DMQ messages.
111    pub async fn register_transmitter(
112        &self,
113        transmitter: UnboundedSender<DmqMessage>,
114    ) -> StdResult<()> {
115        debug!(self.logger, "Register message transmitter for DMQ messages");
116        let mut transmitters_guard = self.transmitters.lock().await;
117        transmitters_guard.push(transmitter);
118
119        Ok(())
120    }
121}
122
123#[async_trait::async_trait]
124impl DmqPublisherServer for DmqPublisherServerPallas {
125    async fn process_message(&self) -> StdResult<()> {
126        debug!(
127            self.logger,
128            "Waiting for message to publish to the DMQ network";
129            "socket" => ?self.socket,
130            "network" => ?self.network
131        );
132        let mut server_guard = self.get_server().await?;
133        let server = server_guard
134            .as_mut()
135            .ok_or(anyhow!("DMQ publisher server does not exist"))?;
136
137        let request = server
138            .msg_submission()
139            .recv_next_request()
140            .await
141            .map_err(|err| anyhow!("Failed to receive next DMQ message: {}", err))?;
142        let (dmq_message, response) = match request {
143            Request::Submit(dmq_message) => {
144                debug!(self.logger, "Received message to publish to DMQ");
145                (Some(dmq_message), Response::Accepted)
146            }
147            request => {
148                error!(
149                    self.logger,
150                    "Expected a Submit request, but received: {request:?}"
151                );
152                (
153                    None,
154                    Response::Rejected(DmqMsgValidationError(format!(
155                        "Expected a Submit request, but received: {request:?}"
156                    ))),
157                )
158            }
159        };
160        server
161            .msg_submission()
162            .send_submit_tx_response(response)
163            .await
164            .map_err(|err| anyhow!("Failed to send response to DMQ publisher client: {}", err))?;
165
166        if let Some(dmq_message) = dmq_message {
167            for transmitter in self.transmitters.lock().await.iter() {
168                if let Err(err) = transmitter.send(dmq_message.to_owned().into()) {
169                    error!(
170                        self.logger,
171                        "Failed to send DMQ message to transmitter";
172                        "error" => ?err
173                    );
174                }
175            }
176        }
177
178        let request = server.msg_submission().recv_next_request().await.map_err(|err| {
179            anyhow!(
180                "Failed to receive next request from DMQ publisher client: {}",
181                err
182            )
183        })?;
184        match request {
185            Request::Done => {
186                debug!(
187                    self.logger,
188                    "Received Done request from DMQ publisher client"
189                );
190            }
191            _ => {
192                error!(
193                    self.logger,
194                    "Expected a Done request, but received: {request:?}"
195                );
196                return Err(anyhow!(
197                    "Expected a Done request, but received: {request:?}"
198                ));
199            }
200        }
201
202        Ok(())
203    }
204
205    async fn run(&self) -> StdResult<()> {
206        info!(
207            self.logger,
208            "Starting DMQ publisher server";
209            "socket" => ?self.socket,
210            "network" => ?self.network
211        );
212
213        let mut stop_rx = self.stop_rx.clone();
214        loop {
215            select! {
216                _ = stop_rx.changed() => {
217                    warn!(self.logger, "Stopping DMQ publisher server...");
218
219                    return Ok(());
220                }
221                res = self.process_message() => {
222                    match res {
223                        Ok(_) => {
224                            debug!(self.logger, "Processed a message successfully");
225                        }
226                        Err(err) => {
227                            error!(self.logger, "Failed to process message"; "error" => ?err);
228                        }
229                    }
230                    if let Err(drop_err) = self.drop_server().await {
231                        error!(self.logger, "Failed to drop DMQ publisher server"; "error" => ?drop_err);
232                    }
233                }
234            }
235        }
236    }
237}
238
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use pallas_network::{
        facades::DmqClient,
        miniprotocols::{localmsgsubmission::DmqMsg, localtxsubmission},
    };
    use tokio::sync::{mpsc::unbounded_channel, watch};

    use mithril_common::{current_function, test::TempDir};

    use crate::test_tools::TestLogger;

    use super::*;

    /// Creates a temporary directory with a short path to host the test Unix socket.
    fn create_temp_dir(folder_name: &str) -> PathBuf {
        TempDir::create_with_short_path("dmq_publisher_server", folder_name)
    }

    /// Builds a DMQ message filled with arbitrary fixed values.
    fn fake_msg() -> DmqMsg {
        DmqMsg {
            msg_id: vec![0, 1],
            msg_body: vec![2, 3, 4, 5],
            block_number: 10,
            ttl: 100,
            kes_signature: vec![0, 1, 2, 3],
            operational_certificate: vec![0, 1, 2, 3, 4, 5],
            kes_period: 10,
        }
    }

    #[tokio::test]
    async fn pallas_dmq_publisher_server_success() {
        let (stop_tx, stop_rx) = watch::channel(());
        let (signature_dmq_tx, mut signature_dmq_rx) = unbounded_channel::<DmqMessage>();
        let socket_path = create_temp_dir(current_function!()).join("node.socket");
        let cardano_network = CardanoNetwork::TestNet(0);
        let dmq_publisher_server = Arc::new(DmqPublisherServerPallas::new(
            socket_path.clone(),
            cardano_network,
            stop_rx,
            TestLogger::stdout(),
        ));
        dmq_publisher_server
            .register_transmitter(signature_dmq_tx)
            .await
            .unwrap();
        let message = fake_msg();
        let message_clone = message.clone();
        let client = tokio::spawn(async move {
            // client setup
            let mut client = DmqClient::connect(socket_path, 0).await.unwrap();

            // init local msg submission client
            let client_msg = client.msg_submission();
            assert_eq!(*client_msg.state(), localtxsubmission::State::Idle);

            // client sends a request to server and waits for a reply from the server
            client_msg.send_submit_tx(message_clone).await.unwrap();
            assert_eq!(*client_msg.state(), localtxsubmission::State::Busy);

            let response = client_msg.recv_submit_tx_response().await.unwrap();
            assert_eq!(*client_msg.state(), localtxsubmission::State::Idle);
            assert_eq!(response, localtxsubmission::Response::Accepted);
        });
        let recorder = tokio::spawn(async move {
            // No early `return` here: the stop signal below must be sent whether or not a
            // message was received, otherwise the server would only stop as a side effect of
            // the sender being dropped.
            let result = signature_dmq_rx
                .recv()
                .await
                .ok_or_else(|| anyhow::anyhow!("No message received in recorder"));
            stop_tx
                .send(())
                .expect("Failed to send stop signal to DMQ publisher server");

            result
        });

        let (run_res, client_res, message_res) =
            tokio::join!(dmq_publisher_server.run(), client, recorder);
        run_res.expect("DMQ publisher server should stop without error");
        // Surfaces assertion failures that happened inside the client task.
        client_res.expect("DMQ client task should complete without panicking");
        let message_received = message_res.unwrap().unwrap();
        assert_eq!(message, message_received.into());
    }
}
327}