Last active
July 24, 2025 13:13
-
-
Save RajChowdhury240/f834c832421c46f41c8edad2e53c4b82 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
python role_tag_removal.py \ | |
--accounts-file accounts.txt \ | |
--assume-role ca-iam-cie-a \ | |
--tag-key syf:iam:network-perimeter-exception \ | |
--tag-value true \ | |
--max-workers 10 \ | |
--output-file custom_results.csv | |
''' | |
#!/usr/bin/env python3 | |
""" | |
AWS Role Tag Removal Automation Script | |
This script assumes a specified role in multiple AWS accounts and removes | |
specific tags from target roles as defined in the accounts.txt file. | |
The script uses AWS Organizations to retrieve account names and requires | |
permissions to call organizations:DescribeAccount from the management account. | |
Usage: | |
python role_tag_removal.py --accounts-file accounts.txt | |
Requirements: | |
pip install boto3 rich click | |
AWS Permissions Required: | |
- organizations:DescribeAccount (in management account) | |
- sts:AssumeRole (for target accounts) | |
- iam:ListRoleTags, iam:UntagRole (in target roles) | |
""" | |
import boto3 | |
import click | |
import csv | |
import threading | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
from dataclasses import dataclass, asdict | |
from datetime import datetime | |
from pathlib import Path | |
from typing import List, Tuple, Optional | |
import time | |
from rich.console import Console | |
from rich.progress import Progress, TaskID, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn | |
from rich.table import Table | |
from rich import print as rprint | |
console = Console() | |
@dataclass | |
class AccountInfo: | |
account_id: str | |
role_name: str | |
@dataclass | |
class ProcessingResult: | |
account_id: str | |
account_name: str | |
role_name: str | |
removed_tags: str | |
error: Optional[str] = None | |
success: bool = True | |
class RoleTagRemover: | |
def __init__(self, assume_role_name: str = 'ca-iam-cie-a', | |
target_tag_key: str = 'syf:iam:network-perimeter-exception', | |
target_tag_value: str = 'true'): | |
self.assume_role_name = assume_role_name | |
self.target_tag_key = target_tag_key | |
self.target_tag_value = target_tag_value | |
self.results: List[ProcessingResult] = [] | |
self.results_lock = threading.Lock() | |
def load_accounts(self, file_path: str) -> List[AccountInfo]: | |
"""Load account information from the specified file.""" | |
accounts = [] | |
try: | |
with open(file_path, 'r') as f: | |
for line_num, line in enumerate(f, 1): | |
line = line.strip() | |
if not line or line.startswith('#'): | |
continue | |
try: | |
parts = line.split(',') | |
if len(parts) != 2: | |
console.print(f"[yellow]Warning: Invalid format on line {line_num}: {line}[/yellow]") | |
continue | |
account_id, role_name = parts[0].strip(), parts[1].strip() | |
accounts.append(AccountInfo(account_id, role_name)) | |
except Exception as e: | |
console.print(f"[red]Error parsing line {line_num}: {e}[/red]") | |
except FileNotFoundError: | |
console.print(f"[red]Error: File {file_path} not found[/red]") | |
raise | |
except Exception as e: | |
console.print(f"[red]Error reading file {file_path}: {e}[/red]") | |
raise | |
return accounts | |
def assume_role(self, account_id: str, role_name: str) -> Optional[boto3.Session]: | |
"""Assume the specified role in the target account.""" | |
try: | |
sts_client = boto3.client('sts') | |
role_arn = f"arn:aws:iam::{account_id}:role/{role_name}" | |
response = sts_client.assume_role( | |
RoleArn=role_arn, | |
RoleSessionName=f"tag-removal-{int(time.time())}" | |
) | |
credentials = response['Credentials'] | |
session = boto3.Session( | |
aws_access_key_id=credentials['AccessKeyId'], | |
aws_secret_access_key=credentials['SecretAccessKey'], | |
aws_session_token=credentials['SessionToken'] | |
) | |
return session | |
except Exception as e: | |
console.print(f"[red]Failed to assume role {role_name} in account {account_id}: {e}[/red]") | |
return None | |
def get_account_name_from_orgs(self, account_id: str) -> str: | |
"""Get the account name from AWS Organizations.""" | |
try: | |
# Use the default session (management account) to query Organizations | |
org_client = boto3.client('organizations') | |
response = org_client.describe_account(AccountId=account_id) | |
return response['Account']['Name'] | |
except Exception as e: | |
console.print(f"[yellow]Warning: Could not get account name for {account_id} from Organizations: {e}[/yellow]") | |
return account_id # Return account ID as fallback | |
def remove_role_tags(self, session: boto3.Session, role_name: str, account_id: str) -> Tuple[bool, str, str]: | |
"""Remove specified tags from the target role.""" | |
try: | |
iam_client = session.client('iam') | |
# Get current role tags | |
try: | |
response = iam_client.list_role_tags(RoleName=role_name) | |
current_tags = response.get('Tags', []) | |
except iam_client.exceptions.NoSuchEntityException: | |
return False, f"Role {role_name} not found", "N/A" | |
# Find tags to remove | |
tags_to_remove = [] | |
for tag in current_tags: | |
if (tag['Key'] == self.target_tag_key and | |
tag['Value'] == self.target_tag_value): | |
tags_to_remove.append(tag['Key']) | |
# Remove tags if found | |
if tags_to_remove: | |
iam_client.untag_role( | |
RoleName=role_name, | |
TagKeys=tags_to_remove | |
) | |
removed_tags_str = f"{self.target_tag_key}={self.target_tag_value}" | |
return True, f"Successfully removed {len(tags_to_remove)} tag(s)", removed_tags_str | |
else: | |
return True, "No matching tags found to remove", "None" | |
except Exception as e: | |
return False, f"Error removing tags: {str(e)}", "Error" | |
def process_account(self, account_info: AccountInfo, progress: Progress, task_id: TaskID) -> ProcessingResult: | |
"""Process a single account - assume role and remove tags.""" | |
account_id = account_info.account_id | |
target_role_name = account_info.role_name | |
progress.update(task_id, description=f"Processing account {account_id}...") | |
# Assume the intermediate role | |
session = self.assume_role(account_id, self.assume_role_name) | |
if not session: | |
result = ProcessingResult( | |
account_id=account_id, | |
account_name="Error", | |
role_name=target_role_name, | |
removed_tags="Error", | |
error=f"Failed to assume role {self.assume_role_name}", | |
success=False | |
) | |
with self.results_lock: | |
self.results.append(result) | |
return result | |
# Get account name from Organizations | |
account_name = self.get_account_name_from_orgs(account_id) | |
# Remove tags from target role | |
success, message, removed_tags = self.remove_role_tags(session, target_role_name, account_id) | |
result = ProcessingResult( | |
account_id=account_id, | |
account_name=account_name, | |
role_name=target_role_name, | |
removed_tags=removed_tags, | |
error=None if success else message, | |
success=success | |
) | |
with self.results_lock: | |
self.results.append(result) | |
progress.update(task_id, description=f"Completed account {account_id}") | |
return result | |
def process_accounts_threaded(self, accounts: List[AccountInfo], max_workers: int = 10) -> List[ProcessingResult]: | |
"""Process multiple accounts using threading.""" | |
console.print(f"\n[bold blue]Processing {len(accounts)} accounts with {max_workers} threads...[/bold blue]") | |
with Progress( | |
SpinnerColumn(), | |
TextColumn("[progress.description]{task.description}"), | |
BarColumn(), | |
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), | |
TimeElapsedColumn(), | |
console=console | |
) as progress: | |
main_task = progress.add_task("Overall Progress", total=len(accounts)) | |
with ThreadPoolExecutor(max_workers=max_workers) as executor: | |
# Submit all tasks | |
future_to_account = { | |
executor.submit(self.process_account, account, progress, main_task): account | |
for account in accounts | |
} | |
# Process completed tasks | |
for future in as_completed(future_to_account): | |
try: | |
result = future.result() | |
progress.advance(main_task) | |
except Exception as e: | |
account = future_to_account[future] | |
console.print(f"[red]Error processing account {account.account_id}: {e}[/red]") | |
progress.advance(main_task) | |
return self.results | |
def display_results_table(self) -> None: | |
"""Display results in a rich table format.""" | |
table = Table(title="AWS Role Tag Removal Results", show_header=True, header_style="bold magenta") | |
table.add_column("Account ID", style="cyan", no_wrap=True) | |
table.add_column("Account Name", style="blue") | |
table.add_column("Role Name", style="green") | |
table.add_column("Removed Tags", style="yellow") | |
table.add_column("Status", style="bold") | |
success_count = 0 | |
error_count = 0 | |
for result in sorted(self.results, key=lambda x: x.account_id): | |
if result.success: | |
status = "[green]✓ Success[/green]" | |
success_count += 1 | |
else: | |
status = "[red]✗ Failed[/red]" | |
error_count += 1 | |
table.add_row( | |
result.account_id, | |
result.account_name, | |
result.role_name, | |
result.removed_tags, | |
status | |
) | |
console.print("\n") | |
console.print(table) | |
# Summary | |
console.print(f"\n[bold]Summary:[/bold]") | |
console.print(f"[green]Successful: {success_count}[/green]") | |
console.print(f"[red]Failed: {error_count}[/red]") | |
console.print(f"[blue]Total: {len(self.results)}[/blue]") | |
def save_results_csv(self, output_file: str = None) -> str: | |
"""Save results to CSV file.""" | |
if not output_file: | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
output_file = f"role_tag_removal_results_{timestamp}.csv" | |
fieldnames = ['AccountID', 'AccountName', 'RoleName', 'RemovedTags', 'Status', 'Error'] | |
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile: | |
writer = csv.DictWriter(csvfile, fieldnames=fieldnames) | |
writer.writeheader() | |
for result in sorted(self.results, key=lambda x: x.account_id): | |
writer.writerow({ | |
'AccountID': result.account_id, | |
'AccountName': result.account_name, | |
'RoleName': result.role_name, | |
'RemovedTags': result.removed_tags, | |
'Status': 'Success' if result.success else 'Failed', | |
'Error': result.error or '' | |
}) | |
return output_file | |
@click.command() | |
@click.option('--accounts-file', '-f', default='accounts.txt', | |
help='Path to the accounts file (default: accounts.txt)') | |
@click.option('--assume-role', '-r', default='ca-iam-cie-a', | |
help='Role name to assume in target accounts (default: ca-iam-cie-a)') | |
@click.option('--tag-key', '-k', default='syf:iam:network-perimeter-exception', | |
help='Tag key to remove (default: syf:iam:network-perimeter-exception)') | |
@click.option('--tag-value', '-v', default='true', | |
help='Tag value to match for removal (default: true)') | |
@click.option('--max-workers', '-w', default=10, type=int, | |
help='Maximum number of worker threads (default: 10)') | |
@click.option('--output-file', '-o', help='Output CSV file name (auto-generated if not specified)') | |
def main(accounts_file: str, assume_role: str, tag_key: str, tag_value: str, | |
max_workers: int, output_file: str): | |
""" | |
AWS Role Tag Removal Automation | |
This script processes multiple AWS accounts, assumes specified roles, | |
and removes target tags from IAM roles. | |
""" | |
console.print("[bold blue]AWS Role Tag Removal Automation[/bold blue]") | |
console.print(f"[dim]Target tag: {tag_key}={tag_value}[/dim]") | |
console.print(f"[dim]Assume role: {assume_role}[/dim]") | |
console.print(f"[dim]Account names retrieved from AWS Organizations[/dim]\n") | |
try: | |
# Initialize the remover | |
remover = RoleTagRemover( | |
assume_role_name=assume_role, | |
target_tag_key=tag_key, | |
target_tag_value=tag_value | |
) | |
# Load accounts | |
console.print(f"[bold]Loading accounts from {accounts_file}...[/bold]") | |
accounts = remover.load_accounts(accounts_file) | |
console.print(f"[green]Loaded {len(accounts)} accounts[/green]") | |
if not accounts: | |
console.print("[red]No accounts found to process[/red]") | |
return | |
# Process accounts | |
results = remover.process_accounts_threaded(accounts, max_workers) | |
# Display results | |
remover.display_results_table() | |
# Save to CSV | |
output_path = remover.save_results_csv(output_file) | |
console.print(f"\n[bold green]Results saved to: {output_path}[/bold green]") | |
except KeyboardInterrupt: | |
console.print("\n[yellow]Operation cancelled by user[/yellow]") | |
except Exception as e: | |
console.print(f"\n[red]Error: {e}[/red]") | |
raise | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment