Created
June 19, 2023 17:16
-
-
Save alexeldeib/5c4191d98f4568a61ae951e1a3951f48 to your computer and use it in GitHub Desktop.
Azure Wireserver Extension config extraction
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import http.client | |
import xml.etree.ElementTree as ET | |
from urllib.parse import urlparse | |
import json | |
from subprocess import Popen, PIPE, STDOUT | |
import base64 | |
try: | |
# request goalstate from wireserver | |
wireserver = "168.63.129.16" | |
conn = http.client.HTTPConnection(wireserver) | |
conn.request("GET", "/machine/?comp=goalstate", headers={"x-ms-version": "2012-11-30"}) | |
goalstate_response = conn.getresponse() | |
# response body is xml. read and parse. | |
goalstate_xml_str = goalstate_response.read() | |
goalstate_dom = ET.fromstring(goalstate_xml_str) | |
# extract URL of extension config from parsed XML | |
extension_config_str=goalstate_dom.find("./Container/RoleInstanceList/RoleInstance/Configuration/ExtensionsConfig").text | |
# url is http://168.63.129.16/foo/bar/baz?foo=bar&bar=baz | |
# split scheme and host, use only path with query params | |
# http connecton is already opened to wireserver | |
extension_config_url=urlparse(extension_config_str) | |
query_path=f"{extension_config_url.path}?{extension_config_url.query}" | |
# 2nd request to extension config URL. | |
conn.request("GET", query_path, headers={"x-ms-version": "2012-11-30"}) | |
extension_config_response = conn.getresponse() | |
extension_config_str = extension_config_response.read() | |
extension_config_dom = ET.fromstring(extension_config_str) | |
# CSE config is b64-encoded json embedded inside xml | |
# extract it, base64 decode it, and parse it as json. | |
extension_config_json_str = extension_config_dom.find("./PluginSettings/Plugin/[@name='Microsoft.Azure.Extensions.CustomScript']/RuntimeSettings").text | |
extjson = json.loads(extension_config_json_str) | |
# extract the certificate used to encrypt the protect settings | |
# also extract the encrypted protected settings. | |
thumbprint = extjson['runtimeSettings'][0]['handlerSettings']['protectedSettingsCertThumbprint'] | |
enc_settings = extjson['runtimeSettings'][0]['handlerSettings']['protectedSettings'] | |
protected_settings=base64.b64decode(enc_settings) | |
key=f"/var/lib/waagent/{thumbprint}.prv" | |
# shell out to openssl for decryption. | |
# avoids relying on 3p python imports | |
# lets us use only python stdlib for compat/no egress calls. | |
p = Popen(['openssl', 'smime', '-decrypt', '-inform', 'der', '-inkey', key], stdout=PIPE, stdin=PIPE, stderr=STDOUT) | |
sslout = p.communicate(input=protected_settings)[0] | |
# the output has been b64-decoded to binary DER then decrypted. | |
# the result is the original json AKS set for extension settings. | |
# from this, we extract the actual command we wanted to execute | |
csejson = json.loads(sslout.decode())['commandToExecute'] | |
print(csejson) | |
except Exception: | |
print(f"failed to extract cse: {Exception}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment