mithril_aggregator/tools/
signer_importer.rs

1use anyhow::Context;
2use async_trait::async_trait;
3use reqwest::{IntoUrl, Url};
4use serde::{Deserialize, Serialize};
5use slog::{Logger, info, warn};
6use std::collections::HashMap;
7use std::ops::Not;
8use std::sync::Arc;
9use std::time::Duration;
10
11use mithril_common::logging::LoggerExtensions;
12use mithril_common::{StdResult, entities::PartyId};
13
14use crate::database::repository::SignerStore;
15
/// Pool ticker attached to an imported signer (plain [String] alias).
pub type PoolTicker = String;
17
18/// Tool that can import a list of signers
19pub struct SignersImporter {
20    retriever: Arc<dyn SignersImporterRetriever>,
21    persister: Arc<dyn SignersImporterPersister>,
22    logger: Logger,
23}
24
25impl SignersImporter {
26    /// [SignersImporter] factory
27    pub fn new(
28        retriever: Arc<dyn SignersImporterRetriever>,
29        persister: Arc<dyn SignersImporterPersister>,
30        logger: Logger,
31    ) -> Self {
32        Self {
33            retriever,
34            persister,
35            logger: logger.new_with_component_name::<Self>(),
36        }
37    }
38
39    /// Import and persist the signers
40    pub async fn run(&self) -> StdResult<()> {
41        info!(self.logger, "Starting import");
42        let items = self
43            .retriever
44            .retrieve()
45            .await
46            .with_context(|| "Failed to retrieve signers from remote service")?;
47
48        info!(
49            self.logger, "Persisting retrieved data in the database";
50            "number_of_signer_to_insert" => items.len()
51        );
52        self.persister
53            .persist(items)
54            .await
55            .with_context(|| "Failed to persist retrieved data into the database")
56    }
57
58    /// Start a loop that call [run][Self::run] at the given time interval.
59    pub async fn run_forever(&self, run_interval: Duration) {
60        let mut interval = tokio::time::interval(run_interval);
61
62        loop {
63            interval.tick().await;
64            if let Err(error) = self.run().await {
65                warn!(self.logger, "Signer retriever failed"; "error" => ?error);
66            }
67            info!(
68                self.logger,
69                "Cycle finished, Sleeping for {} min",
70                run_interval.as_secs() / 60
71            );
72        }
73    }
74}
75
76/// Trait that define how a [SignersImporter] retrieve the signers to import.
77#[cfg_attr(test, mockall::automock)]
78#[async_trait]
79pub trait SignersImporterRetriever: Sync + Send {
80    /// Retrieve the signers list.
81    async fn retrieve(&self) -> StdResult<HashMap<PartyId, Option<PoolTicker>>>;
82}
83
84/// Trait that define how a [SignersImporter] persist the retrieved signers.
85#[cfg_attr(test, mockall::automock)]
86#[async_trait]
87pub trait SignersImporterPersister: Sync + Send {
88    /// Persist the given list of signers.
89    async fn persist(&self, signers: HashMap<PartyId, Option<PoolTicker>>) -> StdResult<()>;
90}
91
92#[async_trait]
93impl SignersImporterPersister for SignerStore {
94    async fn persist(&self, signers: HashMap<PartyId, Option<PoolTicker>>) -> StdResult<()> {
95        self.import_many_signers(signers).await?;
96
97        Ok(())
98    }
99}
100
101/// A [SignersImporterRetriever] fetching signers data from CExplorer.
102pub struct CExplorerSignerRetriever {
103    /// Url from which a SPO list using the CExplorer format will be fetch.
104    source_url: Url,
105    client: reqwest::Client,
106    logger: Logger,
107}
108
109impl CExplorerSignerRetriever {
110    /// Create a new [CExplorerSignerRetriever] that will fetch data from the given url.
111    pub(crate) fn new<T: IntoUrl>(
112        source_url: T,
113        timeout: Option<Duration>,
114        logger: Logger,
115    ) -> StdResult<Self> {
116        let source_url = source_url
117            .into_url()
118            .with_context(|| "Given `source_url` is not a valid Url")?;
119        let client_builder = reqwest::Client::builder();
120        let client = match timeout {
121            None => client_builder,
122            Some(timeout) => client_builder.timeout(timeout),
123        }
124        .build()
125        .with_context(|| "Http Client build failed")?;
126
127        Ok(Self {
128            source_url,
129            client,
130            logger,
131        })
132    }
133}
134
135#[async_trait]
136impl SignersImporterRetriever for CExplorerSignerRetriever {
137    async fn retrieve(&self) -> StdResult<HashMap<PartyId, Option<PoolTicker>>> {
138        info!(
139            self.logger, "Retrieving data from source";
140            "source_url" => &self.source_url.as_str()
141        );
142        let response = self
143            .client
144            .get(self.source_url.to_owned())
145            .send()
146            .await
147            .with_context(|| "Retrieving of CExplorer SPO list failed")?;
148
149        let spo_list = response
150            .error_for_status()
151            .with_context(|| "Data fetching failed")?
152            .json::<SPOList>()
153            .await
154            .with_context(|| "Failed to deserialize retrieved SPO list from CExplorer")?;
155
156        Ok(spo_list.data.into_iter().map(|item| item.extract()).collect())
157    }
158}
159
160/// *Internal type* Map a CExplorer SPO list.
161#[derive(Debug, Clone, Serialize, Deserialize)]
162struct SPOList {
163    data: Vec<SPOItem>,
164}
165
166/// *Internal type* Map a CExplorer SPO item inside its data list.
167#[derive(Debug, Clone, Serialize, Deserialize)]
168struct SPOItem {
169    pool_id: String,
170    name: String,
171}
172
173impl SPOItem {
174    const EMPTY_NAME: &'static str = "[] ";
175
176    fn is_name_empty(&self) -> bool {
177        self.name.is_empty() || self.name == Self::EMPTY_NAME
178    }
179
180    /// Consume this item to convert it to a result ready to be yield by a
181    /// [SignersImporterRetriever::retrieve] implementation.
182    fn extract(self) -> (PartyId, Option<PoolTicker>) {
183        let is_name_empty = self.is_name_empty();
184        let (pool_id, name) = (self.pool_id, self.name);
185
186        (pool_id, is_name_empty.not().then_some(name))
187    }
188}
189
190#[cfg(test)]
191mod tests {
192    use std::collections::{BTreeMap, BTreeSet};
193    use std::convert::Infallible;
194    use std::sync::Arc;
195    use warp::Filter;
196
197    use mithril_common::StdResult;
198    use mithril_persistence::sqlite::SqliteConnection;
199    use mithril_test_http_server::test_http_server;
200
201    use crate::database::repository::{SignerGetter, SignerStore};
202    use crate::database::test_helper::main_db_connection;
203    use crate::http_server::routes::reply;
204    use crate::test::TestLogger;
205
206    use super::*;
207
208    #[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
209    struct TestSigner {
210        pool_id: String,
211        ticker: Option<String>,
212    }
213
214    impl TestSigner {
215        fn with_ticker(pool_id: &str, ticker: &str) -> Self {
216            Self {
217                pool_id: pool_id.to_string(),
218                ticker: Some(ticker.to_string()),
219            }
220        }
221
222        fn without_ticker(pool_id: &str) -> Self {
223            Self {
224                pool_id: pool_id.to_string(),
225                ticker: None,
226            }
227        }
228    }
229
230    async fn fill_signer_db(
231        connection: Arc<SqliteConnection>,
232        test_signers: &[TestSigner],
233    ) -> StdResult<()> {
234        let store = SignerStore::new(connection);
235
236        for signer in test_signers {
237            store
238                .import_signer(signer.pool_id.clone(), signer.ticker.clone())
239                .await?;
240        }
241
242        Ok(())
243    }
244
245    async fn get_all_signers(connection: Arc<SqliteConnection>) -> StdResult<BTreeSet<TestSigner>> {
246        let store = SignerStore::new(connection);
247
248        let signers = store
249            .get_all()
250            .await?
251            .into_iter()
252            .map(|s| TestSigner {
253                pool_id: s.signer_id,
254                ticker: s.pool_ticker,
255            })
256            .collect();
257        Ok(signers)
258    }
259
260    #[test]
261    fn item_with_empty_name_yield_empty_pool_ticker() {
262        for name in ["", SPOItem::EMPTY_NAME] {
263            let item = SPOItem {
264                pool_id: "whatever".to_string(),
265                name: name.to_string(),
266            };
267            assert!(item.is_name_empty());
268            assert_eq!(("whatever".to_string(), None), item.extract());
269        }
270    }
271
272    #[tokio::test]
273    async fn retriever_should_return_deduplicated_data_and_handle_empty_name() {
274        let server = test_http_server(warp::path("list").map(|| {
275            r#"{
276            "data": [
277                {"pool_id": "pool1", "name": ""},
278                {"pool_id": "pool2", "name": "[] "},
279                {"pool_id": "pool3", "name": "whatever"},
280                {"pool_id": "pool3", "name": "whatever2"}
281            ]
282        }"#
283        }));
284
285        let retriever = CExplorerSignerRetriever::new(
286            format!("{}/list", server.url()),
287            None,
288            TestLogger::stdout(),
289        )
290        .unwrap();
291        let result = retriever.retrieve().await.expect("Retriever should not fail");
292
293        assert_eq!(
294            result.into_iter().collect::<BTreeMap<_, _>>(),
295            BTreeMap::from([
296                ("pool1".to_string(), None),
297                ("pool2".to_string(), None),
298                ("pool3".to_string(), Some("whatever2".to_string())),
299            ])
300        );
301    }
302
303    #[tokio::test]
304    async fn retriever_handle_http_data_fetching_error() {
305        let server =
306            test_http_server(warp::path("list").map(|| reply::internal_server_error("whatever")));
307
308        let retriever = CExplorerSignerRetriever::new(
309            format!("{}/list", server.url()),
310            None,
311            TestLogger::stdout(),
312        )
313        .unwrap();
314        retriever
315            .retrieve()
316            .await
317            .expect_err("An error should have been raised");
318    }
319
320    #[tokio::test]
321    async fn retriever_yield_error_when_json_is_malformed() {
322        let server = test_http_server(warp::path("list").map(|| r#"{ "data": [ {"pool_" ] }"#));
323
324        let retriever = CExplorerSignerRetriever::new(
325            format!("{}/list", server.url()),
326            None,
327            TestLogger::stdout(),
328        )
329        .unwrap();
330        retriever
331            .retrieve()
332            .await
333            .expect_err("An error should have been raised");
334    }
335
336    #[tokio::test]
337    async fn retriever_can_timeout() {
338        let server = test_http_server(warp::path("list").and_then(|| async {
339            tokio::time::sleep(Duration::from_millis(70)).await;
340            Ok::<&str, Infallible>(r#"{"data":[]}"#)
341        }));
342
343        let retriever = CExplorerSignerRetriever::new(
344            format!("{}/list", server.url()),
345            Some(Duration::from_millis(10)),
346            TestLogger::stdout(),
347        )
348        .unwrap();
349        retriever
350            .retrieve()
351            .await
352            .expect_err("An error should have been raised");
353    }
354
355    #[tokio::test]
356    async fn persist_list_of_two_signers_one_with_ticker_the_other_without() {
357        let connection = Arc::new(main_db_connection().unwrap());
358        let mut retriever = MockSignersImporterRetriever::new();
359        retriever.expect_retrieve().returning(|| {
360            Ok(HashMap::from([
361                ("pool1".to_string(), Some("[Pool name test]".to_string())),
362                ("pool2".to_string(), None),
363            ]))
364        });
365
366        let importer = SignersImporter::new(
367            Arc::new(retriever),
368            Arc::new(SignerStore::new(connection.clone())),
369            TestLogger::stdout(),
370        );
371        importer.run().await.expect("running importer should not fail");
372
373        let result = get_all_signers(connection).await.unwrap();
374        assert_eq!(
375            result,
376            BTreeSet::from([
377                TestSigner::with_ticker("pool1", "[Pool name test]",),
378                TestSigner::without_ticker("pool2"),
379            ])
380        );
381    }
382
383    #[tokio::test]
384    async fn persist_update_existing_data() {
385        let connection = Arc::new(main_db_connection().unwrap());
386        fill_signer_db(
387            connection.clone(),
388            &[
389                TestSigner::with_ticker("pool1", "[Pool name test]"),
390                TestSigner::without_ticker("pool2"),
391                TestSigner::with_ticker("pool3", "[Not updated]"),
392                TestSigner::with_ticker("pool4", "[Ticker will be removed]"),
393            ],
394        )
395        .await
396        .unwrap();
397        let mut retriever = MockSignersImporterRetriever::new();
398        retriever.expect_retrieve().returning(|| {
399            Ok(HashMap::from([
400                ("pool1".to_string(), Some("[Updated Pool name]".to_string())),
401                ("pool2".to_string(), Some("[Added Pool name]".to_string())),
402                ("pool3".to_string(), Some("[Not updated]".to_string())),
403                ("pool4".to_string(), None),
404                ("pool5".to_string(), Some("[New Pool]".to_string())),
405            ]))
406        });
407
408        let importer = SignersImporter::new(
409            Arc::new(retriever),
410            Arc::new(SignerStore::new(connection.clone())),
411            TestLogger::stdout(),
412        );
413        importer.run().await.expect("running importer should not fail");
414
415        let result = get_all_signers(connection).await.unwrap();
416        assert_eq!(
417            result,
418            BTreeSet::from([
419                TestSigner::with_ticker("pool1", "[Updated Pool name]"),
420                TestSigner::with_ticker("pool2", "[Added Pool name]"),
421                TestSigner::with_ticker("pool3", "[Not updated]"),
422                TestSigner::without_ticker("pool4"),
423                TestSigner::with_ticker("pool5", "[New Pool]"),
424            ])
425        );
426    }
427
428    #[tokio::test]
429    async fn importer_integration_test() {
430        let connection = Arc::new(main_db_connection().unwrap());
431        fill_signer_db(
432            connection.clone(),
433            &[
434                TestSigner::with_ticker("pool4", "[Pool4 dont change]"),
435                TestSigner::without_ticker("pool5"),
436                TestSigner::with_ticker("pool6", "[Pool6 not returned by server]"),
437                TestSigner::with_ticker("pool7", "[Pool7 ticker will be removed]"),
438            ],
439        )
440        .await
441        .unwrap();
442        let server = test_http_server(warp::path("list").map(|| {
443            r#"{
444            "data": [
445                {"pool_id": "pool1", "name": ""},
446                {"pool_id": "pool2", "name": "[] "},
447                {"pool_id": "pool3", "name": "[Pool3 added]"},
448                {"pool_id": "pool4", "name": "[Pool4 dont change]"},
449                {"pool_id": "pool5", "name": "[Pool5 add ticker]"},
450                {"pool_id": "pool7", "name": "[] "}
451            ]
452        }"#
453        }));
454
455        let importer = SignersImporter::new(
456            Arc::new(
457                CExplorerSignerRetriever::new(
458                    format!("{}/list", server.url()),
459                    None,
460                    TestLogger::stdout(),
461                )
462                .unwrap(),
463            ),
464            Arc::new(SignerStore::new(connection.clone())),
465            TestLogger::stdout(),
466        );
467        importer.run().await.expect("running importer should not fail");
468
469        let result = get_all_signers(connection).await.unwrap();
470        assert_eq!(
471            result,
472            BTreeSet::from([
473                TestSigner::without_ticker("pool1"),
474                TestSigner::without_ticker("pool2"),
475                TestSigner::with_ticker("pool3", "[Pool3 added]",),
476                TestSigner::with_ticker("pool4", "[Pool4 dont change]",),
477                TestSigner::with_ticker("pool5", "[Pool5 add ticker]",),
478                TestSigner::with_ticker("pool6", "[Pool6 not returned by server]",),
479                TestSigner::without_ticker("pool7"),
480            ])
481        );
482    }
483}