Skip to content

Instantly share code, notes, and snippets.

@danielscholl
Created June 18, 2025 23:13
Show Gist options
  • Save danielscholl/67c85f51c32caf5ac871ddd09fdcd9ba to your computer and use it in GitHub Desktop.
Save danielscholl/67c85f51c32caf5ac871ddd09fdcd9ba to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# /// script
# dependencies = [
# "typer>=0.16.0", # CLI framework with automatic help generation
# "rich>=14.0.0", # Rich console output and formatting
# "pydantic>=2.11.7", # Data validation and settings management
# "pyyaml>=6.0", # YAML processing for resource definitions
# ]
# ///
"""
AKS Workload Identity Demo Script
This script demonstrates Azure Kubernetes Service (AKS) workload identity integration
with Key Vault, allowing pods to authenticate to Azure services and access secrets
without storing credentials.
"""
import json
import random
import string
import subprocess
import tempfile
import time
from pathlib import Path
from typing import Optional
import typer
import yaml
from pydantic import BaseModel
from rich.console import Console
from rich.panel import Panel
from rich.syntax import Syntax
from rich.table import Table
from rich.theme import Theme
# Initialize Rich console with custom theme
theme = Theme({
"success": "green",
"error": "bold red",
"warning": "bold yellow",
"info": "cyan",
"command1": "bold blue", # Azure CLI commands
"command2": "bold green", # kubectl commands
"dim": "dim white",
})
console = Console(width=120, theme=theme)
class DemoConfig(BaseModel):
"""Configuration for the AKS Workload Identity demo"""
resource_group: str
location: str = "eastus2"
size: str = "Standard_D4ds_v5"
unique_id: str
# Derived resource names
@property
def managed_identity_name(self) -> str:
return f"wi-identity-{self.unique_id}"
@property
def cluster_name(self) -> str:
return f"aks-{self.unique_id}"
@property
def key_vault_name(self) -> str:
return f"kv-{self.unique_id}"
@property
def namespace(self) -> str:
return "workload-identity-demo"
@property
def service_account_name(self) -> str:
return "workload-identity-sa"
def generate_unique_id() -> str:
"""Generate a unique identifier for resources"""
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))
def run_command(command: str, description: str, capture_output: bool = True) -> subprocess.CompletedProcess:
"""Execute a command and handle output"""
console.print(f"Executing: {description}...")
try:
result = subprocess.run(
command,
shell=True,
capture_output=capture_output,
text=True,
check=True
)
if capture_output and result.stdout:
# Display command output in a panel
if result.stdout.strip().startswith('{') or result.stdout.strip().startswith('['):
# JSON output
try:
formatted_json = json.dumps(json.loads(result.stdout), indent=2)
console.print(Panel(
Syntax(formatted_json, "json", theme="monokai", line_numbers=False),
title="πŸ’» Command Output",
border_style="dim",
width=120,
expand=False,
padding=(1, 2)
))
except json.JSONDecodeError:
console.print(Panel(
result.stdout,
title="πŸ’» Command Output",
border_style="dim",
width=120,
expand=False,
padding=(1, 2)
))
else:
# Plain text output
console.print(Panel(
result.stdout,
title="πŸ’» Command Output",
border_style="dim",
width=120,
expand=False,
padding=(1, 2)
))
console.print(f"βœ“ {description} completed successfully")
return result
except subprocess.CalledProcessError as e:
console.print(Panel(
f"Command failed: {command}\nError: {e.stderr}",
title="❌ Error",
border_style="error",
width=120,
expand=False,
padding=(1, 2)
))
raise typer.Exit(1)
def display_command(command: str, title: str, command_type: str = "command1"):
"""Display a command in a syntax-highlighted panel"""
console.print()
console.print(Panel(
Syntax(command, "bash", theme="monokai", line_numbers=False),
title=f"πŸ”§ {title}",
border_style=command_type,
width=120,
expand=False,
padding=(1, 2)
))
def check_prerequisites():
"""Check if required tools are installed"""
console.print(Panel(
"Checking prerequisites...",
title="πŸ” Prerequisites Check",
border_style="info",
width=120,
expand=False,
padding=(1, 2)
))
# Check Azure CLI
try:
result = subprocess.run(["az", "--version"], capture_output=True, text=True, check=True)
console.print("βœ“ Azure CLI is installed")
except (subprocess.CalledProcessError, FileNotFoundError):
console.print("[error]βœ— Azure CLI is not installed or not accessible[/error]")
raise typer.Exit(1)
# Check kubectl
try:
result = subprocess.run(["kubectl", "version", "--client"], capture_output=True, text=True, check=True)
console.print("βœ“ kubectl is installed")
except (subprocess.CalledProcessError, FileNotFoundError):
console.print("[error]βœ— kubectl is not installed or not accessible[/error]")
raise typer.Exit(1)
# Check Azure login status
try:
result = subprocess.run(["az", "account", "show"], capture_output=True, text=True, check=True)
account_info = json.loads(result.stdout)
console.print(f"βœ“ Logged in to Azure as: {account_info.get('user', {}).get('name', 'Unknown')}")
except (subprocess.CalledProcessError, json.JSONDecodeError):
console.print("[error]βœ— Not logged in to Azure. Please run 'az login'[/error]")
raise typer.Exit(1)
def create_resource_group(config: DemoConfig):
"""Create Azure resource group"""
command = f"""az group create \\
--name {config.resource_group} \\
--location {config.location} \\
--tags purpose=demo component=workload-identity"""
display_command(command, "Creating Resource Group")
run_command(command, "Creating Resource Group")
def create_managed_identity(config: DemoConfig) -> str:
"""Create user-assigned managed identity and return client ID"""
command = f"""az identity create \\
--resource-group {config.resource_group} \\
--name {config.managed_identity_name}"""
display_command(command, "Creating Managed Identity")
result = run_command(command, "Creating Managed Identity")
identity_info = json.loads(result.stdout)
return identity_info["clientId"]
def create_key_vault(config: DemoConfig):
"""Create Key Vault and add demo secrets"""
# Create Key Vault
command = f"""az keyvault create \\
--resource-group {config.resource_group} \\
--name {config.key_vault_name} \\
--location {config.location} \\
--enable-rbac-authorization"""
display_command(command, "Creating Key Vault")
run_command(command, "Creating Key Vault")
# Add demo secrets
secrets = {
"demo-secret-1": "Hello from Key Vault!",
"demo-secret-2": "Workload Identity is working!",
"demo-secret-3": "No credentials stored in the pod!"
}
for secret_name, secret_value in secrets.items():
command = f"""az keyvault secret set \\
--vault-name {config.key_vault_name} \\
--name {secret_name} \\
--value "{secret_value}" """
display_command(command, f"Adding Secret: {secret_name}")
run_command(command, f"Adding Secret: {secret_name}")
def assign_rbac_permissions(config: DemoConfig, client_id: str):
"""Assign required RBAC permissions to managed identity"""
# Get subscription ID
result = subprocess.run(
"az account show --query id -o tsv",
shell=True, capture_output=True, text=True, check=True
)
subscription_id = result.stdout.strip()
# Assign Key Vault Secrets User role
command = f"""az role assignment create \\
--role "Key Vault Secrets User" \\
--assignee {client_id} \\
--scope "/subscriptions/{subscription_id}/resourceGroups/{config.resource_group}/providers/Microsoft.KeyVault/vaults/{config.key_vault_name}" """
display_command(command, "Assigning Key Vault Secrets User Role")
run_command(command, "Assigning Key Vault Secrets User Role")
def create_aks_cluster(config: DemoConfig):
"""Create AKS cluster with workload identity enabled"""
command = f"""az aks create \\
--resource-group {config.resource_group} \\
--name {config.cluster_name} \\
--node-vm-size {config.size} \\
--node-count 1 \\
--enable-workload-identity \\
--enable-oidc-issuer \\
--enable-addons azure-keyvault-secrets-provider \\
--generate-ssh-keys"""
display_command(command, "Creating AKS Cluster")
run_command(command, "Creating AKS Cluster")
# Get kubeconfig
command = f"""az aks get-credentials \\
--resource-group {config.resource_group} \\
--name {config.cluster_name} \\
--overwrite-existing"""
display_command(command, "Getting AKS Credentials", "command2")
run_command(command, "Getting AKS Credentials")
def get_oidc_issuer_url(config: DemoConfig) -> str:
"""Get OIDC issuer URL from AKS cluster"""
command = f"""az aks show \\
--resource-group {config.resource_group} \\
--name {config.cluster_name} \\
--query oidcIssuerProfile.issuerUrl \\
--output tsv"""
display_command(command, "Getting OIDC Issuer URL")
result = run_command(command, "Getting OIDC Issuer URL")
return result.stdout.strip()
def create_federated_credential(config: DemoConfig, oidc_issuer_url: str):
"""Create federated identity credential"""
command = f"""az identity federated-credential create \\
--name kubernetes-federated-credential \\
--identity-name {config.managed_identity_name} \\
--resource-group {config.resource_group} \\
--issuer {oidc_issuer_url} \\
--subject system:serviceaccount:{config.namespace}:{config.service_account_name} \\
--audiences api://AzureADTokenExchange"""
display_command(command, "Creating Federated Identity Credential")
run_command(command, "Creating Federated Identity Credential")
def assign_node_rg_permissions(config: DemoConfig, client_id: str):
"""Assign Reader role to managed identity on AKS node resource group"""
# Get node resource group name
result = subprocess.run(
f"az aks show --resource-group {config.resource_group} --name {config.cluster_name} --query nodeResourceGroup -o tsv",
shell=True, capture_output=True, text=True, check=True
)
node_resource_group = result.stdout.strip()
# Get subscription ID
result = subprocess.run(
"az account show --query id -o tsv",
shell=True, capture_output=True, text=True, check=True
)
subscription_id = result.stdout.strip()
# Assign Reader role
command = f"""az role assignment create \\
--role "Reader" \\
--assignee {client_id} \\
--scope "/subscriptions/{subscription_id}/resourceGroups/{node_resource_group}" """
display_command(command, "Assigning Reader Role on Node Resource Group")
run_command(command, "Assigning Reader Role on Node Resource Group")
def create_kubernetes_resources(config: DemoConfig, client_id: str):
"""Create Kubernetes resources for workload identity"""
# Get tenant ID
result = subprocess.run(
"az account show --query tenantId -o tsv",
shell=True, capture_output=True, text=True, check=True
)
tenant_id = result.stdout.strip()
# Create namespace
namespace_yaml = f"""
apiVersion: v1
kind: Namespace
metadata:
name: {config.namespace}
labels:
azure.workload.identity/use: "true"
"""
console.print()
console.print(Panel(
Syntax(namespace_yaml.strip(), "yaml", theme="monokai", line_numbers=True),
title="πŸ“„ Namespace Configuration",
border_style="cyan",
width=120,
expand=False,
padding=(1, 2)
))
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
f.write(namespace_yaml)
f.flush()
command = f"kubectl apply -f {f.name}"
display_command(command, "Creating Namespace", "command2")
run_command(command, "Creating Namespace")
# Create service account
service_account_yaml = f"""
apiVersion: v1
kind: ServiceAccount
metadata:
name: {config.service_account_name}
namespace: {config.namespace}
annotations:
azure.workload.identity/client-id: {client_id}
labels:
azure.workload.identity/use: "true"
"""
console.print()
console.print(Panel(
Syntax(service_account_yaml.strip(), "yaml", theme="monokai", line_numbers=True),
title="πŸ“„ Service Account Configuration",
border_style="cyan",
width=120,
expand=False,
padding=(1, 2)
))
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
f.write(service_account_yaml)
f.flush()
command = f"kubectl apply -f {f.name}"
display_command(command, "Creating Service Account", "command2")
run_command(command, "Creating Service Account")
# Create SecretProviderClass
secret_provider_class_yaml = f"""
apiVersion: secrets-store.csi.x-k8s.io/v1
kind: SecretProviderClass
metadata:
name: azure-kvname-wi
namespace: {config.namespace}
spec:
provider: azure
parameters:
usePodIdentity: "false"
useVMManagedIdentity: "false"
clientID: {client_id}
keyvaultName: {config.key_vault_name}
cloudName: ""
objects: |
array:
- |
objectName: demo-secret-1
objectType: secret
objectVersion: ""
- |
objectName: demo-secret-2
objectType: secret
objectVersion: ""
- |
objectName: demo-secret-3
objectType: secret
objectVersion: ""
tenantId: {tenant_id}
"""
console.print()
console.print(Panel(
Syntax(secret_provider_class_yaml.strip(), "yaml", theme="monokai", line_numbers=True),
title="πŸ“„ SecretProviderClass Configuration",
border_style="cyan",
width=120,
expand=False,
padding=(1, 2)
))
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
f.write(secret_provider_class_yaml)
f.flush()
command = f"kubectl apply -f {f.name}"
display_command(command, "Creating SecretProviderClass", "command2")
run_command(command, "Creating SecretProviderClass")
def deploy_validation_job(config: DemoConfig):
"""Deploy validation job to test workload identity"""
job_yaml = f"""
apiVersion: batch/v1
kind: Job
metadata:
name: workload-identity-validation
namespace: {config.namespace}
spec:
template:
metadata:
labels:
azure.workload.identity/use: "true"
spec:
serviceAccountName: {config.service_account_name}
containers:
- name: validation
image: mcr.microsoft.com/azure-cli:latest
command:
- /bin/bash
- -c
- |
echo "=== Workload Identity Validation ==="
echo "Environment variables:"
env | grep AZURE_ | sort
echo ""
echo "Mounted secrets from Key Vault:"
ls -la /mnt/secrets/ || echo "No secrets directory found"
echo ""
if [ -d "/mnt/secrets" ]; then
for secret in /mnt/secrets/*; do
if [ -f "$secret" ]; then
echo "Secret $(basename $secret):"
cat "$secret"
echo ""
fi
done
fi
echo "=== Validation Complete ==="
volumeMounts:
- name: secrets-store
mountPath: "/mnt/secrets"
readOnly: true
volumes:
- name: secrets-store
csi:
driver: secrets-store.csi.k8s.io
readOnly: true
volumeAttributes:
secretProviderClass: "azure-kvname-wi"
restartPolicy: Never
backoffLimit: 3
"""
console.print()
console.print(Panel(
Syntax(job_yaml.strip(), "yaml", theme="monokai", line_numbers=True),
title="πŸ“„ Validation Job Configuration",
border_style="cyan",
width=120,
expand=False,
padding=(1, 2)
))
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
f.write(job_yaml)
f.flush()
command = f"kubectl apply -f {f.name}"
display_command(command, "Deploying Validation Job", "command2")
run_command(command, "Deploying Validation Job")
def check_job_status(config: DemoConfig, max_wait_minutes: int = 10):
"""Check validation job status and display results"""
job_name = "workload-identity-validation"
max_attempts = max_wait_minutes * 6 # Check every 10 seconds
console.print(f"[info]Waiting for validation job to complete (max {max_wait_minutes} minutes)...[/info]")
for attempt in range(max_attempts):
try:
# Check if job succeeded
result = subprocess.run(
f"kubectl get job {job_name} -n {config.namespace} -o jsonpath='{{.status.succeeded}}'",
shell=True, capture_output=True, text=True, check=True
)
if result.stdout.strip() == "1":
console.print("βœ“ Validation job completed successfully")
break
# Check if job failed
result = subprocess.run(
f"kubectl get job {job_name} -n {config.namespace} -o jsonpath='{{.status.failed}}'",
shell=True, capture_output=True, text=True, check=True
)
if result.stdout.strip() == "1":
console.print("[error]βœ— Validation job failed[/error]")
# Get pod logs for troubleshooting
subprocess.run(
f"kubectl logs -l job-name={job_name} -n {config.namespace}",
shell=True
)
raise typer.Exit(1)
except subprocess.CalledProcessError:
pass # Job may not exist yet
time.sleep(10)
else:
console.print(f"[warning]⚠️ Job did not complete within {max_wait_minutes} minutes[/warning]")
raise typer.Exit(1)
# Display job logs
command = f"kubectl logs -l job-name={job_name} -n {config.namespace}"
display_command(command, "Getting Validation Job Logs", "command2")
result = run_command(command, "Getting Validation Job Logs")
if result.stdout:
console.print(Panel(
Syntax(result.stdout, "bash", theme="monokai", line_numbers=False),
title="πŸ’» Validation Job Output",
border_style="dim",
width=120,
expand=False,
padding=(1, 2)
))
def display_summary(config: DemoConfig):
"""Display demo completion summary"""
console.print()
console.print(Panel(
f"""[success]πŸŽ‰ AKS Workload Identity Demo Completed Successfully![/success]
The demo has successfully proven that workload identity is working:
βœ… Azure infrastructure created and configured
βœ… AKS cluster deployed with workload identity enabled
βœ… Managed identity linked to Kubernetes service account
βœ… Key Vault secrets accessed without stored credentials
βœ… Validation job completed successfully
[info]Resources created:[/info]
β€’ Resource Group: {config.resource_group}
β€’ Managed Identity: {config.managed_identity_name}
β€’ AKS Cluster: {config.cluster_name}
β€’ Key Vault: {config.key_vault_name}
β€’ Kubernetes Namespace: {config.namespace}
To clean up resources, run:
[dim]az group delete --name {config.resource_group} --yes --no-wait[/dim]""",
title="πŸ† Demo Complete",
border_style="success",
width=120,
expand=False,
padding=(1, 2)
))
app = typer.Typer(help="AKS Workload Identity Demo Script")
@app.command()
def main(
resource_group: Optional[str] = typer.Option(
None,
"--resource-group",
"-g",
help="Resource group name (auto-generated if not provided)"
),
location: str = typer.Option(
"eastus2",
"--location",
"-l",
help="Azure region"
),
size: str = typer.Option(
"Standard_D4ds_v5",
"--size",
"-s",
help="Azure VM size for AKS nodes"
)
):
"""
Deploy AKS Workload Identity demo with Key Vault integration.
This script creates all necessary Azure and Kubernetes resources to demonstrate
workload identity, allowing pods to access Key Vault secrets without stored credentials.
"""
# Generate unique ID and set resource group name
unique_id = generate_unique_id()
if not resource_group:
resource_group = f"aks-wi-demo-{unique_id}"
config = DemoConfig(
resource_group=resource_group,
location=location,
size=size,
unique_id=unique_id
)
# Display welcome message
console.print(Panel(
f"""[info]AKS Workload Identity Demo[/info]
This script will demonstrate Azure Kubernetes Service workload identity
integration with Key Vault, showing how pods can access secrets without
storing credentials.
[info]Configuration:[/info]
β€’ Resource Group: {config.resource_group}
β€’ Location: {config.location}
β€’ VM Size: {config.size}
β€’ Unique ID: {config.unique_id}
The demo will create Azure infrastructure, deploy an AKS cluster,
configure workload identity, and validate the integration works correctly.""",
title="πŸš€ Welcome to AKS Workload Identity Demo",
border_style="info",
width=120,
expand=False,
padding=(1, 2)
))
try:
# Check prerequisites
check_prerequisites()
console.print()
# Create Azure infrastructure
create_resource_group(config)
console.print()
client_id = create_managed_identity(config)
console.print()
create_key_vault(config)
console.print()
assign_rbac_permissions(config, client_id)
console.print()
# Create AKS cluster
create_aks_cluster(config)
console.print()
# Configure workload identity
oidc_issuer_url = get_oidc_issuer_url(config)
console.print()
create_federated_credential(config, oidc_issuer_url)
console.print()
assign_node_rg_permissions(config, client_id)
console.print()
# Deploy Kubernetes resources
create_kubernetes_resources(config, client_id)
console.print()
# Deploy and validate
deploy_validation_job(config)
console.print()
check_job_status(config)
# Display summary
display_summary(config)
except typer.Exit:
raise
except Exception as e:
console.print(Panel(
f"Unexpected error: {str(e)}",
title="❌ Error",
border_style="error",
width=120,
expand=False,
padding=(1, 2)
))
raise typer.Exit(1)
if __name__ == "__main__":
app()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment