mithril_dmq/test/double/
consumer.rs1use std::{collections::VecDeque, fmt::Debug};
2
3use tokio::sync::Mutex;
4
5use mithril_common::{StdResult, crypto_helper::TryFromBytes, entities::PartyId};
6
7use crate::DmqConsumer;
8
9type ConsumerReturn<M> = StdResult<Vec<(M, PartyId)>>;
10
11pub struct DmqConsumerFake<M: TryFromBytes + Debug + Send + Sync> {
13 results: Mutex<VecDeque<ConsumerReturn<M>>>,
14}
15
16impl<M: TryFromBytes + Debug + Send + Sync> DmqConsumerFake<M> {
17 pub fn new(results: Vec<StdResult<Vec<(M, PartyId)>>>) -> Self {
19 Self {
20 results: Mutex::new(VecDeque::from(results)),
21 }
22 }
23}
24
25#[async_trait::async_trait]
26impl<M: TryFromBytes + Debug + Send + Sync> DmqConsumer<M> for DmqConsumerFake<M> {
27 async fn consume_messages(&self) -> ConsumerReturn<M> {
28 let mut results = self.results.lock().await;
29
30 results
31 .pop_front()
32 .ok_or_else(|| anyhow::anyhow!("No more results available in DmqConsumerFake"))?
33 }
34}
35
#[cfg(test)]
mod tests {
    use crate::test::payload::DmqMessageTestPayload;

    use super::*;

    /// The first call returns the first queued batch.
    #[tokio::test]
    async fn consume_messages_success() {
        let first_batch = vec![(
            DmqMessageTestPayload::new(b"test-1"),
            "pool-id-1".to_string(),
        )];
        let second_batch = vec![(
            DmqMessageTestPayload::new(b"test-2"),
            "pool-id-2".to_string(),
        )];
        let consumer = DmqConsumerFake::new(vec![Ok(first_batch), Ok(second_batch)]);

        let expected = vec![(
            DmqMessageTestPayload::new(b"test-1"),
            "pool-id-1".to_string(),
        )];
        let messages = consumer.consume_messages().await.unwrap();

        assert_eq!(expected, messages);
    }

    /// A queued error is surfaced by the first call even if successes follow it.
    #[tokio::test]
    async fn consume_messages_failure() {
        let queued = vec![
            Err(anyhow::anyhow!("Test error")),
            Ok(vec![(
                DmqMessageTestPayload::new(b"test-2"),
                "pool-id-2".to_string(),
            )]),
        ];
        let consumer = DmqConsumerFake::new(queued);

        let outcome = consumer.consume_messages().await;

        outcome.expect_err("DmqConsumerFake should return an error");
    }
}