Revisions
-
mcdamo revised this gist
Oct 20, 2020 . 1 changed file with 9 additions and 4 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -312,10 +312,11 @@ def _get_upload_data(self, cert_data, key_data): def create_updater(args): session = requests.session() if args.model is None: model = determine_model(session, args.ipmi_url, args.debug) else: model = args.model if not args.quiet: print("Board model is " + model) @@ -326,14 +327,18 @@ def create_updater(args): else: raise Exception(f"Unknown model: {model}") def determine_model(session, ipmi_url, debug): redfish_url = f'{ipmi_url}/redfish/v1/' try: r = session.get(redfish_url, timeout=REQUEST_TIMEOUT, verify=False) except (ConnectionError, requests.exceptions.SSLError) as err: print("Failed to determine model: connection error") if debug: print(err) exit(2) if not r.ok: print(f"Failed to determine model (try --model): {r.status_code} {r.reason}") exit(2) data = r.json() -
mcdamo revised this gist
Oct 20, 2020 . 1 changed file with 40 additions and 13 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -45,7 +45,14 @@ def __init__(self, session, ipmi_url): self.use_b64encoded_login = True self._csrf_token = None error_log = logging.getLogger("IPMIUpdater") error_log.setLevel(logging.ERROR) self.setLogger(error_log) def setLogger(self, logger): self.logger = logger def get_csrf_token(self, url_name): if self._csrf_token is not None: return self._csrf_token @@ -69,7 +76,7 @@ def get_csrf_headers(self, url_name): if csrf_token is not None: headers["CSRF_TOKEN"] = csrf_token self.logger.debug("HEADERS:%s" % headers) return headers def get_xhr_headers(self, url_name): @@ -135,15 +142,15 @@ def get_ipmi_cert_info(self): if not result.ok: return False self.logger.debug(result.text) root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> <STATUS> status = root.xpath('//IPMI/SSL_INFO/STATUS') if not status: return False # Since xpath will return a list, just pick the first one from it. status = status[0] has_cert = bool(int(status.get('CERT_EXIST'))) if has_cert: valid_from = status.get('VALID_FROM') valid_until = status.get('VALID_UNTIL') @@ -171,15 +178,15 @@ def get_ipmi_cert_valid(self): if not result.ok: return False self.logger.debug(result.text) root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> status = root.xpath('//IPMI/SSL_INFO') if not status: return False # Since xpath will return a list, just pick the first one from it. status = status[0] return bool(int(status.get('VALIDATE'))) def upload_cert(self, key_file, cert_file): """ @@ -261,13 +268,22 @@ def _get_op_data(self, op, r): def _get_upload_data(self, cert_data, key_data): return [ ('cert_file', ('cert.pem', cert_data, 'application/octet-stream')), ('key_file', ('privkey.pem', key_data, 'application/octet-stream')) ] def _check_reboot_result(self, result): self.logger.debug(result.text) root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> status = root.xpath('//IPMI/BMC_RESET/STATE') if not status: return False if status[0].get('CODE') == 'OK': return True return False #if '<STATE CODE="OK"/>' not in result.text: # return False class IPMIX11Updater(IPMIUpdater): @@ -296,8 +312,10 @@ def _get_upload_data(self, cert_data, key_data): def create_updater(args): session = requests.session() if not args.model: model = determine_model(session, args.ipmi_url) else: model = args.model if not args.quiet: print("Board model is " + model) @@ -330,6 +348,8 @@ def main(): parser = argparse.ArgumentParser(description='Update Supermicro IPMI SSL certificate') parser.add_argument('--ipmi-url', required=True, help='Supermicro IPMI 2.0 URL') parser.add_argument('--model', required=False, help='Board model, eg. X10 or X11') parser.add_argument('--key-file', required=True, help='X.509 Private key filename') parser.add_argument('--cert-file', required=True, @@ -342,6 +362,8 @@ def main(): help='The default is to reboot the IPMI after upload for the change to take effect.') parser.add_argument('--quiet', action='store_true', help='Do not output anything if successful') parser.add_argument('--debug', action='store_true', help='Output additional debugging') args = parser.parse_args() # Confirm args @@ -354,11 +376,11 @@ def main(): if args.ipmi_url[-1] == '/': args.ipmi_url = args.ipmi_url[0:-1] if args.debug: import http.client as http_client http_client.HTTPConnection.debuglevel = 1 # Enable request logging logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) requests_log = logging.getLogger("requests.packages.urllib3") @@ -369,6 +391,10 @@ def main(): requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) updater = create_updater(args) if args.debug: debug_log = logging.getLogger("IPMIUpdater") debug_log.setLevel(logging.DEBUG) updater.setLogger(debug_log) if not updater.login(args.username, args.password): print("Login failed. Cannot continue!") @@ -413,3 +439,4 @@ def main(): if __name__ == "__main__": main() -
mcdamo revised this gist
Oct 20, 2020 . 1 changed file with 300 additions and 184 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -21,186 +21,310 @@ import os import argparse import re import requests import logging from base64 import b64encode from datetime import datetime from lxml import etree from urllib.parse import urlparse from requests.auth import HTTPBasicAuth REQUEST_TIMEOUT = 5.0 class IPMIUpdater: def __init__(self, session, ipmi_url): self.session = session self.ipmi_url = ipmi_url self.login_url = f'{ipmi_url}/cgi/login.cgi' self.cert_info_url = f'{ipmi_url}/cgi/ipmi.cgi' self.upload_cert_url = f'{ipmi_url}/cgi/upload_ssl.cgi' self.url_redirect_template = f'{ipmi_url}/cgi/url_redirect.cgi?url_name=%s' self.use_b64encoded_login = True self._csrf_token = None def get_csrf_token(self, url_name): if self._csrf_token is not None: return self._csrf_token page_url = self.url_redirect_template % url_name result = self.session.get(page_url) result.raise_for_status() match = re.search(r'SmcCsrfInsert\s*\("CSRF_TOKEN",\s*"([^"]*)"\);', result.text) if match: return match.group(1) def get_csrf_headers(self, url_name): page_url = self.url_redirect_template % url_name headers = { "Origin": self.ipmi_url, "Referer": page_url, } csrf_token = self.get_csrf_token(url_name) if csrf_token is not None: headers["CSRF_TOKEN"] = csrf_token print("HEADERS:", headers) return headers def get_xhr_headers(self, url_name): headers = self.get_csrf_headers(url_name) headers["X-Requested-With"] = "XMLHttpRequest" return headers def login(self, username, password): """ Log into IPMI interface :param username: username to use for logging in :param password: password to use for logging in :return: bool """ if self.use_b64encoded_login: login_data = { 'name': b64encode(username.encode("UTF-8")), 'pwd': b64encode(password.encode("UTF-8")), 'check': '00' } else: login_data = { 'name': username, 'pwd': password } try: result = self.session.post(self.login_url, login_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if '/cgi/url_redirect.cgi?url_name=mainmenu' not in result.text: return False # Set mandatory cookies: url_parts = urlparse(self.ipmi_url) # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl mandatory_cookies = { 'langSetFlag': '0', 'language': 'English' } for cookie_name, cookie_value in mandatory_cookies.items(): self.session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname) return True def get_ipmi_cert_info(self): """ Verify existing certificate information :return: dict """ headers = self.get_xhr_headers("config_ssl") cert_info_data = self._get_op_data('SSL_STATUS.XML', '(0,0)') try: result = self.session.post(self.cert_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> <STATUS> status = root.xpath('//IPMI/SSL_INFO/STATUS') if not status: return False # Since xpath will return a list, just pick the first one from it. status = status[0] has_cert = int(status.get('CERT_EXIST')) has_cert = bool(has_cert) if has_cert: valid_from = status.get('VALID_FROM') valid_until = status.get('VALID_UNTIL') return { 'has_cert': has_cert, 'valid_from': valid_from, 'valid_until': valid_until } def get_ipmi_cert_valid(self): """ Verify existing certificate information :return: bool """ headers = self.get_xhr_headers("config_ssl") cert_info_data = self._get_op_data('SSL_VALIDATE.XML', '(0,0)') try: result = self.session.post(self.cert_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> status = root.xpath('//IPMI/SSL_INFO') if not status: return False # Since xpath will return a list, just pick the first one from it. status = status[0] valid_cert = int(status.get('VALIDATE')) return bool(valid_cert) def upload_cert(self, key_file, cert_file): """ Send X.509 certificate and private key to server :param session: Current session object :type session requests.session :param url: base-URL to IPMI :param key_file: filename to X.509 certificate private key :param cert_file: filename to X.509 certificate PEM :return: """ with open(key_file, 'rb') as filehandle: key_data = filehandle.read() with open(cert_file, 'rb') as filehandle: cert_data = filehandle.read() # extract certificates only (IMPI doesn't like DH PARAMS) cert_data = b'\n'.join(re.findall(b'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----', cert_data, re.DOTALL)) + b'\n' files_to_upload = self._get_upload_data(cert_data, key_data) headers = self.get_csrf_headers("config_ssl") csrf_token = self.get_csrf_token("config_ssl") csrf_data = {} if csrf_token is not None: csrf_data["CSRF_TOKEN"] = csrf_token try: result = self.session.post(self.upload_cert_url, csrf_data, files=files_to_upload, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html': # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked' return False if 'CONFPAGE_RESET' not in result.text: return False return True def _check_reboot_result(self, result): return True def reboot_ipmi(self): # do we need a different Referer here? headers = self.get_xhr_headers("config_ssl") reboot_data = self._get_op_data('main_bmcreset', None) try: result = self.session.post(self.reboot_url, reboot_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if not self._check_reboot_result(result): return False return True class IPMIX10Updater(IPMIUpdater): def __init__(self, session, ipmi_url): super().__init__(session, ipmi_url) self.reboot_url = f'{ipmi_url}/cgi/BMCReset.cgi' self.use_b64encoded_login = False def _get_op_data(self, op, r): timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') data = { 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } if r is not None: data[op] = r return data def _get_upload_data(self, cert_data, key_data): return [ ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')), ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream')) ] def _check_reboot_result(self, result): if '<STATE CODE="OK"/>' not in result.text: return False class IPMIX11Updater(IPMIUpdater): def __init__(self, session, ipmi_url): super().__init__(session, ipmi_url) self.reboot_url = f'{ipmi_url}/cgi/op.cgi' self.use_b64encoded_login = True def _get_op_data(self, op, r): data = { 'op': op } if r is not None: data['r'] = r data['_'] = '' return data def _get_upload_data(self, cert_data, key_data): return [ ('cert_file', ('fullchain.pem', cert_data, 'application/octet-stream')), ('key_file', ('privkey.pem', key_data, 'application/octet-stream')) ] def create_updater(args): session = requests.session() # First determine if we are X10 or X11 model = determine_model(session, args.ipmi_url) if not args.quiet: print("Board model is " + model) if model == "X10": return IPMIX10Updater(session, args.ipmi_url) elif model == "X11": return IPMIX11Updater(session, args.ipmi_url) else: raise Exception(f"Unknown model: {model}") def determine_model(session, ipmi_url): redfish_url = f'{ipmi_url}/redfish/v1/' try: r = session.get(redfish_url, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: exit(2) if not r.ok: exit(2) data = r.json() # The UpdateService methods are only available on newer X11 based boards if "UpdateService" in data: return "X11" else: return "X10" def main(): parser = argparse.ArgumentParser(description='Update Supermicro IPMI SSL certificate') @@ -231,6 +355,9 @@ def main(): args.ipmi_url = args.ipmi_url[0:-1] if not args.quiet: import http.client as http_client http_client.HTTPConnection.debuglevel = 1 # Enable reuest logging logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) @@ -240,45 +367,34 @@ def main(): # Start the operation requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) updater = create_updater(args) if not updater.login(args.username, args.password): print("Login failed. Cannot continue!") exit(2) cert_info = updater.get_ipmi_cert_info() if not cert_info: print("Failed to extract certificate information from IPMI!") exit(2) if not args.quiet and cert_info['has_cert']: print("There exists a certificate, which is valid until: %s" % cert_info['valid_until']) # Go upload! if not updater.upload_cert(args.key_file, args.cert_file): print("Failed to upload X.509 files to IPMI!") exit(2) cert_valid = updater.get_ipmi_cert_valid() if not cert_valid: print("Uploads failed validation") exit(2) if not args.quiet: print("Uploaded files ok.") cert_info = updater.get_ipmi_cert_info() if not cert_info: print("Failed to extract certificate information from IPMI!") exit(2) @@ -288,7 +404,7 @@ def main(): if not args.no_reboot: if not args.quiet: print("Rebooting IPMI to apply changes.") if not updater.reboot_ipmi(): print("Rebooting failed! Go reboot it manually?") if not args.quiet: -
oxc revised this gist
Aug 31, 2020 . 1 changed file with 83 additions and 101 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -33,26 +33,24 @@ REQUEST_TIMEOUT = 5.0 class IPMIUpdater: def __init__(self, session, ipmi_url): self.session = session self.ipmi_url = ipmi_url self.login_url = f'{ipmi_url}/cgi/login.cgi' self.cert_info_url = f'{ipmi_url}/cgi/ipmi.cgi' self.upload_cert_url = f'{ipmi_url}/cgi/upload_ssl.cgi' self.url_redirect_template = f'{ipmi_url}/cgi/url_redirect.cgi?url_name=%s' self.use_b64encoded_login = True self._csrf_token = None def get_csrf_token(self, url_name): if self._csrf_token is not None: return self._csrf_token page_url = self.url_redirect_template % url_name result = self.session.get(page_url) result.raise_for_status() @@ -61,7 +59,7 @@ def get_csrf_token(self, url_name): return match.group(1) def get_csrf_headers(self, url_name): page_url = self.url_redirect_template % url_name headers = { "Origin": self.ipmi_url, @@ -86,20 +84,7 @@ def login(self, username, password): :param password: password to use for logging in :return: bool """ if self.use_b64encoded_login: login_data = { 'name': b64encode(username.encode("UTF-8")), 'pwd': b64encode(password.encode("UTF-8")), @@ -112,7 +97,7 @@ def login(self, username, password): } try: result = self.session.post(self.login_url, login_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: @@ -141,26 +126,10 @@ def get_ipmi_cert_info(self): headers = self.get_xhr_headers("config_ssl") cert_info_data = self._get_op_data('SSL_STATUS.XML', '(0,0)') try: result = self.session.post(self.cert_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: @@ -193,26 +162,10 @@ def get_ipmi_cert_valid(self): headers = self.get_xhr_headers("config_ssl") cert_info_data = self._get_op_data('SSL_VALIDATE.XML', '(0,0)') try: result = self.session.post(self.cert_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: @@ -245,24 +198,16 @@ def upload_cert(self, key_file, cert_file): # extract certificates only (IMPI doesn't like DH PARAMS) cert_data = b'\n'.join(re.findall(b'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----', cert_data, re.DOTALL)) + b'\n' files_to_upload = self._get_upload_data(cert_data, key_data) headers = self.get_csrf_headers("config_ssl") csrf_token = self.get_csrf_token("config_ssl") csrf_data = {} if csrf_token is not None: csrf_data["CSRF_TOKEN"] = csrf_token try: result = self.session.post(self.upload_cert_url, csrf_data, files=files_to_upload, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: @@ -278,39 +223,76 @@ def upload_cert(self, key_file, cert_file): return True def _check_reboot_result(self, result): return True def reboot_ipmi(self): # do we need a different Referer here? headers = self.get_xhr_headers("config_ssl") reboot_data = self._get_op_data('main_bmcreset', None) try: result = self.session.post(self.reboot_url, reboot_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if not self._check_reboot_result(result): return False return True class IPMIX10Updater(IPMIUpdater): def __init__(self, session, ipmi_url): super().__init__(session, ipmi_url) self.reboot_url = f'{ipmi_url}/cgi/BMCReset.cgi' self.use_b64encoded_login = False def _get_op_data(self, op, r): timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') data = { 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } if r is not None: data[op] = r return data def _get_upload_data(self, cert_data, key_data): return [ ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')), ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream')) ] def _check_reboot_result(self, result): if '<STATE CODE="OK"/>' not in result.text: return False class IPMIX11Updater(IPMIUpdater): def __init__(self, session, ipmi_url): super().__init__(session, ipmi_url) self.reboot_url = f'{ipmi_url}/cgi/op.cgi' self.use_b64encoded_login = True def _get_op_data(self, op, r): data = { 'op': op } if r is not None: data['r'] = r data['_'] = '' return data def _get_upload_data(self, cert_data, key_data): return [ ('cert_file', ('fullchain.pem', cert_data, 'application/octet-stream')), ('key_file', ('privkey.pem', key_data, 'application/octet-stream')) ] def create_updater(args): session = requests.session() @@ -319,8 +301,12 @@ def create_updater(args): if not args.quiet: print("Board model is " + model) if model == "X10": return IPMIX10Updater(session, args.ipmi_url) elif model == "X11": return IPMIX11Updater(session, args.ipmi_url) else: raise Exception(f"Unknown model: {model}") def determine_model(session, ipmi_url): redfish_url = f'{ipmi_url}/redfish/v1/' @@ -391,8 +377,6 @@ def main(): cert_info = updater.get_ipmi_cert_info() if not cert_info: print("Failed to extract certificate information from IPMI!") exit(2) if not args.quiet and cert_info['has_cert']: print("There exists a certificate, which is valid until: %s" % cert_info['valid_until']) @@ -413,8 +397,6 @@ def main(): cert_info = updater.get_ipmi_cert_info() if not cert_info: print("Failed to extract certificate information from IPMI!") exit(2) if not args.quiet and cert_info['has_cert']: print("After upload, there exists a certificate, which is valid until: %s" % cert_info['valid_until']) -
oxc revised this gist
Aug 31, 2020 . 1 changed file with 287 additions and 218 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -24,224 +24,306 @@ import re import requests import logging from base64 import b64encode from datetime import datetime from lxml import etree from urllib.parse import urlparse from requests.auth import HTTPBasicAuth REQUEST_TIMEOUT = 5.0 class IPMIUpdater: def __init__(self, session, ipmi_url, model): self.session = session self.ipmi_url = ipmi_url self.model = model self.LOGIN_URL = f'{ipmi_url}/cgi/login.cgi' self.IPMI_CERT_INFO_URL = f'{ipmi_url}/cgi/ipmi.cgi' self.UPLOAD_CERT_URL = f'{ipmi_url}/cgi/upload_ssl.cgi' self.REBOOT_IPMI_X10_URL = f'{ipmi_url}/cgi/BMCReset.cgi' self.REBOOT_IPMI_X11_URL = f'{ipmi_url}/cgi/op.cgi' self.URL_REDIRECT_URL = f'{ipmi_url}/cgi/url_redirect.cgi?url_name=%s' self.REDFISH_ROOT = f'{ipmi_url}/redfish/v1/' self._csrf_token = None def get_csrf_token(self, url_name): if self._csrf_token is not None: return self._csrf_token page_url = self.URL_REDIRECT_URL % url_name result = self.session.get(page_url) result.raise_for_status() match = re.search(r'SmcCsrfInsert\s*\("CSRF_TOKEN",\s*"([^"]*)"\);', result.text) if match: return match.group(1) def get_csrf_headers(self, url_name): page_url = self.URL_REDIRECT_URL % url_name headers = { "Origin": self.ipmi_url, "Referer": page_url, } csrf_token = self.get_csrf_token(url_name) if csrf_token is not None: headers["CSRF_TOKEN"] = csrf_token print("HEADERS:", headers) return headers def get_xhr_headers(self, url_name): headers = self.get_csrf_headers(url_name) headers["X-Requested-With"] = "XMLHttpRequest" return headers def login(self, username, password): """ Log into IPMI interface :param username: username to use for logging in :param password: password to use for logging in :return: bool """ if False: try: session_url = self.REDFISH_ROOT + "/SessionService/Sessions" session_data = { 'UserName': username, 'Password': password } result = self.session.post(session_url, json=session_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if True: login_data = { 'name': b64encode(username.encode("UTF-8")), 'pwd': b64encode(password.encode("UTF-8")), 'check': '00' } else: login_data = { 'name': username, 'pwd': password } try: result = self.session.post(self.LOGIN_URL, login_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if '/cgi/url_redirect.cgi?url_name=mainmenu' not in result.text: return False # Set mandatory cookies: url_parts = urlparse(self.ipmi_url) # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl mandatory_cookies = { 'langSetFlag': '0', 'language': 'English' } for cookie_name, cookie_value in mandatory_cookies.items(): self.session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname) return True def get_ipmi_cert_info(self): """ Verify existing certificate information :return: dict """ headers = self.get_xhr_headers("config_ssl") if self.model == "X10": timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') cert_info_data = { 'SSL_STATUS.XML': '(0,0)', 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } elif self.model == "X11": cert_info_data = { 'op': 'SSL_STATUS.XML', 'r': '(0,0)', '_': '' } #for cookie in session.cookies: # print(cookie) try: ipmi_info_url = self.IPMI_CERT_INFO_URL result = self.session.post(ipmi_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) print("Result.text:", result.text) except ConnectionError: return False if not result.ok: return False root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> <STATUS> status = root.xpath('//IPMI/SSL_INFO/STATUS') if not status: return False # Since xpath will return a list, just pick the first one from it. status = status[0] has_cert = int(status.get('CERT_EXIST')) has_cert = bool(has_cert) if has_cert: valid_from = status.get('VALID_FROM') valid_until = status.get('VALID_UNTIL') return { 'has_cert': has_cert, 'valid_from': valid_from, 'valid_until': valid_until } def get_ipmi_cert_valid(self): """ Verify existing certificate information :return: bool """ headers = self.get_xhr_headers("config_ssl") if self.model == "X10": timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') cert_info_data = { 'SSL_VALIDATE.XML': '(0,0)', 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } elif self.model == "X11": cert_info_data = { 'op': 'SSL_VALIDATE.XML', 'r': '(0,0)', '_': '' } #for cookie in session.cookies: # print(cookie) try: ipmi_info_url = self.IPMI_CERT_INFO_URL result = self.session.post(ipmi_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> status = root.xpath('//IPMI/SSL_INFO') if not status: return False # Since xpath will return a list, just pick the first one from it. status = status[0] valid_cert = int(status.get('VALIDATE')) return bool(valid_cert) def upload_cert(self, key_file, cert_file): """ Send X.509 certificate and private key to server :param session: Current session object :type session requests.session :param url: base-URL to IPMI :param key_file: filename to X.509 certificate private key :param cert_file: filename to X.509 certificate PEM :return: """ with open(key_file, 'rb') as filehandle: key_data = filehandle.read() with open(cert_file, 'rb') as filehandle: cert_data = filehandle.read() # extract certificates only (IMPI doesn't like DH PARAMS) cert_data = b'\n'.join(re.findall(b'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----', cert_data, re.DOTALL)) + b'\n' if self.model == "X10": files_to_upload = [ ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')), ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream')) ] elif self.model == "X11": files_to_upload = [ ('cert_file', ('fullchain.pem', cert_data, 'application/octet-stream')), ('key_file', ('privkey.pem', key_data, 'application/octet-stream')) ] headers = self.get_csrf_headers("config_ssl") csrf_token = self.get_csrf_token("config_ssl") csrf_data = { "CSRF_TOKEN": csrf_token } try: upload_cert_url = self.UPLOAD_CERT_URL result = self.session.post(upload_cert_url, csrf_data, files=files_to_upload, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html': # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked' return False if 'CONFPAGE_RESET' not in result.text: return False return True def reboot_ipmi(self): # do we need a different Referer here? headers = self.get_xhr_headers("config_ssl") if self.model == "X10": timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') reboot_data = { 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } reboot_url = self.REBOOT_IPMI_X10_URL elif self.model == "X11": reboot_data = { 'op': 'main_bmcreset', '_': '' } reboot_url = self.REBOOT_IPMI_X11_URL try: result = self.session.post(reboot_url, reboot_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if self.model == "X10": if '<STATE CODE="OK"/>' not in result.text: return False return True def create_updater(args): session = requests.session() # First determine if we are X10 or X11 model = determine_model(session, args.ipmi_url) if not args.quiet: print("Board model is " + model) return IPMIUpdater(session, args.ipmi_url, model) def determine_model(session, ipmi_url): redfish_url = f'{ipmi_url}/redfish/v1/' try: r = session.get(redfish_url, timeout=REQUEST_TIMEOUT, verify=False) @@ -287,6 +369,9 @@ def main(): args.ipmi_url = args.ipmi_url[0:-1] if not args.quiet: import http.client as http_client http_client.HTTPConnection.debuglevel = 1 # Enable reuest logging logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) @@ -296,55 +381,39 @@ def main(): # Start the operation requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) updater = create_updater(args) if not updater.login(args.username, args.password): print("Login failed. Cannot continue!") exit(2) cert_info = updater.get_ipmi_cert_info() if not cert_info: print("Failed to extract certificate information from IPMI!") if updater.model == "X11": print("Try checking either your IPMI network config or your IPMI license!") exit(2) if not args.quiet and cert_info['has_cert']: print("There exists a certificate, which is valid until: %s" % cert_info['valid_until']) # Go upload! if not updater.upload_cert(args.key_file, args.cert_file): print("Failed to upload X.509 files to IPMI!") exit(2) cert_valid = updater.get_ipmi_cert_valid() if not cert_valid: print("Uploads failed validation") exit(2) if not args.quiet: print("Uploaded files ok.") cert_info = updater.get_ipmi_cert_info() if not cert_info: print("Failed to extract certificate information from IPMI!") if updater.model == "X11": print("Try checking either your IPMI network config or your IPMI license!") exit(2) if not args.quiet and cert_info['has_cert']: @@ -353,7 +422,7 @@ def main(): if not args.no_reboot: if not args.quiet: print("Rebooting IPMI to apply changes.") if not updater.reboot_ipmi(): print("Rebooting failed! Go reboot it manually?") if not args.quiet: -
oxc revised this gist
Aug 31, 2020 . 1 changed file with 4 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -21,6 +21,7 @@ import os import argparse import re import requests import logging from datetime import datetime @@ -176,6 +177,9 @@ def upload_cert(session, url, key_file, cert_file, model): key_data = filehandle.read() with open(cert_file, 'rb') as filehandle: cert_data = filehandle.read() # extract certificates only (IMPI doesn't like DH PARAMS) cert_data = b'\n'.join(re.findall(b'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----', cert_data, re.DOTALL)) + b'\n' if model == "X10": -
oxc revised this gist
Oct 3, 2019 . 1 changed file with 83 additions and 84 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -33,12 +33,10 @@ LOGIN_URL = '%s/cgi/login.cgi' IPMI_CERT_INFO_URL = '%s/cgi/ipmi.cgi' UPLOAD_CERT_URL = '%s/cgi/upload_ssl.cgi' REBOOT_IPMI_X10_URL = '%s/cgi/BMCReset.cgi' REBOOT_IPMI_X11_URL = '%s/cgi/op.cgi' CONFIG_CERT_URL = '%s/cgi/url_redirect.cgi?url_name=config_ssl' REDFISH_ROOT = '%s/redfish/v1/' def login(session, url, username, password): """ @@ -68,35 +66,28 @@ def login(session, url, username, password): return True def get_ipmi_cert_info(session, url, model): """ Verify existing certificate information :param session: Current session object :type session requests.session :param url: base-URL to IPMI :return: dict """ if model == "X10": timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') cert_info_data = { 'SSL_STATUS.XML': '(0,0)', 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } elif model == "X11": cert_info_data = { 'op': 'SSL_STATUS.XML', 'r': '(0,0)', '_': '' } #for cookie in session.cookies: # print(cookie) @@ -127,20 +118,29 @@ def get_ipmi_cert_info(session, url, model, user, password): 'valid_until': valid_until } def get_ipmi_cert_valid(session, url, model): """ Verify existing certificate information :param session: Current session object :type session requests.session :param url: base-URL to IPMI :return: bool """ if model == "X10": timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') cert_info_data = { 'SSL_VALIDATE.XML': '(0,0)', 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } elif model == "X11": cert_info_data = { 'op': 'SSL_VALIDATE.XML', 'r': '(0,0)', '_': '' } #for cookie in session.cookies: # print(cookie) @@ -162,7 +162,7 @@ def get_ipmi_cert_valid(session, url): valid_cert = int(status.get('VALIDATE')) return bool(valid_cert) def upload_cert(session, url, key_file, cert_file, model): """ Send X.509 certificate and private key to server :param session: Current session object @@ -177,52 +177,54 @@ def upload_cert(session, url, key_file, cert_file, model, user, password): with open(cert_file, 'rb') as filehandle: cert_data = filehandle.read() if model == "X10": files_to_upload = [ ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')), ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream')) ] elif model == "X11": files_to_upload = [ ('cert_file', ('fullchain.pem', cert_data, 'application/octet-stream')), ('key_file', ('privkey.pem', key_data, 'application/octet-stream')) ] upload_cert_url = UPLOAD_CERT_URL % url try: result = session.post(upload_cert_url, files=files_to_upload, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html': # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked' return False if 'CONFPAGE_RESET' not in result.text: return False return True def reboot_ipmi(session, url, model): if model == "X10": timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') reboot_data = { 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } reboot_url = REBOOT_IPMI_X10_URL % url elif model == "X11": reboot_data = { 'op': 'main_bmcreset', '_': '' } reboot_url = REBOOT_IPMI_X11_URL % url try: result = session.post(reboot_url, reboot_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: @@ -250,7 +252,7 @@ def determine_model(session, url): if "UpdateService" in data: return "X11" else: return "X10" def main(): parser = argparse.ArgumentParser(description='Update Supermicro IPMI SSL certificate') @@ -297,24 +299,23 @@ def main(): if not args.quiet: print("Board model is " + model) if not login(session, args.ipmi_url, args.username, args.password): print("Login failed. Cannot continue!") exit(2) # Set mandatory cookies: url_parts = urlparse(args.ipmi_url) # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl mandatory_cookies = { 'langSetFlag': '0', 'language': 'English', 'mainpage': 'configuration', 'subpage': 'config_ssl' } for cookie_name, cookie_value in mandatory_cookies.items(): session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname) cert_info = get_ipmi_cert_info(session, args.ipmi_url, model) if not cert_info: print("Failed to extract certificate information from IPMI!") if model == "X11": @@ -324,21 +325,19 @@ def main(): print("There exists a certificate, which is valid until: %s" % cert_info['valid_until']) # Go upload! if not upload_cert(session, args.ipmi_url, args.key_file, args.cert_file, model): print("Failed to upload X.509 files to IPMI!") exit(2) cert_valid = get_ipmi_cert_valid(session, args.ipmi_url, model) if not cert_valid: print("Uploads failed validation") exit(2) if not args.quiet: print("Uploaded files ok.") cert_info = get_ipmi_cert_info(session, args.ipmi_url, model) if not cert_info: print("Failed to extract certificate information from IPMI!") if model == "X11": @@ -350,7 +349,7 @@ def main(): if not args.no_reboot: if not args.quiet: print("Rebooting IPMI to apply changes.") if not reboot_ipmi(session, args.ipmi_url, model): print("Rebooting failed! Go reboot it manually?") if not args.quiet: -
dmerner revised this gist
Sep 25, 2019 . 1 changed file with 164 additions and 50 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -22,9 +22,11 @@ import os import argparse import requests import logging from datetime import datetime from lxml import etree from urllib.parse import urlparse from requests.auth import HTTPBasicAuth REQUEST_TIMEOUT = 5.0 @@ -33,7 +35,10 @@ UPLOAD_CERT_URL = '%s/cgi/upload_ssl.cgi' REBOOT_IPMI_URL = '%s/cgi/BMCReset.cgi' CONFIG_CERT_URL = '%s/cgi/url_redirect.cgi?url_name=config_ssl' REDFISH_ROOT = '%s/redfish/v1/' REDFISH_SSL = '%sUpdateService/SSLCert/' REDFISH_SSL_UPLOAD = '%sUpdateService/SSLCert/Actions/SSLCert.Upload' REDFISH_BMC_REBOOT = '%sManagers/1/Actions/Manager.Reset' def login(session, url, username, password): """ @@ -63,7 +68,7 @@ def login(session, url, username, password): return True def get_ipmi_cert_info(session, url, model, user, password): """ Verify existing certificate information :param session: Current session object @@ -73,13 +78,28 @@ def get_ipmi_cert_info(session, url): """ timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') if model == "X11": try: r = session.get(REDFISH_SSL % (REDFISH_ROOT % url), auth=HTTPBasicAuth(user, password), verify=False) except ConnectionError: return False if not r.ok: return False data = r.json() return { 'has_cert': True, 'valid_from': data['VaildFrom'], # Yes, Supermicro made a typo in their BMC API. 'valid_until': data['GoodTHRU'] } cert_info_data = { 'SSL_STATUS.XML': '(0,0)', 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } #for cookie in session.cookies: # print(cookie) ipmi_info_url = IPMI_CERT_INFO_URL % url try: result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False) @@ -107,8 +127,42 @@ def get_ipmi_cert_info(session, url): 'valid_until': valid_until } def get_ipmi_cert_valid(session, url): """ Verify existing certificate information :param session: Current session object :type session requests.session :param url: base-URL to IPMI :return: bool """ timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') cert_info_data = { 'SSL_VALIDATE.XML': '(0,0)', 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } #for cookie in session.cookies: # print(cookie) ipmi_info_url = IPMI_CERT_INFO_URL % url try: result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> status = root.xpath('//IPMI/SSL_INFO') if not status: return False # Since xpath will return a list, just pick the first one from it. status = status[0] valid_cert = int(status.get('VALIDATE')) return bool(valid_cert) def upload_cert(session, url, key_file, cert_file, model, user, password): """ Send X.509 certificate and private key to server :param session: Current session object @@ -122,51 +176,81 @@ def upload_cert(session, url, key_file, cert_file): key_data = filehandle.read() with open(cert_file, 'rb') as filehandle: cert_data = filehandle.read() if model == "X10": files_to_upload = [ ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')), ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream')) ] upload_cert_url = UPLOAD_CERT_URL % url elif model == "X11": upload_cert_url = REDFISH_SSL_UPLOAD % (REDFISH_ROOT % url) files_to_upload = { 'cert_file' : ('fullchain.pem', open(cert_file, 'rb')), 'key_file' : ('privkey.pem', open(key_file, 'rb')) } try: result = session.post(upload_cert_url, files=files_to_upload, timeout=REQUEST_TIMEOUT, auth=HTTPBasicAuth(user, password) if model == "X11" else None, verify=False) except ConnectionError: return False if not result.ok: return False if model == "X10": if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html': # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked' return False if 'CONFPAGE_RESET' not in result.text: return False return True def reboot_ipmi(session, url, model, user, password): timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') reboot_data = { 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } if model == "X10": reboot_url = REBOOT_IPMI_URL % url elif model == "X11": reboot_url = REDFISH_BMC_REBOOT % (REDFISH_ROOT % url) try: result = session.post(reboot_url, reboot_data, timeout=REQUEST_TIMEOUT, auth=HTTPBasicAuth(user, password) if model == "X11" else None, verify=False) except ConnectionError: return False if not result.ok: return False if model == "X10": if '<STATE CODE="OK"/>' not in result.text: return False return True def determine_model(session, url): redfish_url = REDFISH_ROOT % url try: r = session.get(redfish_url, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: exit(2) if not r.ok: exit(2) data = r.json() # The UpdateService methods are only available on newer X11 based boards if "UpdateService" in data: return "X11" else: return "X10" def main(): parser = argparse.ArgumentParser(description='Update Supermicro IPMI SSL certificate') @@ -180,8 +264,10 @@ def main(): help='IPMI username with admin access') parser.add_argument('--password', required=True, help='IPMI user password') parser.add_argument('--no-reboot', action='store_true', help='The default is to reboot the IPMI after upload for the change to take effect.') parser.add_argument('--quiet', action='store_true', help='Do not output anything if successful') args = parser.parse_args() # Confirm args @@ -194,53 +280,81 @@ def main(): if args.ipmi_url[-1] == '/': args.ipmi_url = args.ipmi_url[0:-1] if not args.quiet: # Enable reuest logging logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) requests_log = logging.getLogger("requests.packages.urllib3") requests_log.setLevel(logging.DEBUG) requests_log.propagate = True # Start the operation requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) session = requests.session() # First determine if we are X10 or X11 model = determine_model(session, args.ipmi_url) if not args.quiet: print("Board model is " + model) if model == "X10": if not login(session, args.ipmi_url, args.username, args.password): print("Login failed. Cannot continue!") exit(2) # Set mandatory cookies: url_parts = urlparse(args.ipmi_url) # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl mandatory_cookies = { 'langSetFlag': '0', 'language': 'English', 'mainpage': 'configuration', 'subpage': 'config_ssl' } for cookie_name, cookie_value in mandatory_cookies.items(): session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname) cert_info = get_ipmi_cert_info(session, args.ipmi_url, model, args.username, args.password) if not cert_info: print("Failed to extract certificate information from IPMI!") if model == "X11": print("Try checking either your IPMI network config or your IPMI license!") exit(2) if not args.quiet and cert_info['has_cert']: print("There exists a certificate, which is valid until: %s" % cert_info['valid_until']) # Go upload! if not upload_cert(session, args.ipmi_url, args.key_file, args.cert_file, model, args.username, args.password): print("Failed to upload X.509 files to IPMI!") exit(2) # Redfish currently doesn't have a way to download the certificate on X11 boards if model == "X10": cert_valid = get_ipmi_cert_valid(session, args.ipmi_url) if not cert_valid: print("Uploads failed validation") exit(2) if not args.quiet: print("Uploaded files ok.") cert_info = get_ipmi_cert_info(session, args.ipmi_url, model, args.username, args.password) if not cert_info: print("Failed to extract certificate information from IPMI!") if model == "X11": print("Try checking either your IPMI network config or your IPMI license!") exit(2) if not args.quiet and cert_info['has_cert']: print("After upload, there exists a certificate, which is valid until: %s" % cert_info['valid_until']) if not args.no_reboot: if not args.quiet: print("Rebooting IPMI to apply changes.") if not reboot_ipmi(session, args.ipmi_url, model, args.username, args.password): print("Rebooting failed! Go reboot it manually?") if not args.quiet: print("All done!") if __name__ == "__main__": -
mcdamo revised this gist
Feb 7, 2019 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -287,7 +287,7 @@ def main(): if not args.no_reboot: if not args.quiet: print("Rebooting IPMI to apply changes.") if not reboot_ipmi(session, args.ipmi_url): print("Rebooting failed! Go reboot it manually?") -
mcdamo revised this gist
Feb 7, 2019 . 1 changed file with 66 additions and 14 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -22,6 +22,7 @@ import os import argparse import requests import logging from datetime import datetime from lxml import etree from urllib.parse import urlparse @@ -78,8 +79,8 @@ def get_ipmi_cert_info(session, url): 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } #for cookie in session.cookies: # print(cookie) ipmi_info_url = IPMI_CERT_INFO_URL % url try: result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False) @@ -107,6 +108,40 @@ def get_ipmi_cert_info(session, url): 'valid_until': valid_until } def get_ipmi_cert_valid(session, url): """ Verify existing certificate information :param session: Current session object :type session requests.session :param url: base-URL to IPMI :return: bool """ timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') cert_info_data = { 'SSL_VALIDATE.XML': '(0,0)', 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } #for cookie in session.cookies: # print(cookie) ipmi_info_url = IPMI_CERT_INFO_URL % url try: result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> status = root.xpath('//IPMI/SSL_INFO') if not status: return False # Since xpath will return a list, just pick the first one from it. status = status[0] valid_cert = int(status.get('VALIDATE')) return bool(valid_cert) def upload_cert(session, url, key_file, cert_file): """ @@ -123,8 +158,8 @@ def upload_cert(session, url, key_file, cert_file): with open(cert_file, 'rb') as filehandle: cert_data = filehandle.read() files_to_upload = [ ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')), ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream')) ] upload_cert_url = UPLOAD_CERT_URL % url @@ -140,7 +175,6 @@ def upload_cert(session, url, key_file, cert_file): return False if 'CONFPAGE_RESET' not in result.text: return False return True @@ -159,9 +193,9 @@ def reboot_ipmi(session, url): if not result.ok: return False #print("Url: %s" % upload_cert_url) #print(result.headers) #print(result.text) if '<STATE CODE="OK"/>' not in result.text: return False @@ -180,8 +214,10 @@ def main(): help='IPMI username with admin access') parser.add_argument('--password', required=True, help='IPMI user password') parser.add_argument('--no-reboot', action='store_true', help='The default is to reboot the IPMI after upload for the change to take effect.') parser.add_argument('--quiet', action='store_true', help='Do not output anything if successful') args = parser.parse_args() # Confirm args @@ -194,6 +230,14 @@ def main(): if args.ipmi_url[-1] == '/': args.ipmi_url = args.ipmi_url[0:-1] if not args.quiet: # Enable reuest logging logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) requests_log = logging.getLogger("requests.packages.urllib3") requests_log.setLevel(logging.DEBUG) requests_log.propagate = True # Start the operation requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) session = requests.session() @@ -218,29 +262,37 @@ def main(): if not cert_info: print("Failed to extract certificate information from IPMI!") exit(2) if not args.quiet and cert_info['has_cert']: print("There exists a certificate, which is valid until: %s" % cert_info['valid_until']) # Go upload! if not upload_cert(session, args.ipmi_url, args.key_file, args.cert_file): print("Failed to upload X.509 files to IPMI!") exit(2) cert_valid = get_ipmi_cert_valid(session, args.ipmi_url) if not cert_valid: print("Uploads failed validation") exit(2) if not args.quiet: print("Uploaded files ok.") cert_info = get_ipmi_cert_info(session, args.ipmi_url) if not cert_info: print("Failed to extract certificate information from IPMI!") exit(2) if not args.quiet and cert_info['has_cert']: print("After upload, there exists a certificate, which is valid until: %s" % cert_info['valid_until']) if not args.no_reboot: if not args.quiet: print("Rebooting IPMI to apply changes.") if not reboot_ipmi(session, args.ipmi_url): print("Rebooting failed! Go reboot it manually?") if not args.quiet: print("All done!") if __name__ == "__main__": -
HQJaTu revised this gist
Oct 21, 2018 . 1 changed file with 40 additions and 5 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -24,13 +24,15 @@ import requests from datetime import datetime from lxml import etree from urllib.parse import urlparse REQUEST_TIMEOUT = 5.0 LOGIN_URL = '%s/cgi/login.cgi' IPMI_CERT_INFO_URL = '%s/cgi/ipmi.cgi' UPLOAD_CERT_URL = '%s/cgi/upload_ssl.cgi' REBOOT_IPMI_URL = '%s/cgi/BMCReset.cgi' CONFIG_CERT_URL = '%s/cgi/url_redirect.cgi?url_name=config_ssl' def login(session, url, username, password): @@ -76,6 +78,8 @@ def get_ipmi_cert_info(session, url): 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } for cookie in session.cookies: print(cookie) ipmi_info_url = IPMI_CERT_INFO_URL % url try: result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False) @@ -119,8 +123,8 @@ def upload_cert(session, url, key_file, cert_file): with open(cert_file, 'rb') as filehandle: cert_data = filehandle.read() files_to_upload = [ ('/tmp/cert.key', ('cert.key', key_data, 'application/octet-stream')), ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/x-x509-ca-cert')) ] upload_cert_url = UPLOAD_CERT_URL % url @@ -141,15 +145,24 @@ def upload_cert(session, url, key_file, cert_file): def reboot_ipmi(session, url): timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') reboot_data = { 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } upload_cert_url = REBOOT_IPMI_URL % url try: result = session.post(upload_cert_url, reboot_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False print("Url: %s" % upload_cert_url) print(result.headers) print(result.text) if '<STATE CODE="OK"/>' not in result.text: return False return True @@ -187,6 +200,20 @@ def main(): if not login(session, args.ipmi_url, args.username, args.password): print("Login failed. Cannot continue!") exit(2) # Set mandatory cookies: url_parts = urlparse(args.ipmi_url) # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl mandatory_cookies = { 'langSetFlag': '0', 'language': 'English', 'mainpage': 'configuration', 'subpage': 'config_ssl' } for cookie_name, cookie_value in mandatory_cookies.items(): session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname) cert_info = get_ipmi_cert_info(session, args.ipmi_url) if not cert_info: print("Failed to extract certificate information from IPMI!") @@ -200,6 +227,14 @@ def main(): exit(2) print("Uploaded files ok.") cert_info = get_ipmi_cert_info(session, args.ipmi_url) if not cert_info: print("Failed to extract certificate information from IPMI!") exit(2) if cert_info['has_cert']: print("After upload, there exists a certificate, which is valid until: %s" % cert_info['valid_until']) if not args.no_reboot: print("Rebooting IPMI to apply changes.") if not reboot_ipmi(session, args.ipmi_url): -
HQJaTu created this gist
Jul 12, 2018 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,212 @@ #!/usr/bin/env python3 # vim: autoindent tabstop=4 shiftwidth=4 expandtab softtabstop=4 filetype=python # This file is part of Supermicro IPMI certificate updater. # Supermicro IPMI certificate updater is free software: you can # redistribute it and/or modify it under the terms of the GNU General Public # License as published by the Free Software Foundation, version 2. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., 51 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # # Copyright (c) Jari Turkia import os import argparse import requests from datetime import datetime from lxml import etree REQUEST_TIMEOUT = 5.0 LOGIN_URL = '%s/cgi/login.cgi' IPMI_CERT_INFO_URL = '%s/cgi/ipmi.cgi' UPLOAD_CERT_URL = '%s/cgi/upload_ssl.cgi' REBOOT_IPMI_URL = '%s/cgi/url_redirect.cgi?url_name=config_ssl_fw_reset' def login(session, url, username, password): """ Log into IPMI interface :param session: Current session object :type session requests.session :param url: base-URL to IPMI :param username: username to use for logging in :param password: password to use for logging in :return: bool """ login_data = { 'name': username, 'pwd': password } login_url = LOGIN_URL % url try: result = session.post(login_url, login_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if '/cgi/url_redirect.cgi?url_name=mainmenu' not in result.text: return False return True def get_ipmi_cert_info(session, url): """ Verify existing certificate information :param session: Current session object :type session requests.session :param url: base-URL to IPMI :return: dict """ timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') cert_info_data = { 'SSL_STATUS.XML': '(0,0)', 'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' } ipmi_info_url = IPMI_CERT_INFO_URL % url try: result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False root = etree.fromstring(result.text) # <?xml> <IPMI> <SSL_INFO> <STATUS> status = root.xpath('//IPMI/SSL_INFO/STATUS') if not status: return False # Since xpath will return a list, just pick the first one from it. status = status[0] has_cert = int(status.get('CERT_EXIST')) has_cert = bool(has_cert) if has_cert: valid_from = status.get('VALID_FROM') valid_until = status.get('VALID_UNTIL') return { 'has_cert': has_cert, 'valid_from': valid_from, 'valid_until': valid_until } def upload_cert(session, url, key_file, cert_file): """ Send X.509 certificate and private key to server :param session: Current session object :type session requests.session :param url: base-URL to IPMI :param key_file: filename to X.509 certificate private key :param cert_file: filename to X.509 certificate PEM :return: """ with open(key_file, 'rb') as filehandle: key_data = filehandle.read() with open(cert_file, 'rb') as filehandle: cert_data = filehandle.read() files_to_upload = [ ('/tmp/key.pem', ('/tmp/key.pem', key_data, 'application/octet-stream')), ('/tmp/cert.pem', ('/tmp/cert.pem', cert_data, 'application/x-x509-ca-cert')) ] upload_cert_url = UPLOAD_CERT_URL % url try: result = session.post(upload_cert_url, files=files_to_upload, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html': # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked' return False if 'CONFPAGE_RESET' not in result.text: return False return True def reboot_ipmi(session, url): upload_cert_url = REBOOT_IPMI_URL % url try: result = session.get(upload_cert_url, timeout=REQUEST_TIMEOUT, verify=False) except ConnectionError: return False if not result.ok: return False if 'LANG_FW_RESET_DESC1' not in result.text: return False return True def main(): parser = argparse.ArgumentParser(description='Update Supermicro IPMI SSL certificate') parser.add_argument('--ipmi-url', required=True, help='Supermicro IPMI 2.0 URL') parser.add_argument('--key-file', required=True, help='X.509 Private key filename') parser.add_argument('--cert-file', required=True, help='X.509 Certificate filename') parser.add_argument('--username', required=True, help='IPMI username with admin access') parser.add_argument('--password', required=True, help='IPMI user password') parser.add_argument('--no-reboot', help='The default is to reboot the IPMI after upload for the change to take effect.') args = parser.parse_args() # Confirm args if not os.path.isfile(args.key_file): print("--key-file '%s' doesn't exist!" % args.key_file) exit(2) if not os.path.isfile(args.cert_file): print("--cert-file '%s' doesn't exist!" % args.cert_file) exit(2) if args.ipmi_url[-1] == '/': args.ipmi_url = args.ipmi_url[0:-1] # Start the operation requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) session = requests.session() if not login(session, args.ipmi_url, args.username, args.password): print("Login failed. Cannot continue!") exit(2) cert_info = get_ipmi_cert_info(session, args.ipmi_url) if not cert_info: print("Failed to extract certificate information from IPMI!") exit(2) if cert_info['has_cert']: print("There exists a certificate, which is valid until: %s" % cert_info['valid_until']) # Go upload! if not upload_cert(session, args.ipmi_url, args.key_file, args.cert_file): print("Failed to upload X.509 files to IPMI!") exit(2) print("Uploaded files ok.") if not args.no_reboot: print("Rebooting IPMI to apply changes.") if not reboot_ipmi(session, args.ipmi_url): print("Rebooting failed! Go reboot it manually?") print("All done!") if __name__ == "__main__": main()