mithril_dmq/publisher/client/
pallas.rs1use std::{fmt::Debug, marker::PhantomData, path::PathBuf};
2
3use anyhow::Context;
4use pallas_network::{facades::DmqClient, miniprotocols::localtxsubmission::Response};
5use slog::{Logger, debug, error};
6
7use mithril_common::{StdResult, crypto_helper::TryToBytes, logging::LoggerExtensions};
8
9use crate::{DmqMessageBuilder, DmqPublisherClient, model::DmqNetwork};
10
11pub struct DmqPublisherClientPallas<M: TryToBytes + Debug> {
15 socket: PathBuf,
16 network: DmqNetwork,
17 dmq_message_builder: DmqMessageBuilder,
18 logger: Logger,
19 phantom: PhantomData<M>,
20}
21
22impl<M: TryToBytes + Debug> DmqPublisherClientPallas<M> {
23 pub fn new(
25 socket: PathBuf,
26 network: DmqNetwork,
27 dmq_message_builder: DmqMessageBuilder,
28 logger: Logger,
29 ) -> Self {
30 Self {
31 socket,
32 network,
33 dmq_message_builder,
34 logger: logger.new_with_component_name::<Self>(),
35 phantom: PhantomData,
36 }
37 }
38
39 async fn new_client(&self) -> StdResult<DmqClient> {
41 let magic = self.network.magic_id();
42 DmqClient::connect(&self.socket, magic)
43 .await
44 .with_context(|| "DmqPublisherClientPallas failed to create a new client")
45 }
46}
47
48#[async_trait::async_trait]
49impl<M: TryToBytes + Debug + Sync + Send> DmqPublisherClient<M> for DmqPublisherClientPallas<M> {
50 async fn publish_message(&self, message: M) -> StdResult<()> {
51 debug!(
52 self.logger,
53 "Publish message to DMQ";
54 "message" => ?message
55 );
56 let mut client = self.new_client().await?;
57 let message_bytes = &message.to_bytes_vec()?;
58 let dmq_message = self
59 .dmq_message_builder
60 .build(message_bytes)
61 .await
62 .with_context(|| "Failed to build DMQ message")?;
63 client
64 .msg_submission()
65 .send_submit_tx(dmq_message.into())
66 .await
67 .with_context(|| "Failed to submit DMQ message")?;
68 let response = client.msg_submission().recv_submit_tx_response().await?;
69 if let Err(e) = client.msg_submission().terminate_gracefully().await {
70 error!(self.logger, "Failed to send Done"; "error" => ?e);
71 }
72
73 if response != Response::Accepted {
74 anyhow::bail!("Failed to publish DMQ message: {:?}", response);
75 }
76
77 Ok(())
78 }
79}
80
#[cfg(all(test, unix))]
mod tests {
    use std::{fs, sync::Arc};

    use pallas_network::miniprotocols::{
        localmsgsubmission::DmqMsgValidationError, localtxsubmission,
    };
    use tokio::{net::UnixListener, task::JoinHandle};

    use mithril_cardano_node_chain::test::double::FakeChainObserver;
    use mithril_common::{
        current_function,
        test::{TempDir, crypto_helper::KesSignerFake, double::Dummy},
    };

    use crate::test::{TestLogger, payload::DmqMessageTestPayload};

    use super::*;

    /// Creates a short-path temporary directory dedicated to the given test.
    fn create_temp_dir(folder_name: &str) -> PathBuf {
        TempDir::create_with_short_path("dmq_publisher", folder_name)
    }

    /// Starts a fake DMQ server listening on `socket_path`.
    ///
    /// The listener is bound *before* the server task is spawned, so the socket is ready to
    /// receive connections as soon as this function returns: callers do not need to wait
    /// before connecting. Must be called from within a Tokio runtime.
    ///
    /// The spawned task accepts one connection, expects a `Submit` request, answers it with
    /// `Accepted` when `reply_success` is true (`Rejected` otherwise), then expects `Done`.
    fn setup_dmq_server(socket_path: PathBuf, reply_success: bool) -> JoinHandle<()> {
        if socket_path.exists() {
            fs::remove_file(&socket_path).unwrap();
        }
        let listener = UnixListener::bind(&socket_path).unwrap();

        tokio::spawn(async move {
            let mut server = pallas_network::facades::DmqServer::accept(&listener, 0)
                .await
                .unwrap();
            let server_msg = server.msg_submission();

            let request = server_msg.recv_next_request().await.unwrap();
            assert!(
                matches!(request, localtxsubmission::Request::Submit(_)),
                "Expected a Submit request, but received: {request:?}"
            );

            let response = if reply_success {
                localtxsubmission::Response::Accepted
            } else {
                localtxsubmission::Response::Rejected(DmqMsgValidationError(
                    "fake error".to_string(),
                ))
            };
            server_msg.send_submit_tx_response(response).await.unwrap();

            let request = server_msg.recv_next_request().await.unwrap();
            assert_eq!(localtxsubmission::Request::Done, request);
        })
    }

    /// Builds a publisher targeting `socket_path`, backed by a fake KES signer and a fake
    /// chain observer.
    fn build_publisher(
        socket_path: PathBuf,
        test_name: &str,
    ) -> DmqPublisherClientPallas<DmqMessageTestPayload> {
        let (kes_signature, operational_certificate) = KesSignerFake::dummy_signature(test_name);
        let kes_signer = KesSignerFake::new(vec![Ok((kes_signature, operational_certificate))]);
        let dmq_message_builder =
            DmqMessageBuilder::new(Arc::new(kes_signer), Arc::new(FakeChainObserver::default()))
                .set_ttl(100);

        DmqPublisherClientPallas::new(
            socket_path,
            DmqNetwork::TestNet(0),
            dmq_message_builder,
            TestLogger::stdout(),
        )
    }

    /// Publishes a dummy payload against a fake server and returns the publication result.
    ///
    /// Panics if the fake server task itself panicked, so that a failed server-side
    /// expectation is never silently ignored.
    async fn publish_to_fake_server(test_name: &str, reply_success: bool) -> StdResult<()> {
        let socket_path = create_temp_dir(test_name).join("node.socket");
        let server = setup_dmq_server(socket_path.clone(), reply_success);
        let publisher = build_publisher(socket_path, test_name);

        let (server_result, publish_result) = tokio::join!(
            server,
            publisher.publish_message(DmqMessageTestPayload::dummy())
        );
        server_result.expect("Fake DMQ server task should complete without panicking");

        publish_result
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn pallas_dmq_signature_publisher_success() {
        let current_function_name = current_function!();

        publish_to_fake_server(current_function_name, true)
            .await
            .unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn pallas_dmq_signature_publisher_fails() {
        let current_function_name = current_function!();

        publish_to_fake_server(current_function_name, false)
            .await
            .expect_err("Publishing DMQ message should fail");
    }
}