1use std::collections::BTreeSet;
2use std::path::{Path, PathBuf};
3
4use anyhow::{anyhow, Context};
5use async_trait::async_trait;
6use pallas_addresses::Address;
7use pallas_codec::utils::{Bytes, CborWrap, TagWrap};
8use pallas_network::{
9 facades::NodeClient,
10 miniprotocols::{
11 localstate::{
12 queries_v16::{
13 self, Addr, Addrs, ChainBlockNumber, GenesisConfig, PostAlonsoTransactionOutput,
14 StakeSnapshot, Stakes, TransactionOutput, UTxOByAddress,
15 },
16 Client,
17 },
18 Point,
19 },
20};
21use pallas_primitives::ToCanonicalJson;
22use pallas_traverse::Era;
23
24use crate::{
25 chain_observer::{interface::*, ChainAddress, TxDatum},
26 crypto_helper::{encode_bech32, KESPeriod, OpCert},
27 entities::{BlockNumber, ChainPoint, Epoch, SlotNumber, StakeDistribution},
28 CardanoNetwork, StdResult,
29};
30
31use super::model::{try_inspect, Datum, Datums};
32
/// Offset added to the era index returned by the node's state query before it
/// is converted into a [`pallas_traverse::Era`] (see `get_current_era`).
const ERA_OFFSET: u16 = 1;
36
37pub struct PallasChainObserver {
39 socket: PathBuf,
40 network: CardanoNetwork,
41}
42
43impl From<anyhow::Error> for ChainObserverError {
44 fn from(err: anyhow::Error) -> Self {
45 ChainObserverError::General(err)
46 }
47}
48
49impl PallasChainObserver {
50 pub fn new(socket: &Path, network: CardanoNetwork) -> Self {
52 Self {
53 socket: socket.to_owned(),
54 network,
55 }
56 }
57
58 async fn new_client(&self) -> StdResult<NodeClient> {
60 let magic = self.network.code();
61 let client = NodeClient::connect(&self.socket, magic).await?;
62
63 Ok(client)
64 }
65
66 async fn get_client(&self) -> StdResult<NodeClient> {
68 self.new_client()
69 .await
70 .map_err(|err| anyhow!(err))
71 .with_context(|| "PallasChainObserver failed to create new client")
72 }
73
74 async fn get_era(&self, statequery: &mut Client) -> StdResult<u16> {
76 statequery
77 .acquire(None)
78 .await
79 .map_err(|err| anyhow!(err))
80 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
81
82 let era = queries_v16::get_current_era(statequery)
83 .await
84 .map_err(|err| anyhow!(err))
85 .with_context(|| "PallasChainObserver failed to get current era")?;
86
87 Ok(era)
88 }
89
90 async fn get_epoch(&self, statequery: &mut Client) -> StdResult<u32> {
92 statequery
93 .acquire(None)
94 .await
95 .map_err(|err| anyhow!(err))
96 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
97
98 let era = queries_v16::get_current_era(statequery)
99 .await
100 .map_err(|err| anyhow!(err))
101 .with_context(|| "PallasChainObserver failed to get current era")?;
102
103 let epoch = queries_v16::get_block_epoch_number(statequery, era)
104 .await
105 .map_err(|err| anyhow!(err))
106 .with_context(|| "PallasChainObserver failed to get block epoch number")?;
107
108 Ok(epoch)
109 }
110
111 fn get_datum_tag(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<TagWrap<Bytes, 24>> {
113 Ok(utxo
114 .inline_datum
115 .as_ref()
116 .with_context(|| "PallasChainObserver failed to get inline datum")?
117 .1
118 .clone())
119 }
120
121 fn inspect_datum(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<Datum> {
123 let datum = self.get_datum_tag(utxo)?;
124 let datum = CborWrap(datum).to_vec();
125
126 try_inspect::<Datum>(datum)
127 }
128
129 fn serialize_datum(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<TxDatum> {
131 let datum = self.inspect_datum(utxo)?;
132 let serialized = serde_json::to_string(&datum.to_json())
133 .map_err(|err| anyhow!(err))
134 .with_context(|| "PallasChainObserver failed to serialize datum")?;
135
136 Ok(TxDatum(serialized))
137 }
138
139 fn map_datums(&self, transaction: UTxOByAddress) -> StdResult<Datums> {
141 transaction
142 .utxo
143 .iter()
144 .filter_map(|(_, utxo)| match utxo {
145 TransactionOutput::Current(output) => output
146 .inline_datum
147 .as_ref()
148 .map(|_| self.serialize_datum(output)),
149 _ => None,
150 })
151 .collect::<StdResult<Datums>>()
152 }
153
154 async fn get_utxo_datums(
156 &self,
157 client: &mut NodeClient,
158 address: &ChainAddress,
159 ) -> Result<Datums, ChainObserverError> {
160 let statequery = client.statequery();
161 let utxo = self.get_utxo_by_address(statequery, address).await?;
162
163 Ok(self.map_datums(utxo)?)
164 }
165
166 async fn get_utxo_by_address(
168 &self,
169 statequery: &mut Client,
170 address: &ChainAddress,
171 ) -> StdResult<UTxOByAddress> {
172 statequery
173 .acquire(None)
174 .await
175 .map_err(|err| anyhow!(err))
176 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
177
178 let era = queries_v16::get_current_era(statequery)
179 .await
180 .map_err(|err| anyhow!(err))
181 .with_context(|| "PallasChainObserver failed to get current era")?;
182
183 let addr: Address = Address::from_bech32(address)
184 .map_err(|err| anyhow!(err))
185 .with_context(|| "PallasChainObserver failed to parse address")?;
186
187 let addr: Addr = addr.to_vec().into();
188 let addrs: Addrs = vec![addr];
189 let utxo = queries_v16::get_utxo_by_address(statequery, era, addrs)
190 .await
191 .map_err(|err| anyhow!(err))
192 .with_context(|| "PallasChainObserver failed to get utxo")?;
193
194 Ok(utxo)
195 }
196
197 async fn do_stake_snapshots_state_query(
199 &self,
200 statequery: &mut Client,
201 ) -> StdResult<StakeSnapshot> {
202 statequery
203 .acquire(None)
204 .await
205 .map_err(|err| anyhow!(err))
206 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
207
208 let era = queries_v16::get_current_era(statequery)
209 .await
210 .map_err(|err| anyhow!(err))
211 .with_context(|| "PallasChainObserver failed to get current era")?;
212
213 let state_snapshot = queries_v16::get_stake_snapshots(statequery, era, BTreeSet::new())
214 .await
215 .map_err(|err| anyhow!(err))
216 .with_context(|| "PallasChainObserver failed to get stake snapshot")?;
217
218 Ok(state_snapshot)
219 }
220
221 fn get_stake_pool_hash(&self, key: &Bytes) -> Result<String, ChainObserverError> {
223 let pool_id_bech32 = encode_bech32("pool", key)
224 .map_err(|err| anyhow!(err))
225 .with_context(|| "PallasChainObserver failed to encode stake pool hash")?;
226 Ok(pool_id_bech32)
227 }
228
229 async fn get_stake_distribution(
231 &self,
232 client: &mut NodeClient,
233 ) -> Result<Option<StakeDistribution>, ChainObserverError> {
234 let statequery = client.statequery();
235
236 let stake_snapshot = self.do_stake_snapshots_state_query(statequery).await?;
237
238 let mut stake_distribution = StakeDistribution::new();
239
240 let have_stakes_in_two_epochs = |stakes: &Stakes| stakes.snapshot_mark_pool > 0;
241 for (key, stakes) in stake_snapshot
242 .snapshots
243 .stake_snapshots
244 .iter()
245 .filter(|(_, stakes)| have_stakes_in_two_epochs(stakes))
246 {
247 let pool_hash = self.get_stake_pool_hash(key)?;
248 stake_distribution.insert(pool_hash, stakes.snapshot_mark_pool);
249 }
250
251 Ok(Some(stake_distribution))
252 }
253
254 async fn calculate_kes_period(
276 &self,
277 chain_point: Point,
278 slots_per_kes_period: u64,
279 ) -> Result<KESPeriod, ChainObserverError> {
280 if slots_per_kes_period == 0 {
281 return Err(anyhow!("slots_per_kes_period must be greater than 0"))
282 .with_context(|| "PallasChainObserver failed to calculate kes period")?;
283 }
284
285 let current_kes_period = chain_point.slot_or_default() / slots_per_kes_period;
286 Ok(u32::try_from(current_kes_period)
287 .map_err(|err| anyhow!(err))
288 .with_context(|| "PallasChainObserver failed to convert kes period")?)
289 }
290
291 async fn do_get_chain_point_state_query(&self, statequery: &mut Client) -> StdResult<Point> {
293 let chain_point = queries_v16::get_chain_point(statequery)
294 .await
295 .map_err(|err| anyhow!(err))
296 .with_context(|| "PallasChainObserver failed to get chain point")?;
297
298 Ok(chain_point)
299 }
300
301 async fn do_get_chain_block_no(&self, statequery: &mut Client) -> StdResult<ChainBlockNumber> {
303 let chain_block_number = queries_v16::get_chain_block_no(statequery)
304 .await
305 .map_err(|err| anyhow!(err))
306 .with_context(|| "PallasChainObserver failed to get chain block number")?;
307
308 Ok(chain_block_number)
309 }
310
311 async fn do_get_current_era_state_query(&self, statequery: &mut Client) -> StdResult<u16> {
313 let era = queries_v16::get_current_era(statequery)
314 .await
315 .map_err(|err| anyhow!(err))
316 .with_context(|| "PallasChainObserver failed to get current era")?;
317
318 Ok(era)
319 }
320
321 async fn do_get_genesis_config_state_query(
323 &self,
324 statequery: &mut Client,
325 ) -> StdResult<Vec<GenesisConfig>> {
326 let era = self.do_get_current_era_state_query(statequery).await?;
327 let genesis_config = queries_v16::get_genesis_config(statequery, era)
328 .await
329 .map_err(|err| anyhow!(err))
330 .with_context(|| "PallasChainObserver failed to get genesis config")?;
331
332 Ok(genesis_config)
333 }
334
335 async fn get_chain_point(&self, statequery: &mut Client) -> StdResult<ChainPoint> {
337 statequery
338 .acquire(None)
339 .await
340 .map_err(|err| anyhow!(err))
341 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
342
343 let chain_point = self.do_get_chain_point_state_query(statequery).await?;
344
345 let header_hash = match chain_point {
346 Point::Origin => None,
347 Point::Specific(_at_slot, ref hash) => Some(hex::encode(hash)),
348 };
349
350 let chain_block_number = self.do_get_chain_block_no(statequery).await?;
351
352 Ok(ChainPoint {
353 slot_number: SlotNumber(chain_point.slot_or_default()),
354 block_hash: header_hash.unwrap_or_default(),
355 block_number: BlockNumber(chain_block_number.block_number as u64),
356 })
357 }
358
359 async fn get_kes_period(
362 &self,
363 client: &mut NodeClient,
364 ) -> Result<Option<KESPeriod>, ChainObserverError> {
365 let statequery = client.statequery();
366
367 statequery
368 .acquire(None)
369 .await
370 .map_err(|err| anyhow!(err))
371 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
372
373 let chain_point = self.do_get_chain_point_state_query(statequery).await?;
374
375 let genesis_config = self.do_get_genesis_config_state_query(statequery).await?;
376
377 let config = genesis_config
378 .first()
379 .with_context(|| "PallasChainObserver failed to extract the config")?;
380
381 let current_kes_period = self
382 .calculate_kes_period(chain_point, config.slots_per_kes_period as u64)
383 .await?;
384
385 Ok(Some(current_kes_period))
386 }
387
388 async fn process_statequery(&self, client: &mut NodeClient) -> StdResult<()> {
390 let statequery = client.statequery();
391 statequery
392 .send_release()
393 .await
394 .map_err(|err| anyhow!(err))
395 .with_context(|| "PallasChainObserver send release failed")?;
396
397 statequery
398 .send_done()
399 .await
400 .map_err(|err| anyhow!(err))
401 .with_context(|| "PallasChainObserver send done failed")?;
402
403 Ok(())
404 }
405
406 async fn sync(&self, client: &mut NodeClient) -> StdResult<()> {
408 client
409 .chainsync()
410 .send_done()
411 .await
412 .map_err(|err| anyhow!(err))
413 .with_context(|| "PallasChainObserver chainsync send done failed")?;
414 Ok(())
415 }
416
417 async fn post_process_statequery(&self, client: &mut NodeClient) -> StdResult<()> {
419 self.process_statequery(client).await?;
420 self.sync(client).await?;
421
422 Ok(())
423 }
424}
425
426#[async_trait]
427impl ChainObserver for PallasChainObserver {
428 async fn get_current_era(&self) -> Result<Option<String>, ChainObserverError> {
429 let mut client = self.get_client().await?;
430
431 let era = self.get_era(client.statequery()).await?;
432
433 let era = Era::try_from(era + ERA_OFFSET)
434 .with_context(|| "PallasChainObserver failed to convert: '{era}' to Era")?;
435
436 self.post_process_statequery(&mut client).await?;
437
438 client.abort().await;
439
440 Ok(Some(era.to_string()))
441 }
442
443 async fn get_current_epoch(&self) -> Result<Option<Epoch>, ChainObserverError> {
444 let mut client = self.get_client().await?;
445
446 let epoch = self.get_epoch(client.statequery()).await?;
447
448 self.post_process_statequery(&mut client).await?;
449
450 client.abort().await;
451
452 Ok(Some(Epoch(epoch as u64)))
453 }
454
455 async fn get_current_chain_point(&self) -> Result<Option<ChainPoint>, ChainObserverError> {
456 let mut client = self.get_client().await?;
457
458 let chain_point = self.get_chain_point(client.statequery()).await?;
459
460 self.post_process_statequery(&mut client).await?;
461
462 client.abort().await;
463
464 Ok(Some(chain_point))
465 }
466
467 async fn get_current_datums(
468 &self,
469 address: &ChainAddress,
470 ) -> Result<Datums, ChainObserverError> {
471 let mut client = self.get_client().await?;
472
473 let datums = self.get_utxo_datums(&mut client, address).await?;
474
475 self.post_process_statequery(&mut client).await?;
476
477 client.abort().await;
478
479 Ok(datums)
480 }
481
482 async fn get_current_stake_distribution(
483 &self,
484 ) -> Result<Option<StakeDistribution>, ChainObserverError> {
485 let mut client = self.get_client().await?;
486
487 let stake_distribution = self.get_stake_distribution(&mut client).await?;
488
489 self.post_process_statequery(&mut client).await?;
490
491 client.abort().await;
492
493 Ok(stake_distribution)
494 }
495
496 async fn get_current_kes_period(
497 &self,
498 _opcert: &OpCert,
499 ) -> Result<Option<KESPeriod>, ChainObserverError> {
500 let mut client = self.get_client().await?;
501
502 let current_kes_period = self.get_kes_period(&mut client).await?;
503
504 self.post_process_statequery(&mut client).await?;
505
506 client.abort().await;
507
508 Ok(current_kes_period)
509 }
510}
511
512#[cfg(test)]
513mod tests {
514 use std::fs;
515
516 use kes_summed_ed25519::{kes::Sum6Kes, traits::KesSk};
517 use pallas_codec::utils::{AnyCbor, AnyUInt, KeyValuePairs, TagWrap};
518 use pallas_crypto::hash::Hash;
519 use pallas_network::miniprotocols::{
520 localstate::{
521 queries_v16::{
522 BlockQuery, ChainBlockNumber, Fraction, GenesisConfig, HardForkQuery, LedgerQuery,
523 Request, Snapshots, StakeSnapshot, SystemStart, Value,
524 },
525 ClientQueryRequest,
526 },
527 Point,
528 };
529 use tokio::net::UnixListener;
530
531 use crate::test_utils::TempDir;
532 use crate::{crypto_helper::ColdKeyGenerator, CardanoNetwork};
533
534 use super::*;
535
536 fn get_fake_utxo_by_address() -> UTxOByAddress {
537 let tx_hex = "1e4e5cf2889d52f1745b941090f04a65dea6ce56c5e5e66e69f65c8e36347c17";
538 let tx_bytes: [u8; 32] = hex::decode(tx_hex).unwrap().try_into().unwrap();
539 let transaction_id = Hash::from(tx_bytes);
540 let index = AnyUInt::MajorByte(2);
541 let lovelace = AnyUInt::MajorByte(2);
542 let hex_datum = "D8799F58407B226D61726B657273223A5B7B226E616D65223A227468616C6573222C2265706F6368223A307D5D2C227369676E6174757265223A22383566323265626261645840333335376338656132646630363230393766396131383064643335643966336261316432363832633732633864313232383866616438636238643063656565625838366134643665383465653865353631376164323037313836366363313930373466326137366538373864663166393733346438343061227DFF";
543 let datum = hex::decode(hex_datum).unwrap().into();
544 let tag = TagWrap::<_, 24>::new(datum);
545 let inline_datum = Some((1_u16, tag));
546
547 let address: Address =
548 Address::from_bech32("addr_test1vr80076l3x5uw6n94nwhgmv7ssgy6muzf47ugn6z0l92rhg2mgtu0")
549 .unwrap();
550 let address: Addr = address.to_vec().into();
551 let values = TransactionOutput::Current(PostAlonsoTransactionOutput {
552 address,
553 amount: Value::Coin(lovelace),
554 inline_datum,
555 script_ref: None,
556 });
557 let utxo = KeyValuePairs::from(vec![(
558 queries_v16::UTxO {
559 transaction_id,
560 index,
561 },
562 values,
563 )]);
564
565 UTxOByAddress { utxo }
566 }
567
568 fn get_fake_stake_snapshot() -> StakeSnapshot {
569 let stake_snapshots = KeyValuePairs::from(vec![
570 (
571 Bytes::from(
572 hex::decode("00000036d515e12e18cd3c88c74f09a67984c2c279a5296aa96efe89")
573 .unwrap(),
574 ),
575 Stakes {
576 snapshot_mark_pool: 300000000001,
577 snapshot_set_pool: 300000000002,
578 snapshot_go_pool: 300000000000,
579 },
580 ),
581 (
582 Bytes::from(
583 hex::decode("000000f66e28b0f18aef20555f4c4954234e3270dfbbdcc13f54e799")
584 .unwrap(),
585 ),
586 Stakes {
587 snapshot_mark_pool: 600000000001,
588 snapshot_set_pool: 600000000002,
589 snapshot_go_pool: 600000000000,
590 },
591 ),
592 (
593 Bytes::from(
594 hex::decode("00000110093effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
595 .unwrap(),
596 ),
597 Stakes {
598 snapshot_mark_pool: 1200000000001,
599 snapshot_set_pool: 1200000000002,
600 snapshot_go_pool: 1200000000000,
601 },
602 ),
603 (
604 Bytes::from(
605 hex::decode("00000ffff93effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
606 .unwrap(),
607 ),
608 Stakes {
609 snapshot_mark_pool: 0,
610 snapshot_set_pool: 1300000000002,
611 snapshot_go_pool: 0,
612 },
613 ),
614 ]);
615
616 StakeSnapshot {
617 snapshots: Snapshots {
618 stake_snapshots,
619 snapshot_stake_mark_total: 2100000000003,
620 snapshot_stake_set_total: 2100000000006,
621 snapshot_stake_go_total: 2100000000000,
622 },
623 }
624 }
625
626 fn get_fake_genesis_config() -> Vec<GenesisConfig> {
627 let genesis = GenesisConfig {
628 system_start: SystemStart {
629 year: 2021,
630 day_of_year: 150,
631 picoseconds_of_day: 0,
632 },
633 network_magic: 42,
634 network_id: 42,
635 active_slots_coefficient: Fraction { num: 6, den: 10 },
636 security_param: 2160,
637 epoch_length: 432000,
638 slots_per_kes_period: 129600,
639 max_kes_evolutions: 62,
640 slot_length: 1,
641 update_quorum: 5,
642 max_lovelace_supply: AnyUInt::MajorByte(2),
643 };
644
645 vec![genesis]
646 }
647
648 async fn mock_server(server: &mut pallas_network::facades::NodeServer) -> AnyCbor {
650 let query: queries_v16::Request =
651 match server.statequery().recv_while_acquired().await.unwrap() {
652 ClientQueryRequest::Query(q) => q.into_decode().unwrap(),
653 x => panic!("unexpected message from client: {x:?}"),
654 };
655
656 match query {
657 Request::GetChainPoint => {
658 AnyCbor::from_encode(Point::Specific(52851885, vec![1, 2, 3]))
659 }
660 Request::GetChainBlockNo => AnyCbor::from_encode(ChainBlockNumber {
661 slot_timeline: 1,
662 block_number: 52851885,
663 }),
664 Request::LedgerQuery(LedgerQuery::HardForkQuery(HardForkQuery::GetCurrentEra)) => {
665 AnyCbor::from_encode(4)
666 }
667 Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetEpochNo)) => {
668 AnyCbor::from_encode([8])
669 }
670 Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetGenesisConfig)) => {
671 AnyCbor::from_encode(get_fake_genesis_config())
672 }
673 Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetUTxOByAddress(_))) => {
674 AnyCbor::from_encode(get_fake_utxo_by_address())
675 }
676 Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetStakeSnapshots(_))) => {
677 AnyCbor::from_encode(get_fake_stake_snapshot())
678 }
679 _ => panic!("unexpected query from client: {query:?}"),
680 }
681 }
682
683 fn create_temp_dir(folder_name: &str) -> PathBuf {
685 TempDir::create_with_short_path("pallas_chain_observer_test", folder_name)
686 }
687
688 async fn setup_server(socket_path: PathBuf, intersections: u32) -> tokio::task::JoinHandle<()> {
694 tokio::spawn({
695 async move {
696 if socket_path.exists() {
697 fs::remove_file(&socket_path).expect("Previous socket removal failed");
698 }
699
700 let unix_listener = UnixListener::bind(socket_path.as_path()).unwrap();
701 let mut server = pallas_network::facades::NodeServer::accept(&unix_listener, 10)
702 .await
703 .unwrap();
704
705 server.statequery().recv_while_idle().await.unwrap();
706 server.statequery().send_acquired().await.unwrap();
707
708 for _ in 0..intersections {
709 let result = mock_server(&mut server).await;
710 server.statequery().send_result(result).await.unwrap();
711 }
712 }
713 })
714 }
715
716 #[tokio::test]
717 async fn get_current_epoch() {
718 let socket_path = create_temp_dir("get_current_epoch").join("node.socket");
719 let server = setup_server(socket_path.clone(), 2).await;
720 let client = tokio::spawn(async move {
721 let observer =
722 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
723 observer.get_current_epoch().await.unwrap().unwrap()
724 });
725
726 let (_, client_res) = tokio::join!(server, client);
727 let epoch = client_res.expect("Client failed");
728 assert_eq!(epoch, 8);
729 }
730
731 #[tokio::test]
732 async fn get_current_datums() {
733 let socket_path = create_temp_dir("get_current_datums").join("node.socket");
734 let server = setup_server(socket_path.clone(), 2).await;
735 let client = tokio::spawn(async move {
736 let observer =
737 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
738 let address =
739 "addr_test1vr80076l3x5uw6n94nwhgmv7ssgy6muzf47ugn6z0l92rhg2mgtu0".to_string();
740 observer.get_current_datums(&address).await.unwrap()
741 });
742
743 let (_, client_res) = tokio::join!(server, client);
744 let datums = client_res.expect("Client failed");
745 assert_eq!(vec![TxDatum(r#"{"constructor":0,"fields":[{"bytes":"7b226d61726b657273223a5b7b226e616d65223a227468616c6573222c2265706f6368223a307d5d2c227369676e6174757265223a2238356632326562626164"},{"bytes":"33333537633865613264663036323039376639613138306464333564396633626131643236383263373263386431323238386661643863623864306365656562"},{"bytes":"366134643665383465653865353631376164323037313836366363313930373466326137366538373864663166393733346438343061227d"}]}"#.to_string())], datums);
746 }
747
748 #[tokio::test]
749 async fn get_current_stake_distribution() {
750 let socket_path = create_temp_dir("get_current_stake_distribution").join("node.socket");
751 let server = setup_server(socket_path.clone(), 2).await;
752 let client = tokio::spawn(async move {
753 let observer =
754 super::PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
755 observer.get_current_stake_distribution().await.unwrap()
756 });
757
758 let (_, client_res) = tokio::join!(server, client);
759 let computed_stake_distribution = client_res.unwrap().unwrap();
760
761 let mut expected_stake_distribution = StakeDistribution::new();
762 expected_stake_distribution.insert(
763 "pool1qqqqqdk4zhsjuxxd8jyvwncf5eucfskz0xjjj64fdmlgj735lr9".to_string(),
764 300000000001,
765 );
766 expected_stake_distribution.insert(
767 "pool1qqqqpanw9zc0rzh0yp247nzf2s35uvnsm7aaesfl2nnejaev0uc".to_string(),
768 600000000001,
769 );
770 expected_stake_distribution.insert(
771 "pool1qqqqzyqf8mlm70883zht60n4q6uqxg4a8x266sewv8ad2grkztl".to_string(),
772 1200000000001,
773 );
774
775 assert_eq!(expected_stake_distribution, computed_stake_distribution);
776 }
777
778 #[tokio::test]
779 async fn get_current_kes_period() {
780 let socket_path = create_temp_dir("get_current_kes_period").join("node.socket");
781 let server = setup_server(socket_path.clone(), 3).await;
782 let client = tokio::spawn(async move {
783 let observer =
784 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
785
786 let keypair = ColdKeyGenerator::create_deterministic_keypair([0u8; 32]);
787 let mut dummy_key_buffer = [0u8; Sum6Kes::SIZE + 4];
788 let mut dummy_seed = [0u8; 32];
789 let (_, kes_verification_key) = Sum6Kes::keygen(&mut dummy_key_buffer, &mut dummy_seed);
790 let operational_certificate = OpCert::new(kes_verification_key, 0, 0, keypair);
791 observer
792 .get_current_kes_period(&operational_certificate)
793 .await
794 .unwrap()
795 });
796
797 let (_, client_res) = tokio::join!(server, client);
798 let kes_period = client_res.unwrap().unwrap();
799 assert_eq!(407, kes_period);
800 }
801
802 #[tokio::test]
803 async fn calculate_kes_period() {
804 let socket_path = create_temp_dir("get_current_kes_period").join("node.socket");
805 let observer = PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
806 let current_kes_period = observer
807 .calculate_kes_period(Point::Specific(53536042, vec![1, 2, 3]), 129600)
808 .await
809 .unwrap();
810
811 assert_eq!(413, current_kes_period);
812
813 let current_kes_period = observer
814 .calculate_kes_period(Point::Specific(53524800, vec![1, 2, 3]), 129600)
815 .await
816 .unwrap();
817
818 assert_eq!(413, current_kes_period);
819
820 let current_kes_period = observer
821 .calculate_kes_period(Point::Specific(53649999, vec![1, 2, 3]), 129600)
822 .await
823 .unwrap();
824
825 assert_eq!(413, current_kes_period);
826 }
827
828 #[tokio::test]
829 async fn get_chain_point() {
830 let socket_path = create_temp_dir("get_chain_point").join("node.socket");
831 let server = setup_server(socket_path.clone(), 1).await;
832 let client = tokio::spawn(async move {
833 let observer =
834 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
835 let mut client = observer.get_client().await.unwrap();
836 let statequery = client.statequery();
837 statequery.acquire(None).await.unwrap();
838 let chain_point = observer
839 .do_get_chain_point_state_query(statequery)
840 .await
841 .unwrap();
842 observer.post_process_statequery(&mut client).await.unwrap();
843 client.abort().await;
844 chain_point
845 });
846
847 let (_, client_res) = tokio::join!(server, client);
848 let chain_point = client_res.expect("Client failed");
849 assert_eq!(chain_point, Point::Specific(52851885, vec![1, 2, 3]));
850 }
851
852 #[tokio::test]
853 async fn get_genesis_config() {
854 let socket_path = create_temp_dir("get_genesis_config").join("node.socket");
855 let server = setup_server(socket_path.clone(), 2).await;
856 let client = tokio::spawn(async move {
857 let observer =
858 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
859 let mut client = observer.get_client().await.unwrap();
860 let statequery = client.statequery();
861 statequery.acquire(None).await.unwrap();
862 let genesis_config = observer
863 .do_get_genesis_config_state_query(statequery)
864 .await
865 .unwrap();
866 observer.post_process_statequery(&mut client).await.unwrap();
867 client.abort().await;
868 genesis_config
869 });
870
871 let (_, client_res) = tokio::join!(server, client);
872 let genesis_config = client_res.expect("Client failed");
873 assert_eq!(genesis_config, get_fake_genesis_config());
874 }
875
876 #[tokio::test]
877 async fn fetch_current_era_from_state_query() {
878 let socket_path = create_temp_dir("get_current_era").join("node.socket");
879 let server = setup_server(socket_path.clone(), 1).await;
880 let client = tokio::spawn(async move {
881 let observer =
882 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
883 let mut client = observer.get_client().await.unwrap();
884 let statequery = client.statequery();
885 statequery.acquire(None).await.unwrap();
886 let era = observer
887 .do_get_current_era_state_query(statequery)
888 .await
889 .unwrap();
890 observer.post_process_statequery(&mut client).await.unwrap();
891 client.abort().await;
892 era
893 });
894
895 let (_, client_res) = tokio::join!(server, client);
896 let era = client_res.expect("Client failed");
897 assert_eq!(era, 4);
898 }
899
900 #[tokio::test]
901 async fn get_current_era() {
902 let socket_path = create_temp_dir("get_current_era_as_string").join("node.socket");
903 let server = setup_server(socket_path.clone(), 1).await;
904 let client = tokio::spawn(async move {
905 let observer =
906 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
907 observer.get_current_era().await.unwrap().unwrap()
908 });
909
910 let (_, client_res) = tokio::join!(server, client);
911 let era = client_res.expect("Client failed");
912
913 let expected_era = Era::try_from(4 + ERA_OFFSET).unwrap().to_string();
914 assert_eq!(era, expected_era);
915 }
916
917 #[tokio::test]
918 async fn get_current_chain_point() {
919 let socket_path = create_temp_dir("get_current_chain_point").join("node.socket");
920 let server = setup_server(socket_path.clone(), 2).await;
921 let client = tokio::spawn(async move {
922 let observer =
923 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
924 observer.get_current_chain_point().await.unwrap()
925 });
926
927 let (_, client_res) = tokio::join!(server, client);
928 let chain_point = client_res.expect("Client failed");
929 assert_eq!(
930 chain_point,
931 Some(ChainPoint {
932 slot_number: SlotNumber(52851885),
933 block_hash: "010203".to_string(),
934 block_number: BlockNumber(52851885)
935 })
936 );
937 }
938}