mithril_cardano_node_chain/chain_observer/
pallas_observer.rs

1use std::collections::BTreeSet;
2use std::path::{Path, PathBuf};
3
4use anyhow::{Context, anyhow};
5use async_trait::async_trait;
6use pallas_addresses::Address;
7use pallas_codec::utils::{Bytes, CborWrap, TagWrap};
8use pallas_network::{
9    facades::NodeClient,
10    miniprotocols::{
11        Point,
12        localstate::{
13            Client,
14            queries_v16::{
15                self, Addr, Addrs, ChainBlockNumber, GenesisConfig, PostAlonsoTransactionOutput,
16                StakeSnapshot, Stakes, TransactionOutput, UTxOByAddress,
17            },
18        },
19    },
20};
21use pallas_primitives::ToCanonicalJson;
22use pallas_traverse::Era;
23
24use mithril_common::crypto_helper::{KesPeriod, encode_bech32};
25use mithril_common::entities::{BlockNumber, ChainPoint, Epoch, SlotNumber, StakeDistribution};
26use mithril_common::{CardanoNetwork, StdResult};
27
28use crate::entities::{ChainAddress, Datum, Datums, TxDatum, try_inspect};
29
30use super::{ChainObserver, ChainObserverError};
31
// The era index returned by `queries_v16::get_current_era` is one less than the matching
// `pallas_traverse::Era` value, due to the Cardano node implementation.
// This offset is added to compensate and obtain the correct era display name.
const ERA_OFFSET: u16 = 1;
35
/// A runner that uses the Pallas library to interact with a Cardano node using N2C Ouroboros mini-protocols
pub struct PallasChainObserver {
    /// Path handed to `NodeClient::connect` when a connection is opened.
    socket: PathBuf,
    /// Network whose magic id is handed to `NodeClient::connect`.
    network: CardanoNetwork,
}
41
42impl From<anyhow::Error> for ChainObserverError {
43    fn from(err: anyhow::Error) -> Self {
44        ChainObserverError::General(err)
45    }
46}
47
48impl PallasChainObserver {
49    /// Creates a new PallasObserver
50    pub fn new(socket: &Path, network: CardanoNetwork) -> Self {
51        Self {
52            socket: socket.to_owned(),
53            network,
54        }
55    }
56
57    /// Creates and returns a new `NodeClient` connected to the specified socket.
58    async fn new_client(&self) -> StdResult<NodeClient> {
59        let magic = self.network.magic_id();
60        let client = NodeClient::connect(&self.socket, magic).await?;
61
62        Ok(client)
63    }
64
65    /// Creates and returns a new `NodeClient`, handling any potential errors.
66    async fn get_client(&self) -> StdResult<NodeClient> {
67        self.new_client()
68            .await
69            .map_err(|err| anyhow!(err))
70            .with_context(|| "PallasChainObserver failed to create new client")
71    }
72
73    /// Fetches the current era using the provided `statequery` client.
74    async fn get_era(&self, statequery: &mut Client) -> StdResult<u16> {
75        statequery
76            .acquire(None)
77            .await
78            .map_err(|err| anyhow!(err))
79            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
80
81        let era = queries_v16::get_current_era(statequery)
82            .await
83            .map_err(|err| anyhow!(err))
84            .with_context(|| "PallasChainObserver failed to get current era")?;
85
86        Ok(era)
87    }
88
89    /// Fetches the current epoch number using the provided `statequery` client.
90    async fn get_epoch(&self, statequery: &mut Client) -> StdResult<u32> {
91        statequery
92            .acquire(None)
93            .await
94            .map_err(|err| anyhow!(err))
95            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
96
97        let era = queries_v16::get_current_era(statequery)
98            .await
99            .map_err(|err| anyhow!(err))
100            .with_context(|| "PallasChainObserver failed to get current era")?;
101
102        let epoch = queries_v16::get_block_epoch_number(statequery, era)
103            .await
104            .map_err(|err| anyhow!(err))
105            .with_context(|| "PallasChainObserver failed to get block epoch number")?;
106
107        Ok(epoch)
108    }
109
110    /// Returns inline datum tag from the given `Values` instance.
111    fn get_datum_tag(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<TagWrap<Bytes, 24>> {
112        Ok(utxo
113            .inline_datum
114            .as_ref()
115            .with_context(|| "PallasChainObserver failed to get inline datum")?
116            .1
117            .clone())
118    }
119
120    /// Returns inline datums from the given `Values` instance.
121    fn inspect_datum(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<Datum> {
122        let datum = self.get_datum_tag(utxo)?;
123        let datum = CborWrap(datum).to_vec();
124
125        try_inspect::<Datum>(datum)
126    }
127
128    /// Serializes datum to `TxDatum` instance.
129    fn serialize_datum(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<TxDatum> {
130        let datum = self.inspect_datum(utxo)?;
131        let serialized = serde_json::to_string(&datum.to_json())
132            .map_err(|err| anyhow!(err))
133            .with_context(|| "PallasChainObserver failed to serialize datum")?;
134
135        Ok(TxDatum(serialized))
136    }
137
138    /// Maps the given `UTxOByAddress` instance to Datums.
139    fn map_datums(&self, transaction: UTxOByAddress) -> StdResult<Datums> {
140        transaction
141            .utxo
142            .iter()
143            .filter_map(|(_, utxo)| match utxo {
144                TransactionOutput::Current(output) => {
145                    output.inline_datum.as_ref().map(|_| self.serialize_datum(output))
146                }
147                _ => None,
148            })
149            .collect::<StdResult<Datums>>()
150    }
151
152    /// Returns a vector of `TxDatum` instances.
153    async fn get_utxo_datums(
154        &self,
155        client: &mut NodeClient,
156        address: &ChainAddress,
157    ) -> Result<Datums, ChainObserverError> {
158        let statequery = client.statequery();
159        let utxo = self.get_utxo_by_address(statequery, address).await?;
160
161        Ok(self.map_datums(utxo)?)
162    }
163
164    /// Fetches the current UTxO by address using the provided `statequery` client.
165    async fn get_utxo_by_address(
166        &self,
167        statequery: &mut Client,
168        address: &ChainAddress,
169    ) -> StdResult<UTxOByAddress> {
170        statequery
171            .acquire(None)
172            .await
173            .map_err(|err| anyhow!(err))
174            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
175
176        let era = queries_v16::get_current_era(statequery)
177            .await
178            .map_err(|err| anyhow!(err))
179            .with_context(|| "PallasChainObserver failed to get current era")?;
180
181        let addr: Address = Address::from_bech32(address)
182            .map_err(|err| anyhow!(err))
183            .with_context(|| "PallasChainObserver failed to parse address")?;
184
185        let addr: Addr = addr.to_vec().into();
186        let addrs: Addrs = vec![addr];
187        let utxo = queries_v16::get_utxo_by_address(statequery, era, addrs)
188            .await
189            .map_err(|err| anyhow!(err))
190            .with_context(|| "PallasChainObserver failed to get utxo")?;
191
192        Ok(utxo)
193    }
194
195    /// Fetches the current stake distribution using the provided `statequery` client.
196    async fn do_stake_snapshots_state_query(
197        &self,
198        statequery: &mut Client,
199    ) -> StdResult<StakeSnapshot> {
200        statequery
201            .acquire(None)
202            .await
203            .map_err(|err| anyhow!(err))
204            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
205
206        let era = queries_v16::get_current_era(statequery)
207            .await
208            .map_err(|err| anyhow!(err))
209            .with_context(|| "PallasChainObserver failed to get current era")?;
210
211        let state_snapshot = queries_v16::get_stake_snapshots(statequery, era, BTreeSet::new())
212            .await
213            .map_err(|err| anyhow!(err))
214            .with_context(|| "PallasChainObserver failed to get stake snapshot")?;
215
216        Ok(state_snapshot)
217    }
218
219    /// Returns the stake pool hash from the given bytestring.
220    fn get_stake_pool_hash(&self, key: &Bytes) -> Result<String, ChainObserverError> {
221        let pool_id_bech32 = encode_bech32("pool", key)
222            .map_err(|err| anyhow!(err))
223            .with_context(|| "PallasChainObserver failed to encode stake pool hash")?;
224        Ok(pool_id_bech32)
225    }
226
227    /// Fetches the current stake distribution using the provided `statequery` client.
228    async fn get_stake_distribution(
229        &self,
230        client: &mut NodeClient,
231    ) -> Result<Option<StakeDistribution>, ChainObserverError> {
232        let statequery = client.statequery();
233
234        let stake_snapshot = self.do_stake_snapshots_state_query(statequery).await?;
235
236        let mut stake_distribution = StakeDistribution::new();
237
238        let have_stakes_in_two_epochs = |stakes: &Stakes| stakes.snapshot_mark_pool > 0;
239        for (key, stakes) in stake_snapshot
240            .snapshots
241            .stake_snapshots
242            .iter()
243            .filter(|(_, stakes)| have_stakes_in_two_epochs(stakes))
244        {
245            let pool_hash = self.get_stake_pool_hash(key)?;
246            stake_distribution.insert(pool_hash, stakes.snapshot_mark_pool);
247        }
248
249        Ok(Some(stake_distribution))
250    }
251
252    /// # Calculate Current KES Period
253    ///
254    /// It calculates the current Key Evolving Signature (KES) period
255    /// based on the provided `chain_point` and `slots_per_kes_period`.
256    ///
257    /// The calculation formula is represented as:
258    ///
259    /// `current_kes_period = ⌊current_slot_number/slots_per_kes_period⌋`
260    ///
261    /// where:
262    /// - `current_slot_number` represents the current slot number given by the `point` on the chain.
263    /// - `slots_per_kes_period` represents the number of slots in a KES period.
264    /// - `⌊x⌋` is the floor function which rounds the greatest integer less than or equal to `x`.
265    ///
266    /// ## Example:
267    ///
268    /// let (chain_point, slots_per_kes_period) = (Point::new(1), 10);
269    /// match calculate_kes_period(&self, chain_point, slots_per_kes_period) {
270    ///     Ok(kes_period) => println!("Current KES Period: {}", kes_period),
271    ///     Err(e) => println!("Error occurred: {}", e),
272    /// }
273    async fn calculate_kes_period(
274        &self,
275        chain_point: Point,
276        slots_per_kes_period: u64,
277    ) -> Result<KesPeriod, ChainObserverError> {
278        if slots_per_kes_period == 0 {
279            return Err(anyhow!("slots_per_kes_period must be greater than 0"))
280                .with_context(|| "PallasChainObserver failed to calculate kes period")?;
281        }
282
283        let current_kes_period = chain_point.slot_or_default() / slots_per_kes_period;
284        Ok(u32::try_from(current_kes_period)
285            .map_err(|err| anyhow!(err))
286            .with_context(|| "PallasChainObserver failed to convert kes period")?)
287    }
288
289    /// Fetches the current chain point using the provided `statequery` client.
290    async fn do_get_chain_point_state_query(&self, statequery: &mut Client) -> StdResult<Point> {
291        let chain_point = queries_v16::get_chain_point(statequery)
292            .await
293            .map_err(|err| anyhow!(err))
294            .with_context(|| "PallasChainObserver failed to get chain point")?;
295
296        Ok(chain_point)
297    }
298
299    /// Fetches the current chain point using the provided `NodeClient`.
300    async fn do_get_chain_block_no(&self, statequery: &mut Client) -> StdResult<ChainBlockNumber> {
301        let chain_block_number = queries_v16::get_chain_block_no(statequery)
302            .await
303            .map_err(|err| anyhow!(err))
304            .with_context(|| "PallasChainObserver failed to get chain block number")?;
305
306        Ok(chain_block_number)
307    }
308
309    /// Fetches the current era using the provided `statequery` client.
310    async fn do_get_current_era_state_query(&self, statequery: &mut Client) -> StdResult<u16> {
311        let era = queries_v16::get_current_era(statequery)
312            .await
313            .map_err(|err| anyhow!(err))
314            .with_context(|| "PallasChainObserver failed to get current era")?;
315
316        Ok(era)
317    }
318
319    /// Fetches the current genesis config using the provided `statequery` client.
320    async fn do_get_genesis_config_state_query(
321        &self,
322        statequery: &mut Client,
323    ) -> StdResult<Vec<GenesisConfig>> {
324        let era = self.do_get_current_era_state_query(statequery).await?;
325        let genesis_config = queries_v16::get_genesis_config(statequery, era)
326            .await
327            .map_err(|err| anyhow!(err))
328            .with_context(|| "PallasChainObserver failed to get genesis config")?;
329
330        Ok(genesis_config)
331    }
332
333    /// Fetches the current chain point using the provided `NodeClient`.
334    async fn get_chain_point(&self, statequery: &mut Client) -> StdResult<ChainPoint> {
335        statequery
336            .acquire(None)
337            .await
338            .map_err(|err| anyhow!(err))
339            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
340
341        let chain_point = self.do_get_chain_point_state_query(statequery).await?;
342
343        let header_hash = match chain_point {
344            Point::Origin => None,
345            Point::Specific(_at_slot, ref hash) => Some(hex::encode(hash)),
346        };
347
348        let chain_block_number = self.do_get_chain_block_no(statequery).await?;
349
350        Ok(ChainPoint {
351            slot_number: SlotNumber(chain_point.slot_or_default()),
352            block_hash: header_hash.unwrap_or_default(),
353            block_number: BlockNumber(chain_block_number.block_number as u64),
354        })
355    }
356
357    /// Fetches chain point and genesis config through the local statequery.
358    /// The KES period is calculated afterwards.
359    async fn get_kes_period(
360        &self,
361        client: &mut NodeClient,
362    ) -> Result<Option<KesPeriod>, ChainObserverError> {
363        let statequery = client.statequery();
364
365        statequery
366            .acquire(None)
367            .await
368            .map_err(|err| anyhow!(err))
369            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
370
371        let chain_point = self.do_get_chain_point_state_query(statequery).await?;
372
373        let genesis_config = self.do_get_genesis_config_state_query(statequery).await?;
374
375        let config = genesis_config
376            .first()
377            .with_context(|| "PallasChainObserver failed to extract the config")?;
378
379        let current_kes_period = self
380            .calculate_kes_period(chain_point, config.slots_per_kes_period as u64)
381            .await?;
382
383        Ok(Some(current_kes_period))
384    }
385
386    /// Processes a state query with the `NodeClient`, releasing the state query.
387    async fn process_statequery(&self, client: &mut NodeClient) -> StdResult<()> {
388        let statequery = client.statequery();
389        statequery
390            .send_release()
391            .await
392            .map_err(|err| anyhow!(err))
393            .with_context(|| "PallasChainObserver send release failed")?;
394
395        statequery
396            .send_done()
397            .await
398            .map_err(|err| anyhow!(err))
399            .with_context(|| "PallasChainObserver send done failed")?;
400
401        Ok(())
402    }
403
404    /// Synchronizes the `NodeClient` with the cardano server using `chainsync`.
405    async fn sync(&self, client: &mut NodeClient) -> StdResult<()> {
406        client
407            .chainsync()
408            .send_done()
409            .await
410            .map_err(|err| anyhow!(err))
411            .with_context(|| "PallasChainObserver chainsync send done failed")?;
412        Ok(())
413    }
414
415    /// Post-processes a state query afterwards.
416    async fn post_process_statequery(&self, client: &mut NodeClient) -> StdResult<()> {
417        self.process_statequery(client).await?;
418        self.sync(client).await?;
419
420        Ok(())
421    }
422}
423
424#[async_trait]
425impl ChainObserver for PallasChainObserver {
426    async fn get_current_era(&self) -> Result<Option<String>, ChainObserverError> {
427        let mut client = self.get_client().await?;
428
429        let era = self.get_era(client.statequery()).await?;
430
431        let era = Era::try_from(era + ERA_OFFSET)
432            .with_context(|| "PallasChainObserver failed to convert: '{era}' to Era")?;
433
434        self.post_process_statequery(&mut client).await?;
435
436        client.abort().await;
437
438        Ok(Some(era.to_string()))
439    }
440
441    async fn get_current_epoch(&self) -> Result<Option<Epoch>, ChainObserverError> {
442        let mut client = self.get_client().await?;
443
444        let epoch = self.get_epoch(client.statequery()).await?;
445
446        self.post_process_statequery(&mut client).await?;
447
448        client.abort().await;
449
450        Ok(Some(Epoch(epoch as u64)))
451    }
452
453    async fn get_current_chain_point(&self) -> Result<Option<ChainPoint>, ChainObserverError> {
454        let mut client = self.get_client().await?;
455
456        let chain_point = self.get_chain_point(client.statequery()).await?;
457
458        self.post_process_statequery(&mut client).await?;
459
460        client.abort().await;
461
462        Ok(Some(chain_point))
463    }
464
465    async fn get_current_datums(
466        &self,
467        address: &ChainAddress,
468    ) -> Result<Datums, ChainObserverError> {
469        let mut client = self.get_client().await?;
470
471        let datums = self.get_utxo_datums(&mut client, address).await?;
472
473        self.post_process_statequery(&mut client).await?;
474
475        client.abort().await;
476
477        Ok(datums)
478    }
479
480    async fn get_current_stake_distribution(
481        &self,
482    ) -> Result<Option<StakeDistribution>, ChainObserverError> {
483        let mut client = self.get_client().await?;
484
485        let stake_distribution = self.get_stake_distribution(&mut client).await?;
486
487        self.post_process_statequery(&mut client).await?;
488
489        client.abort().await;
490
491        Ok(stake_distribution)
492    }
493
494    async fn get_current_kes_period(&self) -> Result<Option<KesPeriod>, ChainObserverError> {
495        let mut client = self.get_client().await?;
496
497        let current_kes_period = self.get_kes_period(&mut client).await?;
498
499        self.post_process_statequery(&mut client).await?;
500
501        client.abort().await;
502
503        Ok(current_kes_period)
504    }
505}
506
507// Windows does not support Unix sockets, nor pallas_network::facades::NodeServer
508#[cfg(all(test, unix))]
509mod tests {
510    use std::fs;
511
512    use pallas_codec::utils::{AnyCbor, AnyUInt, KeyValuePairs, TagWrap};
513    use pallas_crypto::hash::Hash;
514    use pallas_network::facades::NodeServer;
515    use pallas_network::miniprotocols::{
516        Point,
517        localstate::{
518            ClientQueryRequest,
519            queries_v16::{
520                BlockQuery, ChainBlockNumber, Fraction, GenesisConfig, HardForkQuery, LedgerQuery,
521                Request, Snapshots, StakeSnapshot, SystemStart, Value,
522            },
523        },
524    };
525    use tokio::net::UnixListener;
526
527    use mithril_common::test::TempDir;
528
529    use super::*;
530
531    fn get_fake_utxo_by_address() -> UTxOByAddress {
532        let tx_hex = "1e4e5cf2889d52f1745b941090f04a65dea6ce56c5e5e66e69f65c8e36347c17";
533        let tx_bytes: [u8; 32] = hex::decode(tx_hex).unwrap().try_into().unwrap();
534        let transaction_id = Hash::from(tx_bytes);
535        let index = AnyUInt::MajorByte(2);
536        let lovelace = AnyUInt::MajorByte(2);
537        let hex_datum = "D8799F58407B226D61726B657273223A5B7B226E616D65223A227468616C6573222C2265706F6368223A307D5D2C227369676E6174757265223A22383566323265626261645840333335376338656132646630363230393766396131383064643335643966336261316432363832633732633864313232383866616438636238643063656565625838366134643665383465653865353631376164323037313836366363313930373466326137366538373864663166393733346438343061227DFF";
538        let datum = hex::decode(hex_datum).unwrap().into();
539        let tag = TagWrap::<_, 24>::new(datum);
540        let inline_datum = Some((1_u16, tag));
541
542        let address: Address =
543            Address::from_bech32("addr_test1vr80076l3x5uw6n94nwhgmv7ssgy6muzf47ugn6z0l92rhg2mgtu0")
544                .unwrap();
545        let address: Addr = address.to_vec().into();
546        let values = TransactionOutput::Current(PostAlonsoTransactionOutput {
547            address,
548            amount: Value::Coin(lovelace),
549            inline_datum,
550            script_ref: None,
551        });
552        let utxo = KeyValuePairs::from(vec![(
553            queries_v16::UTxO {
554                transaction_id,
555                index,
556            },
557            values,
558        )]);
559
560        UTxOByAddress { utxo }
561    }
562
563    fn get_fake_stake_snapshot() -> StakeSnapshot {
564        let stake_snapshots = KeyValuePairs::from(vec![
565            (
566                Bytes::from(
567                    hex::decode("00000036d515e12e18cd3c88c74f09a67984c2c279a5296aa96efe89")
568                        .unwrap(),
569                ),
570                Stakes {
571                    snapshot_mark_pool: 300000000001,
572                    snapshot_set_pool: 300000000002,
573                    snapshot_go_pool: 300000000000,
574                },
575            ),
576            (
577                Bytes::from(
578                    hex::decode("000000f66e28b0f18aef20555f4c4954234e3270dfbbdcc13f54e799")
579                        .unwrap(),
580                ),
581                Stakes {
582                    snapshot_mark_pool: 600000000001,
583                    snapshot_set_pool: 600000000002,
584                    snapshot_go_pool: 600000000000,
585                },
586            ),
587            (
588                Bytes::from(
589                    hex::decode("00000110093effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
590                        .unwrap(),
591                ),
592                Stakes {
593                    snapshot_mark_pool: 1200000000001,
594                    snapshot_set_pool: 1200000000002,
595                    snapshot_go_pool: 1200000000000,
596                },
597            ),
598            (
599                Bytes::from(
600                    hex::decode("00000ffff93effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
601                        .unwrap(),
602                ),
603                Stakes {
604                    snapshot_mark_pool: 0,
605                    snapshot_set_pool: 1300000000002,
606                    snapshot_go_pool: 0,
607                },
608            ),
609        ]);
610
611        StakeSnapshot {
612            snapshots: Snapshots {
613                stake_snapshots,
614                snapshot_stake_mark_total: 2100000000003,
615                snapshot_stake_set_total: 2100000000006,
616                snapshot_stake_go_total: 2100000000000,
617            },
618        }
619    }
620
621    fn get_fake_genesis_config() -> Vec<GenesisConfig> {
622        let genesis = GenesisConfig {
623            system_start: SystemStart {
624                year: 2021,
625                day_of_year: 150,
626                picoseconds_of_day: 0,
627            },
628            network_magic: 42,
629            network_id: 42,
630            active_slots_coefficient: Fraction { num: 6, den: 10 },
631            security_param: 2160,
632            epoch_length: 432000,
633            slots_per_kes_period: 129600,
634            max_kes_evolutions: 62,
635            slot_length: 1,
636            update_quorum: 5,
637            max_lovelace_supply: AnyUInt::MajorByte(2),
638        };
639
640        vec![genesis]
641    }
642
643    /// pallas responses mock server.
644    async fn mock_server(server: &mut NodeServer) -> AnyCbor {
645        let query: queries_v16::Request =
646            match server.statequery().recv_while_acquired().await.unwrap() {
647                ClientQueryRequest::Query(q) => q.into_decode().unwrap(),
648                x => panic!("unexpected message from client: {x:?}"),
649            };
650
651        match query {
652            Request::GetChainPoint => {
653                AnyCbor::from_encode(Point::Specific(52851885, vec![1, 2, 3]))
654            }
655            Request::GetChainBlockNo => AnyCbor::from_encode(ChainBlockNumber {
656                slot_timeline: 1,
657                block_number: 52851885,
658            }),
659            Request::LedgerQuery(LedgerQuery::HardForkQuery(HardForkQuery::GetCurrentEra)) => {
660                AnyCbor::from_encode(4)
661            }
662            Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetEpochNo)) => {
663                AnyCbor::from_encode([8])
664            }
665            Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetGenesisConfig)) => {
666                AnyCbor::from_encode(get_fake_genesis_config())
667            }
668            Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetUTxOByAddress(_))) => {
669                AnyCbor::from_encode(get_fake_utxo_by_address())
670            }
671            Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetStakeSnapshots(_))) => {
672                AnyCbor::from_encode(get_fake_stake_snapshot())
673            }
674            _ => panic!("unexpected query from client: {query:?}"),
675        }
676    }
677
678    /// Creates a new work directory in the system's temporary folder.
679    fn create_temp_dir(folder_name: &str) -> PathBuf {
680        TempDir::create_with_short_path("pallas_chain_observer_test", folder_name)
681    }
682
683    /// Sets up a mock server for related tests.
684    ///
685    /// Use the `intersections` parameter to define exactly how many
686    /// local state queries should be intercepted by the `mock_server`
687    /// and avoid any panic errors.
688    async fn setup_server(socket_path: PathBuf, intersections: u32) -> tokio::task::JoinHandle<()> {
689        tokio::spawn({
690            async move {
691                if socket_path.exists() {
692                    fs::remove_file(&socket_path).expect("Previous socket removal failed");
693                }
694
695                let unix_listener = UnixListener::bind(socket_path.as_path()).unwrap();
696                let mut server = NodeServer::accept(&unix_listener, 10).await.unwrap();
697
698                server.statequery().recv_while_idle().await.unwrap();
699                server.statequery().send_acquired().await.unwrap();
700
701                for _ in 0..intersections {
702                    let result = mock_server(&mut server).await;
703                    server.statequery().send_result(result).await.unwrap();
704                }
705            }
706        })
707    }
708
709    #[tokio::test]
710    async fn get_current_epoch() {
711        let socket_path = create_temp_dir("get_current_epoch").join("node.socket");
712        let server = setup_server(socket_path.clone(), 2).await;
713        let client = tokio::spawn(async move {
714            let observer =
715                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
716            observer.get_current_epoch().await.unwrap().unwrap()
717        });
718
719        let (_, client_res) = tokio::join!(server, client);
720        let epoch = client_res.expect("Client failed");
721        assert_eq!(epoch, 8);
722    }
723
724    #[tokio::test]
725    async fn get_current_datums() {
726        let socket_path = create_temp_dir("get_current_datums").join("node.socket");
727        let server = setup_server(socket_path.clone(), 2).await;
728        let client = tokio::spawn(async move {
729            let observer =
730                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
731            let address =
732                "addr_test1vr80076l3x5uw6n94nwhgmv7ssgy6muzf47ugn6z0l92rhg2mgtu0".to_string();
733            observer.get_current_datums(&address).await.unwrap()
734        });
735
736        let (_, client_res) = tokio::join!(server, client);
737        let datums = client_res.expect("Client failed");
738        assert_eq!(vec![TxDatum(r#"{"constructor":0,"fields":[{"bytes":"7b226d61726b657273223a5b7b226e616d65223a227468616c6573222c2265706f6368223a307d5d2c227369676e6174757265223a2238356632326562626164"},{"bytes":"33333537633865613264663036323039376639613138306464333564396633626131643236383263373263386431323238386661643863623864306365656562"},{"bytes":"366134643665383465653865353631376164323037313836366363313930373466326137366538373864663166393733346438343061227d"}]}"#.to_string())], datums);
739    }
740
741    #[tokio::test]
742    async fn get_current_stake_distribution() {
743        let socket_path = create_temp_dir("get_current_stake_distribution").join("node.socket");
744        let server = setup_server(socket_path.clone(), 2).await;
745        let client = tokio::spawn(async move {
746            let observer =
747                super::PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
748            observer.get_current_stake_distribution().await.unwrap()
749        });
750
751        let (_, client_res) = tokio::join!(server, client);
752        let computed_stake_distribution = client_res.unwrap().unwrap();
753
754        let mut expected_stake_distribution = StakeDistribution::new();
755        expected_stake_distribution.insert(
756            "pool1qqqqqdk4zhsjuxxd8jyvwncf5eucfskz0xjjj64fdmlgj735lr9".to_string(),
757            300000000001,
758        );
759        expected_stake_distribution.insert(
760            "pool1qqqqpanw9zc0rzh0yp247nzf2s35uvnsm7aaesfl2nnejaev0uc".to_string(),
761            600000000001,
762        );
763        expected_stake_distribution.insert(
764            "pool1qqqqzyqf8mlm70883zht60n4q6uqxg4a8x266sewv8ad2grkztl".to_string(),
765            1200000000001,
766        );
767
768        assert_eq!(expected_stake_distribution, computed_stake_distribution);
769    }
770
771    #[tokio::test]
772    async fn get_current_kes_period() {
773        let socket_path = create_temp_dir("get_current_kes_period").join("node.socket");
774        let server = setup_server(socket_path.clone(), 3).await;
775        let client = tokio::spawn(async move {
776            let observer =
777                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
778
779            observer.get_current_kes_period().await.unwrap()
780        });
781
782        let (_, client_res) = tokio::join!(server, client);
783        let kes_period = client_res.unwrap().unwrap();
784        assert_eq!(407, kes_period);
785    }
786
787    #[tokio::test]
788    async fn calculate_kes_period() {
789        let socket_path = create_temp_dir("calculate_kes_period").join("node.socket");
790        let observer = PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
791        let current_kes_period = observer
792            .calculate_kes_period(Point::Specific(53536042, vec![1, 2, 3]), 129600)
793            .await
794            .unwrap();
795
796        assert_eq!(413, current_kes_period);
797
798        let current_kes_period = observer
799            .calculate_kes_period(Point::Specific(53524800, vec![1, 2, 3]), 129600)
800            .await
801            .unwrap();
802
803        assert_eq!(413, current_kes_period);
804
805        let current_kes_period = observer
806            .calculate_kes_period(Point::Specific(53649999, vec![1, 2, 3]), 129600)
807            .await
808            .unwrap();
809
810        assert_eq!(413, current_kes_period);
811    }
812
813    #[tokio::test]
814    async fn get_chain_point() {
815        let socket_path = create_temp_dir("get_chain_point").join("node.socket");
816        let server = setup_server(socket_path.clone(), 1).await;
817        let client = tokio::spawn(async move {
818            let observer =
819                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
820            let mut client = observer.get_client().await.unwrap();
821            let statequery = client.statequery();
822            statequery.acquire(None).await.unwrap();
823            let chain_point = observer.do_get_chain_point_state_query(statequery).await.unwrap();
824            observer.post_process_statequery(&mut client).await.unwrap();
825            client.abort().await;
826            chain_point
827        });
828
829        let (_, client_res) = tokio::join!(server, client);
830        let chain_point = client_res.expect("Client failed");
831        assert_eq!(chain_point, Point::Specific(52851885, vec![1, 2, 3]));
832    }
833
834    #[tokio::test]
835    async fn get_genesis_config() {
836        let socket_path = create_temp_dir("get_genesis_config").join("node.socket");
837        let server = setup_server(socket_path.clone(), 2).await;
838        let client = tokio::spawn(async move {
839            let observer =
840                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
841            let mut client = observer.get_client().await.unwrap();
842            let statequery = client.statequery();
843            statequery.acquire(None).await.unwrap();
844            let genesis_config =
845                observer.do_get_genesis_config_state_query(statequery).await.unwrap();
846            observer.post_process_statequery(&mut client).await.unwrap();
847            client.abort().await;
848            genesis_config
849        });
850
851        let (_, client_res) = tokio::join!(server, client);
852        let genesis_config = client_res.expect("Client failed");
853        assert_eq!(genesis_config, get_fake_genesis_config());
854    }
855
856    #[tokio::test]
857    async fn fetch_current_era_from_state_query() {
858        let socket_path = create_temp_dir("get_current_era").join("node.socket");
859        let server = setup_server(socket_path.clone(), 1).await;
860        let client = tokio::spawn(async move {
861            let observer =
862                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
863            let mut client = observer.get_client().await.unwrap();
864            let statequery = client.statequery();
865            statequery.acquire(None).await.unwrap();
866            let era = observer.do_get_current_era_state_query(statequery).await.unwrap();
867            observer.post_process_statequery(&mut client).await.unwrap();
868            client.abort().await;
869            era
870        });
871
872        let (_, client_res) = tokio::join!(server, client);
873        let era = client_res.expect("Client failed");
874        assert_eq!(era, 4);
875    }
876
877    #[tokio::test]
878    async fn get_current_era() {
879        let socket_path = create_temp_dir("get_current_era_as_string").join("node.socket");
880        let server = setup_server(socket_path.clone(), 1).await;
881        let client = tokio::spawn(async move {
882            let observer =
883                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
884            observer.get_current_era().await.unwrap().unwrap()
885        });
886
887        let (_, client_res) = tokio::join!(server, client);
888        let era = client_res.expect("Client failed");
889
890        let expected_era = Era::try_from(4 + ERA_OFFSET).unwrap().to_string();
891        assert_eq!(era, expected_era);
892    }
893
894    #[tokio::test]
895    async fn get_current_chain_point() {
896        let socket_path = create_temp_dir("get_current_chain_point").join("node.socket");
897        let server = setup_server(socket_path.clone(), 2).await;
898        let client = tokio::spawn(async move {
899            let observer =
900                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
901            observer.get_current_chain_point().await.unwrap()
902        });
903
904        let (_, client_res) = tokio::join!(server, client);
905        let chain_point = client_res.expect("Client failed");
906        assert_eq!(
907            chain_point,
908            Some(ChainPoint {
909                slot_number: SlotNumber(52851885),
910                block_hash: "010203".to_string(),
911                block_number: BlockNumber(52851885)
912            })
913        );
914    }
915}