Created
July 25, 2025 06:16
-
-
Save bofm/963f913348a0090cc082bfb3328b60c6 to your computer and use it in GitHub Desktop.
OIDC debugger script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# /// script | |
# dependencies = [ | |
# "flask", | |
# "requests", | |
# "pyjwt[crypto]", | |
# "cryptography" | |
# ] | |
# /// | |
import os | |
import json | |
import secrets | |
import base64 | |
from urllib.parse import urlencode, parse_qs, urlparse | |
from datetime import datetime, timezone | |
import jwt | |
import requests | |
from flask import Flask, request, redirect, session, render_template_string | |
# In-memory storage for tokens (use Redis/database in production) | |
token_store = {} | |
app = Flask(__name__) | |
app.secret_key = secrets.token_hex(32) | |
# Configuration from environment | |
CLIENT_ID = os.getenv('CLIENT_ID') | |
CLIENT_SECRET = os.getenv('CLIENT_SECRET') | |
WELL_KNOWN_URL = os.getenv('WELL_KNOWN_URL') | |
SCOPES = os.getenv('SCOPES', 'openid profile email') | |
REDIRECT_URI = os.getenv('REDIRECT_URI', 'http://localhost:8000/callback') | |
# HTML Templates | |
INDEX_TEMPLATE = ''' | |
<!DOCTYPE html> | |
<html> | |
<head> | |
<title>OIDC Debugger</title> | |
</head> | |
<body> | |
<h1>OIDC Debugger</h1> | |
{% if error %} | |
<p style="color: red;"><strong>Error:</strong> {{ error }}</p> | |
{% endif %} | |
<h2>Configuration</h2> | |
<ul> | |
<li><strong>Client ID:</strong> {{ client_id or 'Not configured' }}</li> | |
<li><strong>Client Secret:</strong> {{ '***' if client_secret else 'Not configured' }}</li> | |
<li><strong>Well-known URL:</strong> {{ well_known_url or 'Not configured' }}</li> | |
<li><strong>Scopes:</strong> {{ scopes }}</li> | |
<li><strong>Redirect URI:</strong> {{ redirect_uri }}</li> | |
</ul> | |
{% if discovery %} | |
<h2>OIDC Discovery</h2> | |
<ul> | |
<li><strong>Issuer:</strong> {{ discovery.issuer }}</li> | |
<li><strong>Authorization Endpoint:</strong> {{ discovery.authorization_endpoint }}</li> | |
<li><strong>Token Endpoint:</strong> {{ discovery.token_endpoint }}</li> | |
<li><strong>Userinfo Endpoint:</strong> {{ discovery.userinfo_endpoint }}</li> | |
<li><strong>JWKS URI:</strong> {{ discovery.jwks_uri }}</li> | |
</ul> | |
{% endif %} | |
{% if not client_id or not client_secret or not well_known_url %} | |
<p style="color: red;">Please set environment variables:</p> | |
<ul> | |
{% if not client_id %}<li>CLIENT_ID</li>{% endif %} | |
{% if not client_secret %}<li>CLIENT_SECRET</li>{% endif %} | |
{% if not well_known_url %}<li>WELL_KNOWN_URL</li>{% endif %} | |
</ul> | |
{% else %} | |
<form action="/login" method="post"> | |
<button type="submit">Start OAuth2/OIDC Flow</button> | |
</form> | |
{% endif %} | |
<form action="/clear" method="post"> | |
<button type="submit">Clear Session</button> | |
</form> | |
</body> | |
</html> | |
''' | |
TOKENS_TEMPLATE = ''' | |
<!DOCTYPE html> | |
<html> | |
<head> | |
<title>OIDC Tokens</title> | |
</head> | |
<body> | |
<h1>OIDC Tokens</h1> | |
<p style="color: green;"><strong>Success!</strong> Tokens retrieved.</p> | |
{% if id_token_claims %} | |
<h2>ID Token</h2> | |
<pre>{{ id_token_claims }}</pre> | |
{% if id_token_exp %} | |
<p>Expires: {{ id_token_exp.strftime('%Y-%m-%d %H:%M:%S UTC') }} | |
<span style="color: {{ 'red' if id_token_expired else 'green' }};"> | |
({{ id_token_duration if id_token_duration else ('EXPIRED' if id_token_expired else 'Valid') }}) | |
</span></p> | |
{% endif %} | |
<details> | |
<summary>Raw ID Token</summary> | |
<pre>{{ id_token }}</pre> | |
</details> | |
{% endif %} | |
{% if access_token %} | |
<h2>Access Token</h2> | |
{% if access_token_claims %} | |
<pre>{{ access_token_claims }}</pre> | |
{% if access_token_exp %} | |
<p>Expires: {{ access_token_exp.strftime('%Y-%m-%d %H:%M:%S UTC') }} | |
<span style="color: {{ 'red' if access_token_expired else 'green' }};"> | |
({{ access_token_duration if access_token_duration else ('EXPIRED' if access_token_expired else 'Valid') }}) | |
</span></p> | |
{% endif %} | |
{% else %} | |
<p>Access token is not a JWT or could not be decoded.</p> | |
{% endif %} | |
<details> | |
<summary>Raw Access Token</summary> | |
<pre>{{ access_token }}</pre> | |
</details> | |
{% endif %} | |
{% if refresh_token %} | |
<h2>Refresh Token</h2> | |
{% if refresh_token_claims %} | |
<pre>{{ refresh_token_claims }}</pre> | |
{% if refresh_token_exp %} | |
<p>Expires: {{ refresh_token_exp.strftime('%Y-%m-%d %H:%M:%S UTC') }} | |
<span style="color: {{ 'red' if refresh_token_expired else 'green' }};"> | |
({{ refresh_token_duration if refresh_token_duration else ('EXPIRED' if refresh_token_expired else 'Valid') }}) | |
</span></p> | |
{% endif %} | |
{% else %} | |
<p>Refresh token is not a JWT or could not be decoded.</p> | |
{% endif %} | |
<details> | |
<summary>Raw Refresh Token</summary> | |
<pre>{{ refresh_token }}</pre> | |
</details> | |
{% endif %} | |
{% if userinfo %} | |
<h2>User Info</h2> | |
<pre>{{ userinfo }}</pre> | |
{% endif %} | |
<p> | |
<a href="/"><button>Start New Flow</button></a> | |
<form action="/userinfo" method="post" style="display: inline;"> | |
<button type="submit">Fetch User Info</button> | |
</form> | |
</p> | |
</body> | |
</html> | |
''' | |
def get_discovery_document(): | |
"""Fetch OIDC discovery document""" | |
try: | |
response = requests.get(WELL_KNOWN_URL, timeout=10) | |
response.raise_for_status() | |
return response.json() | |
except Exception as e: | |
print(f"Error fetching discovery document: {e}") | |
return None | |
def decode_jwt_token(token, verify=False): | |
"""Decode JWT token without verification for debugging""" | |
try: | |
if verify: | |
# For production, you'd want to verify with proper keys | |
decoded = jwt.decode(token, options={"verify_signature": False}) | |
else: | |
decoded = jwt.decode(token, options={"verify_signature": False}) | |
return decoded | |
except Exception as e: | |
print(f"Error decoding JWT: {e}") | |
return None | |
def format_token_expiry(exp_timestamp): | |
"""Format token expiry timestamp and check if expired.""" | |
try: | |
exp_time = datetime.fromtimestamp(exp_timestamp, tz=timezone.utc) | |
now = datetime.now(timezone.utc) | |
is_expired = now > exp_time | |
# Calculate duration | |
if is_expired: | |
duration = now - exp_time | |
duration_str = f"Expired {format_duration(duration)} ago" | |
else: | |
duration = exp_time - now | |
duration_str = f"Valid for {format_duration(duration)}" | |
return exp_time, is_expired, duration_str | |
except: | |
return None, False, None | |
def format_duration(duration): | |
"""Format a timedelta into human-readable string.""" | |
total_seconds = int(duration.total_seconds()) | |
if total_seconds < 60: | |
return f"{total_seconds} seconds" | |
elif total_seconds < 3600: | |
minutes = total_seconds // 60 | |
seconds = total_seconds % 60 | |
return f"{minutes}m {seconds}s" | |
elif total_seconds < 86400: | |
hours = total_seconds // 3600 | |
minutes = (total_seconds % 3600) // 60 | |
return f"{hours}h {minutes}m" | |
else: | |
days = total_seconds // 86400 | |
hours = (total_seconds % 86400) // 3600 | |
return f"{days}d {hours}h" | |
@app.route('/') | |
def index(): | |
discovery = None | |
error = None | |
if WELL_KNOWN_URL: | |
discovery = get_discovery_document() | |
if not discovery: | |
error = "Failed to fetch OIDC discovery document" | |
return render_template_string(INDEX_TEMPLATE, | |
client_id=CLIENT_ID, | |
client_secret=CLIENT_SECRET, | |
well_known_url=WELL_KNOWN_URL, | |
scopes=SCOPES, | |
redirect_uri=REDIRECT_URI, | |
discovery=discovery, | |
error=error) | |
@app.route('/login', methods=['POST']) | |
def login(): | |
if not all([CLIENT_ID, CLIENT_SECRET, WELL_KNOWN_URL]): | |
return redirect('/') | |
discovery = get_discovery_document() | |
if not discovery: | |
return redirect('/') | |
# Generate state for CSRF protection | |
state = secrets.token_urlsafe(32) | |
session_id = secrets.token_urlsafe(32) | |
session['state'] = state | |
session['session_id'] = session_id | |
token_store[session_id] = {'discovery': discovery} | |
# Build authorization URL | |
auth_params = { | |
'response_type': 'code', | |
'client_id': CLIENT_ID, | |
'redirect_uri': REDIRECT_URI, | |
'scope': SCOPES, | |
'state': state | |
} | |
auth_url = f"{discovery['authorization_endpoint']}?{urlencode(auth_params)}" | |
return redirect(auth_url) | |
@app.route('/callback') | |
def callback(): | |
# Check for errors | |
if 'error' in request.args: | |
error_desc = request.args.get('error_description', 'Unknown error') | |
return f"<h1>OAuth Error</h1><p>{request.args['error']}: {error_desc}</p><a href='/'>Try again</a>" | |
# Verify state parameter | |
if request.args.get('state') != session.get('state'): | |
return "<h1>Error</h1><p>Invalid state parameter</p><a href='/'>Try again</a>" | |
# Get authorization code | |
code = request.args.get('code') | |
if not code: | |
return "<h1>Error</h1><p>No authorization code received</p><a href='/'>Try again</a>" | |
# Exchange code for tokens | |
session_id = session.get('session_id') | |
if not session_id or session_id not in token_store: | |
return "<h1>Error</h1><p>Session expired</p><a href='/'>Try again</a>" | |
discovery = token_store[session_id]['discovery'] | |
token_data = { | |
'grant_type': 'authorization_code', | |
'code': code, | |
'redirect_uri': REDIRECT_URI, | |
'client_id': CLIENT_ID, | |
'client_secret': CLIENT_SECRET | |
} | |
try: | |
response = requests.post(discovery['token_endpoint'], data=token_data, timeout=10) | |
response.raise_for_status() | |
tokens = response.json() | |
# Store tokens in server-side storage | |
token_store[session_id]['tokens'] = tokens | |
return redirect('/tokens') | |
except Exception as e: | |
return f"<h1>Token Exchange Error</h1><p>{str(e)}</p><a href='/'>Try again</a>" | |
@app.route('/tokens') | |
def show_tokens(): | |
session_id = session.get('session_id') | |
if not session_id or session_id not in token_store: | |
return redirect('/') | |
tokens = token_store[session_id].get('tokens') | |
if not tokens: | |
return redirect('/') | |
# Decode ID token | |
id_token_claims = None | |
id_token_exp = None | |
id_token_expired = False | |
id_token_duration = None | |
if 'id_token' in tokens: | |
id_token_claims = decode_jwt_token(tokens['id_token']) | |
if id_token_claims and 'exp' in id_token_claims: | |
id_token_exp, id_token_expired, id_token_duration = format_token_expiry(id_token_claims['exp']) | |
# Try to decode access token (might not be JWT) | |
access_token_claims = None | |
access_token_exp = None | |
access_token_expired = False | |
access_token_duration = None | |
if 'access_token' in tokens: | |
access_token_claims = decode_jwt_token(tokens['access_token']) | |
if access_token_claims and 'exp' in access_token_claims: | |
access_token_exp, access_token_expired, access_token_duration = format_token_expiry(access_token_claims['exp']) | |
# Try to decode refresh token (might not be JWT) | |
refresh_token_claims = None | |
refresh_token_exp = None | |
refresh_token_expired = False | |
refresh_token_duration = None | |
if 'refresh_token' in tokens: | |
refresh_token_claims = decode_jwt_token(tokens['refresh_token']) | |
if refresh_token_claims and 'exp' in refresh_token_claims: | |
refresh_token_exp, refresh_token_expired, refresh_token_duration = format_token_expiry(refresh_token_claims['exp']) | |
return render_template_string(TOKENS_TEMPLATE, | |
id_token=tokens.get('id_token'), | |
id_token_claims=json.dumps(id_token_claims, indent=2) if id_token_claims else None, | |
id_token_exp=id_token_exp, | |
id_token_expired=id_token_expired, | |
id_token_duration=id_token_duration, | |
access_token=tokens.get('access_token'), | |
access_token_claims=json.dumps(access_token_claims, indent=2) if access_token_claims else None, | |
access_token_exp=access_token_exp, | |
access_token_expired=access_token_expired, | |
access_token_duration=access_token_duration, | |
refresh_token=tokens.get('refresh_token'), | |
refresh_token_claims=json.dumps(refresh_token_claims, indent=2) if refresh_token_claims else None, | |
refresh_token_exp=refresh_token_exp, | |
refresh_token_expired=refresh_token_expired, | |
refresh_token_duration=refresh_token_duration, | |
userinfo=token_store[session_id].get('userinfo')) | |
@app.route('/userinfo', methods=['POST']) | |
def fetch_userinfo(): | |
session_id = session.get('session_id') | |
if not session_id or session_id not in token_store: | |
return redirect('/') | |
data = token_store[session_id] | |
tokens = data.get('tokens') | |
discovery = data.get('discovery') | |
if not tokens or not discovery or 'access_token' not in tokens: | |
return redirect('/tokens') | |
try: | |
headers = {'Authorization': f"Bearer {tokens['access_token']}"} | |
response = requests.get(discovery['userinfo_endpoint'], headers=headers, timeout=10) | |
response.raise_for_status() | |
userinfo = response.json() | |
token_store[session_id]['userinfo'] = json.dumps(userinfo, indent=2) | |
except Exception as e: | |
token_store[session_id]['userinfo'] = f"Error fetching userinfo: {str(e)}" | |
return redirect('/tokens') | |
@app.route('/clear', methods=['POST']) | |
def clear_session(): | |
session_id = session.get('session_id') | |
if session_id and session_id in token_store: | |
del token_store[session_id] | |
session.clear() | |
return redirect('/') | |
if __name__ == '__main__': | |
print("OIDC/OAuth2 Debugger") | |
print("====================") | |
print(f"CLIENT_ID: {'✓' if CLIENT_ID else '✗'}") | |
print(f"CLIENT_SECRET: {'✓' if CLIENT_SECRET else '✗'}") | |
print(f"WELL_KNOWN_URL: {'✓' if WELL_KNOWN_URL else '✗'}") | |
print(f"SCOPES: {SCOPES}") | |
print(f"REDIRECT_URI: {REDIRECT_URI}") | |
print("\nStarting server on http://localhost:8000") | |
print("Set the environment variables and navigate to the URL to begin debugging.") | |
app.run(host='localhost', port=8000, debug=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment