Last active
May 31, 2018 20:02
-
-
Save akaihola/7d8a3ed4da73e3e9c4f0076f18c459e5 to your computer and use it in GitHub Desktop.
Split a non-seekable stream into a header reader and a content reader
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io
from typing import Optional, Tuple
def tee_stream_head(stream: io.IOBase,
                    head_max_lines: int = 3) -> Tuple[io.RawIOBase,
                                                      io.RawIOBase]:
    """Split a stream, enable parallel reading of initial lines

    This helper solves the use case where a limited number of initial lines
    from a non-seekable stream need to be inspected before reading the whole
    stream again in one go.

    Memory is only consumed for the initial lines instead of buffering all read
    data until the end of the stream. The one exception is a header reader
    which itself reads past the last initial line in a single call: the
    surplus bytes of that one read can't be pushed back into the stream, so
    they are kept for the content reader.

    Example (assuming no compression done by the HTTP server)::

        >>> response = requests.get('http://acme/', stream=True)
        >>> headers, content = tee_stream_head(response.raw)
        >>> first_line = next(headers)
        >>> second_line = next(headers)
        >>> all_lines = content.readlines()

    :param stream: The binary stream to read. Its `read(n)` must return bytes
                   (or `None` when a non-blocking stream has no data yet).
    :param head_max_lines: The number of initial lines to keep in a buffer
    :return: Two readers for the stream. The first one only can read the
             initial lines, and the second one all the way until the end of the
             stream. The same initial lines can be read from both readers
             independently.
    :raise ValueError: Raised by the first reader (not by this function) when
                       it is asked to read beyond the initial lines.

    """
    # Number of newlines seen so far inside the head (never exceeds the limit)
    head_line_count = 0
    # Offset just past the last head line; `None` until that line has been seen
    head_limit = None
    # Bytes from the start of the stream, shared by both readers
    head = io.BytesIO()

    class HeadBufferedReader(io.RawIOBase):
        """Buffering of initial lines of the stream"""

        def __init__(self, head_only: bool):
            super().__init__()
            self.read_pos = 0
            self.head_only = head_only

        def readable(self) -> bool:
            return True

        def readinto(self, buf: memoryview) -> Optional[int]:
            """Read data from the stream or stored initial lines into a buffer

            If the other reader has already read some initial lines into the
            `head` buffer, and the read pointer isn't beyond the initial lines,
            re-use that data. Otherwise read data from the stream and insert it
            into the `head` buffer for up to `head_max_lines` lines of data.

            :param buf: The buffer to fill with read data
            :return: The number of bytes read into the buffer, or `None` if
                     the underlying stream had no data available right now
            :raise ValueError: If this is the head-only reader and all
                               initial lines have already been read

            """
            nonlocal head_line_count, head_limit
            buf_len = len(buf)
            # Seeking to the end both measures the head and leaves the
            # position ready for appending.
            head_length = head.seek(0, io.SEEK_END)
            # The head-only reader must not see buffered bytes which lie
            # beyond the last initial line.
            if self.head_only and head_limit is not None:
                readable_end = head_limit
            else:
                readable_end = head_length
            if self.read_pos < readable_end:
                # there's unread data in the head
                head.seek(self.read_pos)
                chunk = head.read(min(buf_len, readable_end - self.read_pos))
                head.seek(0, io.SEEK_END)
            else:
                if self.head_only and head_line_count >= head_max_lines:
                    # check if we've read beyond the head
                    raise ValueError("Can't read beyond the first "
                                     f"{head_max_lines} lines")
                # read data from the actual stream
                chunk = stream.read(buf_len)
                if chunk is None:
                    # non-blocking stream without data: nothing consumed
                    return None
                lines_missing = head_max_lines - head_line_count
                if lines_missing > 0:
                    lines_found, cut = _scan_head_lines(chunk, lines_missing)
                    head_line_count += lines_found
                    if lines_found < lines_missing:
                        # the whole chunk still belongs to the head
                        head.write(chunk)
                    else:
                        # the head ends inside this chunk
                        head_limit = head_length + cut
                        if self.head_only:
                            # The surplus can't be un-read from the stream:
                            # keep it for the content reader, but hand only
                            # the head part to this reader.
                            head.write(chunk)
                            chunk = chunk[:cut]
                        else:
                            head.write(chunk[:cut])
            chunk_len = len(chunk)
            buf[:chunk_len] = chunk
            self.read_pos += chunk_len
            return chunk_len

    header_reader = HeadBufferedReader(head_only=True)
    all_content_reader = HeadBufferedReader(head_only=False)
    return header_reader, all_content_reader


def _scan_head_lines(chunk: bytes, lines_wanted: int) -> Tuple[int, int]:
    """Find where up to `lines_wanted` lines end at the start of `chunk`

    :param chunk: The bytes to scan for newline characters
    :param lines_wanted: The maximum number of newlines to look for
    :return: The number of newlines found (at most `lines_wanted`) and the
             offset just past the last one found (0 if none was found)

    """
    lines_found = 0
    offset = 0
    while lines_found < lines_wanted:
        newline_pos = chunk.find(b'\n', offset)
        if newline_pos < 0:
            break
        offset = newline_pos + 1
        lines_found += 1
    return lines_found, offset
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment