mithril_dmq/publisher/client/
pallas.rs

1use std::{fmt::Debug, marker::PhantomData, path::PathBuf};
2
3use anyhow::Context;
4use pallas_network::{facades::DmqClient, miniprotocols::localtxsubmission::Response};
5use slog::{Logger, debug, error};
6
7use mithril_common::{StdResult, crypto_helper::TryToBytes, logging::LoggerExtensions};
8
9use crate::{DmqMessageBuilder, DmqPublisherClient, model::DmqNetwork};
10
11/// A DMQ client publisher implementation.
12///
13/// This implementation is built upon the n2c mini-protocols DMQ implementation in Pallas.
14pub struct DmqPublisherClientPallas<M: TryToBytes + Debug> {
15    socket: PathBuf,
16    network: DmqNetwork,
17    dmq_message_builder: DmqMessageBuilder,
18    logger: Logger,
19    phantom: PhantomData<M>,
20}
21
22impl<M: TryToBytes + Debug> DmqPublisherClientPallas<M> {
23    /// Creates a new instance of [DmqPublisherClientPallas].
24    pub fn new(
25        socket: PathBuf,
26        network: DmqNetwork,
27        dmq_message_builder: DmqMessageBuilder,
28        logger: Logger,
29    ) -> Self {
30        Self {
31            socket,
32            network,
33            dmq_message_builder,
34            logger: logger.new_with_component_name::<Self>(),
35            phantom: PhantomData,
36        }
37    }
38
39    /// Creates and returns a new `DmqClient` connected to the specified socket.
40    async fn new_client(&self) -> StdResult<DmqClient> {
41        let magic = self.network.magic_id();
42        DmqClient::connect(&self.socket, magic)
43            .await
44            .with_context(|| "DmqPublisherClientPallas failed to create a new client")
45    }
46}
47
48#[async_trait::async_trait]
49impl<M: TryToBytes + Debug + Sync + Send> DmqPublisherClient<M> for DmqPublisherClientPallas<M> {
50    async fn publish_message(&self, message: M) -> StdResult<()> {
51        debug!(
52            self.logger,
53            "Publish message to DMQ";
54            "message" => ?message
55        );
56        let mut client = self.new_client().await?;
57        let message_bytes = &message.to_bytes_vec()?;
58        let dmq_message = self
59            .dmq_message_builder
60            .build(message_bytes)
61            .await
62            .with_context(|| "Failed to build DMQ message")?;
63        client
64            .msg_submission()
65            .send_submit_tx(dmq_message.into())
66            .await
67            .with_context(|| "Failed to submit DMQ message")?;
68        let response = client.msg_submission().recv_submit_tx_response().await;
69        if let Err(e) = client.msg_submission().terminate_gracefully().await {
70            error!(self.logger, "Failed to send Done"; "error" => ?e);
71            client.abort().await;
72        }
73        let response = response?;
74
75        if response != Response::Accepted {
76            anyhow::bail!("Failed to publish DMQ message: {:?}", response);
77        }
78
79        Ok(())
80    }
81}
82
83#[cfg(all(test, unix))]
84mod tests {
85    use std::{fs, sync::Arc, time::Duration};
86
87    use pallas_network::miniprotocols::{
88        localmsgsubmission::{DmqMsgRejectReason, DmqMsgValidationError},
89        localtxsubmission,
90    };
91    use tokio::{net::UnixListener, task::JoinHandle};
92
93    use mithril_cardano_node_chain::test::double::FakeChainObserver;
94    use mithril_common::{
95        current_function,
96        test::{TempDir, crypto_helper::KesSignerFake, double::Dummy},
97    };
98
99    use crate::test::{TestLogger, payload::DmqMessageTestPayload};
100
101    use super::*;
102
103    fn create_temp_dir(folder_name: &str) -> PathBuf {
104        TempDir::create_with_short_path("dmq_publisher", folder_name)
105    }
106
107    fn setup_dmq_server(socket_path: PathBuf, reply_success: bool) -> JoinHandle<StdResult<()>> {
108        tokio::spawn({
109            async move {
110                // server setup
111                if socket_path.exists() {
112                    fs::remove_file(socket_path.clone())?;
113                }
114                let listener = UnixListener::bind(socket_path)?;
115                let mut server = pallas_network::facades::DmqServer::accept(&listener, 0)
116                    .await
117                    .unwrap();
118
119                // init local msg submission server
120                let server_msg = server.msg_submission();
121
122                // server waits for request from client and replies to it
123                let request = server_msg.recv_next_request().await?;
124                match &request {
125                    localtxsubmission::Request::Submit(_) => (),
126                    request => panic!("Expected a Submit request, but received: {request:?}"),
127                }
128                let response = if reply_success {
129                    localtxsubmission::Response::Accepted
130                } else {
131                    localtxsubmission::Response::Rejected(DmqMsgValidationError(
132                        DmqMsgRejectReason::Other("fake error".to_string()),
133                    ))
134                };
135                server_msg.send_submit_tx_response(response).await?;
136
137                // server receives done from client
138                let request = server_msg.recv_next_request().await?;
139                if request != localtxsubmission::Request::Done {
140                    anyhow::bail!("Expected a Done request, but received: {request:?}");
141                }
142
143                Ok(())
144            }
145        })
146    }
147
148    #[tokio::test(flavor = "multi_thread")]
149    async fn pallas_dmq_signature_publisher_success() {
150        let current_function_name = current_function!();
151
152        let socket_path = create_temp_dir(current_function_name).join("node.socket");
153        let reply_success = true;
154        let server = setup_dmq_server(socket_path.clone(), reply_success);
155        let client = tokio::spawn(async move {
156            // sleep to avoid refused connection from the server
157            tokio::time::sleep(Duration::from_millis(10)).await;
158
159            let publisher = DmqPublisherClientPallas::new(
160                socket_path,
161                DmqNetwork::TestNet(0),
162                DmqMessageBuilder::new(
163                    {
164                        let (kes_signature, operational_certificate) =
165                            KesSignerFake::dummy_signature(current_function_name);
166                        let kes_signer = KesSignerFake::new(vec![Ok((
167                            kes_signature,
168                            operational_certificate.clone(),
169                        ))]);
170
171                        Arc::new(kes_signer)
172                    },
173                    Arc::new(FakeChainObserver::default()),
174                )
175                .set_ttl(100),
176                TestLogger::stdout(),
177            );
178
179            publisher.publish_message(DmqMessageTestPayload::dummy()).await
180        });
181
182        let (res_server, res_client) = tokio::join!(server, client);
183
184        res_server.unwrap().unwrap();
185        res_client.unwrap().unwrap();
186    }
187
188    #[tokio::test(flavor = "multi_thread")]
189    async fn pallas_dmq_signature_publisher_fails() {
190        let current_function_name = current_function!();
191        let socket_path = create_temp_dir(current_function_name).join("node.socket");
192        let reply_success = false;
193        let server = setup_dmq_server(socket_path.clone(), reply_success);
194        let client = tokio::spawn(async move {
195            // sleep to avoid refused connection from the server
196            tokio::time::sleep(Duration::from_millis(10)).await;
197
198            let publisher = DmqPublisherClientPallas::new(
199                socket_path,
200                DmqNetwork::TestNet(0),
201                DmqMessageBuilder::new(
202                    {
203                        let (kes_signature, operational_certificate) =
204                            KesSignerFake::dummy_signature(current_function_name);
205                        let kes_signer = KesSignerFake::new(vec![Ok((
206                            kes_signature,
207                            operational_certificate.clone(),
208                        ))]);
209
210                        Arc::new(kes_signer)
211                    },
212                    Arc::new(FakeChainObserver::default()),
213                )
214                .set_ttl(100),
215                TestLogger::stdout(),
216            );
217
218            publisher.publish_message(DmqMessageTestPayload::dummy()).await
219        });
220
221        let (res_server, res_client) = tokio::join!(server, client);
222
223        res_server.unwrap().unwrap();
224        res_client.unwrap().expect_err("Publishing DMQ message should fail");
225    }
226}