A script to read & unpack firmware files of Sandisk's discontinued wireless flash drive series.
# pyright: reportDeprecated=none, reportUnknownVariableType=none, reportUnknownParameterType=none, reportAny=none, reportUnknownMemberType=none, reportMissingParameterType=none, reportUnknownArgumentType=none, reportImplicitOverride=none, reportUnusedCallResult=none | |
from cmd import Cmd | |
from dataclasses import dataclass, asdict | |
from enum import Enum | |
from http.client import HTTPResponse | |
from io import BytesIO | |
from time import sleep | |
from typing import Dict, List, Optional, Union, cast | |
from urllib.error import HTTPError | |
from urllib.parse import urlencode, unquote | |
import xml.etree.ElementTree as ET | |
from concurrent.futures import ThreadPoolExecutor | |
from urllib.request import Request, urlopen | |
from hashlib import sha1, md5 | |
from os.path import join | |
from datetime import datetime | |
# from socket import IP_MULTICAST_LOOP, IP_MULTICAST_TTL, SO_REUSEADDR, SO_REUSEPORT, SOCK_DGRAM, SOL_IP, SOL_SOCKET, gethostname, socket, AF_INET, gethostbyaddr, getfqdn, getaddrinfo, gethostbyname, gethostbyname_ex | |
class DeviceType(Enum): | |
# Probably AirStash | |
A01 = "A01" | |
A02 = "A02" | |
# Wireless Flash Drive | |
FD_128K = "A02S" # 16GB/32GB variant | |
FD_256K = "A02E" # 64GB variant | |
# Wireless Stick | |
WS_V1 = "A03S" # 16GB/32GB variant | |
WS_V2 = "A03E" # 64GB/128GB/200GB variant | |
UNKNOWN = "" | |
@classmethod | |
def _missing_(cls, value): return cls.UNKNOWN | |
class BatteryStatus(Enum): | |
CHARGING = "charging" | |
FULL = "charged" | |
HIGH = "high" | |
MEDIUM = "med" | |
LOW = "low" | |
CRITICAL = "critical" | |
UNKNOWN = "" | |
@classmethod | |
def _missing_(cls, value): return cls.UNKNOWN | |
class WiFiSecurity(Enum): | |
WPA2 = "wpa2" | |
WPA = "wpa" | |
WEP = "wep" | |
PUBLIC = "none" | |
UNKNOWN = "" | |
@classmethod | |
def _missing_(cls, value): return cls.UNKNOWN | |
class StorageMediumStatus(Enum): | |
MOUNTED = "mounted" | |
UNFORMATTED = "unformatted" | |
FS_ERROR = "fserror" | |
ERROR = "carderror" | |
UNSUPPORTED = "" | |
@classmethod | |
def _missing_(cls, value): return cls.UNSUPPORTED | |
class StorageMediumFileSystem(Enum): | |
FAT16 = "fat16" | |
FAT32 = "fat32" | |
EXFAT = "exfat" | |
NTFS = "ntfs" | |
HFS = "hfs" | |
UNKNOWN = "" | |
@classmethod | |
def _missing_(cls, value): return cls.UNKNOWN | |
class NetworkConnectStatus(Enum): | |
CONNECTING = "connecting" | |
CONNECTED = "connected" | |
FAILED = "failed" | |
UNKNOWN = "unknown" | |
@classmethod | |
def _missing_(cls, value): return cls.UNKNOWN | |
class NetworkSecurityLevel(Enum): | |
NONE = "none" | |
ALL = "all" | |
UNKNOWN = "" | |
@classmethod | |
def _missing_(cls, value): return cls.UNKNOWN | |
class NetworkScanState(Enum): | |
SCANNING = "scanning" | |
LOCKED = "locked" | |
NONE = "none" | |
UNKNOWN = "" | |
@classmethod | |
def _missing_(cls, value): return cls.UNKNOWN | |
class SettingsPushState(Enum): | |
# Changes saved successfully. | |
SUCCESS = "ok" | |
SUCCESS_PENDING_RESTART = "ok:pending" | |
# Couldn't register a new network for some reason (?). | |
BAD_REQUEST = "bad" | |
    # Couldn't register a new network because the maximum limit has been reached.
    ERROR_ENTRY_FULL = "full"
    # Couldn't register a new network because it already exists.
    ERROR_ENTRY_DUPLICATE = "duplicate"
    # Couldn't remove a saved network because it doesn't exist.
    ERROR_ENTRY_NONEXIST = "notfound"
@dataclass | |
class BatteryInfo: | |
status: BatteryStatus | |
voltage: int | |
@property | |
def level(self): | |
if self.status == BatteryStatus.CRITICAL: | |
return 0.2 | |
elif self.status == BatteryStatus.LOW: | |
return 0.3 | |
elif self.status == BatteryStatus.MEDIUM: | |
return 0.5 | |
elif self.status == BatteryStatus.HIGH: | |
return 1 | |
elif self.status == BatteryStatus.FULL: | |
return 1 | |
return -1 | |
@dataclass | |
class ImplementationInfo: | |
security: int | |
cachent: int | |
coex: int | |
upgrade: int | |
restart: int | |
exfat: int | |
move_rename: int | |
@property | |
def supports_auto_upgrade(self): | |
return self.upgrade >= 2 | |
@dataclass | |
class BuildInfo: | |
name: str | |
model: str | |
code: int | |
@dataclass | |
class BitrateThresholdInfo: | |
warning: int | |
critical: int | |
@dataclass | |
class LastErrorInfo: | |
message: str | |
source: str | |
line: int | |
version: Optional[int] | |
address: Optional[int] | |
counter: Optional[int] | |
timestamp: Optional[int] | |
@dataclass | |
class ExpectedAppVersionInfo: | |
android: str | |
ios: str | |
@dataclass | |
class ServerNetworkInfo: | |
enabled: bool | |
clients: int | |
ssid: str | |
channel: int | |
security: WiFiSecurity | |
key: str | |
@dataclass | |
class ClientNetworkInfo: | |
enabled: bool | |
ssid: str | |
status: NetworkConnectStatus | |
ip: str | |
from_home: bool | |
@dataclass | |
class NetworkInfo: | |
ssid: str | |
security: WiFiSecurity | |
rssi: int | |
connected: bool | |
saved: bool | |
def signal(self, is_alt: bool = False): | |
if self.rssi == -1: | |
return 0 | |
if is_alt: | |
if self.rssi <= 50: | |
return 3 | |
elif self.rssi <= 70: | |
return 2 | |
return 1 | |
if self.rssi >= 32: | |
return 3 | |
elif self.rssi >= 16: | |
return 2 | |
return 1 | |
@dataclass | |
class StorageMediumInfo: | |
status: StorageMediumStatus | |
format: StorageMediumFileSystem | |
serial: bytes | |
path: str | |
label: str | |
free: int | |
total: int | |
block_size: int | |
read_only: bool | |
@dataclass | |
class WebDAVFolder: | |
name: str | |
created: datetime | |
modified: datetime | |
@dataclass | |
class WebDAVPartialFolder: | |
name: str | |
@dataclass | |
class WebDAVFile: | |
name: str | |
mimetype: str | |
size: int | |
created: Optional[datetime] | |
modified: datetime | |
etag: str | |
cachent: Optional[str] | |
class MACAddress(str): | |
def __new__(cls, content): | |
mac = str(content).upper() | |
if mac.count(":") != 5: | |
raise ValueError(f"doesn't appear to be a valid mac: {mac}") | |
if not all((int(x, 16) <= 0xFF for x in mac.split(":"))): | |
raise ValueError(f"doesn't appear to be a valid mac: {mac}") | |
return str.__new__(cls, mac) | |
@property | |
def as_home(self): | |
if self.startswith(DIRECT_CONNECT_MAC_PREFIX): | |
return HOME_CONNECT_MAC_PREFIX + self.removeprefix(DIRECT_CONNECT_MAC_PREFIX) | |
return self | |
@property | |
def as_direct(self): | |
if self.startswith(HOME_CONNECT_MAC_PREFIX): | |
return DIRECT_CONNECT_MAC_PREFIX + self.removeprefix(HOME_CONNECT_MAC_PREFIX) | |
return self | |
@property | |
def as_int(self): | |
return int.from_bytes(bytes.fromhex(self.as_direct.replace(":", "")), "big") | |
def to_model(self): | |
# TODO: might be incorrect | |
value = self.as_int | |
if value < 0xD0_E4_0B_00_0F_00: | |
return DeviceType.A01 | |
elif (value >= 0xD0_E4_0B_00_0F_00) and (value <= 0xD0_E4_0B_03_99_FF): | |
return DeviceType.A02 | |
elif (value >= 0xD0_E4_0B_F5_D6_00) and (value <= 0xD0_E4_0B_FB_9F_FF): | |
return DeviceType.FD_128K | |
elif (value >= 0xD0_E4_0B_FB_E0_00) and (value <= 0xD0_E4_0B_FE_FF_FF): | |
return DeviceType.WS_V1 | |
elif (value >= 0xD0_E4_0B_F5_D5_FF) and (value <= 0xD0_E4_0B_80_50_00): | |
return DeviceType.WS_V2 | |
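# A minimal usage sketch for MACAddress (not part of the original gist): the device reports
# its serial as a MAC address, and the same device is reachable under two different MAC
# prefixes depending on whether you connect to its own AP ("direct") or through your home
# network ("home"). The address below is made up purely for illustration.
def _example_mac_address():
    mac = MACAddress("d0:e4:0b:12:34:56")
    # Normalized to uppercase on construction.
    assert mac == "D0:E4:0B:12:34:56"
    # Swapping between the two prefixes only changes the leading octets.
    assert mac.as_home == "D2:E4:0B:12:34:56"
    assert MACAddress(mac.as_home).as_direct == mac
    # to_model() guesses the device type from the MAC range; it may return None if the
    # address doesn't fall into any of the (possibly incorrect) ranges above.
    return mac.to_model()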
@dataclass | |
class Settings: | |
model: DeviceType | |
hostname: str | |
mac: MACAddress | |
network: ServerNetworkInfo | |
home_network: Optional[ClientNetworkInfo] | |
build: BuildInfo | |
battery: BatteryInfo | |
bitrate: BitrateThresholdInfo | |
app_version: ExpectedAppVersionInfo | |
auth: NetworkSecurityLevel | |
authhash: str | |
timeout: int | |
pending_update: Optional[int] | |
implementation: ImplementationInfo | |
mediums: List[StorageMediumInfo] | |
last_error: Optional[LastErrorInfo] | |
@property | |
def is_usb(self): | |
return (len(self.mediums) == 0) and (self.build.code >= 2009) | |
# SERVER_ADDRESS = "172.25.63.1" | |
# SERVER_ADDRESS = "sandiskf47e27.local" | |
SERVER_ADDRESS = "192.168.1.81" | |
DIRECT_CONNECT_MAC_PREFIX = "D0:E4:0B:" | |
HOME_CONNECT_MAC_PREFIX = "D2:E4:0B:" | |
def do_request(path: str, method: str, headers : Optional[Dict[str, str]] = None, data : Optional[bytes] = None): | |
server = SERVER_ADDRESS | |
# server = "httpbin.org/anything" | |
req = Request( | |
url = f"http://{server}/{path.removeprefix('/')}", | |
method = method, | |
unverifiable = True, | |
data = data, | |
headers = headers or {} | |
) | |
try: | |
resp = cast(HTTPResponse, urlopen(req)) | |
return resp.status, resp.read(), dict(resp.headers) | |
except HTTPError as he: | |
return he.status, he.read(), dict(he.headers) | |
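# A minimal usage sketch (not part of the original script), assuming the drive is reachable
# at SERVER_ADDRESS: every endpoint below goes through do_request(), which returns the HTTP
# status, the raw body and the response headers even for error status codes.
def _example_do_request():
    status, body, headers = do_request("/settings.xml", "GET")
    print("status:", status)
    print("content-type:", headers.get("Content-Type"))
    print("first bytes:", body[:64])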
def is_legal_name(fs : StorageMediumFileSystem, name : str):
    """
    Returns True if the given name doesn't contain any character that is
    illegal on the given filesystem.
    """
    illegal_chars = ""
    if fs == StorageMediumFileSystem.FAT32:
        illegal_chars = "*?.,;:/\\|+=<>[]\""
    elif fs == StorageMediumFileSystem.FAT16:
        illegal_chars = "\"*,/:;<=>?[\\]|"
    return not any(c in illegal_chars for c in name)
def get_service(): | |
pass | |
# host = gethostbyname(gethostname()) | |
# sock = socket(AF_INET, SOCK_DGRAM) | |
# sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) | |
# sock.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1) | |
# sock.setsockopt(SOL_IP, IP_MULTICAST_TTL, 255) | |
# sock.setsockopt(SOL_IP, IP_MULTICAST_LOOP, 1) | |
# sock.bind(("", 5353)) | |
def post_settings(query: Union[Dict[str, str], Dict[str, Union[bytes, str]]], post_restart: bool = False): | |
""" | |
Set device settings. | |
""" | |
if post_restart: | |
query["restart"] = "allowed" | |
body = urlencode(query) | |
_, data, _ = do_request( | |
"/settings.xml", | |
"POST", | |
headers = {"Content-Type": "application/x-www-form-urlencoded"}, | |
data = body.encode("utf-8") | |
) | |
statusxml = ET.XML(data) | |
assert statusxml.tag == "status" | |
# Is there a pending reboot? | |
extra = statusxml.attrib.get("restart", "") | |
if extra: | |
assert extra == "pending" | |
extra = f":{extra}" | |
return SettingsPushState((statusxml.text or "") + extra) | |
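# A minimal usage sketch (not part of the original script): settings are changed by POSTing
# form-encoded key/value pairs to /settings.xml, and the device answers with a <status>
# element that is mapped to SettingsPushState above. The "timeout" key used here is the
# same one set_timeout() uses further below.
def _example_post_settings():
    state = post_settings({"timeout": "30"})
    print("push state:", state)
    return state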
def get_networks(new_scan: bool = True) -> List[NetworkInfo]:
    """
    Performs a new Wi-Fi scan and returns a single list that merges the currently
    scanned networks with the already saved ones (saved networks are flagged via
    `NetworkInfo.saved`). If `new_scan` is False, no new scan will be performed and
    the cached scan list stored on the device will be used instead.
    """
def _get_network_list(*, is_scanned: bool): | |
query = {} | |
query["group"] = "scan" if is_scanned else "saved" | |
status, data, _ = do_request( | |
"/settings.xml" + "?" + urlencode(query), | |
"GET" | |
) | |
assert status == 200, "failed response" | |
xmldata = ET.XML(data) | |
if is_scanned: | |
if xmldata.tag == "status": | |
return NetworkScanState(xmldata.text) | |
assert xmldata.tag == "ssidlist", f"unexpected xml tag '{xmldata.tag}'" | |
result : List[NetworkInfo] = [] | |
for ssid in xmldata.findall("./ssid"): | |
network = NetworkInfo( | |
ssid = unquote(ssid.attrib["name"]).encode("ISO-8859-15").decode("utf-8"), | |
security = WiFiSecurity(ssid.attrib["security"]), | |
rssi = int(ssid.attrib.get("rssi", "") or -1), | |
                # This won't be True when listing the saved networks, even if the device
                # is currently connected to one of them.
                # So, it only applies to scanned networks.
connected = ssid.attrib.get("connected", "") == "connected", | |
saved = not is_scanned | |
) | |
            # Only include networks that have an SSID.
if not network.ssid: | |
continue | |
result.append(network) | |
return result | |
def _block_for_scan(interval: int, wait_more: bool = False): | |
sleep((interval / 1000) * (2 if wait_more else 1)) | |
networks = _get_network_list(is_scanned = True) | |
if type(networks) is NetworkScanState: | |
if networks == NetworkScanState.SCANNING: | |
return _block_for_scan(interval) | |
elif networks == NetworkScanState.LOCKED: | |
return _block_for_scan(interval, True) | |
else: | |
post_settings({ | |
"group": "scan" | |
}) | |
return _block_for_scan(interval, True) | |
else: | |
return networks | |
saved_list = cast(List[NetworkInfo], _get_network_list(is_scanned = False)) | |
scanned_list : List[NetworkInfo] = [] | |
if new_scan: | |
post_settings({ | |
"group": "scan" | |
}) | |
with ThreadPoolExecutor(max_workers=1) as executor: | |
fut = executor.submit(_block_for_scan, 2000 if new_scan else 0) | |
scanned_list = cast(List[NetworkInfo], fut.result()) | |
# If the device is currently connected to a network, | |
# then don't add the same SSID again. | |
result : List[NetworkInfo] = [] | |
result.extend(scanned_list) | |
for old in saved_list: | |
exist_in_new = next((new for new in result if new.ssid == old.ssid), None) | |
if exist_in_new: | |
exist_in_new.saved = True | |
result.append(old) | |
return result | |
# --------------------------------------------------------------------------- | |
# Wi-Fi Security | |
# --------------------------------------------------------------------------- | |
class WiFiPasswordValidationStatus(Enum): | |
REQUIRE_NO_PASSWORD = "require_no_password" | |
REQUIRE_PASSWORD = "require_password" | |
TOO_SHORT = "too_short" | |
TOO_LONG = "too_long" | |
BAD_CHARACTER = "bad_character" | |
VALID = "valid" | |
def validate_wifi_password(security: WiFiSecurity, password: str) -> WiFiPasswordValidationStatus: | |
""" | |
Checks whether the given password satisfies the requirements of the specified Wi-Fi | |
security model, and returns a WiFiPasswordValidationStatus reporting the validation outcome. | |
""" | |
if security == WiFiSecurity.PUBLIC: | |
if password: | |
return WiFiPasswordValidationStatus.REQUIRE_NO_PASSWORD | |
return WiFiPasswordValidationStatus.VALID | |
elif security == WiFiSecurity.WEP: | |
length = len(password) | |
# ASCII characters | |
if length == 13: | |
for x in password: | |
if ord(x) > 127: | |
return WiFiPasswordValidationStatus.BAD_CHARACTER | |
return WiFiPasswordValidationStatus.VALID | |
# Hexadecimal characters | |
elif length == 26: | |
allowed_chars = "0123456789abcdefABCDEF" | |
if not all(c in allowed_chars for c in password): | |
return WiFiPasswordValidationStatus.BAD_CHARACTER | |
return WiFiPasswordValidationStatus.VALID | |
elif length < 13: | |
return WiFiPasswordValidationStatus.TOO_SHORT | |
elif length < 26: | |
return WiFiPasswordValidationStatus.TOO_SHORT | |
else: | |
return WiFiPasswordValidationStatus.TOO_LONG | |
elif (security == WiFiSecurity.WPA2) or (security == WiFiSecurity.WPA): | |
length = len(password) | |
if length < 8: | |
return WiFiPasswordValidationStatus.TOO_SHORT | |
elif length > 63: | |
return WiFiPasswordValidationStatus.TOO_LONG | |
else: | |
# WPA-PSK password must only include printable ASCII characters, | |
# ranging from 32 and 126 (inclusive). | |
for x in password: | |
c = ord(x) | |
if ((c < 32) or (c > 126)): | |
return WiFiPasswordValidationStatus.BAD_CHARACTER | |
return WiFiPasswordValidationStatus.VALID | |
else: | |
raise ValueError("invalid Wi-Fi security to validate password against") | |
def create_pbkdf2_wpapsk_key(ssid: str, password: str) -> bytes: | |
""" | |
Creates a PBKDF2 WPA-PSK value from a SSID and password. | |
""" | |
# Copied from hashlib.pbkdf2_hmac() | |
outer_trans = bytes((x ^ 0x5C) for x in range(256)) | |
inner_trans = bytes((x ^ 0x36) for x in range(256)) | |
inner = sha1() | |
outer = sha1() | |
passbytes = password.encode() | |
if len(password) > 64: | |
passbytes = sha1(passbytes).digest() | |
passbytes += b"\x00" * (64 - len(passbytes)) | |
inner.update(passbytes.translate(inner_trans)) | |
outer.update(passbytes.translate(outer_trans)) | |
def prf(msg, inner=inner, outer=outer): | |
icpy = inner.copy() | |
ocpy = outer.copy() | |
icpy.update(msg) | |
ocpy.update(icpy.digest()) | |
return ocpy.digest() | |
output = bytearray() | |
loop = 1 | |
salt = ssid.encode("ISO-8859-15") | |
while len(output) < 32: | |
prev = prf(salt + loop.to_bytes(4, 'big')) | |
rkey = int.from_bytes(prev, 'big') | |
for _ in range(4096 - 1): | |
prev = prf(prev) | |
rkey ^= int.from_bytes(prev, 'big') | |
loop += 1 | |
output.extend(rkey.to_bytes(inner.digest_size, 'big')) | |
return bytes(output[:32]) | |
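# The function above is a hand-rolled PBKDF2-HMAC-SHA1 with 4096 iterations and a 32-byte
# output, which is the standard WPA-PSK derivation. As a sanity check it should match
# hashlib.pbkdf2_hmac(); a small sketch (my assumption, not part of the original script),
# with made-up SSID/passphrase defaults:
def _example_check_wpapsk(ssid: str = "MyNetwork", password: str = "MyPassphrase"):
    from hashlib import pbkdf2_hmac
    expected = pbkdf2_hmac("sha1", password.encode(), ssid.encode("ISO-8859-15"), 4096, 32)
    assert create_pbkdf2_wpapsk_key(ssid, password) == expected
    return expected.hex()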
def save_network(ssid: str, security: WiFiSecurity, password: str): | |
""" | |
    Saves a Wi-Fi network on the device with the given SSID, security type and password.
Requires version >= 669. | |
""" | |
query : Dict[str, Union[bytes, str]] = { | |
"group": "saved", | |
"action": "save" | |
} | |
query["ssid"] = ssid.encode("ISO-8859-15").decode("utf-8") | |
if security == WiFiSecurity.PUBLIC: | |
query["security"] = "none" | |
elif security == WiFiSecurity.WEP: | |
query["security"] = "wep" | |
elif (security == WiFiSecurity.WPA) or (security == WiFiSecurity.WPA2): | |
query["security"] = "wpa" | |
else: | |
raise ValueError("invalid security model") | |
report = validate_wifi_password(security, password) | |
if report != WiFiPasswordValidationStatus.VALID: | |
raise ValueError("password isn't legal for this security model, reason: " + str(report.name)) | |
# Insert WEP password | |
if security == WiFiSecurity.WEP: | |
query["password"] = bytes.fromhex(password).decode("ISO-8859-1").encode("utf-8") | |
elif (security == WiFiSecurity.WPA) or (security == WiFiSecurity.WPA2): | |
query["password"] = create_pbkdf2_wpapsk_key(ssid, password) | |
return post_settings(query) | |
def remove_network(ssid: str): | |
""" | |
Removes an existing network. Requires version >= 669. | |
""" | |
query = { | |
"group": "saved", | |
"action": "delete" | |
} | |
query["ssid"] = ssid.encode("ISO-8859-15").decode("utf-8") | |
return post_settings(query) | |
def connect_network(ssid: str): | |
""" | |
    Connects to an already saved network. Requires version >= 669.
""" | |
query = { | |
"group": "saved", | |
"action": "connect" | |
} | |
query["ssid"] = ssid.encode("ISO-8859-15").decode("utf-8") | |
return post_settings(query) | |
def set_sidelink_mode(mode: bool): | |
""" | |
Change home network mode. Requires version >= 669. | |
""" | |
return post_settings({ | |
"sidelinken": "true" if mode else "false" | |
}) | |
def set_ap_password(settings: Settings, password: str, ssid: Optional[str] = None, for_home_network: Optional[bool] = None): | |
""" | |
Set AP password. | |
""" | |
required_model = WiFiSecurity.WEP | |
query = {} | |
if settings.build.code >= 703: | |
required_model = WiFiSecurity.WPA2 | |
if ssid is not None: | |
if not ssid: | |
raise ValueError("either provide an non-empty ssid or leave it blank to use default ssid") | |
if len(ssid) > 32: | |
raise ValueError("ssid can't be longer than 32 characters") | |
if password: | |
report = validate_wifi_password(required_model, password) | |
if report != WiFiPasswordValidationStatus.VALID: | |
raise ValueError("password isn't legal for this security model, reason: " + str(report.name)) | |
if required_model == WiFiSecurity.WEP: | |
query["security"] = "wep" | |
query["password"] = bytes.fromhex(password).decode("ISO-8859-1").encode("utf-8") | |
elif required_model == WiFiSecurity.WPA2: | |
query["security"] = "wpa" | |
query["password"] = create_pbkdf2_wpapsk_key(ssid or settings.network.ssid, password) | |
else: | |
query["security"] = "none" | |
if for_home_network is True: | |
query["auth"] = "all" | |
query["authowner"] = "owner" | |
query["authhash"] = md5(f"owner:{ssid or settings.network.ssid}:{password}".encode("ISO-8859-1")).digest().hex() | |
elif for_home_network is False: | |
query["auth"] = "none" | |
return post_settings(query, post_restart = True) | |
def set_ap_mode(mode: bool): | |
""" | |
Change AP mode. Requires version >= 657. | |
""" | |
return post_settings({ | |
"ap": "true" if mode else "false" | |
}) | |
def set_timeout(value: int): | |
""" | |
Sets power saving timeout. Requires version >= 586. | |
""" | |
if value < 0: | |
raise ValueError("timeout value can't be negative") | |
# 0, 15, 30, 60 | |
return post_settings({ | |
"timeout": str(value) | |
}) | |
def change_wifi_channel(value: int): | |
""" | |
Sets the WLAN channel of the device to a zero-indexed channel number. | |
Channels #13 and #14 should be avoided in North America. | |
Requires version >= 574. See more about WLAN channels: | |
https://en.wikipedia.org/wiki/List_of_WLAN_channels#2.4_GHz_(802.11b/g/n/ax/be) | |
""" | |
if value < 0: | |
raise ValueError("channel number must start from 0 (inclusive)") | |
elif value > 13: | |
raise ValueError("channel number must equal or less than 13") | |
return post_settings({ | |
"channel": str(value) | |
}) | |
def push_firmware(file: bytes, is_xml: bool, is_wfd_v2: bool): | |
""" | |
Uploads a firmware file to the device. | |
is_xml: Must be True if Settings.implementation.upgrade == 2 | |
is_wfd_v2: Must be True if Settings.model == DeviceType.WS_V2, only is in effect when is_xml is False. | |
""" | |
path = "" | |
if not is_xml: | |
if not is_wfd_v2: | |
path = "/files/AIRST.DF2" | |
else: | |
# Applies to WFD_V2 (DeviceType.WS_V2 to be specific) only | |
path = "/files/wfd.df3" | |
else: | |
path = "/settings.xml" + "?" + urlencode({ | |
"group": "firmware" | |
}) | |
assert path, "firmware parameters are invalid!" | |
return do_request( | |
path, | |
"PUT", | |
data = file | |
) | |
def webdav_list(path: str = "/") -> List[Union[WebDAVFolder, WebDAVFile]]: | |
""" | |
Lists files and folders in a path. Returns a list containing WebDAVFile and WebDAVFolder | |
objects depending on the type of the item. | |
""" | |
# https://datatracker.ietf.org/doc/html/rfc4918#section-14.20 | |
# https://datatracker.ietf.org/doc/html/rfc4918#section-9.1 | |
propfind = ET.Element("propfind", {"xmlns:D": "DAV:"}) | |
ET.SubElement(propfind, "{DAV:}propname") | |
tree = ET.ElementTree(propfind) | |
buffer = BytesIO() | |
tree.write(buffer, xml_declaration=True, encoding="utf-8") | |
filename = path.removeprefix('/') | |
status, data, _ = do_request( | |
f"/files/{filename}", | |
"PROPFIND", | |
headers = { | |
"Accept": "*/*", | |
"Depth": "1", | |
"Content-Type": "text/xml; charset=UTF-8" | |
}, | |
data = buffer.getvalue() | |
) | |
if status == 404: | |
if not filename: | |
raise ValueError("no memory card exists on the device") | |
raise ValueError("directory couldn't be found") | |
# If a redirect is detected, toggle the trailing slash. | |
if (status == 301) and path: | |
return webdav_list(path + ("" if path[-1] == "/" else "/")) | |
assert status == 207, f"status was {status}" | |
xmldata = ET.XML(data) | |
assert xmldata.tag == "{DAV:}multistatus", "couldn't parse this xml response" | |
result = [] | |
for element in xmldata.findall("./{DAV:}response"): | |
propsxml = element.find("./{DAV:}propstat/{DAV:}prop") | |
assert propsxml is not None | |
creation_date = cast(str, propsxml.findtext("./{DAV:}creationdate")) | |
modified_date = cast(str, propsxml.findtext("./{DAV:}getlastmodified")) | |
mimetype = cast(str, propsxml.findtext("./{DAV:}getcontenttype")) | |
name = cast(str, element.findtext("./{DAV:}href")).removeprefix(f"http://{SERVER_ADDRESS}/files/{filename}") | |
        # Skip the collection itself that is being listed.
if not name: | |
continue | |
# Sub-collections (folders) will have a child element inside resourcetype element. | |
resource_type = propsxml.find("./{DAV:}resourcetype/*") | |
if resource_type is not None: | |
resource_type = resource_type.tag | |
if resource_type == "{DAV:}collection": | |
assert mimetype == "text/directory", "got a non-expected mimetype for a directory" | |
result.append(WebDAVFolder( | |
name = name, | |
created = datetime.strptime(creation_date, "%Y-%m-%dT%H:%M:%SZ"), | |
modified = datetime.strptime(modified_date, "%a, %d %b %Y %H:%M:%S %Z") | |
)) | |
else: | |
result.append(WebDAVFile( | |
name = name, | |
mimetype = mimetype, | |
size = int(cast(str, propsxml.findtext("./{DAV:}getcontentlength"))), | |
created = datetime.strptime(creation_date, "%Y-%m-%dT%H:%M:%SZ"), | |
modified = datetime.strptime(modified_date, "%a, %d %b %Y %H:%M:%S %Z"), | |
etag = cast(str, propsxml.findtext("./{DAV:}getetag")).removeprefix('"').removesuffix('"'), | |
cachent = propsxml.findtext("./{AirStash:}cachent") | |
)) | |
return result | |
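# A minimal usage sketch (not part of the original script): list the root of the drive's
# WebDAV share and print each entry, assuming a memory card is mounted.
def _example_webdav_list():
    for item in webdav_list("/"):
        if isinstance(item, WebDAVFolder):
            print(f"[dir ] {item.name}")
        else:
            print(f"[file] {item.name} ({item.size} bytes, {item.mimetype})")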
def webdav_get(path: str): | |
""" | |
Retrieves details for a given path. Returns a WebDAVFile object if the path is a | |
file, or a WebDAVPartialFolder object if the path is not a file. | |
""" | |
    filename = path.removeprefix('/').removesuffix('/')
    if not filename:
        raise ValueError("a filename is required")
    status, _, headers = do_request(
        f"/files/{filename}",
        "HEAD",
        headers = {"Accept": "*/*"}
    )
if status == 404: | |
raise ValueError("file was not found") | |
assert status == 200, f"status was {status}" | |
    # The requested filename can be a directory; in that case,
    # return a WebDAVPartialFolder instead of a WebDAVFile.
if "Content-Length" not in headers: | |
return WebDAVPartialFolder( | |
name = filename | |
) | |
return WebDAVFile( | |
name = filename, | |
mimetype = headers["Content-Type"], | |
size = int(headers["Content-Length"]), | |
created = None, | |
modified = datetime.strptime(headers["Last-Modified"], "%a, %d %b %Y %H:%M:%S %Z"), | |
etag = headers["ETag"].removeprefix('"').removesuffix('"'), | |
cachent = None | |
) | |
def webdav_delete(path: str): | |
""" | |
    Deletes an object. Returns True on success, otherwise False (the file does not exist).
    """
    filename = path.removeprefix('/').removesuffix('/')
    if not filename:
        raise ValueError("a filename is required")
    status, _, _ = do_request(
        f"/files/{filename}",
        "DELETE",
        headers = {"Accept": "*/*"}
    )
assert status in (204, 404), f"status was {status}" | |
return status == 204 | |
def webdav_mkdir(path: str): | |
""" | |
    Creates a new directory at the given path.
""" | |
filename = path.removeprefix('/').removesuffix('/') | |
status, _, _ = do_request( | |
f"/files/{filename}/", | |
"MKCOL", | |
headers = {"Accept": "*/*"} | |
) | |
if status == 403: | |
raise ValueError("a file with same name exists or the name contains invalid characters") | |
elif status == 404: | |
raise ValueError("path was not found (try creating missing parent directories first)") | |
assert status == 201, f"status was {status}" | |
def webdav_move(source: str, target: str, overwrite: bool = False): | |
""" | |
    Moves or renames a path.
""" | |
if (not source) or (not target): | |
raise ValueError("both filenames are required") | |
    # While the device itself doesn't care about trailing slashes when renaming,
    # we check anyway as a safeguard.
if (source.endswith("/") != target.endswith("/")): | |
raise ValueError("mixed trailing slashes; both sides must be a directory or a file") | |
src, trg = source.removeprefix('/').removesuffix('/'), target.removeprefix('/').removesuffix('/') | |
status, _, _ = do_request( | |
f"/files/{src}", | |
"MOVE", | |
headers = { | |
"Destination": f"http://{SERVER_ADDRESS}/files/{trg}", | |
"Overwrite": "T" if overwrite else "F", | |
"Accept": "*/*" | |
} | |
) | |
if status == 403: | |
raise ValueError("a file with same name exists") | |
elif status == 404: | |
raise ValueError("path was not found") | |
elif status == 400: | |
raise ValueError("invalid path (file may be already moved)") | |
assert status == 201, f"status was {status}" | |
def webdav_copy(source: str, target: str, overwrite: bool = False): | |
""" | |
Copies an object. | |
TODO: Doesn't work? | |
""" | |
if (not source) or (not target): | |
raise ValueError("both filenames are required") | |
    # While the device itself doesn't care about trailing slashes when copying,
    # we check anyway as a safeguard.
if (source.endswith("/") != target.endswith("/")): | |
raise ValueError("mixed trailing slashes; both sides must be a directory or a file") | |
src, trg = source.removeprefix('/').removesuffix('/'), target.removeprefix('/').removesuffix('/') | |
status, _, _ = do_request( | |
f"/files/{src}", | |
"COPY", | |
headers = { | |
"Destination": f"http://{SERVER_ADDRESS}/files/{trg}", | |
"Overwrite": "T" if overwrite else "F", | |
"Accept": "*/*" | |
} | |
) | |
if status == 403: | |
raise ValueError("a file with same name exists") | |
elif status == 404: | |
raise ValueError("path was not found") | |
elif status == 400: | |
raise ValueError("invalid path") | |
assert status == 201, f"status was {status}" | |
def check_connection(): | |
""" | |
Returns True if connected to the hostname. | |
""" | |
pass | |
# if gethostbyname(getfqdn()) != "172.25.63.2": | |
# return False | |
# return True | |
# sources/com/wearable/sdk/impl/SettingsManager.java | |
def get_settings(): | |
""" | |
    Fetches the device's settings.xml, parses its contents and returns a Settings structure.
""" | |
status, data, _ = do_request( | |
"/settings.xml", | |
"GET" | |
) | |
assert status == 200, "failed response" | |
settings = ET.XML(data) | |
assert settings.tag == "settings" | |
build = BuildInfo( | |
name = settings.findtext("./version") or "", | |
model = settings.findtext("./buildmodel") or "", | |
code = int(settings.findtext("./numericversion") or 0) | |
) | |
model = settings.findtext("./model") or "" | |
hostname = settings.findtext("./hostname") or "" | |
serial = settings.findtext("./serial") or "" | |
assert all((model, hostname, serial, build.name, build.model, build.code > 0)) | |
implement = ImplementationInfo( | |
security = int(settings.findtext("./features/security") or 0), | |
cachent = int(settings.findtext("./features/cachent") or 0), | |
coex = int(settings.findtext("./features/coex") or 0), | |
upgrade = int(settings.findtext("./features/firmwareupdate") or (1 if build.code >= 563 else 0)), | |
restart = int(settings.findtext("./features/restart") or 0), | |
exfat = int(settings.findtext("./features/exfat") or 0), | |
move_rename = 1 if build.code >= 1082 else 0 | |
) | |
bitxml = settings.find("./bitrate") | |
assert bitxml is not None | |
bitrate = BitrateThresholdInfo( | |
warning = int(bitxml.attrib["warn"]) * 1000, | |
critical = int(bitxml.attrib["critical"]) * 1000 | |
) | |
# Get battery info. | |
battxml = settings.find("./battery") | |
assert battxml is not None | |
battery = BatteryInfo( | |
status = BatteryStatus(battxml.attrib["status"]), | |
voltage = int(battxml.attrib["voltage"]) | |
) | |
# Get access point information. | |
apxml = settings.find("./ap") | |
assert apxml is not None | |
access_point = ServerNetworkInfo( | |
enabled = apxml.attrib["enabled"] == "true", | |
clients = int(apxml.attrib.get("clients", None) or -1), | |
ssid = unquote(cast(str, settings.findtext("./ssid"))).encode("ISO-8859-15").decode("utf-8"), | |
channel = int(settings.findtext("./channel") or -1), | |
security = WiFiSecurity(settings.findtext("./security")), | |
key = settings.findtext("./wpapsk") or "", | |
) | |
# Get connected Wi-Fi client information. | |
sidelinkxml = settings.find("./sidelink") | |
sidelink_enabled = False | |
if (sidelinkxml is not None) and (sidelinkxml.attrib.get("enabled") == "true"): | |
sidelink_enabled = True | |
sdcxml = settings.find("./client") | |
sidelink = None | |
if sdcxml is not None: | |
sidelink = ClientNetworkInfo( | |
enabled = sidelink_enabled, | |
ssid = unquote(sdcxml.attrib["ssid"]).encode("ISO-8859-15").decode("utf-8"), | |
ip = sdcxml.attrib["ip"], | |
status = NetworkConnectStatus(sdcxml.attrib["status"]), | |
# Are we currently connected to the home network, instead of drive's own AP? | |
from_home = sdcxml.attrib.get("method", "") == "sidelink" | |
) | |
        # An IP address hasn't been obtained yet.
if sidelink.ip == "0.0.0.0": | |
sidelink.status = NetworkConnectStatus.UNKNOWN | |
# Get a list of storage mediums (in fact, microSD cards residing on the drive). | |
mediums : List[StorageMediumInfo] = [] | |
for medium in settings.findall("./cards/card"): | |
        # The microSD card is removed, or the device is plugged into a USB port, since
        # the device doesn't support accessing files over both WebDAV and USB at the same time.
if medium.attrib["status"].lower() == "none": | |
continue | |
card_status = StorageMediumStatus(medium.attrib["status"]) | |
mediums.append(StorageMediumInfo( | |
status = card_status, | |
format = StorageMediumFileSystem(medium.attrib["format"]), | |
serial = bytes.fromhex(medium.attrib["serial"]), # The UUID is returned as "1234ABCD", without the dash. | |
path = medium.attrib["path"], | |
label = medium.attrib["label"], | |
free = int(medium.attrib["free"]), | |
total = int(medium.attrib["total"]), | |
block_size = int(medium.attrib["blocksize"]), | |
read_only = | |
card_status != StorageMediumStatus.MOUNTED if build.code >= 914 else | |
medium.attrib.get("readonly", "") == "protected" | |
)) | |
# Get the last thrown exception stored in the device. | |
last_error : Optional[LastErrorInfo] = None | |
if build.code >= 914: | |
stored_error = settings.find("./storederror") | |
if stored_error is not None: | |
errfile = stored_error.attrib.get("file", None) | |
errdesc = stored_error.attrib.get("description", None) | |
if ((errfile is not None) and errdesc) and (((build.code < 2009) and (errdesc.lower() != "none")) or (build.code >= 2009)): | |
last_error = LastErrorInfo( | |
message = errdesc, | |
source = errfile, | |
line = int(stored_error.attrib["line"]), | |
version = None if "version" not in stored_error.attrib else int(stored_error.attrib["version"]), | |
address = None if "address" not in stored_error.attrib else int(stored_error.attrib["address"], 16), | |
counter = None if "pc" not in stored_error.attrib else int(stored_error.attrib["pc"], 16), | |
timestamp = None if "timestamp" not in stored_error.attrib else int(stored_error.attrib["timestamp"]) | |
) | |
pending_firm = None | |
pendingxml = settings.find("./pendingfirmware") | |
if pendingxml is not None: | |
pending_firm = int(pendingxml.attrib.get("build", "") or 0) | |
    modelenum = DeviceType(model)
    if (modelenum == DeviceType.FD_128K) and (build.model == "A02E"):
        modelenum = DeviceType.FD_256K
    if (build.model == "A03E"):
        modelenum = DeviceType.WS_V2
    mac = MACAddress(serial)
    # If we still can't determine the device from its model,
    # try with the MAC address instead.
    if modelenum == DeviceType.UNKNOWN:
        print("model couldn't be detected, trying mac!")
        modelenum = mac.to_model()
if not modelenum: | |
raise ValueError(f"couldn't determine the model: {serial}") | |
result = Settings( | |
model = modelenum, | |
hostname = hostname, | |
mac = mac, | |
network = access_point, | |
home_network = sidelink, | |
build = build, | |
battery = battery, | |
bitrate = bitrate, | |
app_version = ExpectedAppVersionInfo( | |
android = settings.findtext("./appversion/android") or "", | |
ios = settings.findtext("./appversion/ios") or "" | |
), | |
auth = NetworkSecurityLevel(settings.findtext("./auth")), | |
authhash = settings.findtext("./authhash") or "", | |
timeout = int(settings.findtext("./timeout") or -1), | |
pending_update = pending_firm, | |
implementation = implement, | |
mediums = mediums, | |
last_error = last_error | |
) | |
return result | |
class SandiskShell(Cmd): | |
intro: str = "" | |
prompt: str = "sandisk:/> " | |
currentdir: str = "/" | |
def do_pwd(self, *_): | |
"Get the current path." | |
print(self.currentdir) | |
def do_info(self, *_): | |
for k, v in asdict(get_settings()).items(): | |
print(f"{k}: {v}") | |
def do_ls(self, arg: str): | |
details = arg.strip() == "more" | |
empty = True | |
for item in webdav_list(self.currentdir): | |
empty = False | |
if isinstance(item, WebDAVFolder): | |
if details: | |
print(f"- [{item.name}] | {item.created.isoformat()} | {item.modified.isoformat()}") | |
else: | |
print(f"- [{item.name}]") | |
else: | |
if details: | |
assert item.created | |
assert item.modified | |
print(f"- {item.name} | {item.created.isoformat()} | {item.modified.isoformat()}, {item.size} bytes, {item.etag}, {item.cachent}") | |
else: | |
print(f"- {item.name}") | |
if empty: | |
print("(directory is empty)") | |
def do_network(self, arg: str): | |
if (arg == "scan") or (arg == "list"): | |
is_scan = arg == "scan" | |
state = get_settings() | |
if is_scan: | |
print("Scanning for networks, wait for few seconds...") | |
networks = get_networks(is_scan) | |
rssi_mode = state.model in (DeviceType.WS_V2, DeviceType.WS_V1, ) | |
for network in networks: | |
level = network.signal(rssi_mode) | |
bars = "..." | |
if level > 0: | |
bars = (level * "■").ljust(3, "_") | |
print(f"* [{bars}] {'♯ ' if network.security.value != 'none' else ' '}{network.ssid}") | |
elif arg.startswith("connect"): | |
ssid = arg.removeprefix("connect").strip() | |
if not ssid: | |
print("An SSID is required!") | |
return | |
network = next((x for x in get_networks(False) if x.ssid == ssid), None) | |
if not network: | |
print("SSID couldn't be found!") | |
return | |
if network.security == WiFiSecurity.UNKNOWN: | |
print("Unknown security model for this SSID!") | |
return | |
password = "" | |
if (network.security != WiFiSecurity.PUBLIC) and (not network.saved): | |
password = input("Input password: ") | |
if not network.saved: | |
save_network(ssid, network.security, password) | |
print(connect_network(ssid)) | |
def do_cd(self, arg: str): | |
upcoming = None | |
if arg == "..": | |
upcoming = "/".join(self.currentdir.split("/")[:-1]) or "/" | |
elif arg == "/": | |
upcoming = "/" | |
elif arg: | |
upcoming = join(self.currentdir, "/".join((x for x in arg.split("/") if x))) | |
if upcoming: | |
try: | |
webdav_get(upcoming) | |
self.currentdir = upcoming | |
except ValueError as e: | |
print("error:", str(e)) | |
def do_crash(self, _): | |
webdav_move("../files/", "../") | |
def do_restart(self, _): | |
print(post_settings({ | |
"restart": "allowed" | |
})) | |
def do_battery(self, _): | |
settings = get_settings() | |
bar = { | |
BatteryStatus.CHARGING: "[░🗲░]", | |
BatteryStatus.CRITICAL: "[░⚠░]", | |
BatteryStatus.FULL: "[▊▊▊]", | |
BatteryStatus.HIGH: "[▊▊▊]", | |
BatteryStatus.MEDIUM: "[▊▊ ]", | |
BatteryStatus.LOW: "[▊ ]", | |
BatteryStatus.UNKNOWN: "[ ? ]" | |
} | |
level = \ | |
"Charging" if settings.battery.status == BatteryStatus.CHARGING else \ | |
"Unknown" if settings.battery.status == BatteryStatus.UNKNOWN else \ | |
"Fully charged" if settings.battery.status == BatteryStatus.FULL else \ | |
f"<= {(settings.battery.level) * 100:.2f}%" | |
print(f"{bar[settings.battery.status]} {level}, {settings.battery.voltage / 1000:.2f}V") | |
print(f"Power save timeout: {settings.timeout} min(s)") | |
def emptyline(self): | |
return False | |
def precmd(self, line: str) -> str: | |
self.prompt = f"sandisk:{self.currentdir}> " | |
return line | |
def postcmd(self, stop: bool, line: str) -> bool: | |
self.prompt = f"sandisk:{self.currentdir}> " | |
return stop | |
def do_exit(self, *_): | |
"Exit from the program." | |
exit(0) | |
if __name__ == '__main__': | |
try: | |
SandiskShell().cmdloop() | |
except KeyboardInterrupt: | |
print() | |
pass |
# pyright: basic | |
# | |
# Copyright (C) 2025 Yusuf Cihan | |
# @ysfchn https://ysfchn.com | |
# | |
# This program is a free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published | |
# by the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program. If not, see <http://www.gnu.org/licenses/>. | |
# | |
""" | |
A script to read & unpack firmware files of Sandisk's discontinued Wireless Flash | |
Drive & Wireless Stick series[^1] from 2015. The devices are clones of AirStash devices with
Sandisk's branding & modifications added on top.
Since the device is no longer being sold and its firmware format is proprietary,
there is very little information about it available on the internet, so I've tried my
best to gather as much info as possible.
If you're looking for copies of the firmware files:
1) The discontinued (and pulled from the app store) "Sandisk Connect Drive" app
contains firmware files stored in the APK itself, so you can just grab the APK from a
random APK mirror website and unpack it (look in the "res/raw" folder).
2) Luckily, the Internet Archive has some archived copies. The link below also lists
firmwares for other Sandisk devices, so you will need to filter for links ending with
".df2" and ".df3":
https://web.archive.org/web/*/http://downloads.sandisk.com/firmware/*
".DF2" files use a different format than ".DF3" files, and they are not compatible with
each other. From what I know, some models use ".DF2" and newer ones use ".DF3".
I've only managed to find information for ".DF3" files, and thanks to the OP, who explains
some of the byte structure found in the firmware, I was able to create this script. [^4] There
is also another topic in the same community discussing the device. [^5]
Sandisk's own knowledge base contains several articles about these devices, so if you're
interested, you can also check there, specifically the manual firmware upgrade instructions. [^2][^3]
[^1]: https://web.archive.org/web/20221205065124/https://support-en.wd.com/app/answers/detailweb/a_id/44632/ | |
[^2]: https://support-en.sandisk.com/app/answers/detailweb/a_id/41388 | |
(archive 1: https://web.archive.org/web/20250409170927/https://support-en.sandisk.com/app/answers/detailweb/a_id/41388) | |
(archive 2: https://archive.is/W1bC6) | |
[^3]: https://web.archive.org/web/20160515082212/http://kb.sandisk.com/app/answers/detail/a_id/17556 | |
[^4]: https://forums.hak5.org/topic/41479-sandisk-wireless-connect-16g-flash-drive/ | |
(archive 1: https://web.archive.org/web/20250409165759/https://forums.hak5.org/topic/41479-sandisk-wireless-connect-16g-flash-drive/) | |
(archive 2: https://archive.md/NTENm) | |
[^5]: https://forums.hak5.org/topic/30273-hack-a-sandisk-32g-wifi-enabled-flash-drive/ | |
""" | |
from io import BytesIO | |
from pathlib import Path | |
from typing import Dict, List, Optional, Tuple, Type, cast, NamedTuple | |
from zipfile import ZipFile | |
from hashlib import sha256 | |
from sys import stderr | |
from argparse import ArgumentParser | |
from datetime import datetime | |
from zlib import crc32 | |
import json | |
UIMAGE_MAGIC = bytes((0x27, 0x05, 0x19, 0x56)) | |
GZIP_MAGIC = bytes((0x1f, 0x8b, 0x08, 0x00)) | |
# https://formats.kaitai.io/uimage/ | |
class UImageHeader(NamedTuple): | |
header_crc: int | |
timestamp: datetime | |
data_length: int | |
load_addr: int | |
entry_addr: int | |
data_crc: int | |
os_type: int | |
arch_type: int | |
image_type: int | |
comp_type: int | |
name: str | |
@classmethod | |
def from_bytes(cls: "Type[UImageHeader]", data: bytes) -> "UImageHeader": | |
assert len(data) == 64, f"uimage header must be 64 in length, not {len(data)}" | |
assert data[0:4] == UIMAGE_MAGIC, f"no uimage header was detected, got 0x{data[0:4].hex()}" | |
# Check if header CRC32 is correct. | |
calculated = crc32(data[0:4] + bytes(4) + data[8:64]) | |
result = cls( | |
header_crc = int.from_bytes(data[4:8], "big"), | |
timestamp = datetime.fromtimestamp(int.from_bytes(data[8:12], "big")), | |
data_length = int.from_bytes(data[12:16], "big"), | |
load_addr = int.from_bytes(data[16:20], "big"), | |
entry_addr = int.from_bytes(data[20:24], "big"), | |
data_crc = int.from_bytes(data[24:28], "big"), | |
os_type = int.from_bytes(data[28:29], "big"), | |
arch_type = int.from_bytes(data[29:30], "big"), | |
image_type = int.from_bytes(data[30:31], "big"), | |
comp_type = int.from_bytes(data[31:32], "big"), | |
name = data[32:64].decode("utf-8") | |
) | |
assert calculated == result.header_crc, f"mismatched uimage header crc32, expected {result.header_crc} but got {calculated}" | |
return result | |
def __repr__(self) -> str: | |
return \ | |
f"time={self.timestamp.isoformat()} length={self.data_length} load=0x{self.load_addr:x} entry=0x{self.entry_addr:x} " + \ | |
f"os={self.os_type} arch={self.arch_type} comp={self.comp_type} itype={self.image_type} name='{self.name}' crc={self.data_crc}" | |
class FirmwareInfo(NamedTuple): | |
version: str | |
entries: List[Tuple[str, bytes, bool]] | |
flash_count: int | |
file_size: int | |
class DF3Firmware(NamedTuple): | |
model: str | |
version: int | |
file_size: int | |
entries: List[Tuple[int, int, Optional[str]]] | |
def read_wmd_firmware(firmware: Path) -> Tuple[BytesIO, FirmwareInfo]: | |
""" | |
    Reads a Sandisk Media Drive (SWDS1) firmware and unpacks its contents.
    The firmware payload hasn't been fully reverse engineered yet, so the gathered
    information is fairly minimal at the moment.
""" | |
buffer = BytesIO() | |
buffer.write(firmware.read_bytes()) | |
total_size = buffer.tell() | |
buffer.seek(0) | |
# Skip these bytes now, we don't know their purpose. | |
buffer.read(20) | |
# The version is stored in 2 bytes, first byte is the major, | |
# and the second byte is the minor. | |
# 02 5D -> 2.93 // 03 04 -> 3.04 | |
major, minor = cast(Tuple[str, str], map(str, buffer.read(2))) | |
version = major + "." + minor.rjust(2, "0") | |
info = FirmwareInfo( | |
version = version, | |
entries = [], | |
flash_count = (total_size - 152) // 1024, | |
file_size = total_size | |
) | |
print(f"version: {info.version}", file = stderr) | |
print(f"size: {info.file_size}", file = stderr) | |
print(f"flash: {info.flash_count}", file = stderr) | |
assert buffer.read(2) == bytes(2), "unexpected bytes at start" | |
assert buffer.read(9) == b"Qwifi.img", "unexpected bytes at start; name" | |
assert (total_size % 1024) == 152, "file size must be multiple of 1024 with 152 bytes at start" | |
# -------------------------------- | |
# File entries | |
# -------------------------------- | |
# Seek to the where file entries are located. | |
# File entries also contain "." and ".." even though these are not files. | |
buffer.seek(0x0B9498) | |
def read_next_file_entry(): | |
unk1 = buffer.read(6) | |
length, ftype = buffer.read(2) | |
name = buffer.read(length).decode("utf-8") | |
        # If the file name length is not a multiple of 4 bytes, there are
        # N empty bytes following it to pad it to a multiple of 4.
skip = 4 - (length % 4) | |
if skip == 4: | |
skip = 0 | |
assert buffer.read(skip) == bytes(skip), f"expected {skip} empty bytes after filename" | |
# If we hit to series of null bytes, make sure other | |
# fields are null too. | |
if unk1 == bytes(6): | |
assert ((not length) and (not ftype)) and (not name), "unexpected series of bytes after a null entry" | |
return None | |
assert ftype in (1, 2), f"file type ({ftype}) expected to be either 1 (file) or 2 (folder)" | |
return name, unk1, ftype == 1 | |
assert buffer.tell() == 0x0B9498, "invalid address" | |
while (entry := read_next_file_entry()) is not None: | |
print(f'entry "{entry[0]}" -> {entry[1].hex()}', file = stderr) | |
info.entries.append(entry) | |
buffer.seek(0x000498) | |
buffer.read(1024) | |
assert buffer.read(1024).startswith(b"\x03\x01"), "unexpected bytes" | |
    # Seek through sectors that have the same constant byte pattern.
for i in range(1, 257): | |
assert buffer.tell() == (0x000898 + (i * 1024)), "invalid address" | |
flash = buffer.read(1024) | |
flag = 0x0220 + (i * 256) | |
if flag > 0xFFFF: | |
flag = flag - 0xFFFF | |
expected_data = bytearray() | |
for j in range(5): | |
expected_data.extend((flag + (0 if j == 4 else (j * 64))).to_bytes(2, "big")) | |
expected_data.extend((1, ) if j == 4 else bytes(2)) | |
assert len(expected_data) == 19, "unexpected constructed bytes" | |
expected_data.extend(bytes(1005)) # Make it to 1024 bytes. | |
assert expected_data == flash, f"unexpected flash #{i} -> {expected_data[:-1005].hex()} != {flash[:-1005].hex()}" | |
# -------------------------------- | |
# uImage | |
# -------------------------------- | |
    # The firmware contains a uImage header at some address. The header CRC is correct, however I couldn't
    # manage to verify the data CRC, so I presume the full uImage data doesn't immediately follow the
    # header and is instead stored elsewhere in the firmware, which makes things more difficult for sure.
# | |
# For "sandiskmediafirmware-3-04.img", I got this: | |
# time=2013-05-28T09:14:45 length=2242984 load=0x70008000 entry=0x70008000 os=5 arch=2 | |
# compr=0 image=2 name='Linux-2.6.35.3-899-g9b1a262' | |
def read_uimage_header(): | |
header = buffer.read(64) | |
hd = UImageHeader.from_bytes(header) | |
print(hd, file = stderr) | |
read_total = 0 | |
uimage = BytesIO() | |
while (read_total < hd.data_length): | |
data = return_if_valuable(buffer.read(1024)) | |
read_total += uimage.write(data) | |
uimage.seek(0) | |
full = uimage.getvalue() | |
print( | |
crc32(full), | |
crc32(header + full), | |
crc32(full[:hd.data_length]), | |
crc32(header + full[:hd.data_length]), | |
crc32(header + full[:hd.data_length]) | |
) | |
return uimage.getvalue() | |
# -------------------------------- | |
# Flash data | |
# -------------------------------- | |
    # Determine if a given sector of 1024 bytes possibly contains valuable data
    # and not a pattern of constant bytes.
def return_if_valuable(data: bytes): | |
assert len(data) == 1024, f"expected 1024 number of bytes, but got {len(data)}" | |
faul = False | |
# If data is empty, discard. | |
if data == bytes(1024): | |
faul = True | |
# Discard data if data only contains FF and 00. | |
vc = 0 | |
for b in data: | |
if (0xFF != b) and (0x00 != b): | |
vc += 1 | |
if not vc: | |
faul = True | |
if vc < 512: | |
faul = True | |
if not faul: | |
it, _ov = data[0], data[1] | |
sus = False | |
for i in range(4): | |
if (it == data[i * 4]) or (((it + i) & 0xFF) == data[i * 4]): | |
sus = True | |
else: | |
if sus: | |
sus = False | |
break | |
faul = sus | |
# if faul: | |
# print("is garbage: ", data.hex(" ")[:128]) | |
return bytes(0) if faul else data | |
uimage_start = buffer.getvalue().find(UIMAGE_MAGIC) | |
print(f"uimage found at 0x{uimage_start:x} / {(uimage_start - 152) / 1024}") | |
buffer.seek(uimage_start) | |
with open("im", "wb") as f: | |
f.write(read_uimage_header()) | |
# Read files. | |
# TODO: I'm not sure where it is written in the firmware so | |
# I have no choice but to hardcode addresses manually. | |
files = { | |
"fw_info.conf": ((0 if version == "2.93" else 0x00300498), 1), | |
"postaction.sh": ((0x00180498 if version == "2.93" else 0x00300898), 3), | |
"preaction.sh": ((0x00181098 if version == "2.93" else 0x00301498), 3), | |
} | |
dumpobj = BytesIO() | |
zipdump = ZipFile(dumpobj, "w") | |
for name, _, is_file in info.entries: | |
if not is_file: | |
continue | |
address, length = files.get(name, (0, 0)) | |
if not address: | |
print(f"couldn't crack down '{name}' yet, skipped.", file = stderr) | |
continue | |
with zipdump.open(name, "w") as fw: | |
buffer.seek(address) | |
data = buffer.read(length * 1024).rstrip(b"\x00") | |
assert data[0] != b"\x00", f"address 0x{address:x} starts with 0x00" | |
fw.write(data) | |
zipdump.close() | |
dumpobj.seek(0) | |
return dumpobj, info | |
# REMOVE ME! | |
# def create_rotating_bytes(current: int, overflows: int): | |
# data = bytearray() | |
# for _ in range(256): | |
# current += 1 | |
# value = current & 0xFF | |
# if value == 0x00: | |
# overflows += 1 | |
# data.append(value) | |
# data.extend(overflows.to_bytes(3, "little")) | |
# return data | |
# | |
# current, overflows = 19, 11, | |
# rootfs = BytesIO() | |
# | |
# def read_next_sector(skip_next: bool = False, is_manual: bool = False): | |
# nonlocal current, overflows | |
# current += 1 | |
# overflows += 1 | |
# garbage = buffer.read(1024) | |
# # Skip garbage check on this address. | |
# if garbage == bytes(1024): | |
# return | |
# if buffer.tell() not in (0x7ca498,): | |
# if not is_manual: | |
# assert garbage == create_rotating_bytes(current, overflows), f"mismatching bytes at 0x{buffer.tell() - 1024:x}" | |
# rootfs.write(buffer.read(256 * 1024)) | |
# if skip_next: | |
# buffer.read(1024) | |
# current += 1 | |
# LEGACY | |
# rootfs.write(buffer.read(12 * 1024)) | |
# read_next_sector(True) | |
# for _ in range(237): | |
# read_next_sector() | |
# buffer.read(1024 * 486) # blank | |
# rootfs.write(buffer.read(40 * 1024)) | |
# read_next_sector(is_manual = True) | |
# assert buffer.tell() == 0x00904898 | |
# current, overflows = 17, 35, | |
# for i in range(26): | |
# read_next_sector(is_manual = i == 27) | |
# OLD | |
# for i in range(465 - 50): | |
# read_next_sector(is_manual = i == 27) | |
# for i in range(4096 + 1024 - 5): | |
# grab = get_if_not_garbage(buffer.read(1024)) | |
# if not rootfs.write(grab): | |
# print(f"#{i + 1} WAS GARBAGE") | |
# | |
# print(buffer.tell(), 0x008BA498, uimage_start) | |
# buffer.seek(0x008BA498) | |
# | |
# for i in range(4096 + 3072 + 512 + 256 - 6): | |
# grab = get_if_not_garbage(buffer.read(1024)) | |
# if not rootfs.write(grab): | |
# print(f"#{i + 1} WAS GARBAGE") | |
# | |
# print(buffer.tell(), uimage_start) | |
# | |
# ui = BytesIO() | |
# while True: | |
# dd = buffer.read(1024) | |
# if not dd: | |
# break | |
# grab = get_if_not_garbage(dd) | |
# ui.write(grab) | |
# | |
# with open("rootfs", "wb") as f: | |
# f.write(rootfs.getvalue()) | |
# | |
# with open("uim", "wb") as f: | |
# f.write(ui.getvalue()) | |
# | |
# zipdump.close() | |
# dumpobj.seek(0) | |
# return dumpobj, entries, version | |
def read_df3_firmware(firmware: Path): | |
""" | |
    Reads a Sandisk Wireless Stick (SDWS4) firmware file and unpacks its contents.
    The firmware doesn't contain the filenames of the files in the filesystem (presumably
    they are hardcoded in ROM), so this function tries to decipher the names of the files
    based on their checksums and which HTTP path they are served from on the web server.
""" | |
if firmware.suffix != ".df3": | |
raise ValueError("firmware path doesn't end with .df3!") | |
buffer = BytesIO() | |
buffer.write(firmware.read_bytes()) | |
total_size = buffer.tell() | |
buffer.seek(0) | |
    # The byte structure is simply as follows:
# AA BB BB .. .. | |
# AA -> ID of the sector | |
# BB -> Length of the sector (big-endian) | |
def get_next_sector(eid: int, elen: int): | |
sid = int.from_bytes(buffer.read(1), "big") | |
# Check if the obtained byte matches with given value to make sure it is a valid file. | |
assert eid == sid, f"current sector id mismatch, got {sid} but expected {eid}, is it a valid firmware?" | |
slen = int.from_bytes(buffer.read(2), "big") | |
assert slen == elen, f"current sector length mismatch, got {slen} but expected {elen}, is it a valid firmware?" | |
return buffer.read(slen) | |
# First 3 bytes of the file is equal to [01 00 08], so, | |
# taking above information into the account: | |
# 01 -> The ID of the sector | |
# 00 08 (= 8) -> Sector is 8 bytes long | |
model = get_next_sector(1, 8).strip(b"\x00").decode("ascii") | |
# The version code is represented as an integer, and followed by an ASCII encoded string | |
# of the same version code. For example; the bytes 0-4 is 0x00000802 (= 2050), and | |
# bytes 4-8 is "2050" in ASCII. So, we can presume both versions must be equal in value. | |
version = get_next_sector(2, 36) | |
version_int = int.from_bytes(version[0:4], "big") | |
version_str = version[4:8].decode("ascii") | |
    assert str(version_int) == version_str, f"version names don't match, '{version_int}' != '{version_str}'"
info = DF3Firmware( | |
model = model, | |
version = version_int, | |
file_size = total_size, | |
entries = [] | |
) | |
print(f"model: {info.model}", file = stderr) | |
print(f"version: {info.version}", file = stderr) | |
print(f"file size: {info.file_size}", file = stderr) | |
# Findings: | |
# wfd2050s - 0x001032FF - WiFi driver and firmware version | |
    # I couldn't figure out the meaning of these bytes, but as far as I've tested with several
    # firmwares, these sectors are always the same, so we can just seek & validate through them.
get_next_sector(3, 4) | |
get_next_sector(4, 1) | |
get_next_sector(5, 96) | |
get_next_sector(6, 16) | |
get_next_sector(7, 117) | |
get_next_sector(7, 117) | |
get_next_sector(7, 117) | |
get_next_sector(10, 96) | |
print(f"read through {buffer.tell()} bytes", file = stderr) | |
# Individual file contents in the firmware start where the [FF FF FF FF 00 00] byte pattern | |
# first appears, so we just seek there and start reading file entries. | |
boundary = bytes((0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00)) | |
start = buffer.getvalue().find(boundary) | |
assert start != -1, "couldn't find the start of the file entry header" | |
buffer.seek(start + len(boundary)) | |
print(f"header found at {hex(buffer.tell())}", file = stderr) | |
dumpobj = BytesIO() | |
zipdump = ZipFile(dumpobj, "w") | |
# Parse the header to read the file entries; each file entry takes up 12 bytes, | |
# therefore the number of bytes must be a multiple of 12. | |
header_size = int.from_bytes(buffer.read(2), "big") | |
print(f"header size is {header_size} bytes", file = stderr) | |
header = buffer.read(header_size) | |
assert (len(header) % 12) == 0, f"invalid header size, got {header_size}, which is not a multiple of 12" | |
# Extensions of files contained in the firmware, mapped with their byte values. | |
extensions : Dict[int, str] = { | |
0: "bin", | |
1: "bin", | |
14: "swf", | |
15: "xap", | |
25: "gif", | |
27: "png", | |
28: "svg", | |
33: "css", | |
35: "html", | |
36: "js", | |
37: "txt", | |
} | |
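# Note: any type code not listed above is treated as unknown and the corresponding | |
# file is dumped with a generic ".bin" extension further below. | |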
start_offset = buffer.tell() - header_size | |
# bytes 0-4 are the incremental index (starting from 1) | |
# bytes 4-8 are the offset (relative to the start of the header) | |
# bytes 8-12 are the file type (see the extension code mapping above) | |
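# For illustration (hypothetical values, not taken from a real firmware), a single | |
# 12-byte entry such as: | |
#   00 00 00 01  00 00 0C 40  00 00 00 23 | |
# would describe file #1 at header-relative offset 0x0C40 with type code 35 ("html"). | |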
for i in range(len(header) // 12): | |
entry = header[i * 12:i * 12 + 12] | |
file_index1 = int.from_bytes(entry[0:4], "big") | |
assert file_index1 == (i + 1), f"file indexes don't match, expected {i + 1} but got {file_index1}" | |
file_offset = int.from_bytes(entry[4:8], "big") | |
file_type = int.from_bytes(entry[8:12], "big") | |
ext = extensions.get(file_type, None) | |
# Seek to the offset and read the file size, so we can also validate if offset is correct. | |
# File index (4 bytes) + File size (4 bytes) + File content (N bytes) | |
buffer.seek(start_offset + file_offset - 8) | |
file_index2 = int.from_bytes(buffer.read(4), "big") | |
assert file_index2 == (i + 1), f"file indexes don't match, expected {i + 1} but got {file_index2}" | |
file_size = int.from_bytes(buffer.read(4), "big") | |
print(f"file #{i}, ext: {ext or '?'} ({file_type}), size: {file_size}, offset: {hex(start_offset + file_offset)}", file = stderr) | |
info.entries.append((start_offset + file_offset, file_size, ext)) | |
print(f"found {len(info.entries)} files", file = stderr) | |
buffer.seek(start_offset + header_size) | |
for i, entry in enumerate(info.entries): | |
offset, size, ext = entry | |
assert offset - buffer.tell() == 8, f"invalid alignment at file #{i}, there is extra data before or after the offset" | |
buffer.seek(offset) | |
data = buffer.read(size) | |
digest = sha256(data).digest().hex() | |
# file_name = b64encode(f"{i},{offset},{size},{ext},{digest}".encode("ascii")).translate(bytes.maketrans(b"+/", b"-_")).decode("ascii") | |
context = guess_filename(digest) | |
if context: | |
assert ext == context.split(".")[-1], f"file #{i} extension doesn't match the expected context" | |
print(f"matching context found for file #{i} -> {context}", file = stderr) | |
with zipdump.open(context or f"{i}_unknown__{offset}_{size}_{digest}.{ext or 'bin'}", "w") as f: | |
f.write(data) | |
zipdump.close() | |
dumpobj.seek(0) | |
return dumpobj, info | |
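# A minimal usage sketch for calling read_df3_firmware() directly; "my_firmware.df3" and | |
# "unpacked.zip" are placeholder paths, and main() below does the same with extra checks: | |
#   dump, fw_info = read_df3_firmware(Path("my_firmware.df3")) | |
#   Path("unpacked.zip").write_bytes(dump.read()) | |
#   print(fw_info.model, fw_info.version, len(fw_info.entries)) | |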
def guess_filename(digest: str): | |
""" | |
Returns the known filename for the given firmware file's SHA256 digest. | |
The returned path is relative to the /static/ path served from the device's IP. | |
For example, "licenses.txt" is located at "http://{IP}/static/licenses.txt" on the device. | |
""" | |
# Firmware files don't contain the names of the files, we can only obtain their contents, so we | |
# hardcode the SHA256 checksums of the contents to give more context about the purpose of each file. | |
file_mappings = { | |
"7f9eb2428b942ee7d592f739ffade39987935f9cf87e47a99e172c4dc15ab58e": "licenses.txt", | |
"ac0daed20e915525f905dc2d86a3819a71b6702a6ab1649a47fa643a1517a630": "nocard.html", | |
"2208bd30ff94535b95fe632c436deca3d123b8a0bbf564e884bfb94e013437bf": "nocard.html", # from: wfd2034s | |
"56a23b12e413d4c5057266587e040240e8d8984d4fdeb659ca853a63d183d798": "settings.html", | |
"e2841bdd461bf653b601ffcf57cba770399cf6c4b372cc86d835b612f990c065": "settings.html", # from: wfd2034s | |
"2a9a52a325d9fda977cea97624faed8be00209a99062198591b5927596be0545": "video.html", | |
"d743ebe83b166bf0472b63bf1bf81a624cceec3cce1172bf01ce7e3c8178b0f8": "video.html", # from: wfd2034s | |
"6222dae0bdd42d1c2d6ab7369db7c7ddfa9e44b1173a2e2f21dbf70e228a3a39": "battery/charging0.png", | |
"1edb3b35dc64a7e6727fa5618594db36c8338f71d83932dfb9a365a853d71532": "battery/charging1.png", | |
"03e19edbbc0151bfa6fd3b01fa397a714d99f6a991e9f9e55eeaa9234d1aeb0c": "battery/charging2.png", | |
"48b2f07ce76941a2899fd822e7925d0cda8ad0006fb4382a4b0003b515f73baa": "battery/charging3.png", | |
"884b820a8f6960653877c72a0eb3845562f644abaef740ad7f1c135a4b83e3c7": "battery/critical0.png", | |
"4d0dab46c05c18c0be29574b244b15d9eec359e62c3d1823653e2011deb02ab6": "battery/critical1.png", | |
"ca9c09d6a2c6372e1ea979d03ddbc952b1d504fefdea013303577344cc312286": "battery/high.png", | |
"a2141fa152c3166a277dd839dd977bad26b6e540e38f9f8da15bb7d7c1a3d311": "battery/low.png", | |
"d18b392ded5c02a2551e742da0c8b976c7d4587f30e9ed4d01b9f9ce45ed9e05": "battery/medium.png", | |
"2a279be18af1717a2fbedf5b885a95535a008083098895d4945125954232e929": "folder_check_icon.png", | |
"0a894478123ff42698ceca7564095cb8be8eb4f939f9f6c5966be3838965ec8d": "folder_nocheck_icon.png", | |
"f8a2835f2451e13b4dc95d302b8e2ac28a9c6ec131c5c86fc835b07b1200ca6f": "mejs/silverlightmediaelement.xap", | |
"858566cbfd4b3837477b0842bc3971b9633901317880604c475209e7720b9683": "mejs/mediaelement-and-player.min.js", | |
"e9e77b96fef09b18ba89467cf7285722ea6d6e1e9e11d1aa35c5abf39770ccff": "mejs/flashmediaelement.swf", | |
"05d12432b14d6b810243398927997904668f69f94eacd96001a838d3d70f2143": "mejs/bigplay.svg", | |
"1e5b85acb1b0b2d0bd24f4806a1cfa66d7e6dec37110c78d563b84be9951e8f2": "mejs/bigplay.png", | |
"7acb5f1cc018169d97b1dd90e2aae94f0b545aa4e7244a0321bb3e1093639a37": "mejs/mediaelementplayer.min.css", | |
"016f259972a2aaaf499e93756f6182f73839b1af8c4187fd54976dac723bf853": "mejs/controls.svg", | |
"f38cc337d1e8e5c17baf5c3812da8f6e4f49bedccba605b93dc38c338e89f4d5": "mejs/controls.png", | |
"3036bee9f749fdca0544a5592ce8da4204fab8f2b68edc6ac3905c90266014d4": "mejs/loading.gif", | |
"b294e973896f8f874e90a8eb1a8908ac790980d034c4c4bdf0fc3d37b8abf682": "js/jquery.min.js", | |
"23e57cf573e516953a72a685ea9b27bd6095a4e1aaac58c6724050f7557f487c": "js/sort.js", | |
"c5c2118428c14c6286d2a57c28a5a4278ef457a34f2afab8e0eb3da66daa7ccd": "js/sort.js", # from: wfd2034s | |
"2d40e4435dbbaef6a552220fb7b9206c1872adb3d7af192327dfe7b4b911c0cc": "js/dragdrop.js", | |
"1d629a298fb4bbc58301e6f80e180140e7fa91aac8cc5aac5b46eda551aa1636": "js/video.js", | |
"2877b0d2b73bed938d38651a90865bac9068da60ba43af671a45ad572ddadb97": "css/jquery-ui-core.min.css", | |
"02b9463648c62e92eafc7e7dcbe26b84ccb34b19b68d7c96e72d355cc649b68c": "css/jquery-ui-theme.min.css", | |
"4af2c76d0661c920118c82cae980e86214aa39260bffa364b021dd17eaa8697a": "js/jquery-ui.min.js", | |
"27685be9560b70f041cc56e235a841172a57359af8221ab0d86d17ceeb16d4cb": "js/settings.js", | |
"4bdf71c3c281ec0b3c31c65e1842d20d3b8a6b3b056645772f714fb7d5d316ff": "js/settings.js", # from: wfd2034s | |
"be7349ed81dd4690dd8adfdd6dcba8d6c02655f321bc21f33bea567b7b0c6f1e": "css/style.css", | |
"6f7ae25f30e55b9d211170a94147a716b7c37c6d0336e36801c9d0498f18247b": "css/style.css", # from: wfd2034s | |
"e2d1b1c7c51f8c30431327fe43029d62b6d5dfd2d95bbd6b8b9929c178dba4bf": "css/images/ui-icons_888888_256x240.png", | |
"a8d28e2d83a807b2b86ed2a02e31086f6c0718dfa96e0ba6a4577b657f69cc34": "css/images/ui-icons_454545_256x240.png", | |
"82886336a384acad75c803bb87720b144e09c444c36ad1082203c29870ccf39e": "mejs/background.png" | |
} | |
return file_mappings.get(digest, None) | |
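# A minimal lookup sketch ("some_extracted_file" is a placeholder path); hash an already | |
# extracted file with SHA256 and pass the hex digest to resolve its known /static/ name: | |
#   name = guess_filename(sha256(Path("some_extracted_file").read_bytes()).hexdigest()) | |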
def main(): | |
parser = ArgumentParser( | |
description = "A script to read & unpack firmware files (.DF3) of Sandisk's discontinued wireless flash drive series." | |
) | |
parser.add_argument("firmware", help = "Path of the input firmware file.", type = Path) | |
parser.add_argument("output", help = "Path for the output unpacked archive ZIP file.", type = Path) | |
data = parser.parse_args() | |
ifile = cast(Path, data.firmware).absolute() | |
ofile = cast(Path, data.output).absolute() | |
if (not ifile.exists()) or (not ifile.is_file()): | |
raise ValueError(f"input path '{str(ifile)}' doesn't exists or not a file!") | |
if ofile.exists(): | |
if ofile.is_dir(): | |
raise ValueError(f"output path '{str(ofile)}' must be an file, not a directory!") | |
raise ValueError(f"output path '{str(ofile)}' does already exists, not overwriting it!") | |
buffer, info = read_df3_firmware(ifile) | |
with ofile.open("wb") as f: | |
f.write(buffer.read()) | |
print("written to: " + str(ofile), file = stderr) | |
infojson = { | |
"model": info.model, | |
"version": info.version, | |
"files": json.dumps(info.entries, ensure_ascii = False) | |
} | |
print(json.dumps(infojson, ensure_ascii = False, indent = 2)) | |
if __name__ == "__main__": | |
main() |
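# Example invocation (script and file names below are placeholders):
#   python unpack_df3.py wfd2050s.df3 unpacked.zip
# This writes the unpacked archive ZIP to "unpacked.zip", prints progress to stderr and
# a JSON summary (model, version, file entries) to stdout.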