Skip to content

Instantly share code, notes, and snippets.

@jrialland
Created February 12, 2025 16:33
Show Gist options
  • Save jrialland/6b62da19062407597ef5e7b3c0153528 to your computer and use it in GitHub Desktop.
Save jrialland/6b62da19062407597ef5e7b3c0153528 to your computer and use it in GitHub Desktop.
a quick and dirty resp3 serializer / deserializer
"""
This module is a serializer / deserializer for the RESP3 protocol.
The protocol specification is available at https://github.com/antirez/RESP3/blob/master/spec.md
"""
from typing import Generator, BinaryIO, Any
class error_str(str):
    """A ``str`` subtype that tags a value as a RESP3 error.

    It adds no behaviour of its own; the type alone distinguishes an error
    payload from an ordinary string.
    """
def _consume(lines: Generator[bytes, None, None]) -> Any:
    """Decode one RESP3 value from an iterator of CRLF-split lines.

    The first line pulled from *lines* carries a one-byte type marker followed
    by the header payload. Aggregate types (arrays, sets, maps) recursively
    pull their members from the same iterator.

    Args:
        lines: iterator yielding the message lines, without their trailing
            ``\\r\\n`` terminator.

    Returns:
        ``str`` for simple/blob strings, ``error_str`` for simple/blob errors,
        ``int`` for integers and big numbers, ``float`` for doubles, ``bool``
        for booleans, ``None`` for null, ``list`` for arrays, ``set`` for
        sets and ``dict`` for maps and attributes.

    Raises:
        ValueError: if a line is empty, starts with an unsupported type
            marker, or carries a malformed number.
        StopIteration: if *lines* runs out before the value is complete.
    """
    line = next(lines)
    if not line:
        raise ValueError("empty line where a RESP3 type marker was expected")

    # Indexing bytes yields an int, so the marker is compared against ord(...).
    marker = line[0]
    payload = line[1:]

    # blob string or blob error
    if marker in (ord('$'), ord('!')):
        size = int(payload)
        data = next(lines)
        # A blob may legitimately contain CRLF, in which case the line
        # splitter cut it into several pieces: glue them back together until
        # the declared byte count is reached.
        while len(data) < size:
            data += b'\r\n' + next(lines)
        text = data[:size].decode()
        return text if marker == ord('$') else error_str(text)

    # simple string or simple error
    if marker in (ord('+'), ord('-')):
        text = payload.decode()
        return text if marker == ord('+') else error_str(text)

    # integer or bigint
    if marker in (ord(':'), ord('(')):
        return int(payload)

    # null
    if marker == ord('_'):
        return None

    # float
    if marker == ord(','):
        return float(payload)

    # boolean
    if marker == ord('#'):
        return payload[0] == ord('t')

    # list, set
    if marker in (ord('*'), ord('~')):
        size = int(payload)
        items = [_consume(lines) for _ in range(size)]
        return items if marker == ord('*') else set(items)

    # map ( '|' for attribute type, '%' for map type)
    if marker in (ord('%'), ord('|')):
        size = int(payload)
        mapping = {}
        for _ in range(size):
            # On the wire the key always precedes its value.
            key = _consume(lines)
            mapping[key] = _consume(lines)
        return mapping

    raise ValueError(f"unknown type {marker}")
def parse(input: BinaryIO) -> Any:
    """Read *input* until EOF and decode the first RESP3 value it contains.

    The stream is read in 4096-byte chunks, joined, and split on ``\\r\\n``.
    Every terminated line is handed to ``_consume``; trailing bytes that are
    not followed by ``\\r\\n`` are ignored.

    Args:
        input: binary stream whose ``read(n)`` returns ``b''`` at end of data.

    Returns:
        The decoded Python value.

    Raises:
        ValueError: if the stream holds no complete value (empty or truncated
            input), or if the data is not valid RESP3.
    """
    chunks = []
    while True:
        chunk = input.read(4096)
        if not chunk:
            break
        chunks.append(chunk)

    # Split only once, after everything has been read, so that lines coming
    # from earlier chunks are not lost and a CRLF straddling two chunks is
    # still recognised.
    *lines, _unterminated = b''.join(chunks).split(b'\r\n')

    try:
        return _consume(iter(lines))
    except StopIteration:
        # Running out of lines means the message is incomplete; do not let a
        # bare StopIteration leak to the caller.
        raise ValueError("incomplete RESP3 message") from None
def encode(obj: Any) -> bytes:
    """Serialize a Python value to its RESP3 wire representation.

    Supported values and the RESP3 type they map to:

    * ``None`` -> null (``_``)
    * ``bool`` -> boolean (``#``)
    * ``error_str`` -> blob error (``!``)
    * ``str`` -> blob string (``$``)
    * ``int`` -> integer (``:``), or big number (``(``) when the value does
      not fit in a signed 64-bit integer
    * ``float`` -> double (``,``)
    * ``list`` / ``tuple`` -> array (``*``)
    * ``set`` -> set (``~``)
    * ``dict`` -> map (``%``)

    Args:
        obj: the value to serialize; containers are encoded recursively.

    Returns:
        The encoded bytes, including the trailing ``\\r\\n`` terminators.

    Raises:
        ValueError: if *obj*, or a value nested inside it, has an
            unsupported type.
    """
    if obj is None:
        return b'_\r\n'

    # bool is a subclass of int, so it has to be tested before int.
    if isinstance(obj, bool):
        return b'#t\r\n' if obj else b'#f\r\n'

    if isinstance(obj, str):
        # error_str is a subclass of str: pick the marker inside this branch
        # so that errors are not silently emitted as plain blob strings.
        marker = b'!' if isinstance(obj, error_str) else b'$'
        # The announced length is a byte count, not a character count.
        payload = obj.encode()
        header = marker + str(len(payload)).encode()
        return header + b'\r\n' + payload + b'\r\n'

    if isinstance(obj, int):
        # Values outside the signed 64-bit range use the big number type.
        marker = ':' if -(2 ** 63) <= obj < 2 ** 63 else '('
        return f"{marker}{obj}\r\n".encode()

    if isinstance(obj, float):
        return f",{obj}\r\n".encode()

    if isinstance(obj, (list, tuple)):
        return f"*{len(obj)}\r\n".encode() + b''.join(encode(o) for o in obj)

    if isinstance(obj, set):
        return f"~{len(obj)}\r\n".encode() + b''.join(encode(o) for o in obj)

    if isinstance(obj, dict):
        parts = [f"%{len(obj)}\r\n".encode()]
        for k, v in obj.items():
            parts.append(encode(k))
            parts.append(encode(v))
        return b''.join(parts)

    raise ValueError(f"unknown type {type(obj)}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment