mithril_client/utils/
stream_reader.rs

1use flume::Receiver;
2use std::io;
3
4// Credits and many thanks to https://stackoverflow.com/a/69967522 for most of this code
5
6/// A channel receiver that implement [io::Read].
7pub struct StreamReader {
8    receiver: Receiver<Vec<u8>>,
9    current: io::Cursor<Vec<u8>>,
10}
11
12impl StreamReader {
13    /// Constructs a new `StreamReader`.
14    pub fn new(receiver: Receiver<Vec<u8>>) -> Self {
15        Self {
16            receiver,
17            current: io::Cursor::new(vec![]),
18        }
19    }
20}
21
22impl io::Read for StreamReader {
23    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
24        if self.current.position() == self.current.get_ref().len() as u64 {
25            // We've exhausted the previous chunk, get a new one.
26            if let Ok(vec) = self.receiver.recv() {
27                self.current = io::Cursor::new(vec);
28            }
29            // If recv() "fails", it means the sender closed its part of
30            // the channel, which means EOF. Propagate EOF by allowing
31            // a read from the exhausted cursor.
32        }
33        self.current.read(buf)
34    }
35}