mithril_dmq/publisher/server/
pallas.rs1use std::{fs, path::PathBuf};
2
3use anyhow::{Context, anyhow};
4use pallas_network::{
5 facades::DmqServer,
6 miniprotocols::{
7 localmsgsubmission::DmqMsgValidationError,
8 localtxsubmission::{Request, Response},
9 },
10};
11use tokio::{
12 net::UnixListener,
13 select,
14 sync::{Mutex, MutexGuard, mpsc::UnboundedSender, watch::Receiver},
15};
16
17use slog::{Logger, debug, error, info, warn};
18
19use mithril_common::{StdResult, logging::LoggerExtensions};
20
21use crate::{DmqMessage, DmqNetwork, DmqPublisherServer};
22
23pub struct DmqPublisherServerPallas {
25 socket: PathBuf,
26 network: DmqNetwork,
27 server: Mutex<Option<DmqServer>>,
28 transmitters: Mutex<Vec<UnboundedSender<DmqMessage>>>,
29 stop_rx: Receiver<()>,
30 logger: Logger,
31}
32
33impl DmqPublisherServerPallas {
34 pub fn new(
36 socket: PathBuf,
37 network: DmqNetwork,
38 stop_rx: Receiver<()>,
39 logger: Logger,
40 ) -> Self {
41 Self {
42 socket,
43 network,
44 server: Mutex::new(None),
45 transmitters: Mutex::new(Vec::new()),
46 stop_rx,
47 logger: logger.new_with_component_name::<Self>(),
48 }
49 }
50
51 async fn new_server(&self) -> StdResult<DmqServer> {
53 info!(
54 self.logger,
55 "Creating a new DMQ publisher server";
56 "socket" => ?self.socket,
57 "network" => ?self.network
58 );
59 let magic = self.network.magic_id();
60 if self.socket.exists() {
61 fs::remove_file(self.socket.clone())?;
62 }
63 let listener = UnixListener::bind(&self.socket).with_context(|| {
64 format!(
65 "DmqPublisherServerPallas failed to bind Unix socket at {}",
66 self.socket.display()
67 )
68 })?;
69
70 DmqServer::accept(&listener, magic)
71 .await
72 .with_context(|| "DmqPublisherServerPallas failed to create a new server")
73 }
74
75 async fn get_server(&self) -> StdResult<MutexGuard<'_, Option<DmqServer>>> {
77 {
78 let server_lock = self.server.lock().await;
80 if server_lock.as_ref().is_some() {
81 return Ok(server_lock);
82 }
83 }
84
85 let mut server_lock = self.server.lock().await;
86 *server_lock = Some(self.new_server().await?);
87
88 Ok(server_lock)
89 }
90
91 async fn drop_server(&self) -> StdResult<()> {
93 debug!(
94 self.logger,
95 "Drop existing DMQ publisher server";
96 "socket" => ?self.socket,
97 "network" => ?self.network
98 );
99 let mut server_lock = self.server.try_lock()?;
100 if let Some(server) = server_lock.take() {
101 server.abort().await;
102 }
103
104 Ok(())
105 }
106
107 pub async fn register_transmitter(
109 &self,
110 transmitter: UnboundedSender<DmqMessage>,
111 ) -> StdResult<()> {
112 debug!(self.logger, "Register message transmitter for DMQ messages");
113 let mut transmitters_guard = self.transmitters.lock().await;
114 transmitters_guard.push(transmitter);
115
116 Ok(())
117 }
118}
119
120#[async_trait::async_trait]
121impl DmqPublisherServer for DmqPublisherServerPallas {
122 async fn process_message(&self) -> StdResult<()> {
123 debug!(
124 self.logger,
125 "Waiting for message to publish to the DMQ network";
126 "socket" => ?self.socket,
127 "network" => ?self.network
128 );
129 let mut server_guard = self.get_server().await?;
130 let server = server_guard
131 .as_mut()
132 .with_context(|| "DMQ publisher server does not exist")?;
133
134 let request = server
135 .msg_submission()
136 .recv_next_request()
137 .await
138 .with_context(|| "Failed to receive next DMQ message")?;
139 let (dmq_message, response) = match request {
140 Request::Submit(dmq_message) => {
141 debug!(self.logger, "Received message to publish to DMQ");
142 (Some(dmq_message), Response::Accepted)
143 }
144 request => {
145 error!(
146 self.logger,
147 "Expected a Submit request, but received: {request:?}"
148 );
149 (
150 None,
151 Response::Rejected(DmqMsgValidationError(format!(
152 "Expected a Submit request, but received: {request:?}"
153 ))),
154 )
155 }
156 };
157 server
158 .msg_submission()
159 .send_submit_tx_response(response)
160 .await
161 .with_context(|| "Failed to send response to DMQ publisher client")?;
162
163 if let Some(dmq_message) = dmq_message {
164 for transmitter in self.transmitters.lock().await.iter() {
165 if let Err(err) = transmitter.send(dmq_message.to_owned().into()) {
166 error!(
167 self.logger,
168 "Failed to send DMQ message to transmitter";
169 "error" => ?err
170 );
171 }
172 }
173 }
174
175 let request = server
176 .msg_submission()
177 .recv_next_request()
178 .await
179 .with_context(|| "Failed to receive next request from DMQ publisher client")?;
180 match request {
181 Request::Done => {
182 debug!(
183 self.logger,
184 "Received Done request from DMQ publisher client"
185 );
186 }
187 _ => {
188 error!(
189 self.logger,
190 "Expected a Done request, but received: {request:?}"
191 );
192 return Err(anyhow!(
193 "Expected a Done request, but received: {request:?}"
194 ));
195 }
196 }
197
198 Ok(())
199 }
200
201 async fn run(&self) -> StdResult<()> {
202 info!(
203 self.logger,
204 "Starting DMQ publisher server";
205 "socket" => ?self.socket,
206 "network" => ?self.network
207 );
208
209 let mut stop_rx = self.stop_rx.clone();
210 loop {
211 select! {
212 _ = stop_rx.changed() => {
213 warn!(self.logger, "Stopping DMQ publisher server...");
214
215 return Ok(());
216 }
217 res = self.process_message() => {
218 match res {
219 Ok(_) => {
220 debug!(self.logger, "Processed a message successfully");
221 }
222 Err(err) => {
223 error!(self.logger, "Failed to process message"; "error" => ?err);
224 }
225 }
226 if let Err(drop_err) = self.drop_server().await {
227 error!(self.logger, "Failed to drop DMQ publisher server"; "error" => ?drop_err);
228 }
229 }
230 }
231 }
232 }
233}
234
235#[cfg(test)]
236mod tests {
237 use std::sync::Arc;
238
239 use pallas_network::{
240 facades::DmqClient,
241 miniprotocols::{
242 localmsgsubmission::{DmqMsg, DmqMsgOperationalCertificate, DmqMsgPayload},
243 localtxsubmission,
244 },
245 };
246 use tokio::sync::{mpsc::unbounded_channel, watch};
247
248 use mithril_common::{current_function, test::TempDir};
249
250 use crate::test_tools::TestLogger;
251
252 use super::*;
253
254 fn create_temp_dir(folder_name: &str) -> PathBuf {
255 TempDir::create_with_short_path("dmq_publisher_server", folder_name)
256 }
257
258 async fn fake_msg() -> DmqMsg {
259 DmqMsg {
260 msg_payload: DmqMsgPayload {
261 msg_id: vec![0, 1],
262 msg_body: vec![0, 1, 2],
263 kes_period: 10,
264 expires_at: 100,
265 },
266 kes_signature: vec![0, 1, 2, 3],
267 operational_certificate: DmqMsgOperationalCertificate {
268 kes_vk: vec![12, 13, 14],
269 issue_number: 15,
270 start_kes_period: 16,
271 cert_sig: vec![17],
272 },
273 cold_verification_key: vec![0, 1, 2, 3, 4, 5],
274 }
275 }
276
277 #[tokio::test]
278 async fn pallas_dmq_publisher_server_success() {
279 let (stop_tx, stop_rx) = watch::channel(());
280 let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
281 let socket_path = create_temp_dir(current_function!()).join("node.socket");
282 let cardano_network = DmqNetwork::TestNet(0);
283 let dmq_publisher_server = Arc::new(DmqPublisherServerPallas::new(
284 socket_path.to_path_buf(),
285 cardano_network.to_owned(),
286 stop_rx,
287 TestLogger::stdout(),
288 ));
289 dmq_publisher_server
290 .register_transmitter(signature_dmq_tx)
291 .await
292 .unwrap();
293 let message = fake_msg().await;
294 let message_clone = message.clone();
295 let client = tokio::spawn({
296 async move {
297 let mut client = DmqClient::connect(socket_path.clone(), 0).await.unwrap();
299
300 let client_msg = client.msg_submission();
302 assert_eq!(*client_msg.state(), localtxsubmission::State::Idle);
303
304 client_msg.send_submit_tx(message_clone).await.unwrap();
306 assert_eq!(*client_msg.state(), localtxsubmission::State::Busy);
307
308 let response = client_msg.recv_submit_tx_response().await.unwrap();
309 assert_eq!(*client_msg.state(), localtxsubmission::State::Idle);
310 assert_eq!(response, localtxsubmission::Response::Accepted);
311 }
312 });
313 let recorder = tokio::spawn(async move {
314 let result = {
315 let mut signature_dmq_rx = signature_dmq_rx;
316 if let Some(message) = signature_dmq_rx.recv().await {
317 return Ok(message);
318 }
319
320 Err(anyhow::anyhow!("No message received in recorder"))
321 };
322 stop_tx
323 .send(())
324 .expect("Failed to send stop signal to DMQ publisher server");
325
326 result
327 });
328
329 let (_, _, message_res) = tokio::join!(dmq_publisher_server.run(), client, recorder);
330 let message_received = message_res.unwrap().unwrap();
331 assert_eq!(message, message_received.into());
332 }
333}