Created
August 24, 2017 05:09
-
-
Save joodicator/d75cf2c7e85aa9ac456d4cb520dfd119 to your computer and use it in GitHub Desktop.
Minecraft Wireshark YAML TCP stream parser
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Requires pyCraft <https://github.com/ammaraskar/pyCraft> | |
and pyYAML <https://pypi.python.org/pypi/PyYAML>. | |
Usage: ./parse.py stream1.yml [stream2.yml ...] | |
or: ./parse.py < stream1.yml | |
Input files are TCP streams of *unencrypted* Minecraft connections | |
saved from Wireshark <https://www.wireshark.org/> in YAML format. | |
""" | |
from __future__ import print_function | |
import sys | |
import os | |
import re | |
import io | |
import base64 | |
import zlib | |
import yaml as y | |
from minecraft import SUPPORTED_PROTOCOL_VERSIONS | |
from minecraft.networking import types, packets, connection | |
from minecraft.networking.packets import clientbound, serverbound | |
# Peer numbers as they appear in the "peerN_M" keys of the Wireshark dump;
# they are also the outer keys of the table built by get_packets().
CLIENT = 0
SERVER = 1

# Connection states.  parse_file() assigns a handshake packet's next_state
# directly to its state variable, so these values are used interchangeably
# with the numbers carried by that field.
HANDSHAKE = 0
STATUS = 1
LOGIN = 2
PLAY = 3

# Matches a mapping key of the form "peer<peer number>_<segment index>".
PEER_RE = re.compile(r'peer(\d+)_(\d+)$')
def parse_file(file):
    """Parse one Wireshark YAML TCP-stream dump and print each packet in it.

    The input must be a single YAML mapping whose keys have the form
    ``peer<N>_<index>`` (see ``PEER_RE``) and whose values are ``!!binary``
    scalars holding the bytes sent by that peer.  The segments of each peer
    are appended to a per-peer buffer, and every complete packet found in
    that buffer is decoded with ``read_packet`` and printed, prefixed with
    an arrow identifying the sending peer.

    Handshake, login-success and set-compression packets are additionally
    interpreted to keep the protocol version, the connection state and the
    compression flag in sync with the stream.

    :param file: a stream (or string) accepted by ``yaml.parse``.
    :raises AssertionError: if the YAML structure, a mapping key, the
        segment numbering or the protocol state is not as expected.
    """
    events = y.parse(file, y.SafeLoader)
    for etype in (y.StreamStartEvent, y.DocumentStartEvent,
                  y.MappingStartEvent):
        event = next(events)
        assert isinstance(event, etype), \
            'not isinstance(%r, %r)' % (event, etype)

    streams = {CLIENT: io.BytesIO(), SERVER: io.BytesIO()}
    counts = {CLIENT: 0, SERVER: 0}
    context = connection.ConnectionContext(
        protocol_version=max(SUPPORTED_PROTOCOL_VERSIONS))
    compression = False
    state = HANDSHAKE
    packet_map = get_packets(context)

    for event1 in events:
        # The mapping is closed by a MappingEndEvent (followed by the
        # document and stream end events); that marks the end of the data.
        if isinstance(event1, y.MappingEndEvent):
            break
        assert isinstance(event1, y.ScalarEvent), \
            'not isinstance(%r, y.ScalarEvent)' % event1
        key_match = PEER_RE.match(event1.value)
        assert key_match is not None, \
            'unexpected mapping key %r' % event1.value
        peer, index = map(int, key_match.groups())
        assert counts[peer] == index, \
            'counts[%r] == %r != %r' % (peer, counts[peer], index)
        counts[peer] += 1

        event2 = next(events)
        assert isinstance(event2, y.ScalarEvent), \
            'not isinstance(%r, y.ScalarEvent)' % event2
        assert event2.tag == 'tag:yaml.org,2002:binary'

        # Append the new segment to whatever is still unparsed for this
        # peer, then rewind so that parsing restarts at the first byte of
        # the (possibly previously incomplete) packet.
        streams[peer].seek(0, io.SEEK_END)
        streams[peer].write(base64.b64decode(event2.value))
        streams[peer].seek(0)

        arrow = '<--' if peer == CLIENT else '-->'
        try:
            while True:
                # The first item produced by read_packet() is either None
                # (more data is needed) or the single decoded packet.
                packet = next(read_packet(
                    stream=streams[peer],
                    compression=compression,
                    packet_map=packet_map[peer][state],
                    context=context,
                ))
                if packet is None:
                    break

                # Drop the bytes of the packet that was just consumed.
                streams[peer] = io.BytesIO(streams[peer].read())

                if isinstance(packet, serverbound.handshake.HandShakePacket):
                    assert state == HANDSHAKE
                    context.protocol_version = packet.protocol_version
                    packet_map = get_packets(context)
                    state = packet.next_state
                elif isinstance(packet, clientbound.login.LoginSuccessPacket):
                    assert state == LOGIN
                    state = PLAY
                elif packet.packet_name == 'set compression':
                    assert state in (LOGIN, PLAY)
                    # Any non-negative threshold switches the stream to the
                    # compressed packet format (0 means "compress all").
                    compression = packet.threshold >= 0

                print('%s %s' % (arrow, packet))
        except Exception:
            print(arrow, end=' ***Exception:\n')
            raise

    # Consumed packets are removed from the buffers as they are parsed, so
    # whatever the buffers still hold was never decoded.
    unused = {p: s.getvalue() for (p, s) in streams.items()}
    if any(len(s) > 0 for s in unused.values()):
        print('Unused data at end of stream: %r' % unused)
def read_packet(stream, compression, packet_map, context):
    """Generator that decodes the next packet available on ``stream``.

    ``None`` is yielded each time ``read_packet_buffer`` reports that the
    packet is not complete yet.  Once a whole packet buffer is available,
    its packet ID is looked up in ``packet_map``: a known ID produces an
    instance of the mapped class populated via ``read()``, an unknown ID
    produces a generic ``packets.Packet`` carrying only that ID.  If
    decompression fails with ``zlib.error``, the error is printed and a
    generic ``packets.Packet`` without an ID is produced instead.

    The packet is the last item yielded.
    """
    try:
        for payload in read_packet_buffer(stream, compression):
            if payload is None:
                yield None
            else:
                break
    except zlib.error as err:
        print('*** Exception: %s' % err)
        result = packets.Packet(context)
    else:
        ident = next(read_varint(payload))
        assert ident is not None
        if ident not in packet_map:
            result = packets.Packet(context, id=ident)
        else:
            result = packet_map[ident](context)
            result.read(payload)
    yield result
def read_packet_buffer(stream, compression):
    """Generator that collects the body of one length-prefixed packet.

    A VarInt length is read from ``stream``, followed by that many bytes,
    which are gathered in a ``packets.PacketBuffer``.  ``None`` is yielded
    whenever the stream runs out of data before the packet is complete.

    When ``compression`` is true, the body starts with a further VarInt
    giving the uncompressed size; if that size is greater than zero, the
    rest of the body is inflated with zlib and replaces the buffer
    contents, otherwise the buffer is left positioned just past the VarInt.

    The final item yielded is the buffer, ready to be read.
    ``zlib.error`` propagates to the caller if decompression fails.
    """
    for size in read_varint(stream):
        if size is None:
            yield None
        else:
            break

    payload = packets.PacketBuffer()
    payload.send(stream.read(size - len(payload.get_writable())))
    while len(payload.get_writable()) < size:
        # Not enough data yet: signal the caller, then ask for the rest.
        yield None
        payload.send(stream.read(size - len(payload.get_writable())))
    assert len(payload.get_writable()) == size
    payload.reset_cursor()

    if not compression:
        yield payload
        return

    inflated_size = next(read_varint(payload))
    assert inflated_size is not None
    if inflated_size > 0:
        inflated = zlib.decompress(payload.read())
        assert len(inflated) == inflated_size
        payload.reset()
        payload.send(inflated)
        payload.reset_cursor()
    yield payload
def read_varint(stream):
    """Generator that decodes one VarInt from ``stream``.

    Bytes are read one at a time.  Each byte contributes its low seven
    bits, least significant group first, and a clear high bit marks the
    last byte.  ``None`` is yielded every time ``stream.read(1)`` returns
    no data; the decoded integer is the last item yielded.

    Raises ``Exception('VarInt overflow.')`` if the fifth byte still has
    its continuation bit set.
    """
    result = 0
    shift = 0
    while shift < 7 * 5:
        chunk = stream.read(1)
        while not chunk:
            # Nothing available: report it and try again when resumed.
            yield None
            chunk = stream.read(1)
        octet = chunk[0]
        result |= (octet & 0x7F) << shift
        if not octet & 0x80:
            yield result
            return
        shift += 7
    raise Exception('VarInt overflow.')
def get_packets(context):
    """Build the packet lookup table for ``context``.

    The result is a three-level dict: sending peer (``CLIENT`` maps to the
    serverbound packet modules, ``SERVER`` to the clientbound ones), then
    connection state, then packet ID as reported by ``get_id(context)``,
    giving the packet class.
    """
    table = {}
    for peer, bound in ((CLIENT, serverbound), (SERVER, clientbound)):
        state_modules = (
            (HANDSHAKE, bound.handshake),
            (LOGIN, bound.login),
            (STATUS, bound.status),
            (PLAY, bound.play),
        )
        by_state = {}
        for state_key, module in state_modules:
            by_id = {}
            for packet_cls in module.get_packets(context):
                by_id[packet_cls.get_id(context)] = packet_cls
            by_state[state_key] = by_id
        table[peer] = by_state
    return table
def main(*args):
    """Parse every file named in ``args``, or standard input if none given.

    Each file's output is preceded by a line holding its name and a colon.
    """
    if args:
        for path in args:
            print('%s:' % path)
            with open(path) as handle:
                parse_file(handle)
    else:
        parse_file(sys.stdin)


if __name__ == '__main__':
    main(*sys.argv[1:])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment