Last active
May 11, 2020 16:02
-
-
Save pshchelo/952d247b4dec1bacc6e023a343e29ba8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""
Example script showing how to obtain an OpenID Connect refresh token.

Developed and tested with Okta (https://developer.okta.com/); it should
also work with any generic OpenID Connect provider and a self-deployed
Kubernetes cluster tied into it.

Inspired by:
https://github.com/micahhausler/k8s-oidc-helper
https://github.com/making/k8s-keycloak-oidc-helper
https://github.com/okta/samples-python-flask/tree/master/okta-hosted-login
https://requests-oauthlib.readthedocs.io/en/latest/oauth2_workflow.html#web-application-flow
https://medium.com/@mrbobbytables/kubernetes-day-2-operations-authn-authz-with-oidc-and-a-little-help-from-keycloak-de4ea1bdbbe
"""
# Example of the token returned by the provider:
# {
#     "token_type": "Bearer",
#     "expires_in": 3600,
#     "access_token": "...",  # <-- JWT
#     "scope": ["offline_access", "email", "openid"],
#     "refresh_token": "...",  # <-- some random string/secret
#     "id_token": "...",  # <-- JWT
#     "expires_at": 1586890825.6915326
# }
import os | |
import webbrowser | |
from requests_oauthlib import OAuth2Session | |
import yaml | |
# Settings for the OpenID Connect provider, all read from the environment.
# Unset variables come back as None, except the redirect URI which has a
# default.
oauth_server_uri = os.environ.get("OAUTH_URI")
client_id = os.environ.get("OAUTH_CLIENT_ID")
client_secret = os.environ.get("OAUTH_CLIENT_SECRET")
# The default matches what Okta suggests when creating a web app, but with
# the HTTPS scheme.
redirect_uri = os.environ.get(
    "OAUTH_REDIRECT_URI",
    "https://localhost:8080/authorization-code/callback",
)
# Optional CA bundle path, passed as ``verify`` to the HTTP requests.
verify = os.environ.get("OAUTH_CA_BUNDLE")
def get_auth(oauth_url):
    """Run the interactive authorization-code flow against an OIDC provider.

    Fetches the provider's discovery document from ``oauth_url``, opens the
    authorization URL in a browser, asks the user to paste back the callback
    URL, exchanges it for a token and finally queries the userinfo endpoint.

    Uses the module-level ``client_id``, ``client_secret``, ``redirect_uri``
    and ``verify`` settings.

    Args:
        oauth_url: Base URL of the provider; the discovery document is
            requested from ``<oauth_url>/.well-known/openid-configuration``.

    Returns:
        A ``(email, token)`` tuple: the ``email`` field of the userinfo
        response and the token returned by ``fetch_token``.

    Raises:
        requests.HTTPError: If the discovery or userinfo request returns an
            error status.
        KeyError: If the discovery document or the userinfo response lacks
            an expected field.
    """
    # scopes and kwargs to authorization_url are what may differentiate
    # one OpenID IdP from another.
    scope = [
        "openid",  # mandatory for OpenIDConnect auth
        "email",  # smallest and most consistent scope and claim
        "offline_access",  # needed to actually ask for refresh_token
    ]
    oauth = OAuth2Session(client_id, redirect_uri=redirect_uri, scope=scope)

    # Use the same CA bundle and timeout as every other request to the
    # provider; otherwise discovery fails against an IdP signed by a
    # private CA, or hangs forever on an unresponsive one.
    discovery_response = oauth.get(
        f"{oauth_url}/.well-known/openid-configuration",
        withhold_token=True,
        timeout=60,
        verify=verify,
    )
    # Fail with the HTTP status rather than a confusing JSON/KeyError later.
    discovery_response.raise_for_status()
    auth_server_info = discovery_response.json()
    auth_url = auth_server_info["authorization_endpoint"]
    token_url = auth_server_info["token_endpoint"]

    authorization_url, _ = oauth.authorization_url(
        auth_url,
        access_type="offline",  # not sure if it is actually always needed,
        # may be a cargo-cult from Google-based example
    )
    print(f'Please go to "{authorization_url}" and authorize access.')
    webbrowser.open(authorization_url)
    # Strip stray whitespace that tends to come along with a pasted URL.
    authorization_response = input(
        "Enter the full callback URL as attempted by the browser: "
    ).strip()

    token = oauth.fetch_token(
        token_url,
        authorization_response=authorization_response,
        client_secret=client_secret,
        timeout=60,
        verify=verify,
    )

    # discover user email
    userinfo_response = oauth.get(
        auth_server_info["userinfo_endpoint"],
        timeout=60,
        verify=verify,
    )
    userinfo_response.raise_for_status()
    userinfo = userinfo_response.json()
    return userinfo["email"], token
def kubeconfig_user(token):
    """Build the kubeconfig ``user`` entry for the ``oidc`` auth provider.

    The entry combines the module-level ``client_id`` and
    ``oauth_server_uri`` with the ``id_token`` and (if present)
    ``refresh_token`` taken from ``token``. ``client_secret`` and the CA
    bundle path in ``verify`` are included only when they are set.

    Args:
        token: Mapping that must contain ``"id_token"``; ``"refresh_token"``
            is optional and stored as ``None`` when missing.

    Returns:
        A dict of the form ``{"auth-provider": {"name": "oidc", "config": {...}}}``.
    """
    oidc_settings = {
        "client-id": client_id,
        "idp-issuer-url": oauth_server_uri,
        "id-token": token["id_token"],
        "refresh-token": token.get("refresh_token"),
    }
    # Optional settings are added only when they carry a value.
    if client_secret:
        oidc_settings["client-secret"] = client_secret
    if verify:
        oidc_settings["idp-certificate-authority"] = verify
    return {"auth-provider": {"name": "oidc", "config": oidc_settings}}
def auth():
    """Authorize interactively, print a kubeconfig snippet and persist the user.

    Calls ``get_auth`` with the module-level ``oauth_server_uri``, prints the
    received token and a ``users`` snippet for merging into a kubeconfig, and
    writes the user entry as YAML to ``/tmp/kubeuser`` so that ``refresh``
    can read it back.

    Returns:
        The token's ``expires_in`` value, or 3600 when the token has none.
    """
    user, token = get_auth(oauth_server_uri)
    print(f"Got token:\n{token}")
    kube_user = kubeconfig_user(token)
    config_snippet = {"users": [{"name": user, "user": kube_user}]}
    print("\nPaste/merge this user into your $KUBECONFIG\n")
    print(yaml.safe_dump(config_snippet))
    # The file holds the refresh token and possibly the client secret, so it
    # is created readable by the owner only rather than with the default,
    # umask-dependent permissions.
    fd = os.open(
        "/tmp/kubeuser",
        os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        0o600,
    )
    with os.fdopen(fd, "w") as f:
        # The mode given to os.open applies only to newly created files;
        # tighten a file left behind by an earlier run as well.
        os.fchmod(f.fileno(), 0o600)
        yaml.safe_dump(kube_user, f)
    print("persisted in /tmp/kubeuser")
    return token.get("expires_in", 3600)
def refresh():
    """Obtain a new token using the refresh token stored in ``/tmp/kubeuser``.

    Loads the user entry from the file, reads the provider's discovery
    document to find the token endpoint and requests a refreshed token with
    the stored client id, client secret (if any) and CA bundle (if any).

    Returns:
        The token returned by ``OAuth2Session.refresh_token``.
    """
    with open("/tmp/kubeuser") as stored:
        saved_user = yaml.safe_load(stored)
    oidc = saved_user["auth-provider"]["config"]

    session = OAuth2Session()
    discovery_url = (
        f"{oidc['idp-issuer-url']}/.well-known/openid-configuration"
    )
    # The same CA bundle setting is used for both requests below.
    ca_bundle = oidc.get("idp-certificate-authority")
    discovery = session.get(
        discovery_url,
        withhold_token=True,
        verify=ca_bundle,
        timeout=60,
    )
    endpoints = discovery.json()

    return session.refresh_token(
        token_url=endpoints["token_endpoint"],
        refresh_token=oidc["refresh-token"],
        client_id=oidc["client-id"],
        client_secret=oidc.get("client-secret"),
        verify=ca_bundle,
        timeout=60,
    )
# Script entry: run the interactive authorization flow right away. auth()
# returns the token's "expires_in" value (3600 when the token has none).
expires_in = auth()
# Disabled follow-up that would wait past the expiry and then call refresh().
# NOTE(review): enabling it needs `import time`, which is not among the
# imports visible in this file.
# time.sleep(expires_in * 1.1)
# refresh()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment