mithril_aggregator/tools/
signer_importer.rs

1use anyhow::Context;
2use async_trait::async_trait;
3use reqwest::{IntoUrl, Url};
4use serde::{Deserialize, Serialize};
5use slog::{info, warn, Logger};
6use std::collections::HashMap;
7use std::ops::Not;
8use std::sync::Arc;
9use std::time::Duration;
10
11use mithril_common::logging::LoggerExtensions;
12use mithril_common::{entities::PartyId, StdResult};
13
14use crate::database::repository::SignerStore;
15
/// Pool ticker associated to a signer: the value side of the `PartyId -> Option<PoolTicker>`
/// maps produced by a [SignersImporterRetriever] and consumed by a [SignersImporterPersister].
16pub type PoolTicker = String;
17
18/// Tool that can import a list of signers
19pub struct SignersImporter {
20    retriever: Arc<dyn SignersImporterRetriever>,
21    persister: Arc<dyn SignersImporterPersister>,
22    logger: Logger,
23}
24
25impl SignersImporter {
26    /// [SignersImporter] factory
27    pub fn new(
28        retriever: Arc<dyn SignersImporterRetriever>,
29        persister: Arc<dyn SignersImporterPersister>,
30        logger: Logger,
31    ) -> Self {
32        Self {
33            retriever,
34            persister,
35            logger: logger.new_with_component_name::<Self>(),
36        }
37    }
38
39    /// Import and persist the signers
40    pub async fn run(&self) -> StdResult<()> {
41        info!(self.logger, "Starting import");
42        let items = self
43            .retriever
44            .retrieve()
45            .await
46            .with_context(|| "Failed to retrieve signers from remote service")?;
47
48        info!(
49            self.logger, "Persisting retrieved data in the database";
50            "number_of_signer_to_insert" => items.len()
51        );
52        self.persister
53            .persist(items)
54            .await
55            .with_context(|| "Failed to persist retrieved data into the database")
56    }
57
58    /// Start a loop that call [run][Self::run] at the given time interval.
59    pub async fn run_forever(&self, run_interval: Duration) {
60        let mut interval = tokio::time::interval(run_interval);
61
62        loop {
63            interval.tick().await;
64            if let Err(error) = self.run().await {
65                warn!(self.logger, "Signer retriever failed"; "error" => ?error);
66            }
67            info!(
68                self.logger,
69                "Cycle finished, Sleeping for {} min",
70                run_interval.as_secs() / 60
71            );
72        }
73    }
74}
75
76/// Trait that defines how a [SignersImporter] retrieves the signers to import.
///
/// A mock implementation is generated with `mockall` in test builds.
77#[cfg_attr(test, mockall::automock)]
78#[async_trait]
79pub trait SignersImporterRetriever: Sync + Send {
80    /// Retrieve the signers list.
    ///
    /// On success, yields a map from each signer [PartyId] to its optional [PoolTicker].
81    async fn retrieve(&self) -> StdResult<HashMap<PartyId, Option<PoolTicker>>>;
82}
83
84/// Trait that defines how a [SignersImporter] persists the retrieved signers.
///
/// A mock implementation is generated with `mockall` in test builds.
85#[cfg_attr(test, mockall::automock)]
86#[async_trait]
87pub trait SignersImporterPersister: Sync + Send {
88    /// Persist the given list of signers.
    ///
    /// `signers` maps each signer [PartyId] to its optional [PoolTicker].
89    async fn persist(&self, signers: HashMap<PartyId, Option<PoolTicker>>) -> StdResult<()>;
90}
91
92#[async_trait]
93impl SignersImporterPersister for SignerStore {
94    async fn persist(&self, signers: HashMap<PartyId, Option<PoolTicker>>) -> StdResult<()> {
95        self.import_many_signers(signers).await?;
96
97        Ok(())
98    }
99}
100
101/// A [SignersImporterRetriever] fetching signers data from CExplorer.
102pub struct CExplorerSignerRetriever {
103    /// Url from which a SPO list using the CExplorer format will be fetch.
104    source_url: Url,
105    client: reqwest::Client,
106    logger: Logger,
107}
108
109impl CExplorerSignerRetriever {
110    /// Create a new [CExplorerSignerRetriever] that will fetch data from the given url.
111    pub(crate) fn new<T: IntoUrl>(
112        source_url: T,
113        timeout: Option<Duration>,
114        logger: Logger,
115    ) -> StdResult<Self> {
116        let source_url = source_url
117            .into_url()
118            .with_context(|| "Given `source_url` is not a valid Url")?;
119        let client_builder = reqwest::Client::builder();
120        let client = match timeout {
121            None => client_builder,
122            Some(timeout) => client_builder.timeout(timeout),
123        }
124        .build()
125        .with_context(|| "Http Client build failed")?;
126
127        Ok(Self {
128            source_url,
129            client,
130            logger,
131        })
132    }
133}
134
135#[async_trait]
136impl SignersImporterRetriever for CExplorerSignerRetriever {
137    async fn retrieve(&self) -> StdResult<HashMap<PartyId, Option<PoolTicker>>> {
138        info!(
139            self.logger, "Retrieving data from source";
140            "source_url" => &self.source_url.as_str()
141        );
142        let response = self
143            .client
144            .get(self.source_url.to_owned())
145            .send()
146            .await
147            .with_context(|| "Retrieving of CExplorer SPO list failed")?;
148
149        let spo_list = response
150            .error_for_status()
151            .with_context(|| "Data fetching failed")?
152            .json::<SPOList>()
153            .await
154            .with_context(|| "Failed to deserialize retrieved SPO list from CExplorer")?;
155
156        Ok(spo_list
157            .data
158            .into_iter()
159            .map(|item| item.extract())
160            .collect())
161    }
162}
163
164/// *Internal type* Maps a CExplorer SPO list, as deserialized from the fetched response.
165#[derive(Debug, Clone, Serialize, Deserialize)]
166struct SPOList {
    /// Items found under the `data` field of the list.
167    data: Vec<SPOItem>,
168}
169
170/// *Internal type* Maps a CExplorer SPO item inside its data list.
171#[derive(Debug, Clone, Serialize, Deserialize)]
172struct SPOItem {
    /// Pool identifier, yielded as the party id by `extract`.
173    pool_id: String,
    /// Pool name, yielded as the pool ticker by `extract` unless it is considered empty
    /// (see `is_name_empty`).
174    name: String,
175}
176
177impl SPOItem {
178    const EMPTY_NAME: &'static str = "[] ";
179
180    fn is_name_empty(&self) -> bool {
181        self.name.is_empty() || self.name == Self::EMPTY_NAME
182    }
183
184    /// Consume this item to convert it to a result ready to be yield by a
185    /// [SignersImporterRetriever::retrieve] implementation.
186    fn extract(self) -> (PartyId, Option<PoolTicker>) {
187        let is_name_empty = self.is_name_empty();
188        let (pool_id, name) = (self.pool_id, self.name);
189
190        (pool_id, is_name_empty.not().then_some(name))
191    }
192}
193
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};
    use std::convert::Infallible;
    use std::sync::Arc;
    use warp::Filter;

    use mithril_common::test_utils::test_http_server::test_http_server;
    use mithril_common::StdResult;
    use mithril_persistence::sqlite::SqliteConnection;

    use crate::database::repository::{SignerGetter, SignerStore};
    use crate::database::test_helper::main_db_connection;
    use crate::http_server::routes::reply;
    use crate::test_tools::TestLogger;

    use super::*;

    /// Flat, orderable view of a stored signer used to compare database content.
    #[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
    struct TestSigner {
        pool_id: String,
        ticker: Option<String>,
    }

    impl TestSigner {
        fn without_ticker(pool_id: &str) -> Self {
            Self {
                pool_id: pool_id.to_string(),
                ticker: None,
            }
        }

        fn with_ticker(pool_id: &str, ticker: &str) -> Self {
            Self {
                ticker: Some(ticker.to_string()),
                ..Self::without_ticker(pool_id)
            }
        }
    }

    /// Insert the given signers, one by one, in the database behind `connection`.
    async fn fill_signer_db(
        connection: Arc<SqliteConnection>,
        test_signers: &[TestSigner],
    ) -> StdResult<()> {
        let store = SignerStore::new(connection);

        for TestSigner { pool_id, ticker } in test_signers {
            store.import_signer(pool_id.clone(), ticker.clone()).await?;
        }

        Ok(())
    }

    /// Read back every signer stored in the database behind `connection`.
    async fn get_all_signers(connection: Arc<SqliteConnection>) -> StdResult<BTreeSet<TestSigner>> {
        let records = SignerStore::new(connection).get_all().await?;

        Ok(records
            .into_iter()
            .map(|record| TestSigner {
                pool_id: record.signer_id,
                ticker: record.pool_ticker,
            })
            .collect())
    }

    /// Build a retriever targeting the `/list` route of the server at `server_url`.
    fn list_retriever(
        server_url: impl std::fmt::Display,
        timeout: Option<Duration>,
    ) -> CExplorerSignerRetriever {
        CExplorerSignerRetriever::new(format!("{}/list", server_url), timeout, TestLogger::stdout())
            .unwrap()
    }

    /// Build an importer reading from `retriever` and persisting with a [SignerStore]
    /// opened on `connection`.
    fn importer_with<R>(retriever: R, connection: &Arc<SqliteConnection>) -> SignersImporter
    where
        R: SignersImporterRetriever + 'static,
    {
        SignersImporter::new(
            Arc::new(retriever),
            Arc::new(SignerStore::new(connection.clone())),
            TestLogger::stdout(),
        )
    }

    #[test]
    fn item_with_empty_name_yield_empty_pool_ticker() {
        let empty_names = ["", SPOItem::EMPTY_NAME];

        for empty_name in empty_names {
            let item = SPOItem {
                pool_id: "whatever".to_string(),
                name: empty_name.to_string(),
            };

            assert!(item.is_name_empty());
            assert_eq!(("whatever".to_string(), None), item.extract());
        }
    }

    #[tokio::test]
    async fn retriever_should_return_deduplicated_data_and_handle_empty_name() {
        let server = test_http_server(warp::path("list").map(|| {
            r#"{
            "data": [
                {"pool_id": "pool1", "name": ""},
                {"pool_id": "pool2", "name": "[] "},
                {"pool_id": "pool3", "name": "whatever"},
                {"pool_id": "pool3", "name": "whatever2"}
            ]
        }"#
        }));
        let retriever = list_retriever(server.url(), None);

        let retrieved = retriever
            .retrieve()
            .await
            .expect("Retriever should not fail");

        // Sorted so that the comparison does not depend on the hash map iteration order.
        let sorted: BTreeMap<_, _> = retrieved.into_iter().collect();
        let expected = BTreeMap::from([
            ("pool1".to_string(), None),
            ("pool2".to_string(), None),
            ("pool3".to_string(), Some("whatever2".to_string())),
        ]);
        assert_eq!(sorted, expected);
    }

    #[tokio::test]
    async fn retriever_handle_http_data_fetching_error() {
        let failing_route = warp::path("list").map(|| reply::internal_server_error("whatever"));
        let server = test_http_server(failing_route);
        let retriever = list_retriever(server.url(), None);

        retriever
            .retrieve()
            .await
            .expect_err("An error should have been raised");
    }

    #[tokio::test]
    async fn retriever_yield_error_when_json_is_malformed() {
        let malformed_route = warp::path("list").map(|| r#"{ "data": [ {"pool_" ] }"#);
        let server = test_http_server(malformed_route);
        let retriever = list_retriever(server.url(), None);

        retriever
            .retrieve()
            .await
            .expect_err("An error should have been raised");
    }

    #[tokio::test]
    async fn retriever_can_timeout() {
        // The route answers after 70ms while the client only waits for 10ms.
        let server = test_http_server(warp::path("list").and_then(|| async {
            tokio::time::sleep(Duration::from_millis(70)).await;
            Ok::<&str, Infallible>(r#"{"data":[]}"#)
        }));
        let retriever = list_retriever(server.url(), Some(Duration::from_millis(10)));

        retriever
            .retrieve()
            .await
            .expect_err("An error should have been raised");
    }

    #[tokio::test]
    async fn persist_list_of_two_signers_one_with_ticker_the_other_without() {
        let connection = Arc::new(main_db_connection().unwrap());
        let mut mock_retriever = MockSignersImporterRetriever::new();
        mock_retriever.expect_retrieve().returning(|| {
            let signers = HashMap::from([
                ("pool1".to_string(), Some("[Pool name test]".to_string())),
                ("pool2".to_string(), None),
            ]);
            Ok(signers)
        });

        importer_with(mock_retriever, &connection)
            .run()
            .await
            .expect("running importer should not fail");

        let stored = get_all_signers(connection).await.unwrap();
        let expected = BTreeSet::from([
            TestSigner::with_ticker("pool1", "[Pool name test]"),
            TestSigner::without_ticker("pool2"),
        ]);
        assert_eq!(stored, expected);
    }

    #[tokio::test]
    async fn persist_update_existing_data() {
        let connection = Arc::new(main_db_connection().unwrap());
        let already_stored = [
            TestSigner::with_ticker("pool1", "[Pool name test]"),
            TestSigner::without_ticker("pool2"),
            TestSigner::with_ticker("pool3", "[Not updated]"),
            TestSigner::with_ticker("pool4", "[Ticker will be removed]"),
        ];
        fill_signer_db(connection.clone(), &already_stored)
            .await
            .unwrap();
        let mut mock_retriever = MockSignersImporterRetriever::new();
        mock_retriever.expect_retrieve().returning(|| {
            let signers = HashMap::from([
                ("pool1".to_string(), Some("[Updated Pool name]".to_string())),
                ("pool2".to_string(), Some("[Added Pool name]".to_string())),
                ("pool3".to_string(), Some("[Not updated]".to_string())),
                ("pool4".to_string(), None),
                ("pool5".to_string(), Some("[New Pool]".to_string())),
            ]);
            Ok(signers)
        });

        importer_with(mock_retriever, &connection)
            .run()
            .await
            .expect("running importer should not fail");

        let stored = get_all_signers(connection).await.unwrap();
        let expected = BTreeSet::from([
            TestSigner::with_ticker("pool1", "[Updated Pool name]"),
            TestSigner::with_ticker("pool2", "[Added Pool name]"),
            TestSigner::with_ticker("pool3", "[Not updated]"),
            TestSigner::without_ticker("pool4"),
            TestSigner::with_ticker("pool5", "[New Pool]"),
        ]);
        assert_eq!(stored, expected);
    }

    #[tokio::test]
    async fn importer_integration_test() {
        let connection = Arc::new(main_db_connection().unwrap());
        let already_stored = [
            TestSigner::with_ticker("pool4", "[Pool4 dont change]"),
            TestSigner::without_ticker("pool5"),
            TestSigner::with_ticker("pool6", "[Pool6 not returned by server]"),
            TestSigner::with_ticker("pool7", "[Pool7 ticker will be removed]"),
        ];
        fill_signer_db(connection.clone(), &already_stored)
            .await
            .unwrap();
        let server = test_http_server(warp::path("list").map(|| {
            r#"{
            "data": [
                {"pool_id": "pool1", "name": ""},
                {"pool_id": "pool2", "name": "[] "},
                {"pool_id": "pool3", "name": "[Pool3 added]"},
                {"pool_id": "pool4", "name": "[Pool4 dont change]"},
                {"pool_id": "pool5", "name": "[Pool5 add ticker]"},
                {"pool_id": "pool7", "name": "[] "}
            ]
        }"#
        }));

        let http_retriever = list_retriever(server.url(), None);
        importer_with(http_retriever, &connection)
            .run()
            .await
            .expect("running importer should not fail");

        let stored = get_all_signers(connection).await.unwrap();
        let expected = BTreeSet::from([
            TestSigner::without_ticker("pool1"),
            TestSigner::without_ticker("pool2"),
            TestSigner::with_ticker("pool3", "[Pool3 added]"),
            TestSigner::with_ticker("pool4", "[Pool4 dont change]"),
            TestSigner::with_ticker("pool5", "[Pool5 add ticker]"),
            TestSigner::with_ticker("pool6", "[Pool6 not returned by server]"),
            TestSigner::without_ticker("pool7"),
        ]);
        assert_eq!(stored, expected);
    }
}