1use anyhow::Context;
2use async_trait::async_trait;
3use reqwest::{IntoUrl, Url};
4use serde::{Deserialize, Serialize};
5use slog::{Logger, info, warn};
6use std::collections::HashMap;
7use std::ops::Not;
8use std::sync::Arc;
9use std::time::Duration;
10
11use mithril_common::logging::LoggerExtensions;
12use mithril_common::{StdResult, entities::PartyId};
13
14use crate::database::repository::SignerStore;
15
/// Ticker attached to a signer's pool (a plain `String`).
///
/// In this module it is filled from the `name` field of the retrieved SPO items.
pub type PoolTicker = String;
17
/// Imports signers by fetching them through a [`SignersImporterRetriever`] and saving them
/// through a [`SignersImporterPersister`].
pub struct SignersImporter {
    /// Source from which the signers are fetched.
    retriever: Arc<dyn SignersImporterRetriever>,
    /// Destination in which the fetched signers are saved.
    persister: Arc<dyn SignersImporterPersister>,
    /// Logger used to report the import progress and failures.
    logger: Logger,
}
24
25impl SignersImporter {
26 pub fn new(
28 retriever: Arc<dyn SignersImporterRetriever>,
29 persister: Arc<dyn SignersImporterPersister>,
30 logger: Logger,
31 ) -> Self {
32 Self {
33 retriever,
34 persister,
35 logger: logger.new_with_component_name::<Self>(),
36 }
37 }
38
39 pub async fn run(&self) -> StdResult<()> {
41 info!(self.logger, "Starting import");
42 let items = self
43 .retriever
44 .retrieve()
45 .await
46 .with_context(|| "Failed to retrieve signers from remote service")?;
47
48 info!(
49 self.logger, "Persisting retrieved data in the database";
50 "number_of_signer_to_insert" => items.len()
51 );
52 self.persister
53 .persist(items)
54 .await
55 .with_context(|| "Failed to persist retrieved data into the database")
56 }
57
58 pub async fn run_forever(&self, run_interval: Duration) {
60 let mut interval = tokio::time::interval(run_interval);
61
62 loop {
63 interval.tick().await;
64 if let Err(error) = self.run().await {
65 warn!(self.logger, "Signer retriever failed"; "error" => ?error);
66 }
67 info!(
68 self.logger,
69 "Cycle finished, Sleeping for {} min",
70 run_interval.as_secs() / 60
71 );
72 }
73 }
74}
75
#[cfg_attr(test, mockall::automock)]
/// Source able to provide the signers to import.
#[async_trait]
pub trait SignersImporterRetriever: Sync + Send {
    /// Retrieves the signers, as a map from party id to an optional pool ticker.
    async fn retrieve(&self) -> StdResult<HashMap<PartyId, Option<PoolTicker>>>;
}
83
#[cfg_attr(test, mockall::automock)]
/// Destination able to save the imported signers.
#[async_trait]
pub trait SignersImporterPersister: Sync + Send {
    /// Persists the given signers, given as a map from party id to an optional pool ticker.
    async fn persist(&self, signers: HashMap<PartyId, Option<PoolTicker>>) -> StdResult<()>;
}
91
#[async_trait]
impl SignersImporterPersister for SignerStore {
    /// Persists the signers by delegating to `SignerStore::import_many_signers`.
    ///
    /// Any error raised by the store is propagated to the caller.
    async fn persist(&self, signers: HashMap<PartyId, Option<PoolTicker>>) -> StdResult<()> {
        self.import_many_signers(signers).await?;

        Ok(())
    }
}
100
/// A [`SignersImporterRetriever`] that fetches the CExplorer SPO list over http.
pub struct CExplorerSignerRetriever {
    /// Url queried to get the SPO list.
    source_url: Url,
    /// Http client used to query `source_url`.
    client: reqwest::Client,
    /// Logger used to report the retrieval.
    logger: Logger,
}
108
109impl CExplorerSignerRetriever {
110 pub(crate) fn new<T: IntoUrl>(
112 source_url: T,
113 timeout: Option<Duration>,
114 logger: Logger,
115 ) -> StdResult<Self> {
116 let source_url = source_url
117 .into_url()
118 .with_context(|| "Given `source_url` is not a valid Url")?;
119 let client_builder = reqwest::Client::builder();
120 let client = match timeout {
121 None => client_builder,
122 Some(timeout) => client_builder.timeout(timeout),
123 }
124 .build()
125 .with_context(|| "Http Client build failed")?;
126
127 Ok(Self {
128 source_url,
129 client,
130 logger,
131 })
132 }
133}
134
135#[async_trait]
136impl SignersImporterRetriever for CExplorerSignerRetriever {
137 async fn retrieve(&self) -> StdResult<HashMap<PartyId, Option<PoolTicker>>> {
138 info!(
139 self.logger, "Retrieving data from source";
140 "source_url" => &self.source_url.as_str()
141 );
142 let response = self
143 .client
144 .get(self.source_url.to_owned())
145 .send()
146 .await
147 .with_context(|| "Retrieving of CExplorer SPO list failed")?;
148
149 let spo_list = response
150 .error_for_status()
151 .with_context(|| "Data fetching failed")?
152 .json::<SPOList>()
153 .await
154 .with_context(|| "Failed to deserialize retrieved SPO list from CExplorer")?;
155
156 Ok(spo_list.data.into_iter().map(|item| item.extract()).collect())
157 }
158}
159
/// Payload of the SPO list returned by the source: a `data` array of [`SPOItem`].
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SPOList {
    /// Listed pools.
    data: Vec<SPOItem>,
}
165
/// One entry of the SPO list.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SPOItem {
    /// Identifier of the pool, used as the party id of the signer.
    pool_id: String,
    /// Name of the pool, used as its ticker unless it is considered empty.
    name: String,
}
172
173impl SPOItem {
174 const EMPTY_NAME: &'static str = "[] ";
175
176 fn is_name_empty(&self) -> bool {
177 self.name.is_empty() || self.name == Self::EMPTY_NAME
178 }
179
180 fn extract(self) -> (PartyId, Option<PoolTicker>) {
183 let is_name_empty = self.is_name_empty();
184 let (pool_id, name) = (self.pool_id, self.name);
185
186 (pool_id, is_name_empty.not().then_some(name))
187 }
188}
189
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};
    use std::convert::Infallible;
    use std::sync::Arc;
    use warp::Filter;

    use mithril_common::StdResult;
    use mithril_persistence::sqlite::SqliteConnection;
    use mithril_test_http_server::test_http_server;

    use crate::database::repository::{SignerGetter, SignerStore};
    use crate::database::test_helper::main_db_connection;
    use crate::http_server::routes::reply;
    use crate::test::TestLogger;

    use super::*;

    /// Minimal, orderable view of a stored signer, used to compare database contents.
    #[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
    struct TestSigner {
        pool_id: String,
        ticker: Option<String>,
    }

    impl TestSigner {
        fn with_ticker(pool_id: &str, ticker: &str) -> Self {
            Self {
                ticker: Some(ticker.to_owned()),
                ..Self::without_ticker(pool_id)
            }
        }

        fn without_ticker(pool_id: &str) -> Self {
            Self {
                pool_id: pool_id.to_owned(),
                ticker: None,
            }
        }
    }

    /// Builds a retriever targeting the `/list` route of the server at `server_url`.
    fn list_retriever(
        server_url: impl std::fmt::Display,
        timeout: Option<Duration>,
    ) -> CExplorerSignerRetriever {
        CExplorerSignerRetriever::new(format!("{}/list", server_url), timeout, TestLogger::stdout())
            .unwrap()
    }

    /// Inserts the given signers in the database, one at a time.
    async fn fill_signer_db(
        connection: Arc<SqliteConnection>,
        test_signers: &[TestSigner],
    ) -> StdResult<()> {
        let store = SignerStore::new(connection);

        for TestSigner { pool_id, ticker } in test_signers {
            store.import_signer(pool_id.clone(), ticker.clone()).await?;
        }

        Ok(())
    }

    /// Reads back every signer stored in the database.
    async fn get_all_signers(connection: Arc<SqliteConnection>) -> StdResult<BTreeSet<TestSigner>> {
        let store = SignerStore::new(connection);
        let records = store.get_all().await?;

        Ok(records
            .into_iter()
            .map(|record| TestSigner {
                pool_id: record.signer_id,
                ticker: record.pool_ticker,
            })
            .collect())
    }

    #[test]
    fn item_with_empty_name_yield_empty_pool_ticker() {
        for empty_name in ["", SPOItem::EMPTY_NAME] {
            let item = SPOItem {
                pool_id: "whatever".to_string(),
                name: empty_name.to_string(),
            };

            assert!(item.is_name_empty());
            assert_eq!(("whatever".to_string(), None), item.extract());
        }
    }

    #[tokio::test]
    async fn retriever_should_return_deduplicated_data_and_handle_empty_name() {
        let server = test_http_server(warp::path("list").map(|| {
            r#"{
                "data": [
                    {"pool_id": "pool1", "name": ""},
                    {"pool_id": "pool2", "name": "[] "},
                    {"pool_id": "pool3", "name": "whatever"},
                    {"pool_id": "pool3", "name": "whatever2"}
                ]
            }"#
        }));
        let retriever = list_retriever(server.url(), None);

        let retrieved = retriever.retrieve().await.expect("Retriever should not fail");

        let expected = BTreeMap::from([
            ("pool1".to_string(), None),
            ("pool2".to_string(), None),
            ("pool3".to_string(), Some("whatever2".to_string())),
        ]);
        assert_eq!(retrieved.into_iter().collect::<BTreeMap<_, _>>(), expected);
    }

    #[tokio::test]
    async fn retriever_handle_http_data_fetching_error() {
        let server =
            test_http_server(warp::path("list").map(|| reply::internal_server_error("whatever")));
        let retriever = list_retriever(server.url(), None);

        retriever
            .retrieve()
            .await
            .expect_err("An error should have been raised");
    }

    #[tokio::test]
    async fn retriever_yield_error_when_json_is_malformed() {
        let server = test_http_server(warp::path("list").map(|| r#"{ "data": [ {"pool_" ] }"#));
        let retriever = list_retriever(server.url(), None);

        retriever
            .retrieve()
            .await
            .expect_err("An error should have been raised");
    }

    #[tokio::test]
    async fn retriever_can_timeout() {
        // The server answers well after the client timeout elapsed.
        let server = test_http_server(warp::path("list").and_then(|| async {
            tokio::time::sleep(Duration::from_millis(70)).await;
            Ok::<&str, Infallible>(r#"{"data":[]}"#)
        }));
        let retriever = list_retriever(server.url(), Some(Duration::from_millis(10)));

        retriever
            .retrieve()
            .await
            .expect_err("An error should have been raised");
    }

    #[tokio::test]
    async fn persist_list_of_two_signers_one_with_ticker_the_other_without() {
        let connection = Arc::new(main_db_connection().unwrap());
        let mut retriever = MockSignersImporterRetriever::new();
        retriever.expect_retrieve().returning(|| {
            Ok(HashMap::from([
                ("pool1".to_string(), Some("[Pool name test]".to_string())),
                ("pool2".to_string(), None),
            ]))
        });
        let importer = SignersImporter::new(
            Arc::new(retriever),
            Arc::new(SignerStore::new(connection.clone())),
            TestLogger::stdout(),
        );

        importer.run().await.expect("running importer should not fail");

        let stored_signers = get_all_signers(connection).await.unwrap();
        let expected = BTreeSet::from([
            TestSigner::with_ticker("pool1", "[Pool name test]"),
            TestSigner::without_ticker("pool2"),
        ]);
        assert_eq!(stored_signers, expected);
    }

    #[tokio::test]
    async fn persist_update_existing_data() {
        let connection = Arc::new(main_db_connection().unwrap());
        let already_stored = [
            TestSigner::with_ticker("pool1", "[Pool name test]"),
            TestSigner::without_ticker("pool2"),
            TestSigner::with_ticker("pool3", "[Not updated]"),
            TestSigner::with_ticker("pool4", "[Ticker will be removed]"),
        ];
        fill_signer_db(connection.clone(), &already_stored).await.unwrap();
        let mut retriever = MockSignersImporterRetriever::new();
        retriever.expect_retrieve().returning(|| {
            Ok(HashMap::from([
                ("pool1".to_string(), Some("[Updated Pool name]".to_string())),
                ("pool2".to_string(), Some("[Added Pool name]".to_string())),
                ("pool3".to_string(), Some("[Not updated]".to_string())),
                ("pool4".to_string(), None),
                ("pool5".to_string(), Some("[New Pool]".to_string())),
            ]))
        });
        let importer = SignersImporter::new(
            Arc::new(retriever),
            Arc::new(SignerStore::new(connection.clone())),
            TestLogger::stdout(),
        );

        importer.run().await.expect("running importer should not fail");

        let stored_signers = get_all_signers(connection).await.unwrap();
        let expected = BTreeSet::from([
            TestSigner::with_ticker("pool1", "[Updated Pool name]"),
            TestSigner::with_ticker("pool2", "[Added Pool name]"),
            TestSigner::with_ticker("pool3", "[Not updated]"),
            TestSigner::without_ticker("pool4"),
            TestSigner::with_ticker("pool5", "[New Pool]"),
        ]);
        assert_eq!(stored_signers, expected);
    }

    #[tokio::test]
    async fn importer_integration_test() {
        let connection = Arc::new(main_db_connection().unwrap());
        let already_stored = [
            TestSigner::with_ticker("pool4", "[Pool4 dont change]"),
            TestSigner::without_ticker("pool5"),
            TestSigner::with_ticker("pool6", "[Pool6 not returned by server]"),
            TestSigner::with_ticker("pool7", "[Pool7 ticker will be removed]"),
        ];
        fill_signer_db(connection.clone(), &already_stored).await.unwrap();
        let server = test_http_server(warp::path("list").map(|| {
            r#"{
                "data": [
                    {"pool_id": "pool1", "name": ""},
                    {"pool_id": "pool2", "name": "[] "},
                    {"pool_id": "pool3", "name": "[Pool3 added]"},
                    {"pool_id": "pool4", "name": "[Pool4 dont change]"},
                    {"pool_id": "pool5", "name": "[Pool5 add ticker]"},
                    {"pool_id": "pool7", "name": "[] "}
                ]
            }"#
        }));
        let importer = SignersImporter::new(
            Arc::new(list_retriever(server.url(), None)),
            Arc::new(SignerStore::new(connection.clone())),
            TestLogger::stdout(),
        );

        importer.run().await.expect("running importer should not fail");

        let stored_signers = get_all_signers(connection).await.unwrap();
        let expected = BTreeSet::from([
            TestSigner::without_ticker("pool1"),
            TestSigner::without_ticker("pool2"),
            TestSigner::with_ticker("pool3", "[Pool3 added]"),
            TestSigner::with_ticker("pool4", "[Pool4 dont change]"),
            TestSigner::with_ticker("pool5", "[Pool5 add ticker]"),
            TestSigner::with_ticker("pool6", "[Pool6 not returned by server]"),
            TestSigner::without_ticker("pool7"),
        ]);
        assert_eq!(stored_signers, expected);
    }
}