mithril_aggregator/services/signature_consumer/
dmq.rs1use std::sync::Arc;
2
3use anyhow::Context;
4use async_trait::async_trait;
5
6use mithril_common::{
7 StdResult,
8 entities::{SignedEntityType, SingleSignature},
9 messages::RegisterSignatureMessageDmq,
10};
11
12use mithril_dmq::DmqConsumer;
13
14use super::SignatureConsumer;
15
16pub struct SignatureConsumerDmq {
18 dmq_consumer: Arc<dyn DmqConsumer<RegisterSignatureMessageDmq>>,
19}
20
21impl SignatureConsumerDmq {
22 pub fn new(dmq_consumer: Arc<dyn DmqConsumer<RegisterSignatureMessageDmq>>) -> Self {
24 Self { dmq_consumer }
25 }
26}
27
28#[async_trait]
29impl SignatureConsumer for SignatureConsumerDmq {
30 async fn get_signatures(&self) -> StdResult<Vec<(SingleSignature, SignedEntityType)>> {
31 self.dmq_consumer
32 .consume_messages()
33 .await
34 .map(|messages| {
35 messages
36 .into_iter()
37 .map(|(message, party_id)| {
38 let signature = message.signature;
39 let won_indexes = signature.indexes.clone();
40 let single_signature =
41 SingleSignature::new(party_id, signature, won_indexes);
42 let signed_entity_type = message.signed_entity_type;
43
44 (single_signature, signed_entity_type)
45 })
46 .collect()
47 })
48 .with_context(|| "Failed to get signatures from DMQ")
49 }
50
51 fn get_origin_tag(&self) -> String {
52 "DMQ".to_string()
53 }
54}
55
56#[cfg(test)]
57mod tests {
58 use mithril_common::{
59 crypto_helper::ProtocolSingleSignature,
60 test::double::{Dummy, fake_keys},
61 };
62 use mithril_dmq::test::double::DmqConsumerFake;
63
64 use super::*;
65
66 #[tokio::test]
67 async fn get_signatures_success() {
68 let signed_entity_type = SignedEntityType::dummy();
69 let single_signature: ProtocolSingleSignature =
70 fake_keys::single_signature()[0].try_into().unwrap();
71 let dmq_consumer = Arc::new(DmqConsumerFake::new(vec![Ok(vec![(
72 RegisterSignatureMessageDmq {
73 signature: single_signature.clone(),
74 signed_entity_type: signed_entity_type.to_owned(),
75 },
76 "pool-id-1".to_string(),
77 )])]));
78 let consumer = SignatureConsumerDmq::new(dmq_consumer);
79
80 let signatures = consumer.get_signatures().await.unwrap();
81
82 assert_eq!(
83 vec![(
84 SingleSignature::new(
85 "pool-id-1".to_string(),
86 single_signature.clone(),
87 single_signature.indexes.clone()
88 ),
89 signed_entity_type
90 )],
91 signatures
92 );
93 }
94
95 #[tokio::test]
96 async fn get_signatures_failure() {
97 let dmq_consumer = Arc::new(DmqConsumerFake::new(vec![Err(anyhow::anyhow!(
98 "Test error"
99 ))]));
100 let consumer = SignatureConsumerDmq::new(dmq_consumer);
101
102 consumer
103 .get_signatures()
104 .await
105 .expect_err("SignatureConsumerDmq should return an error");
106 }
107}