mithril_cardano_node_chain/chain_observer/
pallas_observer.rs

1use std::collections::BTreeSet;
2use std::path::{Path, PathBuf};
3
4use anyhow::{Context, anyhow};
5use async_trait::async_trait;
6use pallas_addresses::Address;
7use pallas_codec::utils::{Bytes, CborWrap, TagWrap};
8use pallas_network::{
9    facades::NodeClient,
10    miniprotocols::{
11        Point,
12        localstate::{
13            Client,
14            queries_v16::{
15                self, Addr, Addrs, ChainBlockNumber, GenesisConfig, PostAlonsoTransactionOutput,
16                StakeSnapshot, Stakes, TransactionOutput, UTxOByAddress,
17            },
18        },
19    },
20};
21use pallas_primitives::ToCanonicalJson;
22use pallas_traverse::Era;
23
24use mithril_common::crypto_helper::{KesPeriod, encode_bech32};
25use mithril_common::entities::{BlockNumber, ChainPoint, Epoch, SlotNumber, StakeDistribution};
26use mithril_common::{CardanoNetwork, StdResult};
27
28use crate::entities::{ChainAddress, Datum, Datums, TxDatum, try_inspect};
29
30use super::{ChainObserver, ChainObserverError};
31
// The era index returned by `queries_v16::get_current_era` is one less than the matching
// `pallas_traverse::Era` value, due to the Cardano node implementation.
// This offset is added to the returned index so that the correct era display name is obtained.
const ERA_OFFSET: u16 = 1;
35
/// A runner that uses the Pallas library to interact with a Cardano node using N2C Ouroboros mini-protocols
pub struct PallasChainObserver {
    /// Path of the node socket handed to `NodeClient::connect`.
    socket: PathBuf,
    /// Network whose magic id is used when connecting to the node.
    network: CardanoNetwork,
}
41
42impl From<anyhow::Error> for ChainObserverError {
43    fn from(err: anyhow::Error) -> Self {
44        ChainObserverError::General(err)
45    }
46}
47
48impl PallasChainObserver {
49    /// Creates a new PallasObserver
50    pub fn new(socket: &Path, network: CardanoNetwork) -> Self {
51        Self {
52            socket: socket.to_owned(),
53            network,
54        }
55    }
56
57    /// Creates and returns a new `NodeClient` connected to the specified socket.
58    async fn new_client(&self) -> StdResult<NodeClient> {
59        let magic = self.network.magic_id();
60        let client = NodeClient::connect(&self.socket, magic).await?;
61
62        Ok(client)
63    }
64
65    /// Creates and returns a new `NodeClient`, handling any potential errors.
66    async fn get_client(&self) -> StdResult<NodeClient> {
67        self.new_client()
68            .await
69            .with_context(|| "PallasChainObserver failed to create new client")
70    }
71
72    /// Fetches the current era using the provided `statequery` client.
73    async fn get_era(&self, statequery: &mut Client) -> StdResult<u16> {
74        statequery
75            .acquire(None)
76            .await
77            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
78
79        let era = queries_v16::get_current_era(statequery)
80            .await
81            .with_context(|| "PallasChainObserver failed to get current era")?;
82
83        Ok(era)
84    }
85
86    /// Fetches the current epoch number using the provided `statequery` client.
87    async fn get_epoch(&self, statequery: &mut Client) -> StdResult<u32> {
88        statequery
89            .acquire(None)
90            .await
91            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
92
93        let era = queries_v16::get_current_era(statequery)
94            .await
95            .with_context(|| "PallasChainObserver failed to get current era")?;
96
97        let epoch = queries_v16::get_block_epoch_number(statequery, era)
98            .await
99            .with_context(|| "PallasChainObserver failed to get block epoch number")?;
100
101        Ok(epoch)
102    }
103
104    /// Returns inline datum tag from the given `Values` instance.
105    fn get_datum_tag(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<TagWrap<Bytes, 24>> {
106        Ok(utxo
107            .inline_datum
108            .as_ref()
109            .with_context(|| "PallasChainObserver failed to get inline datum")?
110            .1
111            .clone())
112    }
113
114    /// Returns inline datums from the given `Values` instance.
115    fn inspect_datum(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<Datum> {
116        let datum = self.get_datum_tag(utxo)?;
117        let datum = CborWrap(datum).to_vec();
118
119        try_inspect::<Datum>(datum)
120    }
121
122    /// Serializes datum to `TxDatum` instance.
123    fn serialize_datum(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<TxDatum> {
124        let datum = self.inspect_datum(utxo)?;
125        let serialized = serde_json::to_string(&datum.to_json())
126            .with_context(|| "PallasChainObserver failed to serialize datum")?;
127
128        Ok(TxDatum(serialized))
129    }
130
131    /// Maps the given `UTxOByAddress` instance to Datums.
132    fn map_datums(&self, transaction: UTxOByAddress) -> StdResult<Datums> {
133        transaction
134            .utxo
135            .iter()
136            .filter_map(|(_, utxo)| match utxo {
137                TransactionOutput::Current(output) => {
138                    output.inline_datum.as_ref().map(|_| self.serialize_datum(output))
139                }
140                _ => None,
141            })
142            .collect::<StdResult<Datums>>()
143    }
144
145    /// Returns a vector of `TxDatum` instances.
146    async fn get_utxo_datums(
147        &self,
148        client: &mut NodeClient,
149        address: &ChainAddress,
150    ) -> Result<Datums, ChainObserverError> {
151        let statequery = client.statequery();
152        let utxo = self.get_utxo_by_address(statequery, address).await?;
153
154        Ok(self.map_datums(utxo)?)
155    }
156
157    /// Fetches the current UTxO by address using the provided `statequery` client.
158    async fn get_utxo_by_address(
159        &self,
160        statequery: &mut Client,
161        address: &ChainAddress,
162    ) -> StdResult<UTxOByAddress> {
163        statequery
164            .acquire(None)
165            .await
166            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
167
168        let era = queries_v16::get_current_era(statequery)
169            .await
170            .with_context(|| "PallasChainObserver failed to get current era")?;
171
172        let addr: Address = Address::from_bech32(address)
173            .with_context(|| "PallasChainObserver failed to parse address")?;
174
175        let addr: Addr = addr.to_vec().into();
176        let addrs: Addrs = vec![addr];
177        let utxo = queries_v16::get_utxo_by_address(statequery, era, addrs)
178            .await
179            .with_context(|| "PallasChainObserver failed to get utxo")?;
180
181        Ok(utxo)
182    }
183
184    /// Fetches the current stake distribution using the provided `statequery` client.
185    async fn do_stake_snapshots_state_query(
186        &self,
187        statequery: &mut Client,
188    ) -> StdResult<StakeSnapshot> {
189        statequery
190            .acquire(None)
191            .await
192            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
193
194        let era = queries_v16::get_current_era(statequery)
195            .await
196            .with_context(|| "PallasChainObserver failed to get current era")?;
197
198        let state_snapshot = queries_v16::get_stake_snapshots(statequery, era, BTreeSet::new())
199            .await
200            .with_context(|| "PallasChainObserver failed to get stake snapshot")?;
201
202        Ok(state_snapshot)
203    }
204
205    /// Returns the stake pool hash from the given bytestring.
206    fn get_stake_pool_hash(&self, key: &Bytes) -> Result<String, ChainObserverError> {
207        let pool_id_bech32 = encode_bech32("pool", key)
208            .with_context(|| "PallasChainObserver failed to encode stake pool hash")?;
209        Ok(pool_id_bech32)
210    }
211
212    /// Fetches the current stake distribution using the provided `statequery` client.
213    async fn get_stake_distribution(
214        &self,
215        client: &mut NodeClient,
216    ) -> Result<Option<StakeDistribution>, ChainObserverError> {
217        let statequery = client.statequery();
218
219        let stake_snapshot = self.do_stake_snapshots_state_query(statequery).await?;
220
221        let mut stake_distribution = StakeDistribution::new();
222
223        let have_stakes_in_two_epochs = |stakes: &Stakes| stakes.snapshot_mark_pool > 0;
224        for (key, stakes) in stake_snapshot
225            .snapshots
226            .stake_snapshots
227            .iter()
228            .filter(|(_, stakes)| have_stakes_in_two_epochs(stakes))
229        {
230            let pool_hash = self.get_stake_pool_hash(key)?;
231            stake_distribution.insert(pool_hash, stakes.snapshot_mark_pool);
232        }
233
234        Ok(Some(stake_distribution))
235    }
236
237    /// # Calculate Current KES Period
238    ///
239    /// It calculates the current Key Evolving Signature (KES) period
240    /// based on the provided `chain_point` and `slots_per_kes_period`.
241    ///
242    /// The calculation formula is represented as:
243    ///
244    /// `current_kes_period = ⌊current_slot_number/slots_per_kes_period⌋`
245    ///
246    /// where:
247    /// - `current_slot_number` represents the current slot number given by the `point` on the chain.
248    /// - `slots_per_kes_period` represents the number of slots in a KES period.
249    /// - `⌊x⌋` is the floor function which rounds the greatest integer less than or equal to `x`.
250    ///
251    /// ## Example:
252    ///
253    /// let (chain_point, slots_per_kes_period) = (Point::new(1), 10);
254    /// match calculate_kes_period(&self, chain_point, slots_per_kes_period) {
255    ///     Ok(kes_period) => println!("Current KES Period: {}", kes_period),
256    ///     Err(e) => println!("Error occurred: {}", e),
257    /// }
258    async fn calculate_kes_period(
259        &self,
260        chain_point: Point,
261        slots_per_kes_period: u64,
262    ) -> Result<KesPeriod, ChainObserverError> {
263        if slots_per_kes_period == 0 {
264            return Err(anyhow!("slots_per_kes_period must be greater than 0"))
265                .with_context(|| "PallasChainObserver failed to calculate kes period")?;
266        }
267
268        let current_kes_period = chain_point.slot_or_default() / slots_per_kes_period;
269
270        Ok(KesPeriod(current_kes_period))
271    }
272
273    /// Fetches the current chain point using the provided `statequery` client.
274    async fn do_get_chain_point_state_query(&self, statequery: &mut Client) -> StdResult<Point> {
275        let chain_point = queries_v16::get_chain_point(statequery)
276            .await
277            .with_context(|| "PallasChainObserver failed to get chain point")?;
278
279        Ok(chain_point)
280    }
281
282    /// Fetches the current chain point using the provided `NodeClient`.
283    async fn do_get_chain_block_no(&self, statequery: &mut Client) -> StdResult<ChainBlockNumber> {
284        let chain_block_number = queries_v16::get_chain_block_no(statequery)
285            .await
286            .with_context(|| "PallasChainObserver failed to get chain block number")?;
287
288        Ok(chain_block_number)
289    }
290
291    /// Fetches the current era using the provided `statequery` client.
292    async fn do_get_current_era_state_query(&self, statequery: &mut Client) -> StdResult<u16> {
293        let era = queries_v16::get_current_era(statequery)
294            .await
295            .with_context(|| "PallasChainObserver failed to get current era")?;
296
297        Ok(era)
298    }
299
300    /// Fetches the current genesis config using the provided `statequery` client.
301    async fn do_get_genesis_config_state_query(
302        &self,
303        statequery: &mut Client,
304    ) -> StdResult<Vec<GenesisConfig>> {
305        let era = self.do_get_current_era_state_query(statequery).await?;
306        let genesis_config = queries_v16::get_genesis_config(statequery, era)
307            .await
308            .with_context(|| "PallasChainObserver failed to get genesis config")?;
309
310        Ok(genesis_config)
311    }
312
313    /// Fetches the current chain point using the provided `NodeClient`.
314    async fn get_chain_point(&self, statequery: &mut Client) -> StdResult<ChainPoint> {
315        statequery
316            .acquire(None)
317            .await
318            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
319
320        let chain_point = self.do_get_chain_point_state_query(statequery).await?;
321
322        let header_hash = match chain_point {
323            Point::Origin => None,
324            Point::Specific(_at_slot, ref hash) => Some(hex::encode(hash)),
325        };
326
327        let chain_block_number = self.do_get_chain_block_no(statequery).await?;
328
329        Ok(ChainPoint {
330            slot_number: SlotNumber(chain_point.slot_or_default()),
331            block_hash: header_hash.unwrap_or_default(),
332            block_number: BlockNumber(chain_block_number.block_number as u64),
333        })
334    }
335
336    /// Fetches chain point and genesis config through the local statequery.
337    /// The KES period is calculated afterwards.
338    async fn get_kes_period(
339        &self,
340        client: &mut NodeClient,
341    ) -> Result<Option<KesPeriod>, ChainObserverError> {
342        let statequery = client.statequery();
343
344        statequery
345            .acquire(None)
346            .await
347            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
348
349        let chain_point = self.do_get_chain_point_state_query(statequery).await?;
350
351        let genesis_config = self.do_get_genesis_config_state_query(statequery).await?;
352
353        let config = genesis_config
354            .first()
355            .with_context(|| "PallasChainObserver failed to extract the config")?;
356
357        let current_kes_period = self
358            .calculate_kes_period(chain_point, config.slots_per_kes_period as u64)
359            .await?;
360
361        Ok(Some(current_kes_period))
362    }
363
364    /// Processes a state query with the `NodeClient`, releasing the state query.
365    async fn process_statequery(&self, client: &mut NodeClient) -> StdResult<()> {
366        let statequery = client.statequery();
367        statequery
368            .send_release()
369            .await
370            .with_context(|| "PallasChainObserver send release failed")?;
371
372        statequery
373            .send_done()
374            .await
375            .with_context(|| "PallasChainObserver send done failed")?;
376
377        Ok(())
378    }
379
380    /// Synchronizes the `NodeClient` with the cardano server using `chainsync`.
381    async fn sync(&self, client: &mut NodeClient) -> StdResult<()> {
382        client
383            .chainsync()
384            .send_done()
385            .await
386            .with_context(|| "PallasChainObserver chainsync send done failed")?;
387        Ok(())
388    }
389
390    /// Post-processes a state query afterwards.
391    async fn post_process_statequery(&self, client: &mut NodeClient) -> StdResult<()> {
392        self.process_statequery(client).await?;
393        self.sync(client).await?;
394
395        Ok(())
396    }
397}
398
399#[async_trait]
400impl ChainObserver for PallasChainObserver {
401    async fn get_current_era(&self) -> Result<Option<String>, ChainObserverError> {
402        let mut client = self.get_client().await?;
403
404        let era = self.get_era(client.statequery()).await?;
405
406        let era = Era::try_from(era + ERA_OFFSET)
407            .with_context(|| "PallasChainObserver failed to convert: '{era}' to Era")?;
408
409        self.post_process_statequery(&mut client).await?;
410
411        client.abort().await;
412
413        Ok(Some(era.to_string()))
414    }
415
416    async fn get_current_epoch(&self) -> Result<Option<Epoch>, ChainObserverError> {
417        let mut client = self.get_client().await?;
418
419        let epoch = self.get_epoch(client.statequery()).await?;
420
421        self.post_process_statequery(&mut client).await?;
422
423        client.abort().await;
424
425        Ok(Some(Epoch(epoch as u64)))
426    }
427
428    async fn get_current_chain_point(&self) -> Result<Option<ChainPoint>, ChainObserverError> {
429        let mut client = self.get_client().await?;
430
431        let chain_point = self.get_chain_point(client.statequery()).await?;
432
433        self.post_process_statequery(&mut client).await?;
434
435        client.abort().await;
436
437        Ok(Some(chain_point))
438    }
439
440    async fn get_current_datums(
441        &self,
442        address: &ChainAddress,
443    ) -> Result<Datums, ChainObserverError> {
444        let mut client = self.get_client().await?;
445
446        let datums = self.get_utxo_datums(&mut client, address).await?;
447
448        self.post_process_statequery(&mut client).await?;
449
450        client.abort().await;
451
452        Ok(datums)
453    }
454
455    async fn get_current_stake_distribution(
456        &self,
457    ) -> Result<Option<StakeDistribution>, ChainObserverError> {
458        let mut client = self.get_client().await?;
459
460        let stake_distribution = self.get_stake_distribution(&mut client).await?;
461
462        self.post_process_statequery(&mut client).await?;
463
464        client.abort().await;
465
466        Ok(stake_distribution)
467    }
468
469    async fn get_current_kes_period(&self) -> Result<Option<KesPeriod>, ChainObserverError> {
470        let mut client = self.get_client().await?;
471
472        let current_kes_period = self.get_kes_period(&mut client).await?;
473
474        self.post_process_statequery(&mut client).await?;
475
476        client.abort().await;
477
478        Ok(current_kes_period)
479    }
480}
481
482// Windows does not support Unix sockets, nor pallas_network::facades::NodeServer
483#[cfg(all(test, unix))]
484mod tests {
485    use std::fs;
486
487    use pallas_codec::utils::{AnyCbor, AnyUInt, KeyValuePairs, TagWrap};
488    use pallas_crypto::hash::Hash;
489    use pallas_network::facades::NodeServer;
490    use pallas_network::miniprotocols::{
491        Point,
492        localstate::{
493            ClientQueryRequest,
494            queries_v16::{
495                BlockQuery, ChainBlockNumber, Fraction, GenesisConfig, HardForkQuery, LedgerQuery,
496                Request, Snapshots, StakeSnapshot, SystemStart, Value,
497            },
498        },
499    };
500    use tokio::net::UnixListener;
501
502    use mithril_common::test::TempDir;
503
504    use super::*;
505
506    fn get_fake_utxo_by_address() -> UTxOByAddress {
507        let tx_hex = "1e4e5cf2889d52f1745b941090f04a65dea6ce56c5e5e66e69f65c8e36347c17";
508        let tx_bytes: [u8; 32] = hex::decode(tx_hex).unwrap().try_into().unwrap();
509        let transaction_id = Hash::from(tx_bytes);
510        let index = AnyUInt::MajorByte(2);
511        let lovelace = AnyUInt::MajorByte(2);
512        let hex_datum = "D8799F58407B226D61726B657273223A5B7B226E616D65223A227468616C6573222C2265706F6368223A307D5D2C227369676E6174757265223A22383566323265626261645840333335376338656132646630363230393766396131383064643335643966336261316432363832633732633864313232383866616438636238643063656565625838366134643665383465653865353631376164323037313836366363313930373466326137366538373864663166393733346438343061227DFF";
513        let datum = hex::decode(hex_datum).unwrap().into();
514        let tag = TagWrap::<_, 24>::new(datum);
515        let inline_datum = Some((1_u16, tag));
516
517        let address: Address =
518            Address::from_bech32("addr_test1vr80076l3x5uw6n94nwhgmv7ssgy6muzf47ugn6z0l92rhg2mgtu0")
519                .unwrap();
520        let address: Addr = address.to_vec().into();
521        let values = TransactionOutput::Current(PostAlonsoTransactionOutput {
522            address,
523            amount: Value::Coin(lovelace),
524            inline_datum,
525            script_ref: None,
526        });
527        let utxo = KeyValuePairs::from(vec![(
528            queries_v16::UTxO {
529                transaction_id,
530                index,
531            },
532            values,
533        )]);
534
535        UTxOByAddress { utxo }
536    }
537
538    fn get_fake_stake_snapshot() -> StakeSnapshot {
539        let stake_snapshots = KeyValuePairs::from(vec![
540            (
541                Bytes::from(
542                    hex::decode("00000036d515e12e18cd3c88c74f09a67984c2c279a5296aa96efe89")
543                        .unwrap(),
544                ),
545                Stakes {
546                    snapshot_mark_pool: 300000000001,
547                    snapshot_set_pool: 300000000002,
548                    snapshot_go_pool: 300000000000,
549                },
550            ),
551            (
552                Bytes::from(
553                    hex::decode("000000f66e28b0f18aef20555f4c4954234e3270dfbbdcc13f54e799")
554                        .unwrap(),
555                ),
556                Stakes {
557                    snapshot_mark_pool: 600000000001,
558                    snapshot_set_pool: 600000000002,
559                    snapshot_go_pool: 600000000000,
560                },
561            ),
562            (
563                Bytes::from(
564                    hex::decode("00000110093effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
565                        .unwrap(),
566                ),
567                Stakes {
568                    snapshot_mark_pool: 1200000000001,
569                    snapshot_set_pool: 1200000000002,
570                    snapshot_go_pool: 1200000000000,
571                },
572            ),
573            (
574                Bytes::from(
575                    hex::decode("00000ffff93effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
576                        .unwrap(),
577                ),
578                Stakes {
579                    snapshot_mark_pool: 0,
580                    snapshot_set_pool: 1300000000002,
581                    snapshot_go_pool: 0,
582                },
583            ),
584        ]);
585
586        StakeSnapshot {
587            snapshots: Snapshots {
588                stake_snapshots,
589                snapshot_stake_mark_total: 2100000000003,
590                snapshot_stake_set_total: 2100000000006,
591                snapshot_stake_go_total: 2100000000000,
592            },
593        }
594    }
595
596    fn get_fake_genesis_config() -> Vec<GenesisConfig> {
597        let genesis = GenesisConfig {
598            system_start: SystemStart {
599                year: 2021,
600                day_of_year: 150,
601                picoseconds_of_day: 0,
602            },
603            network_magic: 42,
604            network_id: 42,
605            active_slots_coefficient: Fraction { num: 6, den: 10 },
606            security_param: 2160,
607            epoch_length: 432000,
608            slots_per_kes_period: 129600,
609            max_kes_evolutions: 62,
610            slot_length: 1,
611            update_quorum: 5,
612            max_lovelace_supply: AnyUInt::MajorByte(2),
613        };
614
615        vec![genesis]
616    }
617
618    /// pallas responses mock server.
619    async fn mock_server(server: &mut NodeServer) -> AnyCbor {
620        let query: queries_v16::Request =
621            match server.statequery().recv_while_acquired().await.unwrap() {
622                ClientQueryRequest::Query(q) => q.into_decode().unwrap(),
623                x => panic!("unexpected message from client: {x:?}"),
624            };
625
626        match query {
627            Request::GetChainPoint => {
628                AnyCbor::from_encode(Point::Specific(52851885, vec![1, 2, 3]))
629            }
630            Request::GetChainBlockNo => AnyCbor::from_encode(ChainBlockNumber {
631                slot_timeline: 1,
632                block_number: 52851885,
633            }),
634            Request::LedgerQuery(LedgerQuery::HardForkQuery(HardForkQuery::GetCurrentEra)) => {
635                AnyCbor::from_encode(4)
636            }
637            Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetEpochNo)) => {
638                AnyCbor::from_encode([8])
639            }
640            Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetGenesisConfig)) => {
641                AnyCbor::from_encode(get_fake_genesis_config())
642            }
643            Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetUTxOByAddress(_))) => {
644                AnyCbor::from_encode(get_fake_utxo_by_address())
645            }
646            Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetStakeSnapshots(_))) => {
647                AnyCbor::from_encode(get_fake_stake_snapshot())
648            }
649            _ => panic!("unexpected query from client: {query:?}"),
650        }
651    }
652
653    /// Creates a new work directory in the system's temporary folder.
654    fn create_temp_dir(folder_name: &str) -> PathBuf {
655        TempDir::create_with_short_path("pallas_chain_observer_test", folder_name)
656    }
657
658    /// Sets up a mock server for related tests.
659    ///
660    /// Use the `intersections` parameter to define exactly how many
661    /// local state queries should be intercepted by the `mock_server`
662    /// and avoid any panic errors.
663    async fn setup_server(socket_path: PathBuf, intersections: u32) -> tokio::task::JoinHandle<()> {
664        tokio::spawn({
665            async move {
666                if socket_path.exists() {
667                    fs::remove_file(&socket_path).expect("Previous socket removal failed");
668                }
669
670                let unix_listener = UnixListener::bind(socket_path.as_path()).unwrap();
671                let mut server = NodeServer::accept(&unix_listener, 10).await.unwrap();
672
673                server.statequery().recv_while_idle().await.unwrap();
674                server.statequery().send_acquired().await.unwrap();
675
676                for _ in 0..intersections {
677                    let result = mock_server(&mut server).await;
678                    server.statequery().send_result(result).await.unwrap();
679                }
680            }
681        })
682    }
683
684    #[tokio::test]
685    async fn get_current_epoch() {
686        let socket_path = create_temp_dir("get_current_epoch").join("node.socket");
687        let server = setup_server(socket_path.clone(), 2).await;
688        let client = tokio::spawn(async move {
689            let observer =
690                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
691            observer.get_current_epoch().await.unwrap().unwrap()
692        });
693
694        let (_, client_res) = tokio::join!(server, client);
695        let epoch = client_res.expect("Client failed");
696        assert_eq!(epoch, 8);
697    }
698
699    #[tokio::test]
700    async fn get_current_datums() {
701        let socket_path = create_temp_dir("get_current_datums").join("node.socket");
702        let server = setup_server(socket_path.clone(), 2).await;
703        let client = tokio::spawn(async move {
704            let observer =
705                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
706            let address =
707                "addr_test1vr80076l3x5uw6n94nwhgmv7ssgy6muzf47ugn6z0l92rhg2mgtu0".to_string();
708            observer.get_current_datums(&address).await.unwrap()
709        });
710
711        let (_, client_res) = tokio::join!(server, client);
712        let datums = client_res.expect("Client failed");
713        assert_eq!(vec![TxDatum(r#"{"constructor":0,"fields":[{"bytes":"7b226d61726b657273223a5b7b226e616d65223a227468616c6573222c2265706f6368223a307d5d2c227369676e6174757265223a2238356632326562626164"},{"bytes":"33333537633865613264663036323039376639613138306464333564396633626131643236383263373263386431323238386661643863623864306365656562"},{"bytes":"366134643665383465653865353631376164323037313836366363313930373466326137366538373864663166393733346438343061227d"}]}"#.to_string())], datums);
714    }
715
716    #[tokio::test]
717    async fn get_current_stake_distribution() {
718        let socket_path = create_temp_dir("get_current_stake_distribution").join("node.socket");
719        let server = setup_server(socket_path.clone(), 2).await;
720        let client = tokio::spawn(async move {
721            let observer =
722                super::PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
723            observer.get_current_stake_distribution().await.unwrap()
724        });
725
726        let (_, client_res) = tokio::join!(server, client);
727        let computed_stake_distribution = client_res.unwrap().unwrap();
728
729        let mut expected_stake_distribution = StakeDistribution::new();
730        expected_stake_distribution.insert(
731            "pool1qqqqqdk4zhsjuxxd8jyvwncf5eucfskz0xjjj64fdmlgj735lr9".to_string(),
732            300000000001,
733        );
734        expected_stake_distribution.insert(
735            "pool1qqqqpanw9zc0rzh0yp247nzf2s35uvnsm7aaesfl2nnejaev0uc".to_string(),
736            600000000001,
737        );
738        expected_stake_distribution.insert(
739            "pool1qqqqzyqf8mlm70883zht60n4q6uqxg4a8x266sewv8ad2grkztl".to_string(),
740            1200000000001,
741        );
742
743        assert_eq!(expected_stake_distribution, computed_stake_distribution);
744    }
745
746    #[tokio::test]
747    async fn get_current_kes_period() {
748        let socket_path = create_temp_dir("get_current_kes_period").join("node.socket");
749        let server = setup_server(socket_path.clone(), 3).await;
750        let client = tokio::spawn(async move {
751            let observer =
752                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
753
754            observer.get_current_kes_period().await.unwrap()
755        });
756
757        let (_, client_res) = tokio::join!(server, client);
758        let kes_period = client_res.unwrap().unwrap();
759        assert_eq!(407, kes_period);
760    }
761
762    #[tokio::test]
763    async fn calculate_kes_period() {
764        let socket_path = create_temp_dir("calculate_kes_period").join("node.socket");
765        let observer = PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
766        let current_kes_period = observer
767            .calculate_kes_period(Point::Specific(53536042, vec![1, 2, 3]), 129600)
768            .await
769            .unwrap();
770
771        assert_eq!(413, current_kes_period);
772
773        let current_kes_period = observer
774            .calculate_kes_period(Point::Specific(53524800, vec![1, 2, 3]), 129600)
775            .await
776            .unwrap();
777
778        assert_eq!(413, current_kes_period);
779
780        let current_kes_period = observer
781            .calculate_kes_period(Point::Specific(53649999, vec![1, 2, 3]), 129600)
782            .await
783            .unwrap();
784
785        assert_eq!(413, current_kes_period);
786    }
787
788    #[tokio::test]
789    async fn get_chain_point() {
790        let socket_path = create_temp_dir("get_chain_point").join("node.socket");
791        let server = setup_server(socket_path.clone(), 1).await;
792        let client = tokio::spawn(async move {
793            let observer =
794                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
795            let mut client = observer.get_client().await.unwrap();
796            let statequery = client.statequery();
797            statequery.acquire(None).await.unwrap();
798            let chain_point = observer.do_get_chain_point_state_query(statequery).await.unwrap();
799            observer.post_process_statequery(&mut client).await.unwrap();
800            client.abort().await;
801            chain_point
802        });
803
804        let (_, client_res) = tokio::join!(server, client);
805        let chain_point = client_res.expect("Client failed");
806        assert_eq!(chain_point, Point::Specific(52851885, vec![1, 2, 3]));
807    }
808
809    #[tokio::test]
810    async fn get_genesis_config() {
811        let socket_path = create_temp_dir("get_genesis_config").join("node.socket");
812        let server = setup_server(socket_path.clone(), 2).await;
813        let client = tokio::spawn(async move {
814            let observer =
815                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
816            let mut client = observer.get_client().await.unwrap();
817            let statequery = client.statequery();
818            statequery.acquire(None).await.unwrap();
819            let genesis_config =
820                observer.do_get_genesis_config_state_query(statequery).await.unwrap();
821            observer.post_process_statequery(&mut client).await.unwrap();
822            client.abort().await;
823            genesis_config
824        });
825
826        let (_, client_res) = tokio::join!(server, client);
827        let genesis_config = client_res.expect("Client failed");
828        assert_eq!(genesis_config, get_fake_genesis_config());
829    }
830
831    #[tokio::test]
832    async fn fetch_current_era_from_state_query() {
833        let socket_path = create_temp_dir("get_current_era").join("node.socket");
834        let server = setup_server(socket_path.clone(), 1).await;
835        let client = tokio::spawn(async move {
836            let observer =
837                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
838            let mut client = observer.get_client().await.unwrap();
839            let statequery = client.statequery();
840            statequery.acquire(None).await.unwrap();
841            let era = observer.do_get_current_era_state_query(statequery).await.unwrap();
842            observer.post_process_statequery(&mut client).await.unwrap();
843            client.abort().await;
844            era
845        });
846
847        let (_, client_res) = tokio::join!(server, client);
848        let era = client_res.expect("Client failed");
849        assert_eq!(era, 4);
850    }
851
852    #[tokio::test]
853    async fn get_current_era() {
854        let socket_path = create_temp_dir("get_current_era_as_string").join("node.socket");
855        let server = setup_server(socket_path.clone(), 1).await;
856        let client = tokio::spawn(async move {
857            let observer =
858                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
859            observer.get_current_era().await.unwrap().unwrap()
860        });
861
862        let (_, client_res) = tokio::join!(server, client);
863        let era = client_res.expect("Client failed");
864
865        let expected_era = Era::try_from(4 + ERA_OFFSET).unwrap().to_string();
866        assert_eq!(era, expected_era);
867    }
868
869    #[tokio::test]
870    async fn get_current_chain_point() {
871        let socket_path = create_temp_dir("get_current_chain_point").join("node.socket");
872        let server = setup_server(socket_path.clone(), 2).await;
873        let client = tokio::spawn(async move {
874            let observer =
875                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
876            observer.get_current_chain_point().await.unwrap()
877        });
878
879        let (_, client_res) = tokio::join!(server, client);
880        let chain_point = client_res.expect("Client failed");
881        assert_eq!(
882            chain_point,
883            Some(ChainPoint {
884                slot_number: SlotNumber(52851885),
885                block_hash: "010203".to_string(),
886                block_number: BlockNumber(52851885)
887            })
888        );
889    }
890}