Created
September 2, 2020 15:23
-
-
Save akngs/112537341fe0bf41977c16fdbc4a6df8 to your computer and use it in GitHub Desktop.
Streaming test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from itertools import chain | |
def _unmatched_event(snapshot, new_row):
    """Return the event for a new row that has no old row with the same key.

    The result is ``('MOD', (key, snapshot[key]), new_row)`` when the key is
    already in ``snapshot`` and ``('NEW', new_row)`` otherwise. ``snapshot``
    is only read here; the caller stores the new value afterwards.
    """
    key = new_row[0]
    if key in snapshot:
        return ('MOD', (key, snapshot[key]), new_row)
    return ('NEW', new_row)


def compare(snapshot, old, new):
    """Lazily diff two row streams and keep ``snapshot`` in step.

    Rows are indexed as ``row[0]`` (key) and ``row[1]`` (value).
    NOTE(review): the three-way key comparison presumably expects both
    streams to be sorted ascending by key -- confirm with callers.

    Args:
        snapshot: mutable mapping of key -> value. It is updated in place as
            the generator is consumed: the new value is stored right after a
            'NEW'/'MOD' event is yielded, and the key is deleted right before
            a 'DEL' event is yielded.
        old: iterable of rows from the previous log.
        new: iterable (or iterator) of rows from the current log.

    Yields:
        ``('NEW', new_row)``, ``('MOD', previous_row, new_row)`` or
        ``('DEL', old_row)`` tuples.

    Raises:
        KeyError: if a row to delete has a key that is not in ``snapshot``.

    NOTE(review): an old row whose key is greater than the current new key is
    consumed without being compared again, and old rows left over once
    ``new`` runs out are ignored (no 'DEL'). This is kept as-is because the
    accompanying test expects it -- confirm it is intended.
    """
    # Accept any iterable, not only an iterator; iter() on an iterator is a no-op.
    new = iter(new)

    # A single held-back row replaces re-wrapping ``new`` in itertools.chain
    # on every deletion, which nested one extra iterator layer per 'DEL'.
    nothing = object()
    held = nothing

    for old_row in old:
        if held is nothing:
            try:
                new_row = next(new)
            except StopIteration:
                break
        else:
            new_row, held = held, nothing

        if old_row == new_row:
            # Exact match: the row was not updated.
            continue

        old_key, new_key = old_row[0], new_row[0]
        if old_key > new_key:
            # Key is in the new log but was not at this position in the old
            # log: an update if the snapshot knows it, otherwise a new row.
            yield _unmatched_event(snapshot, new_row)
            snapshot[new_key] = new_row[1]
        elif old_key == new_key:
            # Only the key matches: the value was updated.
            yield ('MOD', old_row, new_row)
            snapshot[new_key] = new_row[1]
        elif old_key < new_key:
            # Key is in the old log but not in the new log: deleted.
            del snapshot[old_key]
            yield ('DEL', old_row)
            # The new row has not been matched yet; compare it with the
            # next old row.
            held = new_row
        else:
            # Logically unreachable; raised explicitly so the check is not
            # stripped when Python runs with -O.
            raise AssertionError('should not reach here')

    # Whatever is left of ``new`` once ``old`` is used up is new or updated.
    if held is not nothing:
        yield _unmatched_event(snapshot, held)
        snapshot[held[0]] = held[1]
    for new_row in new:
        yield _unmatched_event(snapshot, new_row)
        snapshot[new_row[0]] = new_row[1]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from stream import compare | |
def test_empty():
    """Two empty streams produce no events and leave the snapshot empty."""
    state = {}
    events = list(compare(state, iter([]), iter([])))
    assert events == []
    assert state == {}
def test_compare():
    """Check the emitted events and the final snapshot for a mixed stream."""
    state = {'P1': 'A', 'P2': 'A', 'P4': 'A', 'P5': 'A'}
    previous_rows = [
        ('P1', 'A'),
        ('P2', 'A'),
        ('P4', 'A'),
    ]
    current_rows = [
        ('P2', 'B'),
        ('P2', 'C'),
        ('P3', 'A'),
        ('P5', 'D'),
        ('P6', 'E'),
    ]
    expected_events = [
        ('DEL', ('P1', 'A')),
        ('MOD', ('P2', 'A'), ('P2', 'B')),
        ('MOD', ('P2', 'B'), ('P2', 'C')),
        ('NEW', ('P3', 'A')),
        ('MOD', ('P5', 'A'), ('P5', 'D')),
        ('NEW', ('P6', 'E')),
    ]
    expected_state = {'P2': 'C', 'P3': 'A', 'P4': 'A', 'P5': 'D', 'P6': 'E'}

    # compare() is a generator; list() drives it to completion so that every
    # snapshot update has been applied before the final check.
    events = list(compare(state, iter(previous_rows), iter(current_rows)))

    assert events == expected_events
    assert state == expected_state
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment