Last active: May 31, 2018 20:02
Revisions
akaihola revised this gist on May 31, 2018. 1 changed file with 1 addition and 1 deletion.

Diff hunk @@ -43,7 +43,7 @@ (inside class HeadBufferedReader): the signature of readinto was changed from "def readinto(self, buf) -> int:" to "def readinto(self, buf: memoryview) -> int:", adding a memoryview type annotation to the buffer parameter.
akaihola revised this gist on May 31, 2018. 1 changed file with 0 additions and 1 deletion.

Diff hunk @@ -53,7 +53,6 @@ (inside "def readinto(self, buf) -> int:"): the diff context (":param buf:", ":return:", the closing docstring quotes and "nonlocal head_line_count") suggests the redundant ":rtype: int" line was removed from the readinto docstring, since the return type is already declared in the signature.
akaihola created this gist on May 31, 2018.

import io
from typing import Tuple


def tee_stream_head(stream: io.IOBase,
                    head_max_lines: int = 3) -> Tuple[io.RawIOBase,
                                                      io.RawIOBase]:
    """Split a stream, enable parallel reading of initial lines

    This helper solves the use case where a limited number of initial lines
    from a non-seekable stream need to be inspected before reading the whole
    stream again in one go.

    Memory is only consumed for the initial lines instead of buffering all
    read data until the end of the stream.

    Example (assuming no compression done by the HTTP server)::

        >>> response = requests.get('http://acme/', stream=True)
        >>> headers, content = tee_stream_head(response.raw)
        >>> first_line = next(headers)
        >>> second_line = next(headers)
        >>> all_lines = content.readlines()

    :param stream: The stream to read
    :param head_max_lines: The number of initial lines to keep in a buffer
    :return: Two readers for the stream. The first one can only read the
             initial lines, and the second one reads all the way to the end
             of the stream. The same initial lines can be read from both
             readers independently.

    """
    head_line_count = 0
    head = io.BytesIO()

    class HeadBufferedReader(io.RawIOBase):
        """Buffering of initial lines of the stream"""

        def __init__(self, head_only: bool):
            super(HeadBufferedReader, self).__init__()
            self.read_pos = 0
            self.head_only = head_only

        def readable(self) -> bool:
            return True

        def readinto(self, buf) -> int:
            """Read data from the stream or stored initial lines into a buffer

            If the other reader has already read some initial lines into the
            `head` buffer, and the read pointer isn't beyond the initial
            lines, re-use that data. Otherwise read data from the stream and
            insert it into the `head` buffer for up to `head_max_lines` lines
            of data.

            :param buf: The buffer to fill with read data
            :return: The number of bytes read into the buffer
            :rtype: int

            """
            nonlocal head_line_count
            buf_len = len(buf)
            head_length = head.tell()
            if self.read_pos < head_length:
                # there's unread data in the head
                head.seek(self.read_pos)
                chunk = head.read(buf_len)
                # seek to zero bytes (0) from end of file (2)
                head.seek(0, 2)
            else:
                if self.head_only and head_line_count >= head_max_lines:
                    # check if we've read beyond the head
                    raise ValueError(f"Can't read beyond the first "
                                     f"{head_max_lines} lines")
                # read data from the actual stream
                chunk = stream.read(buf_len)
                if head_line_count < head_max_lines:
                    head.write(chunk)
                    lines_read = chunk.count(b'\n')
                    head_line_count += lines_read
            chunk_len = len(chunk)
            buf[:chunk_len] = chunk
            self.read_pos += chunk_len
            return chunk_len

    header_reader = HeadBufferedReader(head_only=True)
    all_content_reader = HeadBufferedReader(head_only=False)
    return header_reader, all_content_reader