Created
July 2, 2025 13:21
-
-
Save ajxchapman/d94581e06ed40a8ed356f42df8f8d819 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import struct

# Values for the ZIP "compression method" header field (see the ZIP APPNOTE):
# 0 means the entry is stored as-is, 8 means the entry is a raw DEFLATE stream.
COMPRESSION_METHOD_NONE = 0
COMPRESSION_METHOD_DEFLATE = 8

# Stride, in bytes, between consecutive local file headers in the output.
# ZipFile places entry number i at offset i * LFH_ALLIGNMENT and rejects
# entries whose compressed size is larger than this value.
LFH_ALLIGNMENT = 0x600
class Header:
    """Base class for binary records described by a ``fields`` table.

    Subclasses set ``fields`` to a sequence of ``(name, fmt, default)``
    tuples, in the order the values are serialized:

    * ``name``    -- attribute name, also the keyword accepted by ``__init__``.
    * ``fmt``     -- ``struct`` format code for the value, or ``None`` when the
      value is not part of the fixed-size portion (the subclass appends it
      itself in ``pack``).
    * ``default`` -- value used when the keyword is omitted; ``None`` marks
      the field as required.
    """

    # Empty layout so the attribute always exists; subclasses override it.
    fields = ()

    def __init__(self, **kwargs):
        """Populate one attribute per entry of ``fields``.

        Keywords that do not match a field name are ignored.

        Raises:
            ValueError: if a field ends up as ``None``, i.e. a required field
                was omitted or ``None`` was passed explicitly.
        """
        for name, _fmt, default in self.fields:
            value = kwargs.get(name, default)
            setattr(self, name, value)
            if value is None:
                raise ValueError(f"Missing required field: {name}")

    def pack(self):
        """Return the fixed-size fields packed little-endian, without padding.

        Fields whose format is ``None`` are skipped.
        """
        packed_fields = [
            (fmt, getattr(self, name))
            for name, fmt, _default in self.fields
            if fmt is not None
        ]
        # "<" selects little-endian byte order with standard sizes and no
        # alignment padding between fields.
        layout = "<" + "".join(fmt for fmt, _value in packed_fields)
        return struct.pack(layout, *(value for _fmt, value in packed_fields))
class LocalFileHeader(Header):
    """ZIP local file header: the record written immediately before an entry's data.

    ``filename`` is required and is given as ``str``; ``extra_field`` is
    ``bytes``. Both follow the fixed-size portion in the packed output.
    """

    fields = [
        ("signature", "4s", b'PK\x03\x04'),
        ("version", "H", 20),
        ("flags", "H", 0),
        ("compression", "H", COMPRESSION_METHOD_NONE),
        ("mod_time", "H", 0),
        ("mod_date", "H", 0),
        ("crc32", "I", 0),
        ("compressed_size", "I", 0),
        ("uncompressed_size", "I", 0),
        ("filename_length", "H", None),
        ("extra_field_length", "H", 0),
        ("filename", None, None),
        ("extra_field", None, b''),
    ]

    def __init__(self, **kwargs):
        """Build the header, deriving length fields that were not supplied.

        ``filename_length`` and ``extra_field_length`` default to the byte
        lengths of ``filename`` (encoded) and ``extra_field`` so the declared
        lengths always match what ``pack`` emits. Explicit values are kept.

        Raises:
            ValueError: if ``filename`` is missing.
        """
        # ``or`` keeps a missing/None value from crashing here; the base
        # class then reports it as a missing required field.
        if 'filename_length' not in kwargs:
            kwargs['filename_length'] = len((kwargs.get('filename') or '').encode())
        if 'extra_field_length' not in kwargs:
            kwargs['extra_field_length'] = len(kwargs.get('extra_field') or b'')
        super().__init__(**kwargs)

    def pack(self):
        """Return the fixed-size fields followed by the filename and extra field."""
        return super().pack() + self.filename.encode() + self.extra_field
