Skip to content

Instantly share code, notes, and snippets.

@mcdamo
Forked from HQJaTu/ipmi-updater.py
Last active July 14, 2025 20:59
  • Select an option

Select an option

Revisions

  1. mcdamo revised this gist Oct 20, 2020. 1 changed file with 9 additions and 4 deletions.
    13 changes: 9 additions & 4 deletions ipmi-updater.py
    Original file line number Diff line number Diff line change
    @@ -312,10 +312,11 @@ def _get_upload_data(self, cert_data, key_data):
    def create_updater(args):
    session = requests.session()

    if not args.model:
    model = determine_model(session, args.ipmi_url)
    if args.model is None:
    model = determine_model(session, args.ipmi_url, args.debug)
    else:
    model = args.model

    if not args.quiet:
    print("Board model is " + model)

    @@ -326,14 +327,18 @@ def create_updater(args):
    else:
    raise Exception(f"Unknown model: {model}")

    def determine_model(session, ipmi_url):
    def determine_model(session, ipmi_url, debug):
    redfish_url = f'{ipmi_url}/redfish/v1/'

    try:
    r = session.get(redfish_url, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    except (ConnectionError, requests.exceptions.SSLError) as err:
    print("Failed to determine model: connection error")
    if debug:
    print(err)
    exit(2)
    if not r.ok:
    print(f"Failed to determine model (try --model): {r.status_code} {r.reason}")
    exit(2)

    data = r.json()
  2. mcdamo revised this gist Oct 20, 2020. 1 changed file with 40 additions and 13 deletions.
    53 changes: 40 additions & 13 deletions ipmi-updater.py
    Original file line number Diff line number Diff line change
    @@ -45,7 +45,14 @@ def __init__(self, session, ipmi_url):
    self.use_b64encoded_login = True

    self._csrf_token = None


    error_log = logging.getLogger("IPMIUpdater")
    error_log.setLevel(logging.ERROR)
    self.setLogger(error_log)

    def setLogger(self, logger):
    self.logger = logger

    def get_csrf_token(self, url_name):
    if self._csrf_token is not None:
    return self._csrf_token
    @@ -69,7 +76,7 @@ def get_csrf_headers(self, url_name):
    if csrf_token is not None:
    headers["CSRF_TOKEN"] = csrf_token

    print("HEADERS:", headers)
    self.logger.debug("HEADERS:%s" % headers)
    return headers

    def get_xhr_headers(self, url_name):
    @@ -135,15 +142,15 @@ def get_ipmi_cert_info(self):
    if not result.ok:
    return False

    self.logger.debug(result.text)
    root = etree.fromstring(result.text)
    # <?xml> <IPMI> <SSL_INFO> <STATUS>
    status = root.xpath('//IPMI/SSL_INFO/STATUS')
    if not status:
    return False
    # Since xpath will return a list, just pick the first one from it.
    status = status[0]
    has_cert = int(status.get('CERT_EXIST'))
    has_cert = bool(has_cert)
    has_cert = bool(int(status.get('CERT_EXIST')))
    if has_cert:
    valid_from = status.get('VALID_FROM')
    valid_until = status.get('VALID_UNTIL')
    @@ -171,15 +178,15 @@ def get_ipmi_cert_valid(self):
    if not result.ok:
    return False

    self.logger.debug(result.text)
    root = etree.fromstring(result.text)
    # <?xml> <IPMI> <SSL_INFO>
    status = root.xpath('//IPMI/SSL_INFO')
    if not status:
    return False
    # Since xpath will return a list, just pick the first one from it.
    status = status[0]
    valid_cert = int(status.get('VALIDATE'))
    return bool(valid_cert)
    return bool(int(status.get('VALIDATE')))

    def upload_cert(self, key_file, cert_file):
    """
    @@ -261,13 +268,22 @@ def _get_op_data(self, op, r):

    def _get_upload_data(self, cert_data, key_data):
    return [
    ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')),
    ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream'))
    ('cert_file', ('cert.pem', cert_data, 'application/octet-stream')),
    ('key_file', ('privkey.pem', key_data, 'application/octet-stream'))
    ]

    def _check_reboot_result(self, result):
    if '<STATE CODE="OK"/>' not in result.text:
    self.logger.debug(result.text)
    root = etree.fromstring(result.text)
    # <?xml> <IPMI> <SSL_INFO>
    status = root.xpath('//IPMI/BMC_RESET/STATE')
    if not status:
    return False
    if status[0].get('CODE') == 'OK':
    return True
    return False
    #if '<STATE CODE="OK"/>' not in result.text:
    # return False


    class IPMIX11Updater(IPMIUpdater):
    @@ -296,8 +312,10 @@ def _get_upload_data(self, cert_data, key_data):
    def create_updater(args):
    session = requests.session()

    # First determine if we are X10 or X11
    model = determine_model(session, args.ipmi_url)
    if not args.model:
    model = determine_model(session, args.ipmi_url)
    else:
    model = args.model
    if not args.quiet:
    print("Board model is " + model)

    @@ -330,6 +348,8 @@ def main():
    parser = argparse.ArgumentParser(description='Update Supermicro IPMI SSL certificate')
    parser.add_argument('--ipmi-url', required=True,
    help='Supermicro IPMI 2.0 URL')
    parser.add_argument('--model', required=False,
    help='Board model, eg. X10 or X11')
    parser.add_argument('--key-file', required=True,
    help='X.509 Private key filename')
    parser.add_argument('--cert-file', required=True,
    @@ -342,6 +362,8 @@ def main():
    help='The default is to reboot the IPMI after upload for the change to take effect.')
    parser.add_argument('--quiet', action='store_true',
    help='Do not output anything if successful')
    parser.add_argument('--debug', action='store_true',
    help='Output additional debugging')
    args = parser.parse_args()

    # Confirm args
    @@ -354,11 +376,11 @@ def main():
    if args.ipmi_url[-1] == '/':
    args.ipmi_url = args.ipmi_url[0:-1]

    if not args.quiet:
    if args.debug:
    import http.client as http_client
    http_client.HTTPConnection.debuglevel = 1

    # Enable reuest logging
    # Enable request logging
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)
    requests_log = logging.getLogger("requests.packages.urllib3")
    @@ -369,6 +391,10 @@ def main():
    requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)

    updater = create_updater(args)
    if args.debug:
    debug_log = logging.getLogger("IPMIUpdater")
    debug_log.setLevel(logging.DEBUG)
    updater.setLogger(debug_log)

    if not updater.login(args.username, args.password):
    print("Login failed. Cannot continue!")
    @@ -413,3 +439,4 @@ def main():

    if __name__ == "__main__":
    main()

  3. mcdamo revised this gist Oct 20, 2020. 1 changed file with 300 additions and 184 deletions.
    484 changes: 300 additions & 184 deletions ipmi-updater.py
    Original file line number Diff line number Diff line change
    @@ -21,186 +21,310 @@

    import os
    import argparse
    import re
    import requests
    import logging
    from base64 import b64encode
    from datetime import datetime
    from lxml import etree
    from urllib.parse import urlparse
    from requests.auth import HTTPBasicAuth

    REQUEST_TIMEOUT = 5.0

    LOGIN_URL = '%s/cgi/login.cgi'
    IPMI_CERT_INFO_URL = '%s/cgi/ipmi.cgi'
    UPLOAD_CERT_URL = '%s/cgi/upload_ssl.cgi'
    REBOOT_IPMI_URL = '%s/cgi/BMCReset.cgi'
    CONFIG_CERT_URL = '%s/cgi/url_redirect.cgi?url_name=config_ssl'


    def login(session, url, username, password):
    """
    Log into IPMI interface
    :param session: Current session object
    :type session requests.session
    :param url: base-URL to IPMI
    :param username: username to use for logging in
    :param password: password to use for logging in
    :return: bool
    """
    login_data = {
    'name': username,
    'pwd': password
    }

    login_url = LOGIN_URL % url
    try:
    result = session.post(login_url, login_data, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False
    if '/cgi/url_redirect.cgi?url_name=mainmenu' not in result.text:
    return False

    return True


    def get_ipmi_cert_info(session, url):
    """
    Verify existing certificate information
    :param session: Current session object
    :type session requests.session
    :param url: base-URL to IPMI
    :return: dict
    """
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    cert_info_data = {
    'SSL_STATUS.XML': '(0,0)',
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }

    #for cookie in session.cookies:
    # print(cookie)
    ipmi_info_url = IPMI_CERT_INFO_URL % url
    try:
    result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    root = etree.fromstring(result.text)
    # <?xml> <IPMI> <SSL_INFO> <STATUS>
    status = root.xpath('//IPMI/SSL_INFO/STATUS')
    if not status:
    return False
    # Since xpath will return a list, just pick the first one from it.
    status = status[0]
    has_cert = int(status.get('CERT_EXIST'))
    has_cert = bool(has_cert)
    if has_cert:
    valid_from = status.get('VALID_FROM')
    valid_until = status.get('VALID_UNTIL')

    return {
    'has_cert': has_cert,
    'valid_from': valid_from,
    'valid_until': valid_until
    }

    def get_ipmi_cert_valid(session, url):
    """
    Verify existing certificate information
    :param session: Current session object
    :type session requests.session
    :param url: base-URL to IPMI
    :return: bool
    """
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    cert_info_data = {
    'SSL_VALIDATE.XML': '(0,0)',
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }

    #for cookie in session.cookies:
    # print(cookie)
    ipmi_info_url = IPMI_CERT_INFO_URL % url
    try:
    result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    root = etree.fromstring(result.text)
    # <?xml> <IPMI> <SSL_INFO>
    status = root.xpath('//IPMI/SSL_INFO')
    if not status:
    return False
    # Since xpath will return a list, just pick the first one from it.
    status = status[0]
    valid_cert = int(status.get('VALIDATE'))
    return bool(valid_cert)

    def upload_cert(session, url, key_file, cert_file):
    """
    Send X.509 certificate and private key to server
    :param session: Current session object
    :type session requests.session
    :param url: base-URL to IPMI
    :param key_file: filename to X.509 certificate private key
    :param cert_file: filename to X.509 certificate PEM
    :return:
    """
    with open(key_file, 'rb') as filehandle:
    key_data = filehandle.read()
    with open(cert_file, 'rb') as filehandle:
    cert_data = filehandle.read()
    files_to_upload = [
    ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')),
    ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream'))
    ]

    upload_cert_url = UPLOAD_CERT_URL % url
    try:
    result = session.post(upload_cert_url, files=files_to_upload, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html':
    # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked'
    return False
    if 'CONFPAGE_RESET' not in result.text:
    return False
    return True
    class IPMIUpdater:
    def __init__(self, session, ipmi_url):
    self.session = session
    self.ipmi_url = ipmi_url

    self.login_url = f'{ipmi_url}/cgi/login.cgi'
    self.cert_info_url = f'{ipmi_url}/cgi/ipmi.cgi'
    self.upload_cert_url = f'{ipmi_url}/cgi/upload_ssl.cgi'
    self.url_redirect_template = f'{ipmi_url}/cgi/url_redirect.cgi?url_name=%s'

    self.use_b64encoded_login = True

    self._csrf_token = None

    def get_csrf_token(self, url_name):
    if self._csrf_token is not None:
    return self._csrf_token

    page_url = self.url_redirect_template % url_name
    result = self.session.get(page_url)
    result.raise_for_status()

    match = re.search(r'SmcCsrfInsert\s*\("CSRF_TOKEN",\s*"([^"]*)"\);', result.text)
    if match:
    return match.group(1)

    def get_csrf_headers(self, url_name):
    page_url = self.url_redirect_template % url_name

    headers = {
    "Origin": self.ipmi_url,
    "Referer": page_url,
    }
    csrf_token = self.get_csrf_token(url_name)
    if csrf_token is not None:
    headers["CSRF_TOKEN"] = csrf_token

    print("HEADERS:", headers)
    return headers

    def get_xhr_headers(self, url_name):
    headers = self.get_csrf_headers(url_name)
    headers["X-Requested-With"] = "XMLHttpRequest"
    return headers

    def login(self, username, password):
    """
    Log into IPMI interface
    :param username: username to use for logging in
    :param password: password to use for logging in
    :return: bool
    """
    if self.use_b64encoded_login:
    login_data = {
    'name': b64encode(username.encode("UTF-8")),
    'pwd': b64encode(password.encode("UTF-8")),
    'check': '00'
    }
    else:
    login_data = {
    'name': username,
    'pwd': password
    }

    try:
    result = self.session.post(self.login_url, login_data, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False
    if '/cgi/url_redirect.cgi?url_name=mainmenu' not in result.text:
    return False

    # Set mandatory cookies:
    url_parts = urlparse(self.ipmi_url)
    # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl
    mandatory_cookies = {
    'langSetFlag': '0',
    'language': 'English'
    }
    for cookie_name, cookie_value in mandatory_cookies.items():
    self.session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname)

    return True


    def get_ipmi_cert_info(self):
    """
    Verify existing certificate information
    :return: dict
    """

    headers = self.get_xhr_headers("config_ssl")

    cert_info_data = self._get_op_data('SSL_STATUS.XML', '(0,0)')

    try:
    result = self.session.post(self.cert_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    root = etree.fromstring(result.text)
    # <?xml> <IPMI> <SSL_INFO> <STATUS>
    status = root.xpath('//IPMI/SSL_INFO/STATUS')
    if not status:
    return False
    # Since xpath will return a list, just pick the first one from it.
    status = status[0]
    has_cert = int(status.get('CERT_EXIST'))
    has_cert = bool(has_cert)
    if has_cert:
    valid_from = status.get('VALID_FROM')
    valid_until = status.get('VALID_UNTIL')

    return {
    'has_cert': has_cert,
    'valid_from': valid_from,
    'valid_until': valid_until
    }

    def get_ipmi_cert_valid(self):
    """
    Verify existing certificate information
    :return: bool
    """

    headers = self.get_xhr_headers("config_ssl")

    cert_info_data = self._get_op_data('SSL_VALIDATE.XML', '(0,0)')

    try:
    result = self.session.post(self.cert_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    root = etree.fromstring(result.text)
    # <?xml> <IPMI> <SSL_INFO>
    status = root.xpath('//IPMI/SSL_INFO')
    if not status:
    return False
    # Since xpath will return a list, just pick the first one from it.
    status = status[0]
    valid_cert = int(status.get('VALIDATE'))
    return bool(valid_cert)

    def upload_cert(self, key_file, cert_file):
    """
    Send X.509 certificate and private key to server
    :param session: Current session object
    :type session requests.session
    :param url: base-URL to IPMI
    :param key_file: filename to X.509 certificate private key
    :param cert_file: filename to X.509 certificate PEM
    :return:
    """
    with open(key_file, 'rb') as filehandle:
    key_data = filehandle.read()
    with open(cert_file, 'rb') as filehandle:
    cert_data = filehandle.read()
    # extract certificates only (IMPI doesn't like DH PARAMS)
    cert_data = b'\n'.join(re.findall(b'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----', cert_data, re.DOTALL)) + b'\n'

    files_to_upload = self._get_upload_data(cert_data, key_data)

    headers = self.get_csrf_headers("config_ssl")
    csrf_token = self.get_csrf_token("config_ssl")
    csrf_data = {}
    if csrf_token is not None:
    csrf_data["CSRF_TOKEN"] = csrf_token

    try:
    result = self.session.post(self.upload_cert_url, csrf_data, files=files_to_upload, headers=headers, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False


    if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html':
    # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked'
    return False
    if 'CONFPAGE_RESET' not in result.text:
    return False

    return True


    def _check_reboot_result(self, result):
    return True

    def reboot_ipmi(self):
    # do we need a different Referer here?
    headers = self.get_xhr_headers("config_ssl")

    reboot_data = self._get_op_data('main_bmcreset', None)

    try:
    result = self.session.post(self.reboot_url, reboot_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    if not self._check_reboot_result(result):
    return False
    return True

    class IPMIX10Updater(IPMIUpdater):
    def __init__(self, session, ipmi_url):
    super().__init__(session, ipmi_url)
    self.reboot_url = f'{ipmi_url}/cgi/BMCReset.cgi'
    self.use_b64encoded_login = False

    def _get_op_data(self, op, r):
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    data = {
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }
    if r is not None:
    data[op] = r
    return data

    def _get_upload_data(self, cert_data, key_data):
    return [
    ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')),
    ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream'))
    ]

    def _check_reboot_result(self, result):
    if '<STATE CODE="OK"/>' not in result.text:
    return False


    class IPMIX11Updater(IPMIUpdater):
    def __init__(self, session, ipmi_url):
    super().__init__(session, ipmi_url)
    self.reboot_url = f'{ipmi_url}/cgi/op.cgi'
    self.use_b64encoded_login = True

    def _get_op_data(self, op, r):
    data = {
    'op': op
    }

    if r is not None:
    data['r'] = r
    data['_'] = ''
    return data

    def _get_upload_data(self, cert_data, key_data):
    return [
    ('cert_file', ('fullchain.pem', cert_data, 'application/octet-stream')),
    ('key_file', ('privkey.pem', key_data, 'application/octet-stream'))
    ]


    def create_updater(args):
    session = requests.session()

    # First determine if we are X10 or X11
    model = determine_model(session, args.ipmi_url)
    if not args.quiet:
    print("Board model is " + model)

    def reboot_ipmi(session, url):
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')
    if model == "X10":
    return IPMIX10Updater(session, args.ipmi_url)
    elif model == "X11":
    return IPMIX11Updater(session, args.ipmi_url)
    else:
    raise Exception(f"Unknown model: {model}")

    reboot_data = {
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }
    def determine_model(session, ipmi_url):
    redfish_url = f'{ipmi_url}/redfish/v1/'

    upload_cert_url = REBOOT_IPMI_URL % url
    try:
    result = session.post(upload_cert_url, reboot_data, timeout=REQUEST_TIMEOUT, verify=False)
    r = session.get(redfish_url, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    #print("Url: %s" % upload_cert_url)
    #print(result.headers)
    #print(result.text)
    if '<STATE CODE="OK"/>' not in result.text:
    return False
    exit(2)
    if not r.ok:
    exit(2)

    return True
    data = r.json()

    # The UpdateService methods are only available on newer X11 based boards
    if "UpdateService" in data:
    return "X11"
    else:
    return "X10"

    def main():
    parser = argparse.ArgumentParser(description='Update Supermicro IPMI SSL certificate')
    @@ -231,6 +355,9 @@ def main():
    args.ipmi_url = args.ipmi_url[0:-1]

    if not args.quiet:
    import http.client as http_client
    http_client.HTTPConnection.debuglevel = 1

    # Enable reuest logging
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)
    @@ -240,45 +367,34 @@ def main():

    # Start the operation
    requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)
    session = requests.session()
    if not login(session, args.ipmi_url, args.username, args.password):
    print("Login failed. Cannot continue!")
    exit(2)

    updater = create_updater(args)

    # Set mandatory cookies:
    url_parts = urlparse(args.ipmi_url)
    # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl
    mandatory_cookies = {
    'langSetFlag': '0',
    'language': 'English',
    'mainpage': 'configuration',
    'subpage': 'config_ssl'
    }
    for cookie_name, cookie_value in mandatory_cookies.items():
    session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname)
    if not updater.login(args.username, args.password):
    print("Login failed. Cannot continue!")
    exit(2)

    cert_info = get_ipmi_cert_info(session, args.ipmi_url)
    cert_info = updater.get_ipmi_cert_info()
    if not cert_info:
    print("Failed to extract certificate information from IPMI!")
    exit(2)
    if not args.quiet and cert_info['has_cert']:
    print("There exists a certificate, which is valid until: %s" % cert_info['valid_until'])

    # Go upload!
    if not upload_cert(session, args.ipmi_url, args.key_file, args.cert_file):
    if not updater.upload_cert(args.key_file, args.cert_file):
    print("Failed to upload X.509 files to IPMI!")
    exit(2)

    cert_valid = get_ipmi_cert_valid(session, args.ipmi_url)
    cert_valid = updater.get_ipmi_cert_valid()
    if not cert_valid:
    print("Uploads failed validation")
    exit(2)

    if not args.quiet:
    print("Uploaded files ok.")

    cert_info = get_ipmi_cert_info(session, args.ipmi_url)
    cert_info = updater.get_ipmi_cert_info()
    if not cert_info:
    print("Failed to extract certificate information from IPMI!")
    exit(2)
    @@ -288,7 +404,7 @@ def main():
    if not args.no_reboot:
    if not args.quiet:
    print("Rebooting IPMI to apply changes.")
    if not reboot_ipmi(session, args.ipmi_url):
    if not updater.reboot_ipmi():
    print("Rebooting failed! Go reboot it manually?")

    if not args.quiet:
  4. @oxc oxc revised this gist Aug 31, 2020. 1 changed file with 83 additions and 101 deletions.
    184 changes: 83 additions & 101 deletions ipmi-updater.py
    Original file line number Diff line number Diff line change
    @@ -33,26 +33,24 @@
    REQUEST_TIMEOUT = 5.0

    class IPMIUpdater:
    def __init__(self, session, ipmi_url, model):
    def __init__(self, session, ipmi_url):
    self.session = session
    self.ipmi_url = ipmi_url
    self.model = model

    self.LOGIN_URL = f'{ipmi_url}/cgi/login.cgi'
    self.IPMI_CERT_INFO_URL = f'{ipmi_url}/cgi/ipmi.cgi'
    self.UPLOAD_CERT_URL = f'{ipmi_url}/cgi/upload_ssl.cgi'
    self.REBOOT_IPMI_X10_URL = f'{ipmi_url}/cgi/BMCReset.cgi'
    self.REBOOT_IPMI_X11_URL = f'{ipmi_url}/cgi/op.cgi'
    self.URL_REDIRECT_URL = f'{ipmi_url}/cgi/url_redirect.cgi?url_name=%s'
    self.REDFISH_ROOT = f'{ipmi_url}/redfish/v1/'
    self.login_url = f'{ipmi_url}/cgi/login.cgi'
    self.cert_info_url = f'{ipmi_url}/cgi/ipmi.cgi'
    self.upload_cert_url = f'{ipmi_url}/cgi/upload_ssl.cgi'
    self.url_redirect_template = f'{ipmi_url}/cgi/url_redirect.cgi?url_name=%s'

    self.use_b64encoded_login = True

    self._csrf_token = None

    def get_csrf_token(self, url_name):
    if self._csrf_token is not None:
    return self._csrf_token

    page_url = self.URL_REDIRECT_URL % url_name
    page_url = self.url_redirect_template % url_name
    result = self.session.get(page_url)
    result.raise_for_status()

    @@ -61,7 +59,7 @@ def get_csrf_token(self, url_name):
    return match.group(1)

    def get_csrf_headers(self, url_name):
    page_url = self.URL_REDIRECT_URL % url_name
    page_url = self.url_redirect_template % url_name

    headers = {
    "Origin": self.ipmi_url,
    @@ -86,20 +84,7 @@ def login(self, username, password):
    :param password: password to use for logging in
    :return: bool
    """
    if False:
    try:
    session_url = self.REDFISH_ROOT + "/SessionService/Sessions"
    session_data = {
    'UserName': username,
    'Password': password
    }
    result = self.session.post(session_url, json=session_data, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    if True:
    if self.use_b64encoded_login:
    login_data = {
    'name': b64encode(username.encode("UTF-8")),
    'pwd': b64encode(password.encode("UTF-8")),
    @@ -112,7 +97,7 @@ def login(self, username, password):
    }

    try:
    result = self.session.post(self.LOGIN_URL, login_data, timeout=REQUEST_TIMEOUT, verify=False)
    result = self.session.post(self.login_url, login_data, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    @@ -141,26 +126,10 @@ def get_ipmi_cert_info(self):

    headers = self.get_xhr_headers("config_ssl")

    if self.model == "X10":
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    cert_info_data = {
    'SSL_STATUS.XML': '(0,0)',
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }
    elif self.model == "X11":
    cert_info_data = {
    'op': 'SSL_STATUS.XML',
    'r': '(0,0)',
    '_': ''
    }
    cert_info_data = self._get_op_data('SSL_STATUS.XML', '(0,0)')

    #for cookie in session.cookies:
    # print(cookie)
    try:
    ipmi_info_url = self.IPMI_CERT_INFO_URL
    result = self.session.post(ipmi_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False)
    print("Result.text:", result.text)
    result = self.session.post(self.cert_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    @@ -193,26 +162,10 @@ def get_ipmi_cert_valid(self):

    headers = self.get_xhr_headers("config_ssl")

    if self.model == "X10":
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    cert_info_data = {
    'SSL_VALIDATE.XML': '(0,0)',
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }
    elif self.model == "X11":
    cert_info_data = {
    'op': 'SSL_VALIDATE.XML',
    'r': '(0,0)',
    '_': ''
    }

    cert_info_data = self._get_op_data('SSL_VALIDATE.XML', '(0,0)')

    #for cookie in session.cookies:
    # print(cookie)
    try:
    ipmi_info_url = self.IPMI_CERT_INFO_URL
    result = self.session.post(ipmi_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False)
    result = self.session.post(self.cert_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    @@ -245,24 +198,16 @@ def upload_cert(self, key_file, cert_file):
    # extract certificates only (IMPI doesn't like DH PARAMS)
    cert_data = b'\n'.join(re.findall(b'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----', cert_data, re.DOTALL)) + b'\n'

    if self.model == "X10":
    files_to_upload = [
    ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')),
    ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream'))
    ]
    elif self.model == "X11":
    files_to_upload = [
    ('cert_file', ('fullchain.pem', cert_data, 'application/octet-stream')),
    ('key_file', ('privkey.pem', key_data, 'application/octet-stream'))
    ]
    files_to_upload = self._get_upload_data(cert_data, key_data)

    headers = self.get_csrf_headers("config_ssl")
    csrf_token = self.get_csrf_token("config_ssl")
    csrf_data = { "CSRF_TOKEN": csrf_token }
    csrf_data = {}
    if csrf_token is not None:
    csrf_data["CSRF_TOKEN"] = csrf_token

    try:
    upload_cert_url = self.UPLOAD_CERT_URL
    result = self.session.post(upload_cert_url, csrf_data, files=files_to_upload, headers=headers, timeout=REQUEST_TIMEOUT, verify=False)
    result = self.session.post(self.upload_cert_url, csrf_data, files=files_to_upload, headers=headers, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    @@ -278,39 +223,76 @@ def upload_cert(self, key_file, cert_file):
    return True


    def _check_reboot_result(self, result):
    return True

    def reboot_ipmi(self):
    # do we need a different Referer here?
    headers = self.get_xhr_headers("config_ssl")

    if self.model == "X10":
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    reboot_data = {
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }

    reboot_url = self.REBOOT_IPMI_X10_URL
    elif self.model == "X11":
    reboot_data = {
    'op': 'main_bmcreset',
    '_': ''
    }

    reboot_url = self.REBOOT_IPMI_X11_URL
    reboot_data = self._get_op_data('main_bmcreset', None)

    try:
    result = self.session.post(reboot_url, reboot_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False)
    result = self.session.post(self.reboot_url, reboot_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    if self.model == "X10":
    if '<STATE CODE="OK"/>' not in result.text:
    return False

    if not self._check_reboot_result(result):
    return False
    return True

    class IPMIX10Updater(IPMIUpdater):
    def __init__(self, session, ipmi_url):
    super().__init__(session, ipmi_url)
    self.reboot_url = f'{ipmi_url}/cgi/BMCReset.cgi'
    self.use_b64encoded_login = False

    def _get_op_data(self, op, r):
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    data = {
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }
    if r is not None:
    data[op] = r
    return data

    def _get_upload_data(self, cert_data, key_data):
    return [
    ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')),
    ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream'))
    ]

    def _check_reboot_result(self, result):
    if '<STATE CODE="OK"/>' not in result.text:
    return False


    class IPMIX11Updater(IPMIUpdater):
    def __init__(self, session, ipmi_url):
    super().__init__(session, ipmi_url)
    self.reboot_url = f'{ipmi_url}/cgi/op.cgi'
    self.use_b64encoded_login = True

    def _get_op_data(self, op, r):
    data = {
    'op': op
    }

    if r is not None:
    data['r'] = r
    data['_'] = ''
    return data

    def _get_upload_data(self, cert_data, key_data):
    return [
    ('cert_file', ('fullchain.pem', cert_data, 'application/octet-stream')),
    ('key_file', ('privkey.pem', key_data, 'application/octet-stream'))
    ]


    def create_updater(args):
    session = requests.session()

    @@ -319,8 +301,12 @@ def create_updater(args):
    if not args.quiet:
    print("Board model is " + model)

    return IPMIUpdater(session, args.ipmi_url, model)

    if model == "X10":
    return IPMIX10Updater(session, args.ipmi_url)
    elif model == "X11":
    return IPMIX11Updater(session, args.ipmi_url)
    else:
    raise Exception(f"Unknown model: {model}")

    def determine_model(session, ipmi_url):
    redfish_url = f'{ipmi_url}/redfish/v1/'
    @@ -391,8 +377,6 @@ def main():
    cert_info = updater.get_ipmi_cert_info()
    if not cert_info:
    print("Failed to extract certificate information from IPMI!")
    if updater.model == "X11":
    print("Try checking either your IPMI network config or your IPMI license!")
    exit(2)
    if not args.quiet and cert_info['has_cert']:
    print("There exists a certificate, which is valid until: %s" % cert_info['valid_until'])
    @@ -413,8 +397,6 @@ def main():
    cert_info = updater.get_ipmi_cert_info()
    if not cert_info:
    print("Failed to extract certificate information from IPMI!")
    if updater.model == "X11":
    print("Try checking either your IPMI network config or your IPMI license!")
    exit(2)
    if not args.quiet and cert_info['has_cert']:
    print("After upload, there exists a certificate, which is valid until: %s" % cert_info['valid_until'])
  5. @oxc oxc revised this gist Aug 31, 2020. 1 changed file with 287 additions and 218 deletions.
    505 changes: 287 additions & 218 deletions ipmi-updater.py
    Original file line number Diff line number Diff line change
    @@ -24,224 +24,306 @@
    import re
    import requests
    import logging
    from base64 import b64encode
    from datetime import datetime
    from lxml import etree
    from urllib.parse import urlparse
    from requests.auth import HTTPBasicAuth

    REQUEST_TIMEOUT = 5.0

    LOGIN_URL = '%s/cgi/login.cgi'
    IPMI_CERT_INFO_URL = '%s/cgi/ipmi.cgi'
    UPLOAD_CERT_URL = '%s/cgi/upload_ssl.cgi'
    REBOOT_IPMI_X10_URL = '%s/cgi/BMCReset.cgi'
    REBOOT_IPMI_X11_URL = '%s/cgi/op.cgi'
    CONFIG_CERT_URL = '%s/cgi/url_redirect.cgi?url_name=config_ssl'
    REDFISH_ROOT = '%s/redfish/v1/'

    def login(session, url, username, password):
    """
    Log into IPMI interface
    :param session: Current session object
    :type session requests.session
    :param url: base-URL to IPMI
    :param username: username to use for logging in
    :param password: password to use for logging in
    :return: bool
    """
    login_data = {
    'name': username,
    'pwd': password
    }

    login_url = LOGIN_URL % url
    try:
    result = session.post(login_url, login_data, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False
    if '/cgi/url_redirect.cgi?url_name=mainmenu' not in result.text:
    return False

    return True


    def get_ipmi_cert_info(session, url, model):
    """
    Verify existing certificate information
    :param session: Current session object
    :type session requests.session
    :param url: base-URL to IPMI
    :return: dict
    """

    if model == "X10":
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    cert_info_data = {
    'SSL_STATUS.XML': '(0,0)',
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }
    elif model == "X11":
    cert_info_data = {
    'op': 'SSL_STATUS.XML',
    'r': '(0,0)',
    '_': ''
    class IPMIUpdater:
    def __init__(self, session, ipmi_url, model):
    self.session = session
    self.ipmi_url = ipmi_url
    self.model = model

    self.LOGIN_URL = f'{ipmi_url}/cgi/login.cgi'
    self.IPMI_CERT_INFO_URL = f'{ipmi_url}/cgi/ipmi.cgi'
    self.UPLOAD_CERT_URL = f'{ipmi_url}/cgi/upload_ssl.cgi'
    self.REBOOT_IPMI_X10_URL = f'{ipmi_url}/cgi/BMCReset.cgi'
    self.REBOOT_IPMI_X11_URL = f'{ipmi_url}/cgi/op.cgi'
    self.URL_REDIRECT_URL = f'{ipmi_url}/cgi/url_redirect.cgi?url_name=%s'
    self.REDFISH_ROOT = f'{ipmi_url}/redfish/v1/'

    self._csrf_token = None

    def get_csrf_token(self, url_name):
    if self._csrf_token is not None:
    return self._csrf_token

    page_url = self.URL_REDIRECT_URL % url_name
    result = self.session.get(page_url)
    result.raise_for_status()

    match = re.search(r'SmcCsrfInsert\s*\("CSRF_TOKEN",\s*"([^"]*)"\);', result.text)
    if match:
    return match.group(1)

    def get_csrf_headers(self, url_name):
    page_url = self.URL_REDIRECT_URL % url_name

    headers = {
    "Origin": self.ipmi_url,
    "Referer": page_url,
    }
    csrf_token = self.get_csrf_token(url_name)
    if csrf_token is not None:
    headers["CSRF_TOKEN"] = csrf_token

    print("HEADERS:", headers)
    return headers

    def get_xhr_headers(self, url_name):
    headers = self.get_csrf_headers(url_name)
    headers["X-Requested-With"] = "XMLHttpRequest"
    return headers

    def login(self, username, password):
    """
    Log into IPMI interface
    :param username: username to use for logging in
    :param password: password to use for logging in
    :return: bool
    """
    if False:
    try:
    session_url = self.REDFISH_ROOT + "/SessionService/Sessions"
    session_data = {
    'UserName': username,
    'Password': password
    }
    result = self.session.post(session_url, json=session_data, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    if True:
    login_data = {
    'name': b64encode(username.encode("UTF-8")),
    'pwd': b64encode(password.encode("UTF-8")),
    'check': '00'
    }
    else:
    login_data = {
    'name': username,
    'pwd': password
    }

    try:
    result = self.session.post(self.LOGIN_URL, login_data, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False
    if '/cgi/url_redirect.cgi?url_name=mainmenu' not in result.text:
    return False

    #for cookie in session.cookies:
    # print(cookie)
    ipmi_info_url = IPMI_CERT_INFO_URL % url
    try:
    result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    root = etree.fromstring(result.text)
    # <?xml> <IPMI> <SSL_INFO> <STATUS>
    status = root.xpath('//IPMI/SSL_INFO/STATUS')
    if not status:
    return False
    # Since xpath will return a list, just pick the first one from it.
    status = status[0]
    has_cert = int(status.get('CERT_EXIST'))
    has_cert = bool(has_cert)
    if has_cert:
    valid_from = status.get('VALID_FROM')
    valid_until = status.get('VALID_UNTIL')

    return {
    'has_cert': has_cert,
    'valid_from': valid_from,
    'valid_until': valid_until
    }

    def get_ipmi_cert_valid(session, url, model):
    """
    Verify existing certificate information
    :param session: Current session object
    :type session requests.session
    :param url: base-URL to IPMI
    :return: bool
    """

    if model == "X10":
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    cert_info_data = {
    'SSL_VALIDATE.XML': '(0,0)',
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    # Set mandatory cookies:
    url_parts = urlparse(self.ipmi_url)
    # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl
    mandatory_cookies = {
    'langSetFlag': '0',
    'language': 'English'
    }
    elif model == "X11":
    cert_info_data = {
    'op': 'SSL_VALIDATE.XML',
    'r': '(0,0)',
    '_': ''
    for cookie_name, cookie_value in mandatory_cookies.items():
    self.session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname)

    return True


    def get_ipmi_cert_info(self):
    """
    Verify existing certificate information
    :return: dict
    """

    headers = self.get_xhr_headers("config_ssl")

    if self.model == "X10":
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    cert_info_data = {
    'SSL_STATUS.XML': '(0,0)',
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }
    elif self.model == "X11":
    cert_info_data = {
    'op': 'SSL_STATUS.XML',
    'r': '(0,0)',
    '_': ''
    }

    #for cookie in session.cookies:
    # print(cookie)
    try:
    ipmi_info_url = self.IPMI_CERT_INFO_URL
    result = self.session.post(ipmi_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False)
    print("Result.text:", result.text)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    root = etree.fromstring(result.text)
    # <?xml> <IPMI> <SSL_INFO> <STATUS>
    status = root.xpath('//IPMI/SSL_INFO/STATUS')
    if not status:
    return False
    # Since xpath will return a list, just pick the first one from it.
    status = status[0]
    has_cert = int(status.get('CERT_EXIST'))
    has_cert = bool(has_cert)
    if has_cert:
    valid_from = status.get('VALID_FROM')
    valid_until = status.get('VALID_UNTIL')

    return {
    'has_cert': has_cert,
    'valid_from': valid_from,
    'valid_until': valid_until
    }

    def get_ipmi_cert_valid(self):
    """
    Verify existing certificate information
    :return: bool
    """

    headers = self.get_xhr_headers("config_ssl")

    if self.model == "X10":
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    cert_info_data = {
    'SSL_VALIDATE.XML': '(0,0)',
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }
    elif self.model == "X11":
    cert_info_data = {
    'op': 'SSL_VALIDATE.XML',
    'r': '(0,0)',
    '_': ''
    }


    #for cookie in session.cookies:
    # print(cookie)
    try:
    ipmi_info_url = self.IPMI_CERT_INFO_URL
    result = self.session.post(ipmi_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    #for cookie in session.cookies:
    # print(cookie)
    ipmi_info_url = IPMI_CERT_INFO_URL % url
    try:
    result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    root = etree.fromstring(result.text)
    # <?xml> <IPMI> <SSL_INFO>
    status = root.xpath('//IPMI/SSL_INFO')
    if not status:
    return False
    # Since xpath will return a list, just pick the first one from it.
    status = status[0]
    valid_cert = int(status.get('VALIDATE'))
    return bool(valid_cert)

    def upload_cert(session, url, key_file, cert_file, model):
    """
    Send X.509 certificate and private key to server
    :param session: Current session object
    :type session requests.session
    :param url: base-URL to IPMI
    :param key_file: filename to X.509 certificate private key
    :param cert_file: filename to X.509 certificate PEM
    :return:
    """
    with open(key_file, 'rb') as filehandle:
    key_data = filehandle.read()
    with open(cert_file, 'rb') as filehandle:
    cert_data = filehandle.read()
    # extract certificates only (IMPI doesn't like DH PARAMS)
    cert_data = b'\n'.join(re.findall(b'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----', cert_data, re.DOTALL)) + b'\n'



    if model == "X10":
    files_to_upload = [
    ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')),
    ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream'))
    ]
    elif model == "X11":
    files_to_upload = [
    ('cert_file', ('fullchain.pem', cert_data, 'application/octet-stream')),
    ('key_file', ('privkey.pem', key_data, 'application/octet-stream'))
    ]

    upload_cert_url = UPLOAD_CERT_URL % url
    try:
    result = session.post(upload_cert_url, files=files_to_upload, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False
    root = etree.fromstring(result.text)
    # <?xml> <IPMI> <SSL_INFO>
    status = root.xpath('//IPMI/SSL_INFO')
    if not status:
    return False
    # Since xpath will return a list, just pick the first one from it.
    status = status[0]
    valid_cert = int(status.get('VALIDATE'))
    return bool(valid_cert)

    def upload_cert(self, key_file, cert_file):
    """
    Send X.509 certificate and private key to server
    :param session: Current session object
    :type session requests.session
    :param url: base-URL to IPMI
    :param key_file: filename to X.509 certificate private key
    :param cert_file: filename to X.509 certificate PEM
    :return:
    """
    with open(key_file, 'rb') as filehandle:
    key_data = filehandle.read()
    with open(cert_file, 'rb') as filehandle:
    cert_data = filehandle.read()
    # extract certificates only (IMPI doesn't like DH PARAMS)
    cert_data = b'\n'.join(re.findall(b'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----', cert_data, re.DOTALL)) + b'\n'

    if self.model == "X10":
    files_to_upload = [
    ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')),
    ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream'))
    ]
    elif self.model == "X11":
    files_to_upload = [
    ('cert_file', ('fullchain.pem', cert_data, 'application/octet-stream')),
    ('key_file', ('privkey.pem', key_data, 'application/octet-stream'))
    ]

    headers = self.get_csrf_headers("config_ssl")
    csrf_token = self.get_csrf_token("config_ssl")
    csrf_data = { "CSRF_TOKEN": csrf_token }

    try:
    upload_cert_url = self.UPLOAD_CERT_URL
    result = self.session.post(upload_cert_url, csrf_data, files=files_to_upload, headers=headers, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html':
    # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked'
    return False
    if 'CONFPAGE_RESET' not in result.text:
    return False

    return True
    if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html':
    # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked'
    return False
    if 'CONFPAGE_RESET' not in result.text:
    return False

    return True

    def reboot_ipmi(session, url, model):
    if model == "X10":
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    reboot_data = {
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }
    def reboot_ipmi(self):
    # do we need a different Referer here?
    headers = self.get_xhr_headers("config_ssl")

    reboot_url = REBOOT_IPMI_X10_URL % url
    elif model == "X11":
    reboot_data = {
    'op': 'main_bmcreset',
    '_': ''
    }
    if self.model == "X10":
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    reboot_url = REBOOT_IPMI_X11_URL % url
    reboot_data = {
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }

    try:
    result = session.post(reboot_url, reboot_data, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False
    reboot_url = self.REBOOT_IPMI_X10_URL
    elif self.model == "X11":
    reboot_data = {
    'op': 'main_bmcreset',
    '_': ''
    }

    reboot_url = self.REBOOT_IPMI_X11_URL

    if model == "X10":
    if '<STATE CODE="OK"/>' not in result.text:
    try:
    result = self.session.post(reboot_url, reboot_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    return True
    if self.model == "X10":
    if '<STATE CODE="OK"/>' not in result.text:
    return False

    return True

    def create_updater(args):
    session = requests.session()

    # First determine if we are X10 or X11
    model = determine_model(session, args.ipmi_url)
    if not args.quiet:
    print("Board model is " + model)

    return IPMIUpdater(session, args.ipmi_url, model)


    def determine_model(session, url):
    redfish_url = REDFISH_ROOT % url
    def determine_model(session, ipmi_url):
    redfish_url = f'{ipmi_url}/redfish/v1/'

    try:
    r = session.get(redfish_url, timeout=REQUEST_TIMEOUT, verify=False)
    @@ -287,6 +369,9 @@ def main():
    args.ipmi_url = args.ipmi_url[0:-1]

    if not args.quiet:
    import http.client as http_client
    http_client.HTTPConnection.debuglevel = 1

    # Enable reuest logging
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)
    @@ -296,55 +381,39 @@ def main():

    # Start the operation
    requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)
    session = requests.session()

    # First determine if we are X10 or X11
    model = determine_model(session, args.ipmi_url)
    if not args.quiet:
    print("Board model is " + model)
    updater = create_updater(args)

    if not login(session, args.ipmi_url, args.username, args.password):
    if not updater.login(args.username, args.password):
    print("Login failed. Cannot continue!")
    exit(2)

    # Set mandatory cookies:
    url_parts = urlparse(args.ipmi_url)
    # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl
    mandatory_cookies = {
    'langSetFlag': '0',
    'language': 'English',
    'mainpage': 'configuration',
    'subpage': 'config_ssl'
    }
    for cookie_name, cookie_value in mandatory_cookies.items():
    session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname)

    cert_info = get_ipmi_cert_info(session, args.ipmi_url, model)
    cert_info = updater.get_ipmi_cert_info()
    if not cert_info:
    print("Failed to extract certificate information from IPMI!")
    if model == "X11":
    if updater.model == "X11":
    print("Try checking either your IPMI network config or your IPMI license!")
    exit(2)
    if not args.quiet and cert_info['has_cert']:
    print("There exists a certificate, which is valid until: %s" % cert_info['valid_until'])

    # Go upload!
    if not upload_cert(session, args.ipmi_url, args.key_file, args.cert_file, model):
    if not updater.upload_cert(args.key_file, args.cert_file):
    print("Failed to upload X.509 files to IPMI!")
    exit(2)

    cert_valid = get_ipmi_cert_valid(session, args.ipmi_url, model)
    cert_valid = updater.get_ipmi_cert_valid()
    if not cert_valid:
    print("Uploads failed validation")
    exit(2)

    if not args.quiet:
    print("Uploaded files ok.")

    cert_info = get_ipmi_cert_info(session, args.ipmi_url, model)
    cert_info = updater.get_ipmi_cert_info()
    if not cert_info:
    print("Failed to extract certificate information from IPMI!")
    if model == "X11":
    if updater.model == "X11":
    print("Try checking either your IPMI network config or your IPMI license!")
    exit(2)
    if not args.quiet and cert_info['has_cert']:
    @@ -353,7 +422,7 @@ def main():
    if not args.no_reboot:
    if not args.quiet:
    print("Rebooting IPMI to apply changes.")
    if not reboot_ipmi(session, args.ipmi_url, model):
    if not updater.reboot_ipmi():
    print("Rebooting failed! Go reboot it manually?")

    if not args.quiet:
  6. @oxc oxc revised this gist Aug 31, 2020. 1 changed file with 4 additions and 0 deletions.
    4 changes: 4 additions & 0 deletions ipmi-updater.py
    100644 → 100755
    Original file line number Diff line number Diff line change
    @@ -21,6 +21,7 @@

    import os
    import argparse
    import re
    import requests
    import logging
    from datetime import datetime
    @@ -176,6 +177,9 @@ def upload_cert(session, url, key_file, cert_file, model):
    key_data = filehandle.read()
    with open(cert_file, 'rb') as filehandle:
    cert_data = filehandle.read()
    # extract certificates only (IMPI doesn't like DH PARAMS)
    cert_data = b'\n'.join(re.findall(b'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----', cert_data, re.DOTALL)) + b'\n'



    if model == "X10":
  7. @oxc oxc revised this gist Oct 3, 2019. 1 changed file with 83 additions and 84 deletions.
    167 changes: 83 additions & 84 deletions ipmi-updater.py
    Original file line number Diff line number Diff line change
    @@ -33,12 +33,10 @@
    LOGIN_URL = '%s/cgi/login.cgi'
    IPMI_CERT_INFO_URL = '%s/cgi/ipmi.cgi'
    UPLOAD_CERT_URL = '%s/cgi/upload_ssl.cgi'
    REBOOT_IPMI_URL = '%s/cgi/BMCReset.cgi'
    REBOOT_IPMI_X10_URL = '%s/cgi/BMCReset.cgi'
    REBOOT_IPMI_X11_URL = '%s/cgi/op.cgi'
    CONFIG_CERT_URL = '%s/cgi/url_redirect.cgi?url_name=config_ssl'
    REDFISH_ROOT = '%s/redfish/v1/'
    REDFISH_SSL = '%sUpdateService/SSLCert/'
    REDFISH_SSL_UPLOAD = '%sUpdateService/SSLCert/Actions/SSLCert.Upload'
    REDFISH_BMC_REBOOT = '%sManagers/1/Actions/Manager.Reset'

    def login(session, url, username, password):
    """
    @@ -68,35 +66,28 @@ def login(session, url, username, password):
    return True


    def get_ipmi_cert_info(session, url, model, user, password):
    def get_ipmi_cert_info(session, url, model):
    """
    Verify existing certificate information
    :param session: Current session object
    :type session requests.session
    :param url: base-URL to IPMI
    :return: dict
    """
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    if model == "X11":
    try:
    r = session.get(REDFISH_SSL % (REDFISH_ROOT % url), auth=HTTPBasicAuth(user, password), verify=False)
    except ConnectionError:
    return False
    if not r.ok:
    return False
    if model == "X10":
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    data = r.json()
    return {
    'has_cert': True,
    'valid_from': data['VaildFrom'], # Yes, Supermicro made a typo in their BMC API.
    'valid_until': data['GoodTHRU']
    cert_info_data = {
    'SSL_STATUS.XML': '(0,0)',
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }
    elif model == "X11":
    cert_info_data = {
    'op': 'SSL_STATUS.XML',
    'r': '(0,0)',
    '_': ''
    }

    cert_info_data = {
    'SSL_STATUS.XML': '(0,0)',
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }

    #for cookie in session.cookies:
    # print(cookie)
    @@ -127,20 +118,29 @@ def get_ipmi_cert_info(session, url, model, user, password):
    'valid_until': valid_until
    }

    def get_ipmi_cert_valid(session, url):
    def get_ipmi_cert_valid(session, url, model):
    """
    Verify existing certificate information
    :param session: Current session object
    :type session requests.session
    :param url: base-URL to IPMI
    :return: bool
    """
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    cert_info_data = {
    'SSL_VALIDATE.XML': '(0,0)',
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }
    if model == "X10":
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    cert_info_data = {
    'SSL_VALIDATE.XML': '(0,0)',
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }
    elif model == "X11":
    cert_info_data = {
    'op': 'SSL_VALIDATE.XML',
    'r': '(0,0)',
    '_': ''
    }


    #for cookie in session.cookies:
    # print(cookie)
    @@ -162,7 +162,7 @@ def get_ipmi_cert_valid(session, url):
    valid_cert = int(status.get('VALIDATE'))
    return bool(valid_cert)

    def upload_cert(session, url, key_file, cert_file, model, user, password):
    def upload_cert(session, url, key_file, cert_file, model):
    """
    Send X.509 certificate and private key to server
    :param session: Current session object
    @@ -177,52 +177,54 @@ def upload_cert(session, url, key_file, cert_file, model, user, password):
    with open(cert_file, 'rb') as filehandle:
    cert_data = filehandle.read()


    if model == "X10":
    files_to_upload = [
    ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')),
    ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream'))
    ]
    upload_cert_url = UPLOAD_CERT_URL % url

    elif model == "X11":
    upload_cert_url = REDFISH_SSL_UPLOAD % (REDFISH_ROOT % url)

    files_to_upload = {
    'cert_file' : ('fullchain.pem', open(cert_file, 'rb')),
    'key_file' : ('privkey.pem', open(key_file, 'rb'))
    }

    files_to_upload = [
    ('cert_file', ('fullchain.pem', cert_data, 'application/octet-stream')),
    ('key_file', ('privkey.pem', key_data, 'application/octet-stream'))
    ]

    upload_cert_url = UPLOAD_CERT_URL % url
    try:
    result = session.post(upload_cert_url, files=files_to_upload, timeout=REQUEST_TIMEOUT, auth=HTTPBasicAuth(user, password) if model == "X11" else None, verify=False)
    result = session.post(upload_cert_url, files=files_to_upload, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    if model == "X10":
    if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html':
    # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked'
    return False
    if 'CONFPAGE_RESET' not in result.text:
    return False
    if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html':
    # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked'
    return False
    if 'CONFPAGE_RESET' not in result.text:
    return False

    return True


    def reboot_ipmi(session, url, model, user, password):
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')
    def reboot_ipmi(session, url, model):
    if model == "X10":
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    reboot_data = {
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }
    reboot_data = {
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }

    if model == "X10":
    reboot_url = REBOOT_IPMI_URL % url
    reboot_url = REBOOT_IPMI_X10_URL % url
    elif model == "X11":
    reboot_url = REDFISH_BMC_REBOOT % (REDFISH_ROOT % url)
    reboot_data = {
    'op': 'main_bmcreset',
    '_': ''
    }

    reboot_url = REBOOT_IPMI_X11_URL % url

    try:
    result = session.post(reboot_url, reboot_data, timeout=REQUEST_TIMEOUT, auth=HTTPBasicAuth(user, password) if model == "X11" else None, verify=False)
    result = session.post(reboot_url, reboot_data, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    @@ -250,7 +252,7 @@ def determine_model(session, url):
    if "UpdateService" in data:
    return "X11"
    else:
    return "X10"
    return "X10"

    def main():
    parser = argparse.ArgumentParser(description='Update Supermicro IPMI SSL certificate')
    @@ -297,24 +299,23 @@ def main():
    if not args.quiet:
    print("Board model is " + model)

    if model == "X10":
    if not login(session, args.ipmi_url, args.username, args.password):
    print("Login failed. Cannot continue!")
    exit(2)

    # Set mandatory cookies:
    url_parts = urlparse(args.ipmi_url)
    # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl
    mandatory_cookies = {
    'langSetFlag': '0',
    'language': 'English',
    'mainpage': 'configuration',
    'subpage': 'config_ssl'
    }
    for cookie_name, cookie_value in mandatory_cookies.items():
    session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname)
    if not login(session, args.ipmi_url, args.username, args.password):
    print("Login failed. Cannot continue!")
    exit(2)

    # Set mandatory cookies:
    url_parts = urlparse(args.ipmi_url)
    # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl
    mandatory_cookies = {
    'langSetFlag': '0',
    'language': 'English',
    'mainpage': 'configuration',
    'subpage': 'config_ssl'
    }
    for cookie_name, cookie_value in mandatory_cookies.items():
    session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname)

    cert_info = get_ipmi_cert_info(session, args.ipmi_url, model, args.username, args.password)
    cert_info = get_ipmi_cert_info(session, args.ipmi_url, model)
    if not cert_info:
    print("Failed to extract certificate information from IPMI!")
    if model == "X11":
    @@ -324,21 +325,19 @@ def main():
    print("There exists a certificate, which is valid until: %s" % cert_info['valid_until'])

    # Go upload!
    if not upload_cert(session, args.ipmi_url, args.key_file, args.cert_file, model, args.username, args.password):
    if not upload_cert(session, args.ipmi_url, args.key_file, args.cert_file, model):
    print("Failed to upload X.509 files to IPMI!")
    exit(2)

    # Redfish currently doesn't have a way to download the certificate on X11 boards
    if model == "X10":
    cert_valid = get_ipmi_cert_valid(session, args.ipmi_url)
    if not cert_valid:
    print("Uploads failed validation")
    exit(2)
    cert_valid = get_ipmi_cert_valid(session, args.ipmi_url, model)
    if not cert_valid:
    print("Uploads failed validation")
    exit(2)

    if not args.quiet:
    print("Uploaded files ok.")
    if not args.quiet:
    print("Uploaded files ok.")

    cert_info = get_ipmi_cert_info(session, args.ipmi_url, model, args.username, args.password)
    cert_info = get_ipmi_cert_info(session, args.ipmi_url, model)
    if not cert_info:
    print("Failed to extract certificate information from IPMI!")
    if model == "X11":
    @@ -350,7 +349,7 @@ def main():
    if not args.no_reboot:
    if not args.quiet:
    print("Rebooting IPMI to apply changes.")
    if not reboot_ipmi(session, args.ipmi_url, model, args.username, args.password):
    if not reboot_ipmi(session, args.ipmi_url, model):
    print("Rebooting failed! Go reboot it manually?")

    if not args.quiet:
  8. @dmerner dmerner revised this gist Sep 25, 2019. 1 changed file with 164 additions and 50 deletions.
    214 changes: 164 additions & 50 deletions ipmi-updater.py
    Original file line number Diff line number Diff line change
    @@ -22,9 +22,11 @@
    import os
    import argparse
    import requests
    import logging
    from datetime import datetime
    from lxml import etree
    from urllib.parse import urlparse
    from requests.auth import HTTPBasicAuth

    REQUEST_TIMEOUT = 5.0

    @@ -33,7 +35,10 @@
    UPLOAD_CERT_URL = '%s/cgi/upload_ssl.cgi'
    REBOOT_IPMI_URL = '%s/cgi/BMCReset.cgi'
    CONFIG_CERT_URL = '%s/cgi/url_redirect.cgi?url_name=config_ssl'

    REDFISH_ROOT = '%s/redfish/v1/'
    REDFISH_SSL = '%sUpdateService/SSLCert/'
    REDFISH_SSL_UPLOAD = '%sUpdateService/SSLCert/Actions/SSLCert.Upload'
    REDFISH_BMC_REBOOT = '%sManagers/1/Actions/Manager.Reset'

    def login(session, url, username, password):
    """
    @@ -63,7 +68,7 @@ def login(session, url, username, password):
    return True


    def get_ipmi_cert_info(session, url):
    def get_ipmi_cert_info(session, url, model, user, password):
    """
    Verify existing certificate information
    :param session: Current session object
    @@ -73,13 +78,28 @@ def get_ipmi_cert_info(session, url):
    """
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    if model == "X11":
    try:
    r = session.get(REDFISH_SSL % (REDFISH_ROOT % url), auth=HTTPBasicAuth(user, password), verify=False)
    except ConnectionError:
    return False
    if not r.ok:
    return False

    data = r.json()
    return {
    'has_cert': True,
    'valid_from': data['VaildFrom'], # Yes, Supermicro made a typo in their BMC API.
    'valid_until': data['GoodTHRU']
    }

    cert_info_data = {
    'SSL_STATUS.XML': '(0,0)',
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }

    for cookie in session.cookies:
    print(cookie)
    #for cookie in session.cookies:
    # print(cookie)
    ipmi_info_url = IPMI_CERT_INFO_URL % url
    try:
    result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False)
    @@ -107,8 +127,42 @@ def get_ipmi_cert_info(session, url):
    'valid_until': valid_until
    }

    def get_ipmi_cert_valid(session, url):
    """
    Verify existing certificate information
    :param session: Current session object
    :type session requests.session
    :param url: base-URL to IPMI
    :return: bool
    """
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    cert_info_data = {
    'SSL_VALIDATE.XML': '(0,0)',
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }

    #for cookie in session.cookies:
    # print(cookie)
    ipmi_info_url = IPMI_CERT_INFO_URL % url
    try:
    result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    root = etree.fromstring(result.text)
    # <?xml> <IPMI> <SSL_INFO>
    status = root.xpath('//IPMI/SSL_INFO')
    if not status:
    return False
    # Since xpath will return a list, just pick the first one from it.
    status = status[0]
    valid_cert = int(status.get('VALIDATE'))
    return bool(valid_cert)

    def upload_cert(session, url, key_file, cert_file):
    def upload_cert(session, url, key_file, cert_file, model, user, password):
    """
    Send X.509 certificate and private key to server
    :param session: Current session object
    @@ -122,51 +176,81 @@ def upload_cert(session, url, key_file, cert_file):
    key_data = filehandle.read()
    with open(cert_file, 'rb') as filehandle:
    cert_data = filehandle.read()
    files_to_upload = [
    ('/tmp/cert.key', ('cert.key', key_data, 'application/octet-stream')),
    ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/x-x509-ca-cert'))
    ]

    upload_cert_url = UPLOAD_CERT_URL % url
    if model == "X10":
    files_to_upload = [
    ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')),
    ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream'))
    ]
    upload_cert_url = UPLOAD_CERT_URL % url

    elif model == "X11":
    upload_cert_url = REDFISH_SSL_UPLOAD % (REDFISH_ROOT % url)

    files_to_upload = {
    'cert_file' : ('fullchain.pem', open(cert_file, 'rb')),
    'key_file' : ('privkey.pem', open(key_file, 'rb'))
    }


    try:
    result = session.post(upload_cert_url, files=files_to_upload, timeout=REQUEST_TIMEOUT, verify=False)
    result = session.post(upload_cert_url, files=files_to_upload, timeout=REQUEST_TIMEOUT, auth=HTTPBasicAuth(user, password) if model == "X11" else None, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html':
    # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked'
    return False
    if 'CONFPAGE_RESET' not in result.text:
    return False
    if model == "X10":
    if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html':
    # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked'
    return False
    if 'CONFPAGE_RESET' not in result.text:
    return False

    return True


    def reboot_ipmi(session, url):
    def reboot_ipmi(session, url, model, user, password):
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    reboot_data = {
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }

    upload_cert_url = REBOOT_IPMI_URL % url
    if model == "X10":
    reboot_url = REBOOT_IPMI_URL % url
    elif model == "X11":
    reboot_url = REDFISH_BMC_REBOOT % (REDFISH_ROOT % url)
    try:
    result = session.post(upload_cert_url, reboot_data, timeout=REQUEST_TIMEOUT, verify=False)
    result = session.post(reboot_url, reboot_data, timeout=REQUEST_TIMEOUT, auth=HTTPBasicAuth(user, password) if model == "X11" else None, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    print("Url: %s" % upload_cert_url)
    print(result.headers)
    print(result.text)
    if '<STATE CODE="OK"/>' not in result.text:
    return False
    if model == "X10":
    if '<STATE CODE="OK"/>' not in result.text:
    return False

    return True

    def determine_model(session, url):
    redfish_url = REDFISH_ROOT % url

    try:
    r = session.get(redfish_url, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    exit(2)
    if not r.ok:
    exit(2)

    data = r.json()

    # The UpdateService methods are only available on newer X11 based boards
    if "UpdateService" in data:
    return "X11"
    else:
    return "X10"

    def main():
    parser = argparse.ArgumentParser(description='Update Supermicro IPMI SSL certificate')
    @@ -180,8 +264,10 @@ def main():
    help='IPMI username with admin access')
    parser.add_argument('--password', required=True,
    help='IPMI user password')
    parser.add_argument('--no-reboot',
    parser.add_argument('--no-reboot', action='store_true',
    help='The default is to reboot the IPMI after upload for the change to take effect.')
    parser.add_argument('--quiet', action='store_true',
    help='Do not output anything if successful')
    args = parser.parse_args()

    # Confirm args
    @@ -194,53 +280,81 @@ def main():
    if args.ipmi_url[-1] == '/':
    args.ipmi_url = args.ipmi_url[0:-1]

    if not args.quiet:
    # Enable reuest logging
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)
    requests_log = logging.getLogger("requests.packages.urllib3")
    requests_log.setLevel(logging.DEBUG)
    requests_log.propagate = True

    # Start the operation
    requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)
    session = requests.session()
    if not login(session, args.ipmi_url, args.username, args.password):
    print("Login failed. Cannot continue!")
    exit(2)


    # Set mandatory cookies:
    url_parts = urlparse(args.ipmi_url)
    # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl
    mandatory_cookies = {
    'langSetFlag': '0',
    'language': 'English',
    'mainpage': 'configuration',
    'subpage': 'config_ssl'
    }
    for cookie_name, cookie_value in mandatory_cookies.items():
    session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname)

    cert_info = get_ipmi_cert_info(session, args.ipmi_url)
    # First determine if we are X10 or X11
    model = determine_model(session, args.ipmi_url)
    if not args.quiet:
    print("Board model is " + model)

    if model == "X10":
    if not login(session, args.ipmi_url, args.username, args.password):
    print("Login failed. Cannot continue!")
    exit(2)

    # Set mandatory cookies:
    url_parts = urlparse(args.ipmi_url)
    # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl
    mandatory_cookies = {
    'langSetFlag': '0',
    'language': 'English',
    'mainpage': 'configuration',
    'subpage': 'config_ssl'
    }
    for cookie_name, cookie_value in mandatory_cookies.items():
    session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname)

    cert_info = get_ipmi_cert_info(session, args.ipmi_url, model, args.username, args.password)
    if not cert_info:
    print("Failed to extract certificate information from IPMI!")
    if model == "X11":
    print("Try checking either your IPMI network config or your IPMI license!")
    exit(2)
    if cert_info['has_cert']:
    if not args.quiet and cert_info['has_cert']:
    print("There exists a certificate, which is valid until: %s" % cert_info['valid_until'])

    # Go upload!
    if not upload_cert(session, args.ipmi_url, args.key_file, args.cert_file):
    if not upload_cert(session, args.ipmi_url, args.key_file, args.cert_file, model, args.username, args.password):
    print("Failed to upload X.509 files to IPMI!")
    exit(2)

    print("Uploaded files ok.")
    # Redfish currently doesn't have a way to download the certificate on X11 boards
    if model == "X10":
    cert_valid = get_ipmi_cert_valid(session, args.ipmi_url)
    if not cert_valid:
    print("Uploads failed validation")
    exit(2)

    if not args.quiet:
    print("Uploaded files ok.")

    cert_info = get_ipmi_cert_info(session, args.ipmi_url)
    cert_info = get_ipmi_cert_info(session, args.ipmi_url, model, args.username, args.password)
    if not cert_info:
    print("Failed to extract certificate information from IPMI!")
    if model == "X11":
    print("Try checking either your IPMI network config or your IPMI license!")
    exit(2)
    if cert_info['has_cert']:
    if not args.quiet and cert_info['has_cert']:
    print("After upload, there exists a certificate, which is valid until: %s" % cert_info['valid_until'])

    if not args.no_reboot:
    print("Rebooting IPMI to apply changes.")
    if not reboot_ipmi(session, args.ipmi_url):
    if not args.quiet:
    print("Rebooting IPMI to apply changes.")
    if not reboot_ipmi(session, args.ipmi_url, model, args.username, args.password):
    print("Rebooting failed! Go reboot it manually?")

    print("All done!")
    if not args.quiet:
    print("All done!")


    if __name__ == "__main__":
  9. mcdamo revised this gist Feb 7, 2019. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion ipmi-updater.py
    Original file line number Diff line number Diff line change
    @@ -287,7 +287,7 @@ def main():

    if not args.no_reboot:
    if not args.quiet:
    print("Rebooting IPMI to apply changes.")
    print("Rebooting IPMI to apply changes.")
    if not reboot_ipmi(session, args.ipmi_url):
    print("Rebooting failed! Go reboot it manually?")

  10. mcdamo revised this gist Feb 7, 2019. 1 changed file with 66 additions and 14 deletions.
    80 changes: 66 additions & 14 deletions ipmi-updater.py
    100644 → 100755
    Original file line number Diff line number Diff line change
    @@ -22,6 +22,7 @@
    import os
    import argparse
    import requests
    import logging
    from datetime import datetime
    from lxml import etree
    from urllib.parse import urlparse
    @@ -78,8 +79,8 @@ def get_ipmi_cert_info(session, url):
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }

    for cookie in session.cookies:
    print(cookie)
    #for cookie in session.cookies:
    # print(cookie)
    ipmi_info_url = IPMI_CERT_INFO_URL % url
    try:
    result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False)
    @@ -107,6 +108,40 @@ def get_ipmi_cert_info(session, url):
    'valid_until': valid_until
    }

    def get_ipmi_cert_valid(session, url):
    """
    Verify existing certificate information
    :param session: Current session object
    :type session requests.session
    :param url: base-URL to IPMI
    :return: bool
    """
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    cert_info_data = {
    'SSL_VALIDATE.XML': '(0,0)',
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }

    #for cookie in session.cookies:
    # print(cookie)
    ipmi_info_url = IPMI_CERT_INFO_URL % url
    try:
    result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    root = etree.fromstring(result.text)
    # <?xml> <IPMI> <SSL_INFO>
    status = root.xpath('//IPMI/SSL_INFO')
    if not status:
    return False
    # Since xpath will return a list, just pick the first one from it.
    status = status[0]
    valid_cert = int(status.get('VALIDATE'))
    return bool(valid_cert)

    def upload_cert(session, url, key_file, cert_file):
    """
    @@ -123,8 +158,8 @@ def upload_cert(session, url, key_file, cert_file):
    with open(cert_file, 'rb') as filehandle:
    cert_data = filehandle.read()
    files_to_upload = [
    ('/tmp/cert.key', ('cert.key', key_data, 'application/octet-stream')),
    ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/x-x509-ca-cert'))
    ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/octet-stream')),
    ('/tmp/key.pem', ('key.pem', key_data, 'application/octet-stream'))
    ]

    upload_cert_url = UPLOAD_CERT_URL % url
    @@ -140,7 +175,6 @@ def upload_cert(session, url, key_file, cert_file):
    return False
    if 'CONFPAGE_RESET' not in result.text:
    return False

    return True


    @@ -159,9 +193,9 @@ def reboot_ipmi(session, url):
    if not result.ok:
    return False

    print("Url: %s" % upload_cert_url)
    print(result.headers)
    print(result.text)
    #print("Url: %s" % upload_cert_url)
    #print(result.headers)
    #print(result.text)
    if '<STATE CODE="OK"/>' not in result.text:
    return False

    @@ -180,8 +214,10 @@ def main():
    help='IPMI username with admin access')
    parser.add_argument('--password', required=True,
    help='IPMI user password')
    parser.add_argument('--no-reboot',
    parser.add_argument('--no-reboot', action='store_true',
    help='The default is to reboot the IPMI after upload for the change to take effect.')
    parser.add_argument('--quiet', action='store_true',
    help='Do not output anything if successful')
    args = parser.parse_args()

    # Confirm args
    @@ -194,6 +230,14 @@ def main():
    if args.ipmi_url[-1] == '/':
    args.ipmi_url = args.ipmi_url[0:-1]

    if not args.quiet:
    # Enable reuest logging
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)
    requests_log = logging.getLogger("requests.packages.urllib3")
    requests_log.setLevel(logging.DEBUG)
    requests_log.propagate = True

    # Start the operation
    requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)
    session = requests.session()
    @@ -218,29 +262,37 @@ def main():
    if not cert_info:
    print("Failed to extract certificate information from IPMI!")
    exit(2)
    if cert_info['has_cert']:
    if not args.quiet and cert_info['has_cert']:
    print("There exists a certificate, which is valid until: %s" % cert_info['valid_until'])

    # Go upload!
    if not upload_cert(session, args.ipmi_url, args.key_file, args.cert_file):
    print("Failed to upload X.509 files to IPMI!")
    exit(2)

    print("Uploaded files ok.")
    cert_valid = get_ipmi_cert_valid(session, args.ipmi_url)
    if not cert_valid:
    print("Uploads failed validation")
    exit(2)

    if not args.quiet:
    print("Uploaded files ok.")

    cert_info = get_ipmi_cert_info(session, args.ipmi_url)
    if not cert_info:
    print("Failed to extract certificate information from IPMI!")
    exit(2)
    if cert_info['has_cert']:
    if not args.quiet and cert_info['has_cert']:
    print("After upload, there exists a certificate, which is valid until: %s" % cert_info['valid_until'])

    if not args.no_reboot:
    print("Rebooting IPMI to apply changes.")
    if not args.quiet:
    print("Rebooting IPMI to apply changes.")
    if not reboot_ipmi(session, args.ipmi_url):
    print("Rebooting failed! Go reboot it manually?")

    print("All done!")
    if not args.quiet:
    print("All done!")


    if __name__ == "__main__":
  11. @HQJaTu HQJaTu revised this gist Oct 21, 2018. 1 changed file with 40 additions and 5 deletions.
    45 changes: 40 additions & 5 deletions ipmi-updater.py
    Original file line number Diff line number Diff line change
    @@ -24,13 +24,15 @@
    import requests
    from datetime import datetime
    from lxml import etree
    from urllib.parse import urlparse

    REQUEST_TIMEOUT = 5.0

    LOGIN_URL = '%s/cgi/login.cgi'
    IPMI_CERT_INFO_URL = '%s/cgi/ipmi.cgi'
    UPLOAD_CERT_URL = '%s/cgi/upload_ssl.cgi'
    REBOOT_IPMI_URL = '%s/cgi/url_redirect.cgi?url_name=config_ssl_fw_reset'
    REBOOT_IPMI_URL = '%s/cgi/BMCReset.cgi'
    CONFIG_CERT_URL = '%s/cgi/url_redirect.cgi?url_name=config_ssl'


    def login(session, url, username, password):
    @@ -76,6 +78,8 @@ def get_ipmi_cert_info(session, url):
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }

    for cookie in session.cookies:
    print(cookie)
    ipmi_info_url = IPMI_CERT_INFO_URL % url
    try:
    result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False)
    @@ -119,8 +123,8 @@ def upload_cert(session, url, key_file, cert_file):
    with open(cert_file, 'rb') as filehandle:
    cert_data = filehandle.read()
    files_to_upload = [
    ('/tmp/key.pem', ('/tmp/key.pem', key_data, 'application/octet-stream')),
    ('/tmp/cert.pem', ('/tmp/cert.pem', cert_data, 'application/x-x509-ca-cert'))
    ('/tmp/cert.key', ('cert.key', key_data, 'application/octet-stream')),
    ('/tmp/cert.pem', ('cert.pem', cert_data, 'application/x-x509-ca-cert'))
    ]

    upload_cert_url = UPLOAD_CERT_URL % url
    @@ -141,15 +145,24 @@ def upload_cert(session, url, key_file, cert_file):


    def reboot_ipmi(session, url):
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    reboot_data = {
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }

    upload_cert_url = REBOOT_IPMI_URL % url
    try:
    result = session.get(upload_cert_url, timeout=REQUEST_TIMEOUT, verify=False)
    result = session.post(upload_cert_url, reboot_data, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    if 'LANG_FW_RESET_DESC1' not in result.text:
    print("Url: %s" % upload_cert_url)
    print(result.headers)
    print(result.text)
    if '<STATE CODE="OK"/>' not in result.text:
    return False

    return True
    @@ -187,6 +200,20 @@ def main():
    if not login(session, args.ipmi_url, args.username, args.password):
    print("Login failed. Cannot continue!")
    exit(2)


    # Set mandatory cookies:
    url_parts = urlparse(args.ipmi_url)
    # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl
    mandatory_cookies = {
    'langSetFlag': '0',
    'language': 'English',
    'mainpage': 'configuration',
    'subpage': 'config_ssl'
    }
    for cookie_name, cookie_value in mandatory_cookies.items():
    session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname)

    cert_info = get_ipmi_cert_info(session, args.ipmi_url)
    if not cert_info:
    print("Failed to extract certificate information from IPMI!")
    @@ -200,6 +227,14 @@ def main():
    exit(2)

    print("Uploaded files ok.")

    cert_info = get_ipmi_cert_info(session, args.ipmi_url)
    if not cert_info:
    print("Failed to extract certificate information from IPMI!")
    exit(2)
    if cert_info['has_cert']:
    print("After upload, there exists a certificate, which is valid until: %s" % cert_info['valid_until'])

    if not args.no_reboot:
    print("Rebooting IPMI to apply changes.")
    if not reboot_ipmi(session, args.ipmi_url):
  12. @HQJaTu HQJaTu created this gist Jul 12, 2018.
    212 changes: 212 additions & 0 deletions ipmi-updater.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,212 @@
    #!/usr/bin/env python3

    # vim: autoindent tabstop=4 shiftwidth=4 expandtab softtabstop=4 filetype=python

    # This file is part of Supermicro IPMI certificate updater.
    # Supermicro IPMI certificate updater is free software: you can
    # redistribute it and/or modify it under the terms of the GNU General Public
    # License as published by the Free Software Foundation, version 2.
    #
    # This program is distributed in the hope that it will be useful, but WITHOUT
    # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
    # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
    # details.
    #
    # You should have received a copy of the GNU General Public License along with
    # this program; if not, write to the Free Software Foundation, Inc., 51
    # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
    #
    # Copyright (c) Jari Turkia


    import os
    import argparse
    import requests
    from datetime import datetime
    from lxml import etree

    REQUEST_TIMEOUT = 5.0

    LOGIN_URL = '%s/cgi/login.cgi'
    IPMI_CERT_INFO_URL = '%s/cgi/ipmi.cgi'
    UPLOAD_CERT_URL = '%s/cgi/upload_ssl.cgi'
    REBOOT_IPMI_URL = '%s/cgi/url_redirect.cgi?url_name=config_ssl_fw_reset'


    def login(session, url, username, password):
    """
    Log into IPMI interface
    :param session: Current session object
    :type session requests.session
    :param url: base-URL to IPMI
    :param username: username to use for logging in
    :param password: password to use for logging in
    :return: bool
    """
    login_data = {
    'name': username,
    'pwd': password
    }

    login_url = LOGIN_URL % url
    try:
    result = session.post(login_url, login_data, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False
    if '/cgi/url_redirect.cgi?url_name=mainmenu' not in result.text:
    return False

    return True


    def get_ipmi_cert_info(session, url):
    """
    Verify existing certificate information
    :param session: Current session object
    :type session requests.session
    :param url: base-URL to IPMI
    :return: dict
    """
    timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')

    cert_info_data = {
    'SSL_STATUS.XML': '(0,0)',
    'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
    }

    ipmi_info_url = IPMI_CERT_INFO_URL % url
    try:
    result = session.post(ipmi_info_url, cert_info_data, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    root = etree.fromstring(result.text)
    # <?xml> <IPMI> <SSL_INFO> <STATUS>
    status = root.xpath('//IPMI/SSL_INFO/STATUS')
    if not status:
    return False
    # Since xpath will return a list, just pick the first one from it.
    status = status[0]
    has_cert = int(status.get('CERT_EXIST'))
    has_cert = bool(has_cert)
    if has_cert:
    valid_from = status.get('VALID_FROM')
    valid_until = status.get('VALID_UNTIL')

    return {
    'has_cert': has_cert,
    'valid_from': valid_from,
    'valid_until': valid_until
    }


    def upload_cert(session, url, key_file, cert_file):
    """
    Send X.509 certificate and private key to server
    :param session: Current session object
    :type session requests.session
    :param url: base-URL to IPMI
    :param key_file: filename to X.509 certificate private key
    :param cert_file: filename to X.509 certificate PEM
    :return:
    """
    with open(key_file, 'rb') as filehandle:
    key_data = filehandle.read()
    with open(cert_file, 'rb') as filehandle:
    cert_data = filehandle.read()
    files_to_upload = [
    ('/tmp/key.pem', ('/tmp/key.pem', key_data, 'application/octet-stream')),
    ('/tmp/cert.pem', ('/tmp/cert.pem', cert_data, 'application/x-x509-ca-cert'))
    ]

    upload_cert_url = UPLOAD_CERT_URL % url
    try:
    result = session.post(upload_cert_url, files=files_to_upload, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html':
    # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked'
    return False
    if 'CONFPAGE_RESET' not in result.text:
    return False

    return True


    def reboot_ipmi(session, url):
    upload_cert_url = REBOOT_IPMI_URL % url
    try:
    result = session.get(upload_cert_url, timeout=REQUEST_TIMEOUT, verify=False)
    except ConnectionError:
    return False
    if not result.ok:
    return False

    if 'LANG_FW_RESET_DESC1' not in result.text:
    return False

    return True


    def main():
    parser = argparse.ArgumentParser(description='Update Supermicro IPMI SSL certificate')
    parser.add_argument('--ipmi-url', required=True,
    help='Supermicro IPMI 2.0 URL')
    parser.add_argument('--key-file', required=True,
    help='X.509 Private key filename')
    parser.add_argument('--cert-file', required=True,
    help='X.509 Certificate filename')
    parser.add_argument('--username', required=True,
    help='IPMI username with admin access')
    parser.add_argument('--password', required=True,
    help='IPMI user password')
    parser.add_argument('--no-reboot',
    help='The default is to reboot the IPMI after upload for the change to take effect.')
    args = parser.parse_args()

    # Confirm args
    if not os.path.isfile(args.key_file):
    print("--key-file '%s' doesn't exist!" % args.key_file)
    exit(2)
    if not os.path.isfile(args.cert_file):
    print("--cert-file '%s' doesn't exist!" % args.cert_file)
    exit(2)
    if args.ipmi_url[-1] == '/':
    args.ipmi_url = args.ipmi_url[0:-1]

    # Start the operation
    requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)
    session = requests.session()
    if not login(session, args.ipmi_url, args.username, args.password):
    print("Login failed. Cannot continue!")
    exit(2)
    cert_info = get_ipmi_cert_info(session, args.ipmi_url)
    if not cert_info:
    print("Failed to extract certificate information from IPMI!")
    exit(2)
    if cert_info['has_cert']:
    print("There exists a certificate, which is valid until: %s" % cert_info['valid_until'])

    # Go upload!
    if not upload_cert(session, args.ipmi_url, args.key_file, args.cert_file):
    print("Failed to upload X.509 files to IPMI!")
    exit(2)

    print("Uploaded files ok.")
    if not args.no_reboot:
    print("Rebooting IPMI to apply changes.")
    if not reboot_ipmi(session, args.ipmi_url):
    print("Rebooting failed! Go reboot it manually?")

    print("All done!")


    if __name__ == "__main__":
    main()