mithril_dmq/publisher/client/
pallas.rs

1use std::{fmt::Debug, marker::PhantomData, path::PathBuf};
2
3use anyhow::Context;
4use pallas_network::{facades::DmqClient, miniprotocols::localtxsubmission::Response};
5use slog::{Logger, debug, error};
6
7use mithril_common::{StdResult, crypto_helper::TryToBytes, logging::LoggerExtensions};
8
9use crate::{DmqMessageBuilder, DmqPublisherClient, model::DmqNetwork};
10
11/// A DMQ client publisher implementation.
12///
13/// This implementation is built upon the n2c mini-protocols DMQ implementation in Pallas.
14pub struct DmqPublisherClientPallas<M: TryToBytes + Debug> {
15    socket: PathBuf,
16    network: DmqNetwork,
17    dmq_message_builder: DmqMessageBuilder,
18    logger: Logger,
19    phantom: PhantomData<M>,
20}
21
22impl<M: TryToBytes + Debug> DmqPublisherClientPallas<M> {
23    /// Creates a new instance of [DmqPublisherClientPallas].
24    pub fn new(
25        socket: PathBuf,
26        network: DmqNetwork,
27        dmq_message_builder: DmqMessageBuilder,
28        logger: Logger,
29    ) -> Self {
30        Self {
31            socket,
32            network,
33            dmq_message_builder,
34            logger: logger.new_with_component_name::<Self>(),
35            phantom: PhantomData,
36        }
37    }
38
39    /// Creates and returns a new `DmqClient` connected to the specified socket.
40    async fn new_client(&self) -> StdResult<DmqClient> {
41        let magic = self.network.magic_id();
42        DmqClient::connect(&self.socket, magic)
43            .await
44            .with_context(|| "DmqPublisherClientPallas failed to create a new client")
45    }
46}
47
48#[async_trait::async_trait]
49impl<M: TryToBytes + Debug + Sync + Send> DmqPublisherClient<M> for DmqPublisherClientPallas<M> {
50    async fn publish_message(&self, message: M) -> StdResult<()> {
51        debug!(
52            self.logger,
53            "Publish message to DMQ";
54            "message" => ?message
55        );
56        let mut client = self.new_client().await?;
57        let message_bytes = &message.to_bytes_vec()?;
58        let dmq_message = self
59            .dmq_message_builder
60            .build(message_bytes)
61            .await
62            .with_context(|| "Failed to build DMQ message")?;
63        client
64            .msg_submission()
65            .send_submit_tx(dmq_message.into())
66            .await
67            .with_context(|| "Failed to submit DMQ message")?;
68        let response = client.msg_submission().recv_submit_tx_response().await?;
69        if let Err(e) = client.msg_submission().terminate_gracefully().await {
70            error!(self.logger, "Failed to send Done"; "error" => ?e);
71        }
72
73        if response != Response::Accepted {
74            anyhow::bail!("Failed to publish DMQ message: {:?}", response);
75        }
76
77        Ok(())
78    }
79}
80
#[cfg(all(test, unix))]
mod tests {
    use std::{fs, sync::Arc};

    use pallas_network::miniprotocols::{
        localmsgsubmission::DmqMsgValidationError, localtxsubmission,
    };
    use tokio::{net::UnixListener, task::JoinHandle};

    use mithril_cardano_node_chain::test::double::FakeChainObserver;
    use mithril_common::{
        current_function,
        test::{TempDir, crypto_helper::KesSignerFake, double::Dummy},
    };

    use crate::test::{TestLogger, payload::DmqMessageTestPayload};

    use super::*;

    /// Creates a temporary directory dedicated to a test of the DMQ publisher.
    fn create_temp_dir(folder_name: &str) -> PathBuf {
        TempDir::create_with_short_path("dmq_publisher", folder_name)
    }

    /// Starts a fake DMQ server listening on `socket_path`.
    ///
    /// The socket is bound before this function returns, so a client can connect right away
    /// without waiting for the spawned task to be scheduled.
    ///
    /// The spawned task accepts one client, expects a `Submit` request, replies with `Accepted`
    /// when `reply_success` is `true` (`Rejected` otherwise), and finally expects a `Done`
    /// request. The task panics if the client does not follow this exchange, so the returned
    /// handle must be awaited and checked by the caller.
    ///
    /// Must be called from within a Tokio runtime.
    fn setup_dmq_server(socket_path: PathBuf, reply_success: bool) -> JoinHandle<()> {
        // server setup
        if socket_path.exists() {
            fs::remove_file(&socket_path).unwrap();
        }
        let listener = UnixListener::bind(&socket_path).unwrap();

        tokio::spawn(async move {
            let mut server = pallas_network::facades::DmqServer::accept(&listener, 0)
                .await
                .unwrap();

            // init local msg submission server
            let server_msg = server.msg_submission();

            // server waits for request from client and replies to it
            let request = server_msg.recv_next_request().await.unwrap();
            match &request {
                localtxsubmission::Request::Submit(_) => (),
                request => panic!("Expected a Submit request, but received: {request:?}"),
            }
            let response = if reply_success {
                localtxsubmission::Response::Accepted
            } else {
                localtxsubmission::Response::Rejected(DmqMsgValidationError(
                    "fake error".to_string(),
                ))
            };
            server_msg.send_submit_tx_response(response).await.unwrap();

            // server receives done from client
            let request = server_msg.recv_next_request().await.unwrap();
            assert_eq!(localtxsubmission::Request::Done, request);
        })
    }

    /// Publishes a dummy payload through a publisher connected to `socket_path`.
    ///
    /// `test_name` is only used to generate the fake KES signature of the message.
    async fn publish_dummy_message(socket_path: PathBuf, test_name: &str) -> StdResult<()> {
        let (kes_signature, operational_certificate) = KesSignerFake::dummy_signature(test_name);
        let kes_signer = KesSignerFake::new(vec![Ok((
            kes_signature,
            operational_certificate.clone(),
        ))]);
        let publisher = DmqPublisherClientPallas::new(
            socket_path,
            DmqNetwork::TestNet(0),
            DmqMessageBuilder::new(
                Arc::new(kes_signer),
                Arc::new(FakeChainObserver::default()),
            )
            .set_ttl(100),
            TestLogger::stdout(),
        );

        publisher.publish_message(DmqMessageTestPayload::dummy()).await
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn pallas_dmq_signature_publisher_success() {
        let current_function_name = current_function!();
        let socket_path = create_temp_dir(current_function_name).join("node.socket");
        let reply_success = true;
        let server = setup_dmq_server(socket_path.clone(), reply_success);

        let publish_result = publish_dummy_message(socket_path, current_function_name).await;

        publish_result.expect("Publishing DMQ message should succeed");
        // The server assertions only count if the outcome of its task is checked
        server.await.expect("DMQ server should not panic");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn pallas_dmq_signature_publisher_fails() {
        let current_function_name = current_function!();
        let socket_path = create_temp_dir(current_function_name).join("node.socket");
        let reply_success = false;
        let server = setup_dmq_server(socket_path.clone(), reply_success);

        let publish_result = publish_dummy_message(socket_path, current_function_name).await;

        publish_result.expect_err("Publishing DMQ message should fail");
        // The server assertions only count if the outcome of its task is checked
        server.await.expect("DMQ server should not panic");
    }
}