class CentralDirectoryHeader(Header):
    """ZIP central directory file header: one record per entry in the archive index.

    ``filename``, ``compressed_size``, ``uncompressed_size`` and
    ``local_header_offset`` are required. ``filename`` is ``str``;
    ``extra_field`` and ``comment`` are ``bytes``. The three variable-length
    values follow the fixed-size portion in the packed output.
    """

    fields = [
        ("signature", "4s", b'PK\x01\x02'),
        ("version_made_by", "H", 20),
        ("version_needed", "H", 20),
        ("flags", "H", 0),
        ("compression", "H", COMPRESSION_METHOD_NONE),
        ("mod_time", "H", 0),
        ("mod_date", "H", 0),
        ("crc32", "I", 0),
        ("compressed_size", "I", None),
        ("uncompressed_size", "I", None),
        ("filename_length", "H", None),
        ("extra_field_length", "H", 0),
        ("comment_length", "H", 0),
        ("disk_number_start", "H", 0),
        ("internal_file_attributes", "H", 0),
        ("external_file_attributes", "I", 0),
        ("local_header_offset", "I", None),
        ("filename", None, None),
        ("extra_field", None, b''),
        ("comment", None, b''),
    ]

    def __init__(self, **kwargs):
        """Build the header, deriving length fields that were not supplied.

        ``filename_length``, ``extra_field_length`` and ``comment_length``
        default to the byte lengths of the corresponding values so the
        declared lengths always match what ``pack`` emits. Explicit values
        are kept.

        Raises:
            ValueError: if a required field is missing.
        """
        # ``or`` keeps a missing/None value from crashing here; the base
        # class then reports it as a missing required field.
        if 'filename_length' not in kwargs:
            kwargs['filename_length'] = len((kwargs.get('filename') or '').encode())
        if 'extra_field_length' not in kwargs:
            kwargs['extra_field_length'] = len(kwargs.get('extra_field') or b'')
        if 'comment_length' not in kwargs:
            kwargs['comment_length'] = len(kwargs.get('comment') or b'')
        super().__init__(**kwargs)

    def pack(self):
        """Return the fixed-size fields followed by filename, extra field and comment."""
        return super().pack() + self.filename.encode() + self.extra_field + self.comment
class EndOfCentralDirectory(Header):
    """ZIP end-of-central-directory record: the trailer that locates the central directory.

    The record counts and the central directory size and offset are
    required. ``comment`` is ``bytes`` and follows the fixed-size portion in
    the packed output.
    """

    fields = [
        ("signature", "4s", b'PK\x05\x06'),
        ("disk_number", "H", 0),
        ("central_directory_disk_number", "H", 0),
        ("central_directory_records_on_this_disk", "H", None),
        ("central_directory_records_total", "H", None),
        ("central_directory_size", "I", None),
        ("central_directory_offset", "I", None),
        ("comment_length", "H", 0),
        ("comment", None, b''),
    ]

    def __init__(self, **kwargs):
        """Build the record, deriving ``comment_length`` when it was not supplied.

        The length defaults to ``len(comment)`` so the declared length always
        matches what ``pack`` emits. An explicit value is kept.

        Raises:
            ValueError: if a required field is missing.
        """
        if 'comment_length' not in kwargs:
            kwargs['comment_length'] = len(kwargs.get('comment') or b'')
        super().__init__(**kwargs)

    def pack(self):
        """Return the fixed-size fields followed by the comment bytes."""
        return super().pack() + self.comment
class ZipFile:
    """Minimal ZIP writer that places every entry in a fixed-size slot.

    Entry number ``i`` starts at byte offset ``i * LFH_ALLIGNMENT``; the
    central directory starts at ``len(files) * LFH_ALLIGNMENT`` and is
    followed directly by the end-of-central-directory record.
    """

    def __init__(self, filename):
        """Remember the output path; nothing is written until ``write``."""
        self.filename = filename
        self.files = []

    def add_file(self, name, data=None, compressed_size=None, uncompressed_size=None, compression_method=COMPRESSION_METHOD_NONE):
        """Queue an entry for the archive.

        Args:
            name: entry name inside the archive. When ``data`` is ``None`` it
                is also the path the bytes are read from.
            data: bytes written verbatim after the local header. They must
                already be encoded according to ``compression_method``.
            compressed_size: size recorded in the central directory;
                defaults to ``len(data)``.
            uncompressed_size: size recorded in the central directory;
                defaults to ``len(data)``.
            compression_method: value for the ZIP compression method field.

        Raises:
            ValueError: if a size has the high bit set in any of its four
                bytes, if ``compressed_size`` exceeds ``LFH_ALLIGNMENT``, or
                if the local header plus ``data`` does not fit in one slot.
        """
        if data is None:
            with open(name, 'rb') as f:
                data = f.read()

        # Compare against None rather than using ``or`` so an explicit size
        # of 0 (e.g. an empty file stored as a DEFLATE stream) is respected.
        if compressed_size is None:
            compressed_size = len(data)  # Assuming data is already compressed
        if uncompressed_size is None:
            uncompressed_size = len(data)

        # 0x80808080 selects the top bit of each of the four bytes of the
        # 32-bit size field.
        if uncompressed_size & 0x80808080 > 0:
            raise ValueError(f"Uncompressed size 0x{uncompressed_size:x} has the high bit set, which is not supported in this implementation.")
        if compressed_size & 0x80808080 > 0:
            raise ValueError(f"Compressed size 0x{compressed_size:x} has the high bit set, which is not supported in this implementation.")
        if compressed_size > LFH_ALLIGNMENT:
            raise ValueError(f"Compressed size 0x{compressed_size:x} exceeds 0x{LFH_ALLIGNMENT:x} bytes, which is not supported in this implementation.")

        # The slot holds the local header as well as the data. If the two
        # together overrun it, the next entry's header (or the central
        # directory) would be written over the tail of this entry's data.
        slot_usage = len(LocalFileHeader(filename=name).pack()) + len(data)
        if slot_usage > LFH_ALLIGNMENT:
            raise ValueError(f"Entry {name!r} needs 0x{slot_usage:x} bytes (header + data), which exceeds the 0x{LFH_ALLIGNMENT:x}-byte slot reserved per file.")

        self.files.append({
            'name': name,
            'data': data,
            'compressed_size': compressed_size,
            'uncompressed_size': uncompressed_size,
            'compression_method': compression_method,
            'offset': len(self.files) * LFH_ALLIGNMENT,
        })

    def write(self):
        """Write all queued entries, the central directory and the trailer.

        Raises:
            ValueError: if the central directory size has the high bit set
                in any of its four bytes.
        """
        with open(self.filename, 'wb') as f:
            for entry in self.files:
                f.seek(entry['offset'])
                entry['local_header_offset'] = f.tell()
                # The local header deliberately records both sizes as 0; the
                # real sizes are only stored in the central directory below.
                # NOTE(review): presumably readers are expected to take the
                # sizes from the central directory -- confirm with the author.
                f.write(LocalFileHeader(
                    filename=entry['name'],
                    compressed_size=0,
                    uncompressed_size=0,
                    compression=entry['compression_method'],
                ).pack())
                f.write(entry['data'])

            # The central directory starts at the first slot boundary after
            # the last entry.
            f.seek(len(self.files) * LFH_ALLIGNMENT)
            central_directory_offset = f.tell()
            for entry in self.files:
                f.write(CentralDirectoryHeader(
                    filename=entry['name'],
                    local_header_offset=entry['local_header_offset'],
                    compressed_size=entry['compressed_size'],
                    uncompressed_size=entry['uncompressed_size'],
                    compression=entry['compression_method'],
                ).pack())

            central_directory_size = f.tell() - central_directory_offset
            if central_directory_size & 0x80808080 > 0:
                raise ValueError(f"Central directory size 0x{central_directory_size:x} has the high bit set, which is not supported in this implementation.")

            f.write(EndOfCentralDirectory(
                central_directory_records_on_this_disk=len(self.files),
                central_directory_records_total=len(self.files),
                central_directory_size=central_directory_size,
                central_directory_offset=central_directory_offset,
            ).pack())
# Build output.jar from a manifest stored as-is and a class file whose
# DEFLATE stream was produced beforehand (Hello.class.deflate).
z = ZipFile("output.jar")
z.add_file("META-INF/MANIFEST.MF")

with open("Hello.class.deflate", "rb") as f:
    # The uncompressed size comes from the original class file on disk.
    uncompressed_size = os.stat("Hello.class").st_size
    data = f.read()
z.add_file("Hello.class", data=data, uncompressed_size=uncompressed_size, compression_method=COMPRESSION_METHOD_DEFLATE)

z.write()

# Sanity check: fail loudly if any byte of the generated archive is outside
# the 7-bit ASCII range.
with open("output.jar", "rb") as f:
    for x in f.read():
        if x > 0x7f:
            raise ValueError(f"Byte 0x{x:02x} in output.jar is not a valid ASCII character.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment