mithril_aggregator/services/signature_consumer/
dmq.rs1use std::sync::Arc;
2
3use anyhow::Context;
4use async_trait::async_trait;
5
6use mithril_common::{
7 StdResult,
8 entities::{SignedEntityType, SingleSignature},
9 messages::RegisterSignatureMessageDmq,
10};
11
12use mithril_dmq::DmqConsumer;
13
14use super::SignatureConsumer;
15
16pub struct SignatureConsumerDmq {
18 dmq_consumer: Arc<dyn DmqConsumer<RegisterSignatureMessageDmq>>,
19}
20
21impl SignatureConsumerDmq {
22 pub fn new(dmq_consumer: Arc<dyn DmqConsumer<RegisterSignatureMessageDmq>>) -> Self {
24 Self { dmq_consumer }
25 }
26}
27
28#[async_trait]
29impl SignatureConsumer for SignatureConsumerDmq {
30 async fn get_signatures(&self) -> StdResult<Vec<(SingleSignature, SignedEntityType)>> {
31 self.dmq_consumer
32 .consume_messages()
33 .await
34 .map(|messages| {
35 messages
36 .into_iter()
37 .map(|(message, party_id)| {
38 let signature = message.signature;
39 let won_indexes = signature.indexes.clone();
40 let single_signature =
41 SingleSignature::new(party_id, signature, won_indexes);
42 let signed_entity_type = message.signed_entity_type;
43
44 (single_signature, signed_entity_type)
45 })
46 .collect()
47 })
48 .with_context(|| "Failed to get signatures from DMQ")
49 }
50
51 fn get_origin_tag(&self) -> String {
52 "DMQ".to_string()
53 }
54}
55
#[cfg(test)]
mod tests {
    use mithril_common::{crypto_helper::ProtocolSingleSignature, test_utils::fake_keys};
    use mithril_dmq::test::double::DmqConsumerFake;

    use super::*;

    /// A message delivered by the DMQ consumer is turned into a single signature
    /// carrying the sender's party id and the signature's own indexes.
    #[tokio::test]
    async fn get_signatures_success() {
        let entity_type = SignedEntityType::dummy();
        let protocol_signature: ProtocolSingleSignature =
            fake_keys::single_signature()[0].try_into().unwrap();
        let message = RegisterSignatureMessageDmq {
            signature: protocol_signature.clone(),
            signed_entity_type: entity_type.clone(),
        };
        let dmq_consumer = Arc::new(DmqConsumerFake::new(vec![Ok(vec![(
            message,
            "pool-id-1".to_string(),
        )])]));
        let consumer = SignatureConsumerDmq::new(dmq_consumer);

        let signatures = consumer.get_signatures().await.unwrap();

        let expected = vec![(
            SingleSignature::new(
                "pool-id-1".to_string(),
                protocol_signature.clone(),
                protocol_signature.indexes.clone(),
            ),
            entity_type,
        )];
        assert_eq!(expected, signatures);
    }

    /// An error reported by the DMQ consumer is propagated to the caller.
    #[tokio::test]
    async fn get_signatures_failure() {
        let failure = anyhow::anyhow!("Test error");
        let dmq_consumer = Arc::new(DmqConsumerFake::new(vec![Err(failure)]));
        let consumer = SignatureConsumerDmq::new(dmq_consumer);

        let outcome = consumer.get_signatures().await;

        outcome.expect_err("SignatureConsumerDmq should return an error");
    }
}