Skip to content

Instantly share code, notes, and snippets.

@pshchelo
Last active May 11, 2020 16:02
Show Gist options
  • Save pshchelo/952d247b4dec1bacc6e023a343e29ba8 to your computer and use it in GitHub Desktop.
Save pshchelo/952d247b4dec1bacc6e023a343e29ba8 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Example script showing how to obtain an OpenID Connect refresh token.
Developed and tested with Okta (https://developer.okta.com/); it should
work with any generic OpenID Connect provider and a self-deployed
Kubernetes cluster tied into it.
Inspired by
https://github.com/micahhausler/k8s-oidc-helper
https://github.com/making/k8s-keycloak-oidc-helper
https://github.com/okta/samples-python-flask/tree/master/okta-hosted-login
https://requests-oauthlib.readthedocs.io/en/latest/oauth2_workflow.html#web-application-flow
https://medium.com/@mrbobbytables/kubernetes-day-2-operations-authn-authz-with-oidc-and-a-little-help-from-keycloak-de4ea1bdbbe
"""
# token example
# {
# "token_type": "Bearer",
# "expires_in": 3600,
# "access_token": "...", # <-- JWT
# "scope": ["offline_access", "email", "openid"],
# "refresh_token": "...", # <-- some random string/secret
# "id_token": "...", # <-- JWT
# "expires_at": 1586890825.6915326
# }
import os
import webbrowser
from requests_oauthlib import OAuth2Session
import yaml
# Provider and client settings, all taken from the environment.
# Unset variables yield None unless a fallback is given.
oauth_server_uri = os.environ.get("OAUTH_URI")
client_id = os.environ.get("OAUTH_CLIENT_ID")
client_secret = os.environ.get("OAUTH_CLIENT_SECRET")
# The fallback is the Okta default for a newly created webapp,
# but with the HTTPS scheme.
redirect_uri = os.environ.get(
    "OAUTH_REDIRECT_URI",
    "https://localhost:8080/authorization-code/callback",
)
# Optional CA bundle; handed to the HTTP calls as ``verify`` and written
# into the kubeconfig user as ``idp-certificate-authority`` when set.
verify = os.environ.get("OAUTH_CA_BUNDLE")
def get_auth(oauth_url):
    """Run the interactive OpenID Connect authorization-code flow.

    Fetches the provider's discovery document, opens the authorization URL
    in a browser, asks the user to paste back the callback URL, exchanges
    it for a token and finally queries the userinfo endpoint.

    Relies on the module-level ``client_id``, ``client_secret``,
    ``redirect_uri`` and ``verify`` settings.

    :param oauth_url: issuer base URL; ``/.well-known/openid-configuration``
        is appended to it to locate the discovery document.
    :returns: ``(email, token)`` -- the ``email`` field of the userinfo
        response and the token returned by ``fetch_token``.
    :raises KeyError: if the discovery document or the userinfo response
        lacks one of the fields read here.
    """
    # scopes and kwargs to authorization_url are what may differentiate
    # one OpenID IdP from another.
    scope = [
        "openid",  # mandatory for OpenIDConnect auth
        "email",  # smallest and most consistent scope and claim
        "offline_access",  # needed to actually ask for refresh_token
    ]
    oauth = OAuth2Session(client_id, redirect_uri=redirect_uri, scope=scope)
    # Discovery must use the same CA bundle and timeout as the later calls,
    # otherwise a provider behind a private CA fails at this first request.
    auth_server_info = oauth.get(
        f"{oauth_url}/.well-known/openid-configuration",
        withhold_token=True,
        timeout=60,
        verify=verify,
    ).json()
    auth_url = auth_server_info["authorization_endpoint"]
    token_url = auth_server_info["token_endpoint"]
    authorization_url, state = oauth.authorization_url(
        auth_url,
        access_type="offline",  # not sure if it is actually always needed,
                                # may be a cargo-cult from Google-based example
    )
    print(f'Please go to "{authorization_url}" and authorize access.')
    webbrowser.open(authorization_url)
    # Strip stray whitespace from the paste so it does not end up inside
    # the last query parameter of the callback URL.
    authorization_response = input(
        "Enter the full callback URL as attempted by the browser: "
    ).strip()
    token = oauth.fetch_token(
        token_url,
        authorization_response=authorization_response,
        client_secret=client_secret,
        timeout=60,
        verify=verify,
    )
    # discover user email
    userinfo = oauth.get(
        auth_server_info["userinfo_endpoint"],
        timeout=60,
        verify=verify,
    ).json()
    return userinfo["email"], token
def kubeconfig_user(token):
    """Build the kubeconfig ``user`` entry for the ``oidc`` auth provider.

    :param token: token mapping; ``id_token`` is required, ``refresh_token``
        is optional and stored as ``None`` when absent.
    :returns: ``{"auth-provider": {"name": "oidc", "config": {...}}}`` filled
        from ``token`` and the module-level client settings.
    :raises KeyError: if ``token`` has no ``id_token``.
    """
    oidc_config = {
        "client-id": client_id,
        "idp-issuer-url": oauth_server_uri,
        "id-token": token["id_token"],
        "refresh-token": token.get("refresh_token"),
    }
    # These entries are only written when the matching setting is non-empty.
    optional_entries = (
        ("client-secret", client_secret),
        ("idp-certificate-authority", verify),
    )
    for key, value in optional_entries:
        if value:
            oidc_config[key] = value
    return {"auth-provider": {"name": "oidc", "config": oidc_config}}
def auth():
    """Log in interactively and emit a kubeconfig user for the result.

    Runs ``get_auth`` against the module-level ``oauth_server_uri``, prints
    the token and a ``users:`` snippet to merge into ``$KUBECONFIG``, and
    persists the bare user entry to ``/tmp/kubeuser`` for ``refresh``.

    :returns: the token's ``expires_in`` value, or 3600 if it has none.
    """
    user, token = get_auth(oauth_server_uri)
    print(f"Got token:\n{token}")
    kube_user = kubeconfig_user(token)
    config_snippet = {"users": [{"name": user, "user": kube_user}]}
    print("\nPaste/merge this user into your $KUBECONFIG\n")
    print(yaml.safe_dump(config_snippet))
    # The file holds the refresh token and possibly the client secret, so
    # create it readable by the owner only. The mode applies on creation;
    # an already existing file keeps its current permissions.
    fd = os.open("/tmp/kubeuser", os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        yaml.safe_dump(kube_user, f)
    print("persisted in /tmp/kubeuser")
    return token.get("expires_in", 3600)
def refresh():
    """Obtain a new token using the refresh token saved in ``/tmp/kubeuser``.

    Loads the user entry from that file, looks up the token endpoint in the
    issuer's discovery document and performs a refresh with the stored
    client credentials.

    :returns: the token returned by ``OAuth2Session.refresh_token``.
    :raises KeyError: if the stored entry lacks a field read here.
    """
    with open("/tmp/kubeuser") as stored:
        saved_user = yaml.safe_load(stored)
    oidc = saved_user["auth-provider"]["config"]
    # Optional; None when no CA bundle was stored with the user entry.
    ca_bundle = oidc.get("idp-certificate-authority")

    session = OAuth2Session()
    discovery_url = f"{oidc['idp-issuer-url']}/.well-known/openid-configuration"
    discovery = session.get(
        discovery_url,
        withhold_token=True,
        verify=ca_bundle,
        timeout=60,
    ).json()

    return session.refresh_token(
        token_url=discovery["token_endpoint"],
        refresh_token=oidc["refresh-token"],
        client_id=oidc["client-id"],
        client_secret=oidc.get("client-secret"),
        verify=ca_bundle,
        timeout=60,
    )
# Script entry: run the interactive login once. auth() returns the token's
# ``expires_in`` value (3600 if the token has none).
expires_in = auth()
# Optional follow-up exercising the refresh flow: wait slightly longer than
# ``expires_in`` and then call refresh(). Uncommenting these lines also
# requires adding ``import time``, which this file does not import.
# time.sleep(expires_in * 1.1)
# refresh()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment