Last active
February 1, 2025 16:18
-
-
Save maggie44/186e067ab9c97fc446978718bff640c5 to your computer and use it in GitHub Desktop.
Rough Python Supervisor approach
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Flask:
class supervisor_device(Resource):
    """REST resource that relays the Supervisor's ``/v1/device`` endpoint."""

    def get(self):
        """Fetch device state from the Supervisor.

        Returns:
            A ``(body, status_code)`` tuple: the decoded JSON body of the
            Supervisor response and its HTTP status code.
        """
        supervisor_reply = curl(method="get", path="/v1/device?apikey=")
        body = supervisor_reply.json()
        return body, supervisor_reply.status_code
##
def check_supervisor(supervisor_retries, timeout):
    """Block until the Balena Supervisor answers its ping endpoint.

    Sends GET requests to ``$BALENA_SUPERVISOR_ADDRESS/ping`` until one of
    them returns HTTP 200, sleeping two seconds between attempts. Both a
    raised exception and a non-200 response count as a failed attempt.

    Args:
        supervisor_retries: Maximum number of ping attempts. Values below 1
            still result in a single attempt.
        timeout: Timeout forwarded to ``requests.get`` for each ping.

    Returns:
        ``{"message": "Supervisor up"}`` once a ping returns HTTP 200.

    Raises:
        SupervisorUnreachable: If no ping succeeded within
            ``supervisor_retries`` attempts.
    """
    retry = 1

    while True:
        try:
            supervisor_status = requests.get(
                f'{os.environ["BALENA_SUPERVISOR_ADDRESS"]}/ping',
                timeout=timeout,
            )
            supervisor_ready = supervisor_status.status_code == 200
        except Exception:
            # Deliberately broad: this is a best-effort probe, so any failure
            # (connection error, timeout, missing environment variable) is
            # treated as "not ready yet" and retried, as before.
            supervisor_ready = False

        if supervisor_ready:
            break

        logger.info(
            "Waiting for Balena Supervisor to be ready. "
            f"Retry {retry}."
        )

        # ">=" rather than "==" so a retry budget of zero or less still
        # terminates instead of looping forever.
        if retry >= supervisor_retries:
            logger.error("Supervisor could not be reached.")
            raise SupervisorUnreachable

        time.sleep(2)
        retry += 1

    return {"message": "Supervisor up"}
def curl(supervisor_retries=8, timeout=5, **cmd):
    """Send one HTTP request to the Balena Supervisor API.

    Waits for the Supervisor to respond to pings (see ``check_supervisor``),
    then issues the request described by ``cmd``. The request URL is built as
    ``BALENA_SUPERVISOR_ADDRESS + cmd["path"] + BALENA_SUPERVISOR_API_KEY``,
    so ``path`` is expected to end with ``?apikey=``.

    Args:
        supervisor_retries: Maximum number of readiness pings before giving up.
        timeout: Timeout forwarded to every ``requests`` call.
        **cmd: Request description:
            ``method`` -- one of ``"post-json"``, ``"patch"`` or ``"get"``;
            ``path`` -- API path to which the API key is appended;
            ``data`` -- JSON payload, required for ``"post-json"`` and
            ``"patch"``.

    Returns:
        The ``requests`` response object for a successful request.

    Raises:
        SupervisorUnreachable: Propagated from ``check_supervisor``.
        SupervisorCurlFailed: If the method is unsupported, the request could
            not be built or sent, or the response has an error status.
    """
    check_supervisor(supervisor_retries, timeout)

    logger.debug(f"Curl commands = {cmd}")

    # Reject unknown or missing methods explicitly. Previously this fell
    # through with ``response`` unbound and surfaced as a misleading
    # "Failed to send request" error.
    method = cmd.get("method")
    if method not in ("post-json", "patch", "get"):
        logger.error("Unsupported Supervisor curl method: %r", method)
        raise SupervisorCurlFailed

    # Process curl requests
    try:
        path = (
            os.environ["BALENA_SUPERVISOR_ADDRESS"]
            + cmd["path"]
            + os.environ["BALENA_SUPERVISOR_API_KEY"]
        )

        if method == "post-json":
            response = requests.post(path, json=cmd["data"], timeout=timeout)
        elif method == "patch":
            response = requests.patch(path, json=cmd["data"], timeout=timeout)
        else:
            response = requests.get(path, timeout=timeout)
    except Exception as err:
        # Broad on purpose: missing keys, missing environment variables and
        # transport errors are all reported to callers as one failure type.
        logger.exception("Supervisor curl request error.")
        raise SupervisorCurlFailed from err

    try:
        response.raise_for_status()
    except Exception as err:
        logger.exception("Failed to send request to Supervisor")
        raise SupervisorCurlFailed from err

    return response
def device_host_config(hostname, **kwargs):
    """Ask the Supervisor to change the device hostname.

    Args:
        hostname: Hostname to send in the host-config payload.
        **kwargs: Extra keyword arguments passed straight through to ``curl``.

    Returns:
        Whatever ``curl`` returns for the PATCH request.
    """
    payload = {"network": {"hostname": hostname}}
    return curl(
        method="patch",
        path="/v1/device/host-config?apikey=",
        data=payload,
        **kwargs,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment