mithril_dmq/publisher/server/
pallas.rs

1use std::{fs, path::PathBuf};
2
3use anyhow::{Context, anyhow};
4use pallas_network::{
5    facades::DmqServer,
6    miniprotocols::{
7        localmsgsubmission::DmqMsgValidationError,
8        localtxsubmission::{Request, Response},
9    },
10};
11use tokio::{
12    net::UnixListener,
13    select,
14    sync::{Mutex, MutexGuard, mpsc::UnboundedSender, watch::Receiver},
15};
16
17use slog::{Logger, debug, error, info, warn};
18
19use mithril_common::{StdResult, logging::LoggerExtensions};
20
21use crate::{DmqMessage, DmqNetwork, DmqPublisherServer};
22
23/// A DMQ server implementation for messages publication to a DMQ node.
24pub struct DmqPublisherServerPallas {
25    socket: PathBuf,
26    network: DmqNetwork,
27    server: Mutex<Option<DmqServer>>,
28    transmitters: Mutex<Vec<UnboundedSender<DmqMessage>>>,
29    stop_rx: Receiver<()>,
30    logger: Logger,
31}
32
33impl DmqPublisherServerPallas {
34    /// Creates a new instance of [DmqPublisherServerPallas].
35    pub fn new(
36        socket: PathBuf,
37        network: DmqNetwork,
38        stop_rx: Receiver<()>,
39        logger: Logger,
40    ) -> Self {
41        Self {
42            socket,
43            network,
44            server: Mutex::new(None),
45            transmitters: Mutex::new(Vec::new()),
46            stop_rx,
47            logger: logger.new_with_component_name::<Self>(),
48        }
49    }
50
51    /// Creates and returns a new `DmqServer` connected to the specified socket.
52    async fn new_server(&self) -> StdResult<DmqServer> {
53        info!(
54            self.logger,
55            "Creating a new DMQ publisher server";
56            "socket" => ?self.socket,
57            "network" => ?self.network
58        );
59        let magic = self.network.magic_id();
60        if self.socket.exists() {
61            fs::remove_file(self.socket.clone())?;
62        }
63        let listener = UnixListener::bind(&self.socket).with_context(|| {
64            format!(
65                "DmqPublisherServerPallas failed to bind Unix socket at {}",
66                self.socket.display()
67            )
68        })?;
69
70        DmqServer::accept(&listener, magic)
71            .await
72            .with_context(|| "DmqPublisherServerPallas failed to create a new server")
73    }
74
75    /// Gets the cached `DmqServer`, creating a new one if it does not exist.
76    async fn get_server(&self) -> StdResult<MutexGuard<'_, Option<DmqServer>>> {
77        {
78            // Run this in a separate block to avoid dead lock on the Mutex
79            let server_lock = self.server.lock().await;
80            if server_lock.as_ref().is_some() {
81                return Ok(server_lock);
82            }
83        }
84
85        let mut server_lock = self.server.lock().await;
86        *server_lock = Some(self.new_server().await?);
87
88        Ok(server_lock)
89    }
90
91    /// Drops the current `DmqServer`, if it exists.
92    async fn drop_server(&self) -> StdResult<()> {
93        debug!(
94            self.logger,
95            "Drop existing DMQ publisher server";
96            "socket" => ?self.socket,
97            "network" => ?self.network
98        );
99        let mut server_lock = self.server.try_lock()?;
100        if let Some(server) = server_lock.take() {
101            server.abort().await;
102        }
103
104        Ok(())
105    }
106
107    /// Registers a transmitter for DMQ messages.
108    pub async fn register_transmitter(
109        &self,
110        transmitter: UnboundedSender<DmqMessage>,
111    ) -> StdResult<()> {
112        debug!(self.logger, "Register message transmitter for DMQ messages");
113        let mut transmitters_guard = self.transmitters.lock().await;
114        transmitters_guard.push(transmitter);
115
116        Ok(())
117    }
118}
119
120#[async_trait::async_trait]
121impl DmqPublisherServer for DmqPublisherServerPallas {
122    async fn process_message(&self) -> StdResult<()> {
123        debug!(
124            self.logger,
125            "Waiting for message to publish to the DMQ network";
126            "socket" => ?self.socket,
127            "network" => ?self.network
128        );
129        let mut server_guard = self.get_server().await?;
130        let server = server_guard
131            .as_mut()
132            .with_context(|| "DMQ publisher server does not exist")?;
133
134        let request = server
135            .msg_submission()
136            .recv_next_request()
137            .await
138            .with_context(|| "Failed to receive next DMQ message")?;
139        let (dmq_message, response) = match request {
140            Request::Submit(dmq_message) => {
141                debug!(self.logger, "Received message to publish to DMQ");
142                (Some(dmq_message), Response::Accepted)
143            }
144            request => {
145                error!(
146                    self.logger,
147                    "Expected a Submit request, but received: {request:?}"
148                );
149                (
150                    None,
151                    Response::Rejected(DmqMsgValidationError(format!(
152                        "Expected a Submit request, but received: {request:?}"
153                    ))),
154                )
155            }
156        };
157        server
158            .msg_submission()
159            .send_submit_tx_response(response)
160            .await
161            .with_context(|| "Failed to send response to DMQ publisher client")?;
162
163        if let Some(dmq_message) = dmq_message {
164            for transmitter in self.transmitters.lock().await.iter() {
165                if let Err(err) = transmitter.send(dmq_message.to_owned().into()) {
166                    error!(
167                        self.logger,
168                        "Failed to send DMQ message to transmitter";
169                        "error" => ?err
170                    );
171                }
172            }
173        }
174
175        let request = server
176            .msg_submission()
177            .recv_next_request()
178            .await
179            .with_context(|| "Failed to receive next request from DMQ publisher client")?;
180        match request {
181            Request::Done => {
182                debug!(
183                    self.logger,
184                    "Received Done request from DMQ publisher client"
185                );
186            }
187            _ => {
188                error!(
189                    self.logger,
190                    "Expected a Done request, but received: {request:?}"
191                );
192                return Err(anyhow!(
193                    "Expected a Done request, but received: {request:?}"
194                ));
195            }
196        }
197
198        Ok(())
199    }
200
201    async fn run(&self) -> StdResult<()> {
202        info!(
203            self.logger,
204            "Starting DMQ publisher server";
205            "socket" => ?self.socket,
206            "network" => ?self.network
207        );
208
209        let mut stop_rx = self.stop_rx.clone();
210        loop {
211            select! {
212                _ = stop_rx.changed() => {
213                    warn!(self.logger, "Stopping DMQ publisher server...");
214
215                    return Ok(());
216                }
217                res = self.process_message() => {
218                    match res {
219                        Ok(_) => {
220                            debug!(self.logger, "Processed a message successfully");
221                        }
222                        Err(err) => {
223                            error!(self.logger, "Failed to process message"; "error" => ?err);
224                        }
225                    }
226                    if let Err(drop_err) = self.drop_server().await {
227                        error!(self.logger, "Failed to drop DMQ publisher server"; "error" => ?drop_err);
228                    }
229                }
230            }
231        }
232    }
233}
234
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use pallas_network::{
        facades::DmqClient,
        miniprotocols::{
            localmsgsubmission::{DmqMsg, DmqMsgOperationalCertificate, DmqMsgPayload},
            localtxsubmission,
        },
    };
    use tokio::sync::{mpsc::unbounded_channel, watch};

    use mithril_common::{current_function, test::TempDir};

    use crate::test_tools::TestLogger;

    use super::*;

    /// Creates a short-path temporary directory dedicated to the given test.
    fn create_temp_dir(folder_name: &str) -> PathBuf {
        TempDir::create_with_short_path("dmq_publisher_server", folder_name)
    }

    /// Builds a DMQ message filled with arbitrary fixed values.
    fn fake_msg() -> DmqMsg {
        DmqMsg {
            msg_payload: DmqMsgPayload {
                msg_id: vec![0, 1],
                msg_body: vec![0, 1, 2],
                kes_period: 10,
                expires_at: 100,
            },
            kes_signature: vec![0, 1, 2, 3],
            operational_certificate: DmqMsgOperationalCertificate {
                kes_vk: vec![12, 13, 14],
                issue_number: 15,
                start_kes_period: 16,
                cert_sig: vec![17],
            },
            cold_verification_key: vec![0, 1, 2, 3, 4, 5],
        }
    }

    /// A submitted message is accepted by the server and forwarded to the
    /// registered transmitter.
    #[tokio::test]
    async fn pallas_dmq_publisher_server_success() {
        let (stop_tx, stop_rx) = watch::channel(());
        let (signature_dmq_tx, mut signature_dmq_rx) = unbounded_channel::<DmqMessage>();
        let socket_path = create_temp_dir(current_function!()).join("node.socket");
        let cardano_network = DmqNetwork::TestNet(0);
        let dmq_publisher_server = Arc::new(DmqPublisherServerPallas::new(
            socket_path.clone(),
            cardano_network,
            stop_rx,
            TestLogger::stdout(),
        ));
        dmq_publisher_server
            .register_transmitter(signature_dmq_tx)
            .await
            .unwrap();
        let message = fake_msg();
        let message_clone = message.clone();
        let client = tokio::spawn(async move {
            // client setup
            let mut client = DmqClient::connect(socket_path, 0).await.unwrap();

            // init local msg submission client
            let client_msg = client.msg_submission();
            assert_eq!(*client_msg.state(), localtxsubmission::State::Idle);

            // client sends a request to server and waits for a reply from the server
            client_msg.send_submit_tx(message_clone).await.unwrap();
            assert_eq!(*client_msg.state(), localtxsubmission::State::Busy);

            let response = client_msg.recv_submit_tx_response().await.unwrap();
            assert_eq!(*client_msg.state(), localtxsubmission::State::Idle);
            assert_eq!(response, localtxsubmission::Response::Accepted);
        });
        let recorder = tokio::spawn(async move {
            // No early `return` here: the stop signal below must be sent
            // whether or not a message was received.
            let result = signature_dmq_rx
                .recv()
                .await
                .ok_or_else(|| anyhow::anyhow!("No message received in recorder"));
            stop_tx
                .send(())
                .expect("Failed to send stop signal to DMQ publisher server");

            result
        });

        let (run_res, client_res, message_res) =
            tokio::join!(dmq_publisher_server.run(), client, recorder);
        run_res.expect("DMQ publisher server should stop without error");
        // A panic in the client task (e.g. a failed assertion) must fail the test.
        client_res.expect("DMQ client task should complete without panicking");
        let message_received = message_res.unwrap().unwrap();
        assert_eq!(message, message_received.into());
    }
}