-
-
Save ewdurbin/371e90f13c6165a91298 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io | |
class StringIteratorIO(io.IOBase): | |
def __init__(self, iterator): | |
io.IOBase.__init__(self) | |
self._iter = iterator | |
self._left = '' | |
def _read1(self, num_bytes=None): | |
while not self._left: | |
try: | |
self._left = next(self._iter) | |
except StopIteration: | |
break | |
ret = self._left[:num_bytes] | |
self._left = self._left[len(ret):] | |
return ret | |
def read(self, num_bytes=None): | |
payload = [] | |
if num_bytes is None or num_bytes < 0: | |
while True: | |
part = self._read1() | |
if not part: | |
break | |
payload.append(part) | |
else: | |
while num_bytes > 0: | |
part = self._read1(num_bytes) | |
if not part: | |
break | |
num_bytes -= len(part) | |
payload.append(part) | |
return ''.join(payload) | |
def readline(self): | |
line = [] | |
while True: | |
i = self._left.find('\n') | |
if i == -1: | |
line.append(self._left) | |
try: | |
self._left = next(self._iter) | |
except StopIteration: | |
self._left = '' | |
break | |
else: | |
line.append(self._left[:i + 1]) | |
self._left = self._left[i + 1:] | |
break | |
return ''.join(line) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest2 | |
from identity.person_id_db.string_iterator import StringIteratorIO | |
TEST_DATA = [ | |
'1234\n', | |
'5678\n', | |
'9012\n', | |
'3456\n', | |
] | |
def test_iterator(): | |
return iter(TEST_DATA) | |
class StringIteratorIOTestCase(unittest2.TestCase): | |
def setUp(self): | |
self.string_iterator = StringIteratorIO(test_iterator()) | |
def test_read(self): | |
self.assertEqual('123', self.string_iterator.read(3)) | |
self.assertEqual('4\n5', self.string_iterator.read(3)) | |
self.assertEqual('678\n9012\n', self.string_iterator.read(9)) | |
self.assertEqual('3456\n', self.string_iterator.read(100)) | |
def test_read_all(self): | |
self.assertEqual("".join(TEST_DATA), self.string_iterator.read()) | |
def test_readline(self): | |
self.assertEqual('1234\n', self.string_iterator.readline()) | |
self.assertEqual('5678\n', self.string_iterator.readline()) | |
self.assertEqual('9012\n', self.string_iterator.readline()) | |
self.assertEqual('3456\n', self.string_iterator.readline()) | |
self.assertEqual('', self.string_iterator.readline()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment