Created
February 12, 2025 16:33
-
-
Save jrialland/6b62da19062407597ef5e7b3c0153528 to your computer and use it in GitHub Desktop.
A quick and dirty RESP3 serializer / deserializer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This is a serializer / deserializer for the RESP3 protocol.
The protocol spec is at https://github.com/antirez/RESP3/blob/master/spec.md
""" | |
from typing import Generator, BinaryIO, Any | |
class error_str(str):
    """A ``str`` subclass used to tag a value as a RESP3 error.

    It adds no behaviour of its own; the decoder returns it for the error
    types so callers can tell errors from ordinary strings with
    ``isinstance``.
    """
def _consume(lines : Generator[bytes, None, None]) -> Any: | |
line = next(lines) | |
# blob string or blob error | |
if line[0] in (ord('$'), ord('!')): | |
size = int(line[1:].decode()) | |
s = next(lines)[:size].decode() | |
return s if line[0] == b'$' else error_str(s) | |
# simple string or simple error | |
elif line[0] in (ord('+'), ord('-')): | |
return line[1:] if line[0] == b'+' else error_str(line[1:]) | |
# integer or bigint | |
elif line[0] in (ord(':'), ord('(')): | |
return int(line[1:]) | |
# null | |
elif line[0] == ord('_'): | |
return None | |
# float | |
elif line[0] == ord(','): | |
return float(line[1:]) | |
# boolean | |
elif line[0] == ord('#'): | |
return line[1] == ord('t') | |
# list, set | |
elif line[0] in (ord('*'), ord('~')): | |
size = int(line[1:].decode()) | |
l = [_consume(lines) for i in range(size)] | |
return l if line[0] == ord('*') else set(l) | |
# map ( '|' for attribute type, '%' for map type) | |
elif line[0] in (ord('%'), ord('|')): | |
size = int(line[1:].decode()) | |
return { _consume(lines) : _consume(lines) for i in range(size) } | |
else: | |
raise ValueError(f"unknown type {line[0]}") | |
def parse(input: BinaryIO) -> Any:
    """Read a binary stream to exhaustion and decode the first RESP3 value.

    The stream is read in 4096-byte chunks until ``read`` returns an empty
    result, the data is split on CRLF, and the resulting lines are handed to
    ``_consume``. Bytes after the last CRLF (an unterminated trailing line)
    are ignored.

    Raises:
        ValueError: if the input contains no CRLF-terminated line at all.
    """
    chunks = []
    while True:
        chunk = input.read(4096)
        if not chunk:
            break
        chunks.append(chunk)

    # Split only once, after everything has been read: splitting per chunk
    # would discard lines seen in earlier chunks, and joining the chunks in
    # one go avoids repeated bytes concatenation.
    *lines, _unterminated = b''.join(chunks).split(b'\r\n')
    if not lines:
        raise ValueError("no complete RESP3 line in input")
    return _consume(iter(lines))
def encode(obj: Any) -> bytes:
    """Serialize a Python object to its RESP3 wire representation.

    Mapping of types:
        ``None`` -> null (``_``); ``bool`` -> boolean (``#``);
        ``int`` -> number (``:``), or big number (``(``) when the value does
        not fit in a signed 64-bit integer; ``float`` -> double (``,``);
        ``error_str`` -> blob error (``!``); ``str`` -> blob string (``$``);
        ``list``/``tuple`` -> array (``*``); ``set`` -> set (``~``);
        ``dict`` -> map (``%``).

    Raises:
        ValueError: if ``obj`` (or a nested element) has an unsupported type.
    """
    if obj is None:
        return b'_\r\n'
    # bool is a subclass of int, so it must be tested before int.
    if isinstance(obj, bool):
        return b'#t\r\n' if obj else b'#f\r\n'
    if isinstance(obj, int):
        # RESP3 numbers are signed 64-bit; anything larger is a big number.
        marker = ':' if -2**63 <= obj < 2**63 else '('
        return f"{marker}{obj}\r\n".encode()
    if isinstance(obj, float):
        return f",{obj}\r\n".encode()
    if isinstance(obj, str):
        # error_str is a str subclass, so pick the marker inside the str
        # branch rather than in a later branch that could never be reached.
        marker = b'!' if isinstance(obj, error_str) else b'$'
        # The declared length is a byte count, so measure the encoded
        # payload rather than the number of characters.
        payload = obj.encode()
        return marker + str(len(payload)).encode() + b'\r\n' + payload + b'\r\n'
    if isinstance(obj, (list, tuple)):
        return f"*{len(obj)}\r\n".encode() + b''.join(encode(item) for item in obj)
    if isinstance(obj, set):
        return f"~{len(obj)}\r\n".encode() + b''.join(encode(item) for item in obj)
    if isinstance(obj, dict):
        body = b''.join(encode(key) + encode(value) for key, value in obj.items())
        return f"%{len(obj)}\r\n".encode() + body
    raise ValueError(f"unknown type {type(obj)}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment