mithril_cardano_node_chain/chain_observer/
cli_observer.rs

1use anyhow::{Context, anyhow};
2use async_trait::async_trait;
3use hex::FromHex;
4use nom::IResult;
5use rand_core::RngCore;
6use serde_json::Value;
7use std::collections::HashMap;
8use std::fs;
9use std::path::PathBuf;
10use tokio::process::Command;
11
12use mithril_common::crypto_helper::{KesPeriod, encode_bech32};
13use mithril_common::entities::{BlockNumber, ChainPoint, Epoch, SlotNumber, StakeDistribution};
14use mithril_common::{CardanoNetwork, StdResult};
15
16use crate::entities::{ChainAddress, TxDatum};
17
18use super::interface::{ChainObserver, ChainObserverError};
19
/// Era argument placed first on every `cardano-cli` command line built by the runner in this file.
const CARDANO_ERA: &str = "latest";
21
/// `CliRunner` trait defines the asynchronous methods
/// for interaction with the Cardano CLI.
///
/// Each method returns the textual output of one query; the output is not parsed here.
#[async_trait]
pub trait CliRunner {
    /// Launches the UTxO query for the given `address` and returns its output.
    async fn launch_utxo(&self, address: &str) -> StdResult<String>;
    /// Launches the stake distribution query and returns its output.
    async fn launch_stake_distribution(&self) -> StdResult<String>;
    /// Launches the stake snapshot query for the pool identified by `stake_pool_id`
    /// and returns its output.
    async fn launch_stake_snapshot(&self, stake_pool_id: &str) -> StdResult<String>;
    /// Launches the stake snapshot query for all pools and returns its output.
    async fn launch_stake_snapshot_all_pools(&self) -> StdResult<String>;
    /// Launches the query from which the current era is read and returns its output.
    async fn launch_era(&self) -> StdResult<String>;
    /// Launches the query from which the current epoch is read and returns its output.
    async fn launch_epoch(&self) -> StdResult<String>;
    /// Launches the query from which the current chain point is read and returns its output.
    async fn launch_chain_point(&self) -> StdResult<String>;
    /// Launches the query from which the current KES period is computed.
    ///
    /// Returns the query output together with the number of slots per KES period.
    async fn launch_kes_period(&self) -> StdResult<(String, u64)>;
}
43
/// A runner able to request data from a Cardano node using the
/// [Cardano Cli](https://docs.cardano.org/getting-started/use-cli).
#[derive(Clone, Debug)]
pub struct CardanoCliRunner {
    /// Path of the executable spawned for every query.
    cli_path: PathBuf,
    /// Path exported to the spawned process as `CARDANO_NODE_SOCKET_PATH`.
    socket_path: PathBuf,
    /// Network that selects the `--mainnet` or `--testnet-magic <magic>` argument.
    network: CardanoNetwork,
}
52
53impl CardanoCliRunner {
54    /// CardanoCliRunner factory
55    pub fn new(cli_path: PathBuf, socket_path: PathBuf, network: CardanoNetwork) -> Self {
56        Self {
57            cli_path,
58            socket_path,
59            network,
60        }
61    }
62
63    fn random_out_file() -> StdResult<PathBuf> {
64        let mut rng = rand_core::OsRng;
65        let dir = std::env::temp_dir().join("cardano-cli-runner");
66        if !dir.exists() {
67            fs::create_dir_all(&dir)?;
68        }
69        Ok(dir.join(format!("{}.out", rng.next_u64())))
70    }
71
72    fn command_for_utxo(&self, address: &str, out_file: PathBuf) -> Command {
73        let mut command = self.get_command();
74        command
75            .arg(CARDANO_ERA)
76            .arg("query")
77            .arg("utxo")
78            .arg("--address")
79            .arg(address)
80            .arg("--out-file")
81            .arg(out_file);
82        self.post_config_command(&mut command);
83
84        command
85    }
86
87    fn command_for_stake_distribution(&self) -> Command {
88        let mut command = self.get_command();
89        command.arg(CARDANO_ERA).arg("query").arg("stake-distribution");
90        self.post_config_command(&mut command);
91
92        command
93    }
94
95    fn command_for_stake_snapshot(&self, stake_pool_id: &str) -> Command {
96        let mut command = self.get_command();
97        command
98            .arg(CARDANO_ERA)
99            .arg("query")
100            .arg("stake-snapshot")
101            .arg("--stake-pool-id")
102            .arg(stake_pool_id);
103        self.post_config_command(&mut command);
104
105        command
106    }
107
108    fn command_for_stake_snapshot_all_pools(&self) -> Command {
109        let mut command = self.get_command();
110        command
111            .arg(CARDANO_ERA)
112            .arg("query")
113            .arg("stake-snapshot")
114            .arg("--all-stake-pools");
115        self.post_config_command(&mut command);
116
117        command
118    }
119
120    fn command_for_era(&self) -> Command {
121        let mut command = self.get_command();
122        command.arg(CARDANO_ERA).arg("query").arg("tip");
123        self.post_config_command(&mut command);
124
125        command
126    }
127
128    fn command_for_epoch(&self) -> Command {
129        let mut command = self.get_command();
130        command.arg(CARDANO_ERA).arg("query").arg("tip");
131        self.post_config_command(&mut command);
132
133        command
134    }
135
136    fn command_for_chain_point(&self) -> Command {
137        let mut command = self.get_command();
138        command.arg(CARDANO_ERA).arg("query").arg("tip");
139        self.post_config_command(&mut command);
140
141        command
142    }
143
144    fn command_for_kes_period(&self) -> Command {
145        let mut command = self.get_command();
146        command.arg(CARDANO_ERA).arg("query").arg("tip");
147        self.post_config_command(&mut command);
148
149        command
150    }
151
152    fn get_command(&self) -> Command {
153        let mut command = Command::new(&self.cli_path);
154        command.env(
155            "CARDANO_NODE_SOCKET_PATH",
156            self.socket_path.to_string_lossy().as_ref(),
157        );
158
159        command
160    }
161
162    fn post_config_command<'a>(&'a self, command: &'a mut Command) -> &'a mut Command {
163        match self.network {
164            CardanoNetwork::MainNet => command.arg("--mainnet"),
165            CardanoNetwork::TestNet(magic) => {
166                command.args(vec!["--testnet-magic", &magic.to_string()])
167            }
168        }
169    }
170
171    /// Get slots per kes period
172    ///
173    /// This implementation is aligned with current value for the KES period on the testnet and mainnet networks of Cardano.
174    /// If this value changes in the future, the implementation should be updated accordingly.
175    /// The value can be retrieved in the 'slotsPerKESPeriod' field of the 'shelly-genesis.json' configuration file.
176    fn get_slots_per_kes_period(&self) -> u64 {
177        match self.network {
178            CardanoNetwork::MainNet => 129600,
179            CardanoNetwork::TestNet(1) => 129600,
180            CardanoNetwork::TestNet(2) => 129600,
181            CardanoNetwork::TestNet(_) => 129600,
182        }
183    }
184}
185
186#[async_trait]
187impl CliRunner for CardanoCliRunner {
188    async fn launch_utxo(&self, address: &str) -> StdResult<String> {
189        let out_file = Self::random_out_file()?;
190        let output = self.command_for_utxo(address, out_file.clone()).output().await?;
191
192        if output.status.success() {
193            Ok(fs::read_to_string(out_file)?.trim().to_string())
194        } else {
195            let message = String::from_utf8_lossy(&output.stderr);
196
197            Err(anyhow!(
198                "Error launching command {:?}, error = '{}'",
199                self.command_for_utxo(address, out_file),
200                message
201            ))
202        }
203    }
204
205    async fn launch_stake_distribution(&self) -> StdResult<String> {
206        let output = self.command_for_stake_distribution().output().await?;
207
208        if output.status.success() {
209            Ok(std::str::from_utf8(&output.stdout)?.trim().to_string())
210        } else {
211            let message = String::from_utf8_lossy(&output.stderr);
212
213            Err(anyhow!(
214                "Error launching command {:?}, error = '{}'",
215                self.command_for_stake_distribution(),
216                message
217            ))
218        }
219    }
220
221    async fn launch_stake_snapshot(&self, stake_pool_id: &str) -> StdResult<String> {
222        let output = self.command_for_stake_snapshot(stake_pool_id).output().await?;
223
224        if output.status.success() {
225            Ok(std::str::from_utf8(&output.stdout)?.trim().to_string())
226        } else {
227            let message = String::from_utf8_lossy(&output.stderr);
228
229            Err(anyhow!(
230                "Error launching command {:?}, error = '{}'",
231                self.command_for_stake_snapshot(stake_pool_id),
232                message
233            ))
234        }
235    }
236
237    async fn launch_stake_snapshot_all_pools(&self) -> StdResult<String> {
238        let output = self.command_for_stake_snapshot_all_pools().output().await?;
239
240        if output.status.success() {
241            Ok(std::str::from_utf8(&output.stdout)?.trim().to_string())
242        } else {
243            let message = String::from_utf8_lossy(&output.stderr);
244
245            Err(anyhow!(
246                "Error launching command {:?}, error = '{}'",
247                self.command_for_stake_snapshot_all_pools(),
248                message
249            ))
250        }
251    }
252
253    async fn launch_era(&self) -> StdResult<String> {
254        let output = self.command_for_era().output().await?;
255
256        if output.status.success() {
257            Ok(std::str::from_utf8(&output.stdout)?.trim().to_string())
258        } else {
259            let message = String::from_utf8_lossy(&output.stderr);
260
261            Err(anyhow!(
262                "Error launching command {:?}, error = '{}'",
263                self.command_for_era(),
264                message
265            ))
266        }
267    }
268
269    async fn launch_epoch(&self) -> StdResult<String> {
270        let output = self.command_for_epoch().output().await?;
271
272        if output.status.success() {
273            Ok(std::str::from_utf8(&output.stdout)?.trim().to_string())
274        } else {
275            let message = String::from_utf8_lossy(&output.stderr);
276
277            Err(anyhow!(
278                "Error launching command {:?}, error = '{}'",
279                self.command_for_epoch(),
280                message
281            ))
282        }
283    }
284
285    async fn launch_chain_point(&self) -> StdResult<String> {
286        let output = self.command_for_chain_point().output().await?;
287
288        if output.status.success() {
289            Ok(std::str::from_utf8(&output.stdout)?.trim().to_string())
290        } else {
291            let message = String::from_utf8_lossy(&output.stderr);
292
293            Err(anyhow!(
294                "Error launching command {:?}, error = '{}'",
295                self.command_for_chain_point(),
296                message
297            ))
298        }
299    }
300
301    async fn launch_kes_period(&self) -> StdResult<(String, u64)> {
302        let output = self.command_for_kes_period().output().await?;
303
304        if output.status.success() {
305            Ok((
306                std::str::from_utf8(&output.stdout)?.trim().to_string(),
307                self.get_slots_per_kes_period(),
308            ))
309        } else {
310            let message = String::from_utf8_lossy(&output.stderr);
311
312            Err(anyhow!(
313                "Error launching command {:?}, error = '{}'",
314                self.command_for_kes_period(),
315                message
316            ))
317        }
318    }
319}
320
/// A [ChainObserver] pulling its data using a [CardanoCliRunner].
pub struct CardanoCliChainObserver {
    /// Runner used to launch the queries whose output this observer parses.
    cli_runner: Box<dyn CliRunner + Send + Sync>,
}
325
326impl CardanoCliChainObserver {
327    /// CardanoCliChainObserver factory
328    pub fn new(cli_runner: Box<dyn CliRunner + Send + Sync>) -> Self {
329        Self { cli_runner }
330    }
331
332    // This is the only way I found to tell the compiler the correct types
333    // and lifetimes for the function `double`.
334    fn parse_string<'a>(&'a self, string: &'a str) -> IResult<&'a str, f64> {
335        nom::number::complete::double(string)
336    }
337
338    async fn get_current_stake_value(
339        &self,
340        stake_pool_id: &str,
341    ) -> Result<u64, ChainObserverError> {
342        let stake_pool_snapshot_output = self
343            .cli_runner
344            .launch_stake_snapshot(stake_pool_id)
345            .await
346            .map_err(ChainObserverError::General)?;
347        let stake_pool_snapshot: Value = serde_json::from_str(&stake_pool_snapshot_output)
348            .with_context(|| format!("output was = '{stake_pool_snapshot_output}'"))
349            .map_err(ChainObserverError::InvalidContent)?;
350        if let Value::Number(stake_pool_stake) = &stake_pool_snapshot["poolStakeMark"] {
351            return stake_pool_stake.as_u64().ok_or_else(|| {
352                ChainObserverError::InvalidContent(anyhow!(
353                    "Error: could not parse stake pool value as u64 {stake_pool_stake:?}"
354                ))
355            });
356        }
357        Err(ChainObserverError::InvalidContent(anyhow!(
358            "Error: could not parse stake pool snapshot {stake_pool_snapshot:?}"
359        )))
360    }
361
362    // This is the legacy way of computing stake distribution, not optimized for mainnet, and usable for versions of the Cardano node up to '1.35.7'
363    async fn get_current_stake_distribution_legacy(
364        &self,
365    ) -> Result<Option<StakeDistribution>, ChainObserverError> {
366        let output = self
367            .cli_runner
368            .launch_stake_distribution()
369            .await
370            .map_err(ChainObserverError::General)?;
371        let mut stake_distribution = StakeDistribution::new();
372
373        for (num, line) in output.lines().enumerate() {
374            let words: Vec<&str> = line.split_ascii_whitespace().collect();
375
376            if num < 2 || words.len() != 2 {
377                continue;
378            }
379
380            let stake_pool_id = words[0];
381            let stake_fraction = words[1];
382
383            if let Ok((_, _f)) = self.parse_string(stake_fraction) {
384                // This block is a fix:
385                // the stake retrieved was computed on the current epoch, when we need a value computed on the previous epoch
386                // in 'let stake: u64 = (f * 1_000_000_000.0).round() as u64;'
387                let stake: u64 = self.get_current_stake_value(stake_pool_id).await?;
388
389                if stake > 0 {
390                    let _ = stake_distribution.insert(stake_pool_id.to_string(), stake);
391                }
392            } else {
393                return Err(ChainObserverError::InvalidContent(anyhow!(
394                    "could not parse stake from '{}'",
395                    words[1]
396                )));
397            }
398        }
399
400        Ok(Some(stake_distribution))
401    }
402
403    // This is the new way of computing stake distribution, optimized for mainnet, and usable for versions of the Cardano node from '8.0.0'
404    async fn get_current_stake_distribution_optimized(
405        &self,
406    ) -> Result<Option<StakeDistribution>, ChainObserverError> {
407        let output = self
408            .cli_runner
409            .launch_stake_snapshot_all_pools()
410            .await
411            .map_err(ChainObserverError::General)?;
412        let mut stake_distribution = StakeDistribution::new();
413
414        let data: HashMap<String, Value> =
415            serde_json::from_str(&output).map_err(|e| ChainObserverError::General(e.into()))?;
416        let pools_data = data
417            .get("pools")
418            .ok_or(ChainObserverError::InvalidContent(anyhow!(
419                "Missing 'pools' field"
420            )))?
421            .as_object()
422            .ok_or(ChainObserverError::InvalidContent(anyhow!(
423                "Could not convert pool data to object"
424            )))?;
425
426        for (k, v) in pools_data.iter() {
427            let pool_id_hex = k;
428            let pool_id_bech32 = encode_bech32(
429                "pool",
430                &Vec::from_hex(pool_id_hex.as_bytes())
431                    .map_err(|e| ChainObserverError::General(e.into()))?,
432            )
433            .map_err(ChainObserverError::General)?;
434            let stakes = v
435                .get("stakeMark")
436                .ok_or(ChainObserverError::InvalidContent(anyhow!(
437                    "Missing 'stakeMark' field for {pool_id_bech32}"
438                )))?
439                .as_u64()
440                .ok_or(ChainObserverError::InvalidContent(anyhow!(
441                    "Stake could not be converted to integer for {pool_id_bech32}"
442                )))?;
443            if stakes > 0 {
444                stake_distribution.insert(pool_id_bech32, stakes);
445            }
446        }
447
448        Ok(Some(stake_distribution))
449    }
450}
451
452#[async_trait]
453impl ChainObserver for CardanoCliChainObserver {
454    async fn get_current_era(&self) -> Result<Option<String>, ChainObserverError> {
455        let output = self
456            .cli_runner
457            .launch_era()
458            .await
459            .map_err(ChainObserverError::General)?;
460        let v: Value = serde_json::from_str(&output)
461            .with_context(|| format!("output was = '{output}'"))
462            .map_err(ChainObserverError::InvalidContent)?;
463
464        if let Value::String(era) = &v["era"] {
465            Ok(Some(era.to_string()))
466        } else {
467            Ok(None)
468        }
469    }
470
471    async fn get_current_epoch(&self) -> Result<Option<Epoch>, ChainObserverError> {
472        let output = self
473            .cli_runner
474            .launch_epoch()
475            .await
476            .map_err(ChainObserverError::General)?;
477        let v: Value = serde_json::from_str(&output)
478            .with_context(|| format!("output was = '{output}'"))
479            .map_err(ChainObserverError::InvalidContent)?;
480
481        if let Value::Number(epoch) = &v["epoch"] {
482            Ok(epoch.as_u64().map(Epoch))
483        } else {
484            Ok(None)
485        }
486    }
487
488    async fn get_current_chain_point(&self) -> Result<Option<ChainPoint>, ChainObserverError> {
489        let output = self
490            .cli_runner
491            .launch_chain_point()
492            .await
493            .map_err(ChainObserverError::General)?;
494        let v: Value = serde_json::from_str(&output)
495            .with_context(|| format!("output was = '{output}'"))
496            .map_err(ChainObserverError::InvalidContent)?;
497
498        if let Value::String(hash) = &v["hash"] {
499            Ok(Some(ChainPoint {
500                slot_number: SlotNumber(v["slot"].as_u64().unwrap_or_default()),
501                block_number: BlockNumber(v["block"].as_u64().unwrap_or_default()),
502                block_hash: hash.to_string(),
503            }))
504        } else {
505            Ok(None)
506        }
507    }
508
509    async fn get_current_datums(
510        &self,
511        address: &ChainAddress,
512    ) -> Result<Vec<TxDatum>, ChainObserverError> {
513        let output = self
514            .cli_runner
515            .launch_utxo(address)
516            .await
517            .map_err(ChainObserverError::General)?;
518        let v: HashMap<String, Value> = serde_json::from_str(&output)
519            .with_context(|| format!("output was = '{output}'"))
520            .map_err(ChainObserverError::InvalidContent)?;
521
522        Ok(v.values()
523            .filter_map(|v| {
524                v.get("inlineDatum")
525                    .filter(|datum| !datum.is_null())
526                    .map(|datum| TxDatum(datum.to_string()))
527            })
528            .collect())
529    }
530
531    // TODO: This function implements a fallback mechanism to compute the stake distribution: new/optimized computation when available, legacy computation otherwise
532    async fn get_current_stake_distribution(
533        &self,
534    ) -> Result<Option<StakeDistribution>, ChainObserverError> {
535        match self.get_current_stake_distribution_optimized().await {
536            Ok(stake_distribution_maybe) => Ok(stake_distribution_maybe),
537            Err(_) => self.get_current_stake_distribution_legacy().await,
538        }
539    }
540
541    async fn get_current_kes_period(&self) -> Result<Option<KesPeriod>, ChainObserverError> {
542        let dir = std::env::temp_dir().join("mithril_kes_period");
543        fs::create_dir_all(&dir).map_err(|e| ChainObserverError::General(e.into()))?;
544        let (output, slots_per_kes_period) = self
545            .cli_runner
546            .launch_kes_period()
547            .await
548            .map_err(ChainObserverError::General)?;
549        if slots_per_kes_period == 0 {
550            return Err(anyhow!("slots_per_kes_period must be greater than 0"))
551                .with_context(|| "CardanoCliChainObserver failed to calculate kes period")?;
552        }
553        let first_left_curly_bracket_index = output.find('{').unwrap_or_default();
554        let output_cleaned = output.split_at(first_left_curly_bracket_index).1;
555        let v: Value = serde_json::from_str(output_cleaned)
556            .with_context(|| format!("output was = '{output}'"))
557            .map_err(ChainObserverError::InvalidContent)?;
558
559        if let Value::Number(slot) = &v["slot"] {
560            Ok(slot.as_u64().map(|slot| (slot / slots_per_kes_period) as KesPeriod))
561        } else {
562            Ok(None)
563        }
564    }
565}
566
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::ffi::OsStr;

    use crate::test::test_cli_runner::{TestCliRunner, test_expected};

    use super::*;

    // Asserts that the debug rendering of a command contains `$expected_shell` and that
    // the environment variables explicitly set on it are exactly `$envs`.
    macro_rules! assert_cli_command {
        ($command:expr, $expected_shell:expr, envs: $envs:expr) => {
            let cmd = $command;
            let std_cmd = cmd.as_std();
            let cmd_display = format!("{std_cmd:?}");
            assert!(
                cmd_display.contains($expected_shell),
                "Command shell does not contain expected content\n  full command shell: '{cmd_display}'\n  expected to contain: '{}'",
                $expected_shell
            );
            let cmd_envs: Vec<(&OsStr, Option<&OsStr>)> = std_cmd.get_envs().collect();
            assert_eq!(cmd_envs, $envs);
        };
    }

    #[tokio::test]
    async fn test_get_current_era() {
        let observer = CardanoCliChainObserver::new(Box::<TestCliRunner>::default());
        let era = observer.get_current_era().await.unwrap().unwrap();

        assert_eq!(test_expected::launch_era::ERA.to_string(), era);
    }

    #[tokio::test]
    async fn test_get_current_epoch() {
        let observer = CardanoCliChainObserver::new(Box::<TestCliRunner>::default());
        let epoch = observer.get_current_epoch().await.unwrap().unwrap();

        assert_eq!(test_expected::launch_epoch::EPOCH, epoch);
    }

    #[tokio::test]
    async fn test_get_current_chain_point() {
        let observer = CardanoCliChainObserver::new(Box::<TestCliRunner>::default());
        let chain_point = observer.get_current_chain_point().await.unwrap().unwrap();

        assert_eq!(
            ChainPoint {
                slot_number: test_expected::launch_chain_point::SLOT_NUMBER,
                block_number: test_expected::launch_chain_point::BLOCK_NUMBER,
                block_hash: test_expected::launch_chain_point::BLOCK_HASH.to_string(),
            },
            chain_point
        );
    }

    #[tokio::test]
    async fn test_cli_testnet_runner() {
        let runner = CardanoCliRunner::new(
            PathBuf::from("cardano-cli"),
            PathBuf::from("/tmp/whatever.sock"),
            CardanoNetwork::TestNet(10),
        );

        assert_cli_command!(
            runner.command_for_epoch(),
            r#""cardano-cli" "latest" "query" "tip" "--testnet-magic" "10""#,
            envs: vec![(OsStr::new("CARDANO_NODE_SOCKET_PATH"), Some(OsStr::new("/tmp/whatever.sock")))]
        );
        assert_cli_command!(
            runner.command_for_stake_distribution(),
            r#""cardano-cli" "latest" "query" "stake-distribution" "--testnet-magic" "10""#,
            envs: vec![(OsStr::new("CARDANO_NODE_SOCKET_PATH"), Some(OsStr::new("/tmp/whatever.sock")))]
        );
    }

    #[tokio::test]
    async fn test_cli_mainnet_runner() {
        let runner = CardanoCliRunner::new(
            PathBuf::from("cardano-cli"),
            PathBuf::from("/tmp/whatever.sock"),
            CardanoNetwork::MainNet,
        );

        assert_cli_command!(
            runner.command_for_epoch(),
            r#""cardano-cli" "latest" "query" "tip" "--mainnet""#,
            envs: vec![(OsStr::new("CARDANO_NODE_SOCKET_PATH"), Some(OsStr::new("/tmp/whatever.sock")))]
        );
        // The expected string ends with the closing quote of the last argument so that
        // the whole `"--mainnet"` argument is checked, not only a prefix of it.
        assert_cli_command!(
            runner.command_for_stake_distribution(),
            r#""cardano-cli" "latest" "query" "stake-distribution" "--mainnet""#,
            envs: vec![(OsStr::new("CARDANO_NODE_SOCKET_PATH"), Some(OsStr::new("/tmp/whatever.sock")))]
        );
    }

    #[tokio::test]
    async fn test_get_current_datums() {
        let observer = CardanoCliChainObserver::new(Box::<TestCliRunner>::default());
        let address = "addrtest_123456".to_string();
        let datums = observer.get_current_datums(&address).await.unwrap();
        assert_eq!(
            vec![TxDatum(format!(
                r#"{{"constructor":0,"fields":[{{"bytes":"{}"}}]}}"#,
                test_expected::launch_utxo::BYTES
            ))],
            datums
        );
    }

    #[tokio::test]
    async fn test_get_current_stake_value() {
        let observer = CardanoCliChainObserver::new(Box::<TestCliRunner>::default());
        let stake = observer
            .get_current_stake_value("pool1qqyjr9pcrv97gwrueunug829fs5znw6p2wxft3fvqkgu5f4qlrg")
            .await
            .expect("get current stake value should not fail");
        assert_eq!(
            test_expected::launch_stake_snapshot::DEFAULT_POOL_STAKE_MARK,
            stake
        );

        let stake = observer
            .get_current_stake_value(test_expected::launch_stake_snapshot::POOL_ID_SPECIFIC)
            .await
            .expect("get current stake value should not fail");
        assert_eq!(
            test_expected::launch_stake_snapshot::POOL_STAKE_MARK_FOR_POOL_ID_SPECIFIC,
            stake
        );
    }

    #[tokio::test]
    async fn test_get_current_stake_distribution_legacy() {
        let observer = CardanoCliChainObserver::new(Box::new(TestCliRunner::legacy()));
        let results = observer
            .get_current_stake_distribution_legacy()
            .await
            .unwrap()
            .unwrap();
        assert_eq!(7, results.len());
        assert_eq!(
            3_000_000,
            *results
                .get("pool1qqyjr9pcrv97gwrueunug829fs5znw6p2wxft3fvqkgu5f4qlrg")
                .unwrap()
        );
        assert_eq!(
            3_000_000,
            *results
                .get("pool1qz2vzszautc2c8mljnqre2857dpmheq7kgt6vav0s38tvvhxm6w")
                .unwrap()
        );
        assert!(!results.contains_key("pool1qpqvz90w7qsex2al2ejjej0rfgrwsguch307w8fraw7a7adf6g8"));
    }

    #[tokio::test]
    async fn test_get_current_stake_distribution_new() {
        let observer = CardanoCliChainObserver::new(Box::<TestCliRunner>::default());
        let computed_stake_distribution = observer
            .get_current_stake_distribution_optimized()
            .await
            .unwrap()
            .unwrap();
        let mut expected_stake_distribution = StakeDistribution::new();
        expected_stake_distribution.insert(
            "pool1qqqqqdk4zhsjuxxd8jyvwncf5eucfskz0xjjj64fdmlgj735lr9".to_string(),
            test_expected::launch_stake_snapshot_all_pools::STAKE_MARK_POOL_1,
        );
        expected_stake_distribution.insert(
            "pool1qqqqpanw9zc0rzh0yp247nzf2s35uvnsm7aaesfl2nnejaev0uc".to_string(),
            test_expected::launch_stake_snapshot_all_pools::STAKE_MARK_POOL_2,
        );
        expected_stake_distribution.insert(
            "pool1qqqqzyqf8mlm70883zht60n4q6uqxg4a8x266sewv8ad2grkztl".to_string(),
            test_expected::launch_stake_snapshot_all_pools::STAKE_MARK_POOL_3,
        );
        // Both sides go through a `BTreeMap` so that the comparison is order independent.
        assert_eq!(
            BTreeMap::from_iter(expected_stake_distribution),
            BTreeMap::from_iter(computed_stake_distribution),
        );
    }

    #[tokio::test]
    async fn test_get_current_stake_distribution() {
        // With the legacy runner the result must match the legacy computation.
        let observer = CardanoCliChainObserver::new(Box::new(TestCliRunner::legacy()));
        let expected_stake_distribution = observer
            .get_current_stake_distribution_legacy()
            .await
            .unwrap()
            .unwrap();
        let computed_stake_distribution =
            observer.get_current_stake_distribution().await.unwrap().unwrap();

        assert_eq!(
            BTreeMap::from_iter(expected_stake_distribution),
            BTreeMap::from_iter(computed_stake_distribution),
        );

        // With the default runner the result must match the optimized computation.
        let observer = CardanoCliChainObserver::new(Box::<TestCliRunner>::default());
        let expected_stake_distribution = observer
            .get_current_stake_distribution_optimized()
            .await
            .unwrap()
            .unwrap();
        let computed_stake_distribution =
            observer.get_current_stake_distribution().await.unwrap().unwrap();

        assert_eq!(
            BTreeMap::from_iter(expected_stake_distribution),
            BTreeMap::from_iter(computed_stake_distribution),
        );
    }

    #[tokio::test]
    async fn test_get_current_kes_period() {
        let observer = CardanoCliChainObserver::new(Box::<TestCliRunner>::default());
        let kes_period = observer.get_current_kes_period().await.unwrap().unwrap();
        assert_eq!(
            (test_expected::launch_chain_point::SLOT_NUMBER.0
                / test_expected::launch_kes_period::SLOTS_PER_KES_PERIOD) as u32,
            kes_period
        );
    }
}