mithril_client/utils/
stream_reader.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
use flume::Receiver;
use std::io;

// Credits and many thanks to https://stackoverflow.com/a/69967522 for most of this code

/// A channel receiver that implement [io::Read].
pub struct StreamReader {
    receiver: Receiver<Vec<u8>>,
    current: io::Cursor<Vec<u8>>,
}

impl StreamReader {
    /// Constructs a new `StreamReader`.
    pub fn new(receiver: Receiver<Vec<u8>>) -> Self {
        Self {
            receiver,
            current: io::Cursor::new(vec![]),
        }
    }
}

impl io::Read for StreamReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.current.position() == self.current.get_ref().len() as u64 {
            // We've exhausted the previous chunk, get a new one.
            if let Ok(vec) = self.receiver.recv() {
                self.current = io::Cursor::new(vec);
            }
            // If recv() "fails", it means the sender closed its part of
            // the channel, which means EOF. Propagate EOF by allowing
            // a read from the exhausted cursor.
        }
        self.current.read(buf)
    }
}