Created
July 2, 2021 07:04
-
-
Save fboender/cdb4c1afae00e4a032b9611d10f8ba1e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
import os | |
import subprocess | |
import json | |
class CmdError(Exception):
    """
    Raised when a shell command exits with a non-zero exit code.

    Attributes:
      `exitcode` (int): the exit code of the failed command.
      `stderr`: whatever the command wrote to STDERR.

    `str()` of the exception is the `msg` it was constructed with.
    """

    def __init__(self, msg, exitcode, stderr):
        self.exitcode = exitcode
        self.stderr = stderr
        super().__init__(msg)

    def __reduce__(self):
        # Only `msg` is stored in `self.args`, so the default exception
        # reconstruction (`type(self)(*self.args)`) would call `__init__`
        # with too few arguments and raise TypeError. Spell out all three
        # constructor arguments so copy/pickle can rebuild the instance.
        msg = self.args[0] if self.args else ""
        return (type(self), (msg, self.exitcode, self.stderr), vars(self))
def cmd(cmd, input=None, env=None, raise_err=True, env_clean=False,
        auto_decode=True):
    """
    Run command `cmd` in a shell.

    `cmd` is handed to the shell verbatim (`shell=True`), so the caller is
    responsible for quoting any arguments it interpolates into it.

    `input` (string or bytes) is passed in the process' STDIN. A string is
    encoded with the same encoding that `auto_decode` uses for the output;
    bytes are passed through unchanged.

    If `env` (dict) is given, the environment is updated with it. If
    `env_clean` is `True`, the subprocess will start with a clean environment
    and not inherit the current process' environment. `env` is still applied.

    If `raise_err` is `True` (default), a `CmdError` is raised when the return
    code is not zero.

    Returns a dictionary:

      {
        'stdout': <string>,
        'stderr': <string>,
        'exitcode': <int>
      }

    If `auto_decode` is True, both `stdout` and `stderr` are automatically
    decoded using `sys.getdefaultencoding()`. This will fail if the output is
    not in that encoding (e.g. it contains binary data). Otherwise, stdout
    and stderr are of type `<bytes>`.
    """
    encoding = sys.getdefaultencoding()

    p_env = {}
    if env_clean is False:
        p_env.update(os.environ)
    if env is not None:
        p_env.update(env)

    # The pipes below are opened in binary mode, so `communicate()` only
    # accepts bytes. Encode text input here so the documented string form of
    # `input` actually works instead of raising TypeError.
    if isinstance(input, str):
        input = input.encode(encoding)

    p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         env=p_env)
    # communicate() feeds STDIN, drains both output pipes and waits for exit.
    stdout, stderr = p.communicate(input)

    if auto_decode is True:
        stdout = stdout.decode(encoding)
        stderr = stderr.decode(encoding)

    if p.returncode != 0 and raise_err is True:
        msg = "Command '{}' returned with exit code {}".format(cmd,
                                                               p.returncode)
        raise CmdError(msg, p.returncode, stderr)

    return {
        'stdout': stdout,
        'stderr': stderr,
        'exitcode': p.returncode
    }
def docker_get_container_ids():
    """
    Return a list with the ID of every container reported by `docker ps -a`.

    The command is asked to print only the `{{.ID}}` field, one per line;
    each line is stripped of surrounding whitespace.
    """
    listing = cmd("docker ps -a --format '{{.ID}}'")["stdout"]
    ids = []
    for row in listing.splitlines():
        ids.append(row.strip())
    return ids
def docker_get_inspect(container_ids):
    """
    Return the raw STDOUT (a JSON document) of `docker inspect` for the given
    containers.

    `container_ids` is either a string of whitespace-separated container IDs
    or an iterable of ID strings.

    If no IDs are given, the string `"[]"` (an empty JSON list) is returned
    without running docker at all.
    """
    if not isinstance(container_ids, str):
        container_ids = " ".join(container_ids)

    # `docker inspect` without arguments exits with an error, which would
    # surface as a CmdError on hosts that simply have no containers.
    if not container_ids.strip():
        return "[]"

    result = cmd("docker inspect {}".format(container_ids))
    return result["stdout"]
def json_flatten(doc, prefix=""):
    """
    Write a flattened representation of the parsed JSON value `doc` to
    STDOUT, one `path = value` line per leaf.

    List items extend the path with `[index]`, dictionary entries with
    `.key`. The complete path is lowercased and any leading dots are removed
    before it is written. `prefix` is the path of `doc` itself.

    Empty lists and dictionaries are written as leaves (`path = []` and
    `path = {}`) so that their keys do not silently disappear from the
    output.
    """
    if isinstance(doc, list) and doc:
        for i, v in enumerate(doc):
            json_flatten(v, "{}[{}]".format(prefix, i))
    elif isinstance(doc, dict) and doc:
        for k, v in doc.items():
            json_flatten(v, "{}.{}".format(prefix, k))
    else:
        # Scalars, None and empty containers all end up here.
        sys.stdout.write("{} = {}\n".format(prefix.lstrip(".").lower(), doc))
if __name__ == "__main__":
    # Print the `docker inspect` data of every container as flat
    # `path = value` lines.
    container_ids = docker_get_container_ids()

    # With no containers there is nothing to inspect; `docker inspect`
    # without arguments would fail, so skip it and print nothing.
    if container_ids:
        container_info = json.loads(docker_get_inspect(" ".join(container_ids)))
        for container in container_info:
            # Line prefix: first 12 characters of the ID, a colon, and the
            # name minus its first character (presumably the leading "/"
            # docker puts in front of container names -- TODO confirm).
            json_flatten(container, "{}:{}".format(container["Id"][:12], container["Name"][1:]))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment