1use anyhow::Context;
2use async_trait::async_trait;
3use reqwest::{IntoUrl, Url};
4use serde::{Deserialize, Serialize};
5use slog::{info, warn, Logger};
6use std::collections::HashMap;
7use std::ops::Not;
8use std::sync::Arc;
9use std::time::Duration;
10
11use mithril_common::logging::LoggerExtensions;
12use mithril_common::{entities::PartyId, StdResult};
13
14use crate::database::repository::SignerStore;
15
/// Ticker of a stake pool, exchanged alongside a [PartyId] by the importer traits of this module.
pub type PoolTicker = String;
17
/// Imports signers by chaining a [SignersImporterRetriever] with a [SignersImporterPersister].
pub struct SignersImporter {
    /// Source from which the signers to import are retrieved.
    retriever: Arc<dyn SignersImporterRetriever>,
    /// Destination to which the retrieved signers are handed for saving.
    persister: Arc<dyn SignersImporterPersister>,
    /// Logger used to report the import progress and failures.
    logger: Logger,
}
24
25impl SignersImporter {
26 pub fn new(
28 retriever: Arc<dyn SignersImporterRetriever>,
29 persister: Arc<dyn SignersImporterPersister>,
30 logger: Logger,
31 ) -> Self {
32 Self {
33 retriever,
34 persister,
35 logger: logger.new_with_component_name::<Self>(),
36 }
37 }
38
39 pub async fn run(&self) -> StdResult<()> {
41 info!(self.logger, "Starting import");
42 let items = self
43 .retriever
44 .retrieve()
45 .await
46 .with_context(|| "Failed to retrieve signers from remote service")?;
47
48 info!(
49 self.logger, "Persisting retrieved data in the database";
50 "number_of_signer_to_insert" => items.len()
51 );
52 self.persister
53 .persist(items)
54 .await
55 .with_context(|| "Failed to persist retrieved data into the database")
56 }
57
58 pub async fn run_forever(&self, run_interval: Duration) {
60 let mut interval = tokio::time::interval(run_interval);
61
62 loop {
63 interval.tick().await;
64 if let Err(error) = self.run().await {
65 warn!(self.logger, "Signer retriever failed"; "error" => ?error);
66 }
67 info!(
68 self.logger,
69 "Cycle finished, Sleeping for {} min",
70 run_interval.as_secs() / 60
71 );
72 }
73 }
74}
75
/// Source of signers for a [SignersImporter].
///
/// A mock implementation is generated by `mockall` in test builds.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait SignersImporterRetriever: Sync + Send {
    /// Retrieve the signers, as a map from party id to an optional pool ticker.
    async fn retrieve(&self) -> StdResult<HashMap<PartyId, Option<PoolTicker>>>;
}
83
/// Destination of the signers handled by a [SignersImporter].
///
/// A mock implementation is generated by `mockall` in test builds.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait SignersImporterPersister: Sync + Send {
    /// Persist the given signers, given as a map from party id to an optional pool ticker.
    async fn persist(&self, signers: HashMap<PartyId, Option<PoolTicker>>) -> StdResult<()>;
}
91
/// Lets a [SignerStore] be used directly as the persister of a [SignersImporter].
#[async_trait]
impl SignersImporterPersister for SignerStore {
    /// Persist the signers by delegating to `SignerStore::import_many_signers`.
    ///
    /// Any value returned by the store on success is discarded; errors are propagated.
    async fn persist(&self, signers: HashMap<PartyId, Option<PoolTicker>>) -> StdResult<()> {
        self.import_many_signers(signers).await?;

        Ok(())
    }
}
100
/// A [SignersImporterRetriever] that fetches its data over HTTP from a CExplorer SPO list.
pub struct CExplorerSignerRetriever {
    /// Url queried to fetch the SPO list.
    source_url: Url,
    /// HTTP client used to send the requests.
    client: reqwest::Client,
    /// Logger used to report retrieval activity.
    logger: Logger,
}
108
109impl CExplorerSignerRetriever {
110 pub(crate) fn new<T: IntoUrl>(
112 source_url: T,
113 timeout: Option<Duration>,
114 logger: Logger,
115 ) -> StdResult<Self> {
116 let source_url = source_url
117 .into_url()
118 .with_context(|| "Given `source_url` is not a valid Url")?;
119 let client_builder = reqwest::Client::builder();
120 let client = match timeout {
121 None => client_builder,
122 Some(timeout) => client_builder.timeout(timeout),
123 }
124 .build()
125 .with_context(|| "Http Client build failed")?;
126
127 Ok(Self {
128 source_url,
129 client,
130 logger,
131 })
132 }
133}
134
135#[async_trait]
136impl SignersImporterRetriever for CExplorerSignerRetriever {
137 async fn retrieve(&self) -> StdResult<HashMap<PartyId, Option<PoolTicker>>> {
138 info!(
139 self.logger, "Retrieving data from source";
140 "source_url" => &self.source_url.as_str()
141 );
142 let response = self
143 .client
144 .get(self.source_url.to_owned())
145 .send()
146 .await
147 .with_context(|| "Retrieving of CExplorer SPO list failed")?;
148
149 let spo_list = response
150 .error_for_status()
151 .with_context(|| "Data fetching failed")?
152 .json::<SPOList>()
153 .await
154 .with_context(|| "Failed to deserialize retrieved SPO list from CExplorer")?;
155
156 Ok(spo_list
157 .data
158 .into_iter()
159 .map(|item| item.extract())
160 .collect())
161 }
162}
163
/// Payload deserialized from the response of the SPO list endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SPOList {
    /// Items of the list, one per returned pool entry.
    data: Vec<SPOItem>,
}
169
/// One entry of a [SPOList].
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SPOItem {
    /// Identifier of the pool, used as the party id when the item is extracted.
    pool_id: String,
    /// Name of the pool, used as the pool ticker when it is not considered empty.
    name: String,
}
176
177impl SPOItem {
178 const EMPTY_NAME: &'static str = "[] ";
179
180 fn is_name_empty(&self) -> bool {
181 self.name.is_empty() || self.name == Self::EMPTY_NAME
182 }
183
184 fn extract(self) -> (PartyId, Option<PoolTicker>) {
187 let is_name_empty = self.is_name_empty();
188 let (pool_id, name) = (self.pool_id, self.name);
189
190 (pool_id, is_name_empty.not().then_some(name))
191 }
192}
193
// Tests of the importer, of the CExplorer retriever against a local test HTTP server,
// and of the persistence through a `SignerStore`.
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};
    use std::convert::Infallible;
    use std::sync::Arc;
    use warp::Filter;

    use mithril_common::test_utils::test_http_server::test_http_server;
    use mithril_common::StdResult;
    use mithril_persistence::sqlite::SqliteConnection;

    use crate::database::repository::{SignerGetter, SignerStore};
    use crate::database::test_helper::main_db_connection;
    use crate::http_server::routes::reply;
    use crate::test_tools::TestLogger;

    use super::*;

    // Minimal, orderable view of a stored signer so that results can be compared
    // through a `BTreeSet` regardless of retrieval order.
    #[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
    struct TestSigner {
        pool_id: String,
        ticker: Option<String>,
    }

    impl TestSigner {
        // Build a signer that has a ticker.
        fn with_ticker(pool_id: &str, ticker: &str) -> Self {
            Self {
                pool_id: pool_id.to_string(),
                ticker: Some(ticker.to_string()),
            }
        }

        // Build a signer that has no ticker.
        fn without_ticker(pool_id: &str) -> Self {
            Self {
                pool_id: pool_id.to_string(),
                ticker: None,
            }
        }
    }

    // Insert the given signers, one by one, in the database behind `connection`.
    async fn fill_signer_db(
        connection: Arc<SqliteConnection>,
        test_signers: &[TestSigner],
    ) -> StdResult<()> {
        let store = SignerStore::new(connection);

        for signer in test_signers {
            store
                .import_signer(signer.pool_id.clone(), signer.ticker.clone())
                .await?;
        }

        Ok(())
    }

    // Read back every stored signer as a sorted set of `TestSigner`.
    async fn get_all_signers(connection: Arc<SqliteConnection>) -> StdResult<BTreeSet<TestSigner>> {
        let store = SignerStore::new(connection);

        let signers = store
            .get_all()
            .await?
            .into_iter()
            .map(|s| TestSigner {
                pool_id: s.signer_id,
                ticker: s.pool_ticker,
            })
            .collect();
        Ok(signers)
    }

    // Both the empty string and the `EMPTY_NAME` placeholder must be extracted as `None`.
    #[test]
    fn item_with_empty_name_yield_empty_pool_ticker() {
        for name in ["", SPOItem::EMPTY_NAME] {
            let item = SPOItem {
                pool_id: "whatever".to_string(),
                name: name.to_string(),
            };
            assert!(item.is_name_empty());
            assert_eq!(("whatever".to_string(), None), item.extract());
        }
    }

    // "pool3" is served twice: the expected map holds one entry for it, carrying the
    // name of its last occurrence.
    #[tokio::test]
    async fn retriever_should_return_deduplicated_data_and_handle_empty_name() {
        let server = test_http_server(warp::path("list").map(|| {
            r#"{
                "data": [
                    {"pool_id": "pool1", "name": ""},
                    {"pool_id": "pool2", "name": "[] "},
                    {"pool_id": "pool3", "name": "whatever"},
                    {"pool_id": "pool3", "name": "whatever2"}
                ]
            }"#
        }));

        let retriever = CExplorerSignerRetriever::new(
            format!("{}/list", server.url()),
            None,
            TestLogger::stdout(),
        )
        .unwrap();
        let result = retriever
            .retrieve()
            .await
            .expect("Retriever should not fail");

        assert_eq!(
            result.into_iter().collect::<BTreeMap<_, _>>(),
            BTreeMap::from([
                ("pool1".to_string(), None),
                ("pool2".to_string(), None),
                ("pool3".to_string(), Some("whatever2".to_string())),
            ])
        );
    }

    // A server replying with an internal server error must make the retrieval fail.
    #[tokio::test]
    async fn retriever_handle_http_data_fetching_error() {
        let server =
            test_http_server(warp::path("list").map(|| reply::internal_server_error("whatever")));

        let retriever = CExplorerSignerRetriever::new(
            format!("{}/list", server.url()),
            None,
            TestLogger::stdout(),
        )
        .unwrap();
        retriever
            .retrieve()
            .await
            .expect_err("An error should have been raised");
    }

    // A body that is not valid JSON must make the retrieval fail.
    #[tokio::test]
    async fn retriever_yield_error_when_json_is_malformed() {
        let server = test_http_server(warp::path("list").map(|| r#"{ "data": [ {"pool_" ] }"#));

        let retriever = CExplorerSignerRetriever::new(
            format!("{}/list", server.url()),
            None,
            TestLogger::stdout(),
        )
        .unwrap();
        retriever
            .retrieve()
            .await
            .expect_err("An error should have been raised");
    }

    // The server waits 70ms before answering while the client timeout is 10ms, so the
    // retrieval is expected to fail.
    #[tokio::test]
    async fn retriever_can_timeout() {
        let server = test_http_server(warp::path("list").and_then(|| async {
            tokio::time::sleep(Duration::from_millis(70)).await;
            Ok::<&str, Infallible>(r#"{"data":[]}"#)
        }));

        let retriever = CExplorerSignerRetriever::new(
            format!("{}/list", server.url()),
            Some(Duration::from_millis(10)),
            TestLogger::stdout(),
        )
        .unwrap();
        retriever
            .retrieve()
            .await
            .expect_err("An error should have been raised");
    }

    // Starting from an empty database, a mocked retriever yields two signers that must
    // both be found in the store after the run.
    #[tokio::test]
    async fn persist_list_of_two_signers_one_with_ticker_the_other_without() {
        let connection = Arc::new(main_db_connection().unwrap());
        let mut retriever = MockSignersImporterRetriever::new();
        retriever.expect_retrieve().returning(|| {
            Ok(HashMap::from([
                ("pool1".to_string(), Some("[Pool name test]".to_string())),
                ("pool2".to_string(), None),
            ]))
        });

        let importer = SignersImporter::new(
            Arc::new(retriever),
            Arc::new(SignerStore::new(connection.clone())),
            TestLogger::stdout(),
        );
        importer
            .run()
            .await
            .expect("running importer should not fail");

        let result = get_all_signers(connection).await.unwrap();
        assert_eq!(
            result,
            BTreeSet::from([
                TestSigner::with_ticker("pool1", "[Pool name test]",),
                TestSigner::without_ticker("pool2"),
            ])
        );
    }

    // Starting from a pre-filled database, the run must update, add, keep and remove
    // tickers of existing signers and insert the new one ("pool5").
    #[tokio::test]
    async fn persist_update_existing_data() {
        let connection = Arc::new(main_db_connection().unwrap());
        fill_signer_db(
            connection.clone(),
            &[
                TestSigner::with_ticker("pool1", "[Pool name test]"),
                TestSigner::without_ticker("pool2"),
                TestSigner::with_ticker("pool3", "[Not updated]"),
                TestSigner::with_ticker("pool4", "[Ticker will be removed]"),
            ],
        )
        .await
        .unwrap();
        let mut retriever = MockSignersImporterRetriever::new();
        retriever.expect_retrieve().returning(|| {
            Ok(HashMap::from([
                ("pool1".to_string(), Some("[Updated Pool name]".to_string())),
                ("pool2".to_string(), Some("[Added Pool name]".to_string())),
                ("pool3".to_string(), Some("[Not updated]".to_string())),
                ("pool4".to_string(), None),
                ("pool5".to_string(), Some("[New Pool]".to_string())),
            ]))
        });

        let importer = SignersImporter::new(
            Arc::new(retriever),
            Arc::new(SignerStore::new(connection.clone())),
            TestLogger::stdout(),
        );
        importer
            .run()
            .await
            .expect("running importer should not fail");

        let result = get_all_signers(connection).await.unwrap();
        assert_eq!(
            result,
            BTreeSet::from([
                TestSigner::with_ticker("pool1", "[Updated Pool name]"),
                TestSigner::with_ticker("pool2", "[Added Pool name]"),
                TestSigner::with_ticker("pool3", "[Not updated]"),
                TestSigner::without_ticker("pool4"),
                TestSigner::with_ticker("pool5", "[New Pool]"),
            ])
        );
    }

    // End-to-end run with the real retriever (against a test HTTP server) and the real
    // store: "pool6", which is not returned by the server, must be left untouched.
    #[tokio::test]
    async fn importer_integration_test() {
        let connection = Arc::new(main_db_connection().unwrap());
        fill_signer_db(
            connection.clone(),
            &[
                TestSigner::with_ticker("pool4", "[Pool4 dont change]"),
                TestSigner::without_ticker("pool5"),
                TestSigner::with_ticker("pool6", "[Pool6 not returned by server]"),
                TestSigner::with_ticker("pool7", "[Pool7 ticker will be removed]"),
            ],
        )
        .await
        .unwrap();
        let server = test_http_server(warp::path("list").map(|| {
            r#"{
                "data": [
                    {"pool_id": "pool1", "name": ""},
                    {"pool_id": "pool2", "name": "[] "},
                    {"pool_id": "pool3", "name": "[Pool3 added]"},
                    {"pool_id": "pool4", "name": "[Pool4 dont change]"},
                    {"pool_id": "pool5", "name": "[Pool5 add ticker]"},
                    {"pool_id": "pool7", "name": "[] "}
                ]
            }"#
        }));

        let importer = SignersImporter::new(
            Arc::new(
                CExplorerSignerRetriever::new(
                    format!("{}/list", server.url()),
                    None,
                    TestLogger::stdout(),
                )
                .unwrap(),
            ),
            Arc::new(SignerStore::new(connection.clone())),
            TestLogger::stdout(),
        );
        importer
            .run()
            .await
            .expect("running importer should not fail");

        let result = get_all_signers(connection).await.unwrap();
        assert_eq!(
            result,
            BTreeSet::from([
                TestSigner::without_ticker("pool1"),
                TestSigner::without_ticker("pool2"),
                TestSigner::with_ticker("pool3", "[Pool3 added]",),
                TestSigner::with_ticker("pool4", "[Pool4 dont change]",),
                TestSigner::with_ticker("pool5", "[Pool5 add ticker]",),
                TestSigner::with_ticker("pool6", "[Pool6 not returned by server]",),
                TestSigner::without_ticker("pool7"),
            ])
        );
    }
}