Last active
November 23, 2022 05:49
-
-
Save jhernand/515f13fd4ec2463e3ecd05c961079f9c to your computer and use it in GitHub Desktop.
Simple integration between Prometheus alert manager and Ansible Tower
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import http.client
import http.server
import json
import os
import ssl
import urllib.parse
# AWX connection and job template details. Each value can be overridden with
# an environment variable of the same name; the defaults are the values that
# were previously hard-coded here.
AWX_HOST = os.environ.get('AWX_HOST', 'tower.private')
AWX_PORT = int(os.environ.get('AWX_PORT', '443'))
AWX_USER = os.environ.get('AWX_USER', 'alert-healer')
# Supply the real password through the environment rather than editing this
# file, so the secret does not end up in version control.
AWX_PASSWORD = os.environ.get('AWX_PASSWORD', '...')
AWX_PROJECT = os.environ.get('AWX_PROJECT', 'Healing actions')
AWX_TEMPLATE = os.environ.get('AWX_TEMPLATE', 'Ensure oVirt host')
class Handler(http.server.BaseHTTPRequestHandler):
    """Webhook handler that launches an AWX job template for each alert.

    A POST request must carry a JSON object with an ``alerts`` list. For
    every alert the handler requests an AWX token, looks up the job template
    named ``AWX_TEMPLATE`` in the project ``AWX_PROJECT`` and launches it,
    passing the alert labels as extra variables.
    """

    # Seconds to wait for AWX before giving up. The server handles one
    # request at a time, so an unbounded wait would block every later alert.
    _AWX_TIMEOUT = 30

    # Top-level JSON keys whose values are masked before being printed.
    _SECRET_KEYS = ("password", "token")

    def do_POST(self):
        """Acknowledge the webhook call, then process each alert it carries.

        Replies 400 when the body is not a JSON object containing an
        ``alerts`` list. Otherwise replies 200 with an empty body *before*
        contacting AWX, so the sender is not kept waiting; failures while
        processing an alert are therefore only printed, and do not stop the
        remaining alerts from being processed.
        """
        # Read and parse the request body:
        try:
            length = int(self.headers.get('Content-Length', 0))
            if length < 0:
                # A negative size would make read() wait for end of stream.
                raise ValueError("negative Content-Length")
            body = json.loads(self.rfile.read(length).decode("utf-8"))
            alerts = body["alerts"]
            if not isinstance(alerts, list):
                raise TypeError("'alerts' is not a list")
        except (TypeError, ValueError, KeyError) as error:
            print("Rejected request: %r" % error)
            self.send_error(400, "Expected a JSON object with an 'alerts' list")
            return
        print("Alert manager request:\n%s" % self.indent(body))

        # Send an empty response:
        self.send_response(200)
        self.send_header("Content-Length", "0")
        self.end_headers()

        # Process all the alerts. The response has already been sent, so the
        # only useful reaction to a failure is to report it and carry on with
        # the next alert.
        for alert in alerts:
            try:
                self.process_alert(alert)
            except Exception as error:
                print("Failed to process alert: %r" % error)

    def process_alert(self, alert):
        """Launch the configured AWX job template for one alert.

        Args:
            alert: Alert dictionary; its ``labels`` entry is sent to AWX as
                the ``extra_vars`` of the launched job.

        Raises:
            LookupError: If AWX returns no job template matching
                ``AWX_TEMPLATE`` and ``AWX_PROJECT``.
            KeyError: If the alert has no ``labels`` entry.
        """
        # Request the authentication token:
        token = self.get_token()

        # Build the query to find the job template:
        query = urllib.parse.urlencode({
            "name": AWX_TEMPLATE,
            "project__name": AWX_PROJECT,
        })

        # Send the request to find the job template:
        response = self.send_request(
            method='GET',
            path="/api/v2/job_templates/?%s" % query,
            token=token,
        )

        # Get the identifier of the job template:
        results = response.get("results") if isinstance(response, dict) else None
        if not results:
            raise LookupError(
                "Job template %r not found in project %r"
                % (AWX_TEMPLATE, AWX_PROJECT)
            )
        template_id = results[0]["id"]

        # Send the request to launch the job template, including all the labels
        # of the alert as extra variables for the AWX job template. The labels
        # are serialized to a JSON string first, so "extra_vars" is sent as a
        # string rather than as a nested object.
        extra_vars = json.dumps(alert["labels"])
        self.send_request(
            method='POST',
            path="/api/v2/job_templates/%s/launch/" % template_id,
            token=token,
            body={
                "extra_vars": extra_vars,
            },
        )

    def get_token(self):
        """Return an AWX authentication token for ``AWX_USER``.

        Raises:
            KeyError: If the AWX response carries no ``token`` entry.
        """
        # NOTE(review): the authtoken endpoint is presumably only available
        # on older AWX/Tower releases - confirm against the target version.
        response = self.send_request(
            method='POST',
            path="/api/v2/authtoken/",
            body={
                "username": AWX_USER,
                "password": AWX_PASSWORD,
            },
        )
        return response["token"]

    def send_request(self, method, path, token=None, body=None):
        """Send one JSON request to AWX and return the decoded response.

        Args:
            method: HTTP method, for example ``'GET'`` or ``'POST'``.
            path: Request path, including any query string.
            token: Optional token sent in the ``Authorization`` header.
            body: Optional JSON-serializable request body. When ``None`` no
                body is sent at all.

        Returns:
            The decoded JSON response, or ``None`` when the response body is
            empty.

        Raises:
            RuntimeError: If AWX answers with an HTTP status of 400 or above.
            ValueError: If a successful response is not valid JSON.
        """
        # The token and the Authorization header are deliberately not
        # printed, and secrets in the bodies are masked, so that credentials
        # do not leak into the logs.
        print("AWX method: %s" % method)
        print("AWX path: %s" % path)
        if body is not None:
            print("AWX request:\n%s" % self.indent(self._redact(body)))

        headers = {
            "Content-type": "application/json",
            "Accept": "application/json",
        }
        if token is not None:
            headers["Authorization"] = "Token %s" % token

        # NOTE(review): certificate verification is disabled, as it was
        # originally, so the identity of the AWX server is not checked.
        # Enable it once AWX presents a certificate this host trusts.
        context = ssl.create_default_context()
        # check_hostname has to be cleared before verify_mode can be relaxed.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

        # The connection is created outside the try block so that the finally
        # clause never refers to a name that was not bound.
        connection = http.client.HTTPSConnection(
            host=AWX_HOST,
            port=AWX_PORT,
            context=context,
            timeout=self._AWX_TIMEOUT,
        )
        try:
            connection.request(
                method=method,
                url=path,
                headers=headers,
                body=self._encode_body(body),
            )
            response = connection.getresponse()
            status = response.status
            text = response.read().decode("utf-8")
        finally:
            connection.close()

        # Error responses are reported by status, because their body is not
        # guaranteed to be JSON.
        if status >= 400:
            print("AWX error response:\n%s" % text)
            raise RuntimeError(
                "AWX %s %s failed with HTTP status %d" % (method, path, status)
            )

        result = json.loads(text) if text else None
        print("AWX response:\n%s" % self.indent(self._redact(result)))
        return result

    @staticmethod
    def _encode_body(body):
        """Return ``body`` as UTF-8 encoded JSON, or ``None`` for no body."""
        if body is None:
            return None
        return json.dumps(body).encode("utf-8")

    @classmethod
    def _redact(cls, data):
        """Return a copy of ``data`` with top-level secret values masked.

        Values that are not dictionaries are returned unchanged.
        """
        if not isinstance(data, dict):
            return data
        return {
            key: "***" if key in cls._SECRET_KEYS else value
            for key, value in data.items()
        }

    def indent(self, data):
        """Return ``data`` formatted as JSON with two-space indentation."""
        return json.dumps(data, indent=2)
# Address the web server listens on:
address = ('localhost', 9099)


def main():
    """Serve webhook requests on ``address`` until interrupted."""
    # Leaving the with block closes the server, which releases the listening
    # socket even when serving stops because of an error.
    with http.server.HTTPServer(address, Handler) as server:
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl+C is the expected way to stop the server, so exit quietly
            # instead of printing a traceback.
            print("Shutting down")


# Start the web server only when run as a script, so that importing this
# module does not block on serve_forever():
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment