mithril_dmq/publisher/
pallas.rs1use std::{fmt::Debug, marker::PhantomData, path::PathBuf};
2
3use anyhow::Context;
4use pallas_network::{facades::DmqClient, miniprotocols::localtxsubmission::Response};
5use slog::{Logger, debug, error};
6
7use mithril_common::{
8 CardanoNetwork, StdResult, crypto_helper::TryToBytes, logging::LoggerExtensions,
9};
10
11use crate::{DmqMessageBuilder, DmqPublisher};
12
13pub struct DmqPublisherPallas<M: TryToBytes + Debug> {
17 socket: PathBuf,
18 network: CardanoNetwork,
19 dmq_message_builder: DmqMessageBuilder,
20 logger: Logger,
21 phantom: PhantomData<M>,
22}
23
24impl<M: TryToBytes + Debug> DmqPublisherPallas<M> {
25 pub fn new(
27 socket: PathBuf,
28 network: CardanoNetwork,
29 dmq_message_builder: DmqMessageBuilder,
30 logger: Logger,
31 ) -> Self {
32 Self {
33 socket,
34 network,
35 dmq_message_builder,
36 logger: logger.new_with_component_name::<Self>(),
37 phantom: PhantomData,
38 }
39 }
40
41 async fn new_client(&self) -> StdResult<DmqClient> {
43 let magic = self.network.magic_id();
44 DmqClient::connect(&self.socket, magic)
45 .await
46 .with_context(|| "DmqPublisherPallas failed to create a new client")
47 }
48}
49
50#[async_trait::async_trait]
51impl<M: TryToBytes + Debug + Sync + Send> DmqPublisher<M> for DmqPublisherPallas<M> {
52 async fn publish_message(&self, message: M) -> StdResult<()> {
53 debug!(
54 self.logger,
55 "Publish message to DMQ";
56 "message" => ?message
57 );
58 let mut client = self.new_client().await?;
59 let message_bytes = &message.to_bytes_vec()?;
60 let dmq_message = self
61 .dmq_message_builder
62 .build(message_bytes)
63 .await
64 .with_context(|| "Failed to build DMQ message")?;
65 client
66 .msg_submission()
67 .send_submit_tx(dmq_message)
68 .await
69 .with_context(|| "Failed to submit DMQ message")?;
70 let response = client.msg_submission().recv_submit_tx_response().await?;
71 if let Err(e) = client.msg_submission().terminate_gracefully().await {
72 error!(self.logger, "Failed to send Done"; "error" => ?e);
73 }
74
75 if response != Response::Accepted {
76 anyhow::bail!("Failed to publish DMQ message: {:?}", response);
77 }
78
79 Ok(())
80 }
81}
82
#[cfg(all(test, unix))]
mod tests {

    use std::{fs, sync::Arc};

    use pallas_network::miniprotocols::{
        localmsgsubmission::DmqMsgValidationError, localtxsubmission,
    };
    use tokio::{net::UnixListener, task::JoinHandle};

    use mithril_cardano_node_chain::test::double::FakeChainObserver;
    use mithril_common::{crypto_helper::KesSignerFake, current_function, test_utils::TempDir};

    use crate::{test::payload::DmqMessageTestPayload, test_tools::TestLogger};

    use super::*;

    /// Creates the temporary directory in which a test places its socket.
    fn create_temp_dir(folder_name: &str) -> PathBuf {
        TempDir::create_with_short_path("dmq_publisher", folder_name)
    }

    /// Spawns a fake DMQ server listening on `socket_path`.
    ///
    /// The server accepts one connection, expects a `Submit` request, answers with
    /// `Accepted` when `reply_success` is true (with a `Rejected` response otherwise), then
    /// expects a `Done` request. Any violated expectation panics inside the spawned task, so
    /// callers must check the returned [`JoinHandle`] result.
    fn setup_dmq_server(socket_path: PathBuf, reply_success: bool) -> JoinHandle<()> {
        tokio::spawn(async move {
            // Remove any existing socket file so that the bind below does not fail on it.
            if socket_path.exists() {
                fs::remove_file(&socket_path).unwrap();
            }
            let listener = UnixListener::bind(socket_path).unwrap();
            let mut server = pallas_network::facades::DmqServer::accept(&listener, 0)
                .await
                .unwrap();

            let server_msg = server.msg_submission();

            // First request must be the message submission.
            let request = server_msg.recv_next_request().await.unwrap();
            if !matches!(request, localtxsubmission::Request::Submit(_)) {
                panic!("Expected a Submit request, but received: {request:?}");
            }

            let response = if reply_success {
                localtxsubmission::Response::Accepted
            } else {
                localtxsubmission::Response::Rejected(DmqMsgValidationError(
                    "fake error".to_string(),
                ))
            };
            server_msg.send_submit_tx_response(response).await.unwrap();

            // Second request must be the graceful termination.
            let request = server_msg.recv_next_request().await.unwrap();
            assert_eq!(localtxsubmission::Request::Done, request);
        })
    }

    /// Builds a publisher connected to `socket_path`, using a fake KES signer, a fake chain
    /// observer and a TTL of 100.
    fn build_publisher(socket_path: PathBuf) -> DmqPublisherPallas<DmqMessageTestPayload> {
        let (kes_signature, operational_certificate) = KesSignerFake::dummy_signature();
        let kes_signer = KesSignerFake::new(vec![Ok((
            kes_signature,
            operational_certificate.clone(),
        ))]);
        let dmq_message_builder = DmqMessageBuilder::new(
            Arc::new(kes_signer),
            Arc::new(FakeChainObserver::default()),
        )
        .set_ttl(100);

        DmqPublisherPallas::new(
            socket_path,
            CardanoNetwork::TestNet(0),
            dmq_message_builder,
            TestLogger::stdout(),
        )
    }

    /// Runs the fake server and a publisher concurrently and returns the publication result.
    ///
    /// Panics if either spawned task panicked, so that assertions made inside the fake
    /// server are not silently ignored.
    async fn publish_dummy_message(socket_path: PathBuf, reply_success: bool) -> StdResult<()> {
        let server = setup_dmq_server(socket_path.clone(), reply_success);
        let client = tokio::spawn(async move {
            let publisher = build_publisher(socket_path);

            publisher.publish_message(DmqMessageTestPayload::dummy()).await
        });

        let (server_result, client_result) = tokio::join!(server, client);
        server_result.expect("Fake DMQ server task should complete without panicking");

        client_result.expect("Publisher task should complete without panicking")
    }

    #[tokio::test]
    async fn pallas_dmq_signature_publisher_success() {
        let socket_path = create_temp_dir(current_function!()).join("node.socket");

        publish_dummy_message(socket_path, true).await.unwrap();
    }

    #[tokio::test]
    async fn pallas_dmq_signature_publisher_fails() {
        let socket_path = create_temp_dir(current_function!()).join("node.socket");

        publish_dummy_message(socket_path, false)
            .await
            .expect_err("Publishing DMQ message should fail");
    }
}