Skip to content

Instantly share code, notes, and snippets.

@RajChowdhury240
Last active July 24, 2025 13:13
Show Gist options
  • Save RajChowdhury240/f834c832421c46f41c8edad2e53c4b82 to your computer and use it in GitHub Desktop.
Save RajChowdhury240/f834c832421c46f41c8edad2e53c4b82 to your computer and use it in GitHub Desktop.
'''
python role_tag_removal.py \
--accounts-file accounts.txt \
--assume-role ca-iam-cie-a \
--tag-key syf:iam:network-perimeter-exception \
--tag-value true \
--max-workers 10 \
--output-file custom_results.csv
'''
#!/usr/bin/env python3
"""
AWS Role Tag Removal Automation Script
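This script assumes a specified role in multiple AWS accounts and removes
one specific tag (matched on both key and value) from the target roles listed
in the accounts file. Each line of that file is `account_id,role_name`; blank
lines and lines starting with `#` are ignored.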
The script uses AWS Organizations to retrieve account names and requires
permissions to call organizations:DescribeAccount from the management account.
Usage:
python role_tag_removal.py --accounts-file accounts.txt
Requirements:
pip install boto3 rich click
AWS Permissions Required:
- organizations:DescribeAccount (in management account)
- sts:AssumeRole (for target accounts)
- iam:ListRoleTags, iam:UntagRole (granted to the assumed role in each target account, covering the target roles)
"""
import boto3
import click
import csv
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import List, Tuple, Optional
import time
from rich.console import Console
from rich.progress import Progress, TaskID, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn
from rich.table import Table
from rich import print as rprint
# Shared Rich console; the warnings, progress display, results table and
# summary produced by this script are all printed through it.
console = Console()
@dataclass
class AccountInfo:
    """One entry of the accounts file: an AWS account and the role to clean up.

    Instances are created positionally as ``AccountInfo(account_id, role_name)``,
    so the field order is part of the interface.
    """

    # AWS account ID, kept as the string read from the accounts file.
    account_id: str
    # Name of the IAM role in that account whose tag should be removed.
    role_name: str
@dataclass
class ProcessingResult:
    """Outcome of processing a single account/role pair.

    One instance per account is shown in the results table and written as one
    row of the CSV report.
    """

    # AWS account ID that was processed.
    account_id: str
    # Account name shown in the report (or a placeholder such as "Error").
    account_name: str
    # Name of the IAM role that was targeted.
    role_name: str
    # Human-readable description of what was removed ("key=value", "None", ...).
    removed_tags: str
    # Failure description; None when processing succeeded.
    error: Optional[str] = None
    # True when the account was processed without error.
    success: bool = True
class RoleTagRemover:
    """Remove one specific tag from IAM roles across many AWS accounts.

    For each account the remover assumes ``assume_role_name`` in that account,
    looks up the account name through AWS Organizations, and removes the tag
    ``target_tag_key`` from the target role when its value equals
    ``target_tag_value``.  One ``ProcessingResult`` per account is collected in
    ``self.results`` for display and CSV export.
    """

    def __init__(self, assume_role_name: str = 'ca-iam-cie-a',
                 target_tag_key: str = 'syf:iam:network-perimeter-exception',
                 target_tag_value: str = 'true'):
        """Store the role to assume and the tag key/value pair to remove."""
        self.assume_role_name = assume_role_name
        self.target_tag_key = target_tag_key
        self.target_tag_value = target_tag_value
        self.results: List[ProcessingResult] = []
        # Worker threads append to ``results`` concurrently.
        self.results_lock = threading.Lock()

    def load_accounts(self, file_path: str) -> List[AccountInfo]:
        """Load account information from the specified file.

        Each non-empty line that does not start with ``#`` must have the form
        ``account_id,role_name``.  Lines that do not split into exactly two
        non-empty fields are reported as warnings and skipped.

        Args:
            file_path: Path of the accounts file.

        Returns:
            The parsed entries in file order.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
            Exception: Any other error raised while reading is printed and
                re-raised unchanged.
        """
        accounts = []
        try:
            # utf-8-sig reads plain UTF-8 and also drops a leading BOM, which
            # would otherwise become part of the first account ID.
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                for line_num, line in enumerate(f, 1):
                    line = line.strip()
                    if not line or line.startswith('#'):
                        continue
                    parts = [part.strip() for part in line.split(',')]
                    # Reject wrong field counts and empty fields such as "123,".
                    if len(parts) != 2 or not all(parts):
                        console.print(f"[yellow]Warning: Invalid format on line {line_num}: {line}[/yellow]")
                        continue
                    account_id, role_name = parts
                    accounts.append(AccountInfo(account_id, role_name))
        except FileNotFoundError:
            console.print(f"[red]Error: File {file_path} not found[/red]")
            raise
        except Exception as e:
            console.print(f"[red]Error reading file {file_path}: {e}[/red]")
            raise
        return accounts

    def assume_role(self, account_id: str, role_name: str) -> Optional[boto3.Session]:
        """Assume the specified role in the target account.

        Args:
            account_id: Account that owns the role.
            role_name: Name of the role to assume.

        Returns:
            A session built from the temporary credentials, or ``None`` when
            the role could not be assumed (the error is printed).
        """
        try:
            # This method runs in worker threads.  boto3.client() goes through
            # the shared default session, which is not thread-safe, so each
            # call builds its own session instead.
            sts_client = boto3.Session().client('sts')
            role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
            response = sts_client.assume_role(
                RoleArn=role_arn,
                RoleSessionName=f"tag-removal-{int(time.time())}"
            )
            credentials = response['Credentials']
            return boto3.Session(
                aws_access_key_id=credentials['AccessKeyId'],
                aws_secret_access_key=credentials['SecretAccessKey'],
                aws_session_token=credentials['SessionToken']
            )
        except Exception as e:
            console.print(f"[red]Failed to assume role {role_name} in account {account_id}: {e}[/red]")
            return None

    def get_account_name_from_orgs(self, account_id: str) -> str:
        """Get the account name from AWS Organizations.

        Uses the caller's own credentials, not the assumed role.  Falls back
        to returning ``account_id`` (after printing a warning) when the
        lookup fails.
        """
        try:
            # Own session per call for the same thread-safety reason as in
            # assume_role; credentials are still the default (caller's) ones.
            org_client = boto3.Session().client('organizations')
            response = org_client.describe_account(AccountId=account_id)
            return response['Account']['Name']
        except Exception as e:
            console.print(f"[yellow]Warning: Could not get account name for {account_id} from Organizations: {e}[/yellow]")
            return account_id  # Return account ID as fallback

    def remove_role_tags(self, session: boto3.Session, role_name: str, account_id: str) -> Tuple[bool, str, str]:
        """Remove the configured tag from the target role.

        The tag is removed only when both its key and its value match
        ``target_tag_key`` / ``target_tag_value``.

        Args:
            session: Session holding credentials for the target account.
            role_name: Role whose tags are inspected.
            account_id: Not used by this method; kept for interface
                compatibility.

        Returns:
            ``(success, message, removed_tags)`` where ``removed_tags`` is
            ``"key=value"`` on removal, ``"None"`` when nothing matched,
            ``"N/A"`` when the role does not exist and ``"Error"`` on any
            other failure.
        """
        try:
            iam_client = session.client('iam')
            # Collect all tags, following pagination markers so that a
            # truncated first page cannot hide the tag we are looking for.
            current_tags = []
            request = {'RoleName': role_name}
            try:
                while True:
                    response = iam_client.list_role_tags(**request)
                    current_tags.extend(response.get('Tags', []))
                    if not response.get('IsTruncated'):
                        break
                    request['Marker'] = response['Marker']
            except iam_client.exceptions.NoSuchEntityException:
                return False, f"Role {role_name} not found", "N/A"

            tags_to_remove = [
                tag['Key'] for tag in current_tags
                if tag['Key'] == self.target_tag_key
                and tag['Value'] == self.target_tag_value
            ]
            if not tags_to_remove:
                return True, "No matching tags found to remove", "None"

            iam_client.untag_role(RoleName=role_name, TagKeys=tags_to_remove)
            removed_tags_str = f"{self.target_tag_key}={self.target_tag_value}"
            return True, f"Successfully removed {len(tags_to_remove)} tag(s)", removed_tags_str
        except Exception as e:
            return False, f"Error removing tags: {str(e)}", "Error"

    def process_account(self, account_info: AccountInfo, progress: Progress, task_id: TaskID) -> ProcessingResult:
        """Process a single account: assume the role, then remove the tag.

        The outcome is appended to ``self.results`` (under the lock) and also
        returned.

        Args:
            account_info: Account ID and target role name.
            progress: Progress display whose task description is updated.
            task_id: Task within ``progress`` to update.
        """
        account_id = account_info.account_id
        target_role_name = account_info.role_name
        progress.update(task_id, description=f"Processing account {account_id}...")

        # Assume the intermediate role
        session = self.assume_role(account_id, self.assume_role_name)
        if not session:
            result = ProcessingResult(
                account_id=account_id,
                account_name="Error",
                role_name=target_role_name,
                removed_tags="Error",
                error=f"Failed to assume role {self.assume_role_name}",
                success=False
            )
            with self.results_lock:
                self.results.append(result)
            return result

        # Get account name from Organizations
        account_name = self.get_account_name_from_orgs(account_id)

        # Remove tags from target role
        success, message, removed_tags = self.remove_role_tags(session, target_role_name, account_id)
        result = ProcessingResult(
            account_id=account_id,
            account_name=account_name,
            role_name=target_role_name,
            removed_tags=removed_tags,
            error=None if success else message,
            success=success
        )
        with self.results_lock:
            self.results.append(result)
        progress.update(task_id, description=f"Completed account {account_id}")
        return result

    def process_accounts_threaded(self, accounts: List[AccountInfo], max_workers: int = 10) -> List[ProcessingResult]:
        """Process multiple accounts concurrently in a thread pool.

        Args:
            accounts: Entries to process.
            max_workers: Thread pool size, passed to ``ThreadPoolExecutor``.

        Returns:
            ``self.results``, which holds one entry per processed account.
        """
        console.print(f"\n[bold blue]Processing {len(accounts)} accounts with {max_workers} threads...[/bold blue]")
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
            TimeElapsedColumn(),
            console=console
        ) as progress:
            main_task = progress.add_task("Overall Progress", total=len(accounts))
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Submit all tasks
                future_to_account = {
                    executor.submit(self.process_account, account, progress, main_task): account
                    for account in accounts
                }
                # Process completed tasks
                for future in as_completed(future_to_account):
                    try:
                        future.result()
                    except Exception as e:
                        account = future_to_account[future]
                        console.print(f"[red]Error processing account {account.account_id}: {e}[/red]")
                        # Record the failure so the account still appears in
                        # the table and the CSV instead of silently vanishing.
                        failure = ProcessingResult(
                            account_id=account.account_id,
                            account_name="Error",
                            role_name=account.role_name,
                            removed_tags="Error",
                            error=f"Unexpected error: {e}",
                            success=False
                        )
                        with self.results_lock:
                            self.results.append(failure)
                    finally:
                        progress.advance(main_task)
        return self.results

    def display_results_table(self) -> None:
        """Print the results, sorted by account ID, as a table plus a summary."""
        table = Table(title="AWS Role Tag Removal Results", show_header=True, header_style="bold magenta")
        table.add_column("Account ID", style="cyan", no_wrap=True)
        table.add_column("Account Name", style="blue")
        table.add_column("Role Name", style="green")
        table.add_column("Removed Tags", style="yellow")
        table.add_column("Status", style="bold")

        success_count = 0
        error_count = 0
        for result in sorted(self.results, key=lambda x: x.account_id):
            if result.success:
                status = "[green]✓ Success[/green]"
                success_count += 1
            else:
                status = "[red]✗ Failed[/red]"
                error_count += 1
            table.add_row(
                result.account_id,
                result.account_name,
                result.role_name,
                result.removed_tags,
                status
            )

        console.print("\n")
        console.print(table)

        # Summary
        console.print("\n[bold]Summary:[/bold]")
        console.print(f"[green]Successful: {success_count}[/green]")
        console.print(f"[red]Failed: {error_count}[/red]")
        console.print(f"[blue]Total: {len(self.results)}[/blue]")

    def save_results_csv(self, output_file: Optional[str] = None) -> str:
        """Save results, sorted by account ID, to a CSV file.

        Args:
            output_file: Destination path.  When empty or ``None`` a name of
                the form ``role_tag_removal_results_<timestamp>.csv`` is
                generated in the current directory.

        Returns:
            The path that was written.
        """
        if not output_file:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_file = f"role_tag_removal_results_{timestamp}.csv"

        fieldnames = ['AccountID', 'AccountName', 'RoleName', 'RemovedTags', 'Status', 'Error']
        with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for result in sorted(self.results, key=lambda x: x.account_id):
                writer.writerow({
                    'AccountID': result.account_id,
                    'AccountName': result.account_name,
                    'RoleName': result.role_name,
                    'RemovedTags': result.removed_tags,
                    'Status': 'Success' if result.success else 'Failed',
                    'Error': result.error or ''
                })
        return output_file
@click.command()
@click.option('--accounts-file', '-f', default='accounts.txt',
              help='Path to the accounts file (default: accounts.txt)')
@click.option('--assume-role', '-r', default='ca-iam-cie-a',
              help='Role name to assume in target accounts (default: ca-iam-cie-a)')
@click.option('--tag-key', '-k', default='syf:iam:network-perimeter-exception',
              help='Tag key to remove (default: syf:iam:network-perimeter-exception)')
@click.option('--tag-value', '-v', default='true',
              help='Tag value to match for removal (default: true)')
# IntRange rejects 0 and negative values at the command line; they would
# otherwise make ThreadPoolExecutor raise ValueError later on.
@click.option('--max-workers', '-w', default=10, type=click.IntRange(min=1),
              help='Maximum number of worker threads (default: 10)')
@click.option('--output-file', '-o', help='Output CSV file name (auto-generated if not specified)')
def main(accounts_file: str, assume_role: str, tag_key: str, tag_value: str,
         max_workers: int, output_file: Optional[str]):
    """
    AWS Role Tag Removal Automation
    This script processes multiple AWS accounts, assumes specified roles,
    and removes target tags from IAM roles.
    """
    # NOTE: the docstring above is shown by click as the --help text, so it is
    # user-facing output rather than developer documentation.
    console.print("[bold blue]AWS Role Tag Removal Automation[/bold blue]")
    console.print(f"[dim]Target tag: {tag_key}={tag_value}[/dim]")
    console.print(f"[dim]Assume role: {assume_role}[/dim]")
    console.print("[dim]Account names retrieved from AWS Organizations[/dim]\n")
    try:
        # Initialize the remover
        remover = RoleTagRemover(
            assume_role_name=assume_role,
            target_tag_key=tag_key,
            target_tag_value=tag_value
        )

        # Load accounts
        console.print(f"[bold]Loading accounts from {accounts_file}...[/bold]")
        accounts = remover.load_accounts(accounts_file)
        console.print(f"[green]Loaded {len(accounts)} accounts[/green]")
        if not accounts:
            console.print("[red]No accounts found to process[/red]")
            return

        # Process accounts; the results are kept on the remover itself.
        remover.process_accounts_threaded(accounts, max_workers)

        # Display results
        remover.display_results_table()

        # Save to CSV
        output_path = remover.save_results_csv(output_file)
        console.print(f"\n[bold green]Results saved to: {output_path}[/bold green]")
    except KeyboardInterrupt:
        console.print("\n[yellow]Operation cancelled by user[/yellow]")
    except Exception as e:
        # Report the error, then re-raise so the failure is not hidden.
        console.print(f"\n[red]Error: {e}[/red]")
        raise
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment