Created
July 15, 2025 17:48
-
-
Save RajChowdhury240/5bfefcb4a0b8d73dd6985c513d55dcb7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
AWS IAM Tag Removal Automation

Removes a specified tag (matched on key and value) from IAM roles across
multiple AWS accounts.
"""
import csv
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import boto3
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, TaskID, SpinnerColumn, TextColumn, BarColumn, TimeRemainingColumn
from rich.table import Table
from rich.text import Text
@dataclass
class AccountInfo:
    """One entry of the accounts file: a target account and the role to assume in it."""

    # AWS account ID; interpolated into the role ARN that gets assumed.
    account_id: str
    # Name of the IAM role to assume in that account (not the role being untagged).
    role_name: str
@dataclass
class ProcessingResult:
    """Outcome of processing a single account; one row of the CSV report."""

    # Account that was processed.
    account_id: str
    # Role that was assumed in that account.
    role_name: str
    # "Success" or "Failed" (the only two values the processing code assigns).
    status: str
    # Number of IAM roles that were examined without raising an error.
    roles_processed: int
    # Number of roles from which the target tag was actually removed.
    tags_removed: int
    # Error text when the account as a whole failed; empty otherwise.
    error_message: str = ""
    # Elapsed time for this account, in seconds.
    processing_time: float = 0.0
class IAMTagRemover:
    """Remove one tag (matched on key AND value) from IAM roles in many accounts.

    Accounts are read from a text file with one ``account_id,role_name`` pair
    per line. For each account the named role is assumed via STS, every IAM
    role in the account is listed, and the target tag is removed from each
    role that carries it with the expected value. Accounts are processed
    concurrently in a thread pool; a CSV report and a console summary are
    produced at the end.
    """

    def __init__(self, accounts_file: str, tag_key: str = "syf:iam:network-perimeter-exception",
                 tag_value: str = "true", max_workers: int = 10):
        """Store the configuration; no AWS or file access happens here.

        Args:
            accounts_file: Path to the ``account_id,role_name`` text file.
            tag_key: Key of the tag to remove.
            tag_value: The tag is removed only when its value equals this.
            max_workers: Size of the thread pool used by :meth:`run`.
        """
        self.accounts_file = accounts_file
        self.tag_key = tag_key
        self.tag_value = tag_value
        self.max_workers = max_workers
        self.console = Console()
        self.results: List[ProcessingResult] = []
        self.lock = threading.Lock()

    def load_accounts(self) -> List[AccountInfo]:
        """Load account information from the text file.

        Blank lines are skipped. A line that does not consist of exactly two
        non-empty comma-separated fields is reported as a warning and skipped.

        Returns:
            The parsed accounts, in file order.

        Raises:
            SystemExit: With status 1 when the file is missing or unreadable.
        """
        accounts: List[AccountInfo] = []
        try:
            # Explicit encoding so parsing does not depend on the host locale.
            with open(self.accounts_file, 'r', encoding='utf-8') as file:
                for line_num, line in enumerate(file, 1):
                    line = line.strip()
                    if not line:
                        continue
                    parts = [part.strip() for part in line.split(',')]
                    # An empty account ID or role name can only fail later,
                    # at assume-role time, so reject it here with the line number.
                    if len(parts) != 2 or not all(parts):
                        self.console.print(f"[yellow]Warning: Invalid format on line {line_num}: {line}[/yellow]")
                        continue
                    account_id, role_name = parts
                    accounts.append(AccountInfo(account_id, role_name))
        except FileNotFoundError:
            self.console.print(f"[red]Error: File {self.accounts_file} not found[/red]")
            sys.exit(1)
        except Exception as e:
            self.console.print(f"[red]Error reading file: {str(e)}[/red]")
            sys.exit(1)
        return accounts

    def assume_role(self, account_id: str, role_name: str) -> boto3.Session:
        """Assume ``role_name`` in ``account_id`` and return a session using it.

        Raises:
            Exception: When the STS call (or session creation) fails; the
                original error is attached as ``__cause__``.
        """
        role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
        try:
            # This method runs in worker threads. boto3.client() would go
            # through the shared default session, which must not be used from
            # several threads at once, so each call builds its own Session.
            sts_client = boto3.Session().client('sts')
            response = sts_client.assume_role(
                RoleArn=role_arn,
                RoleSessionName=f"iam-tag-removal-{account_id}-{int(time.time())}"
            )
            credentials = response['Credentials']
            return boto3.Session(
                aws_access_key_id=credentials['AccessKeyId'],
                aws_secret_access_key=credentials['SecretAccessKey'],
                aws_session_token=credentials['SessionToken']
            )
        except Exception as e:
            raise Exception(f"Failed to assume role {role_arn}: {str(e)}") from e

    def remove_tags_from_role(self, iam_client, role_name: str) -> bool:
        """Remove the target tag from a single IAM role if it is present.

        The tag is removed only when both its key and its value match the
        configured ``tag_key`` / ``tag_value``.

        Returns:
            True when the tag was removed, False when the role did not carry it.

        Raises:
            Exception: When listing or untagging fails; the original error is
                attached as ``__cause__``.
        """
        try:
            response = iam_client.list_role_tags(RoleName=role_name)
            current_tags = response.get('Tags', [])
            tag_exists = any(
                tag['Key'] == self.tag_key and tag['Value'] == self.tag_value
                for tag in current_tags
            )
            if not tag_exists:
                return False
            iam_client.untag_role(
                RoleName=role_name,
                TagKeys=[self.tag_key]
            )
            return True
        except Exception as e:
            raise Exception(f"Failed to remove tag from role {role_name}: {str(e)}") from e

    def process_account(self, account_info: AccountInfo, progress: Progress, task_id: TaskID) -> ProcessingResult:
        """Process a single AWS account and describe the outcome.

        Errors on an individual role are printed as warnings and do not fail
        the account; such roles are not counted in ``roles_processed``. Any
        other error (assume-role, listing roles) marks the account "Failed".

        Returns:
            A :class:`ProcessingResult`; this method does not raise for
            ordinary ``Exception`` subclasses.
        """
        start_time = time.time()
        result = ProcessingResult(
            account_id=account_info.account_id,
            role_name=account_info.role_name,
            status="Failed",
            roles_processed=0,
            tags_removed=0
        )
        try:
            progress.update(task_id, description=f"Processing account {account_info.account_id}")
            session = self.assume_role(account_info.account_id, account_info.role_name)
            iam_client = session.client('iam')
            paginator = iam_client.get_paginator('list_roles')
            roles_processed = 0
            tags_removed = 0
            for page in paginator.paginate():
                for role in page['Roles']:
                    role_name = role['RoleName']
                    try:
                        if self.remove_tags_from_role(iam_client, role_name):
                            tags_removed += 1
                        roles_processed += 1
                    except Exception as e:
                        # Log individual role errors but continue processing
                        self.console.print(f"[yellow]Warning: Error processing role {role_name} in account {account_info.account_id}: {str(e)}[/yellow]")
            result.status = "Success"
            result.roles_processed = roles_processed
            result.tags_removed = tags_removed
        except Exception as e:
            result.error_message = str(e)
            result.status = "Failed"
        finally:
            result.processing_time = time.time() - start_time
        return result

    def generate_csv_report(self, results: List[ProcessingResult], filename: Optional[str] = None) -> str:
        """Write one CSV row per result and return the path that was written.

        Args:
            results: Results to report.
            filename: Output path; when None a timestamped name of the form
                ``iam_tag_removal_report_YYYYMMDD_HHMMSS.csv`` is used.
        """
        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"iam_tag_removal_report_{timestamp}.csv"
        fieldnames = [
            'Account ID', 'Role Name', 'Status', 'Roles Processed',
            'Tags Removed', 'Processing Time (s)', 'Error Message'
        ]
        # newline='' is what the csv module requires; the encoding is explicit
        # so error messages with non-ASCII text cannot break the write.
        with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for result in results:
                writer.writerow({
                    'Account ID': result.account_id,
                    'Role Name': result.role_name,
                    'Status': result.status,
                    'Roles Processed': result.roles_processed,
                    'Tags Removed': result.tags_removed,
                    'Processing Time (s)': f"{result.processing_time:.2f}",
                    'Error Message': result.error_message
                })
        return filename

    def display_summary(self, results: List[ProcessingResult]):
        """Print aggregate statistics and, if any, a table of failed accounts."""
        total_accounts = len(results)
        successful_accounts = sum(1 for r in results if r.status == "Success")
        failed_accounts = total_accounts - successful_accounts
        total_roles_processed = sum(r.roles_processed for r in results)
        total_tags_removed = sum(r.tags_removed for r in results)
        # Sum of per-account durations, not wall-clock time of the whole run.
        total_time = sum(r.processing_time for r in results)
        # With no results (e.g. an empty accounts file) there is nothing to
        # average; avoid dividing by zero.
        average_time = total_time / total_accounts if total_accounts else 0.0

        table = Table(title="Processing Summary")
        table.add_column("Metric", style="cyan")
        table.add_column("Value", style="magenta")
        table.add_row("Total Accounts", str(total_accounts))
        table.add_row("Successful", str(successful_accounts))
        table.add_row("Failed", str(failed_accounts))
        table.add_row("Total Roles Processed", str(total_roles_processed))
        table.add_row("Total Tags Removed", str(total_tags_removed))
        table.add_row("Total Processing Time", f"{total_time:.2f}s")
        table.add_row("Average Time per Account", f"{average_time:.2f}s")
        self.console.print(table)

        # Show failed accounts if any
        if failed_accounts > 0:
            self.console.print("\n[red]Failed Accounts:[/red]")
            failed_table = Table()
            failed_table.add_column("Account ID", style="red")
            failed_table.add_column("Error", style="yellow")
            for result in results:
                if result.status == "Failed":
                    failed_table.add_row(result.account_id, result.error_message)
            self.console.print(failed_table)

    def run(self):
        """Load the accounts, process them concurrently, then report.

        Results are appended to ``self.results``; the CSV report and the
        console summary are produced from that list.
        """
        self.console.print(Panel.fit(
            "[bold blue]AWS IAM Tag Removal Automation[/bold blue]\n"
            f"Target Tag: {self.tag_key} = {self.tag_value}",
            title="Starting Process"
        ))
        accounts = self.load_accounts()
        self.console.print(f"[green]Loaded {len(accounts)} accounts from {self.accounts_file}[/green]")

        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
            TimeRemainingColumn(),
            console=self.console
        ) as progress:
            main_task = progress.add_task("Overall Progress", total=len(accounts))
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                future_to_account = {
                    executor.submit(self.process_account, account, progress, main_task): account
                    for account in accounts
                }
                for future in as_completed(future_to_account):
                    account = future_to_account[future]
                    try:
                        result = future.result()
                    except Exception as e:
                        self.console.print(f"[red]Unexpected error processing {account.account_id}: {str(e)}[/red]")
                        # Still record the account, otherwise it would vanish
                        # from the report and the progress bar would never
                        # reach 100%.
                        result = ProcessingResult(
                            account_id=account.account_id,
                            role_name=account.role_name,
                            status="Failed",
                            roles_processed=0,
                            tags_removed=0,
                            error_message=str(e)
                        )
                    with self.lock:
                        self.results.append(result)
                    progress.update(main_task, advance=1)
                    status_color = "green" if result.status == "Success" else "red"
                    self.console.print(
                        f"[{status_color}]{result.account_id}[/{status_color}]: "
                        f"{result.status} - {result.roles_processed} roles, "
                        f"{result.tags_removed} tags removed"
                    )

        report_filename = self.generate_csv_report(self.results)
        self.console.print(f"\n[green]CSV report generated: {report_filename}[/green]")
        self.display_summary(self.results)
def main():
    """Run the tag removal with the configuration hard-coded below.

    Exits with status 1 on a fatal error and 130 when interrupted with
    Ctrl-C, so that calling scripts can tell a failed run from a clean one.
    """
    console = Console()

    # Configuration
    accounts_file = "accounts.txt"  # Change this to your file path
    tag_key = "syf:iam:network-perimeter-exception"
    tag_value = "true"
    max_workers = 10  # Adjust based on your needs and AWS limits

    console.print(Panel.fit(
        "[bold]AWS IAM Tag Removal Tool[/bold]\n\n"
        f"Accounts file: {accounts_file}\n"
        f"Target tag: {tag_key} = {tag_value}\n"
        f"Max workers: {max_workers}",
        title="Configuration"
    ))

    remover = IAMTagRemover(
        accounts_file=accounts_file,
        tag_key=tag_key,
        tag_value=tag_value,
        max_workers=max_workers
    )
    try:
        remover.run()
    except KeyboardInterrupt:
        console.print("\n[yellow]Process interrupted by user[/yellow]")
        # 128 + SIGINT, the conventional status for an interrupted process.
        sys.exit(130)
    except Exception as e:
        console.print(f"\n[red]Fatal error: {str(e)}[/red]")
        sys.exit(1)
# Run the tool only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment