Created
June 18, 2025 23:13
-
-
Save danielscholl/67c85f51c32caf5ac871ddd09fdcd9ba to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# /// script
# dependencies = [
#     "typer>=0.16.0",     # CLI framework with automatic help generation
#     "rich>=14.0.0",      # Rich console output and formatting
#     "pydantic>=2.11.7",  # Data validation and settings management
#     "pyyaml>=6.0",       # YAML processing for resource definitions
# ]
# ///
"""
AKS Workload Identity Demo Script

This script demonstrates Azure Kubernetes Service (AKS) workload identity integration
with Key Vault, allowing pods to authenticate to Azure services and access secrets
without storing credentials.
"""
import json
import random
import shlex
import string
import subprocess
import tempfile
import time
from pathlib import Path
from typing import Optional

import typer
import yaml
from pydantic import BaseModel
from rich.console import Console
from rich.panel import Panel
from rich.syntax import Syntax
from rich.table import Table
from rich.text import Text
from rich.theme import Theme
# Shared Rich console. The theme maps the style names used in panel borders
# and inline markup throughout this script onto concrete Rich styles.
_STYLE_MAP = {
    "success": "green",
    "error": "bold red",
    "warning": "bold yellow",
    "info": "cyan",
    "command1": "bold blue",   # Azure CLI commands
    "command2": "bold green",  # kubectl commands
    "dim": "dim white",
}
theme = Theme(_STYLE_MAP)
console = Console(theme=theme, width=120)
class DemoConfig(BaseModel):
    """Settings for one run of the AKS Workload Identity demo.

    ``resource_group`` and ``unique_id`` must be supplied; ``location`` and
    ``size`` default to the values below. The read-only properties derive the
    names of the remaining resources, either from ``unique_id`` or as fixed
    constants.
    """

    resource_group: str
    location: str = "eastus2"
    size: str = "Standard_D4ds_v5"
    unique_id: str

    # Names derived from unique_id
    @property
    def managed_identity_name(self) -> str:
        """Managed identity name: ``wi-identity-<unique_id>``."""
        return "wi-identity-" + self.unique_id

    @property
    def cluster_name(self) -> str:
        """AKS cluster name: ``aks-<unique_id>``."""
        return "aks-" + self.unique_id

    @property
    def key_vault_name(self) -> str:
        """Key Vault name: ``kv-<unique_id>``."""
        return "kv-" + self.unique_id

    # Fixed Kubernetes object names
    @property
    def namespace(self) -> str:
        """Kubernetes namespace used by the demo (constant)."""
        return "workload-identity-demo"

    @property
    def service_account_name(self) -> str:
        """Kubernetes service account used by the demo (constant)."""
        return "workload-identity-sa"
def generate_unique_id() -> str:
    """Return a random 8-character suffix of lowercase letters and digits."""
    alphabet = string.ascii_lowercase + string.digits
    picked = random.choices(alphabet, k=8)
    return "".join(picked)
def _command_panel(body, title: str, border_style: str) -> Panel:
    """Wrap *body* in the fixed-width panel layout used for command results."""
    return Panel(
        body,
        title=title,
        border_style=border_style,
        width=120,
        expand=False,
        padding=(1, 2),
    )


def run_command(command: str, description: str, capture_output: bool = True) -> subprocess.CompletedProcess:
    """Run *command* through the shell and report the outcome on the console.

    Args:
        command: Shell command line to execute (run with ``shell=True``).
        description: Human-readable label used in the progress messages.
        capture_output: When true, stdout/stderr are captured and stdout is
            echoed in a panel (pretty-printed if it parses as JSON).

    Returns:
        The ``CompletedProcess`` of the successful run.

    Raises:
        typer.Exit: With code 1 when the command exits with a non-zero status.
    """
    console.print(f"Executing: {description}...")
    try:
        result = subprocess.run(
            command,
            shell=True,
            capture_output=capture_output,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        # stderr is None when output was not captured; avoid printing "None".
        details = e.stderr if e.stderr else f"exit code {e.returncode} (no error output captured)"
        # Text() keeps brackets in the command/stderr from being parsed as Rich markup.
        console.print(_command_panel(
            Text(f"Command failed: {command}\nError: {details}"),
            "❌ Error",
            "error",
        ))
        raise typer.Exit(1) from e

    if capture_output and result.stdout:
        # Default: show the raw text verbatim (no markup interpretation).
        body = Text(result.stdout)
        if result.stdout.lstrip().startswith(("{", "[")):
            # Looks like JSON: pretty-print it, falling back to raw text if it is not.
            try:
                pretty = json.dumps(json.loads(result.stdout), indent=2)
            except json.JSONDecodeError:
                pass
            else:
                body = Syntax(pretty, "json", theme="monokai", line_numbers=False)
        console.print(_command_panel(body, "💻 Command Output", "dim"))

    console.print(f"✅ {description} completed successfully")
    return result
def display_command(command: str, title: str, command_type: str = "command1"):
    """Print *command* in a bash-highlighted panel preceded by a blank line.

    Args:
        command: Command text to show.
        title: Panel title (shown after a wrench icon).
        command_type: Style name used for the panel border; the callers in
            this script pass "command1" for Azure CLI and "command2" for kubectl.
    """
    highlighted = Syntax(command, "bash", theme="monokai", line_numbers=False)
    console.print()
    console.print(Panel(
        highlighted,
        title=f"🔧 {title}",
        border_style=command_type,
        width=120,
        expand=False,
        padding=(1, 2),
    ))
def check_prerequisites():
    """Verify that the Azure CLI and kubectl run and that an Azure login exists.

    Raises:
        typer.Exit: With code 1 as soon as one of the checks fails.
    """
    console.print(Panel(
        "Checking prerequisites...",
        title="🔍 Prerequisites Check",
        border_style="info",
        width=120,
        expand=False,
        padding=(1, 2),
    ))

    # (display name, probe command) for every tool the demo shells out to.
    required_tools = (
        ("Azure CLI", ["az", "--version"]),
        ("kubectl", ["kubectl", "version", "--client"]),
    )
    for tool_name, probe in required_tools:
        try:
            subprocess.run(probe, capture_output=True, text=True, check=True)
        except (subprocess.CalledProcessError, FileNotFoundError):
            console.print(f"[error]❌ {tool_name} is not installed or not accessible[/error]")
            raise typer.Exit(1)
        console.print(f"✅ {tool_name} is installed")

    # Check Azure login status
    try:
        result = subprocess.run(["az", "account", "show"], capture_output=True, text=True, check=True)
        account_info = json.loads(result.stdout)
    except (subprocess.CalledProcessError, FileNotFoundError, json.JSONDecodeError):
        console.print("[error]❌ Not logged in to Azure. Please run 'az login'[/error]")
        raise typer.Exit(1)
    # "user" may be missing or null in the account JSON; fall back to "Unknown".
    user_name = (account_info.get("user") or {}).get("name", "Unknown")
    console.print(f"✅ Logged in to Azure as: {user_name}")
def create_resource_group(config: DemoConfig):
    """Create the Azure resource group named in *config*, tagged as a demo.

    The command is shown with ``display_command`` and then executed with
    ``run_command``.
    """
    # The name and location come from the command line; quote them so that
    # shell metacharacters (e.g. parentheses) are passed through literally.
    command = " \\\n".join([
        "az group create",
        f"--name {shlex.quote(config.resource_group)}",
        f"--location {shlex.quote(config.location)}",
        "--tags purpose=demo component=workload-identity",
    ])
    step = "Creating Resource Group"
    display_command(command, step)
    run_command(command, step)
def create_managed_identity(config: DemoConfig) -> str:
    """Create the user-assigned managed identity and return its client ID.

    Returns:
        The ``clientId`` field of the JSON printed by ``az identity create``.
    """
    # The resource group is user-supplied; quote it for the shell.
    command = " \\\n".join([
        "az identity create",
        f"--resource-group {shlex.quote(config.resource_group)}",
        f"--name {config.managed_identity_name}",
    ])
    step = "Creating Managed Identity"
    display_command(command, step)
    result = run_command(command, step)
    identity_info = json.loads(result.stdout)
    return identity_info["clientId"]
def create_key_vault(config: DemoConfig):
    """Create the Key Vault (RBAC authorization enabled) and add three demo secrets."""
    # The resource group and location are user-supplied; quote them for the shell.
    command = " \\\n".join([
        "az keyvault create",
        f"--resource-group {shlex.quote(config.resource_group)}",
        f"--name {config.key_vault_name}",
        f"--location {shlex.quote(config.location)}",
        "--enable-rbac-authorization",
    ])
    display_command(command, "Creating Key Vault")
    run_command(command, "Creating Key Vault")

    # NOTE(review): with RBAC authorization the signed-in user presumably needs
    # a Key Vault data-plane role to set secrets — confirm for the target tenant.
    secrets = {
        "demo-secret-1": "Hello from Key Vault!",
        "demo-secret-2": "Workload Identity is working!",
        "demo-secret-3": "No credentials stored in the pod!",
    }
    for secret_name, secret_value in secrets.items():
        command = " \\\n".join([
            "az keyvault secret set",
            f"--vault-name {config.key_vault_name}",
            f"--name {secret_name}",
            f'--value "{secret_value}" ',
        ])
        step = f"Adding Secret: {secret_name}"
        display_command(command, step)
        run_command(command, step)
def assign_rbac_permissions(config: DemoConfig, client_id: str):
    """Grant *client_id* the "Key Vault Secrets User" role on the demo Key Vault.

    The role is scoped to the vault's resource ID, which is built from the
    current subscription ID, the resource group and the vault name.
    """
    # Look up the ID of the active subscription.
    account_lookup = subprocess.run(
        "az account show --query id -o tsv",
        shell=True, capture_output=True, text=True, check=True
    )
    subscription_id = account_lookup.stdout.strip()

    vault_scope = (
        f"/subscriptions/{subscription_id}"
        f"/resourceGroups/{config.resource_group}"
        f"/providers/Microsoft.KeyVault/vaults/{config.key_vault_name}"
    )
    command = " \\\n".join([
        "az role assignment create",
        '--role "Key Vault Secrets User"',
        f"--assignee {client_id}",
        f'--scope "{vault_scope}" ',
    ])
    step = "Assigning Key Vault Secrets User Role"
    display_command(command, step)
    run_command(command, step)
def create_aks_cluster(config: DemoConfig):
    """Create a one-node AKS cluster and fetch its kubeconfig credentials.

    The cluster is created with workload identity, the OIDC issuer and the
    azure-keyvault-secrets-provider add-on enabled.
    """
    # The resource group and VM size are user-supplied; quote them for the shell.
    resource_group = shlex.quote(config.resource_group)

    command = " \\\n".join([
        "az aks create",
        f"--resource-group {resource_group}",
        f"--name {config.cluster_name}",
        f"--node-vm-size {shlex.quote(config.size)}",
        "--node-count 1",
        "--enable-workload-identity",
        "--enable-oidc-issuer",
        "--enable-addons azure-keyvault-secrets-provider",
        "--generate-ssh-keys",
    ])
    display_command(command, "Creating AKS Cluster")
    run_command(command, "Creating AKS Cluster")

    # Get kubeconfig
    command = " \\\n".join([
        "az aks get-credentials",
        f"--resource-group {resource_group}",
        f"--name {config.cluster_name}",
        "--overwrite-existing",
    ])
    display_command(command, "Getting AKS Credentials", "command2")
    run_command(command, "Getting AKS Credentials")
def get_oidc_issuer_url(config: DemoConfig) -> str:
    """Return the AKS cluster's OIDC issuer URL.

    Returns:
        The whitespace-stripped output of ``az aks show`` queried for
        ``oidcIssuerProfile.issuerUrl``.
    """
    # The resource group is user-supplied; quote it for the shell.
    command = " \\\n".join([
        "az aks show",
        f"--resource-group {shlex.quote(config.resource_group)}",
        f"--name {config.cluster_name}",
        "--query oidcIssuerProfile.issuerUrl",
        "--output tsv",
    ])
    step = "Getting OIDC Issuer URL"
    display_command(command, step)
    result = run_command(command, step)
    return result.stdout.strip()
def create_federated_credential(config: DemoConfig, oidc_issuer_url: str):
    """Create the federated identity credential on the managed identity.

    The credential uses *oidc_issuer_url* as issuer and the demo's Kubernetes
    service account (``system:serviceaccount:<namespace>:<name>``) as subject.
    """
    subject = f"system:serviceaccount:{config.namespace}:{config.service_account_name}"
    # The resource group is user-supplied and the issuer URL comes from command
    # output; quote both so the shell passes them through literally.
    command = " \\\n".join([
        "az identity federated-credential create",
        "--name kubernetes-federated-credential",
        f"--identity-name {config.managed_identity_name}",
        f"--resource-group {shlex.quote(config.resource_group)}",
        f"--issuer {shlex.quote(oidc_issuer_url)}",
        f"--subject {subject}",
        "--audiences api://AzureADTokenExchange",
    ])
    step = "Creating Federated Identity Credential"
    display_command(command, step)
    run_command(command, step)
def assign_node_rg_permissions(config: DemoConfig, client_id: str):
    """Grant *client_id* the Reader role on the cluster's node resource group."""

    def az_tsv(*args: str) -> str:
        """Run an ``az`` query with ``-o tsv`` and return its stripped stdout."""
        # An argument list (no shell) keeps the user-supplied resource group
        # from being interpreted by the shell.
        lookup = subprocess.run(
            ["az", *args, "-o", "tsv"],
            capture_output=True, text=True, check=True
        )
        return lookup.stdout.strip()

    # Get node resource group name
    node_resource_group = az_tsv(
        "aks", "show",
        "--resource-group", config.resource_group,
        "--name", config.cluster_name,
        "--query", "nodeResourceGroup",
    )
    # Get subscription ID
    subscription_id = az_tsv("account", "show", "--query", "id")

    # Assign Reader role
    command = " \\\n".join([
        "az role assignment create",
        '--role "Reader"',
        f"--assignee {client_id}",
        f'--scope "/subscriptions/{subscription_id}/resourceGroups/{node_resource_group}" ',
    ])
    step = "Assigning Reader Role on Node Resource Group"
    display_command(command, step)
    run_command(command, step)
def _show_and_apply_manifest(manifest: str, panel_title: str, step: str) -> None:
    """Display a YAML manifest, apply it with ``kubectl apply -f``, then delete the temp file."""
    console.print()
    console.print(Panel(
        Syntax(manifest.strip(), "yaml", theme="monokai", line_numbers=True),
        title=panel_title,
        border_style="cyan",
        width=120,
        expand=False,
        padding=(1, 2),
    ))
    # delete=False so the file survives being closed; it is closed before
    # kubectl reads it and removed afterwards so no temp files are left behind.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
        f.write(manifest)
    try:
        command = f"kubectl apply -f {shlex.quote(f.name)}"
        display_command(command, step, "command2")
        run_command(command, step)
    finally:
        Path(f.name).unlink(missing_ok=True)


def create_kubernetes_resources(config: DemoConfig, client_id: str):
    """Create the namespace, service account and SecretProviderClass for the demo.

    Args:
        config: Demo configuration supplying the namespace, service account
            and Key Vault names.
        client_id: Client ID of the managed identity; written into the service
            account annotation and the SecretProviderClass parameters.
    """
    # Get tenant ID
    tenant_lookup = subprocess.run(
        ["az", "account", "show", "--query", "tenantId", "-o", "tsv"],
        capture_output=True, text=True, check=True
    )
    tenant_id = tenant_lookup.stdout.strip()

    # Create namespace
    namespace_yaml = f"""
apiVersion: v1
kind: Namespace
metadata:
  name: {config.namespace}
  labels:
    azure.workload.identity/use: "true"
"""
    _show_and_apply_manifest(namespace_yaml, "📄 Namespace Configuration", "Creating Namespace")

    # Create service account
    service_account_yaml = f"""
apiVersion: v1
kind: ServiceAccount
metadata:
  name: {config.service_account_name}
  namespace: {config.namespace}
  annotations:
    azure.workload.identity/client-id: {client_id}
  labels:
    azure.workload.identity/use: "true"
"""
    _show_and_apply_manifest(
        service_account_yaml, "📄 Service Account Configuration", "Creating Service Account"
    )

    # Create SecretProviderClass
    secret_provider_class_yaml = f"""
apiVersion: secrets-store.csi.x-k8s.io/v1
kind: SecretProviderClass
metadata:
  name: azure-kvname-wi
  namespace: {config.namespace}
spec:
  provider: azure
  parameters:
    usePodIdentity: "false"
    useVMManagedIdentity: "false"
    clientID: {client_id}
    keyvaultName: {config.key_vault_name}
    cloudName: ""
    objects: |
      array:
        - |
          objectName: demo-secret-1
          objectType: secret
          objectVersion: ""
        - |
          objectName: demo-secret-2
          objectType: secret
          objectVersion: ""
        - |
          objectName: demo-secret-3
          objectType: secret
          objectVersion: ""
    tenantId: {tenant_id}
"""
    _show_and_apply_manifest(
        secret_provider_class_yaml,
        "📄 SecretProviderClass Configuration",
        "Creating SecretProviderClass",
    )
def deploy_validation_job(config: DemoConfig):
    """Deploy the Job that validates workload identity.

    The Job runs the azure-cli image under the demo service account, mounts
    the "azure-kvname-wi" SecretProviderClass at /mnt/secrets, and prints the
    AZURE_* environment variables and every mounted secret.
    """
    job_yaml = f"""
apiVersion: batch/v1
kind: Job
metadata:
  name: workload-identity-validation
  namespace: {config.namespace}
spec:
  template:
    metadata:
      labels:
        azure.workload.identity/use: "true"
    spec:
      serviceAccountName: {config.service_account_name}
      containers:
      - name: validation
        image: mcr.microsoft.com/azure-cli:latest
        command:
        - /bin/bash
        - -c
        - |
          echo "=== Workload Identity Validation ==="
          echo "Environment variables:"
          env | grep AZURE_ | sort
          echo ""
          echo "Mounted secrets from Key Vault:"
          ls -la /mnt/secrets/ || echo "No secrets directory found"
          echo ""
          if [ -d "/mnt/secrets" ]; then
            for secret in /mnt/secrets/*; do
              if [ -f "$secret" ]; then
                echo "Secret $(basename $secret):"
                cat "$secret"
                echo ""
              fi
            done
          fi
          echo "=== Validation Complete ==="
        volumeMounts:
        - name: secrets-store
          mountPath: "/mnt/secrets"
          readOnly: true
      volumes:
      - name: secrets-store
        csi:
          driver: secrets-store.csi.k8s.io
          readOnly: true
          volumeAttributes:
            secretProviderClass: "azure-kvname-wi"
      restartPolicy: Never
  backoffLimit: 3
"""
    console.print()
    console.print(Panel(
        Syntax(job_yaml.strip(), "yaml", theme="monokai", line_numbers=True),
        title="📄 Validation Job Configuration",
        border_style="cyan",
        width=120,
        expand=False,
        padding=(1, 2),
    ))
    # delete=False so the file survives being closed; it is closed before
    # kubectl reads it and removed afterwards so no temp file is left behind.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
        f.write(job_yaml)
    try:
        command = f"kubectl apply -f {shlex.quote(f.name)}"
        display_command(command, "Deploying Validation Job", "command2")
        run_command(command, "Deploying Validation Job")
    finally:
        Path(f.name).unlink(missing_ok=True)
def check_job_status(config: DemoConfig, max_wait_minutes: int = 10):
    """Poll the validation job until it succeeds or fails, then show its logs.

    Args:
        config: Demo configuration supplying the namespace.
        max_wait_minutes: Upper bound on the wait; the job is polled every
            10 seconds.

    Raises:
        typer.Exit: With code 1 when a job pod fails or the wait times out.
    """
    job_name = "workload-identity-validation"
    max_attempts = max_wait_minutes * 6  # Check every 10 seconds

    def status_count(field: str) -> int:
        """Return the integer ``.status.<field>`` of the job (0 when unset)."""
        # Argument list instead of a shell string: no quoting of the jsonpath needed.
        query = subprocess.run(
            ["kubectl", "get", "job", job_name, "-n", config.namespace,
             "-o", f"jsonpath={{.status.{field}}}"],
            capture_output=True, text=True, check=True
        )
        raw = query.stdout.strip()
        return int(raw) if raw.isdigit() else 0

    console.print(f"[info]Waiting for validation job to complete (max {max_wait_minutes} minutes)...[/info]")
    for _ in range(max_attempts):
        try:
            if status_count("succeeded") >= 1:
                console.print("✅ Validation job completed successfully")
                break
            # ">= 1" rather than "== 1": the failure counter can advance by more
            # than one between two polls, which an equality check would miss.
            if status_count("failed") >= 1:
                console.print("[error]❌ Validation job failed[/error]")
                # Get pod logs for troubleshooting
                subprocess.run(
                    ["kubectl", "logs", "-l", f"job-name={job_name}", "-n", config.namespace]
                )
                raise typer.Exit(1)
        except subprocess.CalledProcessError:
            pass  # Job may not exist yet
        time.sleep(10)
    else:
        console.print(f"[warning]⚠️ Job did not complete within {max_wait_minutes} minutes[/warning]")
        raise typer.Exit(1)

    # Display job logs
    command = f"kubectl logs -l job-name={job_name} -n {config.namespace}"
    display_command(command, "Getting Validation Job Logs", "command2")
    result = run_command(command, "Getting Validation Job Logs")
    if result.stdout:
        console.print(Panel(
            Syntax(result.stdout, "bash", theme="monokai", line_numbers=False),
            title="💻 Validation Job Output",
            border_style="dim",
            width=120,
            expand=False,
            padding=(1, 2),
        ))
def display_summary(config: DemoConfig):
    """Print the completion panel: what was proven, what was created, how to clean up."""
    summary_lines = [
        "[success]🎉 AKS Workload Identity Demo Completed Successfully![/success]",
        "The demo has successfully proven that workload identity is working:",
        "✅ Azure infrastructure created and configured",
        "✅ AKS cluster deployed with workload identity enabled",
        "✅ Managed identity linked to Kubernetes service account",
        "✅ Key Vault secrets accessed without stored credentials",
        "✅ Validation job completed successfully",
        "[info]Resources created:[/info]",
        f"• Resource Group: {config.resource_group}",
        f"• Managed Identity: {config.managed_identity_name}",
        f"• AKS Cluster: {config.cluster_name}",
        f"• Key Vault: {config.key_vault_name}",
        f"• Kubernetes Namespace: {config.namespace}",
        "To clean up resources, run:",
        f"[dim]az group delete --name {config.resource_group} --yes --no-wait[/dim]",
    ]
    console.print()
    console.print(Panel(
        "\n".join(summary_lines),
        title="🎉 Demo Complete",
        border_style="success",
        width=120,
        expand=False,
        padding=(1, 2),
    ))
app = typer.Typer(help="AKS Workload Identity Demo Script")


@app.command()
def main(
    resource_group: Optional[str] = typer.Option(
        None,
        "--resource-group",
        "-g",
        help="Resource group name (auto-generated if not provided)"
    ),
    location: str = typer.Option(
        "eastus2",
        "--location",
        "-l",
        help="Azure region"
    ),
    size: str = typer.Option(
        "Standard_D4ds_v5",
        "--size",
        "-s",
        help="Azure VM size for AKS nodes"
    )
):
    """
    Deploy AKS Workload Identity demo with Key Vault integration.
    This script creates all necessary Azure and Kubernetes resources to demonstrate
    workload identity, allowing pods to access Key Vault secrets without stored credentials.
    """
    # Generate unique ID and set resource group name
    unique_id = generate_unique_id()
    if not resource_group:
        resource_group = f"aks-wi-demo-{unique_id}"
    config = DemoConfig(
        resource_group=resource_group,
        location=location,
        size=size,
        unique_id=unique_id
    )

    # Display welcome message
    welcome_lines = [
        "[info]AKS Workload Identity Demo[/info]",
        "This script will demonstrate Azure Kubernetes Service workload identity",
        "integration with Key Vault, showing how pods can access secrets without",
        "storing credentials.",
        "[info]Configuration:[/info]",
        f"• Resource Group: {config.resource_group}",
        f"• Location: {config.location}",
        f"• VM Size: {config.size}",
        f"• Unique ID: {config.unique_id}",
        "The demo will create Azure infrastructure, deploy an AKS cluster,",
        "configure workload identity, and validate the integration works correctly.",
    ]
    console.print(Panel(
        "\n".join(welcome_lines),
        title="🚀 Welcome to AKS Workload Identity Demo",
        border_style="info",
        width=120,
        expand=False,
        padding=(1, 2)
    ))

    try:
        # Check prerequisites
        check_prerequisites()
        console.print()
        # Create Azure infrastructure
        create_resource_group(config)
        console.print()
        client_id = create_managed_identity(config)
        console.print()
        create_key_vault(config)
        console.print()
        assign_rbac_permissions(config, client_id)
        console.print()
        # Create AKS cluster
        create_aks_cluster(config)
        console.print()
        # Configure workload identity
        oidc_issuer_url = get_oidc_issuer_url(config)
        console.print()
        create_federated_credential(config, oidc_issuer_url)
        console.print()
        assign_node_rg_permissions(config, client_id)
        console.print()
        # Deploy Kubernetes resources
        create_kubernetes_resources(config, client_id)
        console.print()
        # Deploy and validate
        deploy_validation_job(config)
        console.print()
        check_job_status(config)
        # Display summary
        display_summary(config)
    except typer.Exit:
        # Deliberate exits raised by the steps above pass through unchanged.
        raise
    except Exception as e:
        # Text() keeps brackets in the exception message from being parsed as
        # Rich markup, which could itself fail while reporting the error.
        console.print(Panel(
            Text(f"Unexpected error: {e}"),
            title="❌ Error",
            border_style="error",
            width=120,
            expand=False,
            padding=(1, 2)
        ))
        raise typer.Exit(1) from e


if __name__ == "__main__":
    app()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment