mithril_common/chain_observer/
pallas_observer.rs

1use std::collections::BTreeSet;
2use std::path::{Path, PathBuf};
3
4use anyhow::{anyhow, Context};
5use async_trait::async_trait;
6use pallas_addresses::Address;
7use pallas_codec::utils::{Bytes, CborWrap, TagWrap};
8use pallas_network::{
9    facades::NodeClient,
10    miniprotocols::{
11        localstate::{
12            queries_v16::{
13                self, Addr, Addrs, ChainBlockNumber, GenesisConfig, PostAlonsoTransactionOutput,
14                StakeSnapshot, Stakes, TransactionOutput, UTxOByAddress,
15            },
16            Client,
17        },
18        Point,
19    },
20};
21use pallas_primitives::ToCanonicalJson;
22use pallas_traverse::Era;
23
24use crate::{
25    chain_observer::{interface::*, ChainAddress, TxDatum},
26    crypto_helper::{encode_bech32, KESPeriod, OpCert},
27    entities::{BlockNumber, ChainPoint, Epoch, SlotNumber, StakeDistribution},
28    CardanoNetwork, StdResult,
29};
30
31use super::model::{try_inspect, Datum, Datums};
32
33// The era value returned from the queries_v16::get_current_era has an offset of -1 with the era value of the pallas_traverse::Era due to Cardano node implementation.
34// It needs to be compensated to get the correct era display name.
35const ERA_OFFSET: u16 = 1;
36
37/// A runner that uses Pallas library to interact with a Cardano node using N2C Ouroboros mini-protocols
38pub struct PallasChainObserver {
39    socket: PathBuf,
40    network: CardanoNetwork,
41}
42
43impl From<anyhow::Error> for ChainObserverError {
44    fn from(err: anyhow::Error) -> Self {
45        ChainObserverError::General(err)
46    }
47}
48
49impl PallasChainObserver {
50    /// Creates a new PallasObserver
51    pub fn new(socket: &Path, network: CardanoNetwork) -> Self {
52        Self {
53            socket: socket.to_owned(),
54            network,
55        }
56    }
57
58    /// Creates and returns a new `NodeClient` connected to the specified socket.
59    async fn new_client(&self) -> StdResult<NodeClient> {
60        let magic = self.network.code();
61        let client = NodeClient::connect(&self.socket, magic).await?;
62
63        Ok(client)
64    }
65
66    /// Creates and returns a new `NodeClient`, handling any potential errors.
67    async fn get_client(&self) -> StdResult<NodeClient> {
68        self.new_client()
69            .await
70            .map_err(|err| anyhow!(err))
71            .with_context(|| "PallasChainObserver failed to create new client")
72    }
73
74    /// Fetches the current era using the provided `statequery` client.
75    async fn get_era(&self, statequery: &mut Client) -> StdResult<u16> {
76        statequery
77            .acquire(None)
78            .await
79            .map_err(|err| anyhow!(err))
80            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
81
82        let era = queries_v16::get_current_era(statequery)
83            .await
84            .map_err(|err| anyhow!(err))
85            .with_context(|| "PallasChainObserver failed to get current era")?;
86
87        Ok(era)
88    }
89
90    /// Fetches the current epoch number using the provided `statequery` client.
91    async fn get_epoch(&self, statequery: &mut Client) -> StdResult<u32> {
92        statequery
93            .acquire(None)
94            .await
95            .map_err(|err| anyhow!(err))
96            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
97
98        let era = queries_v16::get_current_era(statequery)
99            .await
100            .map_err(|err| anyhow!(err))
101            .with_context(|| "PallasChainObserver failed to get current era")?;
102
103        let epoch = queries_v16::get_block_epoch_number(statequery, era)
104            .await
105            .map_err(|err| anyhow!(err))
106            .with_context(|| "PallasChainObserver failed to get block epoch number")?;
107
108        Ok(epoch)
109    }
110
111    /// Returns inline datum tag from the given `Values` instance.
112    fn get_datum_tag(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<TagWrap<Bytes, 24>> {
113        Ok(utxo
114            .inline_datum
115            .as_ref()
116            .with_context(|| "PallasChainObserver failed to get inline datum")?
117            .1
118            .clone())
119    }
120
121    /// Returns inline datums from the given `Values` instance.
122    fn inspect_datum(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<Datum> {
123        let datum = self.get_datum_tag(utxo)?;
124        let datum = CborWrap(datum).to_vec();
125
126        try_inspect::<Datum>(datum)
127    }
128
129    /// Serializes datum to `TxDatum` instance.
130    fn serialize_datum(&self, utxo: &PostAlonsoTransactionOutput) -> StdResult<TxDatum> {
131        let datum = self.inspect_datum(utxo)?;
132        let serialized = serde_json::to_string(&datum.to_json())
133            .map_err(|err| anyhow!(err))
134            .with_context(|| "PallasChainObserver failed to serialize datum")?;
135
136        Ok(TxDatum(serialized))
137    }
138
139    /// Maps the given `UTxOByAddress` instance to Datums.
140    fn map_datums(&self, transaction: UTxOByAddress) -> StdResult<Datums> {
141        transaction
142            .utxo
143            .iter()
144            .filter_map(|(_, utxo)| match utxo {
145                TransactionOutput::Current(output) => output
146                    .inline_datum
147                    .as_ref()
148                    .map(|_| self.serialize_datum(output)),
149                _ => None,
150            })
151            .collect::<StdResult<Datums>>()
152    }
153
154    /// Returns a vector of `TxDatum` instances.
155    async fn get_utxo_datums(
156        &self,
157        client: &mut NodeClient,
158        address: &ChainAddress,
159    ) -> Result<Datums, ChainObserverError> {
160        let statequery = client.statequery();
161        let utxo = self.get_utxo_by_address(statequery, address).await?;
162
163        Ok(self.map_datums(utxo)?)
164    }
165
166    /// Fetches the current UTxO by address using the provided `statequery` client.
167    async fn get_utxo_by_address(
168        &self,
169        statequery: &mut Client,
170        address: &ChainAddress,
171    ) -> StdResult<UTxOByAddress> {
172        statequery
173            .acquire(None)
174            .await
175            .map_err(|err| anyhow!(err))
176            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
177
178        let era = queries_v16::get_current_era(statequery)
179            .await
180            .map_err(|err| anyhow!(err))
181            .with_context(|| "PallasChainObserver failed to get current era")?;
182
183        let addr: Address = Address::from_bech32(address)
184            .map_err(|err| anyhow!(err))
185            .with_context(|| "PallasChainObserver failed to parse address")?;
186
187        let addr: Addr = addr.to_vec().into();
188        let addrs: Addrs = vec![addr];
189        let utxo = queries_v16::get_utxo_by_address(statequery, era, addrs)
190            .await
191            .map_err(|err| anyhow!(err))
192            .with_context(|| "PallasChainObserver failed to get utxo")?;
193
194        Ok(utxo)
195    }
196
197    /// Fetches the current stake distribution using the provided `statequery` client.
198    async fn do_stake_snapshots_state_query(
199        &self,
200        statequery: &mut Client,
201    ) -> StdResult<StakeSnapshot> {
202        statequery
203            .acquire(None)
204            .await
205            .map_err(|err| anyhow!(err))
206            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
207
208        let era = queries_v16::get_current_era(statequery)
209            .await
210            .map_err(|err| anyhow!(err))
211            .with_context(|| "PallasChainObserver failed to get current era")?;
212
213        let state_snapshot = queries_v16::get_stake_snapshots(statequery, era, BTreeSet::new())
214            .await
215            .map_err(|err| anyhow!(err))
216            .with_context(|| "PallasChainObserver failed to get stake snapshot")?;
217
218        Ok(state_snapshot)
219    }
220
221    /// Returns the stake pool hash from the given bytestring.
222    fn get_stake_pool_hash(&self, key: &Bytes) -> Result<String, ChainObserverError> {
223        let pool_id_bech32 = encode_bech32("pool", key)
224            .map_err(|err| anyhow!(err))
225            .with_context(|| "PallasChainObserver failed to encode stake pool hash")?;
226        Ok(pool_id_bech32)
227    }
228
229    /// Fetches the current stake distribution using the provided `statequery` client.
230    async fn get_stake_distribution(
231        &self,
232        client: &mut NodeClient,
233    ) -> Result<Option<StakeDistribution>, ChainObserverError> {
234        let statequery = client.statequery();
235
236        let stake_snapshot = self.do_stake_snapshots_state_query(statequery).await?;
237
238        let mut stake_distribution = StakeDistribution::new();
239
240        let have_stakes_in_two_epochs = |stakes: &Stakes| stakes.snapshot_mark_pool > 0;
241        for (key, stakes) in stake_snapshot
242            .snapshots
243            .stake_snapshots
244            .iter()
245            .filter(|(_, stakes)| have_stakes_in_two_epochs(stakes))
246        {
247            let pool_hash = self.get_stake_pool_hash(key)?;
248            stake_distribution.insert(pool_hash, stakes.snapshot_mark_pool);
249        }
250
251        Ok(Some(stake_distribution))
252    }
253
254    /// # Calculate Current KES Period
255    ///
256    /// It calculates the current Key Evolving Signature (KES) period
257    /// based on the provided `chain_point` and `slots_per_kes_period`.
258    ///
259    /// The calculation formula is represented as:
260    ///
261    /// `current_kes_period = ⌊current_slot_number/slots_per_kes_period⌋`
262    ///
263    /// where:
264    /// - `current_slot_number` represents the current slot number given by the `point` on the chain.
265    /// - `slots_per_kes_period` represents the number of slots in a KES period.
266    /// - `⌊x⌋` is the floor function which rounds the greatest integer less than or equal to `x`.
267    ///
268    /// ## Example:
269    ///
270    /// let (chain_point, slots_per_kes_period) = (Point::new(1), 10);
271    /// match calculate_kes_period(&self, chain_point, slots_per_kes_period) {
272    ///     Ok(kes_period) => println!("Current KES Period: {}", kes_period),
273    ///     Err(e) => println!("Error occurred: {}", e),
274    /// }
275    async fn calculate_kes_period(
276        &self,
277        chain_point: Point,
278        slots_per_kes_period: u64,
279    ) -> Result<KESPeriod, ChainObserverError> {
280        if slots_per_kes_period == 0 {
281            return Err(anyhow!("slots_per_kes_period must be greater than 0"))
282                .with_context(|| "PallasChainObserver failed to calculate kes period")?;
283        }
284
285        let current_kes_period = chain_point.slot_or_default() / slots_per_kes_period;
286        Ok(u32::try_from(current_kes_period)
287            .map_err(|err| anyhow!(err))
288            .with_context(|| "PallasChainObserver failed to convert kes period")?)
289    }
290
291    /// Fetches the current chain point using the provided `statequery` client.
292    async fn do_get_chain_point_state_query(&self, statequery: &mut Client) -> StdResult<Point> {
293        let chain_point = queries_v16::get_chain_point(statequery)
294            .await
295            .map_err(|err| anyhow!(err))
296            .with_context(|| "PallasChainObserver failed to get chain point")?;
297
298        Ok(chain_point)
299    }
300
301    /// Fetches the current chain point using the provided `NodeClient`.
302    async fn do_get_chain_block_no(&self, statequery: &mut Client) -> StdResult<ChainBlockNumber> {
303        let chain_block_number = queries_v16::get_chain_block_no(statequery)
304            .await
305            .map_err(|err| anyhow!(err))
306            .with_context(|| "PallasChainObserver failed to get chain block number")?;
307
308        Ok(chain_block_number)
309    }
310
311    /// Fetches the current era using the provided `statequery` client.
312    async fn do_get_current_era_state_query(&self, statequery: &mut Client) -> StdResult<u16> {
313        let era = queries_v16::get_current_era(statequery)
314            .await
315            .map_err(|err| anyhow!(err))
316            .with_context(|| "PallasChainObserver failed to get current era")?;
317
318        Ok(era)
319    }
320
321    /// Fetches the current genesis config using the provided `statequery` client.
322    async fn do_get_genesis_config_state_query(
323        &self,
324        statequery: &mut Client,
325    ) -> StdResult<Vec<GenesisConfig>> {
326        let era = self.do_get_current_era_state_query(statequery).await?;
327        let genesis_config = queries_v16::get_genesis_config(statequery, era)
328            .await
329            .map_err(|err| anyhow!(err))
330            .with_context(|| "PallasChainObserver failed to get genesis config")?;
331
332        Ok(genesis_config)
333    }
334
335    /// Fetches the current chain point using the provided `NodeClient`.
336    async fn get_chain_point(&self, statequery: &mut Client) -> StdResult<ChainPoint> {
337        statequery
338            .acquire(None)
339            .await
340            .map_err(|err| anyhow!(err))
341            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
342
343        let chain_point = self.do_get_chain_point_state_query(statequery).await?;
344
345        let header_hash = match chain_point {
346            Point::Origin => None,
347            Point::Specific(_at_slot, ref hash) => Some(hex::encode(hash)),
348        };
349
350        let chain_block_number = self.do_get_chain_block_no(statequery).await?;
351
352        Ok(ChainPoint {
353            slot_number: SlotNumber(chain_point.slot_or_default()),
354            block_hash: header_hash.unwrap_or_default(),
355            block_number: BlockNumber(chain_block_number.block_number as u64),
356        })
357    }
358
359    /// Fetches chain point and genesis config through the local statequery.
360    /// The KES period is calculated afterwards.
361    async fn get_kes_period(
362        &self,
363        client: &mut NodeClient,
364    ) -> Result<Option<KESPeriod>, ChainObserverError> {
365        let statequery = client.statequery();
366
367        statequery
368            .acquire(None)
369            .await
370            .map_err(|err| anyhow!(err))
371            .with_context(|| "PallasChainObserver failed to acquire statequery")?;
372
373        let chain_point = self.do_get_chain_point_state_query(statequery).await?;
374
375        let genesis_config = self.do_get_genesis_config_state_query(statequery).await?;
376
377        let config = genesis_config
378            .first()
379            .with_context(|| "PallasChainObserver failed to extract the config")?;
380
381        let current_kes_period = self
382            .calculate_kes_period(chain_point, config.slots_per_kes_period as u64)
383            .await?;
384
385        Ok(Some(current_kes_period))
386    }
387
388    /// Processes a state query with the `NodeClient`, releasing the state query.
389    async fn process_statequery(&self, client: &mut NodeClient) -> StdResult<()> {
390        let statequery = client.statequery();
391        statequery
392            .send_release()
393            .await
394            .map_err(|err| anyhow!(err))
395            .with_context(|| "PallasChainObserver send release failed")?;
396
397        statequery
398            .send_done()
399            .await
400            .map_err(|err| anyhow!(err))
401            .with_context(|| "PallasChainObserver send done failed")?;
402
403        Ok(())
404    }
405
406    /// Synchronizes the `NodeClient` with the cardano server using `chainsync`.
407    async fn sync(&self, client: &mut NodeClient) -> StdResult<()> {
408        client
409            .chainsync()
410            .send_done()
411            .await
412            .map_err(|err| anyhow!(err))
413            .with_context(|| "PallasChainObserver chainsync send done failed")?;
414        Ok(())
415    }
416
417    /// Post-processes a state query afterwards.
418    async fn post_process_statequery(&self, client: &mut NodeClient) -> StdResult<()> {
419        self.process_statequery(client).await?;
420        self.sync(client).await?;
421
422        Ok(())
423    }
424}
425
426#[async_trait]
427impl ChainObserver for PallasChainObserver {
428    async fn get_current_era(&self) -> Result<Option<String>, ChainObserverError> {
429        let mut client = self.get_client().await?;
430
431        let era = self.get_era(client.statequery()).await?;
432
433        let era = Era::try_from(era + ERA_OFFSET)
434            .with_context(|| "PallasChainObserver failed to convert: '{era}' to Era")?;
435
436        self.post_process_statequery(&mut client).await?;
437
438        client.abort().await;
439
440        Ok(Some(era.to_string()))
441    }
442
443    async fn get_current_epoch(&self) -> Result<Option<Epoch>, ChainObserverError> {
444        let mut client = self.get_client().await?;
445
446        let epoch = self.get_epoch(client.statequery()).await?;
447
448        self.post_process_statequery(&mut client).await?;
449
450        client.abort().await;
451
452        Ok(Some(Epoch(epoch as u64)))
453    }
454
455    async fn get_current_chain_point(&self) -> Result<Option<ChainPoint>, ChainObserverError> {
456        let mut client = self.get_client().await?;
457
458        let chain_point = self.get_chain_point(client.statequery()).await?;
459
460        self.post_process_statequery(&mut client).await?;
461
462        client.abort().await;
463
464        Ok(Some(chain_point))
465    }
466
467    async fn get_current_datums(
468        &self,
469        address: &ChainAddress,
470    ) -> Result<Datums, ChainObserverError> {
471        let mut client = self.get_client().await?;
472
473        let datums = self.get_utxo_datums(&mut client, address).await?;
474
475        self.post_process_statequery(&mut client).await?;
476
477        client.abort().await;
478
479        Ok(datums)
480    }
481
482    async fn get_current_stake_distribution(
483        &self,
484    ) -> Result<Option<StakeDistribution>, ChainObserverError> {
485        let mut client = self.get_client().await?;
486
487        let stake_distribution = self.get_stake_distribution(&mut client).await?;
488
489        self.post_process_statequery(&mut client).await?;
490
491        client.abort().await;
492
493        Ok(stake_distribution)
494    }
495
496    async fn get_current_kes_period(
497        &self,
498        _opcert: &OpCert,
499    ) -> Result<Option<KESPeriod>, ChainObserverError> {
500        let mut client = self.get_client().await?;
501
502        let current_kes_period = self.get_kes_period(&mut client).await?;
503
504        self.post_process_statequery(&mut client).await?;
505
506        client.abort().await;
507
508        Ok(current_kes_period)
509    }
510}
511
512#[cfg(test)]
513mod tests {
514    use std::fs;
515
516    use kes_summed_ed25519::{kes::Sum6Kes, traits::KesSk};
517    use pallas_codec::utils::{AnyCbor, AnyUInt, KeyValuePairs, TagWrap};
518    use pallas_crypto::hash::Hash;
519    use pallas_network::miniprotocols::{
520        localstate::{
521            queries_v16::{
522                BlockQuery, ChainBlockNumber, Fraction, GenesisConfig, HardForkQuery, LedgerQuery,
523                Request, Snapshots, StakeSnapshot, SystemStart, Value,
524            },
525            ClientQueryRequest,
526        },
527        Point,
528    };
529    use tokio::net::UnixListener;
530
531    use crate::test_utils::TempDir;
532    use crate::{crypto_helper::ColdKeyGenerator, CardanoNetwork};
533
534    use super::*;
535
536    fn get_fake_utxo_by_address() -> UTxOByAddress {
537        let tx_hex = "1e4e5cf2889d52f1745b941090f04a65dea6ce56c5e5e66e69f65c8e36347c17";
538        let tx_bytes: [u8; 32] = hex::decode(tx_hex).unwrap().try_into().unwrap();
539        let transaction_id = Hash::from(tx_bytes);
540        let index = AnyUInt::MajorByte(2);
541        let lovelace = AnyUInt::MajorByte(2);
542        let hex_datum = "D8799F58407B226D61726B657273223A5B7B226E616D65223A227468616C6573222C2265706F6368223A307D5D2C227369676E6174757265223A22383566323265626261645840333335376338656132646630363230393766396131383064643335643966336261316432363832633732633864313232383866616438636238643063656565625838366134643665383465653865353631376164323037313836366363313930373466326137366538373864663166393733346438343061227DFF";
543        let datum = hex::decode(hex_datum).unwrap().into();
544        let tag = TagWrap::<_, 24>::new(datum);
545        let inline_datum = Some((1_u16, tag));
546
547        let address: Address =
548            Address::from_bech32("addr_test1vr80076l3x5uw6n94nwhgmv7ssgy6muzf47ugn6z0l92rhg2mgtu0")
549                .unwrap();
550        let address: Addr = address.to_vec().into();
551        let values = TransactionOutput::Current(PostAlonsoTransactionOutput {
552            address,
553            amount: Value::Coin(lovelace),
554            inline_datum,
555            script_ref: None,
556        });
557        let utxo = KeyValuePairs::from(vec![(
558            queries_v16::UTxO {
559                transaction_id,
560                index,
561            },
562            values,
563        )]);
564
565        UTxOByAddress { utxo }
566    }
567
568    fn get_fake_stake_snapshot() -> StakeSnapshot {
569        let stake_snapshots = KeyValuePairs::from(vec![
570            (
571                Bytes::from(
572                    hex::decode("00000036d515e12e18cd3c88c74f09a67984c2c279a5296aa96efe89")
573                        .unwrap(),
574                ),
575                Stakes {
576                    snapshot_mark_pool: 300000000001,
577                    snapshot_set_pool: 300000000002,
578                    snapshot_go_pool: 300000000000,
579                },
580            ),
581            (
582                Bytes::from(
583                    hex::decode("000000f66e28b0f18aef20555f4c4954234e3270dfbbdcc13f54e799")
584                        .unwrap(),
585                ),
586                Stakes {
587                    snapshot_mark_pool: 600000000001,
588                    snapshot_set_pool: 600000000002,
589                    snapshot_go_pool: 600000000000,
590                },
591            ),
592            (
593                Bytes::from(
594                    hex::decode("00000110093effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
595                        .unwrap(),
596                ),
597                Stakes {
598                    snapshot_mark_pool: 1200000000001,
599                    snapshot_set_pool: 1200000000002,
600                    snapshot_go_pool: 1200000000000,
601                },
602            ),
603            (
604                Bytes::from(
605                    hex::decode("00000ffff93effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
606                        .unwrap(),
607                ),
608                Stakes {
609                    snapshot_mark_pool: 0,
610                    snapshot_set_pool: 1300000000002,
611                    snapshot_go_pool: 0,
612                },
613            ),
614        ]);
615
616        StakeSnapshot {
617            snapshots: Snapshots {
618                stake_snapshots,
619                snapshot_stake_mark_total: 2100000000003,
620                snapshot_stake_set_total: 2100000000006,
621                snapshot_stake_go_total: 2100000000000,
622            },
623        }
624    }
625
626    fn get_fake_genesis_config() -> Vec<GenesisConfig> {
627        let genesis = GenesisConfig {
628            system_start: SystemStart {
629                year: 2021,
630                day_of_year: 150,
631                picoseconds_of_day: 0,
632            },
633            network_magic: 42,
634            network_id: 42,
635            active_slots_coefficient: Fraction { num: 6, den: 10 },
636            security_param: 2160,
637            epoch_length: 432000,
638            slots_per_kes_period: 129600,
639            max_kes_evolutions: 62,
640            slot_length: 1,
641            update_quorum: 5,
642            max_lovelace_supply: AnyUInt::MajorByte(2),
643        };
644
645        vec![genesis]
646    }
647
648    /// pallas responses mock server.
649    async fn mock_server(server: &mut pallas_network::facades::NodeServer) -> AnyCbor {
650        let query: queries_v16::Request =
651            match server.statequery().recv_while_acquired().await.unwrap() {
652                ClientQueryRequest::Query(q) => q.into_decode().unwrap(),
653                x => panic!("unexpected message from client: {x:?}"),
654            };
655
656        match query {
657            Request::GetChainPoint => {
658                AnyCbor::from_encode(Point::Specific(52851885, vec![1, 2, 3]))
659            }
660            Request::GetChainBlockNo => AnyCbor::from_encode(ChainBlockNumber {
661                slot_timeline: 1,
662                block_number: 52851885,
663            }),
664            Request::LedgerQuery(LedgerQuery::HardForkQuery(HardForkQuery::GetCurrentEra)) => {
665                AnyCbor::from_encode(4)
666            }
667            Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetEpochNo)) => {
668                AnyCbor::from_encode([8])
669            }
670            Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetGenesisConfig)) => {
671                AnyCbor::from_encode(get_fake_genesis_config())
672            }
673            Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetUTxOByAddress(_))) => {
674                AnyCbor::from_encode(get_fake_utxo_by_address())
675            }
676            Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetStakeSnapshots(_))) => {
677                AnyCbor::from_encode(get_fake_stake_snapshot())
678            }
679            _ => panic!("unexpected query from client: {query:?}"),
680        }
681    }
682
683    /// Creates a new work directory in the system's temporary folder.
684    fn create_temp_dir(folder_name: &str) -> PathBuf {
685        TempDir::create_with_short_path("pallas_chain_observer_test", folder_name)
686    }
687
688    /// Sets up a mock server for related tests.
689    ///
690    /// Use the `intersections` parameter to define exactly how many
691    /// local state queries should be intercepted by the `mock_server`
692    /// and avoid any panic errors.
693    async fn setup_server(socket_path: PathBuf, intersections: u32) -> tokio::task::JoinHandle<()> {
694        tokio::spawn({
695            async move {
696                if socket_path.exists() {
697                    fs::remove_file(&socket_path).expect("Previous socket removal failed");
698                }
699
700                let unix_listener = UnixListener::bind(socket_path.as_path()).unwrap();
701                let mut server = pallas_network::facades::NodeServer::accept(&unix_listener, 10)
702                    .await
703                    .unwrap();
704
705                server.statequery().recv_while_idle().await.unwrap();
706                server.statequery().send_acquired().await.unwrap();
707
708                for _ in 0..intersections {
709                    let result = mock_server(&mut server).await;
710                    server.statequery().send_result(result).await.unwrap();
711                }
712            }
713        })
714    }
715
716    #[tokio::test]
717    async fn get_current_epoch() {
718        let socket_path = create_temp_dir("get_current_epoch").join("node.socket");
719        let server = setup_server(socket_path.clone(), 2).await;
720        let client = tokio::spawn(async move {
721            let observer =
722                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
723            observer.get_current_epoch().await.unwrap().unwrap()
724        });
725
726        let (_, client_res) = tokio::join!(server, client);
727        let epoch = client_res.expect("Client failed");
728        assert_eq!(epoch, 8);
729    }
730
731    #[tokio::test]
732    async fn get_current_datums() {
733        let socket_path = create_temp_dir("get_current_datums").join("node.socket");
734        let server = setup_server(socket_path.clone(), 2).await;
735        let client = tokio::spawn(async move {
736            let observer =
737                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
738            let address =
739                "addr_test1vr80076l3x5uw6n94nwhgmv7ssgy6muzf47ugn6z0l92rhg2mgtu0".to_string();
740            observer.get_current_datums(&address).await.unwrap()
741        });
742
743        let (_, client_res) = tokio::join!(server, client);
744        let datums = client_res.expect("Client failed");
745        assert_eq!(vec![TxDatum(r#"{"constructor":0,"fields":[{"bytes":"7b226d61726b657273223a5b7b226e616d65223a227468616c6573222c2265706f6368223a307d5d2c227369676e6174757265223a2238356632326562626164"},{"bytes":"33333537633865613264663036323039376639613138306464333564396633626131643236383263373263386431323238386661643863623864306365656562"},{"bytes":"366134643665383465653865353631376164323037313836366363313930373466326137366538373864663166393733346438343061227d"}]}"#.to_string())], datums);
746    }
747
748    #[tokio::test]
749    async fn get_current_stake_distribution() {
750        let socket_path = create_temp_dir("get_current_stake_distribution").join("node.socket");
751        let server = setup_server(socket_path.clone(), 2).await;
752        let client = tokio::spawn(async move {
753            let observer =
754                super::PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
755            observer.get_current_stake_distribution().await.unwrap()
756        });
757
758        let (_, client_res) = tokio::join!(server, client);
759        let computed_stake_distribution = client_res.unwrap().unwrap();
760
761        let mut expected_stake_distribution = StakeDistribution::new();
762        expected_stake_distribution.insert(
763            "pool1qqqqqdk4zhsjuxxd8jyvwncf5eucfskz0xjjj64fdmlgj735lr9".to_string(),
764            300000000001,
765        );
766        expected_stake_distribution.insert(
767            "pool1qqqqpanw9zc0rzh0yp247nzf2s35uvnsm7aaesfl2nnejaev0uc".to_string(),
768            600000000001,
769        );
770        expected_stake_distribution.insert(
771            "pool1qqqqzyqf8mlm70883zht60n4q6uqxg4a8x266sewv8ad2grkztl".to_string(),
772            1200000000001,
773        );
774
775        assert_eq!(expected_stake_distribution, computed_stake_distribution);
776    }
777
778    #[tokio::test]
779    async fn get_current_kes_period() {
780        let socket_path = create_temp_dir("get_current_kes_period").join("node.socket");
781        let server = setup_server(socket_path.clone(), 3).await;
782        let client = tokio::spawn(async move {
783            let observer =
784                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
785
786            let keypair = ColdKeyGenerator::create_deterministic_keypair([0u8; 32]);
787            let mut dummy_key_buffer = [0u8; Sum6Kes::SIZE + 4];
788            let mut dummy_seed = [0u8; 32];
789            let (_, kes_verification_key) = Sum6Kes::keygen(&mut dummy_key_buffer, &mut dummy_seed);
790            let operational_certificate = OpCert::new(kes_verification_key, 0, 0, keypair);
791            observer
792                .get_current_kes_period(&operational_certificate)
793                .await
794                .unwrap()
795        });
796
797        let (_, client_res) = tokio::join!(server, client);
798        let kes_period = client_res.unwrap().unwrap();
799        assert_eq!(407, kes_period);
800    }
801
802    #[tokio::test]
803    async fn calculate_kes_period() {
804        let socket_path = create_temp_dir("get_current_kes_period").join("node.socket");
805        let observer = PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
806        let current_kes_period = observer
807            .calculate_kes_period(Point::Specific(53536042, vec![1, 2, 3]), 129600)
808            .await
809            .unwrap();
810
811        assert_eq!(413, current_kes_period);
812
813        let current_kes_period = observer
814            .calculate_kes_period(Point::Specific(53524800, vec![1, 2, 3]), 129600)
815            .await
816            .unwrap();
817
818        assert_eq!(413, current_kes_period);
819
820        let current_kes_period = observer
821            .calculate_kes_period(Point::Specific(53649999, vec![1, 2, 3]), 129600)
822            .await
823            .unwrap();
824
825        assert_eq!(413, current_kes_period);
826    }
827
828    #[tokio::test]
829    async fn get_chain_point() {
830        let socket_path = create_temp_dir("get_chain_point").join("node.socket");
831        let server = setup_server(socket_path.clone(), 1).await;
832        let client = tokio::spawn(async move {
833            let observer =
834                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
835            let mut client = observer.get_client().await.unwrap();
836            let statequery = client.statequery();
837            statequery.acquire(None).await.unwrap();
838            let chain_point = observer
839                .do_get_chain_point_state_query(statequery)
840                .await
841                .unwrap();
842            observer.post_process_statequery(&mut client).await.unwrap();
843            client.abort().await;
844            chain_point
845        });
846
847        let (_, client_res) = tokio::join!(server, client);
848        let chain_point = client_res.expect("Client failed");
849        assert_eq!(chain_point, Point::Specific(52851885, vec![1, 2, 3]));
850    }
851
852    #[tokio::test]
853    async fn get_genesis_config() {
854        let socket_path = create_temp_dir("get_genesis_config").join("node.socket");
855        let server = setup_server(socket_path.clone(), 2).await;
856        let client = tokio::spawn(async move {
857            let observer =
858                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
859            let mut client = observer.get_client().await.unwrap();
860            let statequery = client.statequery();
861            statequery.acquire(None).await.unwrap();
862            let genesis_config = observer
863                .do_get_genesis_config_state_query(statequery)
864                .await
865                .unwrap();
866            observer.post_process_statequery(&mut client).await.unwrap();
867            client.abort().await;
868            genesis_config
869        });
870
871        let (_, client_res) = tokio::join!(server, client);
872        let genesis_config = client_res.expect("Client failed");
873        assert_eq!(genesis_config, get_fake_genesis_config());
874    }
875
876    #[tokio::test]
877    async fn fetch_current_era_from_state_query() {
878        let socket_path = create_temp_dir("get_current_era").join("node.socket");
879        let server = setup_server(socket_path.clone(), 1).await;
880        let client = tokio::spawn(async move {
881            let observer =
882                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
883            let mut client = observer.get_client().await.unwrap();
884            let statequery = client.statequery();
885            statequery.acquire(None).await.unwrap();
886            let era = observer
887                .do_get_current_era_state_query(statequery)
888                .await
889                .unwrap();
890            observer.post_process_statequery(&mut client).await.unwrap();
891            client.abort().await;
892            era
893        });
894
895        let (_, client_res) = tokio::join!(server, client);
896        let era = client_res.expect("Client failed");
897        assert_eq!(era, 4);
898    }
899
900    #[tokio::test]
901    async fn get_current_era() {
902        let socket_path = create_temp_dir("get_current_era_as_string").join("node.socket");
903        let server = setup_server(socket_path.clone(), 1).await;
904        let client = tokio::spawn(async move {
905            let observer =
906                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
907            observer.get_current_era().await.unwrap().unwrap()
908        });
909
910        let (_, client_res) = tokio::join!(server, client);
911        let era = client_res.expect("Client failed");
912
913        let expected_era = Era::try_from(4 + ERA_OFFSET).unwrap().to_string();
914        assert_eq!(era, expected_era);
915    }
916
917    #[tokio::test]
918    async fn get_current_chain_point() {
919        let socket_path = create_temp_dir("get_current_chain_point").join("node.socket");
920        let server = setup_server(socket_path.clone(), 2).await;
921        let client = tokio::spawn(async move {
922            let observer =
923                PallasChainObserver::new(socket_path.as_path(), CardanoNetwork::TestNet(10));
924            observer.get_current_chain_point().await.unwrap()
925        });
926
927        let (_, client_res) = tokio::join!(server, client);
928        let chain_point = client_res.expect("Client failed");
929        assert_eq!(
930            chain_point,
931            Some(ChainPoint {
932                slot_number: SlotNumber(52851885),
933                block_hash: "010203".to_string(),
934                block_number: BlockNumber(52851885)
935            })
936        );
937    }
938}