Skip to content

Instantly share code, notes, and snippets.

@matthiasbeyer
Created October 15, 2020 09:29
Show Gist options
  • Select an option

  • Save matthiasbeyer/6b5f3a79f75a68c2bdef5536bcd8f57d to your computer and use it in GitHub Desktop.

Select an option

Save matthiasbeyer/6b5f3a79f75a68c2bdef5536bcd8f57d to your computer and use it in GitHub Desktop.

Revisions

  1. matthiasbeyer created this gist Oct 15, 2020.
    70 changes: 70 additions & 0 deletions lib.rs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,70 @@
    // should be all code...

    use std::pin::Pin;
    use std::result::Result as RResult;

    use futures::Stream;
    use futures::StreamExt;
    use futures::task::Context;
    use futures::task::Poll;
    use anyhow::Error;
    use anyhow::Result;
    use tokio::io::BufReader;
    use tokio::io::StreamReader;
    use tokio::io::stream_reader;
    use tokio::io::AsyncBufRead;

    use shiplift::tty::TtyChunk;

    mod util;
    use crate::util::*;

    type IoResult<T> = RResult<T, tokio::io::Error>;

    pub struct LogParser<S>
    where S: Stream<Item = IoResult<TtyChunkBuf>>
    {
    stream: BufReader<StreamReader<S, TtyChunkBuf>>
    }

    impl<S> LogParser<S>
    where S: Stream<Item = IoResult<TtyChunkBuf>>
    {
    pub fn new<Src>(stream: Src) -> Result<LogParser<S>>
    where Src: Stream<Item = shiplift::Result<TtyChunk>>
    {
    let stream = stream.map(|item| {
    let f: IoResult<TtyChunkBuf> = item
    .map(TtyChunkBuf::from)
    .map_err(|e| tokio::io::Error::new(tokio::io::ErrorKind::Other, e));

    f
    });

    let stream = stream_reader(stream);
    let stream = BufReader::new(stream);

    Ok(LogParser { stream })
    }
    }


    impl<S> Stream for LogParser<S>
    where S: Stream<Item = IoResult<TtyChunkBuf>>
    {
    type Item = Result<LogItem>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
    let mut buf = String::new();
    let b = self.stream
    .read_line(&mut buf)
    .await?;

    if b == 0 {
    Poll::Ready(None)
    } else {
    Poll::Ready(Some(LogItem::Line(buf))) // TODO: Parse
    }
    }
    }

    44 changes: 44 additions & 0 deletions util.rs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,44 @@
    use std::collections::VecDeque;
    use bytes::buf::Buf;
    use shiplift::tty::TtyChunk;

    pub enum TtyChunkBuf {
    StdIn(VecDeque<u8>),
    StdOut(VecDeque<u8>),
    StdErr(VecDeque<u8>),
    }

    impl From<TtyChunk> for TtyChunkBuf {
    fn from(c: TtyChunk) -> Self {
    match c {
    TtyChunk::StdIn(buffer) => TtyChunkBuf::StdIn(VecDeque::from(buffer)),
    TtyChunk::StdOut(buffer) => TtyChunkBuf::StdOut(VecDeque::from(buffer)),
    TtyChunk::StdErr(buffer) => TtyChunkBuf::StdErr(VecDeque::from(buffer)),
    }
    }
    }

    impl Buf for TtyChunkBuf {
    fn remaining(&self) -> usize {
    match self {
    TtyChunkBuf::StdIn(buf) => buf.remaining(),
    TtyChunkBuf::StdErr(buf) => buf.remaining(),
    TtyChunkBuf::StdOut(buf) => buf.remaining(),
    }
    }
    fn bytes<'a>(&'a self) -> &'a [u8] {
    match self {
    TtyChunkBuf::StdIn(buf) => buf.bytes(),
    TtyChunkBuf::StdErr(buf) => buf.bytes(),
    TtyChunkBuf::StdOut(buf) => buf.bytes(),
    }
    }

    fn advance(&mut self, cnt: usize) {
    match self {
    TtyChunkBuf::StdIn(buf) => buf.advance(cnt),
    TtyChunkBuf::StdErr(buf) => buf.advance(cnt),
    TtyChunkBuf::StdOut(buf) => buf.advance(cnt),
    }
    }
    }