1use std::collections::BTreeSet;
2use std::path::{Path, PathBuf};
3
4use anyhow::{Context, anyhow};
5use async_trait::async_trait;
6use pallas_addresses::Address;
7use pallas_codec::utils::{Bytes, CborWrap, TagWrap};
8use pallas_network::{
9 facades::NodeClient,
10 miniprotocols::{
11 Point,
12 localstate::{
13 Client,
14 queries_v16::{
15 self, Addr, Addrs, ChainBlockNumber, GenesisConfig, PostAlonsoTransactionOutput,
16 StakeSnapshot, Stakes, TransactionOutput, UTxOByAddress,
17 },
18 },
19 },
20};
21use pallas_primitives::ToCanonicalJson;
22use pallas_traverse::Era;
23
24use mithril_common::crypto_helper::{KesPeriod, OpCert, encode_bech32};
25use mithril_common::entities::{BlockNumber, ChainPoint, Epoch, SlotNumber, StakeDistribution};
26use mithril_common::{CardanoNetwork, StdResult};
27
28use crate::entities::{ChainAddress, Datum, Datums, TxDatum, try_inspect};
29
30use super::{ChainObserver, ChainObserverError};
31
/// Offset added to the era index returned by the node's local state query before the value
/// is converted into a `pallas_traverse::Era` (see `get_current_era`).
// NOTE(review): presumably the node numbers eras from zero while `Era` starts one higher — confirm against pallas_traverse.
const ERA_OFFSET: u16 = 1;
35
/// A [ChainObserver] that retrieves chain data by querying a Cardano node through the
/// Pallas `NodeClient`.
pub struct PallasChainObserver {
    /// Path of the node socket passed to `NodeClient::connect`.
    socket: PathBuf,
    /// Cardano network; its `code()` is used as the network magic when connecting.
    network: CardanoNetwork,
}
41
42impl From<anyhow::Error> for ChainObserverError {
43 fn from(err: anyhow::Error) -> Self {
44 ChainObserverError::General(err)
45 }
46}
47
48impl PallasChainObserver {
49 pub fn new(socket: &Path, network: CardanoNetwork) -> Self {
51 Self {
52 socket: socket.to_owned(),
53 network,
54 }
55 }
56
57 async fn new_client(&self) -> StdResult<NodeClient> {
59 let magic = self.network.code();
60 let client = NodeClient::connect(&self.socket, magic).await?;
61
62 Ok(client)
63 }
64
65 async fn get_client(&self) -> StdResult<NodeClient> {
67 self.new_client()
68 .await
69 .map_err(|err| anyhow!(err))
70 .with_context(|| "PallasChainObserver failed to create new client")
71 }
72
73 async fn get_era(&self, statequery: &mut Client) -> StdResult<u16> {
75 statequery
76 .acquire(None)
77 .await
78 .map_err(|err| anyhow!(err))
79 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
80
81 let era = queries_v16::get_current_era(statequery)
82 .await
83 .map_err(|err| anyhow!(err))
84 .with_context(|| "PallasChainObserver failed to get current era")?;
85
86 Ok(era)
87 }
88
89 async fn get_epoch(&self, statequery: &mut Client) -> StdResult<u32> {
91 statequery
92 .acquire(None)
93 .await
94 .map_err(|err| anyhow!(err))
95 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
96
97 let era = queries_v16::get_current_era(statequery)
98 .await
99 .map_err(|err| anyhow!(err))
100 .with_context(|| "PallasChainObserver failed to get current era")?;
101
102 let epoch = queries_v16::get_block_epoch_number(statequery, era)
103 .await
104 .map_err(|err| anyhow!(err))
105 .with_context(|| "PallasChainObserver failed to get block epoch number")?;
106
107 Ok(epoch)
108 }
109
110 fn get_datum_tag(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<TagWrap<Bytes, 24>> {
112 Ok(utxo
113 .inline_datum
114 .as_ref()
115 .with_context(|| "PallasChainObserver failed to get inline datum")?
116 .1
117 .clone())
118 }
119
120 fn inspect_datum(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<Datum> {
122 let datum = self.get_datum_tag(utxo)?;
123 let datum = CborWrap(datum).to_vec();
124
125 try_inspect::<Datum>(datum)
126 }
127
128 fn serialize_datum(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<TxDatum> {
130 let datum = self.inspect_datum(utxo)?;
131 let serialized = serde_json::to_string(&datum.to_json())
132 .map_err(|err| anyhow!(err))
133 .with_context(|| "PallasChainObserver failed to serialize datum")?;
134
135 Ok(TxDatum(serialized))
136 }
137
138 fn map_datums(&self, transaction: UTxOByAddress) -> StdResult<Datums> {
140 transaction
141 .utxo
142 .iter()
143 .filter_map(|(_, utxo)| match utxo {
144 TransactionOutput::Current(output) => {
145 output.inline_datum.as_ref().map(|_| self.serialize_datum(output))
146 }
147 _ => None,
148 })
149 .collect::<StdResult<Datums>>()
150 }
151
152 async fn get_utxo_datums(
154 &self,
155 client: &mut NodeClient,
156 address: &ChainAddress,
157 ) -> Result<Datums, ChainObserverError> {
158 let statequery = client.statequery();
159 let utxo = self.get_utxo_by_address(statequery, address).await?;
160
161 Ok(self.map_datums(utxo)?)
162 }
163
164 async fn get_utxo_by_address(
166 &self,
167 statequery: &mut Client,
168 address: &ChainAddress,
169 ) -> StdResult<UTxOByAddress> {
170 statequery
171 .acquire(None)
172 .await
173 .map_err(|err| anyhow!(err))
174 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
175
176 let era = queries_v16::get_current_era(statequery)
177 .await
178 .map_err(|err| anyhow!(err))
179 .with_context(|| "PallasChainObserver failed to get current era")?;
180
181 let addr: Address = Address::from_bech32(address)
182 .map_err(|err| anyhow!(err))
183 .with_context(|| "PallasChainObserver failed to parse address")?;
184
185 let addr: Addr = addr.to_vec().into();
186 let addrs: Addrs = vec![addr];
187 let utxo = queries_v16::get_utxo_by_address(statequery, era, addrs)
188 .await
189 .map_err(|err| anyhow!(err))
190 .with_context(|| "PallasChainObserver failed to get utxo")?;
191
192 Ok(utxo)
193 }
194
195 async fn do_stake_snapshots_state_query(
197 &self,
198 statequery: &mut Client,
199 ) -> StdResult<StakeSnapshot> {
200 statequery
201 .acquire(None)
202 .await
203 .map_err(|err| anyhow!(err))
204 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
205
206 let era = queries_v16::get_current_era(statequery)
207 .await
208 .map_err(|err| anyhow!(err))
209 .with_context(|| "PallasChainObserver failed to get current era")?;
210
211 let state_snapshot = queries_v16::get_stake_snapshots(statequery, era, BTreeSet::new())
212 .await
213 .map_err(|err| anyhow!(err))
214 .with_context(|| "PallasChainObserver failed to get stake snapshot")?;
215
216 Ok(state_snapshot)
217 }
218
219 fn get_stake_pool_hash(&self, key: &Bytes) -> Result<String, ChainObserverError> {
221 let pool_id_bech32 = encode_bech32("pool", key)
222 .map_err(|err| anyhow!(err))
223 .with_context(|| "PallasChainObserver failed to encode stake pool hash")?;
224 Ok(pool_id_bech32)
225 }
226
227 async fn get_stake_distribution(
229 &self,
230 client: &mut NodeClient,
231 ) -> Result<Option<StakeDistribution>, ChainObserverError> {
232 let statequery = client.statequery();
233
234 let stake_snapshot = self.do_stake_snapshots_state_query(statequery).await?;
235
236 let mut stake_distribution = StakeDistribution::new();
237
238 let have_stakes_in_two_epochs = |stakes: &Stakes| stakes.snapshot_mark_pool > 0;
239 for (key, stakes) in stake_snapshot
240 .snapshots
241 .stake_snapshots
242 .iter()
243 .filter(|(_, stakes)| have_stakes_in_two_epochs(stakes))
244 {
245 let pool_hash = self.get_stake_pool_hash(key)?;
246 stake_distribution.insert(pool_hash, stakes.snapshot_mark_pool);
247 }
248
249 Ok(Some(stake_distribution))
250 }
251
252 async fn calculate_kes_period(
274 &self,
275 chain_point: Point,
276 slots_per_kes_period: u64,
277 ) -> Result<KesPeriod, ChainObserverError> {
278 if slots_per_kes_period == 0 {
279 return Err(anyhow!("slots_per_kes_period must be greater than 0"))
280 .with_context(|| "PallasChainObserver failed to calculate kes period")?;
281 }
282
283 let current_kes_period = chain_point.slot_or_default() / slots_per_kes_period;
284 Ok(u32::try_from(current_kes_period)
285 .map_err(|err| anyhow!(err))
286 .with_context(|| "PallasChainObserver failed to convert kes period")?)
287 }
288
289 async fn do_get_chain_point_state_query(&self, statequery: &mut Client) -> StdResult<Point> {
291 let chain_point = queries_v16::get_chain_point(statequery)
292 .await
293 .map_err(|err| anyhow!(err))
294 .with_context(|| "PallasChainObserver failed to get chain point")?;
295
296 Ok(chain_point)
297 }
298
299 async fn do_get_chain_block_no(&self, statequery: &mut Client) -> StdResult<ChainBlockNumber> {
301 let chain_block_number = queries_v16::get_chain_block_no(statequery)
302 .await
303 .map_err(|err| anyhow!(err))
304 .with_context(|| "PallasChainObserver failed to get chain block number")?;
305
306 Ok(chain_block_number)
307 }
308
309 async fn do_get_current_era_state_query(&self, statequery: &mut Client) -> StdResult<u16> {
311 let era = queries_v16::get_current_era(statequery)
312 .await
313 .map_err(|err| anyhow!(err))
314 .with_context(|| "PallasChainObserver failed to get current era")?;
315
316 Ok(era)
317 }
318
319 async fn do_get_genesis_config_state_query(
321 &self,
322 statequery: &mut Client,
323 ) -> StdResult<Vec<GenesisConfig>> {
324 let era = self.do_get_current_era_state_query(statequery).await?;
325 let genesis_config = queries_v16::get_genesis_config(statequery, era)
326 .await
327 .map_err(|err| anyhow!(err))
328 .with_context(|| "PallasChainObserver failed to get genesis config")?;
329
330 Ok(genesis_config)
331 }
332
333 async fn get_chain_point(&self, statequery: &mut Client) -> StdResult<ChainPoint> {
335 statequery
336 .acquire(None)
337 .await
338 .map_err(|err| anyhow!(err))
339 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
340
341 let chain_point = self.do_get_chain_point_state_query(statequery).await?;
342
343 let header_hash = match chain_point {
344 Point::Origin => None,
345 Point::Specific(_at_slot, ref hash) => Some(hex::encode(hash)),
346 };
347
348 let chain_block_number = self.do_get_chain_block_no(statequery).await?;
349
350 Ok(ChainPoint {
351 slot_number: SlotNumber(chain_point.slot_or_default()),
352 block_hash: header_hash.unwrap_or_default(),
353 block_number: BlockNumber(chain_block_number.block_number as u64),
354 })
355 }
356
357 async fn get_kes_period(
360 &self,
361 client: &mut NodeClient,
362 ) -> Result<Option<KesPeriod>, ChainObserverError> {
363 let statequery = client.statequery();
364
365 statequery
366 .acquire(None)
367 .await
368 .map_err(|err| anyhow!(err))
369 .with_context(|| "PallasChainObserver failed to acquire statequery")?;
370
371 let chain_point = self.do_get_chain_point_state_query(statequery).await?;
372
373 let genesis_config = self.do_get_genesis_config_state_query(statequery).await?;
374
375 let config = genesis_config
376 .first()
377 .with_context(|| "PallasChainObserver failed to extract the config")?;
378
379 let current_kes_period = self
380 .calculate_kes_period(chain_point, config.slots_per_kes_period as u64)
381 .await?;
382
383 Ok(Some(current_kes_period))
384 }
385
386 async fn process_statequery(&self, client: &mut NodeClient) -> StdResult<()> {
388 let statequery = client.statequery();
389 statequery
390 .send_release()
391 .await
392 .map_err(|err| anyhow!(err))
393 .with_context(|| "PallasChainObserver send release failed")?;
394
395 statequery
396 .send_done()
397 .await
398 .map_err(|err| anyhow!(err))
399 .with_context(|| "PallasChainObserver send done failed")?;
400
401 Ok(())
402 }
403
404 async fn sync(&self, client: &mut NodeClient) -> StdResult<()> {
406 client
407 .chainsync()
408 .send_done()
409 .await
410 .map_err(|err| anyhow!(err))
411 .with_context(|| "PallasChainObserver chainsync send done failed")?;
412 Ok(())
413 }
414
415 async fn post_process_statequery(&self, client: &mut NodeClient) -> StdResult<()> {
417 self.process_statequery(client).await?;
418 self.sync(client).await?;
419
420 Ok(())
421 }
422}
423
424#[async_trait]
425impl ChainObserver for PallasChainObserver {
426 async fn get_current_era(&self) -> Result<Option<String>, ChainObserverError> {
427 let mut client = self.get_client().await?;
428
429 let era = self.get_era(client.statequery()).await?;
430
431 let era = Era::try_from(era + ERA_OFFSET)
432 .with_context(|| "PallasChainObserver failed to convert: '{era}' to Era")?;
433
434 self.post_process_statequery(&mut client).await?;
435
436 client.abort().await;
437
438 Ok(Some(era.to_string()))
439 }
440
441 async fn get_current_epoch(&self) -> Result<Option<Epoch>, ChainObserverError> {
442 let mut client = self.get_client().await?;
443
444 let epoch = self.get_epoch(client.statequery()).await?;
445
446 self.post_process_statequery(&mut client).await?;
447
448 client.abort().await;
449
450 Ok(Some(Epoch(epoch as u64)))
451 }
452
453 async fn get_current_chain_point(&self) -> Result<Option<ChainPoint>, ChainObserverError> {
454 let mut client = self.get_client().await?;
455
456 let chain_point = self.get_chain_point(client.statequery()).await?;
457
458 self.post_process_statequery(&mut client).await?;
459
460 client.abort().await;
461
462 Ok(Some(chain_point))
463 }
464
465 async fn get_current_datums(
466 &self,
467 address: &ChainAddress,
468 ) -> Result<Datums, ChainObserverError> {
469 let mut client = self.get_client().await?;
470
471 let datums = self.get_utxo_datums(&mut client, address).await?;
472
473 self.post_process_statequery(&mut client).await?;
474
475 client.abort().await;
476
477 Ok(datums)
478 }
479
480 async fn get_current_stake_distribution(
481 &self,
482 ) -> Result<Option<StakeDistribution>, ChainObserverError> {
483 let mut client = self.get_client().await?;
484
485 let stake_distribution = self.get_stake_distribution(&mut client).await?;
486
487 self.post_process_statequery(&mut client).await?;
488
489 client.abort().await;
490
491 Ok(stake_distribution)
492 }
493
494 async fn get_current_kes_period(
495 &self,
496 _opcert: &OpCert,
497 ) -> Result<Option<KesPeriod>, ChainObserverError> {
498 let mut client = self.get_client().await?;
499
500 let current_kes_period = self.get_kes_period(&mut client).await?;
501
502 self.post_process_statequery(&mut client).await?;
503
504 client.abort().await;
505
506 Ok(current_kes_period)
507 }
508}
509
510#[cfg(all(test, unix))]
512mod tests {
513 use std::fs;
514
515 use kes_summed_ed25519::{kes::Sum6Kes, traits::KesSk};
516 use pallas_codec::utils::{AnyCbor, AnyUInt, KeyValuePairs, TagWrap};
517 use pallas_crypto::hash::Hash;
518 use pallas_network::facades::NodeServer;
519 use pallas_network::miniprotocols::{
520 Point,
521 localstate::{
522 ClientQueryRequest,
523 queries_v16::{
524 BlockQuery, ChainBlockNumber, Fraction, GenesisConfig, HardForkQuery, LedgerQuery,
525 Request, Snapshots, StakeSnapshot, SystemStart, Value,
526 },
527 },
528 };
529 use tokio::net::UnixListener;
530
531 use mithril_common::crypto_helper::ColdKeyGenerator;
532 use mithril_common::test_utils::TempDir;
533
534 use super::*;
535
536 fn get_fake_utxo_by_address() -> UTxOByAddress {
537 let tx_hex = "1e4e5cf2889d52f1745b941090f04a65dea6ce56c5e5e66e69f65c8e36347c17";
538 let tx_bytes: [u8; 32] = hex::decode(tx_hex).unwrap().try_into().unwrap();
539 let transaction_id = Hash::from(tx_bytes);
540 let index = AnyUInt::MajorByte(2);
541 let lovelace = AnyUInt::MajorByte(2);
542 let hex_datum = "D8799F58407B226D61726B657273223A5B7B226E616D65223A227468616C6573222C2265706F6368223A307D5D2C227369676E6174757265223A22383566323265626261645840333335376338656132646630363230393766396131383064643335643966336261316432363832633732633864313232383866616438636238643063656565625838366134643665383465653865353631376164323037313836366363313930373466326137366538373864663166393733346438343061227DFF";
543 let datum = hex::decode(hex_datum).unwrap().into();
544 let tag = TagWrap::<_, 24>::new(datum);
545 let inline_datum = Some((1_u16, tag));
546
547 let address: Address =
548 Address::from_bech32("addr_test1vr80076l3x5uw6n94nwhgmv7ssgy6muzf47ugn6z0l92rhg2mgtu0")
549 .unwrap();
550 let address: Addr = address.to_vec().into();
551 let values = TransactionOutput::Current(PostAlonsoTransactionOutput {
552 address,
553 amount: Value::Coin(lovelace),
554 inline_datum,
555 script_ref: None,
556 });
557 let utxo = KeyValuePairs::from(vec![(
558 queries_v16::UTxO {
559 transaction_id,
560 index,
561 },
562 values,
563 )]);
564
565 UTxOByAddress { utxo }
566 }
567
568 fn get_fake_stake_snapshot() -> StakeSnapshot {
569 let stake_snapshots = KeyValuePairs::from(vec![
570 (
571 Bytes::from(
572 hex::decode("00000036d515e12e18cd3c88c74f09a67984c2c279a5296aa96efe89")
573 .unwrap(),
574 ),
575 Stakes {
576 snapshot_mark_pool: 300000000001,
577 snapshot_set_pool: 300000000002,
578 snapshot_go_pool: 300000000000,
579 },
580 ),
581 (
582 Bytes::from(
583 hex::decode("000000f66e28b0f18aef20555f4c4954234e3270dfbbdcc13f54e799")
584 .unwrap(),
585 ),
586 Stakes {
587 snapshot_mark_pool: 600000000001,
588 snapshot_set_pool: 600000000002,
589 snapshot_go_pool: 600000000000,
590 },
591 ),
592 (
593 Bytes::from(
594 hex::decode("00000110093effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
595 .unwrap(),
596 ),
597 Stakes {
598 snapshot_mark_pool: 1200000000001,
599 snapshot_set_pool: 1200000000002,
600 snapshot_go_pool: 1200000000000,
601 },
602 ),
603 (
604 Bytes::from(
605 hex::decode("00000ffff93effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
606 .unwrap(),
607 ),
608 Stakes {
609 snapshot_mark_pool: 0,
610 snapshot_set_pool: 1300000000002,
611 snapshot_go_pool: 0,
612 },
613 ),
614 ]);
615
616 StakeSnapshot {
617 snapshots: Snapshots {
618 stake_snapshots,
619 snapshot_stake_mark_total: 2100000000003,
620 snapshot_stake_set_total: 2100000000006,
621 snapshot_stake_go_total: 2100000000000,
622 },
623 }
624 }
625
626 fn get_fake_genesis_config() -> Vec<GenesisConfig> {
627 let genesis = GenesisConfig {
628 system_start: SystemStart {
629 year: 2021,
630 day_of_year: 150,
631 picoseconds_of_day: 0,
632 },
633 network_magic: 42,
634 network_id: 42,
635 active_slots_coefficient: Fraction { num: 6, den: 10 },
636 security_param: 2160,
637 epoch_length: 432000,
638 slots_per_kes_period: 129600,
639 max_kes_evolutions: 62,
640 slot_length: 1,
641 update_quorum: 5,
642 max_lovelace_supply: AnyUInt::MajorByte(2),
643 };
644
645 vec![genesis]
646 }
647
648 async fn mock_server(server: &mut NodeServer) -> AnyCbor {
650 let query: queries_v16::Request =
651 match server.statequery().recv_while_acquired().await.unwrap() {
652 ClientQueryRequest::Query(q) => q.into_decode().unwrap(),
653 x => panic!("unexpected message from client: {x:?}"),
654 };
655
656 match query {
657 Request::GetChainPoint => {
658 AnyCbor::from_encode(Point::Specific(52851885, vec![1, 2, 3]))
659 }
660 Request::GetChainBlockNo => AnyCbor::from_encode(ChainBlockNumber {
661 slot_timeline: 1,
662 block_number: 52851885,
663 }),
664 Request::LedgerQuery(LedgerQuery::HardForkQuery(HardForkQuery::GetCurrentEra)) => {
665 AnyCbor::from_encode(4)
666 }
667 Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetEpochNo)) => {
668 AnyCbor::from_encode([8])
669 }
670 Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetGenesisConfig)) => {
671 AnyCbor::from_encode(get_fake_genesis_config())
672 }
673 Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetUTxOByAddress(_))) => {
674 AnyCbor::from_encode(get_fake_utxo_by_address())
675 }
676 Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetStakeSnapshots(_))) => {
677 AnyCbor::from_encode(get_fake_stake_snapshot())
678 }
679 _ => panic!("unexpected query from client: {query:?}"),
680 }
681 }
682
683 fn create_temp_dir(folder_name: &str) -> PathBuf {
685 TempDir::create_with_short_path("pallas_chain_observer_test", folder_name)
686 }
687
688 async fn setup_server(socket_path: PathBuf, intersections: u32) -> tokio::task::JoinHandle<()> {
694 tokio::spawn({
695 async move {
696 if socket_path.exists() {
697 fs::remove_file(&socket_path).expect("Previous socket removal failed");
698 }
699
700 let unix_listener = UnixListener::bind(socket_path.as_path()).unwrap();
701 let mut server = NodeServer::accept(&unix_listener, 10).await.unwrap();
702
703 server.statequery().recv_while_idle().await.unwrap();
704 server.statequery().send_acquired().await.unwrap();
705
706 for _ in 0..intersections {
707 let result = mock_server(&mut server).await;
708 server.statequery().send_result(result).await.unwrap();
709 }
710 }
711 })
712 }
713
714 #[tokio::test]
715 async fn get_current_epoch() {
716 let socket_path = create_temp_dir("get_current_epoch").join("node.socket");
717 let server = setup_server(socket_path.clone(), 2).await;
718 let client = tokio::spawn(async move {
719 let observer =
720 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
721 observer.get_current_epoch().await.unwrap().unwrap()
722 });
723
724 let (_, client_res) = tokio::join!(server, client);
725 let epoch = client_res.expect("Client failed");
726 assert_eq!(epoch, 8);
727 }
728
729 #[tokio::test]
730 async fn get_current_datums() {
731 let socket_path = create_temp_dir("get_current_datums").join("node.socket");
732 let server = setup_server(socket_path.clone(), 2).await;
733 let client = tokio::spawn(async move {
734 let observer =
735 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
736 let address =
737 "addr_test1vr80076l3x5uw6n94nwhgmv7ssgy6muzf47ugn6z0l92rhg2mgtu0".to_string();
738 observer.get_current_datums(&address).await.unwrap()
739 });
740
741 let (_, client_res) = tokio::join!(server, client);
742 let datums = client_res.expect("Client failed");
743 assert_eq!(vec![TxDatum(r#"{"constructor":0,"fields":[{"bytes":"7b226d61726b657273223a5b7b226e616d65223a227468616c6573222c2265706f6368223a307d5d2c227369676e6174757265223a2238356632326562626164"},{"bytes":"33333537633865613264663036323039376639613138306464333564396633626131643236383263373263386431323238386661643863623864306365656562"},{"bytes":"366134643665383465653865353631376164323037313836366363313930373466326137366538373864663166393733346438343061227d"}]}"#.to_string())], datums);
744 }
745
746 #[tokio::test]
747 async fn get_current_stake_distribution() {
748 let socket_path = create_temp_dir("get_current_stake_distribution").join("node.socket");
749 let server = setup_server(socket_path.clone(), 2).await;
750 let client = tokio::spawn(async move {
751 let observer =
752 super::PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
753 observer.get_current_stake_distribution().await.unwrap()
754 });
755
756 let (_, client_res) = tokio::join!(server, client);
757 let computed_stake_distribution = client_res.unwrap().unwrap();
758
759 let mut expected_stake_distribution = StakeDistribution::new();
760 expected_stake_distribution.insert(
761 "pool1qqqqqdk4zhsjuxxd8jyvwncf5eucfskz0xjjj64fdmlgj735lr9".to_string(),
762 300000000001,
763 );
764 expected_stake_distribution.insert(
765 "pool1qqqqpanw9zc0rzh0yp247nzf2s35uvnsm7aaesfl2nnejaev0uc".to_string(),
766 600000000001,
767 );
768 expected_stake_distribution.insert(
769 "pool1qqqqzyqf8mlm70883zht60n4q6uqxg4a8x266sewv8ad2grkztl".to_string(),
770 1200000000001,
771 );
772
773 assert_eq!(expected_stake_distribution, computed_stake_distribution);
774 }
775
776 #[tokio::test]
777 async fn get_current_kes_period() {
778 let socket_path = create_temp_dir("get_current_kes_period").join("node.socket");
779 let server = setup_server(socket_path.clone(), 3).await;
780 let client = tokio::spawn(async move {
781 let observer =
782 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
783
784 let keypair = ColdKeyGenerator::create_deterministic_keypair([0u8; 32]);
785 let mut dummy_key_buffer = [0u8; Sum6Kes::SIZE + 4];
786 let mut dummy_seed = [0u8; 32];
787 let (_, kes_verification_key) = Sum6Kes::keygen(&mut dummy_key_buffer, &mut dummy_seed);
788 let operational_certificate = OpCert::new(kes_verification_key, 0, 0, keypair);
789 observer
790 .get_current_kes_period(&operational_certificate)
791 .await
792 .unwrap()
793 });
794
795 let (_, client_res) = tokio::join!(server, client);
796 let kes_period = client_res.unwrap().unwrap();
797 assert_eq!(407, kes_period);
798 }
799
800 #[tokio::test]
801 async fn calculate_kes_period() {
802 let socket_path = create_temp_dir("calculate_kes_period").join("node.socket");
803 let observer = PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
804 let current_kes_period = observer
805 .calculate_kes_period(Point::Specific(53536042, vec![1, 2, 3]), 129600)
806 .await
807 .unwrap();
808
809 assert_eq!(413, current_kes_period);
810
811 let current_kes_period = observer
812 .calculate_kes_period(Point::Specific(53524800, vec![1, 2, 3]), 129600)
813 .await
814 .unwrap();
815
816 assert_eq!(413, current_kes_period);
817
818 let current_kes_period = observer
819 .calculate_kes_period(Point::Specific(53649999, vec![1, 2, 3]), 129600)
820 .await
821 .unwrap();
822
823 assert_eq!(413, current_kes_period);
824 }
825
826 #[tokio::test]
827 async fn get_chain_point() {
828 let socket_path = create_temp_dir("get_chain_point").join("node.socket");
829 let server = setup_server(socket_path.clone(), 1).await;
830 let client = tokio::spawn(async move {
831 let observer =
832 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
833 let mut client = observer.get_client().await.unwrap();
834 let statequery = client.statequery();
835 statequery.acquire(None).await.unwrap();
836 let chain_point = observer.do_get_chain_point_state_query(statequery).await.unwrap();
837 observer.post_process_statequery(&mut client).await.unwrap();
838 client.abort().await;
839 chain_point
840 });
841
842 let (_, client_res) = tokio::join!(server, client);
843 let chain_point = client_res.expect("Client failed");
844 assert_eq!(chain_point, Point::Specific(52851885, vec![1, 2, 3]));
845 }
846
847 #[tokio::test]
848 async fn get_genesis_config() {
849 let socket_path = create_temp_dir("get_genesis_config").join("node.socket");
850 let server = setup_server(socket_path.clone(), 2).await;
851 let client = tokio::spawn(async move {
852 let observer =
853 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
854 let mut client = observer.get_client().await.unwrap();
855 let statequery = client.statequery();
856 statequery.acquire(None).await.unwrap();
857 let genesis_config =
858 observer.do_get_genesis_config_state_query(statequery).await.unwrap();
859 observer.post_process_statequery(&mut client).await.unwrap();
860 client.abort().await;
861 genesis_config
862 });
863
864 let (_, client_res) = tokio::join!(server, client);
865 let genesis_config = client_res.expect("Client failed");
866 assert_eq!(genesis_config, get_fake_genesis_config());
867 }
868
869 #[tokio::test]
870 async fn fetch_current_era_from_state_query() {
871 let socket_path = create_temp_dir("get_current_era").join("node.socket");
872 let server = setup_server(socket_path.clone(), 1).await;
873 let client = tokio::spawn(async move {
874 let observer =
875 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
876 let mut client = observer.get_client().await.unwrap();
877 let statequery = client.statequery();
878 statequery.acquire(None).await.unwrap();
879 let era = observer.do_get_current_era_state_query(statequery).await.unwrap();
880 observer.post_process_statequery(&mut client).await.unwrap();
881 client.abort().await;
882 era
883 });
884
885 let (_, client_res) = tokio::join!(server, client);
886 let era = client_res.expect("Client failed");
887 assert_eq!(era, 4);
888 }
889
890 #[tokio::test]
891 async fn get_current_era() {
892 let socket_path = create_temp_dir("get_current_era_as_string").join("node.socket");
893 let server = setup_server(socket_path.clone(), 1).await;
894 let client = tokio::spawn(async move {
895 let observer =
896 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
897 observer.get_current_era().await.unwrap().unwrap()
898 });
899
900 let (_, client_res) = tokio::join!(server, client);
901 let era = client_res.expect("Client failed");
902
903 let expected_era = Era::try_from(4 + ERA_OFFSET).unwrap().to_string();
904 assert_eq!(era, expected_era);
905 }
906
907 #[tokio::test]
908 async fn get_current_chain_point() {
909 let socket_path = create_temp_dir("get_current_chain_point").join("node.socket");
910 let server = setup_server(socket_path.clone(), 2).await;
911 let client = tokio::spawn(async move {
912 let observer =
913 PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
914 observer.get_current_chain_point().await.unwrap()
915 });
916
917 let (_, client_res) = tokio::join!(server, client);
918 let chain_point = client_res.expect("Client failed");
919 assert_eq!(
920 chain_point,
921 Some(ChainPoint {
922 slot_number: SlotNumber(52851885),
923 block_hash: "010203".to_string(),
924 block_number: BlockNumber(52851885)
925 })
926 );
927 }
928}