mithril_aggregator/services/signature_consumer/
dmq.rs

1use std::sync::Arc;
2
3use anyhow::Context;
4use async_trait::async_trait;
5
6use mithril_common::{
7    StdResult,
8    entities::{SignedEntityType, SingleSignature},
9    messages::RegisterSignatureMessageDmq,
10};
11
12use mithril_dmq::DmqConsumer;
13
14use super::SignatureConsumer;
15
16/// DMQ implementation of the [SignatureConsumer] trait.
17pub struct SignatureConsumerDmq {
18    dmq_consumer: Arc<dyn DmqConsumer<RegisterSignatureMessageDmq>>,
19}
20
21impl SignatureConsumerDmq {
22    /// Creates a new instance of [SignatureConsumerDmq].
23    pub fn new(dmq_consumer: Arc<dyn DmqConsumer<RegisterSignatureMessageDmq>>) -> Self {
24        Self { dmq_consumer }
25    }
26}
27
28#[async_trait]
29impl SignatureConsumer for SignatureConsumerDmq {
30    async fn get_signatures(&self) -> StdResult<Vec<(SingleSignature, SignedEntityType)>> {
31        self.dmq_consumer
32            .consume_messages()
33            .await
34            .map(|messages| {
35                messages
36                    .into_iter()
37                    .map(|(message, party_id)| {
38                        let signature = message.signature;
39                        let won_indexes = signature.indexes.clone();
40                        let single_signature =
41                            SingleSignature::new(party_id, signature, won_indexes);
42                        let signed_entity_type = message.signed_entity_type;
43
44                        (single_signature, signed_entity_type)
45                    })
46                    .collect()
47            })
48            .with_context(|| "Failed to get signatures from DMQ")
49    }
50
51    fn get_origin_tag(&self) -> String {
52        "DMQ".to_string()
53    }
54}
55
#[cfg(test)]
mod tests {
    use mithril_common::{crypto_helper::ProtocolSingleSignature, test_utils::fake_keys};
    use mithril_dmq::test::double::DmqConsumerFake;

    use super::*;

    /// A consumed message is turned into a single signature carrying the sender's
    /// party id and the indexes of the protocol signature.
    #[tokio::test]
    async fn get_signatures_success() {
        let party_id = "pool-id-1".to_string();
        let entity_type = SignedEntityType::dummy();
        let protocol_signature: ProtocolSingleSignature =
            fake_keys::single_signature()[0].try_into().unwrap();
        let expected_signatures = vec![(
            SingleSignature::new(
                party_id.clone(),
                protocol_signature.clone(),
                protocol_signature.indexes.clone(),
            ),
            entity_type.clone(),
        )];
        let message = RegisterSignatureMessageDmq {
            signature: protocol_signature,
            signed_entity_type: entity_type,
        };
        let fake_consumer = Arc::new(DmqConsumerFake::new(vec![Ok(vec![(message, party_id)])]));
        let consumer = SignatureConsumerDmq::new(fake_consumer);

        let signatures = consumer.get_signatures().await.unwrap();

        assert_eq!(expected_signatures, signatures);
    }

    /// An error raised by the underlying DMQ consumer is reported to the caller.
    #[tokio::test]
    async fn get_signatures_failure() {
        let failing_consumer =
            Arc::new(DmqConsumerFake::new(vec![Err(anyhow::anyhow!("Test error"))]));
        let consumer = SignatureConsumerDmq::new(failing_consumer);

        let result = consumer.get_signatures().await;

        result.expect_err("SignatureConsumerDmq should return an error");
    }
}