tee_stream_head.py

@akaihola
Last active May 31, 2018 20:02

Revisions

  1. akaihola revised this gist May 31, 2018. 1 changed file with 1 addition and 1 deletion.

     tee_stream_head.py (1 addition, 1 deletion):

     @@ -43,7 +43,7 @@ def __init__(self, head_only: bool):
               def readable(self) -> bool:
                   return True

     -        def readinto(self, buf) -> int:
     +        def readinto(self, buf: memoryview) -> int:
                   """Read data from the stream or stored initial lines into a buffer

                   If the other reader has already read some initial lines into the
  2. akaihola revised this gist May 31, 2018. 1 changed file with 0 additions and 1 deletion.

     tee_stream_head.py (1 deletion):

     @@ -53,7 +53,6 @@ def readinto(self, buf) -> int:
                   :param buf: The buffer to fill with read data
                   :return: The number of bytes read into the buffer
     -            :rtype: int
                   """
                   nonlocal head_line_count
  3. akaihola created this gist May 31, 2018.

     tee_stream_head.py (86 additions):
     import io
     from typing import Tuple


     def tee_stream_head(stream: io.IOBase,
                         head_max_lines: int = 3) -> Tuple[io.RawIOBase,
                                                           io.RawIOBase]:
         """Split a stream, enable parallel reading of initial lines

         This helper solves the use case where a limited number of initial
         lines from a non-seekable stream need to be inspected before reading
         the whole stream again in one go.

         Memory is only consumed for the initial lines instead of buffering
         all read data until the end of the stream.

         Example (assuming no compression done by the HTTP server)::

             >>> response = requests.get('http://acme/', stream=True)
             >>> headers, content = tee_stream_head(response.raw)
             >>> first_line = next(headers)
             >>> second_line = next(headers)
             >>> all_lines = content.readlines()

         :param stream: The stream to read
         :param head_max_lines: The number of initial lines to keep in a buffer
         :return: Two readers for the stream. The first one can only read the
             initial lines, and the second one all the way until the end of
             the stream. The same initial lines can be read from both readers
             independently.
         """
         head_line_count = 0
         head = io.BytesIO()

         class HeadBufferedReader(io.RawIOBase):
             """Buffering of initial lines of the stream"""

             def __init__(self, head_only: bool):
                 super(HeadBufferedReader, self).__init__()
                 self.read_pos = 0
                 self.head_only = head_only

             def readable(self) -> bool:
                 return True

             def readinto(self, buf) -> int:
                 """Read data from the stream or stored initial lines into a buffer

                 If the other reader has already read some initial lines into
                 the `head` buffer, and the read pointer isn't beyond the
                 initial lines, re-use that data. Otherwise read data from the
                 stream and insert it into the `head` buffer for up to
                 `head_max_lines` lines of data.

                 :param buf: The buffer to fill with read data
                 :return: The number of bytes read into the buffer
                 :rtype: int
                 """
                 nonlocal head_line_count
                 buf_len = len(buf)
                 head_length = head.tell()
                 if self.read_pos < head_length:
                     # there's unread data in the head
                     head.seek(self.read_pos)
                     chunk = head.read(buf_len)
                     # seek to zero bytes (0) from the end of the file (2)
                     head.seek(0, 2)
                 else:
                     if self.head_only and head_line_count >= head_max_lines:
                         # refuse to read beyond the head
                         raise ValueError(f"Can't read beyond the first "
                                          f"{head_max_lines} lines")
                     # read data from the actual stream
                     chunk = stream.read(buf_len)
                     if head_line_count < head_max_lines:
                         head.write(chunk)
                         lines_read = chunk.count(b'\n')
                         head_line_count += lines_read
                 chunk_len = len(chunk)
                 buf[:chunk_len] = chunk
                 self.read_pos += chunk_len
                 return chunk_len

         header_reader = HeadBufferedReader(head_only=True)
         all_content_reader = HeadBufferedReader(head_only=False)
         return header_reader, all_content_reader
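
For reference, here is a minimal usage sketch (not part of the gist). It
assumes `tee_stream_head` as defined above is in scope, and it substitutes a
seekable in-memory `io.BytesIO` for the non-seekable stream of the real use
case, purely to keep the demo self-contained:

     import io

     data = io.BytesIO(b'alpha\nbeta\ngamma\ndelta\nepsilon\n')
     headers, content = tee_stream_head(data, head_max_lines=3)

     # The head-only reader serves the initial lines and stores them in the
     # shared head buffer as a side effect...
     print(next(headers))        # b'alpha\n'
     print(next(headers))        # b'beta\n'

     # ...while the full reader first replays the buffered head, then reads
     # the underlying stream to its end:
     print(content.readlines())  # all five lines, including the two above

     # The head-only reader can still replay b'gamma\n' from the head buffer,
     # but raises as soon as it would have to read past head_max_lines:
     try:
         for line in headers:
             print(line)         # b'gamma\n'
     except ValueError as exc:
         print(exc)              # Can't read beyond the first 3 lines

As the docstring notes, only the initial `head_max_lines` lines are ever held
in memory, so the full reader can stream arbitrarily large responses without
buffering them in their entirety.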