Created
August 17, 2019 03:01
-
-
Save sparky3387/482f40f6b248ecd6c3d0d08e16a512e0 to your computer and use it in GitHub Desktop.
NVIDIA HDMI Control Device (Does not work properly)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import asyncio | |
import websockets | |
import logging | |
import json | |
# Surface the websockets library's own log records (INFO and above) through a
# StreamHandler, which writes to stderr by default.
logger = logging.getLogger('websockets')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
import cec | |
from time import sleep | |
def log_info(message):
    """Print *message* to standard output.

    Thin wrapper around ``print`` so that every status line of this script
    goes through a single function.
    """
    print(message)
# Lookup table translating incoming JSON-RPC calls into CEC key codes.
#
# Each row is (json_rpc_method, action, key_code_hex):
#   * json_rpc_method - the "method" field of the JSON-RPC request.
#   * action          - extra discriminator; '' when the method alone decides,
#                       1/2 for the Player.PlayPause toggle state, or the
#                       "params.action" string for Input.ExecuteAction.
#   * key_code_hex    - one byte as a hex string, or 'UNUSED' when no key is
#                       mapped for that method.
# NOTE(review): the hex values look like HDMI-CEC "User Control Codes"
# (e.g. 44 = Play, 46 = Pause) - confirm against the CEC specification.
JSONMETHODTOCEC = [
    # Navigation keys
    ('Input.Select', '', '00'),
    ('Input.Left', '', '03'),
    ('Input.Right', '', '04'),
    ('Input.Up', '', '01'),
    ('Input.Down', '', '02'),
    ('Input.ShowOSD', '', 'UNUSED'),
    ('Input.Info', '', 'UNUSED'),
    ('Input.Back', '', '0D'),
    ('Input.Home', '', '09'),
    ('Input.SendText', '', 'UNUSED'),
    ('VideoLibrary.Scan', '', 'UNUSED'),
    ('Input.ContextMenu', '', 'UNUSED'),
    ('Player.GetActivePlayers', '', 'UNUSED'),
    # Play/pause toggle: state 1 sends pause, state 2 sends play
    ('Player.PlayPause', 1, '46'),
    ('Player.PlayPause', 2, '44'),
    ('Player.Stop', '', '45'),
    # Input.ExecuteAction, selected by params.action
    ('Input.ExecuteAction', 'play', '44'),
    ('Input.ExecuteAction', 'mute', '43'),
    ('Input.ExecuteAction', 'stepback', '48'),
    ('Input.ExecuteAction', 'stepforward', '49'),
    ('Input.ExecuteAction', 'skipprevious', '4C'),
    ('Input.ExecuteAction', 'skipnext', '4B'),
    ('Input.ExecuteAction', 'volumeup', '41'),
    ('Input.ExecuteAction', 'volumedown', '42'),
]
# Bring up the CEC adapter and block until a device announcing itself with
# the OSD name "SHIELD" is visible on the bus.
cec.init()  # use default adapter
devices = None
shieldnumber = None  # becomes the cec.Device for the SHIELD once it is found
tvnumber = None
while shieldnumber is None:
    log_info("Initializing CEC-Control detecting devices")
    # Re-scan on every attempt: scanning only once before the loop meant a
    # SHIELD that was not yet present on the first scan could never be found,
    # leaving the loop spinning forever on a stale device list.
    devices = cec.list_devices()
    for device in devices:
        # Build the Device once and reuse it for both the name check and
        # the assignment.
        candidate = cec.Device(device)
        if candidate.osd_string == "SHIELD":
            shieldnumber = candidate
    if shieldnumber is None:
        # Nothing found yet; wait a little before scanning the bus again.
        sleep(5)
# Sentinel meaning "match on the method name only, ignore the action column".
_ANY_ACTION = object()


def _lookup_cec_key(method, action=_ANY_ACTION):
    """Return the CEC key code mapped to a JSON-RPC call, or ``None``.

    Scans ``JSONMETHODTOCEC`` for the first row whose method equals *method*
    and, when *action* is given, whose action column equals *action*.
    Returns the row's hex code converted to ``bytes``; returns ``None`` when
    no row matches or the row holds a non-hex placeholder such as 'UNUSED'.
    """
    for item, item_action, hexcode in JSONMETHODTOCEC:
        if item != method:
            continue
        if action is not _ANY_ACTION and item_action != action:
            continue
        try:
            return bytes.fromhex(hexcode)
        except ValueError:
            # Placeholder row: the method is known but has no key mapped.
            return None
    return None


async def json_recieve(websocket, path):
    """Serve one websocket client, turning JSON-RPC calls into CEC key presses.

    Only connections to the ``/jsonrpc`` path are served; any other path
    returns immediately. Each received message is parsed as JSON, and
    messages that are not JSON objects with a ``method`` key (plus at least
    one other key) are ignored.

    * ``Player.GetActivePlayers`` flips the play/pause toggle and is answered
      with a fixed single-video-player result.
    * Every other method is looked up in ``JSONMETHODTOCEC`` and the mapped
      key code is transmitted to the module-level ``shieldnumber`` device.
      Methods with no usable mapping get a JSON-RPC "method not found" error
      instead of crashing the handler.

    The coroutine returns when the client connection is closed.
    """
    # Security through obscurity, unfortunately: only the expected endpoint
    # path is accepted.
    if path != '/jsonrpc':
        return

    # Starts at 2 so the first toggle lands on 1 (pause) and the second on
    # 2 (play).
    playerpause = 2

    try:
        while True:
            json_string = await websocket.recv()
            try:
                json_loaded = json.loads(json_string)
            except ValueError:
                # Not valid JSON; ignore it rather than kill the connection.
                continue
            if (not isinstance(json_loaded, dict)
                    or len(json_loaded) <= 1
                    or 'method' not in json_loaded):
                continue

            method = json_loaded['method']
            if method == 'Player.GetActivePlayers':
                playerpause += 1
                if playerpause == 3:
                    playerpause = 1
                await websocket.send('{"id": 1, "jsonrpc": "2.0", "result": [ { "playerid": 1, "type": "video" } ]}')
                continue

            if method == 'Player.PlayPause':
                opcode = _lookup_cec_key(method, playerpause)
            elif method == 'Input.ExecuteAction':
                params = json_loaded.get('params')
                action = params.get('action') if isinstance(params, dict) else None
                opcode = _lookup_cec_key(method, action)
            else:
                opcode = _lookup_cec_key(method)

            if opcode is None:
                # Unknown method/action or an 'UNUSED' placeholder: report it
                # to the client instead of raising inside the coroutine.
                # NOTE(review): replies use a fixed id of 1, like the success
                # replies below; JSON-RPC expects the request id to be echoed.
                await websocket.send('{"id": 1, "jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found."}}')
                continue

            # Key down, then immediately key up, to simulate a short press.
            # NOTE(review): 0x44 looks like CEC <User Control Pressed>, while
            # 0x8b looks like <Vendor Remote Button Up> rather than
            # <User Control Released> (0x45). The original author reports
            # every press registering as a long press - confirm the release
            # opcode against the CEC specification.
            shieldnumber.transmit(0x44, opcode)
            shieldnumber.transmit(0x8b, opcode)
            await websocket.send('{"id":1,"jsonrpc":"2.0","result":"OK"}')
    except (websockets.exceptions.ConnectionClosedOK,
            websockets.exceptions.ConnectionClosedError):
        # The client went away. Stop serving; swallowing this inside the loop
        # would make recv() raise again at once and spin forever.
        return
# Set the IP address and port to listen on here; they must match what the
# remote-control client ("Kassi" in the original note) is configured to use.
start_server = websockets.serve(json_recieve, "192.168.1.1", 9090)

# Start the listener on the default event loop, then serve until interrupted.
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(start_server)
event_loop.run_forever()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment