Created
August 17, 2018 19:24
-
-
Save 0x61726b/b74de278cc805a29eea55871e3cfef6c to your computer and use it in GitHub Desktop.
pi
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import os | |
import datetime | |
import click | |
import requests | |
from flask import Flask, request, send_from_directory, abort, jsonify | |
from functools import wraps | |
import config | |
import socket | |
app = Flask(__name__) | |
app.url_map.strict_slashes = False | |
logging.basicConfig(level=logging.INFO) | |
current_dir = os.path.dirname(os.path.abspath(__file__)) | |
logs_dir = os.path.join(current_dir, "logs") | |
handler = logging.FileHandler(filename='{}/{}.log'.format(logs_dir, datetime.datetime.now().strftime('%Y-%m-%d')), | |
encoding='utf-8', mode='a') | |
handler.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s:%(name)s: %(message)s')) | |
logger = logging.getLogger() | |
logger.addHandler(handler) | |
from models import * | |
init_db() | |
from wol import wake_on_lan | |
@app.before_request | |
def clear_trailing(): | |
from flask import redirect, request | |
rp = request.path | |
if rp != '/' and rp.endswith('/'): | |
return redirect(rp[:-1]) | |
def fingerprint_required(f): | |
@wraps(f) | |
def decorated_function(*args, **kwargs): | |
if request.form['device_id'] is None or request.form['api_key'] is None: | |
return jsonify(ok=False, reason="unauthorized") | |
return f(*args, **kwargs) | |
return decorated_function | |
def target_host_check(f): | |
@wraps(f) | |
def decorated_function(*args, **kwargs): | |
device_id = request.form['device_id'] | |
try: | |
user = User.select().where(User.device_id == device_id).get() | |
except: | |
return jsonify(ok=False, reason="Device key did not match any users"), 400 | |
if user.device_id != device_id: | |
return jsonify(ok=False, reason="Device IDs are different."), 400 | |
target_host = request.form['target_host'] | |
try: | |
network_device = NetworkDevice.select().where(NetworkDevice.user == user, | |
NetworkDevice.hostname == target_host).get() | |
except: | |
return jsonify(ok=False, reason="Could not find any device belonging to this user."), 400 | |
return f(*args, **kwargs) | |
return decorated_function | |
def get_ip_by_hostname(hostname): | |
return socket.gethostbyname(hostname) | |
@app.route('/ping/', methods = ["POST"]) | |
@target_host_check | |
def do_ping(): | |
device_id = request.form['device_id'] | |
target_host = request.form['target_host'] | |
# Get user | |
user = User.select().where(User.device_id == device_id).get() | |
# Get network device | |
network_device = NetworkDevice.select().where(NetworkDevice.user == user, | |
NetworkDevice.hostname == target_host).get() | |
host_ip = network_device.ip_address | |
# Scan network to see if the local IP address has changed | |
# network_ip = get_ip_by_hostname(network_device.hostname) | |
# if host_ip != network_ip: | |
# host_ip = network_ip | |
# network_device.ip_address = host_ip | |
# try: | |
# network_device.save() | |
# except: | |
# logger.error("Could not save new IP!") | |
# Ping the target host | |
try: | |
r = requests.get(f'http://{host_ip}:{config.CLIENT_PORT}/ping') | |
if r.status_code == 200: | |
return jsonify(ok=True) | |
except: | |
return jsonify(ok=False, reason="Target host is offline."), 200 | |
@app.route('/do/', methods = ['POST']) | |
@target_host_check | |
def do_action(): | |
# Actions | |
# pc_shut_down | |
# pc_launch | |
action = request.form['action'] | |
target_host = request.form['target_host'] | |
device_id = request.form['device_id'] | |
# Get user | |
user = User.select().where(User.device_id == device_id).get() | |
# Get network device | |
network_device = NetworkDevice.select().where(NetworkDevice.user == user, | |
NetworkDevice.hostname == target_host).get() | |
host_ip = network_device.ip_address | |
# Scan network to see if the local IP address has changed | |
# network_ip = get_ip_by_hostname(network_device.hostname) | |
# if host_ip != network_ip: | |
# host_ip = network_ip | |
# network_device.ip_address = host_ip | |
# try: | |
# network_device.save() | |
# except: | |
# logger.error("Could not save new IP!") | |
# Ping the target host | |
if action != "turn_on": | |
try: | |
r = requests.get(f'http://{host_ip}:{config.CLIENT_PORT}/ping') | |
if r.status_code != 200: | |
return jsonify(ok=False, reason="Target host is offline."), 404 | |
except: | |
return jsonify(ok=False, reason="Target host is offline."), 404 | |
# Pass the action | |
try: | |
r = None | |
if action == "turn_off" or action == "screenshot": | |
r = requests.post(f'http://{host_ip}:{config.CLIENT_PORT}/do', data = { 'action': action }) | |
if action == "turn_on": | |
try: | |
wake_on_lan(host_ip) | |
return jsonify(ok=True) | |
except Exception as ex: | |
logger.exception("Error waking on LAN.") | |
return jsonify(ok=False) | |
if action == "audio_level": | |
r = requests.post(f'http://{host_ip}:{config.CLIENT_PORT}/do', data={'action': action, 'audio_level': request.form['audio_level'] }) | |
if r.status_code == 200: | |
return jsonify(ok=True) | |
else: | |
return jsonify(ok=False, reason="Could not perform action"), 503 | |
except Exception as ex: | |
logger.error(ex) | |
return jsonify(ok=False, reason="Request to host failed"), 503 | |
@app.route('/login/', methods = ['POST']) | |
@fingerprint_required | |
def login_user(): | |
device_id = request.form['device_id'] | |
logger.info(f"Login Request: {device_id}") | |
try: | |
user = User.select().where(User.device_id == device_id).get() | |
except DoesNotExist: | |
logger.info("This device key does not exist. Creating...") | |
try: | |
user = User.create(device_id = device_id) | |
except: | |
return jsonify(ok=False) | |
try: | |
device_list = NetworkDevice.select().where(NetworkDevice.user == user) | |
device_list_json = [] | |
for d in device_list: | |
device_list_json.append(dict(hostname = d.hostname, ip = d.ip_address)) | |
if len(device_list_json) == 0: | |
return jsonify(ok=False) | |
return jsonify(ok=True,devices=device_list_json, user_name=user.user_name, device_id=user.device_id) | |
except Exception as ex: | |
logger.error(ex) | |
pass | |
return jsonify(ok=False) | |
@click.group(invoke_without_command=True, options_metavar='[options]') | |
@click.pass_context | |
def main(ctx): | |
if ctx.invoked_subcommand is None: | |
while True: | |
app.run(host='0.0.0.0', port=config.PI_PORT) | |
print("Restarting...") | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment