Skip to content

Instantly share code, notes, and snippets.

@maggie44
Last active February 1, 2025 16:18
Show Gist options
  • Save maggie44/186e067ab9c97fc446978718bff640c5 to your computer and use it in GitHub Desktop.
Save maggie44/186e067ab9c97fc446978718bff640c5 to your computer and use it in GitHub Desktop.
Rough Python Supervisor approach
# Flask:
class supervisor_device(Resource):
    """Resource that relays the Supervisor's ``/v1/device`` endpoint."""

    def get(self):
        """Fetch the device info from the Supervisor.

        Returns:
            A ``(body, status_code)`` tuple: the decoded JSON body of the
            Supervisor reply and the HTTP status code it came back with.
        """
        supervisor_reply = curl(method="get", path="/v1/device?apikey=")
        body = supervisor_reply.json()
        return body, supervisor_reply.status_code
##
def check_supervisor(supervisor_retries, timeout):
    """Block until the Balena Supervisor answers its ``/ping`` endpoint.

    The Supervisor address is read from the ``BALENA_SUPERVISOR_ADDRESS``
    environment variable on every attempt. An attempt fails when the request
    raises or when the reply status is anything other than 200; the function
    then waits two seconds before trying again.

    Args:
        supervisor_retries: Maximum number of ping attempts. Values below 1
            are treated as a single attempt.
        timeout: Timeout, in seconds, handed to ``requests.get``.

    Returns:
        ``{"message": "Supervisor up"}`` once a ping returns HTTP 200.

    Raises:
        SupervisorUnreachable: If no attempt succeeded within
            ``supervisor_retries`` tries.
    """
    retry = 1

    while True:
        failure = None

        try:
            supervisor_status = requests.get(
                f'{os.environ["BALENA_SUPERVISOR_ADDRESS"]}/ping',
                timeout=timeout,
            )
            if supervisor_status.status_code == 200:
                return {"message": "Supervisor up"}
        except Exception as err:
            # Any failure (connection refused, timeout, missing env var) is
            # treated as "not ready yet" and retried below.
            failure = err

        # Reached for raised errors AND for non-200 replies, so an unhealthy
        # Supervisor is rate-limited and counted instead of being hammered
        # in a tight loop.
        logger.info(
            "Waiting for Balena Supervisor to be ready. Retry %s.", retry
        )

        # ">=" rather than "==" so a limit below 1 still terminates.
        if retry >= supervisor_retries:
            logger.error("Supervisor could not be reached.")
            raise SupervisorUnreachable from failure

        time.sleep(2)
        retry += 1
def curl(supervisor_retries=8, timeout=5, **cmd):
    """Send one request to the Balena Supervisor API and return the response.

    The Supervisor is first pinged via ``check_supervisor``. The request URL
    is built as ``BALENA_SUPERVISOR_ADDRESS + cmd["path"] +
    BALENA_SUPERVISOR_API_KEY`` (both read from the environment), so
    ``cmd["path"]`` is expected to end with ``?apikey=``.

    Args:
        supervisor_retries: Ping attempts allowed before giving up.
        timeout: Timeout, in seconds, used for the ping and the request.
        **cmd: Request description:
            ``method`` -- one of ``"get"``, ``"patch"`` or ``"post-json"``.
            ``path`` -- API path appended to the Supervisor address.
            ``data`` -- JSON body; required for ``"patch"`` and
            ``"post-json"``.

    Returns:
        The ``requests`` response object of a successful call.

    Raises:
        SupervisorUnreachable: Propagated from ``check_supervisor`` when the
            Supervisor never answers the ping.
        SupervisorCurlFailed: If the request cannot be built or sent (missing
            key, unsupported method, transport error) or the Supervisor
            replies with an HTTP error status.
    """
    check_supervisor(supervisor_retries, timeout)

    # The API key is only added to the URL below, so it is not part of this
    # log line.
    logger.debug("Curl commands = %s", cmd)

    try:
        path = (
            os.environ["BALENA_SUPERVISOR_ADDRESS"]
            + cmd["path"]
            + os.environ["BALENA_SUPERVISOR_API_KEY"]
        )
        method = cmd["method"]

        if method == "post-json":
            response = requests.post(path, json=cmd["data"], timeout=timeout)
        elif method == "patch":
            response = requests.patch(path, json=cmd["data"], timeout=timeout)
        elif method == "get":
            response = requests.get(path, timeout=timeout)
        else:
            # Without this branch `response` would stay unbound and the
            # failure would surface later with a misleading log message.
            raise ValueError(f"Unsupported Supervisor method: {method!r}")
    except Exception as err:
        logger.exception("Supervisor curl request error.")
        raise SupervisorCurlFailed from err

    try:
        response.raise_for_status()
    except Exception as err:
        logger.exception("Failed to send request to Supervisor")
        raise SupervisorCurlFailed from err

    return response
def device_host_config(hostname, **kwargs):
    """Ask the Supervisor to change the device hostname.

    Sends a PATCH to ``/v1/device/host-config`` with the new hostname nested
    under the ``network`` key.

    Args:
        hostname: Hostname to apply.
        **kwargs: Extra keyword arguments forwarded unchanged to ``curl``.

    Returns:
        Whatever ``curl`` returns for the PATCH request.
    """
    payload = {"network": {"hostname": hostname}}
    return curl(
        method="patch",
        path="/v1/device/host-config?apikey=",
        data=payload,
        **kwargs,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment