1use std::collections::BTreeSet;
2use std::path::{Path, PathBuf};
3
4use anyhow::{Context, anyhow};
5use async_trait::async_trait;
6use pallas_addresses::Address;
7use pallas_codec::utils::{Bytes, CborWrap, TagWrap};
8use pallas_network::{
9 facades::NodeClient,
10 miniprotocols::{
11 Point,
12 localstate::{
13 Client,
14 queries_v16::{
15 self, Addr, Addrs, ChainBlockNumber, GenesisConfig, PostAlonsoTransactionOutput,
16 StakeSnapshot, Stakes, TransactionOutput, UTxOByAddress,
17 },
18 },
19 },
20};
21use pallas_primitives::ToCanonicalJson;
22use pallas_traverse::Era;
23
24use mithril_common::crypto_helper::{KesPeriod, encode_bech32};
25use mithril_common::entities::{BlockNumber, ChainPoint, Epoch, SlotNumber, StakeDistribution};
26use mithril_common::{CardanoNetwork, StdResult};
27
28use crate::entities::{ChainAddress, Datum, Datums, TxDatum, try_inspect};
29
30use super::{ChainObserver, ChainObserverError};
31
/// Offset added in `get_current_era` to the era index returned by
/// `queries_v16::get_current_era` before it is converted into a `pallas_traverse::Era`.
// NOTE(review): presumably the node numbers eras one below `Era`'s own numbering — confirm.
const ERA_OFFSET: u16 = 1;
35
/// Chain observer that answers its queries by connecting a `pallas_network` `NodeClient`
/// to a Cardano node socket.
pub struct PallasChainObserver {
    /// Path of the node socket handed to `NodeClient::connect`.
    socket: PathBuf,
    /// Network whose magic id is used when connecting to the node.
    network: CardanoNetwork,
}
41
42impl From<anyhow::Error> for ChainObserverError {
43 fn from(err: anyhow::Error) -> Self {
44 ChainObserverError::General(err)
45 }
46}
47
48impl PallasChainObserver {
49 pub fn new(socket: &Path, network: CardanoNetwork) -> Self {
51 Self {
52 socket: socket.to_owned(),
53 network,
54 }
55 }
56
57 async fn new_client(&self) -> StdResult<NodeClient> {
59 let magic = self.network.magic_id();
60 let client = NodeClient::connect(&self.socket, magic).await?;
61
62 Ok(client)
63 }
64
65 async fn get_client(&self) -> StdResult<NodeClient> {
67 self.new_client()
68 .await
69 .with_context(|| "PallasChainObserver failed to create new client")
70 }
71
72 async fn get_era(&self, statequery: &mut Client) -> StdResult<u16> {
74 statequery
75 .acquire(None)
76 .await
77 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
78
79 let era = queries_v16::get_current_era(statequery)
80 .await
81 .with_context(|| "PallasChainObserver failed to get current era")?;
82
83 Ok(era)
84 }
85
86 async fn get_epoch(&self, statequery: &mut Client) -> StdResult<u32> {
88 statequery
89 .acquire(None)
90 .await
91 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
92
93 let era = queries_v16::get_current_era(statequery)
94 .await
95 .with_context(|| "PallasChainObserver failed to get current era")?;
96
97 let epoch = queries_v16::get_block_epoch_number(statequery, era)
98 .await
99 .with_context(|| "PallasChainObserver failed to get block epoch number")?;
100
101 Ok(epoch)
102 }
103
104 fn get_datum_tag(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<TagWrap<Bytes, 24>> {
106 Ok(utxo
107 .inline_datum
108 .as_ref()
109 .with_context(|| "PallasChainObserver failed to get inline datum")?
110 .1
111 .clone())
112 }
113
114 fn inspect_datum(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<Datum> {
116 let datum = self.get_datum_tag(utxo)?;
117 let datum = CborWrap(datum).to_vec();
118
119 try_inspect::<Datum>(datum)
120 }
121
122 fn serialize_datum(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<TxDatum> {
124 let datum = self.inspect_datum(utxo)?;
125 let serialized = serde_json::to_string(&datum.to_json())
126 .with_context(|| "PallasChainObserver failed to serialize datum")?;
127
128 Ok(TxDatum(serialized))
129 }
130
131 fn map_datums(&self, transaction: UTxOByAddress) -> StdResult<Datums> {
133 transaction
134 .utxo
135 .iter()
136 .filter_map(|(_, utxo)| match utxo {
137 TransactionOutput::Current(output) => {
138 output.inline_datum.as_ref().map(|_| self.serialize_datum(output))
139 }
140 _ => None,
141 })
142 .collect::<StdResult<Datums>>()
143 }
144
145 async fn get_utxo_datums(
147 &self,
148 client: &mut NodeClient,
149 address: &ChainAddress,
150 ) -> Result<Datums, ChainObserverError> {
151 let statequery = client.statequery();
152 let utxo = self.get_utxo_by_address(statequery, address).await?;
153
154 Ok(self.map_datums(utxo)?)
155 }
156
157 async fn get_utxo_by_address(
159 &self,
160 statequery: &mut Client,
161 address: &ChainAddress,
162 ) -> StdResult<UTxOByAddress> {
163 statequery
164 .acquire(None)
165 .await
166 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
167
168 let era = queries_v16::get_current_era(statequery)
169 .await
170 .with_context(|| "PallasChainObserver failed to get current era")?;
171
172 let addr: Address = Address::from_bech32(address)
173 .with_context(|| "PallasChainObserver failed to parse address")?;
174
175 let addr: Addr = addr.to_vec().into();
176 let addrs: Addrs = vec![addr];
177 let utxo = queries_v16::get_utxo_by_address(statequery, era, addrs)
178 .await
179 .with_context(|| "PallasChainObserver failed to get utxo")?;
180
181 Ok(utxo)
182 }
183
184 async fn do_stake_snapshots_state_query(
186 &self,
187 statequery: &mut Client,
188 ) -> StdResult<StakeSnapshot> {
189 statequery
190 .acquire(None)
191 .await
192 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
193
194 let era = queries_v16::get_current_era(statequery)
195 .await
196 .with_context(|| "PallasChainObserver failed to get current era")?;
197
198 let state_snapshot = queries_v16::get_stake_snapshots(statequery, era, BTreeSet::new())
199 .await
200 .with_context(|| "PallasChainObserver failed to get stake snapshot")?;
201
202 Ok(state_snapshot)
203 }
204
205 fn get_stake_pool_hash(&self, key: &Bytes) -> Result<String, ChainObserverError> {
207 let pool_id_bech32 = encode_bech32("pool", key)
208 .with_context(|| "PallasChainObserver failed to encode stake pool hash")?;
209 Ok(pool_id_bech32)
210 }
211
212 async fn get_stake_distribution(
214 &self,
215 client: &mut NodeClient,
216 ) -> Result<Option<StakeDistribution>, ChainObserverError> {
217 let statequery = client.statequery();
218
219 let stake_snapshot = self.do_stake_snapshots_state_query(statequery).await?;
220
221 let mut stake_distribution = StakeDistribution::new();
222
223 let have_stakes_in_two_epochs = |stakes: &Stakes| stakes.snapshot_mark_pool > 0;
224 for (key, stakes) in stake_snapshot
225 .snapshots
226 .stake_snapshots
227 .iter()
228 .filter(|(_, stakes)| have_stakes_in_two_epochs(stakes))
229 {
230 let pool_hash = self.get_stake_pool_hash(key)?;
231 stake_distribution.insert(pool_hash, stakes.snapshot_mark_pool);
232 }
233
234 Ok(Some(stake_distribution))
235 }
236
237 async fn calculate_kes_period(
259 &self,
260 chain_point: Point,
261 slots_per_kes_period: u64,
262 ) -> Result<KesPeriod, ChainObserverError> {
263 if slots_per_kes_period == 0 {
264 return Err(anyhow!("slots_per_kes_period must be greater than 0"))
265 .with_context(|| "PallasChainObserver failed to calculate kes period")?;
266 }
267
268 let current_kes_period = chain_point.slot_or_default() / slots_per_kes_period;
269
270 Ok(KesPeriod(current_kes_period))
271 }
272
273 async fn do_get_chain_point_state_query(&self, statequery: &mut Client) -> StdResult<Point> {
275 let chain_point = queries_v16::get_chain_point(statequery)
276 .await
277 .with_context(|| "PallasChainObserver failed to get chain point")?;
278
279 Ok(chain_point)
280 }
281
282 async fn do_get_chain_block_no(&self, statequery: &mut Client) -> StdResult<ChainBlockNumber> {
284 let chain_block_number = queries_v16::get_chain_block_no(statequery)
285 .await
286 .with_context(|| "PallasChainObserver failed to get chain block number")?;
287
288 Ok(chain_block_number)
289 }
290
291 async fn do_get_current_era_state_query(&self, statequery: &mut Client) -> StdResult<u16> {
293 let era = queries_v16::get_current_era(statequery)
294 .await
295 .with_context(|| "PallasChainObserver failed to get current era")?;
296
297 Ok(era)
298 }
299
300 async fn do_get_genesis_config_state_query(
302 &self,
303 statequery: &mut Client,
304 ) -> StdResult<Vec<GenesisConfig>> {
305 let era = self.do_get_current_era_state_query(statequery).await?;
306 let genesis_config = queries_v16::get_genesis_config(statequery, era)
307 .await
308 .with_context(|| "PallasChainObserver failed to get genesis config")?;
309
310 Ok(genesis_config)
311 }
312
313 async fn get_chain_point(&self, statequery: &mut Client) -> StdResult<ChainPoint> {
315 statequery
316 .acquire(None)
317 .await
318 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
319
320 let chain_point = self.do_get_chain_point_state_query(statequery).await?;
321
322 let header_hash = match chain_point {
323 Point::Origin => None,
324 Point::Specific(_at_slot, ref hash) => Some(hex::encode(hash)),
325 };
326
327 let chain_block_number = self.do_get_chain_block_no(statequery).await?;
328
329 Ok(ChainPoint {
330 slot_number: SlotNumber(chain_point.slot_or_default()),
331 block_hash: header_hash.unwrap_or_default(),
332 block_number: BlockNumber(chain_block_number.block_number as u64),
333 })
334 }
335
336 async fn get_kes_period(
339 &self,
340 client: &mut NodeClient,
341 ) -> Result<Option<KesPeriod>, ChainObserverError> {
342 let statequery = client.statequery();
343
344 statequery
345 .acquire(None)
346 .await
347 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
348
349 let chain_point = self.do_get_chain_point_state_query(statequery).await?;
350
351 let genesis_config = self.do_get_genesis_config_state_query(statequery).await?;
352
353 let config = genesis_config
354 .first()
355 .with_context(|| "PallasChainObserver failed to extract the config")?;
356
357 let current_kes_period = self
358 .calculate_kes_period(chain_point, config.slots_per_kes_period as u64)
359 .await?;
360
361 Ok(Some(current_kes_period))
362 }
363
364 async fn process_statequery(&self, client: &mut NodeClient) -> StdResult<()> {
366 let statequery = client.statequery();
367 statequery
368 .send_release()
369 .await
370 .with_context(|| "PallasChainObserver send release failed")?;
371
372 statequery
373 .send_done()
374 .await
375 .with_context(|| "PallasChainObserver send done failed")?;
376
377 Ok(())
378 }
379
380 async fn sync(&self, client: &mut NodeClient) -> StdResult<()> {
382 client
383 .chainsync()
384 .send_done()
385 .await
386 .with_context(|| "PallasChainObserver chainsync send done failed")?;
387 Ok(())
388 }
389
390 async fn post_process_statequery(&self, client: &mut NodeClient) -> StdResult<()> {
392 self.process_statequery(client).await?;
393 self.sync(client).await?;
394
395 Ok(())
396 }
397}
398
399#[async_trait]
400impl ChainObserver for PallasChainObserver {
401 async fn get_current_era(&self) -> Result<Option<String>, ChainObserverError> {
402 let mut client = self.get_client().await?;
403
404 let era = self.get_era(client.statequery()).await?;
405
406 let era = Era::try_from(era + ERA_OFFSET)
407 .with_context(|| "PallasChainObserver failed to convert: '{era}' to Era")?;
408
409 self.post_process_statequery(&mut client).await?;
410
411 client.abort().await;
412
413 Ok(Some(era.to_string()))
414 }
415
416 async fn get_current_epoch(&self) -> Result<Option<Epoch>, ChainObserverError> {
417 let mut client = self.get_client().await?;
418
419 let epoch = self.get_epoch(client.statequery()).await?;
420
421 self.post_process_statequery(&mut client).await?;
422
423 client.abort().await;
424
425 Ok(Some(Epoch(epoch as u64)))
426 }
427
428 async fn get_current_chain_point(&self) -> Result<Option<ChainPoint>, ChainObserverError> {
429 let mut client = self.get_client().await?;
430
431 let chain_point = self.get_chain_point(client.statequery()).await?;
432
433 self.post_process_statequery(&mut client).await?;
434
435 client.abort().await;
436
437 Ok(Some(chain_point))
438 }
439
440 async fn get_current_datums(
441 &self,
442 address: &ChainAddress,
443 ) -> Result<Datums, ChainObserverError> {
444 let mut client = self.get_client().await?;
445
446 let datums = self.get_utxo_datums(&mut client, address).await?;
447
448 self.post_process_statequery(&mut client).await?;
449
450 client.abort().await;
451
452 Ok(datums)
453 }
454
455 async fn get_current_stake_distribution(
456 &self,
457 ) -> Result<Option<StakeDistribution>, ChainObserverError> {
458 let mut client = self.get_client().await?;
459
460 let stake_distribution = self.get_stake_distribution(&mut client).await?;
461
462 self.post_process_statequery(&mut client).await?;
463
464 client.abort().await;
465
466 Ok(stake_distribution)
467 }
468
469 async fn get_current_kes_period(&self) -> Result<Option<KesPeriod>, ChainObserverError> {
470 let mut client = self.get_client().await?;
471
472 let current_kes_period = self.get_kes_period(&mut client).await?;
473
474 self.post_process_statequery(&mut client).await?;
475
476 client.abort().await;
477
478 Ok(current_kes_period)
479 }
480}
481
#[cfg(all(test, unix))]
mod tests {
    use std::fs;

    use pallas_codec::utils::{AnyCbor, AnyUInt, KeyValuePairs, TagWrap};
    use pallas_crypto::hash::Hash;
    use pallas_network::facades::NodeServer;
    use pallas_network::miniprotocols::{
        Point,
        localstate::{
            ClientQueryRequest,
            queries_v16::{
                BlockQuery, ChainBlockNumber, Fraction, GenesisConfig, HardForkQuery, LedgerQuery,
                Request, Snapshots, StakeSnapshot, SystemStart, Value,
            },
        },
    };
    use tokio::net::UnixListener;

    use mithril_common::test::TempDir;

    use super::*;

    /// Builds the single-UTxO answer served by the mock node for address queries.
    fn get_fake_utxo_by_address() -> UTxOByAddress {
        let transaction_id = {
            let raw =
                hex::decode("1e4e5cf2889d52f1745b941090f04a65dea6ce56c5e5e66e69f65c8e36347c17")
                    .unwrap();
            let raw: [u8; 32] = raw.try_into().unwrap();
            Hash::from(raw)
        };

        let datum_bytes = hex::decode("D8799F58407B226D61726B657273223A5B7B226E616D65223A227468616C6573222C2265706F6368223A307D5D2C227369676E6174757265223A22383566323265626261645840333335376338656132646630363230393766396131383064643335643966336261316432363832633732633864313232383866616438636238643063656565625838366134643665383465653865353631376164323037313836366363313930373466326137366538373864663166393733346438343061227DFF").unwrap();
        let inline_datum = Some((1_u16, TagWrap::<_, 24>::new(datum_bytes.into())));

        let address: Addr =
            Address::from_bech32("addr_test1vr80076l3x5uw6n94nwhgmv7ssgy6muzf47ugn6z0l92rhg2mgtu0")
                .unwrap()
                .to_vec()
                .into();

        let output = TransactionOutput::Current(PostAlonsoTransactionOutput {
            address,
            amount: Value::Coin(AnyUInt::MajorByte(2)),
            inline_datum,
            script_ref: None,
        });
        let key = queries_v16::UTxO {
            transaction_id,
            index: AnyUInt::MajorByte(2),
        };

        UTxOByAddress {
            utxo: KeyValuePairs::from(vec![(key, output)]),
        }
    }

    /// Builds the stake snapshot served by the mock node: three pools with a positive
    /// mark stake and one pool whose mark stake is zero.
    fn get_fake_stake_snapshot() -> StakeSnapshot {
        let pool = |pool_id_hex: &str, mark, set, go| {
            (
                Bytes::from(hex::decode(pool_id_hex).unwrap()),
                Stakes {
                    snapshot_mark_pool: mark,
                    snapshot_set_pool: set,
                    snapshot_go_pool: go,
                },
            )
        };

        let stake_snapshots = KeyValuePairs::from(vec![
            pool(
                "00000036d515e12e18cd3c88c74f09a67984c2c279a5296aa96efe89",
                300000000001,
                300000000002,
                300000000000,
            ),
            pool(
                "000000f66e28b0f18aef20555f4c4954234e3270dfbbdcc13f54e799",
                600000000001,
                600000000002,
                600000000000,
            ),
            pool(
                "00000110093effbf3ce788aebd3e7506b80322bd3995ad432e61fad5",
                1200000000001,
                1200000000002,
                1200000000000,
            ),
            pool(
                "00000ffff93effbf3ce788aebd3e7506b80322bd3995ad432e61fad5",
                0,
                1300000000002,
                0,
            ),
        ]);

        StakeSnapshot {
            snapshots: Snapshots {
                stake_snapshots,
                snapshot_stake_mark_total: 2100000000003,
                snapshot_stake_set_total: 2100000000006,
                snapshot_stake_go_total: 2100000000000,
            },
        }
    }

    /// Builds the one-element genesis configuration list served by the mock node.
    fn get_fake_genesis_config() -> Vec<GenesisConfig> {
        vec![GenesisConfig {
            system_start: SystemStart {
                year: 2021,
                day_of_year: 150,
                picoseconds_of_day: 0,
            },
            network_magic: 42,
            network_id: 42,
            active_slots_coefficient: Fraction { num: 6, den: 10 },
            security_param: 2160,
            epoch_length: 432000,
            slots_per_kes_period: 129600,
            max_kes_evolutions: 62,
            slot_length: 1,
            update_quorum: 5,
            max_lovelace_supply: AnyUInt::MajorByte(2),
        }]
    }

    /// Receives one query from the client and returns the canned CBOR answer for it.
    ///
    /// Panics on any message or query the fixtures do not cover.
    async fn mock_server(server: &mut NodeServer) -> AnyCbor {
        let received = server.statequery().recv_while_acquired().await.unwrap();
        let query: queries_v16::Request = match received {
            ClientQueryRequest::Query(q) => q.into_decode().unwrap(),
            x => panic!("unexpected message from client: {x:?}"),
        };

        match query {
            Request::GetChainPoint => {
                AnyCbor::from_encode(Point::Specific(52851885, vec![1, 2, 3]))
            }
            Request::GetChainBlockNo => AnyCbor::from_encode(ChainBlockNumber {
                slot_timeline: 1,
                block_number: 52851885,
            }),
            Request::LedgerQuery(LedgerQuery::HardForkQuery(HardForkQuery::GetCurrentEra)) => {
                AnyCbor::from_encode(4)
            }
            Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetEpochNo)) => {
                AnyCbor::from_encode([8])
            }
            Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetGenesisConfig)) => {
                AnyCbor::from_encode(get_fake_genesis_config())
            }
            Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetUTxOByAddress(_))) => {
                AnyCbor::from_encode(get_fake_utxo_by_address())
            }
            Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetStakeSnapshots(_))) => {
                AnyCbor::from_encode(get_fake_stake_snapshot())
            }
            _ => panic!("unexpected query from client: {query:?}"),
        }
    }

    /// Creates a short-path temporary directory dedicated to one test.
    fn create_temp_dir(folder_name: &str) -> PathBuf {
        TempDir::create_with_short_path("pallas_chain_observer_test", folder_name)
    }

    /// Builds the observer used by every test, pointed at the mock node socket.
    fn testnet_observer(socket_path: &Path) -> PallasChainObserver {
        PallasChainObserver::new(socket_path, CardanoNetwork::TestNet(10))
    }

    /// Spawns a mock node on `socket_path` that accepts one client, acknowledges one
    /// acquire and then answers `intersections` queries.
    async fn setup_server(socket_path: PathBuf, intersections: u32) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            if socket_path.exists() {
                fs::remove_file(&socket_path).expect("Previous socket removal failed");
            }

            let listener = UnixListener::bind(socket_path.as_path()).unwrap();
            let mut node = NodeServer::accept(&listener, 10).await.unwrap();

            node.statequery().recv_while_idle().await.unwrap();
            node.statequery().send_acquired().await.unwrap();

            for _ in 0..intersections {
                let answer = mock_server(&mut node).await;
                node.statequery().send_result(answer).await.unwrap();
            }
        })
    }

    #[tokio::test]
    async fn get_current_epoch() {
        let socket_path = create_temp_dir("get_current_epoch").join("node.socket");
        let server = setup_server(socket_path.clone(), 2).await;
        let client = tokio::spawn(async move {
            testnet_observer(&socket_path).get_current_epoch().await.unwrap().unwrap()
        });

        let (_, observed) = tokio::join!(server, client);
        let epoch = observed.expect("Client failed");
        assert_eq!(epoch, 8);
    }

    #[tokio::test]
    async fn get_current_datums() {
        let socket_path = create_temp_dir("get_current_datums").join("node.socket");
        let server = setup_server(socket_path.clone(), 2).await;
        let client = tokio::spawn(async move {
            let address =
                "addr_test1vr80076l3x5uw6n94nwhgmv7ssgy6muzf47ugn6z0l92rhg2mgtu0".to_string();
            testnet_observer(&socket_path).get_current_datums(&address).await.unwrap()
        });

        let (_, observed) = tokio::join!(server, client);
        let datums = observed.expect("Client failed");
        assert_eq!(vec![TxDatum(r#"{"constructor":0,"fields":[{"bytes":"7b226d61726b657273223a5b7b226e616d65223a227468616c6573222c2265706f6368223a307d5d2c227369676e6174757265223a2238356632326562626164"},{"bytes":"33333537633865613264663036323039376639613138306464333564396633626131643236383263373263386431323238386661643863623864306365656562"},{"bytes":"366134643665383465653865353631376164323037313836366363313930373466326137366538373864663166393733346438343061227d"}]}"#.to_string())], datums);
    }

    #[tokio::test]
    async fn get_current_stake_distribution() {
        let socket_path = create_temp_dir("get_current_stake_distribution").join("node.socket");
        let server = setup_server(socket_path.clone(), 2).await;
        let client = tokio::spawn(async move {
            testnet_observer(&socket_path).get_current_stake_distribution().await.unwrap()
        });

        let (_, observed) = tokio::join!(server, client);
        let computed_stake_distribution = observed.unwrap().unwrap();

        // The pool with a zero mark stake must not show up in the distribution.
        let mut expected_stake_distribution = StakeDistribution::new();
        for (pool_id, stake) in [
            ("pool1qqqqqdk4zhsjuxxd8jyvwncf5eucfskz0xjjj64fdmlgj735lr9", 300000000001),
            ("pool1qqqqpanw9zc0rzh0yp247nzf2s35uvnsm7aaesfl2nnejaev0uc", 600000000001),
            ("pool1qqqqzyqf8mlm70883zht60n4q6uqxg4a8x266sewv8ad2grkztl", 1200000000001),
        ] {
            expected_stake_distribution.insert(pool_id.to_string(), stake);
        }

        assert_eq!(expected_stake_distribution, computed_stake_distribution);
    }

    #[tokio::test]
    async fn get_current_kes_period() {
        let socket_path = create_temp_dir("get_current_kes_period").join("node.socket");
        let server = setup_server(socket_path.clone(), 3).await;
        let client = tokio::spawn(async move {
            testnet_observer(&socket_path).get_current_kes_period().await.unwrap()
        });

        let (_, observed) = tokio::join!(server, client);
        let kes_period = observed.unwrap().unwrap();
        assert_eq!(407, kes_period);
    }

    #[tokio::test]
    async fn calculate_kes_period() {
        let socket_path = create_temp_dir("calculate_kes_period").join("node.socket");
        let observer = testnet_observer(&socket_path);

        // A slot inside the period, the first slot of the period and a slot near its end.
        for slot in [53536042, 53524800, 53649999] {
            let current_kes_period = observer
                .calculate_kes_period(Point::Specific(slot, vec![1, 2, 3]), 129600)
                .await
                .unwrap();

            assert_eq!(413, current_kes_period);
        }
    }

    #[tokio::test]
    async fn get_chain_point() {
        let socket_path = create_temp_dir("get_chain_point").join("node.socket");
        let server = setup_server(socket_path.clone(), 1).await;
        let client = tokio::spawn(async move {
            let observer = testnet_observer(&socket_path);
            let mut node_client = observer.get_client().await.unwrap();
            let statequery = node_client.statequery();
            statequery.acquire(None).await.unwrap();
            let chain_point = observer.do_get_chain_point_state_query(statequery).await.unwrap();
            observer.post_process_statequery(&mut node_client).await.unwrap();
            node_client.abort().await;
            chain_point
        });

        let (_, observed) = tokio::join!(server, client);
        let chain_point = observed.expect("Client failed");
        assert_eq!(chain_point, Point::Specific(52851885, vec![1, 2, 3]));
    }

    #[tokio::test]
    async fn get_genesis_config() {
        let socket_path = create_temp_dir("get_genesis_config").join("node.socket");
        let server = setup_server(socket_path.clone(), 2).await;
        let client = tokio::spawn(async move {
            let observer = testnet_observer(&socket_path);
            let mut node_client = observer.get_client().await.unwrap();
            let statequery = node_client.statequery();
            statequery.acquire(None).await.unwrap();
            let genesis_config =
                observer.do_get_genesis_config_state_query(statequery).await.unwrap();
            observer.post_process_statequery(&mut node_client).await.unwrap();
            node_client.abort().await;
            genesis_config
        });

        let (_, observed) = tokio::join!(server, client);
        let genesis_config = observed.expect("Client failed");
        assert_eq!(genesis_config, get_fake_genesis_config());
    }

    #[tokio::test]
    async fn fetch_current_era_from_state_query() {
        let socket_path = create_temp_dir("get_current_era").join("node.socket");
        let server = setup_server(socket_path.clone(), 1).await;
        let client = tokio::spawn(async move {
            let observer = testnet_observer(&socket_path);
            let mut node_client = observer.get_client().await.unwrap();
            let statequery = node_client.statequery();
            statequery.acquire(None).await.unwrap();
            let era = observer.do_get_current_era_state_query(statequery).await.unwrap();
            observer.post_process_statequery(&mut node_client).await.unwrap();
            node_client.abort().await;
            era
        });

        let (_, observed) = tokio::join!(server, client);
        let era = observed.expect("Client failed");
        assert_eq!(era, 4);
    }

    #[tokio::test]
    async fn get_current_era() {
        let socket_path = create_temp_dir("get_current_era_as_string").join("node.socket");
        let server = setup_server(socket_path.clone(), 1).await;
        let client = tokio::spawn(async move {
            testnet_observer(&socket_path).get_current_era().await.unwrap().unwrap()
        });

        let (_, observed) = tokio::join!(server, client);
        let era = observed.expect("Client failed");

        let expected_era = Era::try_from(4 + ERA_OFFSET).unwrap().to_string();
        assert_eq!(era, expected_era);
    }

    #[tokio::test]
    async fn get_current_chain_point() {
        let socket_path = create_temp_dir("get_current_chain_point").join("node.socket");
        let server = setup_server(socket_path.clone(), 2).await;
        let client = tokio::spawn(async move {
            testnet_observer(&socket_path).get_current_chain_point().await.unwrap()
        });

        let (_, observed) = tokio::join!(server, client);
        let chain_point = observed.expect("Client failed");
        assert_eq!(
            chain_point,
            Some(ChainPoint {
                slot_number: SlotNumber(52851885),
                block_hash: "010203".to_string(),
                block_number: BlockNumber(52851885)
            })
        );
    }
}