Created
June 12, 2025 01:22
-
-
Save cwawak/3d15b050e4ba2122387a0a0d53e20fea to your computer and use it in GitHub Desktop.
This script reads and decodes live telemetry data from a Corsair iCUE LINK System Hub.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# iCUE LINK Telemetry Utility | |
# | |
# This script reads and decodes live telemetry data from a Corsair iCUE LINK | |
# System Hub. It implements the transactional polling protocol discovered from | |
# the FanControl.CorsairLink driver and refined through live debugging. | |
# | |
# Features: | |
# - Polls for Liquid Temp, Pump RPM, and Fan RPMs. | |
# - Outputs to console or logs to a CSV file. | |
# - Optional debug mode for viewing raw packet data. | |
# - Configurable polling interval. | |
import hid | |
import time | |
import struct | |
import argparse | |
import csv | |
from datetime import datetime | |
# --- DEVICE CONFIGURATION --- | |
# The USB Vendor and Product ID for the Corsair iCUE LINK System Hub. | |
# These values are used to find the correct HID device. | |
VENDOR_ID = 0x1B1C | |
PRODUCT_ID = 0x0C3F | |
# The size of HID report packets sent to and read from the device. | |
# Note: The actual report might be smaller, but this buffer size ensures | |
# we can read the entire payload. FanControl.CorsairLink uses 512, but | |
# 64 is sufficient for these specific data packets. | |
PACKET_SIZE = 64 | |
# --- PROTOCOL CONSTANTS --- | |
# These byte sequences represent specific commands sent to the hub. They are | |
# derived from analysis of how official software and drivers communicate. | |
# Every command packet sent to the device starts with this 3-byte header. | |
CMD_HEADER = bytes([0x00, 0x00, 0x01]) | |
# Commands to switch the device between its default hardware mode and a | |
# software-controlled mode, which is required to read telemetry. | |
CMD_ENTER_SOFTWARE_MODE = bytes([0x01, 0x03, 0x00, 0x02]) | |
CMD_EXIT_SOFTWARE_MODE = bytes([0x01, 0x03, 0x00, 0x01]) | |
# Commands to manage communication channels ("endpoints") for specific data types. | |
# An endpoint must be opened before reading and closed afterward. | |
CMD_OPEN_ENDPOINT = bytes([0x0d, 0x01]) | |
CMD_CLOSE_ENDPOINT = bytes([0x05, 0x01, 0x01]) | |
# The command to request a data read from the currently open endpoint. | |
CMD_READ = bytes([0x08, 0x01]) | |
# Specific endpoint identifiers. These are sent as data along with the | |
# OPEN and CLOSE commands to specify which channel to manage. | |
ENDPOINT_SPEEDS = bytes([0x17]) | |
ENDPOINT_TEMPS = bytes([0x21]) | |
# Expected data type identifiers found in response packets. These are used | |
# to confirm that a received packet contains the data we asked for. | |
DATA_TYPE_SPEEDS = bytes([0x25, 0x00]) | |
DATA_TYPE_TEMPS = bytes([0x10, 0x00]) | |
# --- RESPONSE PARSING CONSTANTS --- | |
# These constants define the byte offsets for key information within a | |
# received HID data packet, based on reverse-engineering. | |
# The starting index of the 2-byte data type identifier in a response packet. | |
DATA_TYPE_START_INDEX = 4 | |
# The scaling factor to convert the raw integer temperature value to Celsius. | |
TEMP_SCALING_FACTOR = 10.0 | |
# The byte indices for the 16-bit little-endian temperature value. | |
TEMP_VALUE_LOW = 11 | |
TEMP_VALUE_HIGH = 12 | |
# The index of the byte that indicates how many sensors are reported in a packet. | |
SENSOR_COUNT_INDEX = 6 | |
# The starting index for the series of 3-byte sensor data blocks. | |
SENSOR_DATA_START_INDEX = 7 | |
# The size of a single sensor's data block (1 byte status + 2 bytes value). | |
SENSOR_BLOCK_SIZE = 3 | |
# In the speed sensor array, the pump is at this index. | |
PUMP_INDEX = 1 | |
# In the speed sensor array, the fans start at this index. | |
FAN_START_INDEX = 13 | |
def create_command_packet(command, data=bytes()): | |
""" | |
Wraps a command and optional data in the required header for sending. | |
Args: | |
command (bytes): The core command constant. | |
data (bytes, optional): The data payload for the command, such as an | |
endpoint identifier. Defaults to bytes(). | |
Returns: | |
bytes: A PACKET_SIZE byte array ready to be sent to the HID device. | |
""" | |
packet = bytearray(PACKET_SIZE) | |
full_command = CMD_HEADER + command + data | |
packet[0:len(full_command)] = full_command | |
return bytes(packet) | |
def send_command(device, command, data=bytes(), debug=False): | |
""" | |
Sends a complete command packet to the device. | |
Args: | |
device (hid.device): The opened HID device object. | |
command (bytes): The core command constant. | |
data (bytes, optional): The data payload for the command. | |
debug (bool): If True, prints the raw hex of the command being sent. | |
""" | |
packet_to_send = create_command_packet(command, data) | |
if debug: | |
print(f"--> SEND: CMD={command.hex()} DATA={data.hex()}") | |
try: | |
device.write(packet_to_send) | |
# A short delay is crucial to give the device time to process | |
# the command before we send the next one. | |
time.sleep(0.05) | |
except Exception as e: | |
if debug: | |
print(f" !!! WRITE FAILED: {e}") | |
def read_specific_response(device, wait_for_type, debug=False): | |
""" | |
Reads from the device until a packet with the expected type is found. | |
This function continuously polls the device and inspects incoming packets | |
to find one that matches the requested data type (e.g., temps or speeds). | |
Args: | |
device (hid.device): The opened HID device object. | |
wait_for_type (bytes): The 2-byte data type to listen for. | |
debug (bool): If True, prints raw hex of all received packets. | |
Returns: | |
bytes: The matching response packet, or None if a timeout occurs. | |
""" | |
if debug: | |
print(f" (Waiting for type: {wait_for_type.hex()})") | |
start_time = time.monotonic() | |
while time.monotonic() - start_time < 1.0: # 1-second timeout | |
response = device.read(PACKET_SIZE) | |
if response and debug: | |
hex_data = ' '.join(f'{b:02X}' for b in response) | |
print(f"<-- RECV: {hex_data}") | |
# Check if the packet is long enough and its type matches our request. | |
if (response and len(response) > DATA_TYPE_START_INDEX + 1 and | |
bytes(response[DATA_TYPE_START_INDEX:DATA_TYPE_START_INDEX+2]) == wait_for_type): | |
if debug: | |
print(f" (Match found!)") | |
return bytes(response) | |
if debug: | |
print(f" (Timeout waiting for type: {wait_for_type.hex()})") | |
return None | |
def parse_speed_sensors(packet): | |
""" | |
Parses a SPEED data packet containing multiple sensor blocks. | |
The packet structure consists of a sensor count followed by a series of | |
3-byte blocks. Each block has a status byte and a 2-byte value. | |
Args: | |
packet (bytes): The raw speed data packet. | |
Returns: | |
list: A list of sensor values (RPMs). `None` is used for any | |
sensor that is reported as unavailable. | |
""" | |
if not packet or len(packet) <= SENSOR_DATA_START_INDEX: | |
return [] | |
sensors = [] | |
# The byte at index 6 indicates how many sensor blocks follow. | |
count = packet[SENSOR_COUNT_INDEX] | |
sensor_data_payload = packet[SENSOR_DATA_START_INDEX:] | |
for i in range(count): | |
offset = i * SENSOR_BLOCK_SIZE | |
if offset + 2 >= len(sensor_data_payload): | |
break | |
# Status 0x00 means the sensor is connected and providing data. | |
status = sensor_data_payload[offset] | |
if status == 0x00: | |
# The value is a 16-bit signed integer in little-endian format. | |
raw_value = struct.unpack('<h', sensor_data_payload[offset+1:offset+3])[0] | |
sensors.append(raw_value) | |
else: | |
sensors.append(None) # Sensor is not available or disconnected. | |
return sensors | |
def main(args): | |
""" | |
The main execution function. Handles device discovery, connection, | |
the main polling loop, and graceful shutdown. | |
""" | |
device = None | |
csv_writer = None | |
csv_file = None | |
try: | |
print(f"Searching for device VID=0x{VENDOR_ID:04X}, PID=0x{PRODUCT_ID:04X}...") | |
device_info_list = hid.enumerate(VENDOR_ID, PRODUCT_ID) | |
if not device_info_list: | |
print("\n--- ERROR: Device not found. ---") | |
print("Please ensure the iCUE LINK System Hub is connected.") | |
return | |
device_info = device_info_list[0] | |
path = device_info['path'] | |
device_path_str = path.decode('utf-8') | |
print(f"Device found at path: {device_path_str}") | |
# Initialize and open the HID device. | |
device = hid.device() | |
device.open_path(path) | |
print("Successfully opened device.") | |
# Set to non-blocking mode to allow timeouts on reads. | |
device.set_nonblocking(1) | |
print("Entering software mode...") | |
send_command(device, CMD_ENTER_SOFTWARE_MODE, debug=args.debug) | |
# If an output file is specified, open it and write the header. | |
if args.output: | |
print(f"Logging telemetry to {args.output}...") | |
csv_file = open(args.output, 'w', newline='', encoding='utf-8') | |
csv_writer = csv.writer(csv_file) | |
header = ['timestamp', 'device_path', 'liquid_temp_c', 'pump_rpm', 'fan1_rpm', 'fan2_rpm', 'fan3_rpm'] | |
csv_writer.writerow(header) | |
print("\n--- Starting Telemetry Capture (Press Ctrl+C to exit) ---") | |
while True: | |
# The protocol requires closing, opening, reading, and closing the | |
# endpoint for each polling cycle. | |
# --- Poll Temperatures --- | |
send_command(device, CMD_CLOSE_ENDPOINT, ENDPOINT_TEMPS, debug=args.debug) | |
send_command(device, CMD_OPEN_ENDPOINT, ENDPOINT_TEMPS, debug=args.debug) | |
send_command(device, CMD_READ, debug=args.debug) | |
temp_packet = read_specific_response(device, DATA_TYPE_TEMPS, debug=args.debug) | |
send_command(device, CMD_CLOSE_ENDPOINT, ENDPOINT_TEMPS, debug=args.debug) | |
liquid_temp = None | |
if temp_packet: | |
# Unpack the 16-bit value and apply the scaling factor. | |
raw_temp = struct.unpack('<h', bytes([temp_packet[TEMP_VALUE_LOW], temp_packet[TEMP_VALUE_HIGH]]))[0] | |
liquid_temp = raw_temp / TEMP_SCALING_FACTOR | |
# --- Poll Speeds --- | |
send_command(device, CMD_CLOSE_ENDPOINT, ENDPOINT_SPEEDS, debug=args.debug) | |
send_command(device, CMD_OPEN_ENDPOINT, ENDPOINT_SPEEDS, debug=args.debug) | |
send_command(device, CMD_READ, debug=args.debug) | |
speed_packet = read_specific_response(device, DATA_TYPE_SPEEDS, debug=args.debug) | |
send_command(device, CMD_CLOSE_ENDPOINT, ENDPOINT_SPEEDS, debug=args.debug) | |
speeds = parse_speed_sensors(speed_packet) | |
# --- Process and Output Results --- | |
pump_rpm = speeds[PUMP_INDEX] if speeds and len(speeds) > PUMP_INDEX and speeds[PUMP_INDEX] is not None else None | |
fan_rpms = [] | |
if speeds and len(speeds) > FAN_START_INDEX + 2: | |
# Assuming 3 fans are reported starting at this index | |
fan_speeds = speeds[FAN_START_INDEX : FAN_START_INDEX + 3] | |
fan_rpms = [s if s is not None else None for s in fan_speeds] | |
else: | |
# Provide placeholders if fan data isn't available | |
fan_rpms = [None, None, None] | |
timestamp = datetime.now().isoformat() | |
# Either write to CSV or print to console based on arguments. | |
if csv_writer: | |
row = [timestamp, device_path_str, liquid_temp, pump_rpm] + fan_rpms | |
csv_writer.writerow(row) | |
else: | |
liquid_temp_str = f"{liquid_temp:.1f}°C" if liquid_temp is not None else "N/A" | |
pump_rpm_str = str(pump_rpm) if pump_rpm is not None else "N/A" | |
fan_rpm_strs = [str(s) if s is not None else "N/A" for s in fan_rpms] | |
print(f"{timestamp} | Liquid: {liquid_temp_str} | Pump: {pump_rpm_str} RPM | Fans: {', '.join(fan_rpm_strs)} RPM") | |
time.sleep(args.interval) | |
except KeyboardInterrupt: | |
print("\nExiting program due to user request (Ctrl+C).") | |
except Exception as e: | |
print(f"\nAn unexpected error occurred: {e}") | |
finally: | |
# This block ensures that the device is cleaned up properly, | |
# no matter how the program exits. | |
if csv_file: | |
csv_file.close() | |
if device: | |
print("Closing endpoints and returning to hardware mode...") | |
send_command(device, CMD_EXIT_SOFTWARE_MODE, debug=args.debug) | |
device.close() | |
print("Device connection closed.") | |
if __name__ == "__main__": | |
# Set up the command-line argument parser. | |
parser = argparse.ArgumentParser( | |
description="Read and log telemetry from a Corsair iCUE LINK Hub.", | |
formatter_class=argparse.RawTextHelpFormatter, | |
epilog="Example usage:\n" | |
" - Print to console every 5s: python script.py -i 5\n" | |
" - Log to a file: python script.py -o telemetry.csv\n" | |
" - Enable debug output: python script.py -d" | |
) | |
parser.add_argument( | |
'-o', '--output', | |
type=str, | |
help="Output CSV file path. If not provided, prints to console." | |
) | |
parser.add_argument( | |
'-i', '--interval', | |
type=float, | |
default=2.0, | |
help="Polling interval in seconds. Default: 2.0" | |
) | |
parser.add_argument( | |
'-d', '--debug', | |
action='store_true', | |
help="Enable verbose debug output to show raw packet data." | |
) | |
parsed_args = parser.parse_args() | |
main(parsed_args) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment