mithril_cardano_node_chain/chain_observer/
pallas_observer.rs

1use std::collections::BTreeSet;
2use std::path::{Path, PathBuf};
3
4use anyhow::{Context, anyhow};
5use async_trait::async_trait;
6use pallas_addresses::Address;
7use pallas_codec::utils::{Bytes, CborWrap, TagWrap};
8use pallas_network::{
9    facades::NodeClient,
10    miniprotocols::{
11        Point,
12        localstate::{
13            Client,
14            queries_v16::{
15                self, Addr, Addrs, ChainBlockNumber, GenesisConfig, PostAlonsoTransactionOutput,
16                StakeSnapshot, Stakes, TransactionOutput, UTxOByAddress,
17            },
18        },
19    },
20};
21use pallas_primitives::ToCanonicalJson;
22use pallas_traverse::Era;
23
24use mithril_common::crypto_helper::{KesPeriod, OpCert, encode_bech32};
25use mithril_common::entities::{BlockNumber, ChainPoint, Epoch, SlotNumber, StakeDistribution};
26use mithril_common::{CardanoNetwork, StdResult};
27
28use crate::entities::{ChainAddress, Datum, Datums, TxDatum, try_inspect};
29
30use super::{ChainObserver, ChainObserverError};
31
// The era value returned by `queries_v16::get_current_era` is one less than the matching
// `pallas_traverse::Era` value, due to the Cardano node implementation.
// This offset is added to the returned value to get the correct era display name.
34const ERA_OFFSET: u16 = 1;
35
/// A runner that uses the Pallas library to interact with a Cardano node
/// using the N2C (node-to-client) Ouroboros mini-protocols.
pub struct PallasChainObserver {
    /// Path of the node socket that new clients connect to.
    socket: PathBuf,
    /// Cardano network; its code is used as the magic value when connecting.
    network: CardanoNetwork,
}
41
42impl From<anyhow::Error> for ChainObserverError {
43    fn from(err: anyhow::Error) -> Self {
44        ChainObserverError::General(err)
45    }
46}
47
48impl PallasChainObserver {
49    /// Creates a new PallasObserver
50    pub fn new(socket: &Path, network: CardanoNetwork) -> Self {
51        Self {
52            socket: socket.to_owned(),
53            network,
54        }
55    }
56
57    /// Creates and returns a new `NodeClient` connected to the specified socket.
58    async fn new_client(&self) -> StdResult<NodeClient> {
59        let magic = self.network.code();
60        let client = NodeClient::connect(&self.socket, magic).await?;
61
62        Ok(client)
63    }
64
65    /// Creates and returns a new `NodeClient`, handling any potential errors.
66    async fn get_client(&self) -> StdResult<NodeClient> {
67        self.new_client()
68            .await
69            .map_err(|err| anyhow!(err))
70            .with_context(|| "PallasChainObserver failed to create new client")
71    }
72
73    /// Fetches the current era using the provided `statequery` client.
74    async fn get_era(&self, statequery: &mut Client) -> StdResult<u16> {
75        statequery
76            .acquire(None)
77            .await
78            .map_err(|err| anyhow!(err))
79            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
80
81        let era = queries_v16::get_current_era(statequery)
82            .await
83            .map_err(|err| anyhow!(err))
84            .with_context(|| "PallasChainObserver failed to get current era")?;
85
86        Ok(era)
87    }
88
89    /// Fetches the current epoch number using the provided `statequery` client.
90    async fn get_epoch(&self, statequery: &mut Client) -> StdResult<u32> {
91        statequery
92            .acquire(None)
93            .await
94            .map_err(|err| anyhow!(err))
95            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
96
97        let era = queries_v16::get_current_era(statequery)
98            .await
99            .map_err(|err| anyhow!(err))
100            .with_context(|| "PallasChainObserver failed to get current era")?;
101
102        let epoch = queries_v16::get_block_epoch_number(statequery, era)
103            .await
104            .map_err(|err| anyhow!(err))
105            .with_context(|| "PallasChainObserver failed to get block epoch number")?;
106
107        Ok(epoch)
108    }
109
110    /// Returns inline datum tag from the given `Values` instance.
111    fn get_datum_tag(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<TagWrap<Bytes, 24>> {
112        Ok(utxo
113            .inline_datum
114            .as_ref()
115            .with_context(|| "PallasChainObserver failed to get inline datum")?
116            .1
117            .clone())
118    }
119
120    /// Returns inline datums from the given `Values` instance.
121    fn inspect_datum(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<Datum> {
122        let datum = self.get_datum_tag(utxo)?;
123        let datum = CborWrap(datum).to_vec();
124
125        try_inspect::<Datum>(datum)
126    }
127
128    /// Serializes datum to `TxDatum` instance.
129    fn serialize_datum(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<TxDatum> {
130        let datum = self.inspect_datum(utxo)?;
131        let serialized = serde_json::to_string(&datum.to_json())
132            .map_err(|err| anyhow!(err))
133            .with_context(|| "PallasChainObserver failed to serialize datum")?;
134
135        Ok(TxDatum(serialized))
136    }
137
138    /// Maps the given `UTxOByAddress` instance to Datums.
139    fn map_datums(&self, transaction: UTxOByAddress) -> StdResult<Datums> {
140        transaction
141            .utxo
142            .iter()
143            .filter_map(|(_, utxo)| match utxo {
144                TransactionOutput::Current(output) => {
145                    output.inline_datum.as_ref().map(|_| self.serialize_datum(output))
146                }
147                _ => None,
148            })
149            .collect::<StdResult<Datums>>()
150    }
151
152    /// Returns a vector of `TxDatum` instances.
153    async fn get_utxo_datums(
154        &self,
155        client: &mut NodeClient,
156        address: &ChainAddress,
157    ) -> Result<Datums, ChainObserverError> {
158        let statequery = client.statequery();
159        let utxo = self.get_utxo_by_address(statequery, address).await?;
160
161        Ok(self.map_datums(utxo)?)
162    }
163
164    /// Fetches the current UTxO by address using the provided `statequery` client.
165    async fn get_utxo_by_address(
166        &self,
167        statequery: &mut Client,
168        address: &ChainAddress,
169    ) -> StdResult<UTxOByAddress> {
170        statequery
171            .acquire(None)
172            .await
173            .map_err(|err| anyhow!(err))
174            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
175
176        let era = queries_v16::get_current_era(statequery)
177            .await
178            .map_err(|err| anyhow!(err))
179            .with_context(|| "PallasChainObserver failed to get current era")?;
180
181        let addr: Address = Address::from_bech32(address)
182            .map_err(|err| anyhow!(err))
183            .with_context(|| "PallasChainObserver failed to parse address")?;
184
185        let addr: Addr = addr.to_vec().into();
186        let addrs: Addrs = vec![addr];
187        let utxo = queries_v16::get_utxo_by_address(statequery, era, addrs)
188            .await
189            .map_err(|err| anyhow!(err))
190            .with_context(|| "PallasChainObserver failed to get utxo")?;
191
192        Ok(utxo)
193    }
194
195    /// Fetches the current stake distribution using the provided `statequery` client.
196    async fn do_stake_snapshots_state_query(
197        &self,
198        statequery: &mut Client,
199    ) -> StdResult<StakeSnapshot> {
200        statequery
201            .acquire(None)
202            .await
203            .map_err(|err| anyhow!(err))
204            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
205
206        let era = queries_v16::get_current_era(statequery)
207            .await
208            .map_err(|err| anyhow!(err))
209            .with_context(|| "PallasChainObserver failed to get current era")?;
210
211        let state_snapshot = queries_v16::get_stake_snapshots(statequery, era, BTreeSet::new())
212            .await
213            .map_err(|err| anyhow!(err))
214            .with_context(|| "PallasChainObserver failed to get stake snapshot")?;
215
216        Ok(state_snapshot)
217    }
218
219    /// Returns the stake pool hash from the given bytestring.
220    fn get_stake_pool_hash(&self, key: &Bytes) -> Result<String, ChainObserverError> {
221        let pool_id_bech32 = encode_bech32("pool", key)
222            .map_err(|err| anyhow!(err))
223            .with_context(|| "PallasChainObserver failed to encode stake pool hash")?;
224        Ok(pool_id_bech32)
225    }
226
227    /// Fetches the current stake distribution using the provided `statequery` client.
228    async fn get_stake_distribution(
229        &self,
230        client: &mut NodeClient,
231    ) -> Result<Option<StakeDistribution>, ChainObserverError> {
232        let statequery = client.statequery();
233
234        let stake_snapshot = self.do_stake_snapshots_state_query(statequery).await?;
235
236        let mut stake_distribution = StakeDistribution::new();
237
238        let have_stakes_in_two_epochs = |stakes: &Stakes| stakes.snapshot_mark_pool > 0;
239        for (key, stakes) in stake_snapshot
240            .snapshots
241            .stake_snapshots
242            .iter()
243            .filter(|(_, stakes)| have_stakes_in_two_epochs(stakes))
244        {
245            let pool_hash = self.get_stake_pool_hash(key)?;
246            stake_distribution.insert(pool_hash, stakes.snapshot_mark_pool);
247        }
248
249        Ok(Some(stake_distribution))
250    }
251
252    /// # Calculate Current KES Period
253    ///
254    /// It calculates the current Key Evolving Signature (KES) period
255    /// based on the provided `chain_point` and `slots_per_kes_period`.
256    ///
257    /// The calculation formula is represented as:
258    ///
259    /// `current_kes_period = ⌊current_slot_number/slots_per_kes_period⌋`
260    ///
261    /// where:
262    /// - `current_slot_number` represents the current slot number given by the `point` on the chain.
263    /// - `slots_per_kes_period` represents the number of slots in a KES period.
264    /// - `⌊x⌋` is the floor function which rounds the greatest integer less than or equal to `x`.
265    ///
266    /// ## Example:
267    ///
268    /// let (chain_point, slots_per_kes_period) = (Point::new(1), 10);
269    /// match calculate_kes_period(&self, chain_point, slots_per_kes_period) {
270    ///     Ok(kes_period) => println!("Current KES Period: {}", kes_period),
271    ///     Err(e) => println!("Error occurred: {}", e),
272    /// }
273    async fn calculate_kes_period(
274        &self,
275        chain_point: Point,
276        slots_per_kes_period: u64,
277    ) -> Result<KesPeriod, ChainObserverError> {
278        if slots_per_kes_period == 0 {
279            return Err(anyhow!("slots_per_kes_period must be greater than 0"))
280                .with_context(|| "PallasChainObserver failed to calculate kes period")?;
281        }
282
283        let current_kes_period = chain_point.slot_or_default() / slots_per_kes_period;
284        Ok(u32::try_from(current_kes_period)
285            .map_err(|err| anyhow!(err))
286            .with_context(|| "PallasChainObserver failed to convert kes period")?)
287    }
288
289    /// Fetches the current chain point using the provided `statequery` client.
290    async fn do_get_chain_point_state_query(&self, statequery: &mut Client) -> StdResult<Point> {
291        let chain_point = queries_v16::get_chain_point(statequery)
292            .await
293            .map_err(|err| anyhow!(err))
294            .with_context(|| "PallasChainObserver failed to get chain point")?;
295
296        Ok(chain_point)
297    }
298
299    /// Fetches the current chain point using the provided `NodeClient`.
300    async fn do_get_chain_block_no(&self, statequery: &mut Client) -> StdResult<ChainBlockNumber> {
301        let chain_block_number = queries_v16::get_chain_block_no(statequery)
302            .await
303            .map_err(|err| anyhow!(err))
304            .with_context(|| "PallasChainObserver failed to get chain block number")?;
305
306        Ok(chain_block_number)
307    }
308
309    /// Fetches the current era using the provided `statequery` client.
310    async fn do_get_current_era_state_query(&self, statequery: &mut Client) -> StdResult<u16> {
311        let era = queries_v16::get_current_era(statequery)
312            .await
313            .map_err(|err| anyhow!(err))
314            .with_context(|| "PallasChainObserver failed to get current era")?;
315
316        Ok(era)
317    }
318
319    /// Fetches the current genesis config using the provided `statequery` client.
320    async fn do_get_genesis_config_state_query(
321        &self,
322        statequery: &mut Client,
323    ) -> StdResult<Vec<GenesisConfig>> {
324        let era = self.do_get_current_era_state_query(statequery).await?;
325        let genesis_config = queries_v16::get_genesis_config(statequery, era)
326            .await
327            .map_err(|err| anyhow!(err))
328            .with_context(|| "PallasChainObserver failed to get genesis config")?;
329
330        Ok(genesis_config)
331    }
332
333    /// Fetches the current chain point using the provided `NodeClient`.
334    async fn get_chain_point(&self, statequery: &mut Client) -> StdResult<ChainPoint> {
335        statequery
336            .acquire(None)
337            .await
338            .map_err(|err| anyhow!(err))
339            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
340
341        let chain_point = self.do_get_chain_point_state_query(statequery).await?;
342
343        let header_hash = match chain_point {
344            Point::Origin => None,
345            Point::Specific(_at_slot, ref hash) => Some(hex::encode(hash)),
346        };
347
348        let chain_block_number = self.do_get_chain_block_no(statequery).await?;
349
350        Ok(ChainPoint {
351            slot_number: SlotNumber(chain_point.slot_or_default()),
352            block_hash: header_hash.unwrap_or_default(),
353            block_number: BlockNumber(chain_block_number.block_number as u64),
354        })
355    }
356
357    /// Fetches chain point and genesis config through the local statequery.
358    /// The KES period is calculated afterwards.
359    async fn get_kes_period(
360        &self,
361        client: &mut NodeClient,
362    ) -> Result<Option<KesPeriod>, ChainObserverError> {
363        let statequery = client.statequery();
364
365        statequery
366            .acquire(None)
367            .await
368            .map_err(|err| anyhow!(err))
369            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
370
371        let chain_point = self.do_get_chain_point_state_query(statequery).await?;
372
373        let genesis_config = self.do_get_genesis_config_state_query(statequery).await?;
374
375        let config = genesis_config
376            .first()
377            .with_context(|| "PallasChainObserver failed to extract the config")?;
378
379        let current_kes_period = self
380            .calculate_kes_period(chain_point, config.slots_per_kes_period as u64)
381            .await?;
382
383        Ok(Some(current_kes_period))
384    }
385
386    /// Processes a state query with the `NodeClient`, releasing the state query.
387    async fn process_statequery(&self, client: &mut NodeClient) -> StdResult<()> {
388        let statequery = client.statequery();
389        statequery
390            .send_release()
391            .await
392            .map_err(|err| anyhow!(err))
393            .with_context(|| "PallasChainObserver send release failed")?;
394
395        statequery
396            .send_done()
397            .await
398            .map_err(|err| anyhow!(err))
399            .with_context(|| "PallasChainObserver send done failed")?;
400
401        Ok(())
402    }
403
404    /// Synchronizes the `NodeClient` with the cardano server using `chainsync`.
405    async fn sync(&self, client: &mut NodeClient) -> StdResult<()> {
406        client
407            .chainsync()
408            .send_done()
409            .await
410            .map_err(|err| anyhow!(err))
411            .with_context(|| "PallasChainObserver chainsync send done failed")?;
412        Ok(())
413    }
414
415    /// Post-processes a state query afterwards.
416    async fn post_process_statequery(&self, client: &mut NodeClient) -> StdResult<()> {
417        self.process_statequery(client).await?;
418        self.sync(client).await?;
419
420        Ok(())
421    }
422}
423
424#[async_trait]
425impl ChainObserver for PallasChainObserver {
426    async fn get_current_era(&self) -> Result<Option<String>, ChainObserverError> {
427        let mut client = self.get_client().await?;
428
429        let era = self.get_era(client.statequery()).await?;
430
431        let era = Era::try_from(era + ERA_OFFSET)
432            .with_context(|| "PallasChainObserver failed to convert: '{era}' to Era")?;
433
434        self.post_process_statequery(&mut client).await?;
435
436        client.abort().await;
437
438        Ok(Some(era.to_string()))
439    }
440
441    async fn get_current_epoch(&self) -> Result<Option<Epoch>, ChainObserverError> {
442        let mut client = self.get_client().await?;
443
444        let epoch = self.get_epoch(client.statequery()).await?;
445
446        self.post_process_statequery(&mut client).await?;
447
448        client.abort().await;
449
450        Ok(Some(Epoch(epoch as u64)))
451    }
452
453    async fn get_current_chain_point(&self) -> Result<Option<ChainPoint>, ChainObserverError> {
454        let mut client = self.get_client().await?;
455
456        let chain_point = self.get_chain_point(client.statequery()).await?;
457
458        self.post_process_statequery(&mut client).await?;
459
460        client.abort().await;
461
462        Ok(Some(chain_point))
463    }
464
465    async fn get_current_datums(
466        &self,
467        address: &ChainAddress,
468    ) -> Result<Datums, ChainObserverError> {
469        let mut client = self.get_client().await?;
470
471        let datums = self.get_utxo_datums(&mut client, address).await?;
472
473        self.post_process_statequery(&mut client).await?;
474
475        client.abort().await;
476
477        Ok(datums)
478    }
479
480    async fn get_current_stake_distribution(
481        &self,
482    ) -> Result<Option<StakeDistribution>, ChainObserverError> {
483        let mut client = self.get_client().await?;
484
485        let stake_distribution = self.get_stake_distribution(&mut client).await?;
486
487        self.post_process_statequery(&mut client).await?;
488
489        client.abort().await;
490
491        Ok(stake_distribution)
492    }
493
494    async fn get_current_kes_period(
495        &self,
496        _opcert: &OpCert,
497    ) -> Result<Option<KesPeriod>, ChainObserverError> {
498        let mut client = self.get_client().await?;
499
500        let current_kes_period = self.get_kes_period(&mut client).await?;
501
502        self.post_process_statequery(&mut client).await?;
503
504        client.abort().await;
505
506        Ok(current_kes_period)
507    }
508}
509
510// Windows does not support Unix sockets, nor pallas_network::facades::NodeServer
511#[cfg(all(test, unix))]
512mod tests {
513    use std::fs;
514
515    use kes_summed_ed25519::{kes::Sum6Kes, traits::KesSk};
516    use pallas_codec::utils::{AnyCbor, AnyUInt, KeyValuePairs, TagWrap};
517    use pallas_crypto::hash::Hash;
518    use pallas_network::facades::NodeServer;
519    use pallas_network::miniprotocols::{
520        Point,
521        localstate::{
522            ClientQueryRequest,
523            queries_v16::{
524                BlockQuery, ChainBlockNumber, Fraction, GenesisConfig, HardForkQuery, LedgerQuery,
525                Request, Snapshots, StakeSnapshot, SystemStart, Value,
526            },
527        },
528    };
529    use tokio::net::UnixListener;
530
531    use mithril_common::crypto_helper::ColdKeyGenerator;
532    use mithril_common::test_utils::TempDir;
533
534    use super::*;
535
536    fn get_fake_utxo_by_address() -> UTxOByAddress {
537        let tx_hex = "1e4e5cf2889d52f1745b941090f04a65dea6ce56c5e5e66e69f65c8e36347c17";
538        let tx_bytes: [u8; 32] = hex::decode(tx_hex).unwrap().try_into().unwrap();
539        let transaction_id = Hash::from(tx_bytes);
540        let index = AnyUInt::MajorByte(2);
541        let lovelace = AnyUInt::MajorByte(2);
542        let hex_datum = "D8799F58407B226D61726B657273223A5B7B226E616D65223A227468616C6573222C2265706F6368223A307D5D2C227369676E6174757265223A22383566323265626261645840333335376338656132646630363230393766396131383064643335643966336261316432363832633732633864313232383866616438636238643063656565625838366134643665383465653865353631376164323037313836366363313930373466326137366538373864663166393733346438343061227DFF";
543        let datum = hex::decode(hex_datum).unwrap().into();
544        let tag = TagWrap::<_, 24>::new(datum);
545        let inline_datum = Some((1_u16, tag));
546
547        let address: Address =
548            Address::from_bech32("addr_test1vr80076l3x5uw6n94nwhgmv7ssgy6muzf47ugn6z0l92rhg2mgtu0")
549                .unwrap();
550        let address: Addr = address.to_vec().into();
551        let values = TransactionOutput::Current(PostAlonsoTransactionOutput {
552            address,
553            amount: Value::Coin(lovelace),
554            inline_datum,
555            script_ref: None,
556        });
557        let utxo = KeyValuePairs::from(vec![(
558            queries_v16::UTxO {
559                transaction_id,
560                index,
561            },
562            values,
563        )]);
564
565        UTxOByAddress { utxo }
566    }
567
568    fn get_fake_stake_snapshot() -> StakeSnapshot {
569        let stake_snapshots = KeyValuePairs::from(vec![
570            (
571                Bytes::from(
572                    hex::decode("00000036d515e12e18cd3c88c74f09a67984c2c279a5296aa96efe89")
573                        .unwrap(),
574                ),
575                Stakes {
576                    snapshot_mark_pool: 300000000001,
577                    snapshot_set_pool: 300000000002,
578                    snapshot_go_pool: 300000000000,
579                },
580            ),
581            (
582                Bytes::from(
583                    hex::decode("000000f66e28b0f18aef20555f4c4954234e3270dfbbdcc13f54e799")
584                        .unwrap(),
585                ),
586                Stakes {
587                    snapshot_mark_pool: 600000000001,
588                    snapshot_set_pool: 600000000002,
589                    snapshot_go_pool: 600000000000,
590                },
591            ),
592            (
593                Bytes::from(
594                    hex::decode("00000110093effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
595                        .unwrap(),
596                ),
597                Stakes {
598                    snapshot_mark_pool: 1200000000001,
599                    snapshot_set_pool: 1200000000002,
600                    snapshot_go_pool: 1200000000000,
601                },
602            ),
603            (
604                Bytes::from(
605                    hex::decode("00000ffff93effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
606                        .unwrap(),
607                ),
608                Stakes {
609                    snapshot_mark_pool: 0,
610                    snapshot_set_pool: 1300000000002,
611                    snapshot_go_pool: 0,
612                },
613            ),
614        ]);
615
616        StakeSnapshot {
617            snapshots: Snapshots {
618                stake_snapshots,
619                snapshot_stake_mark_total: 2100000000003,
620                snapshot_stake_set_total: 2100000000006,
621                snapshot_stake_go_total: 2100000000000,
622            },
623        }
624    }
625
626    fn get_fake_genesis_config() -> Vec<GenesisConfig> {
627        let genesis = GenesisConfig {
628            system_start: SystemStart {
629                year: 2021,
630                day_of_year: 150,
631                picoseconds_of_day: 0,
632            },
633            network_magic: 42,
634            network_id: 42,
635            active_slots_coefficient: Fraction { num: 6, den: 10 },
636            security_param: 2160,
637            epoch_length: 432000,
638            slots_per_kes_period: 129600,
639            max_kes_evolutions: 62,
640            slot_length: 1,
641            update_quorum: 5,
642            max_lovelace_supply: AnyUInt::MajorByte(2),
643        };
644
645        vec![genesis]
646    }
647
648    /// pallas responses mock server.
649    async fn mock_server(server: &mut NodeServer) -> AnyCbor {
650        let query: queries_v16::Request =
651            match server.statequery().recv_while_acquired().await.unwrap() {
652                ClientQueryRequest::Query(q) => q.into_decode().unwrap(),
653                x => panic!("unexpected message from client: {x:?}"),
654            };
655
656        match query {
657            Request::GetChainPoint => {
658                AnyCbor::from_encode(Point::Specific(52851885, vec![1, 2, 3]))
659            }
660            Request::GetChainBlockNo => AnyCbor::from_encode(ChainBlockNumber {
661                slot_timeline: 1,
662                block_number: 52851885,
663            }),
664            Request::LedgerQuery(LedgerQuery::HardForkQuery(HardForkQuery::GetCurrentEra)) => {
665                AnyCbor::from_encode(4)
666            }
667            Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetEpochNo)) => {
668                AnyCbor::from_encode([8])
669            }
670            Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetGenesisConfig)) => {
671                AnyCbor::from_encode(get_fake_genesis_config())
672            }
673            Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetUTxOByAddress(_))) => {
674                AnyCbor::from_encode(get_fake_utxo_by_address())
675            }
676            Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetStakeSnapshots(_))) => {
677                AnyCbor::from_encode(get_fake_stake_snapshot())
678            }
679            _ => panic!("unexpected query from client: {query:?}"),
680        }
681    }
682
683    /// Creates a new work directory in the system's temporary folder.
684    fn create_temp_dir(folder_name: &str) -> PathBuf {
685        TempDir::create_with_short_path("pallas_chain_observer_test", folder_name)
686    }
687
688    /// Sets up a mock server for related tests.
689    ///
690    /// Use the `intersections` parameter to define exactly how many
691    /// local state queries should be intercepted by the `mock_server`
692    /// and avoid any panic errors.
693    async fn setup_server(socket_path: PathBuf, intersections: u32) -> tokio::task::JoinHandle<()> {
694        tokio::spawn({
695            async move {
696                if socket_path.exists() {
697                    fs::remove_file(&socket_path).expect("Previous socket removal failed");
698                }
699
700                let unix_listener = UnixListener::bind(socket_path.as_path()).unwrap();
701                let mut server = NodeServer::accept(&unix_listener, 10).await.unwrap();
702
703                server.statequery().recv_while_idle().await.unwrap();
704                server.statequery().send_acquired().await.unwrap();
705
706                for _ in 0..intersections {
707                    let result = mock_server(&mut server).await;
708                    server.statequery().send_result(result).await.unwrap();
709                }
710            }
711        })
712    }
713
714    #[tokio::test]
715    async fn get_current_epoch() {
716        let socket_path = create_temp_dir("get_current_epoch").join("node.socket");
717        let server = setup_server(socket_path.clone(), 2).await;
718        let client = tokio::spawn(async move {
719            let observer =
720                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
721            observer.get_current_epoch().await.unwrap().unwrap()
722        });
723
724        let (_, client_res) = tokio::join!(server, client);
725        let epoch = client_res.expect("Client failed");
726        assert_eq!(epoch, 8);
727    }
728
729    #[tokio::test]
730    async fn get_current_datums() {
731        let socket_path = create_temp_dir("get_current_datums").join("node.socket");
732        let server = setup_server(socket_path.clone(), 2).await;
733        let client = tokio::spawn(async move {
734            let observer =
735                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
736            let address =
737                "addr_test1vr80076l3x5uw6n94nwhgmv7ssgy6muzf47ugn6z0l92rhg2mgtu0".to_string();
738            observer.get_current_datums(&address).await.unwrap()
739        });
740
741        let (_, client_res) = tokio::join!(server, client);
742        let datums = client_res.expect("Client failed");
743        assert_eq!(vec![TxDatum(r#"{"constructor":0,"fields":[{"bytes":"7b226d61726b657273223a5b7b226e616d65223a227468616c6573222c2265706f6368223a307d5d2c227369676e6174757265223a2238356632326562626164"},{"bytes":"33333537633865613264663036323039376639613138306464333564396633626131643236383263373263386431323238386661643863623864306365656562"},{"bytes":"366134643665383465653865353631376164323037313836366363313930373466326137366538373864663166393733346438343061227d"}]}"#.to_string())], datums);
744    }
745
746    #[tokio::test]
747    async fn get_current_stake_distribution() {
748        let socket_path = create_temp_dir("get_current_stake_distribution").join("node.socket");
749        let server = setup_server(socket_path.clone(), 2).await;
750        let client = tokio::spawn(async move {
751            let observer =
752                super::PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
753            observer.get_current_stake_distribution().await.unwrap()
754        });
755
756        let (_, client_res) = tokio::join!(server, client);
757        let computed_stake_distribution = client_res.unwrap().unwrap();
758
759        let mut expected_stake_distribution = StakeDistribution::new();
760        expected_stake_distribution.insert(
761            "pool1qqqqqdk4zhsjuxxd8jyvwncf5eucfskz0xjjj64fdmlgj735lr9".to_string(),
762            300000000001,
763        );
764        expected_stake_distribution.insert(
765            "pool1qqqqpanw9zc0rzh0yp247nzf2s35uvnsm7aaesfl2nnejaev0uc".to_string(),
766            600000000001,
767        );
768        expected_stake_distribution.insert(
769            "pool1qqqqzyqf8mlm70883zht60n4q6uqxg4a8x266sewv8ad2grkztl".to_string(),
770            1200000000001,
771        );
772
773        assert_eq!(expected_stake_distribution, computed_stake_distribution);
774    }
775
776    #[tokio::test]
777    async fn get_current_kes_period() {
778        let socket_path = create_temp_dir("get_current_kes_period").join("node.socket");
779        let server = setup_server(socket_path.clone(), 3).await;
780        let client = tokio::spawn(async move {
781            let observer =
782                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
783
784            let keypair = ColdKeyGenerator::create_deterministic_keypair([0u8; 32]);
785            let mut dummy_key_buffer = [0u8; Sum6Kes::SIZE + 4];
786            let mut dummy_seed = [0u8; 32];
787            let (_, kes_verification_key) = Sum6Kes::keygen(&mut dummy_key_buffer, &mut dummy_seed);
788            let operational_certificate = OpCert::new(kes_verification_key, 0, 0, keypair);
789            observer
790                .get_current_kes_period(&operational_certificate)
791                .await
792                .unwrap()
793        });
794
795        let (_, client_res) = tokio::join!(server, client);
796        let kes_period = client_res.unwrap().unwrap();
797        assert_eq!(407, kes_period);
798    }
799
800    #[tokio::test]
801    async fn calculate_kes_period() {
802        let socket_path = create_temp_dir("calculate_kes_period").join("node.socket");
803        let observer = PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
804        let current_kes_period = observer
805            .calculate_kes_period(Point::Specific(53536042, vec![1, 2, 3]), 129600)
806            .await
807            .unwrap();
808
809        assert_eq!(413, current_kes_period);
810
811        let current_kes_period = observer
812            .calculate_kes_period(Point::Specific(53524800, vec![1, 2, 3]), 129600)
813            .await
814            .unwrap();
815
816        assert_eq!(413, current_kes_period);
817
818        let current_kes_period = observer
819            .calculate_kes_period(Point::Specific(53649999, vec![1, 2, 3]), 129600)
820            .await
821            .unwrap();
822
823        assert_eq!(413, current_kes_period);
824    }
825
826    #[tokio::test]
827    async fn get_chain_point() {
828        let socket_path = create_temp_dir("get_chain_point").join("node.socket");
829        let server = setup_server(socket_path.clone(), 1).await;
830        let client = tokio::spawn(async move {
831            let observer =
832                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
833            let mut client = observer.get_client().await.unwrap();
834            let statequery = client.statequery();
835            statequery.acquire(None).await.unwrap();
836            let chain_point = observer.do_get_chain_point_state_query(statequery).await.unwrap();
837            observer.post_process_statequery(&mut client).await.unwrap();
838            client.abort().await;
839            chain_point
840        });
841
842        let (_, client_res) = tokio::join!(server, client);
843        let chain_point = client_res.expect("Client failed");
844        assert_eq!(chain_point, Point::Specific(52851885, vec![1, 2, 3]));
845    }
846
847    #[tokio::test]
848    async fn get_genesis_config() {
849        let socket_path = create_temp_dir("get_genesis_config").join("node.socket");
850        let server = setup_server(socket_path.clone(), 2).await;
851        let client = tokio::spawn(async move {
852            let observer =
853                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
854            let mut client = observer.get_client().await.unwrap();
855            let statequery = client.statequery();
856            statequery.acquire(None).await.unwrap();
857            let genesis_config =
858                observer.do_get_genesis_config_state_query(statequery).await.unwrap();
859            observer.post_process_statequery(&mut client).await.unwrap();
860            client.abort().await;
861            genesis_config
862        });
863
864        let (_, client_res) = tokio::join!(server, client);
865        let genesis_config = client_res.expect("Client failed");
866        assert_eq!(genesis_config, get_fake_genesis_config());
867    }
868
869    #[tokio::test]
870    async fn fetch_current_era_from_state_query() {
871        let socket_path = create_temp_dir("get_current_era").join("node.socket");
872        let server = setup_server(socket_path.clone(), 1).await;
873        let client = tokio::spawn(async move {
874            let observer =
875                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
876            let mut client = observer.get_client().await.unwrap();
877            let statequery = client.statequery();
878            statequery.acquire(None).await.unwrap();
879            let era = observer.do_get_current_era_state_query(statequery).await.unwrap();
880            observer.post_process_statequery(&mut client).await.unwrap();
881            client.abort().await;
882            era
883        });
884
885        let (_, client_res) = tokio::join!(server, client);
886        let era = client_res.expect("Client failed");
887        assert_eq!(era, 4);
888    }
889
890    #[tokio::test]
891    async fn get_current_era() {
892        let socket_path = create_temp_dir("get_current_era_as_string").join("node.socket");
893        let server = setup_server(socket_path.clone(), 1).await;
894        let client = tokio::spawn(async move {
895            let observer =
896                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
897            observer.get_current_era().await.unwrap().unwrap()
898        });
899
900        let (_, client_res) = tokio::join!(server, client);
901        let era = client_res.expect("Client failed");
902
903        let expected_era = Era::try_from(4 + ERA_OFFSET).unwrap().to_string();
904        assert_eq!(era, expected_era);
905    }
906
907    #[tokio::test]
908    async fn get_current_chain_point() {
909        let socket_path = create_temp_dir("get_current_chain_point").join("node.socket");
910        let server = setup_server(socket_path.clone(), 2).await;
911        let client = tokio::spawn(async move {
912            let observer =
913                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
914            observer.get_current_chain_point().await.unwrap()
915        });
916
917        let (_, client_res) = tokio::join!(server, client);
918        let chain_point = client_res.expect("Client failed");
919        assert_eq!(
920            chain_point,
921            Some(ChainPoint {
922                slot_number: SlotNumber(52851885),
923                block_hash: "010203".to_string(),
924                block_number: BlockNumber(52851885)
925            })
926        );
927    }
928}