Skip to content

Instantly share code, notes, and snippets.

@ewdurbin
Forked from anacrolix/striter.py
Last active August 29, 2015 14:02
Show Gist options
  • Save ewdurbin/371e90f13c6165a91298 to your computer and use it in GitHub Desktop.
Save ewdurbin/371e90f13c6165a91298 to your computer and use it in GitHub Desktop.
import io
class StringIteratorIO(io.IOBase):
    """Read-only, file-like wrapper around an iterator of string chunks.

    Chunks are pulled from the iterator lazily, only when a read needs
    more data.  Any unread tail of the most recent chunk is kept in
    ``self._left`` and is shared by ``read`` and ``readline``, so the two
    can be freely interleaved.
    """

    def __init__(self, iterator):
        """Wrap *iterator*, an iterator that yields strings."""
        io.IOBase.__init__(self)
        self._iter = iterator
        # Unconsumed remainder of the chunk most recently pulled.
        self._left = ''

    def readable(self):
        """Return True: the stream supports ``read`` and ``readline``.

        ``io.IOBase.readable`` returns False by default, which makes
        consumers that check it treat the stream as unreadable.
        """
        return True

    def _read1(self, num_bytes=None):
        """Return at most *num_bytes* characters taken from a single chunk.

        Pulls chunks until a non-empty one is available or the iterator
        is exhausted.  ``None`` means "the whole buffered chunk".  Returns
        ``''`` only when no data is left.
        """
        while not self._left:
            try:
                self._left = next(self._iter)
            except StopIteration:
                break
        ret = self._left[:num_bytes]
        self._left = self._left[len(ret):]
        return ret

    def read(self, num_bytes=None):
        """Read and return up to *num_bytes* characters.

        With *num_bytes* ``None`` or negative, everything remaining is
        returned.  Fewer characters than requested are returned only when
        the underlying iterator runs out; ``''`` signals end of stream.
        """
        payload = []
        if num_bytes is None or num_bytes < 0:
            while True:
                part = self._read1()
                if not part:
                    break
                payload.append(part)
        else:
            while num_bytes > 0:
                part = self._read1(num_bytes)
                if not part:
                    break
                num_bytes -= len(part)
                payload.append(part)
        return ''.join(payload)

    def readline(self, size=-1):
        """Read and return one line, including its trailing newline.

        A line may span several chunks.  The final line is returned
        without a newline if the data does not end with one, and ``''``
        is returned at end of stream.

        *size* follows the ``io.IOBase.readline`` convention: when it is
        non-negative, at most *size* characters are returned (possibly a
        partial line); ``None`` or a negative value means no limit.
        """
        remaining = None if size is None or size < 0 else size
        pieces = []
        while remaining is None or remaining > 0:
            if not self._left:
                try:
                    self._left = next(self._iter)
                except StopIteration:
                    break
                # Re-check: the iterator may have yielded an empty chunk.
                continue
            newline_at = self._left.find('\n')
            line_end = len(self._left) if newline_at == -1 else newline_at + 1
            take = line_end
            if remaining is not None:
                take = min(take, remaining)
                remaining -= take
            pieces.append(self._left[:take])
            self._left = self._left[take:]
            # Stop only if the newline itself was consumed; a size limit
            # may have cut the piece short before reaching it.
            if newline_at != -1 and take == line_end:
                break
        return ''.join(pieces)
import unittest2
from identity.person_id_db.string_iterator import StringIteratorIO
# Four newline-terminated chunks fed to the stream under test.
TEST_DATA = [digits + '\n' for digits in ('1234', '5678', '9012', '3456')]
def test_iterator():
    """Return a fresh iterator over TEST_DATA."""
    chunks = iter(TEST_DATA)
    return chunks
class StringIteratorIOTestCase(unittest2.TestCase):
    # Exercises StringIteratorIO.read and StringIteratorIO.readline
    # against the chunks produced by test_iterator().

    def setUp(self):
        # Every test starts with its own stream over a fresh iterator.
        self.string_iterator = StringIteratorIO(test_iterator())

    def test_read(self):
        # Sized reads cross chunk boundaries; the last read asks for more
        # than is left and gets only the remainder.
        stream = self.string_iterator
        cases = (
            (3, '123'),
            (3, '4\n5'),
            (9, '678\n9012\n'),
            (100, '3456\n'),
        )
        for size, expected in cases:
            self.assertEqual(expected, stream.read(size))

    def test_read_all(self):
        # An unsized read returns every chunk concatenated.
        everything = "".join(TEST_DATA)
        self.assertEqual(everything, self.string_iterator.read())

    def test_readline(self):
        # One line per call, then '' once the data is exhausted.
        stream = self.string_iterator
        for expected in ('1234\n', '5678\n', '9012\n', '3456\n', ''):
            self.assertEqual(expected, stream.readline())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment