mithril_cardano_node_chain/chain_observer/
cli_observer.rs

1use anyhow::{Context, anyhow};
2use async_trait::async_trait;
3use hex::FromHex;
4use nom::IResult;
5use rand_core::RngCore;
6use serde_json::Value;
7use std::collections::HashMap;
8use std::fs;
9use std::path::PathBuf;
10use tokio::process::Command;
11
12use mithril_common::crypto_helper::{KesPeriod, OpCert, SerDeShelleyFileFormat, encode_bech32};
13use mithril_common::entities::{BlockNumber, ChainPoint, Epoch, SlotNumber, StakeDistribution};
14use mithril_common::{CardanoNetwork, StdResult};
15
16use crate::entities::{ChainAddress, TxDatum};
17
18use super::interface::{ChainObserver, ChainObserverError};
19
/// Era argument placed first on every `cardano-cli` command line built in this file.
const CARDANO_ERA: &str = "latest";
21
22/// `CliRunner` trait defines the asynchronous methods
23/// for interaction with the Cardano CLI.
24#[async_trait]
25pub trait CliRunner {
26    /// Launches a UTxO.
27    async fn launch_utxo(&self, address: &str) -> StdResult<String>;
28    /// Launches the stake distribution.
29    async fn launch_stake_distribution(&self) -> StdResult<String>;
30    /// Launches the stake snapshot.
31    async fn launch_stake_snapshot(&self, stake_pool_id: &str) -> StdResult<String>;
32    /// Launches the stake snapshot for all pools.
33    async fn launch_stake_snapshot_all_pools(&self) -> StdResult<String>;
34    /// Launches the era info.
35    async fn launch_era(&self) -> StdResult<String>;
36    /// Launches the epoch info.
37    async fn launch_epoch(&self) -> StdResult<String>;
38    /// Launches the chain point.
39    async fn launch_chain_point(&self) -> StdResult<String>;
40    /// Launches the kes period.
41    async fn launch_kes_period(&self, opcert_file: &str) -> StdResult<String>;
42}
43
44/// A runner able to request data from a Cardano node using the
45/// [Cardano Cli](https://docs.cardano.org/getting-started/use-cli).
46#[derive(Clone, Debug)]
47pub struct CardanoCliRunner {
48    cli_path: PathBuf,
49    socket_path: PathBuf,
50    network: CardanoNetwork,
51}
52
53impl CardanoCliRunner {
54    /// CardanoCliRunner factory
55    pub fn new(cli_path: PathBuf, socket_path: PathBuf, network: CardanoNetwork) -> Self {
56        Self {
57            cli_path,
58            socket_path,
59            network,
60        }
61    }
62
63    fn random_out_file() -> StdResult<PathBuf> {
64        let mut rng = rand_core::OsRng;
65        let dir = std::env::temp_dir().join("cardano-cli-runner");
66        if !dir.exists() {
67            fs::create_dir_all(&dir)?;
68        }
69        Ok(dir.join(format!("{}.out", rng.next_u64())))
70    }
71
72    fn command_for_utxo(&self, address: &str, out_file: PathBuf) -> Command {
73        let mut command = self.get_command();
74        command
75            .arg(CARDANO_ERA)
76            .arg("query")
77            .arg("utxo")
78            .arg("--address")
79            .arg(address)
80            .arg("--out-file")
81            .arg(out_file);
82        self.post_config_command(&mut command);
83
84        command
85    }
86
87    fn command_for_stake_distribution(&self) -> Command {
88        let mut command = self.get_command();
89        command.arg(CARDANO_ERA).arg("query").arg("stake-distribution");
90        self.post_config_command(&mut command);
91
92        command
93    }
94
95    fn command_for_stake_snapshot(&self, stake_pool_id: &str) -> Command {
96        let mut command = self.get_command();
97        command
98            .arg(CARDANO_ERA)
99            .arg("query")
100            .arg("stake-snapshot")
101            .arg("--stake-pool-id")
102            .arg(stake_pool_id);
103        self.post_config_command(&mut command);
104
105        command
106    }
107
108    fn command_for_stake_snapshot_all_pools(&self) -> Command {
109        let mut command = self.get_command();
110        command
111            .arg(CARDANO_ERA)
112            .arg("query")
113            .arg("stake-snapshot")
114            .arg("--all-stake-pools");
115        self.post_config_command(&mut command);
116
117        command
118    }
119
120    fn command_for_era(&self) -> Command {
121        let mut command = self.get_command();
122        command.arg(CARDANO_ERA).arg("query").arg("tip");
123        self.post_config_command(&mut command);
124
125        command
126    }
127
128    fn command_for_epoch(&self) -> Command {
129        let mut command = self.get_command();
130        command.arg(CARDANO_ERA).arg("query").arg("tip");
131        self.post_config_command(&mut command);
132
133        command
134    }
135
136    fn command_for_chain_point(&self) -> Command {
137        let mut command = self.get_command();
138        command.arg(CARDANO_ERA).arg("query").arg("tip");
139        self.post_config_command(&mut command);
140
141        command
142    }
143
144    fn command_for_kes_period(&self, opcert_file: &str) -> Command {
145        let mut command = self.get_command();
146        command
147            .arg(CARDANO_ERA)
148            .arg("query")
149            .arg("kes-period-info")
150            .arg("--op-cert-file")
151            .arg(opcert_file);
152        self.post_config_command(&mut command);
153
154        command
155    }
156
157    fn get_command(&self) -> Command {
158        let mut command = Command::new(&self.cli_path);
159        command.env(
160            "CARDANO_NODE_SOCKET_PATH",
161            self.socket_path.to_string_lossy().as_ref(),
162        );
163
164        command
165    }
166
167    fn post_config_command<'a>(&'a self, command: &'a mut Command) -> &'a mut Command {
168        match self.network {
169            CardanoNetwork::MainNet => command.arg("--mainnet"),
170            CardanoNetwork::DevNet(magic) => command.args(vec![
171                "--cardano-mode",
172                "--testnet-magic",
173                &magic.to_string(),
174            ]),
175            CardanoNetwork::TestNet(magic) => {
176                command.args(vec!["--testnet-magic", &magic.to_string()])
177            }
178        }
179    }
180}
181
182#[async_trait]
183impl CliRunner for CardanoCliRunner {
184    async fn launch_utxo(&self, address: &str) -> StdResult<String> {
185        let out_file = Self::random_out_file()?;
186        let output = self.command_for_utxo(address, out_file.clone()).output().await?;
187
188        if output.status.success() {
189            Ok(fs::read_to_string(out_file)?.trim().to_string())
190        } else {
191            let message = String::from_utf8_lossy(&output.stderr);
192
193            Err(anyhow!(
194                "Error launching command {:?}, error = '{}'",
195                self.command_for_utxo(address, out_file),
196                message
197            ))
198        }
199    }
200
201    async fn launch_stake_distribution(&self) -> StdResult<String> {
202        let output = self.command_for_stake_distribution().output().await?;
203
204        if output.status.success() {
205            Ok(std::str::from_utf8(&output.stdout)?.trim().to_string())
206        } else {
207            let message = String::from_utf8_lossy(&output.stderr);
208
209            Err(anyhow!(
210                "Error launching command {:?}, error = '{}'",
211                self.command_for_stake_distribution(),
212                message
213            ))
214        }
215    }
216
217    async fn launch_stake_snapshot(&self, stake_pool_id: &str) -> StdResult<String> {
218        let output = self.command_for_stake_snapshot(stake_pool_id).output().await?;
219
220        if output.status.success() {
221            Ok(std::str::from_utf8(&output.stdout)?.trim().to_string())
222        } else {
223            let message = String::from_utf8_lossy(&output.stderr);
224
225            Err(anyhow!(
226                "Error launching command {:?}, error = '{}'",
227                self.command_for_stake_snapshot(stake_pool_id),
228                message
229            ))
230        }
231    }
232
233    async fn launch_stake_snapshot_all_pools(&self) -> StdResult<String> {
234        let output = self.command_for_stake_snapshot_all_pools().output().await?;
235
236        if output.status.success() {
237            Ok(std::str::from_utf8(&output.stdout)?.trim().to_string())
238        } else {
239            let message = String::from_utf8_lossy(&output.stderr);
240
241            Err(anyhow!(
242                "Error launching command {:?}, error = '{}'",
243                self.command_for_stake_snapshot_all_pools(),
244                message
245            ))
246        }
247    }
248
249    async fn launch_era(&self) -> StdResult<String> {
250        let output = self.command_for_era().output().await?;
251
252        if output.status.success() {
253            Ok(std::str::from_utf8(&output.stdout)?.trim().to_string())
254        } else {
255            let message = String::from_utf8_lossy(&output.stderr);
256
257            Err(anyhow!(
258                "Error launching command {:?}, error = '{}'",
259                self.command_for_era(),
260                message
261            ))
262        }
263    }
264
265    async fn launch_epoch(&self) -> StdResult<String> {
266        let output = self.command_for_epoch().output().await?;
267
268        if output.status.success() {
269            Ok(std::str::from_utf8(&output.stdout)?.trim().to_string())
270        } else {
271            let message = String::from_utf8_lossy(&output.stderr);
272
273            Err(anyhow!(
274                "Error launching command {:?}, error = '{}'",
275                self.command_for_epoch(),
276                message
277            ))
278        }
279    }
280
281    async fn launch_chain_point(&self) -> StdResult<String> {
282        let output = self.command_for_chain_point().output().await?;
283
284        if output.status.success() {
285            Ok(std::str::from_utf8(&output.stdout)?.trim().to_string())
286        } else {
287            let message = String::from_utf8_lossy(&output.stderr);
288
289            Err(anyhow!(
290                "Error launching command {:?}, error = '{}'",
291                self.command_for_chain_point(),
292                message
293            ))
294        }
295    }
296
297    async fn launch_kes_period(&self, opcert_file: &str) -> StdResult<String> {
298        let output = self.command_for_kes_period(opcert_file).output().await?;
299
300        if output.status.success() {
301            Ok(std::str::from_utf8(&output.stdout)?.trim().to_string())
302        } else {
303            let message = String::from_utf8_lossy(&output.stderr);
304
305            Err(anyhow!(
306                "Error launching command {:?}, error = '{}'",
307                self.command_for_kes_period(opcert_file),
308                message
309            ))
310        }
311    }
312}
313
314/// A [ChainObserver] pulling it's data using a [CardanoCliRunner].
315pub struct CardanoCliChainObserver {
316    cli_runner: Box<dyn CliRunner + Send + Sync>,
317}
318
319impl CardanoCliChainObserver {
320    /// CardanoCliChainObserver factory
321    pub fn new(cli_runner: Box<dyn CliRunner + Send + Sync>) -> Self {
322        Self { cli_runner }
323    }
324
325    // This is the only way I found to tell the compiler the correct types
326    // and lifetimes for the function `double`.
327    fn parse_string<'a>(&'a self, string: &'a str) -> IResult<&'a str, f64> {
328        nom::number::complete::double(string)
329    }
330
331    async fn get_current_stake_value(
332        &self,
333        stake_pool_id: &str,
334    ) -> Result<u64, ChainObserverError> {
335        let stake_pool_snapshot_output = self
336            .cli_runner
337            .launch_stake_snapshot(stake_pool_id)
338            .await
339            .map_err(ChainObserverError::General)?;
340        let stake_pool_snapshot: Value = serde_json::from_str(&stake_pool_snapshot_output)
341            .with_context(|| format!("output was = '{stake_pool_snapshot_output}'"))
342            .map_err(ChainObserverError::InvalidContent)?;
343        if let Value::Number(stake_pool_stake) = &stake_pool_snapshot["poolStakeMark"] {
344            return stake_pool_stake.as_u64().ok_or_else(|| {
345                ChainObserverError::InvalidContent(anyhow!(
346                    "Error: could not parse stake pool value as u64 {stake_pool_stake:?}"
347                ))
348            });
349        }
350        Err(ChainObserverError::InvalidContent(anyhow!(
351            "Error: could not parse stake pool snapshot {stake_pool_snapshot:?}"
352        )))
353    }
354
355    // This is the legacy way of computing stake distribution, not optimized for mainnet, and usable for versions of the Cardano node up to '1.35.7'
356    async fn get_current_stake_distribution_legacy(
357        &self,
358    ) -> Result<Option<StakeDistribution>, ChainObserverError> {
359        let output = self
360            .cli_runner
361            .launch_stake_distribution()
362            .await
363            .map_err(ChainObserverError::General)?;
364        let mut stake_distribution = StakeDistribution::new();
365
366        for (num, line) in output.lines().enumerate() {
367            let words: Vec<&str> = line.split_ascii_whitespace().collect();
368
369            if num < 2 || words.len() != 2 {
370                continue;
371            }
372
373            let stake_pool_id = words[0];
374            let stake_fraction = words[1];
375
376            if let Ok((_, _f)) = self.parse_string(stake_fraction) {
377                // This block is a fix:
378                // the stake retrieved was computed on the current epoch, when we need a value computed on the previous epoch
379                // in 'let stake: u64 = (f * 1_000_000_000.0).round() as u64;'
380                let stake: u64 = self.get_current_stake_value(stake_pool_id).await?;
381
382                if stake > 0 {
383                    let _ = stake_distribution.insert(stake_pool_id.to_string(), stake);
384                }
385            } else {
386                return Err(ChainObserverError::InvalidContent(anyhow!(
387                    "could not parse stake from '{}'",
388                    words[1]
389                )));
390            }
391        }
392
393        Ok(Some(stake_distribution))
394    }
395
396    // This is the new way of computing stake distribution, optimized for mainnet, and usable for versions of the Cardano node from '8.0.0'
397    async fn get_current_stake_distribution_optimized(
398        &self,
399    ) -> Result<Option<StakeDistribution>, ChainObserverError> {
400        let output = self
401            .cli_runner
402            .launch_stake_snapshot_all_pools()
403            .await
404            .map_err(ChainObserverError::General)?;
405        let mut stake_distribution = StakeDistribution::new();
406
407        let data: HashMap<String, Value> =
408            serde_json::from_str(&output).map_err(|e| ChainObserverError::General(e.into()))?;
409        let pools_data = data
410            .get("pools")
411            .ok_or(ChainObserverError::InvalidContent(anyhow!(
412                "Missing 'pools' field"
413            )))?
414            .as_object()
415            .ok_or(ChainObserverError::InvalidContent(anyhow!(
416                "Could not convert pool data to object"
417            )))?;
418
419        for (k, v) in pools_data.iter() {
420            let pool_id_hex = k;
421            let pool_id_bech32 = encode_bech32(
422                "pool",
423                &Vec::from_hex(pool_id_hex.as_bytes())
424                    .map_err(|e| ChainObserverError::General(e.into()))?,
425            )
426            .map_err(ChainObserverError::General)?;
427            let stakes = v
428                .get("stakeMark")
429                .ok_or(ChainObserverError::InvalidContent(anyhow!(
430                    "Missing 'stakeMark' field for {pool_id_bech32}"
431                )))?
432                .as_u64()
433                .ok_or(ChainObserverError::InvalidContent(anyhow!(
434                    "Stake could not be converted to integer for {pool_id_bech32}"
435                )))?;
436            if stakes > 0 {
437                stake_distribution.insert(pool_id_bech32, stakes);
438            }
439        }
440
441        Ok(Some(stake_distribution))
442    }
443}
444
445#[async_trait]
446impl ChainObserver for CardanoCliChainObserver {
447    async fn get_current_era(&self) -> Result<Option<String>, ChainObserverError> {
448        let output = self
449            .cli_runner
450            .launch_era()
451            .await
452            .map_err(ChainObserverError::General)?;
453        let v: Value = serde_json::from_str(&output)
454            .with_context(|| format!("output was = '{output}'"))
455            .map_err(ChainObserverError::InvalidContent)?;
456
457        if let Value::String(era) = &v["era"] {
458            Ok(Some(era.to_string()))
459        } else {
460            Ok(None)
461        }
462    }
463
464    async fn get_current_epoch(&self) -> Result<Option<Epoch>, ChainObserverError> {
465        let output = self
466            .cli_runner
467            .launch_epoch()
468            .await
469            .map_err(ChainObserverError::General)?;
470        let v: Value = serde_json::from_str(&output)
471            .with_context(|| format!("output was = '{output}'"))
472            .map_err(ChainObserverError::InvalidContent)?;
473
474        if let Value::Number(epoch) = &v["epoch"] {
475            Ok(epoch.as_u64().map(Epoch))
476        } else {
477            Ok(None)
478        }
479    }
480
481    async fn get_current_chain_point(&self) -> Result<Option<ChainPoint>, ChainObserverError> {
482        let output = self
483            .cli_runner
484            .launch_chain_point()
485            .await
486            .map_err(ChainObserverError::General)?;
487        let v: Value = serde_json::from_str(&output)
488            .with_context(|| format!("output was = '{output}'"))
489            .map_err(ChainObserverError::InvalidContent)?;
490
491        if let Value::String(hash) = &v["hash"] {
492            Ok(Some(ChainPoint {
493                slot_number: SlotNumber(v["slot"].as_u64().unwrap_or_default()),
494                block_number: BlockNumber(v["block"].as_u64().unwrap_or_default()),
495                block_hash: hash.to_string(),
496            }))
497        } else {
498            Ok(None)
499        }
500    }
501
502    async fn get_current_datums(
503        &self,
504        address: &ChainAddress,
505    ) -> Result<Vec<TxDatum>, ChainObserverError> {
506        let output = self
507            .cli_runner
508            .launch_utxo(address)
509            .await
510            .map_err(ChainObserverError::General)?;
511        let v: HashMap<String, Value> = serde_json::from_str(&output)
512            .with_context(|| format!("output was = '{output}'"))
513            .map_err(ChainObserverError::InvalidContent)?;
514
515        Ok(v.values()
516            .filter_map(|v| {
517                v.get("inlineDatum")
518                    .filter(|datum| !datum.is_null())
519                    .map(|datum| TxDatum(datum.to_string()))
520            })
521            .collect())
522    }
523
524    // TODO: This function implements a fallback mechanism to compute the stake distribution: new/optimized computation when available, legacy computation otherwise
525    async fn get_current_stake_distribution(
526        &self,
527    ) -> Result<Option<StakeDistribution>, ChainObserverError> {
528        match self.get_current_stake_distribution_optimized().await {
529            Ok(stake_distribution_maybe) => Ok(stake_distribution_maybe),
530            Err(_) => self.get_current_stake_distribution_legacy().await,
531        }
532    }
533
534    async fn get_current_kes_period(
535        &self,
536        opcert: &OpCert,
537    ) -> Result<Option<KesPeriod>, ChainObserverError> {
538        let dir = std::env::temp_dir().join("mithril_kes_period");
539        fs::create_dir_all(&dir).map_err(|e| ChainObserverError::General(e.into()))?;
540        let opcert_file = dir.join(format!("opcert_kes_period-{}", opcert.compute_hash()));
541        opcert
542            .to_file(&opcert_file)
543            .map_err(|e| ChainObserverError::General(e.into()))?;
544        let output = self
545            .cli_runner
546            .launch_kes_period(opcert_file.to_str().unwrap())
547            .await
548            .map_err(ChainObserverError::General)?;
549        let first_left_curly_bracket_index = output.find('{').unwrap_or_default();
550        let output_cleaned = output.split_at(first_left_curly_bracket_index).1;
551        let v: Value = serde_json::from_str(output_cleaned)
552            .with_context(|| format!("output was = '{output}'"))
553            .map_err(ChainObserverError::InvalidContent)?;
554
555        if let Value::Number(kes_period) = &v["qKesCurrentKesPeriod"] {
556            Ok(kes_period.as_u64().map(|p| p as KesPeriod))
557        } else {
558            Ok(None)
559        }
560    }
561}
562
#[cfg(test)]
mod tests {
    use kes_summed_ed25519::{kes::Sum6Kes, traits::KesSk};
    use std::collections::BTreeMap;
    use std::ffi::OsStr;

    use mithril_common::crypto_helper::ColdKeyGenerator;

    use crate::test::test_cli_runner::{TestCliRunner, test_expected};

    use super::*;

    /// Asserts that the debug rendering of `$command` contains `$expected_shell` and that
    /// the environment variables explicitly set on it are exactly `$envs`.
    macro_rules! assert_cli_command {
        ($command:expr, $expected_shell:expr, envs: $envs:expr) => {
            let cmd = $command;
            let std_cmd = cmd.as_std();
            let cmd_display = format!("{std_cmd:?}");
            assert!(
                cmd_display.contains($expected_shell),
                "Command shell does not contain expected content\n  full command shell: '{cmd_display}'\n  expected to contain: '{}'",
                $expected_shell
            );
            let cmd_envs: Vec<(&OsStr, Option<&OsStr>)> = std_cmd.get_envs().collect();
            assert_eq!(cmd_envs, $envs);
        };
    }

    /// Collects key/value pairs into a `BTreeMap` so that two distributions can be
    /// compared independently of their original container.
    fn to_btree_map<K: Ord, V>(entries: impl IntoIterator<Item = (K, V)>) -> BTreeMap<K, V> {
        entries.into_iter().collect()
    }

    /// Environment expected on every command built for the `/tmp/whatever.sock` socket.
    fn expected_socket_env() -> Vec<(&'static OsStr, Option<&'static OsStr>)> {
        vec![(
            OsStr::new("CARDANO_NODE_SOCKET_PATH"),
            Some(OsStr::new("/tmp/whatever.sock")),
        )]
    }

    #[tokio::test]
    async fn test_get_current_era() {
        let observer = CardanoCliChainObserver::new(Box::<TestCliRunner>::default());
        let era = observer.get_current_era().await.unwrap().unwrap();

        assert_eq!(test_expected::launch_era::ERA.to_string(), era);
    }

    #[tokio::test]
    async fn test_get_current_epoch() {
        let observer = CardanoCliChainObserver::new(Box::<TestCliRunner>::default());
        let epoch = observer.get_current_epoch().await.unwrap().unwrap();

        assert_eq!(test_expected::launch_epoch::EPOCH, epoch);
    }

    #[tokio::test]
    async fn test_get_current_chain_point() {
        let observer = CardanoCliChainObserver::new(Box::<TestCliRunner>::default());
        let chain_point = observer.get_current_chain_point().await.unwrap().unwrap();

        assert_eq!(
            ChainPoint {
                slot_number: test_expected::launch_chain_point::SLOT_NUMBER,
                block_number: test_expected::launch_chain_point::BLOCK_NUMBER,
                block_hash: test_expected::launch_chain_point::BLOCK_HASH.to_string(),
            },
            chain_point
        );
    }

    #[tokio::test]
    async fn test_cli_testnet_runner() {
        let runner = CardanoCliRunner::new(
            PathBuf::from("cardano-cli"),
            PathBuf::from("/tmp/whatever.sock"),
            CardanoNetwork::TestNet(10),
        );

        assert_cli_command!(
            runner.command_for_epoch(),
            r#""cardano-cli" "latest" "query" "tip" "--testnet-magic" "10""#,
            envs: expected_socket_env()
        );
        assert_cli_command!(
            runner.command_for_stake_distribution(),
            r#""cardano-cli" "latest" "query" "stake-distribution" "--testnet-magic" "10""#,
            envs: expected_socket_env()
        );
    }

    #[tokio::test]
    async fn test_cli_devnet_runner() {
        let runner = CardanoCliRunner::new(
            PathBuf::from("cardano-cli"),
            PathBuf::from("/tmp/whatever.sock"),
            CardanoNetwork::DevNet(25),
        );

        assert_cli_command!(
            runner.command_for_epoch(),
            r#""cardano-cli" "latest" "query" "tip" "--cardano-mode" "--testnet-magic" "25""#,
            envs: expected_socket_env()
        );
        assert_cli_command!(
            runner.command_for_stake_distribution(),
            r#""cardano-cli" "latest" "query" "stake-distribution" "--cardano-mode" "--testnet-magic" "25""#,
            envs: expected_socket_env()
        );
    }

    #[tokio::test]
    async fn test_cli_mainnet_runner() {
        let runner = CardanoCliRunner::new(
            PathBuf::from("cardano-cli"),
            PathBuf::from("/tmp/whatever.sock"),
            CardanoNetwork::MainNet,
        );

        assert_cli_command!(
            runner.command_for_epoch(),
            r#""cardano-cli" "latest" "query" "tip" "--mainnet""#,
            envs: expected_socket_env()
        );
        // The closing quote of the last argument is part of the expectation, as in the
        // other runner tests.
        assert_cli_command!(
            runner.command_for_stake_distribution(),
            r#""cardano-cli" "latest" "query" "stake-distribution" "--mainnet""#,
            envs: expected_socket_env()
        );
    }

    #[tokio::test]
    async fn test_get_current_datums() {
        let observer = CardanoCliChainObserver::new(Box::<TestCliRunner>::default());
        let address = "addrtest_123456".to_string();
        let datums = observer.get_current_datums(&address).await.unwrap();
        assert_eq!(
            vec![TxDatum(format!(
                r#"{{"constructor":0,"fields":[{{"bytes":"{}"}}]}}"#,
                test_expected::launch_utxo::BYTES
            ))],
            datums
        );
    }

    #[tokio::test]
    async fn test_get_current_stake_value() {
        let observer = CardanoCliChainObserver::new(Box::<TestCliRunner>::default());
        let stake = observer
            .get_current_stake_value("pool1qqyjr9pcrv97gwrueunug829fs5znw6p2wxft3fvqkgu5f4qlrg")
            .await
            .expect("get current stake value should not fail");
        assert_eq!(
            test_expected::launch_stake_snapshot::DEFAULT_POOL_STAKE_MARK,
            stake
        );

        let stake = observer
            .get_current_stake_value(test_expected::launch_stake_snapshot::POOL_ID_SPECIFIC)
            .await
            .expect("get current stake value should not fail");
        assert_eq!(
            test_expected::launch_stake_snapshot::POOL_STAKE_MARK_FOR_POOL_ID_SPECIFIC,
            stake
        );
    }

    #[tokio::test]
    async fn test_get_current_stake_distribution_legacy() {
        let observer = CardanoCliChainObserver::new(Box::new(TestCliRunner::legacy()));
        let results = observer
            .get_current_stake_distribution_legacy()
            .await
            .unwrap()
            .unwrap();
        assert_eq!(7, results.len());
        assert_eq!(
            3_000_000,
            *results
                .get("pool1qqyjr9pcrv97gwrueunug829fs5znw6p2wxft3fvqkgu5f4qlrg")
                .unwrap()
        );
        assert_eq!(
            3_000_000,
            *results
                .get("pool1qz2vzszautc2c8mljnqre2857dpmheq7kgt6vav0s38tvvhxm6w")
                .unwrap()
        );
        assert!(!results.contains_key("pool1qpqvz90w7qsex2al2ejjej0rfgrwsguch307w8fraw7a7adf6g8"));
    }

    #[tokio::test]
    async fn test_get_current_stake_distribution_new() {
        let observer = CardanoCliChainObserver::new(Box::<TestCliRunner>::default());
        let computed_stake_distribution = observer
            .get_current_stake_distribution_optimized()
            .await
            .unwrap()
            .unwrap();
        let mut expected_stake_distribution = StakeDistribution::new();
        expected_stake_distribution.insert(
            "pool1qqqqqdk4zhsjuxxd8jyvwncf5eucfskz0xjjj64fdmlgj735lr9".to_string(),
            test_expected::launch_stake_snapshot_all_pools::STAKE_MARK_POOL_1,
        );
        expected_stake_distribution.insert(
            "pool1qqqqpanw9zc0rzh0yp247nzf2s35uvnsm7aaesfl2nnejaev0uc".to_string(),
            test_expected::launch_stake_snapshot_all_pools::STAKE_MARK_POOL_2,
        );
        expected_stake_distribution.insert(
            "pool1qqqqzyqf8mlm70883zht60n4q6uqxg4a8x266sewv8ad2grkztl".to_string(),
            test_expected::launch_stake_snapshot_all_pools::STAKE_MARK_POOL_3,
        );
        assert_eq!(
            to_btree_map(expected_stake_distribution),
            to_btree_map(computed_stake_distribution),
        );
    }

    #[tokio::test]
    async fn test_get_current_stake_distribution() {
        // With a legacy runner the fallback must yield the legacy computation.
        let observer = CardanoCliChainObserver::new(Box::new(TestCliRunner::legacy()));
        let expected_stake_distribution = observer
            .get_current_stake_distribution_legacy()
            .await
            .unwrap()
            .unwrap();
        let computed_stake_distribution =
            observer.get_current_stake_distribution().await.unwrap().unwrap();

        assert_eq!(
            to_btree_map(expected_stake_distribution),
            to_btree_map(computed_stake_distribution),
        );

        // With the default runner the optimized computation must be used.
        let observer = CardanoCliChainObserver::new(Box::<TestCliRunner>::default());
        let expected_stake_distribution = observer
            .get_current_stake_distribution_optimized()
            .await
            .unwrap()
            .unwrap();
        let computed_stake_distribution =
            observer.get_current_stake_distribution().await.unwrap().unwrap();

        assert_eq!(
            to_btree_map(expected_stake_distribution),
            to_btree_map(computed_stake_distribution),
        );
    }

    #[tokio::test]
    async fn test_get_current_kes_period() {
        let keypair = ColdKeyGenerator::create_deterministic_keypair([0u8; 32]);
        let mut dummy_key_buffer = [0u8; Sum6Kes::SIZE + 4];
        let mut dummy_seed = [0u8; 32];
        let (_, kes_verification_key) = Sum6Kes::keygen(&mut dummy_key_buffer, &mut dummy_seed);
        let operational_certificate = OpCert::new(kes_verification_key, 0, 0, keypair);
        let observer = CardanoCliChainObserver::new(Box::<TestCliRunner>::default());
        let kes_period = observer
            .get_current_kes_period(&operational_certificate)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(test_expected::launch_kes_period::KES_PERIOD, kes_period);
    }
}