Skip to content

Instantly share code, notes, and snippets.

@ysfchn
Last active July 14, 2025 18:19
Show Gist options
  • Save ysfchn/07435aaec8e8c3680b1b61901348f9d2 to your computer and use it in GitHub Desktop.
Save ysfchn/07435aaec8e8c3680b1b61901348f9d2 to your computer and use it in GitHub Desktop.
A script to read & unpack firmware files of Sandisk's discontinued wireless flash drive series.
# pyright: reportDeprecated=none, reportUnknownVariableType=none, reportUnknownParameterType=none, reportAny=none, reportUnknownMemberType=none, reportMissingParameterType=none, reportUnknownArgumentType=none, reportImplicitOverride=none, reportUnusedCallResult=none
from cmd import Cmd
from dataclasses import dataclass, asdict
from enum import Enum
from http.client import HTTPResponse
from io import BytesIO
from time import sleep
from typing import Dict, List, Optional, Union, cast
from urllib.error import HTTPError
from urllib.parse import urlencode, unquote
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor
from urllib.request import Request, urlopen
from hashlib import sha1, md5
from os.path import join
from datetime import datetime
# from socket import IP_MULTICAST_LOOP, IP_MULTICAST_TTL, SO_REUSEADDR, SO_REUSEPORT, SOCK_DGRAM, SOL_IP, SOL_SOCKET, gethostname, socket, AF_INET, gethostbyaddr, getfqdn, getaddrinfo, gethostbyname, gethostbyname_ex
class DeviceType(Enum):
    """
    Device model codes. Looking up a value that matches no member yields
    UNKNOWN instead of raising.
    """

    # Probably AirStash
    A01 = "A01"
    A02 = "A02"

    # Wireless Flash Drive
    FD_128K = "A02S"  # 16GB/32GB variant
    FD_256K = "A02E"  # 64GB variant

    # Wireless Stick
    WS_V1 = "A03S"  # 16GB/32GB variant
    WS_V2 = "A03E"  # 64GB/128GB/200GB variant

    UNKNOWN = ""

    @classmethod
    def _missing_(cls, value):
        # Enum hook invoked when `value` matches no member.
        return cls.UNKNOWN
class BatteryStatus(Enum):
    """
    Battery state strings. Any value that matches no member resolves to
    UNKNOWN.
    """

    CHARGING = "charging"
    FULL = "charged"
    HIGH = "high"
    MEDIUM = "med"
    LOW = "low"
    CRITICAL = "critical"
    UNKNOWN = ""

    @classmethod
    def _missing_(cls, value):
        # Enum hook invoked when `value` matches no member.
        return cls.UNKNOWN
class WiFiSecurity(Enum):
    """
    Wi-Fi security model strings. Any value that matches no member resolves
    to UNKNOWN.
    """

    WPA2 = "wpa2"
    WPA = "wpa"
    WEP = "wep"
    PUBLIC = "none"
    UNKNOWN = ""

    @classmethod
    def _missing_(cls, value):
        # Enum hook invoked when `value` matches no member.
        return cls.UNKNOWN
class StorageMediumStatus(Enum):
    """
    Status strings of a storage medium. Any value that matches no member
    resolves to UNSUPPORTED.
    """

    MOUNTED = "mounted"
    UNFORMATTED = "unformatted"
    FS_ERROR = "fserror"
    ERROR = "carderror"
    UNSUPPORTED = ""

    @classmethod
    def _missing_(cls, value):
        # Enum hook invoked when `value` matches no member.
        return cls.UNSUPPORTED
class StorageMediumFileSystem(Enum):
    """
    File system format strings of a storage medium. Any value that matches
    no member resolves to UNKNOWN.
    """

    FAT16 = "fat16"
    FAT32 = "fat32"
    EXFAT = "exfat"
    NTFS = "ntfs"
    HFS = "hfs"
    UNKNOWN = ""

    @classmethod
    def _missing_(cls, value):
        # Enum hook invoked when `value` matches no member.
        return cls.UNKNOWN
class NetworkConnectStatus(Enum):
    """
    Connection status strings of the client (home) network link. Any value
    that matches no member resolves to UNKNOWN.
    """

    CONNECTING = "connecting"
    CONNECTED = "connected"
    FAILED = "failed"
    UNKNOWN = "unknown"

    @classmethod
    def _missing_(cls, value):
        # Enum hook invoked when `value` matches no member.
        return cls.UNKNOWN
class NetworkSecurityLevel(Enum):
    """
    Values of the device's `auth` setting. Any value that matches no member
    resolves to UNKNOWN.
    """

    NONE = "none"
    ALL = "all"
    UNKNOWN = ""

    @classmethod
    def _missing_(cls, value):
        # Enum hook invoked when `value` matches no member.
        return cls.UNKNOWN
class NetworkScanState(Enum):
    """
    States reported in place of a scan result list. Any value that matches
    no member resolves to UNKNOWN.
    """

    SCANNING = "scanning"
    LOCKED = "locked"
    NONE = "none"
    UNKNOWN = ""

    @classmethod
    def _missing_(cls, value):
        # Enum hook invoked when `value` matches no member.
        return cls.UNKNOWN
class SettingsPushState(Enum):
    """
    Outcome of posting settings to the device. There is no fallback member,
    so a value that matches none of these raises ValueError on lookup.
    """

    # Changes were saved successfully.
    SUCCESS = "ok"
    SUCCESS_PENDING_RESTART = "ok:pending"

    # Couldn't register a new network for some reason (?).
    BAD_REQUEST = "bad"

    # Couldn't register a new network because the maximum limit has been reached.
    ERROR_ENTRY_FULL = "full"

    # Couldn't register a new network because it already exists.
    ERROR_ENTRY_DUPLICATE = "duplicate"

    # Couldn't remove a saved network because it doesn't exist.
    ERROR_ENTRY_NONEXIST = "notfound"
@dataclass
class BatteryInfo:
    """Battery status together with the reported voltage value."""

    status: BatteryStatus
    voltage: int

    @property
    def level(self):
        """
        Rough fill level derived from the status alone: a number between
        0.2 and 1, or -1 when the status maps to no level (e.g. charging
        or unknown).
        """
        status_levels = (
            (BatteryStatus.CRITICAL, 0.2),
            (BatteryStatus.LOW, 0.3),
            (BatteryStatus.MEDIUM, 0.5),
            (BatteryStatus.HIGH, 1),
            (BatteryStatus.FULL, 1),
        )
        for known_status, fraction in status_levels:
            if self.status == known_status:
                return fraction
        return -1
@dataclass
class ImplementationInfo:
    """
    Integer feature levels of the firmware; 0 is used by get_settings()
    when a feature is not reported.
    """

    security: int
    cachent: int
    coex: int
    upgrade: int
    restart: int
    exfat: int
    move_rename: int

    @property
    def supports_auto_upgrade(self):
        """True when the `upgrade` feature level is 2 or higher."""
        required_level = 2
        return self.upgrade >= required_level
@dataclass
class BuildInfo:
    """Firmware build identification."""

    name: str   # version string
    model: str  # model the build was made for
    code: int   # numeric version, compared against feature thresholds
@dataclass
class BitrateThresholdInfo:
    """Bitrate thresholds; get_settings() stores the XML values times 1000."""

    warning: int
    critical: int
@dataclass
class LastErrorInfo:
    """
    The last error stored on the device. The optional fields are None when
    the device did not report them.
    """

    message: str
    source: str
    line: int
    version: Optional[int]
    address: Optional[int]
    counter: Optional[int]
    timestamp: Optional[int]
@dataclass
class ExpectedAppVersionInfo:
    """App version strings reported by the device, one per mobile platform."""

    android: str
    ios: str
@dataclass
class ServerNetworkInfo:
    """State of the access point hosted by the device itself."""

    enabled: bool
    clients: int  # -1 when the device reports no client count
    ssid: str
    channel: int  # -1 when the device reports no channel
    security: WiFiSecurity
    key: str
@dataclass
class ClientNetworkInfo:
    """State of the device's connection to another (home) Wi-Fi network."""

    enabled: bool
    ssid: str
    status: NetworkConnectStatus
    ip: str
    # Whether we currently reach the device through the home network
    # instead of the drive's own access point.
    from_home: bool
@dataclass
class NetworkInfo:
    """A Wi-Fi network known to the device, either scanned or saved."""

    ssid: str
    security: WiFiSecurity
    rssi: int  # -1 when no signal value is available
    connected: bool
    saved: bool

    def signal(self, is_alt: bool = False):
        """
        Maps `rssi` to a bar count: 0 when rssi is -1, otherwise 1 to 3.

        With `is_alt` the scale is inverted (lower values are stronger,
        thresholds 50 and 70); otherwise higher values are stronger
        (thresholds 32 and 16).
        """
        strength = self.rssi
        if strength == -1:
            return 0
        if is_alt:
            return 3 if strength <= 50 else (2 if strength <= 70 else 1)
        return 3 if strength >= 32 else (2 if strength >= 16 else 1)
@dataclass
class StorageMediumInfo:
    """Description of one storage medium (card) reported by the device."""

    status: StorageMediumStatus
    format: StorageMediumFileSystem
    serial: bytes  # decoded from the hexadecimal serial string
    path: str
    label: str
    free: int
    total: int
    block_size: int
    read_only: bool
@dataclass
class WebDAVFolder:
    """A directory entry returned by a WebDAV listing."""

    name: str
    created: datetime
    modified: datetime
@dataclass
class WebDAVPartialFolder:
    """A directory for which only the name is known."""

    name: str
@dataclass
class WebDAVFile:
    """A file entry as reported by the device's WebDAV interface."""

    name: str
    mimetype: str
    size: int
    created: Optional[datetime]  # None when the creation date is not available
    modified: datetime
    etag: str
    cachent: Optional[str]
class MACAddress(str):
    """
    A colon-separated MAC address stored as an upper-case string.

    Construction raises ValueError unless the value consists of exactly six
    colon-separated groups of hexadecimal digits, each no larger than 0xFF.
    """

    def __new__(cls, content):
        mac = str(content).upper()
        octets = mac.split(":")
        hex_digits = "0123456789ABCDEF"
        # Checking the characters explicitly rejects forms that int(x, 16)
        # alone would let through, such as "-1", "+F", "0X1F" or "1_0".
        is_valid = (len(octets) == 6) and all(
            bool(octet)
            and all(ch in hex_digits for ch in octet)
            and int(octet, 16) <= 0xFF
            for octet in octets
        )
        if not is_valid:
            raise ValueError(f"doesn't appear to be a valid mac: {mac}")
        return str.__new__(cls, mac)

    @property
    def as_home(self):
        """This address with the direct-connect prefix swapped for the home-connect one."""
        if self.startswith(DIRECT_CONNECT_MAC_PREFIX):
            return HOME_CONNECT_MAC_PREFIX + self.removeprefix(DIRECT_CONNECT_MAC_PREFIX)
        return self

    @property
    def as_direct(self):
        """This address with the home-connect prefix swapped for the direct-connect one."""
        if self.startswith(HOME_CONNECT_MAC_PREFIX):
            return DIRECT_CONNECT_MAC_PREFIX + self.removeprefix(HOME_CONNECT_MAC_PREFIX)
        return self

    @property
    def as_int(self):
        """The direct-connect form of the address as a big-endian integer."""
        return int.from_bytes(bytes.fromhex(self.as_direct.replace(":", "")), "big")

    def to_model(self):
        """
        Guesses the device model from the address range.

        Returns a DeviceType, or None when the address falls in none of the
        known ranges.
        """
        # TODO: might be incorrect
        value = self.as_int
        if value < 0xD0_E4_0B_00_0F_00:
            return DeviceType.A01
        if 0xD0_E4_0B_00_0F_00 <= value <= 0xD0_E4_0B_03_99_FF:
            return DeviceType.A02
        if 0xD0_E4_0B_F5_D6_00 <= value <= 0xD0_E4_0B_FB_9F_FF:
            return DeviceType.FD_128K
        if 0xD0_E4_0B_FB_E0_00 <= value <= 0xD0_E4_0B_FE_FF_FF:
            return DeviceType.WS_V1
        # The bounds used to be written in swapped order (lower > upper),
        # which made this range impossible to match.
        if 0xD0_E4_0B_80_50_00 <= value <= 0xD0_E4_0B_F5_D5_FF:
            return DeviceType.WS_V2
        return None
@dataclass
class Settings:
    """Everything get_settings() extracts from the device's settings.xml."""

    model: DeviceType
    hostname: str
    mac: MACAddress
    network: ServerNetworkInfo
    home_network: Optional[ClientNetworkInfo]
    build: BuildInfo
    battery: BatteryInfo
    bitrate: BitrateThresholdInfo
    app_version: ExpectedAppVersionInfo
    auth: NetworkSecurityLevel
    authhash: str
    timeout: int
    pending_update: Optional[int]
    implementation: ImplementationInfo
    mediums: List[StorageMediumInfo]
    last_error: Optional[LastErrorInfo]

    @property
    def is_usb(self):
        """
        True when no storage medium is listed and the build code is at
        least 2009.

        NOTE(review): presumably this means the drive is plugged into a USB
        port (get_settings() skips cards whose status is "none") - confirm.
        """
        if len(self.mediums) != 0:
            return False
        return self.build.code >= 2009
# Host (IP address or host name) that every request of this module is sent to.
# The commented-out lines are alternative addresses kept for reference.
# SERVER_ADDRESS = "172.25.63.1"
# SERVER_ADDRESS = "sandiskf47e27.local"
SERVER_ADDRESS = "192.168.1.81"
# Leading octets that MACAddress.as_home / MACAddress.as_direct swap with each other.
DIRECT_CONNECT_MAC_PREFIX = "D0:E4:0B:"
HOME_CONNECT_MAC_PREFIX = "D2:E4:0B:"
def do_request(path: str, method: str, headers : Optional[Dict[str, str]] = None, data : Optional[bytes] = None):
    """
    Sends a single HTTP request to the device at SERVER_ADDRESS.

    path: request path, with or without a leading slash.
    method: HTTP method name (WebDAV verbs such as PROPFIND are passed through as-is).
    headers: optional request headers.
    data: optional request body.

    Returns a `(status, body, headers)` tuple. Responses that urllib raises
    as HTTPError are returned the same way instead of being raised. Other
    errors (e.g. connection failures) propagate to the caller.
    """
    server = SERVER_ADDRESS
    # server = "httpbin.org/anything"
    req = Request(
        url=f"http://{server}/{path.removeprefix('/')}",
        method=method,
        unverifiable=True,
        data=data,
        headers=headers or {}
    )
    try:
        # The context manager closes the connection once the body is read.
        with cast(HTTPResponse, urlopen(req)) as resp:
            return resp.status, resp.read(), dict(resp.headers)
    except HTTPError as he:
        # An HTTPError wraps the error response; release it after reading.
        try:
            return he.status, he.read(), dict(he.headers)
        finally:
            he.close()
def is_legal_name(fs : StorageMediumFileSystem, name : str):
    """
    Returns the characters that are treated as illegal on the given file
    system: a non-empty string for FAT32 and FAT16, an empty string for
    anything else.

    NOTE(review): despite its name this does not return a bool and `name`
    is never inspected - confirm the intended contract. A shorter FAT32 set
    ("*?<>:/\\|\"") used to be assigned and immediately overwritten; only
    the set that actually took effect is kept here.
    """
    if fs == StorageMediumFileSystem.FAT32:
        return "*?.,;:/\\|+=<>[]\""
    if fs == StorageMediumFileSystem.FAT16:
        return "\"*,/:;<=>?[\\]|"
    return ""
def get_service():
    """Placeholder that does nothing yet and returns None."""
    # Unfinished sketch of a multicast socket bound to port 5353:
    # host = gethostbyname(gethostname())
    # sock = socket(AF_INET, SOCK_DGRAM)
    # sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    # sock.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1)
    # sock.setsockopt(SOL_IP, IP_MULTICAST_TTL, 255)
    # sock.setsockopt(SOL_IP, IP_MULTICAST_LOOP, 1)
    # sock.bind(("", 5353))
    return None
def post_settings(query: Union[Dict[str, str], Dict[str, Union[bytes, str]]], post_restart: bool = False):
    """
    Set device settings.

    query: form fields to post to /settings.xml; values may be str or bytes.
    post_restart: when True, a `restart=allowed` field is sent along.

    Returns the SettingsPushState matching the `<status>` reply; a pending
    restart reported by the device is reflected as the ":pending" variant.
    Raises ValueError when the reply matches no SettingsPushState member.
    """
    # Work on a copy so the caller's dictionary is never modified.
    fields = dict(query)
    if post_restart:
        fields["restart"] = "allowed"
    body = urlencode(fields)
    _, data, _ = do_request(
        "/settings.xml",
        "POST",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        data=body.encode("utf-8")
    )
    statusxml = ET.XML(data)
    assert statusxml.tag == "status"
    # Is there a pending reboot?
    extra = statusxml.attrib.get("restart", "")
    if extra:
        assert extra == "pending"
        extra = f":{extra}"
    return SettingsPushState((statusxml.text or "") + extra)
def get_networks(new_scan: bool = True) -> List[NetworkInfo]:
    """
    Returns the Wi-Fi networks known to the device as a single list: the
    scanned networks first, followed by the saved networks that did not
    show up in the scan. A scanned network that is also saved appears once,
    with `saved` set to True.

    If `new_scan` is True a new scan is requested first; otherwise the
    cached scan list stored on the device is used. If the device holds no
    scan list, a scan is requested anyway. This call blocks until the
    device returns a scan list.
    """

    def _get_network_list(*, is_scanned: bool):
        # Fetches the scanned or the saved list. For the scanned list the
        # device may answer with a <status> element instead, which is
        # returned as a NetworkScanState.
        query = {"group": "scan" if is_scanned else "saved"}
        status, data, _ = do_request(
            "/settings.xml" + "?" + urlencode(query),
            "GET"
        )
        assert status == 200, "failed response"
        xmldata = ET.XML(data)
        if is_scanned and xmldata.tag == "status":
            return NetworkScanState(xmldata.text)
        assert xmldata.tag == "ssidlist", f"unexpected xml tag '{xmldata.tag}'"
        found : List[NetworkInfo] = []
        for entry in xmldata.findall("./ssid"):
            network = NetworkInfo(
                ssid=unquote(entry.attrib["name"]).encode("ISO-8859-15").decode("utf-8"),
                security=WiFiSecurity(entry.attrib["security"]),
                rssi=int(entry.attrib.get("rssi", "") or -1),
                # This won't be True when listing all saved networks, even if there is a
                # saved network that the device is currently connected to.
                # So, it only applies to scanned networks.
                connected=entry.attrib.get("connected", "") == "connected",
                saved=not is_scanned
            )
            # Only include networks that have an SSID.
            if not network.ssid:
                continue
            found.append(network)
        return found

    def _block_for_scan(interval: int, wait_more: bool = False):
        # Polls until the device hands out a scan list. `interval` is in
        # milliseconds; the wait is doubled after a "locked" or idle reply.
        # A loop is used so that a long scan cannot exhaust the recursion limit.
        while True:
            sleep((interval / 1000) * (2 if wait_more else 1))
            networks = _get_network_list(is_scanned=True)
            if not isinstance(networks, NetworkScanState):
                return networks
            if networks == NetworkScanState.SCANNING:
                wait_more = False
            elif networks == NetworkScanState.LOCKED:
                wait_more = True
            else:
                # No scan is running and no list is available: start one.
                post_settings({
                    "group": "scan"
                })
                wait_more = True

    saved_list = cast(List[NetworkInfo], _get_network_list(is_scanned=False))
    if new_scan:
        post_settings({
            "group": "scan"
        })
    scanned_list = cast(List[NetworkInfo], _block_for_scan(2000 if new_scan else 0))

    # A saved network that also shows up in the scan is only flagged as
    # saved; the same SSID is not added a second time.
    result : List[NetworkInfo] = list(scanned_list)
    for old in saved_list:
        exist_in_new = next((new for new in result if new.ssid == old.ssid), None)
        if exist_in_new is not None:
            exist_in_new.saved = True
        else:
            result.append(old)
    return result
# ---------------------------------------------------------------------------
# Wi-Fi Security
# ---------------------------------------------------------------------------
class WiFiPasswordValidationStatus(Enum):
    """Possible outcomes of validate_wifi_password()."""

    REQUIRE_NO_PASSWORD = "require_no_password"
    REQUIRE_PASSWORD = "require_password"
    TOO_SHORT = "too_short"
    TOO_LONG = "too_long"
    BAD_CHARACTER = "bad_character"
    VALID = "valid"
def validate_wifi_password(security: WiFiSecurity, password: str) -> WiFiPasswordValidationStatus:
    """
    Checks a password against the rules of the given Wi-Fi security model
    and reports the outcome as a WiFiPasswordValidationStatus.

    - PUBLIC: the password must be empty.
    - WEP: either 13 ASCII characters or 26 hexadecimal digits.
    - WPA / WPA2: 8 to 63 printable ASCII characters.

    Raises ValueError for any other security model.
    """
    outcome = WiFiPasswordValidationStatus

    if security == WiFiSecurity.PUBLIC:
        return outcome.REQUIRE_NO_PASSWORD if password else outcome.VALID

    if security == WiFiSecurity.WEP:
        length = len(password)
        if length == 13:
            # ASCII characters
            if any(ord(ch) > 127 for ch in password):
                return outcome.BAD_CHARACTER
            return outcome.VALID
        if length == 26:
            # Hexadecimal characters
            allowed_chars = "0123456789abcdefABCDEF"
            if any(ch not in allowed_chars for ch in password):
                return outcome.BAD_CHARACTER
            return outcome.VALID
        # Every other length below 26 counts as too short, anything above as too long.
        return outcome.TOO_SHORT if length < 26 else outcome.TOO_LONG

    if (security == WiFiSecurity.WPA2) or (security == WiFiSecurity.WPA):
        length = len(password)
        if length < 8:
            return outcome.TOO_SHORT
        if length > 63:
            return outcome.TOO_LONG
        # WPA-PSK password must only include printable ASCII characters,
        # ranging from 32 to 126 (inclusive).
        if any((ord(ch) < 32) or (ord(ch) > 126) for ch in password):
            return outcome.BAD_CHARACTER
        return outcome.VALID

    raise ValueError("invalid Wi-Fi security to validate password against")
def create_pbkdf2_wpapsk_key(ssid: str, password: str) -> bytes:
    """
    Creates a PBKDF2 WPA-PSK value from a SSID and password.

    The key is PBKDF2-HMAC-SHA1 over the UTF-8 encoded password, salted
    with the ISO-8859-15 encoded SSID, using 4096 iterations and a 32-byte
    output.

    Returns the 32 key bytes.
    """
    # Imported locally because the module only imports sha1 and md5 from hashlib.
    from hashlib import pbkdf2_hmac

    # The former hand-written loop compared the character count, not the
    # encoded byte length, against the 64-byte HMAC block size and so
    # derived a wrong key for long non-ASCII passwords.
    return pbkdf2_hmac(
        "sha1",
        password.encode(),
        ssid.encode("ISO-8859-15"),
        4096,
        32
    )
def save_network(ssid: str, security: WiFiSecurity, password: str):
    """
    Saves a Wi-Fi access point (SSID, security model and password) on the
    device. Use connect_network() afterwards to connect to it.
    Requires version >= 669.

    Returns the SettingsPushState reported by the device.
    Raises ValueError for an unsupported security model or when the
    password is not valid for the security model.
    """
    query : Dict[str, Union[bytes, str]] = {
        "group": "saved",
        "action": "save"
    }
    # NOTE(review): this conversion raises UnicodeDecodeError for most
    # non-ASCII SSIDs - confirm which text form callers are expected to pass.
    query["ssid"] = ssid.encode("ISO-8859-15").decode("utf-8")
    if security == WiFiSecurity.PUBLIC:
        query["security"] = "none"
    elif security == WiFiSecurity.WEP:
        query["security"] = "wep"
    elif (security == WiFiSecurity.WPA) or (security == WiFiSecurity.WPA2):
        # Both WPA flavours are posted as "wpa".
        query["security"] = "wpa"
    else:
        raise ValueError("invalid security model")
    report = validate_wifi_password(security, password)
    if report != WiFiPasswordValidationStatus.VALID:
        raise ValueError("password isn't legal for this security model, reason: " + str(report.name))
    if security == WiFiSecurity.WEP:
        # A valid WEP key is either 26 hex digits or 13 ASCII characters.
        # Only the hex form may go through bytes.fromhex(); the ASCII form
        # already is the 13 key characters.
        if len(password) == 26:
            key_text = bytes.fromhex(password).decode("ISO-8859-1")
        else:
            key_text = password
        query["password"] = key_text.encode("utf-8")
    elif (security == WiFiSecurity.WPA) or (security == WiFiSecurity.WPA2):
        query["password"] = create_pbkdf2_wpapsk_key(ssid, password)
    return post_settings(query)
def remove_network(ssid: str):
    """
    Deletes a saved network from the device. Requires version >= 669.

    Returns the SettingsPushState reported by the device.
    """
    return post_settings({
        "group": "saved",
        "action": "delete",
        "ssid": ssid.encode("ISO-8859-15").decode("utf-8")
    })
def connect_network(ssid: str):
    """
    Asks the device to connect to a network that was saved before.
    Requires version >= 669.

    Returns the SettingsPushState reported by the device.
    """
    return post_settings({
        "group": "saved",
        "action": "connect",
        "ssid": ssid.encode("ISO-8859-15").decode("utf-8")
    })
def set_sidelink_mode(mode: bool):
    """
    Switches the home network mode on or off. Requires version >= 669.

    Returns the SettingsPushState reported by the device.
    """
    flag = "true" if mode else "false"
    return post_settings({"sidelinken": flag})
def set_ap_password(settings: Settings, password: str, ssid: Optional[str] = None, for_home_network: Optional[bool] = None):
    """
    Set AP password.

    settings: current device settings; the build code selects the security
        model (WPA2 from build 703 on, WEP before) and the current SSID is
        used when `ssid` is not given.
    password: new password; an empty password switches security off.
    ssid: optional new SSID (1 to 32 characters); None keeps the current one.
    for_home_network: True enables owner authentication, False disables it,
        None leaves it untouched.

    The request allows the device to restart. Returns the SettingsPushState
    reported by the device. Raises ValueError for an invalid SSID or
    password.
    """
    required_model = WiFiSecurity.WEP
    query = {}
    if settings.build.code >= 703:
        required_model = WiFiSecurity.WPA2
    if ssid is not None:
        if not ssid:
            raise ValueError("either provide an non-empty ssid or leave it blank to use default ssid")
        if len(ssid) > 32:
            raise ValueError("ssid can't be longer than 32 characters")
        # The WPA key below is derived from this SSID, so the SSID has to be
        # sent too; otherwise the stored key would not match the network name.
        # NOTE(review): the field name "ssid" is assumed from the <ssid>
        # element of settings.xml - confirm against the device.
        query["ssid"] = ssid.encode("ISO-8859-15").decode("utf-8")
    if password:
        report = validate_wifi_password(required_model, password)
        if report != WiFiPasswordValidationStatus.VALID:
            raise ValueError("password isn't legal for this security model, reason: " + str(report.name))
        if required_model == WiFiSecurity.WEP:
            query["security"] = "wep"
            # A valid WEP key is either 26 hex digits or 13 ASCII characters;
            # only the hex form may go through bytes.fromhex().
            if len(password) == 26:
                key_text = bytes.fromhex(password).decode("ISO-8859-1")
            else:
                key_text = password
            query["password"] = key_text.encode("utf-8")
        elif required_model == WiFiSecurity.WPA2:
            query["security"] = "wpa"
            query["password"] = create_pbkdf2_wpapsk_key(ssid or settings.network.ssid, password)
    else:
        query["security"] = "none"
    if for_home_network is True:
        query["auth"] = "all"
        query["authowner"] = "owner"
        query["authhash"] = md5(f"owner:{ssid or settings.network.ssid}:{password}".encode("ISO-8859-1")).hexdigest()
    elif for_home_network is False:
        query["auth"] = "none"
    return post_settings(query, post_restart=True)
def set_ap_mode(mode: bool):
    """
    Switches the AP mode on or off. Requires version >= 657.

    Returns the SettingsPushState reported by the device.
    """
    flag = "true" if mode else "false"
    return post_settings({"ap": flag})
def set_timeout(value: int):
    """
    Sets the power saving timeout. Requires version >= 586.

    Returns the SettingsPushState reported by the device.
    Raises ValueError when `value` is negative.
    """
    if not (value >= 0):
        raise ValueError("timeout value can't be negative")
    # 0, 15, 30, 60
    fields = {"timeout": str(value)}
    return post_settings(fields)
def change_wifi_channel(value: int):
    """
    Sets the WLAN channel of the device to a zero-indexed channel number
    (0 to 13). Channels #13 and #14 should be avoided in North America.
    Requires version >= 574. See more about WLAN channels:
    https://en.wikipedia.org/wiki/List_of_WLAN_channels#2.4_GHz_(802.11b/g/n/ax/be)

    Returns the SettingsPushState reported by the device.
    Raises ValueError when `value` is outside the accepted range.
    """
    if value < 0:
        raise ValueError("channel number must start from 0 (inclusive)")
    if value > 13:
        raise ValueError("channel number must equal or less than 13")
    fields = {"channel": str(value)}
    return post_settings(fields)
def push_firmware(file: bytes, is_xml: bool, is_wfd_v2: bool):
    """
    Uploads a firmware file to the device with a PUT request.

    is_xml: Must be True if Settings.implementation.upgrade == 2
    is_wfd_v2: Must be True if Settings.model == DeviceType.WS_V2, only is in effect when is_xml is False.

    Returns the `(status, body, headers)` tuple of the request.
    """
    if is_xml:
        path = "/settings.xml" + "?" + urlencode({
            "group": "firmware"
        })
    elif is_wfd_v2:
        # Applies to WFD_V2 (DeviceType.WS_V2 to be specific) only
        path = "/files/wfd.df3"
    else:
        path = "/files/AIRST.DF2"
    assert path, "firmware parameters are invalid!"
    return do_request(path, "PUT", data=file)
def webdav_list(path: str = "/") -> List[Union[WebDAVFolder, WebDAVFile]]:
    """
    Lists files and folders in a path. Returns a list containing WebDAVFile and WebDAVFolder
    objects depending on the type of the item.

    When the device answers with a redirect (301), the request is repeated
    once with the trailing slash of `path` toggled.

    Raises ValueError when the device answers 404 (directory missing, or no
    memory card when the root was requested).
    """
    # https://datatracker.ietf.org/doc/html/rfc4918#section-14.20
    # https://datatracker.ietf.org/doc/html/rfc4918#section-9.1
    propfind = ET.Element("propfind", {"xmlns:D": "DAV:"})
    ET.SubElement(propfind, "{DAV:}propname")
    tree = ET.ElementTree(propfind)
    buffer = BytesIO()
    tree.write(buffer, xml_declaration=True, encoding="utf-8")
    body = buffer.getvalue()

    retried = False
    while True:
        filename = path.removeprefix('/')
        status, data, _ = do_request(
            f"/files/{filename}",
            "PROPFIND",
            headers={
                "Accept": "*/*",
                "Depth": "1",
                "Content-Type": "text/xml; charset=UTF-8"
            },
            data=body
        )
        if status == 404:
            if not filename:
                raise ValueError("no memory card exists on the device")
            raise ValueError("directory couldn't be found")
        # If a redirect is detected, toggle the trailing slash. This is done
        # at most once, so a device that keeps redirecting cannot cause an
        # endless retry.
        if (status == 301) and path and not retried:
            path = path[:-1] if path.endswith("/") else path + "/"
            retried = True
            continue
        break
    assert status == 207, f"status was {status}"

    xmldata = ET.XML(data)
    assert xmldata.tag == "{DAV:}multistatus", "couldn't parse this xml response"
    result = []
    for element in xmldata.findall("./{DAV:}response"):
        propsxml = element.find("./{DAV:}propstat/{DAV:}prop")
        assert propsxml is not None
        creation_date = cast(str, propsxml.findtext("./{DAV:}creationdate"))
        modified_date = cast(str, propsxml.findtext("./{DAV:}getlastmodified"))
        mimetype = cast(str, propsxml.findtext("./{DAV:}getcontenttype"))
        # The name is whatever remains of the href after the listed collection's URL.
        name = cast(str, element.findtext("./{DAV:}href")).removeprefix(f"http://{SERVER_ADDRESS}/files/{filename}")
        # Skip the collection itself that is being listed
        if not name:
            continue
        # Sub-collections (folders) will have a child element inside resourcetype element.
        resource_type = propsxml.find("./{DAV:}resourcetype/*")
        if resource_type is not None:
            resource_type = resource_type.tag
        if resource_type == "{DAV:}collection":
            assert mimetype == "text/directory", "got a non-expected mimetype for a directory"
            result.append(WebDAVFolder(
                name=name,
                created=datetime.strptime(creation_date, "%Y-%m-%dT%H:%M:%SZ"),
                modified=datetime.strptime(modified_date, "%a, %d %b %Y %H:%M:%S %Z")
            ))
        else:
            result.append(WebDAVFile(
                name=name,
                mimetype=mimetype,
                size=int(cast(str, propsxml.findtext("./{DAV:}getcontentlength"))),
                created=datetime.strptime(creation_date, "%Y-%m-%dT%H:%M:%SZ"),
                modified=datetime.strptime(modified_date, "%a, %d %b %Y %H:%M:%S %Z"),
                etag=cast(str, propsxml.findtext("./{DAV:}getetag")).removeprefix('"').removesuffix('"'),
                cachent=propsxml.findtext("./{AirStash:}cachent")
            ))
    return result
def webdav_get(path: str):
    """
    Retrieves details for a given path. Returns a WebDAVFile object if the path is a
    file, or a WebDAVPartialFolder object if the path is not a file.

    Raises ValueError when the path is empty (or only slashes at the ends)
    or when the device answers 404.
    """
    filename = path.removeprefix('/').removesuffix('/')
    # Validate before talking to the device so that no pointless request is sent.
    if not filename:
        raise ValueError("a filename is required")
    status, _, headers = do_request(
        f"/files/{filename}",
        "HEAD",
        headers={"Accept": "*/*"}
    )
    if status == 404:
        raise ValueError("file was not found")
    assert status == 200, f"status was {status}"
    # The requested filename can be a directory; a response without a
    # Content-Length header is treated as one, in which case a
    # WebDAVPartialFolder is returned.
    if "Content-Length" not in headers:
        return WebDAVPartialFolder(
            name=filename
        )
    return WebDAVFile(
        name=filename,
        mimetype=headers["Content-Type"],
        size=int(headers["Content-Length"]),
        created=None,
        modified=datetime.strptime(headers["Last-Modified"], "%a, %d %b %Y %H:%M:%S %Z"),
        etag=headers["ETag"].removeprefix('"').removesuffix('"'),
        cachent=None
    )
def webdav_delete(path: str):
    """
    Deletes an object. Returns True on success, or False when the object
    did not exist in the first place.

    Raises ValueError when the path is empty (or only slashes at the ends).
    """
    filename = path.removeprefix('/').removesuffix('/')
    # This check must come first: with an empty name the DELETE request
    # would be sent for "/files/" itself before the error is raised.
    if not filename:
        raise ValueError("a filename is required")
    status, _, _ = do_request(
        f"/files/{filename}",
        "DELETE",
        headers={"Accept": "*/*"}
    )
    assert status in (204, 404), f"status was {status}"
    return status == 204
def webdav_mkdir(path: str):
    """
    Creates a new directory at the given path with a MKCOL request.

    Raises ValueError when the device answers 403 (name clash or invalid
    characters) or 404 (missing parent directory).
    """
    directory = path.removeprefix('/').removesuffix('/')
    response = do_request(
        f"/files/{directory}/",
        "MKCOL",
        headers={"Accept": "*/*"}
    )
    status = response[0]
    if status == 403:
        raise ValueError("a file with same name exists or the name contains invalid characters")
    if status == 404:
        raise ValueError("path was not found (try creating missing parent directories first)")
    assert status == 201, f"status was {status}"
def webdav_move(source: str, target: str, overwrite: bool = False):
    """
    Moves (renames) `source` to `target` with a MOVE request.

    overwrite: sent as the Overwrite header ("T" or "F").

    Raises ValueError when a name is empty, when only one of the two names
    ends with a slash, or when the device answers 403, 404 or 400.
    """
    if not (source and target):
        raise ValueError("both filenames are required")
    # While the device itself doesn't care about the trailing slashes on renaming,
    # we check anyway as a safeguard.
    source_is_dir = source.endswith("/")
    target_is_dir = target.endswith("/")
    if source_is_dir != target_is_dir:
        raise ValueError("mixed trailing slashes; both sides must be a directory or a file")
    src = source.removeprefix('/').removesuffix('/')
    trg = target.removeprefix('/').removesuffix('/')
    request_headers = {
        "Destination": f"http://{SERVER_ADDRESS}/files/{trg}",
        "Overwrite": "T" if overwrite else "F",
        "Accept": "*/*"
    }
    status = do_request(f"/files/{src}", "MOVE", headers=request_headers)[0]
    failures = {
        403: "a file with same name exists",
        404: "path was not found",
        400: "invalid path (file may be already moved)",
    }
    if status in failures:
        raise ValueError(failures[status])
    assert status == 201, f"status was {status}"
def webdav_copy(source: str, target: str, overwrite: bool = False):
    """
    Copies `source` to `target` with a COPY request.
    TODO: Doesn't work?

    overwrite: sent as the Overwrite header ("T" or "F").

    Raises ValueError when a name is empty, when only one of the two names
    ends with a slash, or when the device answers 403, 404 or 400.
    """
    if not (source and target):
        raise ValueError("both filenames are required")
    # While the device itself doesn't care about the trailing slashes on renaming,
    # we check anyway as a safeguard.
    source_is_dir = source.endswith("/")
    target_is_dir = target.endswith("/")
    if source_is_dir != target_is_dir:
        raise ValueError("mixed trailing slashes; both sides must be a directory or a file")
    src = source.removeprefix('/').removesuffix('/')
    trg = target.removeprefix('/').removesuffix('/')
    request_headers = {
        "Destination": f"http://{SERVER_ADDRESS}/files/{trg}",
        "Overwrite": "T" if overwrite else "F",
        "Accept": "*/*"
    }
    status = do_request(f"/files/{src}", "COPY", headers=request_headers)[0]
    failures = {
        403: "a file with same name exists",
        404: "path was not found",
        400: "invalid path",
    }
    if status in failures:
        raise ValueError(failures[status])
    assert status == 201, f"status was {status}"
def check_connection():
    """
    Meant to return True if connected to the hostname.

    Not implemented yet: currently does nothing and returns None.
    """
    # Unfinished sketch of the intended check:
    # if gethostbyname(getfqdn()) != "172.25.63.2":
    # return False
    # return True
    return None
# sources/com/wearable/sdk/impl/SettingsManager.java
def get_settings():
    """
    Fetches settings.xml from the device, parses its contents and returns a
    Settings structure containing the fields.

    Raises ValueError when the device model can be determined neither from
    the reported model nor from the MAC address (or when the serial is not
    a valid MAC address). Unexpected responses trip assertions.
    """
    status, data, _ = do_request(
        "/settings.xml",
        "GET"
    )
    assert status == 200, "failed response"
    settings = ET.XML(data)
    assert settings.tag == "settings"

    build = BuildInfo(
        name=settings.findtext("./version") or "",
        model=settings.findtext("./buildmodel") or "",
        code=int(settings.findtext("./numericversion") or 0)
    )
    model = settings.findtext("./model") or ""
    hostname = settings.findtext("./hostname") or ""
    serial = settings.findtext("./serial") or ""
    assert all((model, hostname, serial, build.name, build.model, build.code > 0))

    # Features missing from the XML fall back to 0, except where the build
    # code alone tells what the firmware supports.
    implement = ImplementationInfo(
        security=int(settings.findtext("./features/security") or 0),
        cachent=int(settings.findtext("./features/cachent") or 0),
        coex=int(settings.findtext("./features/coex") or 0),
        upgrade=int(settings.findtext("./features/firmwareupdate") or (1 if build.code >= 563 else 0)),
        restart=int(settings.findtext("./features/restart") or 0),
        exfat=int(settings.findtext("./features/exfat") or 0),
        move_rename=1 if build.code >= 1082 else 0
    )

    bitxml = settings.find("./bitrate")
    assert bitxml is not None
    bitrate = BitrateThresholdInfo(
        warning=int(bitxml.attrib["warn"]) * 1000,
        critical=int(bitxml.attrib["critical"]) * 1000
    )

    # Get battery info.
    battxml = settings.find("./battery")
    assert battxml is not None
    battery = BatteryInfo(
        status=BatteryStatus(battxml.attrib["status"]),
        voltage=int(battxml.attrib["voltage"])
    )

    # Get access point information.
    apxml = settings.find("./ap")
    assert apxml is not None
    access_point = ServerNetworkInfo(
        enabled=apxml.attrib["enabled"] == "true",
        clients=int(apxml.attrib.get("clients", None) or -1),
        ssid=unquote(cast(str, settings.findtext("./ssid"))).encode("ISO-8859-15").decode("utf-8"),
        channel=int(settings.findtext("./channel") or -1),
        security=WiFiSecurity(settings.findtext("./security")),
        key=settings.findtext("./wpapsk") or "",
    )

    # Get connected Wi-Fi client information.
    sidelinkxml = settings.find("./sidelink")
    sidelink_enabled = False
    if (sidelinkxml is not None) and (sidelinkxml.attrib.get("enabled") == "true"):
        sidelink_enabled = True
    sdcxml = settings.find("./client")
    sidelink = None
    if sdcxml is not None:
        sidelink = ClientNetworkInfo(
            enabled=sidelink_enabled,
            ssid=unquote(sdcxml.attrib["ssid"]).encode("ISO-8859-15").decode("utf-8"),
            ip=sdcxml.attrib["ip"],
            status=NetworkConnectStatus(sdcxml.attrib["status"]),
            # Are we currently connected to the home network, instead of drive's own AP?
            from_home=sdcxml.attrib.get("method", "") == "sidelink"
        )
        # Not obtained an IP address yet.
        if sidelink.ip == "0.0.0.0":
            sidelink.status = NetworkConnectStatus.UNKNOWN

    # Get a list of storage mediums (in fact, microSD cards residing on the drive).
    mediums : List[StorageMediumInfo] = []
    for medium in settings.findall("./cards/card"):
        # The microSD card is removed, or the device plugged to a USB port, since
        # the device doesn't support accessing files from both WebDAV and USB at the same time.
        if medium.attrib["status"].lower() == "none":
            continue
        card_status = StorageMediumStatus(medium.attrib["status"])
        mediums.append(StorageMediumInfo(
            status=card_status,
            format=StorageMediumFileSystem(medium.attrib["format"]),
            serial=bytes.fromhex(medium.attrib["serial"]),  # The UUID is returned as "1234ABCD", without the dash.
            path=medium.attrib["path"],
            label=medium.attrib["label"],
            free=int(medium.attrib["free"]),
            total=int(medium.attrib["total"]),
            block_size=int(medium.attrib["blocksize"]),
            # From build 914 on, anything but a mounted card counts as
            # read-only; older builds carry an explicit "readonly" attribute.
            read_only=(
                card_status != StorageMediumStatus.MOUNTED if build.code >= 914 else
                medium.attrib.get("readonly", "") == "protected"
            )
        ))

    # Get the last thrown exception stored in the device.
    last_error : Optional[LastErrorInfo] = None
    if build.code >= 914:
        stored_error = settings.find("./storederror")
        if stored_error is not None:
            errfile = stored_error.attrib.get("file", None)
            errdesc = stored_error.attrib.get("description", None)
            # Builds before 2009 use the description "none" for "no error".
            if ((errfile is not None) and errdesc) and (((build.code < 2009) and (errdesc.lower() != "none")) or (build.code >= 2009)):
                last_error = LastErrorInfo(
                    message=errdesc,
                    source=errfile,
                    line=int(stored_error.attrib["line"]),
                    version=None if "version" not in stored_error.attrib else int(stored_error.attrib["version"]),
                    address=None if "address" not in stored_error.attrib else int(stored_error.attrib["address"], 16),
                    counter=None if "pc" not in stored_error.attrib else int(stored_error.attrib["pc"], 16),
                    timestamp=None if "timestamp" not in stored_error.attrib else int(stored_error.attrib["timestamp"])
                )

    pending_firm = None
    pendingxml = settings.find("./pendingfirmware")
    if pendingxml is not None:
        pending_firm = int(pendingxml.attrib.get("build", "") or 0)

    # Refine the model with the build model. These checks previously compared
    # and assigned the raw `model` string, so they never affected the result.
    modelenum = DeviceType(model)
    if (modelenum == DeviceType.FD_128K) and (build.model == "A02E"):
        modelenum = DeviceType.FD_256K
    if (build.model == "A03E"):
        modelenum = DeviceType.WS_V2

    mac = MACAddress(serial)
    # If we still can't determine the device from its model,
    # try with MAC address instead.
    if modelenum == DeviceType.UNKNOWN:
        print("model couldn't be detected, trying mac!")
        modelenum = mac.to_model()
        if not modelenum:
            raise ValueError(f"couldn't determine the model: {serial}")

    result = Settings(
        model=modelenum,
        hostname=hostname,
        mac=mac,
        network=access_point,
        home_network=sidelink,
        build=build,
        battery=battery,
        bitrate=bitrate,
        app_version=ExpectedAppVersionInfo(
            android=settings.findtext("./appversion/android") or "",
            ios=settings.findtext("./appversion/ios") or ""
        ),
        auth=NetworkSecurityLevel(settings.findtext("./auth")),
        authhash=settings.findtext("./authhash") or "",
        timeout=int(settings.findtext("./timeout") or -1),
        pending_update=pending_firm,
        implementation=implement,
        mediums=mediums,
        last_error=last_error
    )
    return result
class SandiskShell(Cmd):
    """
    Interactive shell for browsing and managing the drive.

    Every `do_<name>` method becomes a `<name>` command of the shell, and its
    docstring is the text shown by the built-in `help <name>` command.
    """

    intro: str = ""
    prompt: str = "sandisk:/> "
    # Remote directory that `ls` and `cd` work on.
    currentdir: str = "/"

    def do_pwd(self, *_):
        "Get the current path."
        print(self.currentdir)

    def do_info(self, *_):
        "Print every field of the device settings, one per line."
        for k, v in asdict(get_settings()).items():
            print(f"{k}: {v}")

    def do_ls(self, arg: str):
        "List the current directory. Use 'ls more' to also print dates and file details."
        details = arg.strip() == "more"
        empty = True
        for item in webdav_list(self.currentdir):
            empty = False
            if isinstance(item, WebDAVFolder):
                # Folders are shown in brackets to tell them apart from files.
                if details:
                    print(f"- [{item.name}] | {item.created.isoformat()} | {item.modified.isoformat()}")
                else:
                    print(f"- [{item.name}]")
            else:
                if details:
                    assert item.created
                    assert item.modified
                    print(f"- {item.name} | {item.created.isoformat()} | {item.modified.isoformat()}, {item.size} bytes, {item.etag}, {item.cachent}")
                else:
                    print(f"- {item.name}")
        if empty:
            print("(directory is empty)")

    def do_network(self, arg: str):
        "Manage Wi-Fi networks: 'network scan', 'network list' or 'network connect <ssid>'."
        if (arg == "scan") or (arg == "list"):
            is_scan = arg == "scan"
            state = get_settings()
            if is_scan:
                print("Scanning for networks, wait for few seconds...")
            networks = get_networks(is_scan)
            # Wireless Stick models are asked for their signal level in a different mode.
            rssi_mode = state.model in (DeviceType.WS_V2, DeviceType.WS_V1, )
            for network in networks:
                level = network.signal(rssi_mode)
                bars = "..."
                if level > 0:
                    bars = (level * "■").ljust(3, "_")
                print(f"* [{bars}] {'♯ ' if network.security.value != 'none' else ' '}{network.ssid}")
        elif arg.startswith("connect"):
            ssid = arg.removeprefix("connect").strip()
            if not ssid:
                print("An SSID is required!")
                return
            network = next((x for x in get_networks(False) if x.ssid == ssid), None)
            if not network:
                print("SSID couldn't be found!")
                return
            if network.security == WiFiSecurity.UNKNOWN:
                print("Unknown security model for this SSID!")
                return
            password = ""
            # Only ask for a password when the network needs one and isn't saved yet.
            if (network.security != WiFiSecurity.PUBLIC) and (not network.saved):
                password = input("Input password: ")
            if not network.saved:
                save_network(ssid, network.security, password)
            print(connect_network(ssid))
        else:
            # Previously an empty or mistyped sub-command did nothing at all.
            print("usage: network scan | network list | network connect <ssid>")

    def do_cd(self, arg: str):
        "Change the current directory. Accepts '..', '/' or a path relative to the current directory."
        # Remote paths always use "/" as the separator, so join them with posixpath
        # instead of the platform-dependent os.path (which uses "\\" on Windows).
        from posixpath import join as remote_join
        upcoming = None
        if arg == "..":
            upcoming = "/".join(self.currentdir.split("/")[:-1]) or "/"
        elif arg == "/":
            upcoming = "/"
        elif arg:
            # Drop empty segments so leading, trailing and doubled slashes are ignored.
            upcoming = remote_join(self.currentdir, "/".join((x for x in arg.split("/") if x)))
        if upcoming:
            try:
                # The directory is only switched when the lookup doesn't raise.
                webdav_get(upcoming)
                self.currentdir = upcoming
            except ValueError as e:
                print("error:", str(e))

    def do_crash(self, _):
        "Send a WebDAV move of '../files/' to '../'. Judging by the name, this is expected to crash the device."
        webdav_move("../files/", "../")

    def do_restart(self, _):
        "Post a restart request to the device and print the response."
        print(post_settings({
            "restart": "allowed"
        }))

    def do_battery(self, _):
        "Show the battery status, voltage and the power save timeout."
        settings = get_settings()
        bar = {
            BatteryStatus.CHARGING: "[░🗲░]",
            BatteryStatus.CRITICAL: "[░⚠░]",
            BatteryStatus.FULL: "[▊▊▊]",
            BatteryStatus.HIGH: "[▊▊▊]",
            BatteryStatus.MEDIUM: "[▊▊ ]",
            BatteryStatus.LOW: "[▊ ]",
            BatteryStatus.UNKNOWN: "[ ? ]"
        }
        # A percentage is only printed for the states that are not covered by a fixed label.
        level = \
            "Charging" if settings.battery.status == BatteryStatus.CHARGING else \
            "Unknown" if settings.battery.status == BatteryStatus.UNKNOWN else \
            "Fully charged" if settings.battery.status == BatteryStatus.FULL else \
            f"<= {(settings.battery.level) * 100:.2f}%"
        print(f"{bar[settings.battery.status]} {level}, {settings.battery.voltage / 1000:.2f}V")
        print(f"Power save timeout: {settings.timeout} min(s)")

    def emptyline(self):
        # Do nothing on an empty line; the Cmd default would repeat the last command.
        return False

    def precmd(self, line: str) -> str:
        self.prompt = f"sandisk:{self.currentdir}> "
        return line

    def postcmd(self, stop: bool, line: str) -> bool:
        # Refresh the prompt so it reflects a directory change made by the command.
        self.prompt = f"sandisk:{self.currentdir}> "
        return stop

    def do_exit(self, *_):
        "Exit from the program."
        exit(0)
if __name__ == "__main__":
    try:
        SandiskShell().cmdloop()
    except KeyboardInterrupt:
        # Ctrl+C ends the shell; the empty print finishes the prompt line
        # so the parent terminal starts on a fresh line.
        print()
# pyright: basic
#
# Copyright (C) 2025 Yusuf Cihan
# @ysfchn https://ysfchn.com
#
# This program is a free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
A script to read & unpack firmware files of Sandisk's discontinued Wireless Flash
Drive & Wireless Stick series[^1] from 2015. The devices are clones of AirStash devices with
Sandisk's branding & modifications added on top.
Since the device itself is no longer sold and its firmware format is proprietary,
there is very little information about it available on the internet, so I've tried my
best to research and gather as much info as possible.
If you're looking for copies of the firmware files,
1) The discontinued (and pulled off from the app store) "Sandisk Connect Drive" app
contains firmware files stored in the APK itself, so you can just grab the APK from a
random APK mirror website and unpack it. (in "res/raw" folder)
2) Luckily, Internet Archive has some archived copies. The link below also lists
firmwares for other Sandisk devices, so you will need to filter for links ending with
".df2" and ".df3"
https://web.archive.org/web/*/http://downloads.sandisk.com/firmware/*
".DF2" files use a different format than ".DF3" files, and they are not compatible with
each other. Some models use ".DF2", and newer ones use ".DF3" from what I know.
I've only managed to find information for ".DF3" files, and thanks to the OP, who explains
some of the byte structure found in the firmware, I was able to create this script. [^4] There
is also another topic in the same community discussing the device. [^5]
Sandisk's own knowledge base contains several articles about these devices, so if you're
interested, you can also check them out, specifically the manual firmware upgrade instructions. [^2][^3]
[^1]: https://web.archive.org/web/20221205065124/https://support-en.wd.com/app/answers/detailweb/a_id/44632/
[^2]: https://support-en.sandisk.com/app/answers/detailweb/a_id/41388
(archive 1: https://web.archive.org/web/20250409170927/https://support-en.sandisk.com/app/answers/detailweb/a_id/41388)
(archive 2: https://archive.is/W1bC6)
[^3]: https://web.archive.org/web/20160515082212/http://kb.sandisk.com/app/answers/detail/a_id/17556
[^4]: https://forums.hak5.org/topic/41479-sandisk-wireless-connect-16g-flash-drive/
(archive 1: https://web.archive.org/web/20250409165759/https://forums.hak5.org/topic/41479-sandisk-wireless-connect-16g-flash-drive/)
(archive 2: https://archive.md/NTENm)
[^5]: https://forums.hak5.org/topic/30273-hack-a-sandisk-32g-wifi-enabled-flash-drive/
"""
from io import BytesIO
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Type, cast, NamedTuple
from zipfile import ZipFile
from hashlib import sha256
from sys import stderr
from argparse import ArgumentParser
from datetime import datetime
from zlib import crc32
import json
# Magic number every uImage header starts with.
UIMAGE_MAGIC = bytes((0x27, 0x05, 0x19, 0x56))
# Start of a gzip stream (magic 1f 8b, deflate method, no flags set).
GZIP_MAGIC = bytes((0x1f, 0x8b, 0x08, 0x00))

# https://formats.kaitai.io/uimage/
class UImageHeader(NamedTuple):
    """
    Parsed form of a 64-byte uImage header.

    All multi-byte integers in the header are big-endian.
    """

    header_crc: int
    # Naive datetime in the local timezone of the machine running the script.
    timestamp: datetime
    data_length: int
    load_addr: int
    entry_addr: int
    data_crc: int
    os_type: int
    arch_type: int
    image_type: int
    comp_type: int
    # Image name without the NUL padding of the fixed-size field.
    name: str

    @classmethod
    def from_bytes(cls: "Type[UImageHeader]", data: bytes) -> "UImageHeader":
        """
        Parse and validate a uImage header.

        `data` must be exactly the 64 header bytes. Raises `AssertionError`
        when the length, the magic number or the header CRC32 is wrong.
        """
        assert len(data) == 64, f"uimage header must be 64 in length, not {len(data)}"
        assert data[0:4] == UIMAGE_MAGIC, f"no uimage header was detected, got 0x{data[0:4].hex()}"
        # Check if header CRC32 is correct. The checksum is calculated over the
        # whole header with the CRC field itself (bytes 4-8) zeroed out.
        calculated = crc32(data[0:4] + bytes(4) + data[8:64])
        # The name is a fixed 32-byte field padded with NUL bytes; keep only the part
        # before the first NUL, so the padding doesn't end up in the decoded string.
        raw_name = data[32:64].split(b"\x00", 1)[0]
        result = cls(
            header_crc = int.from_bytes(data[4:8], "big"),
            timestamp = datetime.fromtimestamp(int.from_bytes(data[8:12], "big")),
            data_length = int.from_bytes(data[12:16], "big"),
            load_addr = int.from_bytes(data[16:20], "big"),
            entry_addr = int.from_bytes(data[20:24], "big"),
            data_crc = int.from_bytes(data[24:28], "big"),
            os_type = int.from_bytes(data[28:29], "big"),
            arch_type = int.from_bytes(data[29:30], "big"),
            image_type = int.from_bytes(data[30:31], "big"),
            comp_type = int.from_bytes(data[31:32], "big"),
            name = raw_name.decode("utf-8")
        )
        assert calculated == result.header_crc, f"mismatched uimage header crc32, expected {result.header_crc} but got {calculated}"
        return result

    def __repr__(self) -> str:
        return \
            f"time={self.timestamp.isoformat()} length={self.data_length} load=0x{self.load_addr:x} entry=0x{self.entry_addr:x} " + \
            f"os={self.os_type} arch={self.arch_type} comp={self.comp_type} itype={self.image_type} name='{self.name}' crc={self.data_crc}"
class FirmwareInfo(NamedTuple):
    """Summary of a firmware read by `read_wmd_firmware`."""
    # Version as "<major>.<minor>", with the minor part padded to 2 digits.
    version: str
    # One (name, 6 bytes of unknown purpose, is_file) tuple per file entry.
    entries: List[Tuple[str, bytes, bool]]
    # Number of 1024-byte sectors after the first 152 bytes of the file.
    flash_count: int
    # Size of the whole firmware file in bytes.
    file_size: int
class DF3Firmware(NamedTuple):
    """Summary of a firmware read by `read_df3_firmware`."""
    # Model name stored in the first sector of the file.
    model: str
    # Version code as an integer.
    version: int
    # Size of the whole firmware file in bytes.
    file_size: int
    # One (offset of the content in the file, size in bytes, extension or None) tuple per file.
    entries: List[Tuple[int, int, Optional[str]]]
def read_wmd_firmware(firmware: Path) -> Tuple[BytesIO, FirmwareInfo]:
    """
    Reads a Sandisk Media Drive (SWDS1) firmware and unpacks its contents.

    The firmware payload is not fully reverse-engineered yet, so the function mostly
    validates the byte patterns known so far and only extracts the files whose
    addresses are hardcoded below.

    Side effects: progress is printed while reading, and the data collected after
    the uImage header is written to a file named "im" in the current directory.

    Returns a tuple of an in-memory ZIP archive (rewound to its start) containing
    the extracted files, and a `FirmwareInfo` describing the firmware.
    Raises `AssertionError` when the file doesn't match the expected layout.
    """
    buffer = BytesIO()
    buffer.write(firmware.read_bytes())
    total_size = buffer.tell()
    buffer.seek(0)
    # Skip these bytes now, we don't know their purpose.
    buffer.read(20)
    # The version is stored in 2 bytes, first byte is the major,
    # and the second byte is the minor.
    # 02 5D -> 2.93 // 03 04 -> 3.04
    major, minor = cast(Tuple[str, str], map(str, buffer.read(2)))
    version = major + "." + minor.rjust(2, "0")
    info = FirmwareInfo(
        version = version,
        entries = [],
        flash_count = (total_size - 152) // 1024,
        file_size = total_size
    )
    print(f"version: {info.version}", file = stderr)
    print(f"size: {info.file_size}", file = stderr)
    print(f"flash: {info.flash_count}", file = stderr)
    assert buffer.read(2) == bytes(2), "unexpected bytes at start"
    assert buffer.read(9) == b"Qwifi.img", "unexpected bytes at start; name"
    assert (total_size % 1024) == 152, "file size must be multiple of 1024 with 152 bytes at start"

    # --------------------------------
    # File entries
    # --------------------------------
    # Seek to the where file entries are located.
    # File entries also contain "." and ".." even though these are not files.
    buffer.seek(0x0B9498)

    def read_next_file_entry():
        # Returns (name, unknown bytes, is_file), or None once an all-zero entry is reached.
        unk1 = buffer.read(6)
        length, ftype = buffer.read(2)
        name = buffer.read(length).decode("utf-8")
        # If file name length is not a multiple of 4 bytes, there is a
        # N number of empty bytes following it to make it a multiple of 4.
        skip = 4 - (length % 4)
        if skip == 4:
            skip = 0
        assert buffer.read(skip) == bytes(skip), f"expected {skip} empty bytes after filename"
        # If we hit to series of null bytes, make sure other
        # fields are null too.
        if unk1 == bytes(6):
            assert ((not length) and (not ftype)) and (not name), "unexpected series of bytes after a null entry"
            return None
        assert ftype in (1, 2), f"file type ({ftype}) expected to be either 1 (file) or 2 (folder)"
        return name, unk1, ftype == 1

    assert buffer.tell() == 0x0B9498, "invalid address"
    while (entry := read_next_file_entry()) is not None:
        print(f'entry "{entry[0]}" -> {entry[1].hex()}', file = stderr)
        info.entries.append(entry)

    buffer.seek(0x000498)
    buffer.read(1024)
    assert buffer.read(1024).startswith(b"\x03\x01"), "unexpected bytes"
    # Seek through sectors that has same constant byte pattern.
    for i in range(1, 257):
        assert buffer.tell() == (0x000898 + (i * 1024)), "invalid address"
        flash = buffer.read(1024)
        flag = 0x0220 + (i * 256)
        if flag > 0xFFFF:
            flag = flag - 0xFFFF
        # Rebuild the 19 bytes the sector is expected to start with.
        expected_data = bytearray()
        for j in range(5):
            expected_data.extend((flag + (0 if j == 4 else (j * 64))).to_bytes(2, "big"))
            expected_data.extend((1, ) if j == 4 else bytes(2))
        assert len(expected_data) == 19, "unexpected constructed bytes"
        expected_data.extend(bytes(1005)) # Make it to 1024 bytes.
        assert expected_data == flash, f"unexpected flash #{i} -> {expected_data[:-1005].hex()} != {flash[:-1005].hex()}"

    # --------------------------------
    # uImage
    # --------------------------------
    # The firmware contains an uimage header at some address. The header CRC is correct, however I couldn't
    # manage to verify the data CRC, so I presume the full uimage data isn't immediately followed after the
    # header and instead stored elsewhere in the firmware, which makes things more difficult for sure.
    #
    # For "sandiskmediafirmware-3-04.img", I got this:
    # time=2013-05-28T09:14:45 length=2242984 load=0x70008000 entry=0x70008000 os=5 arch=2
    # compr=0 image=2 name='Linux-2.6.35.3-899-g9b1a262'
    def read_uimage_header():
        header = buffer.read(64)
        hd = UImageHeader.from_bytes(header)
        print(hd, file = stderr)
        read_total = 0
        uimage = BytesIO()
        # Collect sectors until at least the announced data length is gathered;
        # sectors judged as filler contribute nothing to the total.
        while (read_total < hd.data_length):
            data = return_if_valuable(buffer.read(1024))
            read_total += uimage.write(data)
        uimage.seek(0)
        full = uimage.getvalue()
        # Debug output: several CRC32 candidates to compare against the header's data CRC.
        print(
            crc32(full),
            crc32(header + full),
            crc32(full[:hd.data_length]),
            crc32(header + full[:hd.data_length]),
            crc32(header + full[:hd.data_length])
        )
        return uimage.getvalue()

    # --------------------------------
    # Flash data
    # --------------------------------
    # Determine if given sector of 1024 bytes possibly contains a valuable data
    # and not a pattern of constant bytes.
    def return_if_valuable(data: bytes):
        assert len(data) == 1024, f"expected 1024 number of bytes, but got {len(data)}"
        faul = False
        # If data is empty, discard.
        if data == bytes(1024):
            faul = True
        # Discard data if data only contains FF and 00.
        vc = 0
        for b in data:
            if (0xFF != b) and (0x00 != b):
                vc += 1
        if not vc:
            faul = True
        # Also discard it when less than half of the bytes are something else than FF and 00.
        if vc < 512:
            faul = True
        if not faul:
            # Discard it when every 4th byte of the first 16 bytes either repeats
            # the first byte or counts up from it.
            it, _ov = data[0], data[1]
            sus = False
            for i in range(4):
                if (it == data[i * 4]) or (((it + i) & 0xFF) == data[i * 4]):
                    sus = True
                else:
                    if sus:
                        sus = False
                    break
            faul = sus
        # if faul:
        #     print("is garbage: ", data.hex(" ")[:128])
        return bytes(0) if faul else data

    uimage_start = buffer.getvalue().find(UIMAGE_MAGIC)
    # Without this check, a missing header ends up as a confusing "negative seek" error.
    assert uimage_start != -1, "couldn't find an uimage header in the firmware"
    print(f"uimage found at 0x{uimage_start:x} / {(uimage_start - 152) / 1024}")
    buffer.seek(uimage_start)
    with open("im", "wb") as f:
        f.write(read_uimage_header())

    # Read files.
    # TODO: I'm not sure where it is written in the firmware so
    # I have no choice but to hardcode addresses manually.
    # Values are (address, length in 1024-byte sectors); address 0 means "unknown".
    files = {
        "fw_info.conf": ((0 if version == "2.93" else 0x00300498), 1),
        "postaction.sh": ((0x00180498 if version == "2.93" else 0x00300898), 3),
        "preaction.sh": ((0x00181098 if version == "2.93" else 0x00301498), 3),
    }
    dumpobj = BytesIO()
    zipdump = ZipFile(dumpobj, "w")
    for name, _, is_file in info.entries:
        if not is_file:
            continue
        address, length = files.get(name, (0, 0))
        if not address:
            print(f"couldn't crack down '{name}' yet, skipped.", file = stderr)
            continue
        with zipdump.open(name, "w") as fw:
            buffer.seek(address)
            data = buffer.read(length * 1024).rstrip(b"\x00")
            # Indexing bytes gives an int, so it must be compared with 0; comparing
            # it with b"\x00" is always unequal and the check could never fail.
            # An empty payload (the region was all zeros) is rejected the same way.
            assert data and data[0] != 0, f"address 0x{address:x} starts with 0x00"
            fw.write(data)
    zipdump.close()
    dumpobj.seek(0)
    return dumpobj, info
# REMOVE ME!
# def create_rotating_bytes(current: int, overflows: int):
# data = bytearray()
# for _ in range(256):
# current += 1
# value = current & 0xFF
# if value == 0x00:
# overflows += 1
# data.append(value)
# data.extend(overflows.to_bytes(3, "little"))
# return data
#
# current, overflows = 19, 11,
# rootfs = BytesIO()
#
# def read_next_sector(skip_next: bool = False, is_manual: bool = False):
# nonlocal current, overflows
# current += 1
# overflows += 1
# garbage = buffer.read(1024)
# # Skip garbage check on this address.
# if garbage == bytes(1024):
# return
# if buffer.tell() not in (0x7ca498,):
# if not is_manual:
# assert garbage == create_rotating_bytes(current, overflows), f"mismatching bytes at 0x{buffer.tell() - 1024:x}"
# rootfs.write(buffer.read(256 * 1024))
# if skip_next:
# buffer.read(1024)
# current += 1
# LEGACY
# rootfs.write(buffer.read(12 * 1024))
# read_next_sector(True)
# for _ in range(237):
# read_next_sector()
# buffer.read(1024 * 486) # blank
# rootfs.write(buffer.read(40 * 1024))
# read_next_sector(is_manual = True)
# assert buffer.tell() == 0x00904898
# current, overflows = 17, 35,
# for i in range(26):
# read_next_sector(is_manual = i == 27)
# OLD
# for i in range(465 - 50):
# read_next_sector(is_manual = i == 27)
# for i in range(4096 + 1024 - 5):
# grab = get_if_not_garbage(buffer.read(1024))
# if not rootfs.write(grab):
# print(f"#{i + 1} WAS GARBAGE")
#
# print(buffer.tell(), 0x008BA498, uimage_start)
# buffer.seek(0x008BA498)
#
# for i in range(4096 + 3072 + 512 + 256 - 6):
# grab = get_if_not_garbage(buffer.read(1024))
# if not rootfs.write(grab):
# print(f"#{i + 1} WAS GARBAGE")
#
# print(buffer.tell(), uimage_start)
#
# ui = BytesIO()
# while True:
# dd = buffer.read(1024)
# if not dd:
# break
# grab = get_if_not_garbage(dd)
# ui.write(grab)
#
# with open("rootfs", "wb") as f:
# f.write(rootfs.getvalue())
#
# with open("uim", "wb") as f:
# f.write(ui.getvalue())
#
# zipdump.close()
# dumpobj.seek(0)
# return dumpobj, entries, version
def read_df3_firmware(firmware: Path):
    """
    Reads a Sandisk Wireless Stick (SDWS4) firmware file and unpacks its content.

    The firmware doesn't contain the filenames of files in the filesystem, assuming they
    are hardcoded in ROM, so this function tries to decipher the names of the files based
    on their checksum and which HTTP path are they served from in the web server.

    Progress is printed to stderr. Returns a tuple of an in-memory ZIP archive (rewound
    to its start) containing the files, and a `DF3Firmware` describing the firmware.

    Raises `ValueError` when the path doesn't have a ".df3" extension (in any letter case),
    and `AssertionError` when the file doesn't match the expected layout.
    """
    # The extension is compared case-insensitively, so "FILE.DF3" is accepted too.
    if firmware.suffix.lower() != ".df3":
        raise ValueError("firmware path doesn't end with .df3!")
    buffer = BytesIO()
    buffer.write(firmware.read_bytes())
    total_size = buffer.tell()
    buffer.seek(0)

    # The byte structure is simply follows:
    # AA BB BB .. ..
    # AA -> ID of the sector
    # BB -> Length of the sector (big-endian)
    def get_next_sector(eid: int, elen: int):
        sid = int.from_bytes(buffer.read(1), "big")
        # Check if the obtained byte matches with given value to make sure it is a valid file.
        assert eid == sid, f"current sector id mismatch, got {sid} but expected {eid}, is it a valid firmware?"
        slen = int.from_bytes(buffer.read(2), "big")
        assert slen == elen, f"current sector length mismatch, got {slen} but expected {elen}, is it a valid firmware?"
        return buffer.read(slen)

    # First 3 bytes of the file is equal to [01 00 08], so,
    # taking above information into the account:
    # 01 -> The ID of the sector
    # 00 08 (= 8) -> Sector is 8 bytes long
    model = get_next_sector(1, 8).strip(b"\x00").decode("ascii")
    # The version code is represented as an integer, and followed by an ASCII encoded string
    # of the same version code. For example; the bytes 0-4 is 0x00000802 (= 2050), and
    # bytes 4-8 is "2050" in ASCII. So, we can presume both versions must be equal in value.
    version = get_next_sector(2, 36)
    version_int = int.from_bytes(version[0:4], "big")
    version_str = version[4:8].decode("ascii")
    assert str(version_int) == version_str, f"version names doesn't match, '{version_int}' != '{version_str}'"
    info = DF3Firmware(
        model = model,
        version = version_int,
        file_size = total_size,
        entries = []
    )
    print(f"model: {info.model}", file = stderr)
    print(f"version: {info.version}", file = stderr)
    print(f"file size: {info.file_size}", file = stderr)
    # Findings:
    # wfd2050s - 0x001032FF - WiFi driver and firmware version
    # I couldn't get to know about meaning of these bytes, but as far I've tested with several
    # firmwares, these sectors are always the same, so we can just seek & validate through it.
    get_next_sector(3, 4)
    get_next_sector(4, 1)
    get_next_sector(5, 96)
    get_next_sector(6, 16)
    get_next_sector(7, 117)
    get_next_sector(7, 117)
    get_next_sector(7, 117)
    get_next_sector(10, 96)
    print(f"read through {buffer.tell()} bytes", file = stderr)

    # Individual file contents in the firmware start at where [FF FF FF FF 00 00] byte pattern
    # is first appears, so we just skip to seek there and start reading file entries.
    boundary = bytes((0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00))
    start = buffer.getvalue().find(boundary)
    assert start != -1, "couldn't find the start of the file entry header"
    buffer.seek(start + len(boundary))
    print(f"header found at {hex(buffer.tell())}", file = stderr)

    # Parse header to read the file entries, each file entry takes up 12 bytes,
    # therefore the number of bytes must be multiple of 12.
    header_size = int.from_bytes(buffer.read(2), "big")
    print(f"header size is {header_size} bytes", file = stderr)
    header = buffer.read(header_size)
    assert (len(header) % 12) == 0, f"invalid header size, got {header_size}, which is not a multiple of 12"
    # Extensions of files contained in the firmware, mapped with their byte values.
    extensions : Dict[int, str] = {
        0: "bin",
        1: "bin",
        14: "swf",
        15: "xap",
        25: "gif",
        27: "png",
        28: "svg",
        33: "css",
        35: "html",
        36: "js",
        37: "txt",
    }
    start_offset = buffer.tell() - header_size
    # bytes 0-4 is the incremental index (starts from 1)
    # bytes 4-8 is the offset (relative to start of the header)
    # bytes 8-12 is the file type (see the extension code mapping above)
    for i in range(len(header) // 12):
        entry = header[i * 12:i * 12 + 12]
        file_index1 = int.from_bytes(entry[0:4], "big")
        assert file_index1 == (i + 1), f"file indexes doesn't match, expected {i + 1} but got {file_index1}"
        file_offset = int.from_bytes(entry[4:8], "big")
        file_type = int.from_bytes(entry[8:12], "big")
        ext = extensions.get(file_type, None)
        # Seek to the offset and read the file size, so we can also validate if offset is correct.
        # File index (4 bytes) + File size (4 bytes) + File content (N bytes)
        buffer.seek(start_offset + file_offset - 8)
        file_index2 = int.from_bytes(buffer.read(4), "big")
        assert file_index2 == (i + 1), f"file indexes doesn't match, expected {i + 1} but got {file_index2}"
        file_size = int.from_bytes(buffer.read(4), "big")
        print(f"file #{i}, ext: {ext or '?'} ({file_type}), size: {file_size}, offset: {hex(start_offset + file_offset)}", file = stderr)
        info.entries.append((start_offset + file_offset, file_size, ext))
    print(f"found {len(info.entries)} files", file = stderr)

    buffer.seek(start_offset + header_size)
    dumpobj = BytesIO()
    # The context manager finalizes the archive even if an assertion below fails.
    with ZipFile(dumpobj, "w") as zipdump:
        for i, entry in enumerate(info.entries):
            offset, size, ext = entry
            # Each file must start exactly 8 bytes (index + size) after the previous one ends.
            assert offset - buffer.tell() == 8, f"invalid alignment at file #{i}, there is extra data before or after offset"
            buffer.seek(offset)
            data = buffer.read(size)
            digest = sha256(data).digest().hex()
            # file_name = b64encode(f"{i},{offset},{size},{ext},{digest}".encode("ascii")).translate(bytes.maketrans(b"+/", b"-_")).decode("ascii")
            context = guess_filename(digest)
            if context:
                assert ext == context.split(".")[-1], f"file #{i} extension doesn't match with the expected context"
                print(f"matching context found for file #{i} -> {context}", file = stderr)
            # Files without a known name get a generated one that carries their metadata.
            with zipdump.open(context or f"{i}_unknown__{offset}_{size}_{digest}.{ext or 'bin'}", "w") as f:
                f.write(data)
    dumpobj.seek(0)
    return dumpobj, info
# Firmware files don't carry the names of the files they contain, only the contents,
# so the SHA256 checksum of each known content is hardcoded here to give more context
# about the purpose of the file.
_KNOWN_FILE_DIGESTS = {
    "7f9eb2428b942ee7d592f739ffade39987935f9cf87e47a99e172c4dc15ab58e": "licenses.txt",
    "ac0daed20e915525f905dc2d86a3819a71b6702a6ab1649a47fa643a1517a630": "nocard.html",
    "2208bd30ff94535b95fe632c436deca3d123b8a0bbf564e884bfb94e013437bf": "nocard.html", # from: wfd2034s
    "56a23b12e413d4c5057266587e040240e8d8984d4fdeb659ca853a63d183d798": "settings.html",
    "e2841bdd461bf653b601ffcf57cba770399cf6c4b372cc86d835b612f990c065": "settings.html", # from: wfd2034s
    "2a9a52a325d9fda977cea97624faed8be00209a99062198591b5927596be0545": "video.html",
    "d743ebe83b166bf0472b63bf1bf81a624cceec3cce1172bf01ce7e3c8178b0f8": "video.html", # from: wfd2034s
    "6222dae0bdd42d1c2d6ab7369db7c7ddfa9e44b1173a2e2f21dbf70e228a3a39": "battery/charging0.png",
    "1edb3b35dc64a7e6727fa5618594db36c8338f71d83932dfb9a365a853d71532": "battery/charging1.png",
    "03e19edbbc0151bfa6fd3b01fa397a714d99f6a991e9f9e55eeaa9234d1aeb0c": "battery/charging2.png",
    "48b2f07ce76941a2899fd822e7925d0cda8ad0006fb4382a4b0003b515f73baa": "battery/charging3.png",
    "884b820a8f6960653877c72a0eb3845562f644abaef740ad7f1c135a4b83e3c7": "battery/critical0.png",
    "4d0dab46c05c18c0be29574b244b15d9eec359e62c3d1823653e2011deb02ab6": "battery/critical1.png",
    "ca9c09d6a2c6372e1ea979d03ddbc952b1d504fefdea013303577344cc312286": "battery/high.png",
    "a2141fa152c3166a277dd839dd977bad26b6e540e38f9f8da15bb7d7c1a3d311": "battery/low.png",
    "d18b392ded5c02a2551e742da0c8b976c7d4587f30e9ed4d01b9f9ce45ed9e05": "battery/medium.png",
    "2a279be18af1717a2fbedf5b885a95535a008083098895d4945125954232e929": "folder_check_icon.png",
    "0a894478123ff42698ceca7564095cb8be8eb4f939f9f6c5966be3838965ec8d": "folder_nocheck_icon.png",
    "f8a2835f2451e13b4dc95d302b8e2ac28a9c6ec131c5c86fc835b07b1200ca6f": "mejs/silverlightmediaelement.xap",
    "858566cbfd4b3837477b0842bc3971b9633901317880604c475209e7720b9683": "mejs/mediaelement-and-player.min.js",
    "e9e77b96fef09b18ba89467cf7285722ea6d6e1e9e11d1aa35c5abf39770ccff": "mejs/flashmediaelement.swf",
    "05d12432b14d6b810243398927997904668f69f94eacd96001a838d3d70f2143": "mejs/bigplay.svg",
    "1e5b85acb1b0b2d0bd24f4806a1cfa66d7e6dec37110c78d563b84be9951e8f2": "mejs/bigplay.png",
    "7acb5f1cc018169d97b1dd90e2aae94f0b545aa4e7244a0321bb3e1093639a37": "mejs/mediaelementplayer.min.css",
    "016f259972a2aaaf499e93756f6182f73839b1af8c4187fd54976dac723bf853": "mejs/controls.svg",
    "f38cc337d1e8e5c17baf5c3812da8f6e4f49bedccba605b93dc38c338e89f4d5": "mejs/controls.png",
    "3036bee9f749fdca0544a5592ce8da4204fab8f2b68edc6ac3905c90266014d4": "mejs/loading.gif",
    "b294e973896f8f874e90a8eb1a8908ac790980d034c4c4bdf0fc3d37b8abf682": "js/jquery.min.js",
    "23e57cf573e516953a72a685ea9b27bd6095a4e1aaac58c6724050f7557f487c": "js/sort.js",
    "c5c2118428c14c6286d2a57c28a5a4278ef457a34f2afab8e0eb3da66daa7ccd": "js/sort.js", # from: wfd2034s
    "2d40e4435dbbaef6a552220fb7b9206c1872adb3d7af192327dfe7b4b911c0cc": "js/dragdrop.js",
    "1d629a298fb4bbc58301e6f80e180140e7fa91aac8cc5aac5b46eda551aa1636": "js/video.js",
    "2877b0d2b73bed938d38651a90865bac9068da60ba43af671a45ad572ddadb97": "css/jquery-ui-core.min.css",
    "02b9463648c62e92eafc7e7dcbe26b84ccb34b19b68d7c96e72d355cc649b68c": "css/jquery-ui-theme.min.css",
    "4af2c76d0661c920118c82cae980e86214aa39260bffa364b021dd17eaa8697a": "js/jquery-ui.min.js",
    "27685be9560b70f041cc56e235a841172a57359af8221ab0d86d17ceeb16d4cb": "js/settings.js",
    "4bdf71c3c281ec0b3c31c65e1842d20d3b8a6b3b056645772f714fb7d5d316ff": "js/settings.js", # from: wfd2034s
    "be7349ed81dd4690dd8adfdd6dcba8d6c02655f321bc21f33bea567b7b0c6f1e": "css/style.css",
    "6f7ae25f30e55b9d211170a94147a716b7c37c6d0336e36801c9d0498f18247b": "css/style.css", # from: wfd2034s
    "e2d1b1c7c51f8c30431327fe43029d62b6d5dfd2d95bbd6b8b9929c178dba4bf": "css/images/ui-icons_888888_256x240.png",
    "a8d28e2d83a807b2b86ed2a02e31086f6c0718dfa96e0ba6a4577b657f69cc34": "css/images/ui-icons_454545_256x240.png",
    "82886336a384acad75c803bb87720b144e09c444c36ad1082203c29870ccf39e": "mejs/background.png",
}


def guess_filename(digest: str):
    """
    Look up the known filename for the SHA256 hex digest of a firmware file.

    The result is a path relative to the /static/ path served from the device IP,
    e.g. "licenses.txt" is served at "http://{IP}/static/licenses.txt".
    Returns None when the digest is not known.
    """
    return _KNOWN_FILE_DIGESTS.get(digest)
def main():
    """
    Command-line entry point.

    Unpacks the given .DF3 firmware into a new ZIP file and prints a JSON summary
    of the firmware to stdout. Raises `ValueError` when the input is not an existing
    file or when the output path already exists.
    """
    cli = ArgumentParser(
        description = "A script to read & unpack firmware files (.DF3) of Sandisk's discontinued wireless flash drive series."
    )
    cli.add_argument("firmware", help = "Path of the input firmware file.", type = Path)
    cli.add_argument("output", help = "Path for the output unpacked archive ZIP file.", type = Path)
    args = cli.parse_args()
    source = cast(Path, args.firmware).absolute()
    target = cast(Path, args.output).absolute()

    # is_file() is already False for a path that doesn't exist.
    if not source.is_file():
        raise ValueError(f"input path '{str(source)}' doesn't exists or not a file!")
    # Refuse to touch anything that is already there, with a dedicated message for directories.
    if target.is_dir():
        raise ValueError(f"output path '{str(target)}' must be an file, not a directory!")
    if target.exists():
        raise ValueError(f"output path '{str(target)}' does already exists, not overwriting it!")

    archive, info = read_df3_firmware(source)
    target.write_bytes(archive.read())
    print("written to: " + str(target), file = stderr)

    # NOTE(review): "files" is encoded to a JSON string before the outer dumps, so it
    # ends up as a string instead of a nested array - confirm whether that is intended
    # (it does keep the long list on a single line).
    summary = {
        "model": info.model,
        "version": info.version,
        "files": json.dumps(info.entries, ensure_ascii = False)
    }
    print(json.dumps(summary, ensure_ascii = False, indent = 2))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment