mithril_dmq/publisher/
pallas.rs

1use std::{fmt::Debug, marker::PhantomData, path::PathBuf};
2
3use anyhow::Context;
4use pallas_network::{facades::DmqClient, miniprotocols::localtxsubmission::Response};
5use slog::{Logger, debug, error};
6
7use mithril_common::{
8    CardanoNetwork, StdResult, crypto_helper::TryToBytes, logging::LoggerExtensions,
9};
10
11use crate::{DmqMessageBuilder, DmqPublisher};
12
13/// A DMQ publisher implementation.
14///
15/// This implementation is built upon the n2c mini-protocols DMQ implementation in Pallas.
16pub struct DmqPublisherPallas<M: TryToBytes + Debug> {
17    socket: PathBuf,
18    network: CardanoNetwork,
19    dmq_message_builder: DmqMessageBuilder,
20    logger: Logger,
21    phantom: PhantomData<M>,
22}
23
24impl<M: TryToBytes + Debug> DmqPublisherPallas<M> {
25    /// Creates a new instance of [DmqPublisherPallas].
26    pub fn new(
27        socket: PathBuf,
28        network: CardanoNetwork,
29        dmq_message_builder: DmqMessageBuilder,
30        logger: Logger,
31    ) -> Self {
32        Self {
33            socket,
34            network,
35            dmq_message_builder,
36            logger: logger.new_with_component_name::<Self>(),
37            phantom: PhantomData,
38        }
39    }
40
41    /// Creates and returns a new `DmqClient` connected to the specified socket.
42    async fn new_client(&self) -> StdResult<DmqClient> {
43        let magic = self.network.magic_id();
44        DmqClient::connect(&self.socket, magic)
45            .await
46            .with_context(|| "DmqPublisherPallas failed to create a new client")
47    }
48}
49
50#[async_trait::async_trait]
51impl<M: TryToBytes + Debug + Sync + Send> DmqPublisher<M> for DmqPublisherPallas<M> {
52    async fn publish_message(&self, message: M) -> StdResult<()> {
53        debug!(
54            self.logger,
55            "Publish message to DMQ";
56            "message" => ?message
57        );
58        let mut client = self.new_client().await?;
59        let message_bytes = &message.to_bytes_vec()?;
60        let dmq_message = self
61            .dmq_message_builder
62            .build(message_bytes)
63            .await
64            .with_context(|| "Failed to build DMQ message")?;
65        client
66            .msg_submission()
67            .send_submit_tx(dmq_message)
68            .await
69            .with_context(|| "Failed to submit DMQ message")?;
70        let response = client.msg_submission().recv_submit_tx_response().await?;
71        if let Err(e) = client.msg_submission().terminate_gracefully().await {
72            error!(self.logger, "Failed to send Done"; "error" => ?e);
73        }
74
75        if response != Response::Accepted {
76            anyhow::bail!("Failed to publish DMQ message: {:?}", response);
77        }
78
79        Ok(())
80    }
81}
82
#[cfg(all(test, unix))]
mod tests {

    use std::{fs, sync::Arc};

    use pallas_network::miniprotocols::{
        localmsgsubmission::DmqMsgValidationError, localtxsubmission,
    };
    use tokio::{net::UnixListener, task::JoinHandle};

    use mithril_cardano_node_chain::test::double::FakeChainObserver;
    use mithril_common::{crypto_helper::KesSignerFake, current_function, test_utils::TempDir};

    use crate::{test::payload::DmqMessageTestPayload, test_tools::TestLogger};

    use super::*;

    fn create_temp_dir(folder_name: &str) -> PathBuf {
        TempDir::create_with_short_path("dmq_publisher", folder_name)
    }

    /// Starts a fake DMQ server listening on `socket_path`.
    ///
    /// The server expects a single `Submit` request, replies with `Accepted` when
    /// `reply_success` is true and `Rejected` otherwise, then expects a `Done` request.
    /// Any deviation panics inside the returned task, so callers must check the handle.
    fn setup_dmq_server(socket_path: PathBuf, reply_success: bool) -> JoinHandle<()> {
        // Bind before spawning so the socket exists as soon as this function returns,
        // whatever the order in which the runtime schedules the server and client tasks.
        if socket_path.exists() {
            fs::remove_file(&socket_path).unwrap();
        }
        let listener = UnixListener::bind(socket_path).unwrap();

        tokio::spawn(async move {
            let mut server = pallas_network::facades::DmqServer::accept(&listener, 0)
                .await
                .unwrap();

            // init local msg submission server
            let server_msg = server.msg_submission();

            // server waits for request from client and replies to it
            let request = server_msg.recv_next_request().await.unwrap();
            assert!(
                matches!(request, localtxsubmission::Request::Submit(_)),
                "Expected a Submit request, but received: {request:?}"
            );
            let response = if reply_success {
                localtxsubmission::Response::Accepted
            } else {
                localtxsubmission::Response::Rejected(DmqMsgValidationError(
                    "fake error".to_string(),
                ))
            };
            server_msg.send_submit_tx_response(response).await.unwrap();

            // server receives done from client
            let request = server_msg.recv_next_request().await.unwrap();
            assert_eq!(localtxsubmission::Request::Done, request);
        })
    }

    /// Builds a publisher connected to `socket_path`, signing with a fake KES signer that
    /// holds a single dummy signature.
    fn build_publisher(socket_path: PathBuf) -> DmqPublisherPallas<DmqMessageTestPayload> {
        let (kes_signature, operational_certificate) = KesSignerFake::dummy_signature();
        let kes_signer = KesSignerFake::new(vec![Ok((kes_signature, operational_certificate))]);

        DmqPublisherPallas::new(
            socket_path,
            CardanoNetwork::TestNet(0),
            DmqMessageBuilder::new(
                Arc::new(kes_signer),
                Arc::new(FakeChainObserver::default()),
            )
            .set_ttl(100),
            TestLogger::stdout(),
        )
    }

    #[tokio::test]
    async fn pallas_dmq_signature_publisher_success() {
        let socket_path = create_temp_dir(current_function!()).join("node.socket");
        let reply_success = true;
        let server = setup_dmq_server(socket_path.clone(), reply_success);
        let client = tokio::spawn(async move {
            let publisher = build_publisher(socket_path);

            publisher.publish_message(DmqMessageTestPayload::dummy()).await
        });

        let (server_res, client_res) = tokio::join!(server, client);

        // A panic in the server task (unexpected request, missing Done) must fail the test.
        server_res.expect("DMQ server task should not panic");
        client_res.unwrap().unwrap();
    }

    #[tokio::test]
    async fn pallas_dmq_signature_publisher_fails() {
        let socket_path = create_temp_dir(current_function!()).join("node.socket");
        let reply_success = false;
        let server = setup_dmq_server(socket_path.clone(), reply_success);
        let client = tokio::spawn(async move {
            let publisher = build_publisher(socket_path);

            publisher.publish_message(DmqMessageTestPayload::dummy()).await
        });

        let (server_res, client_res) = tokio::join!(server, client);

        // A panic in the server task (unexpected request, missing Done) must fail the test.
        server_res.expect("DMQ server task should not panic");
        client_res.unwrap().expect_err("Publishing DMQ message should fail");
    }
}