mithril_dmq/publisher/client/
pallas.rs

1use std::{fmt::Debug, marker::PhantomData, path::PathBuf};
2
3use anyhow::Context;
4use pallas_network::{facades::DmqClient, miniprotocols::localtxsubmission::Response};
5use slog::{Logger, debug, error};
6
7use mithril_common::{
8    CardanoNetwork, StdResult, crypto_helper::TryToBytes, logging::LoggerExtensions,
9};
10
11use crate::{DmqMessageBuilder, DmqPublisherClient};
12
13/// A DMQ client publisher implementation.
14///
15/// This implementation is built upon the n2c mini-protocols DMQ implementation in Pallas.
16pub struct DmqPublisherClientPallas<M: TryToBytes + Debug> {
17    socket: PathBuf,
18    network: CardanoNetwork,
19    dmq_message_builder: DmqMessageBuilder,
20    logger: Logger,
21    phantom: PhantomData<M>,
22}
23
24impl<M: TryToBytes + Debug> DmqPublisherClientPallas<M> {
25    /// Creates a new instance of [DmqPublisherClientPallas].
26    pub fn new(
27        socket: PathBuf,
28        network: CardanoNetwork,
29        dmq_message_builder: DmqMessageBuilder,
30        logger: Logger,
31    ) -> Self {
32        Self {
33            socket,
34            network,
35            dmq_message_builder,
36            logger: logger.new_with_component_name::<Self>(),
37            phantom: PhantomData,
38        }
39    }
40
41    /// Creates and returns a new `DmqClient` connected to the specified socket.
42    async fn new_client(&self) -> StdResult<DmqClient> {
43        let magic = self.network.magic_id();
44        DmqClient::connect(&self.socket, magic)
45            .await
46            .with_context(|| "DmqPublisherClientPallas failed to create a new client")
47    }
48}
49
50#[async_trait::async_trait]
51impl<M: TryToBytes + Debug + Sync + Send> DmqPublisherClient<M> for DmqPublisherClientPallas<M> {
52    async fn publish_message(&self, message: M) -> StdResult<()> {
53        debug!(
54            self.logger,
55            "Publish message to DMQ";
56            "message" => ?message
57        );
58        let mut client = self.new_client().await?;
59        let message_bytes = &message.to_bytes_vec()?;
60        let dmq_message = self
61            .dmq_message_builder
62            .build(message_bytes)
63            .await
64            .with_context(|| "Failed to build DMQ message")?;
65        client
66            .msg_submission()
67            .send_submit_tx(dmq_message.into())
68            .await
69            .with_context(|| "Failed to submit DMQ message")?;
70        let response = client.msg_submission().recv_submit_tx_response().await?;
71        if let Err(e) = client.msg_submission().terminate_gracefully().await {
72            error!(self.logger, "Failed to send Done"; "error" => ?e);
73        }
74
75        if response != Response::Accepted {
76            anyhow::bail!("Failed to publish DMQ message: {:?}", response);
77        }
78
79        Ok(())
80    }
81}
82
#[cfg(all(test, unix))]
mod tests {
    use std::{fs, sync::Arc};

    use pallas_network::miniprotocols::{
        localmsgsubmission::DmqMsgValidationError, localtxsubmission,
    };
    use tokio::{net::UnixListener, task::JoinHandle};

    use mithril_cardano_node_chain::test::double::FakeChainObserver;
    use mithril_common::{
        current_function,
        test::{TempDir, crypto_helper::KesSignerFake, double::Dummy},
    };

    use crate::test::{TestLogger, payload::DmqMessageTestPayload};

    use super::*;

    /// Creates a temporary directory dedicated to the test named `folder_name`.
    fn create_temp_dir(folder_name: &str) -> PathBuf {
        TempDir::create_with_short_path("dmq_publisher", folder_name)
    }

    /// Starts a fake DMQ server listening on `socket_path`.
    ///
    /// The server accepts one connection, expects a `Submit` request, replies with
    /// `Accepted` when `reply_success` is true (`Rejected` otherwise), then expects
    /// a `Done` request.
    ///
    /// The socket is bound before the server task is spawned, so that a client can
    /// connect as soon as this function returns without having to sleep first.
    /// Must be called from within a Tokio runtime.
    fn setup_dmq_server(socket_path: PathBuf, reply_success: bool) -> JoinHandle<()> {
        // server setup
        if socket_path.exists() {
            fs::remove_file(&socket_path).unwrap();
        }
        let listener = UnixListener::bind(&socket_path).unwrap();

        tokio::spawn(async move {
            let mut server = pallas_network::facades::DmqServer::accept(&listener, 0)
                .await
                .unwrap();

            // init local msg submission server
            let server_msg = server.msg_submission();

            // server waits for request from client and replies to it
            let request = server_msg.recv_next_request().await.unwrap();
            match &request {
                localtxsubmission::Request::Submit(_) => (),
                request => panic!("Expected a Submit request, but received: {request:?}"),
            }
            let response = if reply_success {
                localtxsubmission::Response::Accepted
            } else {
                localtxsubmission::Response::Rejected(DmqMsgValidationError(
                    "fake error".to_string(),
                ))
            };
            server_msg.send_submit_tx_response(response).await.unwrap();

            // server receives done from client
            let request = server_msg.recv_next_request().await.unwrap();
            assert_eq!(localtxsubmission::Request::Done, request);
        })
    }

    /// Builds a publisher connected to `socket_path`, signing with a fake KES
    /// signer primed with a single dummy signature derived from `test_name`.
    fn build_publisher(
        socket_path: PathBuf,
        test_name: &str,
    ) -> DmqPublisherClientPallas<DmqMessageTestPayload> {
        let (kes_signature, operational_certificate) = KesSignerFake::dummy_signature(test_name);
        let kes_signer = KesSignerFake::new(vec![Ok((
            kes_signature,
            operational_certificate.clone(),
        ))]);

        DmqPublisherClientPallas::new(
            socket_path,
            CardanoNetwork::TestNet(0),
            DmqMessageBuilder::new(
                Arc::new(kes_signer),
                Arc::new(FakeChainObserver::default()),
            )
            .set_ttl(100),
            TestLogger::stdout(),
        )
    }

    /// Runs a fake server and a publisher against each other and returns the
    /// result of publishing one dummy message.
    ///
    /// Panics if the server task or the client task panicked, so that the
    /// assertions made on the server side are not silently ignored.
    async fn publish_dummy_message(test_name: &'static str, reply_success: bool) -> StdResult<()> {
        let socket_path = create_temp_dir(test_name).join("node.socket");
        let server = setup_dmq_server(socket_path.clone(), reply_success);
        let client = tokio::spawn(async move {
            let publisher = build_publisher(socket_path, test_name);

            publisher.publish_message(DmqMessageTestPayload::dummy()).await
        });

        let (server_res, client_res) = tokio::join!(server, client);
        server_res.expect("DMQ server task should not panic");

        client_res.expect("DMQ publisher task should not panic")
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn pallas_dmq_signature_publisher_success() {
        let reply_success = true;

        let res = publish_dummy_message(current_function!(), reply_success).await;

        res.unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn pallas_dmq_signature_publisher_fails() {
        let reply_success = false;

        let res = publish_dummy_message(current_function!(), reply_success).await;

        res.expect_err("Publishing DMQ message should fail");
    }
}