1use std::collections::BTreeSet;
2use std::path::{Path, PathBuf};
3
4use anyhow::{Context, anyhow};
5use async_trait::async_trait;
6use pallas_addresses::Address;
7use pallas_codec::utils::{Bytes, CborWrap, TagWrap};
8use pallas_network::{
9 facades::NodeClient,
10 miniprotocols::{
11 Point,
12 localstate::{
13 Client,
14 queries_v16::{
15 self, Addr, Addrs, ChainBlockNumber, GenesisConfig, PostAlonsoTransactionOutput,
16 StakeSnapshot, Stakes, TransactionOutput, UTxOByAddress,
17 },
18 },
19 },
20};
21use pallas_primitives::ToCanonicalJson;
22use pallas_traverse::Era;
23
24use mithril_common::crypto_helper::{KesPeriod, encode_bech32};
25use mithril_common::entities::{BlockNumber, ChainPoint, Epoch, SlotNumber, StakeDistribution};
26use mithril_common::{CardanoNetwork, StdResult};
27
28use crate::entities::{ChainAddress, Datum, Datums, TxDatum, try_inspect};
29
30use super::{ChainObserver, ChainObserverError};
31
/// Offset added to the era index returned by the node's current-era query before it is
/// converted into a [`pallas_traverse::Era`].
// NOTE(review): presumably the node reports a zero-based era index while `Era` starts at 1 — confirm.
const ERA_OFFSET: u16 = 1;
35
/// A [`ChainObserver`] that retrieves chain data by querying a node through a Pallas
/// [`NodeClient`].
pub struct PallasChainObserver {
    /// Path of the socket used to connect to the node.
    socket: PathBuf,
    /// Network whose magic id is sent when connecting to the node.
    network: CardanoNetwork,
}
41
42impl From<anyhow::Error> for ChainObserverError {
43 fn from(err: anyhow::Error) -> Self {
44 ChainObserverError::General(err)
45 }
46}
47
48impl PallasChainObserver {
49 pub fn new(socket: &Path, network: CardanoNetwork) -> Self {
51 Self {
52 socket: socket.to_owned(),
53 network,
54 }
55 }
56
57 async fn new_client(&self) -> StdResult<NodeClient> {
59 let magic = self.network.magic_id();
60 let client = NodeClient::connect(&self.socket, magic).await?;
61
62 Ok(client)
63 }
64
65 async fn get_client(&self) -> StdResult<NodeClient> {
67 self.new_client()
68 .await
69 .map_err(|err| anyhow!(err))
70 .with_context(|| "PallasChainObserver failed to create new client")
71 }
72
73 async fn get_era(&self, statequery: &mut Client) -> StdResult<u16> {
75 statequery
76 .acquire(None)
77 .await
78 .map_err(|err| anyhow!(err))
79 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
80
81 let era = queries_v16::get_current_era(statequery)
82 .await
83 .map_err(|err| anyhow!(err))
84 .with_context(|| "PallasChainObserver failed to get current era")?;
85
86 Ok(era)
87 }
88
89 async fn get_epoch(&self, statequery: &mut Client) -> StdResult<u32> {
91 statequery
92 .acquire(None)
93 .await
94 .map_err(|err| anyhow!(err))
95 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
96
97 let era = queries_v16::get_current_era(statequery)
98 .await
99 .map_err(|err| anyhow!(err))
100 .with_context(|| "PallasChainObserver failed to get current era")?;
101
102 let epoch = queries_v16::get_block_epoch_number(statequery, era)
103 .await
104 .map_err(|err| anyhow!(err))
105 .with_context(|| "PallasChainObserver failed to get block epoch number")?;
106
107 Ok(epoch)
108 }
109
110 fn get_datum_tag(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<TagWrap<Bytes, 24>> {
112 Ok(utxo
113 .inline_datum
114 .as_ref()
115 .with_context(|| "PallasChainObserver failed to get inline datum")?
116 .1
117 .clone())
118 }
119
120 fn inspect_datum(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<Datum> {
122 let datum = self.get_datum_tag(utxo)?;
123 let datum = CborWrap(datum).to_vec();
124
125 try_inspect::<Datum>(datum)
126 }
127
128 fn serialize_datum(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<TxDatum> {
130 let datum = self.inspect_datum(utxo)?;
131 let serialized = serde_json::to_string(&datum.to_json())
132 .map_err(|err| anyhow!(err))
133 .with_context(|| "PallasChainObserver failed to serialize datum")?;
134
135 Ok(TxDatum(serialized))
136 }
137
138 fn map_datums(&self, transaction: UTxOByAddress) -> StdResult<Datums> {
140 transaction
141 .utxo
142 .iter()
143 .filter_map(|(_, utxo)| match utxo {
144 TransactionOutput::Current(output) => {
145 output.inline_datum.as_ref().map(|_| self.serialize_datum(output))
146 }
147 _ => None,
148 })
149 .collect::<StdResult<Datums>>()
150 }
151
152 async fn get_utxo_datums(
154 &self,
155 client: &mut NodeClient,
156 address: &ChainAddress,
157 ) -> Result<Datums, ChainObserverError> {
158 let statequery = client.statequery();
159 let utxo = self.get_utxo_by_address(statequery, address).await?;
160
161 Ok(self.map_datums(utxo)?)
162 }
163
164 async fn get_utxo_by_address(
166 &self,
167 statequery: &mut Client,
168 address: &ChainAddress,
169 ) -> StdResult<UTxOByAddress> {
170 statequery
171 .acquire(None)
172 .await
173 .map_err(|err| anyhow!(err))
174 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
175
176 let era = queries_v16::get_current_era(statequery)
177 .await
178 .map_err(|err| anyhow!(err))
179 .with_context(|| "PallasChainObserver failed to get current era")?;
180
181 let addr: Address = Address::from_bech32(address)
182 .map_err(|err| anyhow!(err))
183 .with_context(|| "PallasChainObserver failed to parse address")?;
184
185 let addr: Addr = addr.to_vec().into();
186 let addrs: Addrs = vec![addr];
187 let utxo = queries_v16::get_utxo_by_address(statequery, era, addrs)
188 .await
189 .map_err(|err| anyhow!(err))
190 .with_context(|| "PallasChainObserver failed to get utxo")?;
191
192 Ok(utxo)
193 }
194
195 async fn do_stake_snapshots_state_query(
197 &self,
198 statequery: &mut Client,
199 ) -> StdResult<StakeSnapshot> {
200 statequery
201 .acquire(None)
202 .await
203 .map_err(|err| anyhow!(err))
204 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
205
206 let era = queries_v16::get_current_era(statequery)
207 .await
208 .map_err(|err| anyhow!(err))
209 .with_context(|| "PallasChainObserver failed to get current era")?;
210
211 let state_snapshot = queries_v16::get_stake_snapshots(statequery, era, BTreeSet::new())
212 .await
213 .map_err(|err| anyhow!(err))
214 .with_context(|| "PallasChainObserver failed to get stake snapshot")?;
215
216 Ok(state_snapshot)
217 }
218
219 fn get_stake_pool_hash(&self, key: &Bytes) -> Result<String, ChainObserverError> {
221 let pool_id_bech32 = encode_bech32("pool", key)
222 .map_err(|err| anyhow!(err))
223 .with_context(|| "PallasChainObserver failed to encode stake pool hash")?;
224 Ok(pool_id_bech32)
225 }
226
227 async fn get_stake_distribution(
229 &self,
230 client: &mut NodeClient,
231 ) -> Result<Option<StakeDistribution>, ChainObserverError> {
232 let statequery = client.statequery();
233
234 let stake_snapshot = self.do_stake_snapshots_state_query(statequery).await?;
235
236 let mut stake_distribution = StakeDistribution::new();
237
238 let have_stakes_in_two_epochs = |stakes: &Stakes| stakes.snapshot_mark_pool > 0;
239 for (key, stakes) in stake_snapshot
240 .snapshots
241 .stake_snapshots
242 .iter()
243 .filter(|(_, stakes)| have_stakes_in_two_epochs(stakes))
244 {
245 let pool_hash = self.get_stake_pool_hash(key)?;
246 stake_distribution.insert(pool_hash, stakes.snapshot_mark_pool);
247 }
248
249 Ok(Some(stake_distribution))
250 }
251
252 async fn calculate_kes_period(
274 &self,
275 chain_point: Point,
276 slots_per_kes_period: u64,
277 ) -> Result<KesPeriod, ChainObserverError> {
278 if slots_per_kes_period == 0 {
279 return Err(anyhow!("slots_per_kes_period must be greater than 0"))
280 .with_context(|| "PallasChainObserver failed to calculate kes period")?;
281 }
282
283 let current_kes_period = chain_point.slot_or_default() / slots_per_kes_period;
284 Ok(u32::try_from(current_kes_period)
285 .map_err(|err| anyhow!(err))
286 .with_context(|| "PallasChainObserver failed to convert kes period")?)
287 }
288
289 async fn do_get_chain_point_state_query(&self, statequery: &mut Client) -> StdResult<Point> {
291 let chain_point = queries_v16::get_chain_point(statequery)
292 .await
293 .map_err(|err| anyhow!(err))
294 .with_context(|| "PallasChainObserver failed to get chain point")?;
295
296 Ok(chain_point)
297 }
298
299 async fn do_get_chain_block_no(&self, statequery: &mut Client) -> StdResult<ChainBlockNumber> {
301 let chain_block_number = queries_v16::get_chain_block_no(statequery)
302 .await
303 .map_err(|err| anyhow!(err))
304 .with_context(|| "PallasChainObserver failed to get chain block number")?;
305
306 Ok(chain_block_number)
307 }
308
309 async fn do_get_current_era_state_query(&self, statequery: &mut Client) -> StdResult<u16> {
311 let era = queries_v16::get_current_era(statequery)
312 .await
313 .map_err(|err| anyhow!(err))
314 .with_context(|| "PallasChainObserver failed to get current era")?;
315
316 Ok(era)
317 }
318
319 async fn do_get_genesis_config_state_query(
321 &self,
322 statequery: &mut Client,
323 ) -> StdResult<Vec<GenesisConfig>> {
324 let era = self.do_get_current_era_state_query(statequery).await?;
325 let genesis_config = queries_v16::get_genesis_config(statequery, era)
326 .await
327 .map_err(|err| anyhow!(err))
328 .with_context(|| "PallasChainObserver failed to get genesis config")?;
329
330 Ok(genesis_config)
331 }
332
333 async fn get_chain_point(&self, statequery: &mut Client) -> StdResult<ChainPoint> {
335 statequery
336 .acquire(None)
337 .await
338 .map_err(|err| anyhow!(err))
339 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
340
341 let chain_point = self.do_get_chain_point_state_query(statequery).await?;
342
343 let header_hash = match chain_point {
344 Point::Origin => None,
345 Point::Specific(_at_slot, ref hash) => Some(hex::encode(hash)),
346 };
347
348 let chain_block_number = self.do_get_chain_block_no(statequery).await?;
349
350 Ok(ChainPoint {
351 slot_number: SlotNumber(chain_point.slot_or_default()),
352 block_hash: header_hash.unwrap_or_default(),
353 block_number: BlockNumber(chain_block_number.block_number as u64),
354 })
355 }
356
357 async fn get_kes_period(
360 &self,
361 client: &mut NodeClient,
362 ) -> Result<Option<KesPeriod>, ChainObserverError> {
363 let statequery = client.statequery();
364
365 statequery
366 .acquire(None)
367 .await
368 .map_err(|err| anyhow!(err))
369 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
370
371 let chain_point = self.do_get_chain_point_state_query(statequery).await?;
372
373 let genesis_config = self.do_get_genesis_config_state_query(statequery).await?;
374
375 let config = genesis_config
376 .first()
377 .with_context(|| "PallasChainObserver failed to extract the config")?;
378
379 let current_kes_period = self
380 .calculate_kes_period(chain_point, config.slots_per_kes_period as u64)
381 .await?;
382
383 Ok(Some(current_kes_period))
384 }
385
386 async fn process_statequery(&self, client: &mut NodeClient) -> StdResult<()> {
388 let statequery = client.statequery();
389 statequery
390 .send_release()
391 .await
392 .map_err(|err| anyhow!(err))
393 .with_context(|| "PallasChainObserver send release failed")?;
394
395 statequery
396 .send_done()
397 .await
398 .map_err(|err| anyhow!(err))
399 .with_context(|| "PallasChainObserver send done failed")?;
400
401 Ok(())
402 }
403
404 async fn sync(&self, client: &mut NodeClient) -> StdResult<()> {
406 client
407 .chainsync()
408 .send_done()
409 .await
410 .map_err(|err| anyhow!(err))
411 .with_context(|| "PallasChainObserver chainsync send done failed")?;
412 Ok(())
413 }
414
415 async fn post_process_statequery(&self, client: &mut NodeClient) -> StdResult<()> {
417 self.process_statequery(client).await?;
418 self.sync(client).await?;
419
420 Ok(())
421 }
422}
423
424#[async_trait]
425impl ChainObserver for PallasChainObserver {
426 async fn get_current_era(&self) -> Result<Option<String>, ChainObserverError> {
427 let mut client = self.get_client().await?;
428
429 let era = self.get_era(client.statequery()).await?;
430
431 let era = Era::try_from(era + ERA_OFFSET)
432 .with_context(|| "PallasChainObserver failed to convert: '{era}' to Era")?;
433
434 self.post_process_statequery(&mut client).await?;
435
436 client.abort().await;
437
438 Ok(Some(era.to_string()))
439 }
440
441 async fn get_current_epoch(&self) -> Result<Option<Epoch>, ChainObserverError> {
442 let mut client = self.get_client().await?;
443
444 let epoch = self.get_epoch(client.statequery()).await?;
445
446 self.post_process_statequery(&mut client).await?;
447
448 client.abort().await;
449
450 Ok(Some(Epoch(epoch as u64)))
451 }
452
453 async fn get_current_chain_point(&self) -> Result<Option<ChainPoint>, ChainObserverError> {
454 let mut client = self.get_client().await?;
455
456 let chain_point = self.get_chain_point(client.statequery()).await?;
457
458 self.post_process_statequery(&mut client).await?;
459
460 client.abort().await;
461
462 Ok(Some(chain_point))
463 }
464
465 async fn get_current_datums(
466 &self,
467 address: &ChainAddress,
468 ) -> Result<Datums, ChainObserverError> {
469 let mut client = self.get_client().await?;
470
471 let datums = self.get_utxo_datums(&mut client, address).await?;
472
473 self.post_process_statequery(&mut client).await?;
474
475 client.abort().await;
476
477 Ok(datums)
478 }
479
480 async fn get_current_stake_distribution(
481 &self,
482 ) -> Result<Option<StakeDistribution>, ChainObserverError> {
483 let mut client = self.get_client().await?;
484
485 let stake_distribution = self.get_stake_distribution(&mut client).await?;
486
487 self.post_process_statequery(&mut client).await?;
488
489 client.abort().await;
490
491 Ok(stake_distribution)
492 }
493
494 async fn get_current_kes_period(&self) -> Result<Option<KesPeriod>, ChainObserverError> {
495 let mut client = self.get_client().await?;
496
497 let current_kes_period = self.get_kes_period(&mut client).await?;
498
499 self.post_process_statequery(&mut client).await?;
500
501 client.abort().await;
502
503 Ok(current_kes_period)
504 }
505}
506
507#[cfg(all(test, unix))]
509mod tests {
510 use std::fs;
511
512 use pallas_codec::utils::{AnyCbor, AnyUInt, KeyValuePairs, TagWrap};
513 use pallas_crypto::hash::Hash;
514 use pallas_network::facades::NodeServer;
515 use pallas_network::miniprotocols::{
516 Point,
517 localstate::{
518 ClientQueryRequest,
519 queries_v16::{
520 BlockQuery, ChainBlockNumber, Fraction, GenesisConfig, HardForkQuery, LedgerQuery,
521 Request, Snapshots, StakeSnapshot, SystemStart, Value,
522 },
523 },
524 };
525 use tokio::net::UnixListener;
526
527 use mithril_common::test::TempDir;
528
529 use super::*;
530
531 fn get_fake_utxo_by_address() -> UTxOByAddress {
532 let tx_hex = "1e4e5cf2889d52f1745b941090f04a65dea6ce56c5e5e66e69f65c8e36347c17";
533 let tx_bytes: [u8; 32] = hex::decode(tx_hex).unwrap().try_into().unwrap();
534 let transaction_id = Hash::from(tx_bytes);
535 let index = AnyUInt::MajorByte(2);
536 let lovelace = AnyUInt::MajorByte(2);
537 let hex_datum = "D8799F58407B226D61726B657273223A5B7B226E616D65223A227468616C6573222C2265706F6368223A307D5D2C227369676E6174757265223A22383566323265626261645840333335376338656132646630363230393766396131383064643335643966336261316432363832633732633864313232383866616438636238643063656565625838366134643665383465653865353631376164323037313836366363313930373466326137366538373864663166393733346438343061227DFF";
538 let datum = hex::decode(hex_datum).unwrap().into();
539 let tag = TagWrap::<_, 24>::new(datum);
540 let inline_datum = Some((1_u16, tag));
541
542 let address: Address =
543 Address::from_bech32("addr_test1vr80076l3x5uw6n94nwhgmv7ssgy6muzf47ugn6z0l92rhg2mgtu0")
544 .unwrap();
545 let address: Addr = address.to_vec().into();
546 let values = TransactionOutput::Current(PostAlonsoTransactionOutput {
547 address,
548 amount: Value::Coin(lovelace),
549 inline_datum,
550 script_ref: None,
551 });
552 let utxo = KeyValuePairs::from(vec![(
553 queries_v16::UTxO {
554 transaction_id,
555 index,
556 },
557 values,
558 )]);
559
560 UTxOByAddress { utxo }
561 }
562
563 fn get_fake_stake_snapshot() -> StakeSnapshot {
564 let stake_snapshots = KeyValuePairs::from(vec![
565 (
566 Bytes::from(
567 hex::decode("00000036d515e12e18cd3c88c74f09a67984c2c279a5296aa96efe89")
568 .unwrap(),
569 ),
570 Stakes {
571 snapshot_mark_pool: 300000000001,
572 snapshot_set_pool: 300000000002,
573 snapshot_go_pool: 300000000000,
574 },
575 ),
576 (
577 Bytes::from(
578 hex::decode("000000f66e28b0f18aef20555f4c4954234e3270dfbbdcc13f54e799")
579 .unwrap(),
580 ),
581 Stakes {
582 snapshot_mark_pool: 600000000001,
583 snapshot_set_pool: 600000000002,
584 snapshot_go_pool: 600000000000,
585 },
586 ),
587 (
588 Bytes::from(
589 hex::decode("00000110093effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
590 .unwrap(),
591 ),
592 Stakes {
593 snapshot_mark_pool: 1200000000001,
594 snapshot_set_pool: 1200000000002,
595 snapshot_go_pool: 1200000000000,
596 },
597 ),
598 (
599 Bytes::from(
600 hex::decode("00000ffff93effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
601 .unwrap(),
602 ),
603 Stakes {
604 snapshot_mark_pool: 0,
605 snapshot_set_pool: 1300000000002,
606 snapshot_go_pool: 0,
607 },
608 ),
609 ]);
610
611 StakeSnapshot {
612 snapshots: Snapshots {
613 stake_snapshots,
614 snapshot_stake_mark_total: 2100000000003,
615 snapshot_stake_set_total: 2100000000006,
616 snapshot_stake_go_total: 2100000000000,
617 },
618 }
619 }
620
621 fn get_fake_genesis_config() -> Vec<GenesisConfig> {
622 let genesis = GenesisConfig {
623 system_start: SystemStart {
624 year: 2021,
625 day_of_year: 150,
626 picoseconds_of_day: 0,
627 },
628 network_magic: 42,
629 network_id: 42,
630 active_slots_coefficient: Fraction { num: 6, den: 10 },
631 security_param: 2160,
632 epoch_length: 432000,
633 slots_per_kes_period: 129600,
634 max_kes_evolutions: 62,
635 slot_length: 1,
636 update_quorum: 5,
637 max_lovelace_supply: AnyUInt::MajorByte(2),
638 };
639
640 vec![genesis]
641 }
642
643 async fn mock_server(server: &mut NodeServer) -> AnyCbor {
645 let query: queries_v16::Request =
646 match server.statequery().recv_while_acquired().await.unwrap() {
647 ClientQueryRequest::Query(q) => q.into_decode().unwrap(),
648 x => panic!("unexpected message from client: {x:?}"),
649 };
650
651 match query {
652 Request::GetChainPoint => {
653 AnyCbor::from_encode(Point::Specific(52851885, vec![1, 2, 3]))
654 }
655 Request::GetChainBlockNo => AnyCbor::from_encode(ChainBlockNumber {
656 slot_timeline: 1,
657 block_number: 52851885,
658 }),
659 Request::LedgerQuery(LedgerQuery::HardForkQuery(HardForkQuery::GetCurrentEra)) => {
660 AnyCbor::from_encode(4)
661 }
662 Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetEpochNo)) => {
663 AnyCbor::from_encode([8])
664 }
665 Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetGenesisConfig)) => {
666 AnyCbor::from_encode(get_fake_genesis_config())
667 }
668 Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetUTxOByAddress(_))) => {
669 AnyCbor::from_encode(get_fake_utxo_by_address())
670 }
671 Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetStakeSnapshots(_))) => {
672 AnyCbor::from_encode(get_fake_stake_snapshot())
673 }
674 _ => panic!("unexpected query from client: {query:?}"),
675 }
676 }
677
678 fn create_temp_dir(folder_name: &str) -> PathBuf {
680 TempDir::create_with_short_path("pallas_chain_observer_test", folder_name)
681 }
682
    /// Spawns a task running a mock node on the Unix socket at `socket_path`.
    ///
    /// The task accepts one client, acknowledges its acquire request, then answers
    /// `intersections` queries with the canned responses from `mock_server` before ending.
    async fn setup_server(socket_path: PathBuf, intersections: u32) -> tokio::task::JoinHandle<()> {
        tokio::spawn({
            async move {
                // Binding fails if a socket file from a previous run is still present.
                if socket_path.exists() {
                    fs::remove_file(&socket_path).expect("Previous socket removal failed");
                }

                let unix_listener = UnixListener::bind(socket_path.as_path()).unwrap();
                // NOTE(review): `10` presumably has to match the magic of `CardanoNetwork::TestNet(10)` used by the tests — confirm.
                let mut server = NodeServer::accept(&unix_listener, 10).await.unwrap();

                // Wait for the client's first message, then report the state as acquired.
                server.statequery().recv_while_idle().await.unwrap();
                server.statequery().send_acquired().await.unwrap();

                // Answer exactly `intersections` queries.
                for _ in 0..intersections {
                    let result = mock_server(&mut server).await;
                    server.statequery().send_result(result).await.unwrap();
                }
            }
        })
    }
708
709 #[tokio::test]
710 async fn get_current_epoch() {
711 let socket_path = create_temp_dir("get_current_epoch").join("node.socket");
712 let server = setup_server(socket_path.clone(), 2).await;
713 let client = tokio::spawn(async move {
714 let observer =
715 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
716 observer.get_current_epoch().await.unwrap().unwrap()
717 });
718
719 let (_, client_res) = tokio::join!(server, client);
720 let epoch = client_res.expect("Client failed");
721 assert_eq!(epoch, 8);
722 }
723
724 #[tokio::test]
725 async fn get_current_datums() {
726 let socket_path = create_temp_dir("get_current_datums").join("node.socket");
727 let server = setup_server(socket_path.clone(), 2).await;
728 let client = tokio::spawn(async move {
729 let observer =
730 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
731 let address =
732 "addr_test1vr80076l3x5uw6n94nwhgmv7ssgy6muzf47ugn6z0l92rhg2mgtu0".to_string();
733 observer.get_current_datums(&address).await.unwrap()
734 });
735
736 let (_, client_res) = tokio::join!(server, client);
737 let datums = client_res.expect("Client failed");
738 assert_eq!(vec![TxDatum(r#"{"constructor":0,"fields":[{"bytes":"7b226d61726b657273223a5b7b226e616d65223a227468616c6573222c2265706f6368223a307d5d2c227369676e6174757265223a2238356632326562626164"},{"bytes":"33333537633865613264663036323039376639613138306464333564396633626131643236383263373263386431323238386661643863623864306365656562"},{"bytes":"366134643665383465653865353631376164323037313836366363313930373466326137366538373864663166393733346438343061227d"}]}"#.to_string())], datums);
739 }
740
741 #[tokio::test]
742 async fn get_current_stake_distribution() {
743 let socket_path = create_temp_dir("get_current_stake_distribution").join("node.socket");
744 let server = setup_server(socket_path.clone(), 2).await;
745 let client = tokio::spawn(async move {
746 let observer =
747 super::PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
748 observer.get_current_stake_distribution().await.unwrap()
749 });
750
751 let (_, client_res) = tokio::join!(server, client);
752 let computed_stake_distribution = client_res.unwrap().unwrap();
753
754 let mut expected_stake_distribution = StakeDistribution::new();
755 expected_stake_distribution.insert(
756 "pool1qqqqqdk4zhsjuxxd8jyvwncf5eucfskz0xjjj64fdmlgj735lr9".to_string(),
757 300000000001,
758 );
759 expected_stake_distribution.insert(
760 "pool1qqqqpanw9zc0rzh0yp247nzf2s35uvnsm7aaesfl2nnejaev0uc".to_string(),
761 600000000001,
762 );
763 expected_stake_distribution.insert(
764 "pool1qqqqzyqf8mlm70883zht60n4q6uqxg4a8x266sewv8ad2grkztl".to_string(),
765 1200000000001,
766 );
767
768 assert_eq!(expected_stake_distribution, computed_stake_distribution);
769 }
770
771 #[tokio::test]
772 async fn get_current_kes_period() {
773 let socket_path = create_temp_dir("get_current_kes_period").join("node.socket");
774 let server = setup_server(socket_path.clone(), 3).await;
775 let client = tokio::spawn(async move {
776 let observer =
777 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
778
779 observer.get_current_kes_period().await.unwrap()
780 });
781
782 let (_, client_res) = tokio::join!(server, client);
783 let kes_period = client_res.unwrap().unwrap();
784 assert_eq!(407, kes_period);
785 }
786
787 #[tokio::test]
788 async fn calculate_kes_period() {
789 let socket_path = create_temp_dir("calculate_kes_period").join("node.socket");
790 let observer = PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
791 let current_kes_period = observer
792 .calculate_kes_period(Point::Specific(53536042, vec![1, 2, 3]), 129600)
793 .await
794 .unwrap();
795
796 assert_eq!(413, current_kes_period);
797
798 let current_kes_period = observer
799 .calculate_kes_period(Point::Specific(53524800, vec![1, 2, 3]), 129600)
800 .await
801 .unwrap();
802
803 assert_eq!(413, current_kes_period);
804
805 let current_kes_period = observer
806 .calculate_kes_period(Point::Specific(53649999, vec![1, 2, 3]), 129600)
807 .await
808 .unwrap();
809
810 assert_eq!(413, current_kes_period);
811 }
812
813 #[tokio::test]
814 async fn get_chain_point() {
815 let socket_path = create_temp_dir("get_chain_point").join("node.socket");
816 let server = setup_server(socket_path.clone(), 1).await;
817 let client = tokio::spawn(async move {
818 let observer =
819 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
820 let mut client = observer.get_client().await.unwrap();
821 let statequery = client.statequery();
822 statequery.acquire(None).await.unwrap();
823 let chain_point = observer.do_get_chain_point_state_query(statequery).await.unwrap();
824 observer.post_process_statequery(&mut client).await.unwrap();
825 client.abort().await;
826 chain_point
827 });
828
829 let (_, client_res) = tokio::join!(server, client);
830 let chain_point = client_res.expect("Client failed");
831 assert_eq!(chain_point, Point::Specific(52851885, vec![1, 2, 3]));
832 }
833
834 #[tokio::test]
835 async fn get_genesis_config() {
836 let socket_path = create_temp_dir("get_genesis_config").join("node.socket");
837 let server = setup_server(socket_path.clone(), 2).await;
838 let client = tokio::spawn(async move {
839 let observer =
840 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
841 let mut client = observer.get_client().await.unwrap();
842 let statequery = client.statequery();
843 statequery.acquire(None).await.unwrap();
844 let genesis_config =
845 observer.do_get_genesis_config_state_query(statequery).await.unwrap();
846 observer.post_process_statequery(&mut client).await.unwrap();
847 client.abort().await;
848 genesis_config
849 });
850
851 let (_, client_res) = tokio::join!(server, client);
852 let genesis_config = client_res.expect("Client failed");
853 assert_eq!(genesis_config, get_fake_genesis_config());
854 }
855
856 #[tokio::test]
857 async fn fetch_current_era_from_state_query() {
858 let socket_path = create_temp_dir("get_current_era").join("node.socket");
859 let server = setup_server(socket_path.clone(), 1).await;
860 let client = tokio::spawn(async move {
861 let observer =
862 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
863 let mut client = observer.get_client().await.unwrap();
864 let statequery = client.statequery();
865 statequery.acquire(None).await.unwrap();
866 let era = observer.do_get_current_era_state_query(statequery).await.unwrap();
867 observer.post_process_statequery(&mut client).await.unwrap();
868 client.abort().await;
869 era
870 });
871
872 let (_, client_res) = tokio::join!(server, client);
873 let era = client_res.expect("Client failed");
874 assert_eq!(era, 4);
875 }
876
877 #[tokio::test]
878 async fn get_current_era() {
879 let socket_path = create_temp_dir("get_current_era_as_string").join("node.socket");
880 let server = setup_server(socket_path.clone(), 1).await;
881 let client = tokio::spawn(async move {
882 let observer =
883 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
884 observer.get_current_era().await.unwrap().unwrap()
885 });
886
887 let (_, client_res) = tokio::join!(server, client);
888 let era = client_res.expect("Client failed");
889
890 let expected_era = Era::try_from(4 + ERA_OFFSET).unwrap().to_string();
891 assert_eq!(era, expected_era);
892 }
893
894 #[tokio::test]
895 async fn get_current_chain_point() {
896 let socket_path = create_temp_dir("get_current_chain_point").join("node.socket");
897 let server = setup_server(socket_path.clone(), 2).await;
898 let client = tokio::spawn(async move {
899 let observer =
900 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
901 observer.get_current_chain_point().await.unwrap()
902 });
903
904 let (_, client_res) = tokio::join!(server, client);
905 let chain_point = client_res.expect("Client failed");
906 assert_eq!(
907 chain_point,
908 Some(ChainPoint {
909 slot_number: SlotNumber(52851885),
910 block_hash: "010203".to_string(),
911 block_number: BlockNumber(52851885)
912 })
913 );
914 }
915}