mithril_client/
aggregator_client.rs

1//! Mechanisms to exchange data with an aggregator.
2//!
3
4use anyhow::Context;
5use async_trait::async_trait;
6
7use mithril_aggregator_client::AggregatorHttpClient;
8use mithril_aggregator_client::query::{
9    GetAggregatorStatusQuery, GetCardanoDatabaseListQuery, GetCardanoDatabaseQuery,
10    GetCardanoStakeDistributionQuery, GetCardanoStakeDistributionsListQuery,
11    GetCardanoTransactionProofQuery, GetCardanoTransactionQuery, GetCardanoTransactionsListQuery,
12    GetCertificateQuery, GetCertificatesListQuery, GetMithrilStakeDistributionQuery,
13    GetMithrilStakeDistributionsListQuery, GetSnapshotQuery, GetSnapshotsListQuery,
14    PostIncrementCardanoDatabaseAncillaryRestoredStatisticQuery,
15    PostIncrementCardanoDatabaseImmutablesRestoredStatisticQuery,
16    PostIncrementCardanoDatabaseRestorationStatisticQuery,
17    PostIncrementSnapshotDownloadStatisticQuery,
18};
19
20use crate::{
21    CardanoDatabaseSnapshot, CardanoDatabaseSnapshotListItem, CardanoStakeDistribution,
22    CardanoStakeDistributionListItem, CardanoTransactionSnapshot,
23    CardanoTransactionSnapshotListItem, CardanoTransactionsProofs, MithrilCertificate,
24    MithrilCertificateListItem, MithrilResult, MithrilStakeDistribution,
25    MithrilStakeDistributionListItem, Snapshot, SnapshotListItem, common::EpochSpecifier,
26    era::FetchedEra,
27};
28use crate::{
29    cardano_database_client::CardanoDatabaseAggregatorRequest,
30    cardano_stake_distribution_client::CardanoStakeDistributionAggregatorRequest,
31    cardano_transaction_client::CardanoTransactionAggregatorRequest,
32    certificate_client::CertificateAggregatorRequest, era::EraFetcher,
33    mithril_stake_distribution_client::MithrilStakeDistributionAggregatorRequest,
34    snapshot_client::SnapshotAggregatorRequest,
35};
36
37#[cfg_attr(target_family = "wasm", async_trait(?Send))]
38#[cfg_attr(not(target_family = "wasm"), async_trait)]
39impl CardanoDatabaseAggregatorRequest for AggregatorHttpClient {
40    async fn list_latest(&self) -> MithrilResult<Vec<CardanoDatabaseSnapshotListItem>> {
41        self.send(GetCardanoDatabaseListQuery::latest())
42            .await
43            .with_context(|| "Failed to list latest Cardano database v2 snapshots")
44    }
45
46    async fn list_by_epoch(
47        &self,
48        specifier: EpochSpecifier,
49    ) -> MithrilResult<Vec<CardanoDatabaseSnapshotListItem>> {
50        self.send(GetCardanoDatabaseListQuery::for_epoch(specifier))
51            .await
52            .with_context(|| {
53                format!("Failed to list Cardano database v2 snapshots for epoch '{specifier}'")
54            })
55    }
56
57    async fn get_by_hash(&self, hash: &str) -> MithrilResult<Option<CardanoDatabaseSnapshot>> {
58        self.send(GetCardanoDatabaseQuery::by_hash(hash))
59            .await
60            .with_context(|| {
61                format!("Failed to get Cardano database v2 snapshots with hash '{hash}'")
62            })
63    }
64
65    async fn increment_cardano_database_complete_restoration_statistic(&self) -> MithrilResult<()> {
66        self.send(PostIncrementCardanoDatabaseRestorationStatisticQuery::complete())
67            .await
68            .with_context(|| "Failed to increment Cardano database complete restoration statistic")
69    }
70
71    async fn increment_cardano_database_partial_restoration_statistic(&self) -> MithrilResult<()> {
72        self.send(PostIncrementCardanoDatabaseRestorationStatisticQuery::partial())
73            .await
74            .with_context(|| "Failed to increment Cardano database partial restoration statistic")
75    }
76
77    async fn increment_immutables_snapshot_restored_statistic(
78        &self,
79        number_of_immutable_files_restored: u32,
80    ) -> MithrilResult<()> {
81        self.send(PostIncrementCardanoDatabaseImmutablesRestoredStatisticQuery::new(number_of_immutable_files_restored))
82            .await
83            .with_context(|| format!("Failed to increment Cardano database immutable files restored statistic, number of immutable files restored: '{number_of_immutable_files_restored}'"))
84    }
85
86    async fn increment_ancillary_downloaded_statistic(&self) -> MithrilResult<()> {
87        self.send(PostIncrementCardanoDatabaseAncillaryRestoredStatisticQuery)
88            .await
89            .with_context(
90                || "Failed to increment Cardano database ancillary files restored statistic",
91            )
92    }
93}
94
95#[cfg_attr(target_family = "wasm", async_trait(?Send))]
96#[cfg_attr(not(target_family = "wasm"), async_trait)]
97impl CardanoStakeDistributionAggregatorRequest for AggregatorHttpClient {
98    async fn list_latest(&self) -> MithrilResult<Vec<CardanoStakeDistributionListItem>> {
99        self.send(GetCardanoStakeDistributionsListQuery::latest())
100            .await
101            .with_context(|| "Failed to list latest Cardano stake distributions")
102    }
103
104    async fn get_by_hash(&self, hash: &str) -> MithrilResult<Option<CardanoStakeDistribution>> {
105        self.send(GetCardanoStakeDistributionQuery::by_hash(hash))
106            .await
107            .with_context(|| format!("Failed to get Cardano stake distribution with hash '{hash}'"))
108    }
109
110    async fn get_by_epoch(
111        &self,
112        specifier: EpochSpecifier,
113    ) -> MithrilResult<Option<CardanoStakeDistribution>> {
114        self.send(GetCardanoStakeDistributionQuery::for_epoch(specifier))
115            .await
116            .with_context(|| {
117                format!("Failed to get Cardano stake distribution for epoch '{specifier}'")
118            })
119    }
120}
121
122#[cfg_attr(target_family = "wasm", async_trait(?Send))]
123#[cfg_attr(not(target_family = "wasm"), async_trait)]
124impl CardanoTransactionAggregatorRequest for AggregatorHttpClient {
125    async fn get_proof(
126        &self,
127        hashes: &[String],
128    ) -> MithrilResult<Option<CardanoTransactionsProofs>> {
129        self.send(GetCardanoTransactionProofQuery::for_hashes(hashes))
130            .await
131            .with_context(|| {
132                format!("Failed to get Cardano transactions proofs for hashes '{hashes:?}'")
133            })
134    }
135
136    async fn list_latest_snapshots(
137        &self,
138    ) -> MithrilResult<Vec<CardanoTransactionSnapshotListItem>> {
139        self.send(GetCardanoTransactionsListQuery::latest())
140            .await
141            .with_context(|| "Failed to list latest Cardano transactions snapshots")
142    }
143
144    async fn get_snapshot(&self, hash: &str) -> MithrilResult<Option<CardanoTransactionSnapshot>> {
145        self.send(GetCardanoTransactionQuery::by_hash(hash))
146            .await
147            .with_context(|| {
148                format!("Failed to get Cardano transaction snapshot with hash '{hash}'")
149            })
150    }
151}
152
153#[cfg_attr(target_family = "wasm", async_trait(?Send))]
154#[cfg_attr(not(target_family = "wasm"), async_trait)]
155impl CertificateAggregatorRequest for AggregatorHttpClient {
156    async fn list_latest(&self) -> MithrilResult<Vec<MithrilCertificateListItem>> {
157        self.send(GetCertificatesListQuery::latest())
158            .await
159            .with_context(|| "Failed to list latest certificates")
160    }
161
162    async fn get_by_hash(&self, hash: &str) -> MithrilResult<Option<MithrilCertificate>> {
163        self.send(GetCertificateQuery::by_hash(hash))
164            .await
165            .with_context(|| format!("Failed to get certificate with hash '{hash}'"))
166    }
167}
168
169#[cfg_attr(target_family = "wasm", async_trait(?Send))]
170#[cfg_attr(not(target_family = "wasm"), async_trait)]
171impl EraFetcher for AggregatorHttpClient {
172    async fn fetch_current_era(&self) -> MithrilResult<FetchedEra> {
173        let aggregator_status = self
174            .send(GetAggregatorStatusQuery::current())
175            .await
176            .with_context(|| "Failed to get aggregator status")?;
177
178        Ok(FetchedEra {
179            era: aggregator_status.mithril_era.to_string(),
180        })
181    }
182}
183
184#[cfg_attr(target_family = "wasm", async_trait(?Send))]
185#[cfg_attr(not(target_family = "wasm"), async_trait)]
186impl MithrilStakeDistributionAggregatorRequest for AggregatorHttpClient {
187    async fn list_latest(&self) -> MithrilResult<Vec<MithrilStakeDistributionListItem>> {
188        self.send(GetMithrilStakeDistributionsListQuery::latest())
189            .await
190            .with_context(|| "Failed to list latest Mithril stake distributions")
191    }
192
193    async fn get_by_hash(&self, hash: &str) -> MithrilResult<Option<MithrilStakeDistribution>> {
194        self.send(GetMithrilStakeDistributionQuery::by_hash(hash))
195            .await
196            .with_context(|| format!("Failed to get Mithril stake distribution with hash '{hash}'"))
197    }
198}
199
200#[cfg_attr(target_family = "wasm", async_trait(?Send))]
201#[cfg_attr(not(target_family = "wasm"), async_trait)]
202impl SnapshotAggregatorRequest for AggregatorHttpClient {
203    async fn list_latest(&self) -> MithrilResult<Vec<SnapshotListItem>> {
204        self.send(GetSnapshotsListQuery::latest())
205            .await
206            .with_context(|| "Failed to list latest Cardano database v1 snapshots")
207    }
208
209    async fn get_by_hash(&self, hash: &str) -> MithrilResult<Option<Snapshot>> {
210        self.send(GetSnapshotQuery::by_hash(hash)).await.with_context(|| {
211            format!("Failed to get Cardano database v1 snapshots with hash '{hash}'")
212        })
213    }
214
215    async fn increment_snapshot_downloaded_statistic(
216        &self,
217        snapshot: Snapshot,
218    ) -> MithrilResult<()> {
219        self.send(PostIncrementSnapshotDownloadStatisticQuery::new(
220            snapshot.into(),
221        ))
222        .await
223        .with_context(|| "Failed to increment Cardano database v1 snapshot downloaded statistic")
224    }
225}
226
#[cfg(test)]
mod tests {
    use httpmock::MockServer;
    use serde_json::json;

    use mithril_common::messages::AggregatorStatusMessage;

    use crate::common::{SupportedEra, test::Dummy};
    use crate::test_utils::TestLogger;

    use super::*;

    /// Starts a mock HTTP server and builds an `AggregatorHttpClient` that targets
    /// its base URL and logs through `TestLogger::stdout()`.
    fn setup_server_and_client() -> (MockServer, AggregatorHttpClient) {
        let server = MockServer::start();
        let base_url = server.base_url();
        let logger = TestLogger::stdout();
        let client = AggregatorHttpClient::builder(base_url)
            .with_logger(logger)
            .build()
            .unwrap();

        (server, client)
    }

    #[tokio::test]
    async fn extract_mithril_era_from_status_response() {
        let (server, client) = setup_server_and_client();

        // Answer any request with a status message advertising the Lagrange era.
        server.mock(|when, then| {
            when.any_request();

            let status_message = AggregatorStatusMessage {
                mithril_era: SupportedEra::Lagrange,
                ..Dummy::dummy()
            };
            then.status(200).json_body(json!(status_message));
        });

        let fetched_era = client.fetch_current_era().await.unwrap();
        let expected_era = SupportedEra::Lagrange.to_string();

        assert_eq!(fetched_era.era, expected_era);
    }
}