mithril_dmq/publisher/client/
pallas.rs1use std::{fmt::Debug, marker::PhantomData, path::PathBuf};
2
3use anyhow::Context;
4use pallas_network::{facades::DmqClient, miniprotocols::localtxsubmission::Response};
5use slog::{Logger, debug, error};
6
7use mithril_common::{StdResult, crypto_helper::TryToBytes, logging::LoggerExtensions};
8
9use crate::{DmqMessageBuilder, DmqPublisherClient, model::DmqNetwork};
10
11pub struct DmqPublisherClientPallas<M: TryToBytes + Debug> {
15 socket: PathBuf,
16 network: DmqNetwork,
17 dmq_message_builder: DmqMessageBuilder,
18 logger: Logger,
19 phantom: PhantomData<M>,
20}
21
22impl<M: TryToBytes + Debug> DmqPublisherClientPallas<M> {
23 pub fn new(
25 socket: PathBuf,
26 network: DmqNetwork,
27 dmq_message_builder: DmqMessageBuilder,
28 logger: Logger,
29 ) -> Self {
30 Self {
31 socket,
32 network,
33 dmq_message_builder,
34 logger: logger.new_with_component_name::<Self>(),
35 phantom: PhantomData,
36 }
37 }
38
39 async fn new_client(&self) -> StdResult<DmqClient> {
41 let magic = self.network.magic_id();
42 DmqClient::connect(&self.socket, magic)
43 .await
44 .with_context(|| "DmqPublisherClientPallas failed to create a new client")
45 }
46}
47
48#[async_trait::async_trait]
49impl<M: TryToBytes + Debug + Sync + Send> DmqPublisherClient<M> for DmqPublisherClientPallas<M> {
50 async fn publish_message(&self, message: M) -> StdResult<()> {
51 debug!(
52 self.logger,
53 "Publish message to DMQ";
54 "message" => ?message
55 );
56 let mut client = self.new_client().await?;
57 let message_bytes = &message.to_bytes_vec()?;
58 let dmq_message = self
59 .dmq_message_builder
60 .build(message_bytes)
61 .await
62 .with_context(|| "Failed to build DMQ message")?;
63 client
64 .msg_submission()
65 .send_submit_tx(dmq_message.into())
66 .await
67 .with_context(|| "Failed to submit DMQ message")?;
68 let response = client.msg_submission().recv_submit_tx_response().await;
69 if let Err(e) = client.msg_submission().terminate_gracefully().await {
70 error!(self.logger, "Failed to send Done"; "error" => ?e);
71 client.abort().await;
72 }
73 let response = response?;
74
75 if response != Response::Accepted {
76 anyhow::bail!("Failed to publish DMQ message: {:?}", response);
77 }
78
79 Ok(())
80 }
81}
82
83#[cfg(all(test, unix))]
84mod tests {
85 use std::{fs, sync::Arc, time::Duration};
86
87 use pallas_network::miniprotocols::{
88 localmsgsubmission::{DmqMsgRejectReason, DmqMsgValidationError},
89 localtxsubmission,
90 };
91 use tokio::{net::UnixListener, task::JoinHandle};
92
93 use mithril_cardano_node_chain::test::double::FakeChainObserver;
94 use mithril_common::{
95 current_function,
96 test::{TempDir, crypto_helper::KesSignerFake, double::Dummy},
97 };
98
99 use crate::test::{TestLogger, payload::DmqMessageTestPayload};
100
101 use super::*;
102
103 fn create_temp_dir(folder_name: &str) -> PathBuf {
104 TempDir::create_with_short_path("dmq_publisher", folder_name)
105 }
106
107 fn setup_dmq_server(socket_path: PathBuf, reply_success: bool) -> JoinHandle<StdResult<()>> {
108 tokio::spawn({
109 async move {
110 if socket_path.exists() {
112 fs::remove_file(socket_path.clone())?;
113 }
114 let listener = UnixListener::bind(socket_path)?;
115 let mut server = pallas_network::facades::DmqServer::accept(&listener, 0)
116 .await
117 .unwrap();
118
119 let server_msg = server.msg_submission();
121
122 let request = server_msg.recv_next_request().await?;
124 match &request {
125 localtxsubmission::Request::Submit(_) => (),
126 request => panic!("Expected a Submit request, but received: {request:?}"),
127 }
128 let response = if reply_success {
129 localtxsubmission::Response::Accepted
130 } else {
131 localtxsubmission::Response::Rejected(DmqMsgValidationError(
132 DmqMsgRejectReason::Other("fake error".to_string()),
133 ))
134 };
135 server_msg.send_submit_tx_response(response).await?;
136
137 let request = server_msg.recv_next_request().await?;
139 if request != localtxsubmission::Request::Done {
140 anyhow::bail!("Expected a Done request, but received: {request:?}");
141 }
142
143 Ok(())
144 }
145 })
146 }
147
148 #[tokio::test(flavor = "multi_thread")]
149 async fn pallas_dmq_signature_publisher_success() {
150 let current_function_name = current_function!();
151
152 let socket_path = create_temp_dir(current_function_name).join("node.socket");
153 let reply_success = true;
154 let server = setup_dmq_server(socket_path.clone(), reply_success);
155 let client = tokio::spawn(async move {
156 tokio::time::sleep(Duration::from_millis(10)).await;
158
159 let publisher = DmqPublisherClientPallas::new(
160 socket_path,
161 DmqNetwork::TestNet(0),
162 DmqMessageBuilder::new(
163 {
164 let (kes_signature, operational_certificate) =
165 KesSignerFake::dummy_signature(current_function_name);
166 let kes_signer = KesSignerFake::new(vec![Ok((
167 kes_signature,
168 operational_certificate.clone(),
169 ))]);
170
171 Arc::new(kes_signer)
172 },
173 Arc::new(FakeChainObserver::default()),
174 )
175 .set_ttl(100),
176 TestLogger::stdout(),
177 );
178
179 publisher.publish_message(DmqMessageTestPayload::dummy()).await
180 });
181
182 let (res_server, res_client) = tokio::join!(server, client);
183
184 res_server.unwrap().unwrap();
185 res_client.unwrap().unwrap();
186 }
187
188 #[tokio::test(flavor = "multi_thread")]
189 async fn pallas_dmq_signature_publisher_fails() {
190 let current_function_name = current_function!();
191 let socket_path = create_temp_dir(current_function_name).join("node.socket");
192 let reply_success = false;
193 let server = setup_dmq_server(socket_path.clone(), reply_success);
194 let client = tokio::spawn(async move {
195 tokio::time::sleep(Duration::from_millis(10)).await;
197
198 let publisher = DmqPublisherClientPallas::new(
199 socket_path,
200 DmqNetwork::TestNet(0),
201 DmqMessageBuilder::new(
202 {
203 let (kes_signature, operational_certificate) =
204 KesSignerFake::dummy_signature(current_function_name);
205 let kes_signer = KesSignerFake::new(vec![Ok((
206 kes_signature,
207 operational_certificate.clone(),
208 ))]);
209
210 Arc::new(kes_signer)
211 },
212 Arc::new(FakeChainObserver::default()),
213 )
214 .set_ttl(100),
215 TestLogger::stdout(),
216 );
217
218 publisher.publish_message(DmqMessageTestPayload::dummy()).await
219 });
220
221 let (res_server, res_client) = tokio::join!(server, client);
222
223 res_server.unwrap().unwrap();
224 res_client.unwrap().expect_err("Publishing DMQ message should fail");
225 }
226}