mithril_dmq/publisher/client/
pallas.rs1use std::{fmt::Debug, marker::PhantomData, path::PathBuf};
2
3use anyhow::Context;
4use pallas_network::{facades::DmqClient, miniprotocols::localtxsubmission::Response};
5use slog::{Logger, debug, error};
6
7use mithril_common::{
8 CardanoNetwork, StdResult, crypto_helper::TryToBytes, logging::LoggerExtensions,
9};
10
11use crate::{DmqMessageBuilder, DmqPublisherClient};
12
13pub struct DmqPublisherClientPallas<M: TryToBytes + Debug> {
17 socket: PathBuf,
18 network: CardanoNetwork,
19 dmq_message_builder: DmqMessageBuilder,
20 logger: Logger,
21 phantom: PhantomData<M>,
22}
23
24impl<M: TryToBytes + Debug> DmqPublisherClientPallas<M> {
25 pub fn new(
27 socket: PathBuf,
28 network: CardanoNetwork,
29 dmq_message_builder: DmqMessageBuilder,
30 logger: Logger,
31 ) -> Self {
32 Self {
33 socket,
34 network,
35 dmq_message_builder,
36 logger: logger.new_with_component_name::<Self>(),
37 phantom: PhantomData,
38 }
39 }
40
41 async fn new_client(&self) -> StdResult<DmqClient> {
43 let magic = self.network.magic_id();
44 DmqClient::connect(&self.socket, magic)
45 .await
46 .with_context(|| "DmqPublisherClientPallas failed to create a new client")
47 }
48}
49
50#[async_trait::async_trait]
51impl<M: TryToBytes + Debug + Sync + Send> DmqPublisherClient<M> for DmqPublisherClientPallas<M> {
52 async fn publish_message(&self, message: M) -> StdResult<()> {
53 debug!(
54 self.logger,
55 "Publish message to DMQ";
56 "message" => ?message
57 );
58 let mut client = self.new_client().await?;
59 let message_bytes = &message.to_bytes_vec()?;
60 let dmq_message = self
61 .dmq_message_builder
62 .build(message_bytes)
63 .await
64 .with_context(|| "Failed to build DMQ message")?;
65 client
66 .msg_submission()
67 .send_submit_tx(dmq_message.into())
68 .await
69 .with_context(|| "Failed to submit DMQ message")?;
70 let response = client.msg_submission().recv_submit_tx_response().await?;
71 if let Err(e) = client.msg_submission().terminate_gracefully().await {
72 error!(self.logger, "Failed to send Done"; "error" => ?e);
73 }
74
75 if response != Response::Accepted {
76 anyhow::bail!("Failed to publish DMQ message: {:?}", response);
77 }
78
79 Ok(())
80 }
81}
82
#[cfg(all(test, unix))]
mod tests {
    use std::{fs, sync::Arc};

    use pallas_network::miniprotocols::{
        localmsgsubmission::DmqMsgValidationError, localtxsubmission,
    };
    use tokio::{net::UnixListener, task::JoinHandle};

    use mithril_cardano_node_chain::test::double::FakeChainObserver;
    use mithril_common::{
        current_function,
        test::{TempDir, crypto_helper::KesSignerFake, double::Dummy},
    };

    use crate::test::{TestLogger, payload::DmqMessageTestPayload};

    use super::*;

    /// Creates a dedicated temporary directory for the test named `folder_name`.
    fn create_temp_dir(folder_name: &str) -> PathBuf {
        TempDir::create_with_short_path("dmq_publisher", folder_name)
    }

    /// Starts a fake DMQ server listening on `socket_path`.
    ///
    /// The server expects exactly one `Submit` request, answers it with `Accepted`
    /// when `reply_success` is true (or with a `Rejected` response otherwise), and
    /// then expects the client to send `Done`.
    ///
    /// The socket is bound *before* the task is spawned, so it is guaranteed to
    /// exist by the time this function returns: clients can connect right away
    /// without sleeping. Must be called from within a Tokio runtime.
    fn setup_dmq_server(socket_path: PathBuf, reply_success: bool) -> JoinHandle<()> {
        if socket_path.exists() {
            fs::remove_file(&socket_path).unwrap();
        }
        let listener = UnixListener::bind(&socket_path).unwrap();

        tokio::spawn(async move {
            let mut server = pallas_network::facades::DmqServer::accept(&listener, 0)
                .await
                .unwrap();

            let server_msg = server.msg_submission();

            let request = server_msg.recv_next_request().await.unwrap();
            match &request {
                localtxsubmission::Request::Submit(_) => (),
                request => panic!("Expected a Submit request, but received: {request:?}"),
            }
            let response = if reply_success {
                localtxsubmission::Response::Accepted
            } else {
                localtxsubmission::Response::Rejected(DmqMsgValidationError(
                    "fake error".to_string(),
                ))
            };
            server_msg.send_submit_tx_response(response).await.unwrap();

            let request = server_msg.recv_next_request().await.unwrap();
            assert_eq!(localtxsubmission::Request::Done, request);
        })
    }

    /// Builds a publisher connected to `socket_path`, backed by a fake KES signer
    /// holding a single dummy signature and a default fake chain observer.
    fn build_publisher(
        socket_path: PathBuf,
        test_name: &'static str,
    ) -> DmqPublisherClientPallas<DmqMessageTestPayload> {
        let (kes_signature, operational_certificate) = KesSignerFake::dummy_signature(test_name);
        let kes_signer = KesSignerFake::new(vec![Ok((
            kes_signature,
            operational_certificate.clone(),
        ))]);

        DmqPublisherClientPallas::new(
            socket_path,
            CardanoNetwork::TestNet(0),
            DmqMessageBuilder::new(
                Arc::new(kes_signer),
                Arc::new(FakeChainObserver::default()),
            )
            .set_ttl(100),
            TestLogger::stdout(),
        )
    }

    /// Publishes a dummy message against a fake server and returns the publisher's
    /// result.
    ///
    /// Panics if either task panicked; in particular a failed assertion inside the
    /// fake server fails the calling test instead of being silently discarded.
    async fn publish_to_fake_server(
        test_name: &'static str,
        reply_success: bool,
    ) -> StdResult<()> {
        let socket_path = create_temp_dir(test_name).join("node.socket");
        let server = setup_dmq_server(socket_path.clone(), reply_success);
        let client = tokio::spawn(async move {
            let publisher = build_publisher(socket_path, test_name);

            publisher.publish_message(DmqMessageTestPayload::dummy()).await
        });

        let (server_res, client_res) = tokio::join!(server, client);
        server_res.expect("Fake DMQ server task should not panic");

        client_res.expect("Publisher task should not panic")
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn pallas_dmq_signature_publisher_success() {
        let reply_success = true;

        publish_to_fake_server(current_function!(), reply_success)
            .await
            .unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn pallas_dmq_signature_publisher_fails() {
        let reply_success = false;

        publish_to_fake_server(current_function!(), reply_success)
            .await
            .expect_err("Publishing DMQ message should fail");
    }
}