mithril_dmq/publisher/server/
pallas.rs1use std::{fs, path::PathBuf};
2
3use anyhow::{Context, anyhow};
4use pallas_network::{
5 facades::DmqServer,
6 miniprotocols::{
7 localmsgsubmission::DmqMsgValidationError,
8 localtxsubmission::{Request, Response},
9 },
10};
11use tokio::{
12 net::UnixListener,
13 select,
14 sync::{Mutex, MutexGuard, mpsc::UnboundedSender, watch::Receiver},
15};
16
17use slog::{Logger, debug, error, info, warn};
18
19use mithril_common::{CardanoNetwork, StdResult, logging::LoggerExtensions};
20
21use crate::{DmqMessage, DmqPublisherServer};
22
23pub struct DmqPublisherServerPallas {
25 socket: PathBuf,
26 network: CardanoNetwork,
27 server: Mutex<Option<DmqServer>>,
28 transmitters: Mutex<Vec<UnboundedSender<DmqMessage>>>,
29 stop_rx: Receiver<()>,
30 logger: Logger,
31}
32
33impl DmqPublisherServerPallas {
34 pub fn new(
36 socket: PathBuf,
37 network: CardanoNetwork,
38 stop_rx: Receiver<()>,
39 logger: Logger,
40 ) -> Self {
41 Self {
42 socket,
43 network,
44 server: Mutex::new(None),
45 transmitters: Mutex::new(Vec::new()),
46 stop_rx,
47 logger: logger.new_with_component_name::<Self>(),
48 }
49 }
50
51 async fn new_server(&self) -> StdResult<DmqServer> {
53 info!(
54 self.logger,
55 "Creating a new DMQ publisher server";
56 "socket" => ?self.socket,
57 "network" => ?self.network
58 );
59 let magic = self.network.magic_id();
60 if self.socket.exists() {
61 fs::remove_file(self.socket.clone())?;
62 }
63 let listener = UnixListener::bind(&self.socket)
64 .map_err(|err| anyhow!(err))
65 .with_context(|| {
66 format!(
67 "DmqPublisherServerPallas failed to bind Unix socket at {}",
68 self.socket.display()
69 )
70 })?;
71
72 DmqServer::accept(&listener, magic)
73 .await
74 .map_err(|err| anyhow!(err))
75 .with_context(|| "DmqPublisherServerPallas failed to create a new server")
76 }
77
78 async fn get_server(&self) -> StdResult<MutexGuard<'_, Option<DmqServer>>> {
80 {
81 let server_lock = self.server.lock().await;
83 if server_lock.as_ref().is_some() {
84 return Ok(server_lock);
85 }
86 }
87
88 let mut server_lock = self.server.lock().await;
89 *server_lock = Some(self.new_server().await?);
90
91 Ok(server_lock)
92 }
93
94 async fn drop_server(&self) -> StdResult<()> {
96 debug!(
97 self.logger,
98 "Drop existing DMQ publisher server";
99 "socket" => ?self.socket,
100 "network" => ?self.network
101 );
102 let mut server_lock = self.server.try_lock()?;
103 if let Some(server) = server_lock.take() {
104 server.abort().await;
105 }
106
107 Ok(())
108 }
109
110 pub async fn register_transmitter(
112 &self,
113 transmitter: UnboundedSender<DmqMessage>,
114 ) -> StdResult<()> {
115 debug!(self.logger, "Register message transmitter for DMQ messages");
116 let mut transmitters_guard = self.transmitters.lock().await;
117 transmitters_guard.push(transmitter);
118
119 Ok(())
120 }
121}
122
123#[async_trait::async_trait]
124impl DmqPublisherServer for DmqPublisherServerPallas {
125 async fn process_message(&self) -> StdResult<()> {
126 debug!(
127 self.logger,
128 "Waiting for message to publish to the DMQ network";
129 "socket" => ?self.socket,
130 "network" => ?self.network
131 );
132 let mut server_guard = self.get_server().await?;
133 let server = server_guard
134 .as_mut()
135 .ok_or(anyhow!("DMQ publisher server does not exist"))?;
136
137 let request = server
138 .msg_submission()
139 .recv_next_request()
140 .await
141 .map_err(|err| anyhow!("Failed to receive next DMQ message: {}", err))?;
142 let (dmq_message, response) = match request {
143 Request::Submit(dmq_message) => {
144 debug!(self.logger, "Received message to publish to DMQ");
145 (Some(dmq_message), Response::Accepted)
146 }
147 request => {
148 error!(
149 self.logger,
150 "Expected a Submit request, but received: {request:?}"
151 );
152 (
153 None,
154 Response::Rejected(DmqMsgValidationError(format!(
155 "Expected a Submit request, but received: {request:?}"
156 ))),
157 )
158 }
159 };
160 server
161 .msg_submission()
162 .send_submit_tx_response(response)
163 .await
164 .map_err(|err| anyhow!("Failed to send response to DMQ publisher client: {}", err))?;
165
166 if let Some(dmq_message) = dmq_message {
167 for transmitter in self.transmitters.lock().await.iter() {
168 if let Err(err) = transmitter.send(dmq_message.to_owned().into()) {
169 error!(
170 self.logger,
171 "Failed to send DMQ message to transmitter";
172 "error" => ?err
173 );
174 }
175 }
176 }
177
178 let request = server.msg_submission().recv_next_request().await.map_err(|err| {
179 anyhow!(
180 "Failed to receive next request from DMQ publisher client: {}",
181 err
182 )
183 })?;
184 match request {
185 Request::Done => {
186 debug!(
187 self.logger,
188 "Received Done request from DMQ publisher client"
189 );
190 }
191 _ => {
192 error!(
193 self.logger,
194 "Expected a Done request, but received: {request:?}"
195 );
196 return Err(anyhow!(
197 "Expected a Done request, but received: {request:?}"
198 ));
199 }
200 }
201
202 Ok(())
203 }
204
205 async fn run(&self) -> StdResult<()> {
206 info!(
207 self.logger,
208 "Starting DMQ publisher server";
209 "socket" => ?self.socket,
210 "network" => ?self.network
211 );
212
213 let mut stop_rx = self.stop_rx.clone();
214 loop {
215 select! {
216 _ = stop_rx.changed() => {
217 warn!(self.logger, "Stopping DMQ publisher server...");
218
219 return Ok(());
220 }
221 res = self.process_message() => {
222 match res {
223 Ok(_) => {
224 debug!(self.logger, "Processed a message successfully");
225 }
226 Err(err) => {
227 error!(self.logger, "Failed to process message"; "error" => ?err);
228 }
229 }
230 if let Err(drop_err) = self.drop_server().await {
231 error!(self.logger, "Failed to drop DMQ publisher server"; "error" => ?drop_err);
232 }
233 }
234 }
235 }
236 }
237}
238
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use pallas_network::{
        facades::DmqClient,
        miniprotocols::{localmsgsubmission::DmqMsg, localtxsubmission},
    };
    use tokio::sync::{mpsc::unbounded_channel, watch};

    use mithril_common::{current_function, test::TempDir};

    use crate::test_tools::TestLogger;

    use super::*;

    /// Creates a temporary directory for the test named `folder_name`.
    fn create_temp_dir(folder_name: &str) -> PathBuf {
        TempDir::create_with_short_path("dmq_publisher_server", folder_name)
    }

    /// Builds a fixed DMQ message used as the test payload.
    fn fake_msg() -> DmqMsg {
        DmqMsg {
            msg_id: vec![0, 1],
            msg_body: vec![2, 3, 4, 5],
            block_number: 10,
            ttl: 100,
            kes_signature: vec![0, 1, 2, 3],
            operational_certificate: vec![0, 1, 2, 3, 4, 5],
            kes_period: 10,
        }
    }

    /// A message submitted by a client is accepted and forwarded to the registered transmitter.
    #[tokio::test]
    async fn pallas_dmq_publisher_server_success() {
        let (stop_tx, stop_rx) = watch::channel(());
        let (signature_dmq_tx, mut signature_dmq_rx) = unbounded_channel::<DmqMessage>();
        let socket_path = create_temp_dir(current_function!()).join("node.socket");
        let cardano_network = CardanoNetwork::TestNet(0);
        let dmq_publisher_server = Arc::new(DmqPublisherServerPallas::new(
            socket_path.clone(),
            cardano_network,
            stop_rx,
            TestLogger::stdout(),
        ));
        dmq_publisher_server
            .register_transmitter(signature_dmq_tx)
            .await
            .unwrap();
        let message = fake_msg();
        let message_clone = message.clone();
        let client = tokio::spawn(async move {
            let mut client = DmqClient::connect(socket_path, 0).await.unwrap();

            let client_msg = client.msg_submission();
            assert_eq!(*client_msg.state(), localtxsubmission::State::Idle);

            client_msg.send_submit_tx(message_clone).await.unwrap();
            assert_eq!(*client_msg.state(), localtxsubmission::State::Busy);

            let response = client_msg.recv_submit_tx_response().await.unwrap();
            assert_eq!(*client_msg.state(), localtxsubmission::State::Idle);
            assert_eq!(response, localtxsubmission::Response::Accepted);
        });
        let recorder = tokio::spawn(async move {
            // Computed as an expression (no early `return`) so the stop signal below is
            // always sent, whether or not a message was received.
            let result = signature_dmq_rx
                .recv()
                .await
                .ok_or_else(|| anyhow::anyhow!("No message received in recorder"));
            stop_tx
                .send(())
                .expect("Failed to send stop signal to DMQ publisher server");

            result
        });

        let (run_res, client_res, message_res) =
            tokio::join!(dmq_publisher_server.run(), client, recorder);
        // Surface a server failure or a panicked client assertion instead of discarding it.
        run_res.unwrap();
        client_res.unwrap();
        let message_received = message_res.unwrap().unwrap();
        assert_eq!(message, message_received.into());
    }
}