Skip to content

Instantly share code, notes, and snippets.

@RajChowdhury240
Created July 15, 2025 17:48
Show Gist options
  • Save RajChowdhury240/5bfefcb4a0b8d73dd6985c513d55dcb7 to your computer and use it in GitHub Desktop.
Save RajChowdhury240/5bfefcb4a0b8d73dd6985c513d55dcb7 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
AWS IAM Tag Removal Automation
Removes specified tags from IAM roles across multiple AWS accounts
"""
import boto3
import csv
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime
from typing import List, Tuple, Dict, Any
import time
import sys
from rich.console import Console
from rich.progress import Progress, TaskID, SpinnerColumn, TextColumn, BarColumn, TimeRemainingColumn
from rich.table import Table
from rich.panel import Panel
from rich.text import Text
@dataclass
class AccountInfo:
    """One target account, as read from a line of the accounts file."""

    # AWS account ID; interpolated into the role ARN when assuming the role.
    account_id: str
    # Name of the IAM role to assume in that account.
    role_name: str
@dataclass
class ProcessingResult:
    """Outcome of processing a single account; one row of the CSV report."""

    # Account the result belongs to.
    account_id: str
    # Role that was assumed in that account (not the roles that were scanned).
    role_name: str
    # "Success" or "Failed" -- the two values the rest of the script compares against.
    status: str
    # Number of IAM roles whose tags were inspected without error.
    roles_processed: int
    # Number of roles from which the target tag was removed.
    tags_removed: int
    # Error text for the report; empty when nothing went wrong.
    error_message: str = ""
    # Elapsed seconds spent on this account.
    processing_time: float = 0.0
class IAMTagRemover:
    """Remove one specific tag from every IAM role in a list of AWS accounts.

    Accounts are read from a text file (``account_id,role_name`` per line).
    For each account the given role is assumed via STS, all IAM roles are
    listed, and the tag is removed from every role that carries it with the
    configured key AND value. Accounts are processed concurrently in a thread
    pool; a CSV report and a console summary are produced at the end.
    """

    def __init__(self, accounts_file: str, tag_key: str = "syf:iam:network-perimeter-exception",
                 tag_value: str = "true", max_workers: int = 10):
        """Store the configuration.

        Args:
            accounts_file: Path of the ``account_id,role_name`` text file.
            tag_key: Key of the tag to remove.
            tag_value: Value the tag must have to be removed.
            max_workers: Size of the thread pool used by :meth:`run`.
        """
        self.accounts_file = accounts_file
        self.tag_key = tag_key
        self.tag_value = tag_value
        self.max_workers = max_workers
        self.console = Console()
        self.results: List["ProcessingResult"] = []
        self.lock = threading.Lock()

    @staticmethod
    def _parse_account_line(line: str) -> "Tuple[str, str] | None":
        """Split an ``account_id,role_name`` line; return None if malformed."""
        parts = [part.strip() for part in line.split(',')]
        # Exactly two fields, neither empty: an empty account ID or role name
        # can only produce an invalid role ARN later on.
        if len(parts) != 2 or not all(parts):
            return None
        return parts[0], parts[1]

    def load_accounts(self) -> List["AccountInfo"]:
        """Load account information from the text file.

        Blank lines are skipped. Malformed lines are reported with a warning
        and skipped. If the file cannot be read, an error is printed and the
        process exits with status 1.

        Returns:
            The accounts in file order.
        """
        accounts = []
        try:
            with open(self.accounts_file, 'r') as file:
                for line_num, line in enumerate(file, 1):
                    line = line.strip()
                    if not line:
                        continue
                    parsed = self._parse_account_line(line)
                    if parsed is None:
                        self.console.print(f"[yellow]Warning: Invalid format on line {line_num}: {line}[/yellow]")
                        continue
                    account_id, role_name = parsed
                    accounts.append(AccountInfo(account_id, role_name))
        except FileNotFoundError:
            self.console.print(f"[red]Error: File {self.accounts_file} not found[/red]")
            sys.exit(1)
        except (OSError, UnicodeError) as e:
            self.console.print(f"[red]Error reading file: {str(e)}[/red]")
            sys.exit(1)
        return accounts

    def assume_role(self, account_id: str, role_name: str) -> "boto3.Session":
        """Assume the specified role in the target account.

        Args:
            account_id: Target AWS account ID.
            role_name: Role to assume in that account.

        Returns:
            A new boto3 session carrying the temporary credentials.

        Raises:
            Exception: If the STS call fails; the original error is chained.
        """
        # This method runs in worker threads. boto3.client() goes through the
        # module-level default session, which is shared and not thread-safe,
        # so each call builds its own Session instead.
        sts_client = boto3.Session().client('sts')
        role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
        try:
            response = sts_client.assume_role(
                RoleArn=role_arn,
                RoleSessionName=f"iam-tag-removal-{account_id}-{int(time.time())}"
            )
            credentials = response['Credentials']
            return boto3.Session(
                aws_access_key_id=credentials['AccessKeyId'],
                aws_secret_access_key=credentials['SecretAccessKey'],
                aws_session_token=credentials['SessionToken']
            )
        except Exception as e:
            raise Exception(f"Failed to assume role {role_arn}: {str(e)}") from e

    def remove_tags_from_role(self, iam_client, role_name: str) -> bool:
        """Remove the configured tag from a single IAM role.

        The tag is removed only when the role has it with both the configured
        key and the configured value.

        Args:
            iam_client: IAM client for the target account.
            role_name: Name of the role to inspect.

        Returns:
            True if the tag was present and removed, False otherwise.

        Raises:
            Exception: If listing or untagging fails; the original error is chained.
        """
        try:
            response = iam_client.list_role_tags(RoleName=role_name)
            current_tags = response.get('Tags', [])
            tag_exists = any(
                tag['Key'] == self.tag_key and tag['Value'] == self.tag_value
                for tag in current_tags
            )
            if not tag_exists:
                return False
            iam_client.untag_role(
                RoleName=role_name,
                TagKeys=[self.tag_key]
            )
            return True
        except Exception as e:
            raise Exception(f"Failed to remove tag from role {role_name}: {str(e)}") from e

    def process_account(self, account_info: "AccountInfo", progress: "Progress",
                        task_id: "TaskID") -> "ProcessingResult":
        """Process a single AWS account.

        Assumes the account's role, walks every IAM role and removes the
        target tag where present. A failure on an individual role is printed
        as a warning and does not stop the account. Never raises: any
        account-level error is captured in the returned result.

        Args:
            account_info: Account to process.
            progress: Progress display whose description is updated.
            task_id: Task within ``progress`` to update.

        Returns:
            The result for this account. Counts reflect the work actually
            done, even when the account fails part-way through.
        """
        start_time = time.time()
        result = ProcessingResult(
            account_id=account_info.account_id,
            role_name=account_info.role_name,
            status="Failed",
            roles_processed=0,
            tags_removed=0
        )
        roles_processed = 0
        tags_removed = 0
        role_errors = 0
        try:
            progress.update(task_id, description=f"Processing account {account_info.account_id}")
            session = self.assume_role(account_info.account_id, account_info.role_name)
            iam_client = session.client('iam')
            paginator = iam_client.get_paginator('list_roles')
            for page in paginator.paginate():
                for role in page['Roles']:
                    role_name = role['RoleName']
                    try:
                        if self.remove_tags_from_role(iam_client, role_name):
                            tags_removed += 1
                        roles_processed += 1
                    except Exception as e:
                        # Log individual role errors but continue processing
                        role_errors += 1
                        self.console.print(f"[yellow]Warning: Error processing role {role_name} in account {account_info.account_id}: {str(e)}[/yellow]")
            result.status = "Success"
            if role_errors:
                # Keep per-role failures visible in the report, not only on the console.
                result.error_message = f"{role_errors} role(s) could not be processed"
        except Exception as e:
            result.error_message = str(e)
            result.status = "Failed"
        finally:
            # Recorded on every path: tags removed before a mid-run failure
            # really were removed and belong in the report.
            result.roles_processed = roles_processed
            result.tags_removed = tags_removed
            result.processing_time = time.time() - start_time
        return result

    def generate_csv_report(self, results: List["ProcessingResult"], filename: str = None):
        """Generate CSV report with processing results.

        Args:
            results: Results to write, one row each.
            filename: Output path; defaults to a timestamped
                ``iam_tag_removal_report_<YYYYmmdd_HHMMSS>.csv``.

        Returns:
            The path that was written.
        """
        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"iam_tag_removal_report_{timestamp}.csv"
        fieldnames = [
            'Account ID', 'Role Name', 'Status', 'Roles Processed',
            'Tags Removed', 'Processing Time (s)', 'Error Message'
        ]
        # Explicit UTF-8: error messages are arbitrary text and must not fail
        # to encode under a narrow platform default.
        with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for result in results:
                writer.writerow({
                    'Account ID': result.account_id,
                    'Role Name': result.role_name,
                    'Status': result.status,
                    'Roles Processed': result.roles_processed,
                    'Tags Removed': result.tags_removed,
                    'Processing Time (s)': f"{result.processing_time:.2f}",
                    'Error Message': result.error_message
                })
        return filename

    @staticmethod
    def _summary_stats(results) -> Dict[str, Any]:
        """Aggregate per-account results into the totals shown in the summary."""
        total_accounts = len(results)
        successful_accounts = sum(1 for r in results if r.status == "Success")
        total_time = sum(r.processing_time for r in results)
        return {
            "total_accounts": total_accounts,
            "successful_accounts": successful_accounts,
            "failed_accounts": total_accounts - successful_accounts,
            "total_roles_processed": sum(r.roles_processed for r in results),
            "total_tags_removed": sum(r.tags_removed for r in results),
            # Sum of per-account durations; accounts run concurrently, so this
            # is cumulative time rather than wall-clock time.
            "total_time": total_time,
            # Guard the empty case (e.g. an accounts file with no valid lines).
            "average_time": total_time / total_accounts if total_accounts else 0.0,
        }

    def display_summary(self, results: List["ProcessingResult"]):
        """Display summary statistics, followed by a table of failed accounts.

        Args:
            results: Results to summarise; may be empty.
        """
        stats = self._summary_stats(results)
        table = Table(title="Processing Summary")
        table.add_column("Metric", style="cyan")
        table.add_column("Value", style="magenta")
        table.add_row("Total Accounts", str(stats["total_accounts"]))
        table.add_row("Successful", str(stats["successful_accounts"]))
        table.add_row("Failed", str(stats["failed_accounts"]))
        table.add_row("Total Roles Processed", str(stats["total_roles_processed"]))
        table.add_row("Total Tags Removed", str(stats["total_tags_removed"]))
        table.add_row("Total Processing Time", f"{stats['total_time']:.2f}s")
        table.add_row("Average Time per Account", f"{stats['average_time']:.2f}s")
        self.console.print(table)
        # Show failed accounts if any
        if stats["failed_accounts"] > 0:
            self.console.print("\n[red]Failed Accounts:[/red]")
            failed_table = Table()
            failed_table.add_column("Account ID", style="red")
            failed_table.add_column("Error", style="yellow")
            for result in results:
                if result.status == "Failed":
                    failed_table.add_row(result.account_id, result.error_message)
            self.console.print(failed_table)

    def run(self):
        """Main execution method.

        Loads the accounts, processes them in a thread pool while showing a
        progress bar, then writes the CSV report and prints the summary.
        """
        self.console.print(Panel.fit(
            "[bold blue]AWS IAM Tag Removal Automation[/bold blue]\n"
            f"Target Tag: {self.tag_key} = {self.tag_value}",
            title="Starting Process"
        ))
        accounts = self.load_accounts()
        self.console.print(f"[green]Loaded {len(accounts)} accounts from {self.accounts_file}[/green]")
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
            TimeRemainingColumn(),
            console=self.console
        ) as progress:
            main_task = progress.add_task("Overall Progress", total=len(accounts))
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                future_to_account = {
                    executor.submit(self.process_account, account, progress, main_task): account
                    for account in accounts
                }
                for future in as_completed(future_to_account):
                    account = future_to_account[future]
                    try:
                        result = future.result()
                    except Exception as e:
                        self.console.print(f"[red]Unexpected error processing {account.account_id}: {str(e)}[/red]")
                        # Still record the account so it shows up as failed in
                        # the report instead of silently disappearing.
                        result = ProcessingResult(
                            account_id=account.account_id,
                            role_name=account.role_name,
                            status="Failed",
                            roles_processed=0,
                            tags_removed=0,
                            error_message=str(e)
                        )
                    else:
                        status_color = "green" if result.status == "Success" else "red"
                        self.console.print(
                            f"[{status_color}]{result.account_id}[/{status_color}]: "
                            f"{result.status} - {result.roles_processed} roles, "
                            f"{result.tags_removed} tags removed"
                        )
                    with self.lock:
                        self.results.append(result)
                    # Advance on every outcome so the bar always reaches 100%.
                    progress.update(main_task, advance=1)
        report_filename = self.generate_csv_report(self.results)
        self.console.print(f"\n[green]CSV report generated: {report_filename}[/green]")
        self.display_summary(self.results)
def main():
    """Script entry point: print the configuration and run the tag remover.

    The configuration is hard-coded below. Exits with status 130 if the run
    is interrupted with Ctrl-C and with status 1 on any other fatal error, so
    callers such as shells and schedulers can detect a failed run.
    """
    console = Console()
    # Configuration
    accounts_file = "accounts.txt"  # Change this to your file path
    tag_key = "syf:iam:network-perimeter-exception"
    tag_value = "true"
    max_workers = 10  # Adjust based on your needs and AWS limits
    console.print(Panel.fit(
        "[bold]AWS IAM Tag Removal Tool[/bold]\n\n"
        f"Accounts file: {accounts_file}\n"
        f"Target tag: {tag_key} = {tag_value}\n"
        f"Max workers: {max_workers}",
        title="Configuration"
    ))
    # Create and run the remover
    remover = IAMTagRemover(
        accounts_file=accounts_file,
        tag_key=tag_key,
        tag_value=tag_value,
        max_workers=max_workers
    )
    try:
        remover.run()
    except KeyboardInterrupt:
        console.print("\n[yellow]Process interrupted by user[/yellow]")
        # 128 + SIGINT(2): the conventional exit status for an interrupted process.
        sys.exit(130)
    except Exception as e:
        console.print(f"\n[red]Fatal error: {str(e)}[/red]")
        sys.exit(1)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment