Created
November 3, 2025 13:17
-
-
Save rayepeng/4fd454b8519809b47de4f89fc9cfa212 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ''' | |
| python pb_test.py # 解码,默认输出到output.txt | |
| python pb_test.py encode # 编码,默认读取output.txt | |
| ''' | |
| from google.protobuf.internal import decoder, encoder | |
| import base64 | |
| import struct | |
| # a = '''CA8SyQEIARABGAAgACgAMAA4AEAASABQAFgAYABoAHC+T3gAgAEAiAEAkAEAmAEAoAEAqAEDsAEAuAEAwgEIAQg2ONAP0Q/IAQDQAdvctKeiM9gBAOABAOgBAPABAPgBAYACAIgC0A+SAgCYAgKgAgCoAgywAgC4AgDAAgDIAgDYAgDgAgDoAgDyAgoIBhAAGAAgASgA+AIAgAMAigMICJvbgMgGIAGSAxUKCAiAi/nHBhAACgkIgK7+xwYQryuaAwYoADABOGSiAwCoAwAaCAix2wYQABgAGggIstsGEAAYABoICLPbBhAAGAAaCAi02wYQABgAIgYIwZoMEAAiBwihjQYQ1QoiBgiijQYQACq9BwojCAEQARgBIAEqAgEFMLuAwcaiMzgAQABIAFAAYABoAHAAeAESAwjwBxoCCGUqDAihjQYQZRgAIAAoADoNCJFOEGUYASAAKAAwADoNCJJOEGQYACAAKAAwAzoNCJNOEGQYACAAKAAwA0IMCKGcARBkGAAgACgAQgwIopwBEGQYACAAKABCDAijnAEQZBgAIAAoAEIMCKScARBkGAAgACgAQgwIpZwBEGQYACAAKABCDAimnAEQZBgAIAAoAEIMCKmcARBlGAEgACgASgwIseoBEGUYASAAKABSDgjBuAIQZBgAIAAoADAEWggI0YYDEGUYAGIICOHUAxBlGAFqCAjxogQQZRgBcgwIgfEEEGUYASAAKAB6CgiRvwUQZBgAIACCAQQIARAAggEECAIQAIIBBAgDEACCAQQIBBAAggEECAUQAIIBBAgGEACCAQQIBxAAggEECAgQAIIBBAgJEACKAQwIsdsGEGQYACAAKACKAQwIstsGEGQYACAAKACKAQwIs9sGEGQYACAAKACSAQwIwakHEGQYACAAKACaAQwI4cUIEGQYACAAKACaAQwI4sUIEGQYACAAKACaAQwI48UIEGQYACAAKACaAQwI5MUIEGQYACAAKACaAQwI5cUIEGQYACAAKACaAQwI5sUIEGQYACAAKACaAQwI58UIEGQYACAAKACaAQwI6MUIEGQYACAAKACaAQwI6cUIEGQYACAAKACqAQwIgeIJEGQYACAAKACqAQwIguIJEGQYACAAKACqAQwIg+IJEGQYACAAKACyAQ4Ik7AKEGQYACAAKAAwArIBDgiSsAoQZBgAIAAoADACsgEOCJGwChBkGAAgACgAMAK6AQgIof4KEGUYAMIBDAixzAsQZBgAIAAoAMIBDAiyzAsQZBgAIAAoAMIBDAizzAsQZBgAIAAoAMIBDAi0zAsQZBgAIAAoAMoBCAjBmgwQZBgA0gEMCNToDBBkGAAgACgA0gEMCNHoDBBkGAAgACgA0gEMCNLoDBBkGAAgACgA0gEMCNPoDBBkGAAgACgA4gEMCPGEDhBkGAAgACgA4gEMCPKEDhBkGAAgACgA4gEMCPOEDhBkGAAgACgA4gEMCPSEDhBkGAAgACgA6gEOCIHTDhBkGAAgACgAMAPqAQ4IgtMOEGQYACAAKAAwA/IBDAjhmBcQZBgAIAAoAPoBDAiBtRgQZBgAIAAoAIICDAjx5hcQZBgAIAAoAIoCHggBEAIYACAAKAAyADoAQgBIAFAAWABgAGgAcAB4ATIAOgoIARDL6oDIBioAQgA=''' | |
| a = '''CA8SyQEIARABGAAgACgAMAA4AEAASABQAFgAYABoAHC+T3gAgAEAiAEAkAEAmAEAoAEAqAEDsAEAuAEAwgEIAQg2ONAP0Q/IAQDQAdvctKeiM9gBAOABAOgBAPABAPgBAYACAIgC0A+SAgCYAgKgAgCoAgywAgC4AgDAAgDIAgDYAgDgAgDoAgDyAgoIBhAAGAAgASgA+AIAgAMAigMICJvbgMgGIAGSAxUKCAiAi/nHBhAACgkIgK7+xwYQryuaAwYoADABOGSiAwCoAwAaCAix2wYQABgAGggIstsGEAAYABoICLPbBhAAGAAaCAi02wYQABgAIgYIwZoMEAAiCgihjQYQgNKTrQMiCgiijQYQgNKTrQMqvQcKIwgBEAEYASABKgIBBTC7gMHGojM4AEAASABQAGAAaABwAHgBEgMI8AcaAghlKgwIoY0GEGUYACAAKAA6DQiRThBlGAEgACgAMAA6DQiSThBkGAAgACgAMAM6DQiTThBkGAAgACgAMANCDAihnAEQZBgAIAAoAEIMCKKcARBkGAAgACgAQgwIo5wBEGQYACAAKABCDAiknAEQZBgAIAAoAEIMCKWcARBkGAAgACgAQgwIppwBEGQYACAAKABCDAipnAEQZRgBIAAoAEoMCLHqARBlGAEgACgAUg4IwbgCEGQYACAAKAAwBFoICNGGAxBlGABiCAjh1AMQZRgBaggI8aIEEGUYAXIMCIHxBBBlGAEgACgAegoIkb8FEGQYACAAggEECAEQAIIBBAgCEACCAQQIAxAAggEECAQQAIIBBAgFEACCAQQIBhAAggEECAcQAIIBBAgIEACCAQQICRAAigEMCLHbBhBkGAAgACgAigEMCLLbBhBkGAAgACgAigEMCLPbBhBkGAAgACgAkgEMCMGpBxBkGAAgACgAmgEMCOHFCBBkGAAgACgAmgEMCOLFCBBkGAAgACgAmgEMCOPFCBBkGAAgACgAmgEMCOTFCBBkGAAgACgAmgEMCOXFCBBkGAAgACgAmgEMCObFCBBkGAAgACgAmgEMCOfFCBBkGAAgACgAmgEMCOjFCBBkGAAgACgAmgEMCOnFCBBkGAAgACgAqgEMCIHiCRBkGAAgACgAqgEMCILiCRBkGAAgACgAqgEMCIPiCRBkGAAgACgAsgEOCJOwChBkGAAgACgAMAKyAQ4IkrAKEGQYACAAKAAwArIBDgiRsAoQZBgAIAAoADACugEICKH+ChBlGADCAQwIscwLEGQYACAAKADCAQwIsswLEGQYACAAKADCAQwIs8wLEGQYACAAKADCAQwItMwLEGQYACAAKADKAQgIwZoMEGQYANIBDAjU6AwQZBgAIAAoANIBDAjR6AwQZBgAIAAoANIBDAjS6AwQZBgAIAAoANIBDAjT6AwQZBgAIAAoAOIBDAjxhA4QZBgAIAAoAOIBDAjyhA4QZBgAIAAoAOIBDAjzhA4QZBgAIAAoAOIBDAj0hA4QZBgAIAAoAOoBDgiB0w4QZBgAIAAoADAD6gEOCILTDhBkGAAgACgAMAPyAQwI4ZgXEGQYACAAKAD6AQwIgbUYEGQYACAAKACCAgwI8eYXEGQYACAAKACKAh4IARACGAAgACgAMgA6AEIASABQAFgAYABoAHAAeAEyADoKCAEQy+qAyAYqAEIA''' | |
| # b = '''CA4SyQEIARABGAAgACgAMAA4AEAASABQAFgAYABoAHDpRHgAgAEAiAEAkAEAmAEAoAEAqAEDsAEAuAEAwgEIAQg2ONAP0Q/IAQDQAdvctKeiM9gBAOABAOgBAPABAPgBAYACAIgC0A+SAgCYAgKgAgCoAgywAgC4AgDAAgDIAgDYAgDgAgDoAgDyAgoIBhAAGAAgASgA+AIAgAMAigMICJvbgMgGIAGSAxUKCAiAi/nHBhAACgkIgK7+xwYQryuaAwYoADABOGSiAwCoAwAaCAix2wYQABgAGggIstsGEAAYABoICLPbBhAAGAAaCAi02wYQABgAIgYIwZoMEAAiBwihjQYQ1ggiBgiijQYQACq9BwojCAEQARgBIAEqAgEFMInGhsaiMzgAQABIAFAAYABoAHAAeAESAwjwBxoCCGUqDAihjQYQZRgAIAAoADoNCJFOEGUYASAAKAAwADoNCJJOEGQYACAAKAAwAzoNCJNOEGQYACAAKAAwA0IMCKGcARBkGAAgACgAQgwIopwBEGQYACAAKABCDAijnAEQZBgAIAAoAEIMCKScARBkGAAgACgAQgwIpZwBEGQYACAAKABCDAimnAEQZBgAIAAoAEIMCKmcARBlGAEgACgASgwIseoBEGUYASAAKABSDgjBuAIQZBgAIAAoADAEWggI0YYDEGUYAGIICOHUAxBlGAFqCAjxogQQZRgBcgwIgfEEEGUYASAAKAB6CgiRvwUQZBgAIACCAQQIARAAggEECAIQAIIBBAgDEACCAQQIBBAAggEECAUQAIIBBAgGEACCAQQIBxAAggEECAgQAIIBBAgJEACKAQwIsdsGEGQYACAAKACKAQwIstsGEGQYACAAKACKAQwIs9sGEGQYACAAKACSAQwIwakHEGQYACAAKACaAQwI4cUIEGQYACAAKACaAQwI4sUIEGQYACAAKACaAQwI48UIEGQYACAAKACaAQwI5MUIEGQYACAAKACaAQwI5cUIEGQYACAAKACaAQwI5sUIEGQYACAAKACaAQwI58UIEGQYACAAKACaAQwI6MUIEGQYACAAKACaAQwI6cUIEGQYACAAKACqAQwIgeIJEGQYACAAKACqAQwIguIJEGQYACAAKACqAQwIg+IJEGQYACAAKACyAQ4Ik7AKEGQYACAAKAAwArIBDgiSsAoQZBgAIAAoADACsgEOCJGwChBkGAAgACgAMAK6AQgIof4KEGUYAMIBDAixzAsQZBgAIAAoAMIBDAiyzAsQZBgAIAAoAMIBDAizzAsQZBgAIAAoAMIBDAi0zAsQZBgAIAAoAMoBCAjBmgwQZBgA0gEMCNToDBBkGAAgACgA0gEMCNHoDBBkGAAgACgA0gEMCNLoDBBkGAAgACgA0gEMCNPoDBBkGAAgACgA4gEMCPGEDhBkGAAgACgA4gEMCPKEDhBkGAAgACgA4gEMCPOEDhBkGAAgACgA4gEMCPSEDhBkGAAgACgA6gEOCIHTDhBkGAAgACgAMAPqAQ4IgtMOEGQYACAAKAAwA/IBDAjhmBcQZBgAIAAoAPoBDAiBtRgQZBgAIAAoAIICDAjx5hcQZBgAIAAoAIoCHggBEAIYACAAKAAyADoAQgBIAFAAWABgAGgAcAB4ATIAOgoIARDT4oDIBioAQgA=''' | |
# Decode the base64 sample payload held in `a` into raw bytes.
data = base64.b64decode(a)
def is_valid_utf8(data):
    """Report whether *data* decodes cleanly as UTF-8.

    Args:
        data: Object exposing ``.decode`` (normally ``bytes``).

    Returns:
        True when ``data.decode('utf-8')`` succeeds, False otherwise.
        Objects without a ``decode`` method are reported as not valid.
    """
    try:
        data.decode('utf-8')
    except (UnicodeDecodeError, AttributeError):
        # AttributeError keeps the "anything that cannot be decoded is
        # invalid" contract for non-bytes arguments, without swallowing
        # KeyboardInterrupt/SystemExit the way a bare ``except`` would.
        return False
    return True
# Upper bound on nested-message recursion; deeper payloads stay raw bytes.
_MAX_NESTING_DEPTH = 100


def _read_varint(data, pos):
    """Decode one varint at *pos*; return (value, new_pos) or None on failure."""
    try:
        return decoder._DecodeVarint(data, pos)
    except Exception:
        # The decoder raises on truncated or over-long input; callers only
        # need to know that no varint could be read here.
        return None


def _parse_fields(data, depth):
    """Decode fields from *data*; return (fields, consumed_everything)."""
    pos = 0
    end = len(data)
    fields = []
    while pos < end:
        decoded = _read_varint(data, pos)
        if decoded is None:
            return fields, False
        key, pos = decoded
        field_number = key >> 3
        wire_type = key & 0x7

        if wire_type == 0:  # varint
            decoded = _read_varint(data, pos)
            if decoded is None:
                return fields, False
            value, pos = decoded
        elif wire_type == 1:  # 64-bit (fixed64, double)
            if pos + 8 > end:
                return fields, False
            value = struct.unpack('<Q', data[pos:pos + 8])[0]
            pos += 8
        elif wire_type == 2:  # length-delimited (string / bytes / nested)
            decoded = _read_varint(data, pos)
            if decoded is None:
                return fields, False
            length, pos = decoded
            if pos + length > end:
                return fields, False
            payload = data[pos:pos + length]
            pos += length
            value = payload
            if depth < _MAX_NESTING_DEPTH:
                nested, complete = _parse_fields(payload, depth + 1)
                # Only treat the payload as a nested message when every
                # byte was consumed; a partial parse would drop the tail
                # and make re-encoding lossy.
                if complete and nested:
                    value = nested
        elif wire_type == 5:  # 32-bit (fixed32, float)
            if pos + 4 > end:
                return fields, False
            value = struct.unpack('<I', data[pos:pos + 4])[0]
            pos += 4
        else:
            # Unsupported wire type (groups 3/4 or reserved 6/7): stop here.
            return fields, False
        fields.append((field_number, value))
    return fields, True


def parse_message(data, depth=0):
    """Schema-less decode of a protobuf message.

    Walks the wire format and returns every field in order, keeping
    repeated fields as separate entries.

    Args:
        data: Raw message bytes.
        depth: Current nesting level (0 for the outermost message).

    Returns:
        list of ``(field_number, value)`` tuples. ``value`` is an ``int``
        for varint / fixed32 / fixed64 fields, a nested list for a
        length-delimited payload that decodes completely as a non-empty
        message, and ``bytes`` for any other length-delimited payload.
        Parsing stops at the first truncated or unsupported field and the
        fields decoded so far are returned.

    NOTE: fixed32/fixed64 values are returned as plain ints, so the wire
    type is not recoverable from the result.
    """
    fields, _complete = _parse_fields(data, depth)
    return fields
def format_bytes(b):
    """Render *b* as a double-quoted, ASCII-only string literal.

    Printable ASCII bytes (32..126) are emitted as-is, except for the
    backslash and the double quote. Those two and every other byte are
    written as a three-digit octal escape (``\\ooo``).

    Args:
        b: Iterable of byte values (normally ``bytes``).

    Returns:
        The quoted, escaped string.
    """
    pieces = []
    for byte in b:
        if 32 <= byte <= 126 and byte not in (ord('\\'), ord('"')):
            pieces.append(chr(byte))
        else:
            # Always three octal digits so the reader can consume a
            # fixed-width escape.
            pieces.append(f'\\{byte:03o}')
    return '"' + ''.join(pieces) + '"'
def format_value(value, indent=0):
    """Format a single value (without a field number) as text lines.

    Args:
        value: A nested message (list of ``(field_number, value)``),
            ``bytes``, or an integer.
        indent: Number of leading spaces for the outermost line.

    Returns:
        list of str: one line for scalars, a brace-delimited group of
        lines for nested messages.
    """
    pad = ' ' * indent
    if isinstance(value, list):
        # Nested message: braces at this level, fields two spaces deeper.
        lines = [pad + '{']
        for field_num, field_val in value:
            lines.extend(format_field(field_num, field_val, indent + 2))
        lines.append(pad + '}')
        return lines
    if isinstance(value, bytes):
        # Always go through format_bytes so quotes and backslashes are
        # escaped; emitting printable bytes raw made the text ambiguous
        # when parsed back.
        return [pad + format_bytes(value)]
    # Integer
    return [pad + str(value)]
def format_field(field_num, value, indent=0):
    """Format one field as text lines.

    Nested messages become ``N {`` ... ``}`` with children indented two
    more spaces; bytes become ``N: "<escaped>"``; everything else becomes
    ``N: <value>``.

    Args:
        field_num: Field number to print.
        value: Nested list of ``(field_number, value)``, ``bytes``, or int.
        indent: Number of leading spaces for this field.

    Returns:
        list of str: the formatted lines.
    """
    pad = ' ' * indent
    if isinstance(value, list):
        lines = [pad + f'{field_num} {{']
        for sub_field_num, sub_value in value:
            lines.extend(format_field(sub_field_num, sub_value, indent + 2))
        lines.append(pad + '}')
        return lines
    if isinstance(value, bytes):
        # Always escape via format_bytes. Printing printable bytes raw left
        # '"' and '\' unescaped, so a value such as b'\\101' was written as
        # "\101" and read back as a different byte string.
        return [pad + f'{field_num}: {format_bytes(value)}']
    # Integer
    return [pad + f'{field_num}: {value}']
def format_message(parsed_list):
    """Render a whole parsed message in the res1.txt text layout.

    Each top-level ``(field_number, value)`` pair is formatted at indent 0
    and all resulting lines are joined with newlines.
    """
    return '\n'.join(
        text_line
        for number, payload in parsed_list
        for text_line in format_field(number, payload, 0)
    )
def parse_escaped_string(s):
    """Convert a double-quoted, octal-escaped literal back into bytes.

    A backslash followed by exactly three octal digits whose value fits in
    one byte (``\\000``..``\\377``) yields that byte. Any other character,
    including a backslash that does not start such an escape, is kept
    literally and encoded as UTF-8.

    Args:
        s: Text of the form ``"..."`` (quotes included).

    Returns:
        bytes: the decoded payload.

    Raises:
        ValueError: if *s* is not wrapped in a pair of double quotes.
    """
    if len(s) < 2 or not s.startswith('"') or not s.endswith('"'):
        raise ValueError(f"字符串必须以引号包围: {s}")
    body = s[1:-1]  # strip the surrounding quotes

    out = bytearray()
    i = 0
    while i < len(body):
        ch = body[i]
        digits = body[i + 1:i + 4]
        is_octal_escape = (
            ch == '\\'
            and len(digits) == 3
            and digits[0] in '0123'  # keeps the value within one byte
            and all(d in '01234567' for d in digits)
        )
        if is_octal_escape:
            # Digits are validated explicitly because int(..., 8) also
            # accepts signs, whitespace and underscores.
            out.append(int(digits, 8))
            i += 4
        else:
            out.extend(ch.encode('utf-8'))
            i += 1
    return bytes(out)
def parse_text_format(lines):
    """Parse the text layout back into a nested field list.

    Recognised line shapes (leading/trailing whitespace is ignored):

    * ``N: 123``       integer field
    * ``N: "..."``     bytes field, decoded with ``parse_escaped_string``
    * ``N {`` ... ``}``  nested message, parsed recursively

    Blank lines and lines matching none of the shapes are skipped. A bare
    ``}`` at the current level ends parsing of this list.

    Args:
        lines: Sequence of text lines.

    Returns:
        list of ``(field_number, value)`` where value is ``int``,
        ``bytes`` or a nested list.

    Raises:
        ValueError: if a field number or integer is malformed, or a nested
            block is missing its closing brace.
    """
    result = []
    total = len(lines)
    i = 0
    while i < total:
        line = lines[i].strip()
        if not line:
            i += 1
            continue
        if line == '}':
            # End of message
            break

        if line.endswith('{'):
            # Start of a nested message: find the matching closing brace.
            field_num = int(line[:-1].strip())
            open_braces = 1
            j = i + 1
            while j < total and open_braces > 0:
                sub_line = lines[j].strip()
                if sub_line.endswith('{'):
                    open_braces += 1
                elif sub_line == '}':
                    open_braces -= 1
                j += 1
            if open_braces:
                # Without this check the slice below would silently drop
                # the last line of the unterminated block.
                raise ValueError(f"嵌套消息缺少结束括号: {line}")
            # lines[j - 1] is the matching '}', so it is excluded.
            nested_value = parse_text_format(lines[i + 1:j - 1])
            result.append((field_num, nested_value))
            i = j
        elif ':' in line:
            # field: value
            field_str, value_str = line.split(':', 1)
            field_num = int(field_str.strip())
            value_str = value_str.strip()
            if value_str.startswith('"'):
                value = parse_escaped_string(value_str)
            else:
                value = int(value_str)
            result.append((field_num, value))
            i += 1
        else:
            i += 1
    return result
def encode_message(parsed_list):
    """Encode a parsed field list back into protobuf wire bytes.

    Args:
        parsed_list: list of ``(field_number, value)``. ``int`` values are
            written as varints (wire type 0); nested lists and ``bytes``
            are written length-delimited (wire type 2).

    Returns:
        bytes: the concatenated encoded fields.

    Raises:
        ValueError: if a field number is negative or an integer does not
            fit in 64 bits.
        TypeError: if a value is not an int, list or bytes.
    """
    parts = []
    for field_number, value in parsed_list:
        if field_number < 0:
            raise ValueError(f"字段号不能为负数: {field_number}")
        if isinstance(value, int):  # varint
            if not -(1 << 63) <= value < (1 << 64):
                raise ValueError(f"整数超出64位范围: {value}")
            parts.append(encoder._VarintBytes(field_number << 3))
            # Negative numbers are written as 64-bit two's complement; the
            # unsigned varint encoder must not be handed a negative value.
            parts.append(encoder._VarintBytes(value & 0xFFFFFFFFFFFFFFFF))
        elif isinstance(value, (list, bytes)):
            # Nested message or raw bytes: both are length-delimited.
            payload = encode_message(value) if isinstance(value, list) else value
            parts.append(encoder._VarintBytes((field_number << 3) | 2))
            parts.append(encoder._VarintBytes(len(payload)))
            parts.append(payload)
        else:
            raise TypeError(f"不支持的类型: {type(value)}")
    return b''.join(parts)
if __name__ == "__main__":
    # Script entry point.
    #   no arguments        -> decode the embedded sample and write text
    #   encode [input_file] -> read text and encode it back to base64
    import sys

    if len(sys.argv) > 1 and sys.argv[1] == 'encode':
        # Reverse mode: encode a text file back to base64.
        input_file = sys.argv[2] if len(sys.argv) > 2 else 'output.txt'
        print(f"正在从 {input_file} 读取并编码...")
        with open(input_file, 'r', encoding='utf-8') as f:
            lines = f.readlines()

        parsed = parse_text_format(lines)
        encoded_data = encode_message(parsed)
        b64_result = base64.b64encode(encoded_data).decode('ascii')

        print("\n编码结果(base64):")
        print(b64_result)
        print(f"\n原始长度: {len(data)} bytes")
        print(f"编码长度: {len(encoded_data)} bytes")
        print(f"是否匹配: {encoded_data == data}")

        with open('encoded_result.txt', 'w') as f:
            f.write(b64_result)
        print("\n已保存到 encoded_result.txt")
    else:
        # Normal mode: decode the embedded base64 sample and print it.
        print("正在解析protobuf数据...")
        msg = parse_message(data)
        formatted = format_message(msg)
        print(formatted)

        # Write to the file that encode mode reads by default, so the
        # decode -> edit -> encode workflow works without extra arguments.
        output_file = 'output.txt'
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(formatted + '\n')
        print(f"\n已保存到 {output_file}")

        # Self-check: re-encode and compare with the original bytes.
        print("\n验证:重新编码...")
        re_encoded = encode_message(msg)
        print(f"原始数据: {len(data)} bytes")
        print(f"重新编码: {len(re_encoded)} bytes")
        print(f"数据一致: {re_encoded == data}")
        if re_encoded == data:
            print("✅ 编码解码完全可逆!")
        else:
            print("❌ 数据不一致")
            # Report the first differing byte. Loop names are chosen so the
            # module-level `a` is not rebound.
            for i, (orig_byte, new_byte) in enumerate(zip(data, re_encoded)):
                if orig_byte != new_byte:
                    print(f"第一个差异在位置 {i}: 原始={orig_byte:02x}, 重编码={new_byte:02x}")
                    break
            else:
                # The common prefix is identical, so only the lengths differ.
                print(f"前 {min(len(data), len(re_encoded))} 字节一致,仅长度不同")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment