Created
August 24, 2023 03:31
-
-
Save techberet/996ef719c92b6d3406e4e03ba94672cc to your computer and use it in GitHub Desktop.
Inseego M2000 Hotspot Shutdown Script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.10 | |
# quick and dirty and semi-simple script which shutsdown an inseego M2000 hotspot | |
# also can send information on the usage stats to DynamoDB, | |
# and if battery is low can send a PushOver notification to | |
# encourage you to plug it in. | |
# you need to create a local_secrets.py which has all your secrets in it (password, aws, pushover, etc). | |
# should be run periodically (crontab or systemd), and probably should also gate it on a successful ping to 192.168.1.1 | |
import requests | |
import re | |
import hashlib | |
ENABLE_DYNAMO = True | |
from local_secrets import MIFI_ADMIN_PASSWORD | |
if ENABLE_DYNAMO: | |
from local_secrets import AWS_ACCESS_KEY, AWS_SECRET_ACCESS_KEY, AWS_REGION | |
import boto3 | |
ENABLE_PUSHOVER = True | |
if ENABLE_PUSHOVER: | |
from local_secrets import PUSHOVER_API_KEY, PUSHOVER_USER_KEY | |
import chump | |
import json | |
import time | |
import socket | |
TIMEOUT_S = 2 | |
BATT_NOTIFY_PERCENTAGE = 65 | |
def Merge(dict1, dict2): | |
dict2.update(dict1) | |
return(dict2) | |
def get_hostname(): | |
return socket.gethostname().split('.')[0] | |
def find_item(page_text, item_name, dictionary_key, output_dictionary): | |
# example: | |
# <div class="input" id="internetStatusConnectedTime">00:00:23:38 (dd:hh:mm:ss)</div> | |
string_pattern = r'<div class="input" id="{substitute}">(.+)</div>'.format(substitute=item_name) | |
# find the actual item | |
result = re.search(string_pattern, page_text) | |
output_dictionary[dictionary_key] = result.groups()[0] | |
return output_dictionary | |
def decode_internet_page(internet_page_text, output={}): | |
# stuff we want: | |
# status (connected, etc) | |
# network name (T-Mobile) | |
# technology (5G) | |
# time connected (0:13:14) | |
# received (17.34MB) | |
# transmitted (1.34MB) | |
output = find_item(internet_page_text, 'internetStatusConnectionStatus', 'connStatus', output) | |
output = find_item(internet_page_text, 'internetStatusNetworkName', 'connName', output) | |
output = find_item(internet_page_text, 'internetStatusTechnology', 'connTech', output) | |
output = find_item(internet_page_text, 'internetStatusConnectedTime', 'connTime', output) | |
output = find_item(internet_page_text, 'internetStatusDataReceived', 'dataRx', output) | |
output = find_item(internet_page_text, 'internetStatusDataSent', 'dataTx', output) | |
return output | |
def check_aws_validity(key_id, secret): | |
try: | |
client = boto3.client('s3', aws_access_key_id=key_id, aws_secret_access_key=secret) | |
response = client.list_buckets() | |
print(f"VERIFIED! SEEING {response} S3 BUCKETS!") | |
return True | |
except Exception as e: | |
print(e) | |
if str(e)!="An error occurred (InvalidAccessKeyId) when calling the ListBuckets operation: The AWS Access Key Id you provided does not exist in our records.": | |
return True | |
return False | |
def push_stuff_to_dynamo(data): | |
# we use the epoch timestamp in millis as the key | |
key = str(int(time.time() * 1000.0)) | |
# now we push this thing! | |
dynamodb = boto3.resource('dynamodb', region_name=AWS_REGION, aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_ACCESS_KEY) | |
data['timestamp'] = key | |
TableName = 'hotspot_data' | |
table = dynamodb.Table(TableName) | |
response = table.put_item(Item=data) | |
print("Item added successfully:", response) | |
if __name__ == "__main__": | |
print("Hotspot shutdown app!") | |
if ENABLE_DYNAMO: | |
# TODO: should probably have a nicer failure mode... | |
assert check_aws_validity(AWS_ACCESS_KEY, AWS_SECRET_ACCESS_KEY) | |
session = requests.Session() | |
print(session.cookies.get_dict()) | |
response = session.get('http://192.168.1.1/login', timeout=TIMEOUT_S) | |
# grab the gSecureToken from the output | |
response_content = response.content.decode('ascii') | |
with open('login_page.txt', 'w') as ip_out: | |
ip_out.write(response_content) | |
# get index | |
# doing it the quick and dirty way | |
# TODO: switch to regex or something | |
gSecureTokenStart = 'name="gSecureToken" id="gSecureToken" value="' | |
start_of_secure_token = response_content.index(gSecureTokenStart) | |
actual_start = start_of_secure_token + len(gSecureTokenStart) | |
actual_end = actual_start + 40 | |
gSecureToken = response_content[actual_start:actual_end] | |
print("gSecureToken =", gSecureToken) | |
# cool, now we need to generate the SHA1 | |
shaPassword = hashlib.sha1(MIFI_ADMIN_PASSWORD.encode('ascii') + gSecureToken.encode('ascii')).hexdigest() | |
# next let's try and login | |
login_data = f"shaPassword={shaPassword}&gSecureToken={gSecureToken}" | |
print("Submit Login...") | |
login_response = session.post('http://192.168.1.1/submitLogin', data=login_data, timeout=TIMEOUT_S) | |
print("Grabbing internet...") | |
internet_response = session.get('http://192.168.1.1/internet/', timeout=TIMEOUT_S) | |
print(session.cookies.get_dict()) | |
with open('internet_page.txt', 'w') as ip_out: | |
ip_out.write(internet_response.text) | |
internet_page_response = decode_internet_page(internet_response.text) | |
print(internet_page_response) | |
# data usage page | |
print("Grabbing data usage...") | |
data_usage_response = session.get('http://192.168.1.1/datausage/info/', timeout=TIMEOUT_S) | |
data_usage_data = json.loads(data_usage_response.text) | |
merged = Merge(data_usage_data, internet_page_response) | |
internet_page_response["execution_time"] = time.time() | |
print("Net page response:", internet_page_response) | |
# grab status info #http://192.168.1.1/srv/status?_=1687727483606 | |
status_response_raw = session.get(f'http://192.168.1.1/srv/status?_={int(time.time() * 1000)}', timeout=TIMEOUT_S) | |
status_data = json.loads(status_response_raw.text)['statusData'] | |
print("Status data", status_data) | |
merged = Merge(status_data, merged) | |
status_output = {} | |
fields_of_interest = { | |
'statusBarBatteryPercent': 'battPercent' | |
, 'statusBarBytesReceived': 'bytesRx' | |
, 'statusBarBytesTotal': 'bytesTotal' | |
, 'statusBarBytesTransmitted': 'bytesTx' | |
, 'statusBarNetwork': 'network' | |
, 'statusBarNetworkID': 'networkId' | |
, 'statusBarBand': 'networkBand' | |
, 'statusBarPrimaryClientListSize': 'primaryClients' | |
, 'statusBarTechnology': 'cellTech' | |
, 'statusBarWiFiClientListSize': 'wifiClients' | |
} | |
for field_of_interest in fields_of_interest: | |
# check to see if it is a float, int, or text | |
field_text = status_data[field_of_interest] | |
field_num = None | |
try: | |
if '.' in field_text: | |
field_num = float(field_text) | |
else: | |
field_num = int(field_text) | |
except ValueError: | |
pass | |
field_data = field_text if field_num == None else field_num | |
status_output[fields_of_interest[field_of_interest]] = field_data | |
connected_devices_raw = session.get(f'http://192.168.1.1/connecteddevices/refresh/', timeout=TIMEOUT_S).text | |
connected_devices = json.loads(connected_devices_raw) | |
print("Connected Devices", connected_devices) | |
print("Status output:", status_output) | |
status_output["clients"] = connected_devices | |
status_output["source"] = get_hostname() | |
print("Full status out:", status_output) | |
if ENABLE_DYNAMO: | |
push_stuff_to_dynamo(status_output) | |
if ENABLE_PUSHOVER: | |
if status_output['battPercent'] < BATT_NOTIFY_PERCENTAGE: | |
c_app = chump.Application(PUSHOVER_API_KEY) | |
c_user = c_app.get_user(PUSHOVER_USER_KEY) | |
c_user.send_message(f"Hotspot battery: {status_output['battPercent']}%. Please charge me!") | |
# next try and shut down the device | |
# this is hidden behind a variable for dev purposes (if you want to iterate and make sure everything works without | |
# needing to wait for the device to boot up each time). | |
SHUTDOWN = True | |
if SHUTDOWN: | |
print("Grabbing first shutdown page...") | |
first_shutdown_response = session.get('http://192.168.1.1/shutdown/', timeout=TIMEOUT_S) | |
# need to update the gSecureToken... | |
new_gSecureToken_start = 'gSecureToken : "' | |
assert new_gSecureToken_start in first_shutdown_response.text | |
start_of_new_secure_token = first_shutdown_response.text.index(new_gSecureToken_start) + len(new_gSecureToken_start) | |
new_gSecureToken = first_shutdown_response.text[start_of_new_secure_token:start_of_new_secure_token+40] | |
print(f"New gsecure token: {new_gSecureToken}") | |
print("Do Shutdown...") | |
shutdown_response = session.post('http://192.168.1.1/shutdown/doshutdown/', data=f"gSecureToken={new_gSecureToken}", timeout=TIMEOUT_S) | |
print(shutdown_response.content.decode('ascii')) | |
else: | |
print("Skipping shutdown!") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment