Created
February 2, 2022 00:31
-
-
Save jlinoff/464423f9e8a3922f5458f4b256e6ca92 to your computer and use it in GitHub Desktop.
Report all of the AWS service or resource types present in your accounts and regions from the command line (CLI).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
''' | |
Report all of the service or resource types present in accounts and | |
regions. | |
It generates six different reports: | |
1. diff the services between two accounts. | |
2. diff the resources between two accounts. | |
3. summary of services present in one or more accounts. | |
4. summary of resources present in one or more accounts. | |
5. complete ARN report of services present in one or more accounts. | |
6. complete ARN of resources present in one or more accounts. | |
Each account is specified by its profile name and region separated by | |
a colon. You can specify as many accounts as you like. See help (`-h`) | |
for more information. | |
The tool supports filtering so you can limit the scope of the | |
reports. | |
You can specify accept criteria using `-a`, like this to see | |
only the lambdas: `-a 'arn:aws:lambda:'`. | |
In addition, you can specify reject criteria using `-R` to see
everything _except_ the lambdas: `-R 'arn:aws:lambda:'`. See the
online help (`-h`) for more information.
This tool uses the "resourcegroupstaggingapi" service.
It works well for resources within accounts and regions, but it is
not clear that this service works for global services or resources.
It assumes that the user is logged into the account using a profile. | |
To use this tool in pipenv do this: | |
$ pipenv install --python python3 types-boto3 boto3 pylint mypy | |
$ pipenv run ./aws-inventory.py -h | |
$ pipenv run pylint ./aws-inventory.py | |
$ pipenv run mypy ./aws-inventory.py | |
You will need to have profiles and be logged in to use this tool. | |
''' | |
import argparse | |
import datetime | |
import inspect | |
import os | |
import re | |
import sys | |
from typing import Any, Dict, Tuple | |
import boto3 | |
# Tool version; interpolated into the help epilog and the -V/--version text.
__version__ = '1.0.0'
# Environment fallbacks used when an account argument omits the profile
# or the region side of "profile:region". Empty string means "not defined".
AWS_PROFILE = os.getenv('AWS_PROFILE', '')
AWS_REGION = os.getenv('AWS_REGION', '')
# Verbose progress display tuning, overridable through the environment.
# Both values must parse as integers or the import fails with ValueError.
RPD = int(os.getenv('RPD', '100')) # records per dot
DPL = int(os.getenv('DPL', '100')) # dots per line
def info(msg: str, level: int = 1) -> None:
    '''
    Write an informational message to stdout.

    The message is prefixed with the source line number of the frame
    `level` entries up the call stack (1 == the direct caller).

    NOTE(review): the prefix is "SERVER:" rather than "INFO:" -- confirm
    that this is intended.
    '''
    frame_record = inspect.stack()[level]
    print(f'SERVER:{frame_record.lineno}: {msg}')
def warn(msg: str, level: int = 1) -> None:
    '''
    Write a warning message, wrapped in ANSI red, to stderr.

    The message is prefixed with the source line number of the frame
    `level` entries up the call stack (1 == the direct caller).
    '''
    frame_record = inspect.stack()[level]
    print(f'\033[31mWARNING:{frame_record.lineno}: {msg}\033[0m', file=sys.stderr)
def err(msg: str, level: int = 1, xcode: int = 1, abort: bool = True) -> None:
    '''
    Write an error message, wrapped in ANSI red, to stderr and
    optionally terminate.

    The message is prefixed with the source line number of the frame
    `level` entries up the call stack (1 == the direct caller).
    Unless `abort` is false the process then exits via
    sys.exit(xcode).
    '''
    frame_record = inspect.stack()[level]
    print(f'\033[31mERROR:{frame_record.lineno}: {msg}\033[0m', file=sys.stderr)
    if not abort:
        return
    sys.exit(xcode)
def getopts() -> argparse.Namespace:
    '''
    Command line options.

    Builds the argument parser, parses sys.argv and selects the summary
    report when neither -l nor -s was requested.

    @returns The command line options.
    '''
    def gettext(string):
        '''
        Convert to upper case to make things consistent.
        '''
        lookup = {
            'usage: ': 'USAGE:',
            'positional arguments': 'POSITIONAL ARGUMENTS',
            'optional arguments': 'OPTIONAL ARGUMENTS',
            # Python 3.10+ titles the same help section "options".
            'options': 'OPTIONAL ARGUMENTS',
            'show this help message and exit': 'Show this help message and exit.\n ',
        }
        return lookup.get(string, string)

    argparse._ = gettext  # type: ignore # to capitalize help headers
    base = os.path.basename(sys.argv[0])
    # Must be an f-string, otherwise the literal text "{base}" is shown.
    usage = f'\n {base} [OPTIONS] ACCOUNTS'
    # __doc__ is None when docstrings are stripped (python -OO), so guard it.
    desc = 'DESCRIPTION:{0}'.format('\n '.join((__doc__ or '').split('\n')))
    epilog = fr'''
EXAMPLES:
    # Example 1: help
    $ {base} -h

    # Example 2: Summary of the first 1,000 services.
    $ {base} -Ss -m 1000 -v int:eu-west-1

    # Example 3: Summary of the first 1,000 resources.
    $ {base} -s -m 1000 -v int:eu-west-1

    # Example 4: Summary of services for two accounts.
    $ {base} -Ss -v work:eu-west-1 test:eu-west-1

    # Example 5: Summary of services for three accounts.
    $ {base} -Ssv work:eu-west-1 test:eu-west-1 green:eu-west-1

    # Example 6: Summary of resources for three accounts.
    $ {base} -sv work:eu-west-1 test:eu-west-1 green:eu-west-1

    # Example 7: Summary of resource differences between two accounts.
    $ {base} -sdv blue:eu-west-1 green:eu-west-1

LICENSE
    MIT Open Source

VERSION
    {base} {__version__}
'''
    afc = argparse.RawTextHelpFormatter
    parser = argparse.ArgumentParser(formatter_class=afc,
                                     description=desc.rstrip(),
                                     usage=usage,
                                     epilog=epilog.rstrip() + '\n ')
    parser.add_argument('-a', '--accept',
                        action='store',
                        default='',
                        help=('Accept ARN by pattern.\n'
                              'This is a regular expression.\n'
                              'The default is to accept all ARNs.\n'
                              "ex: -a '^arn:aws:lambda|^arn:aws:session'\n"))
    parser.add_argument('-d', '--diff',
                        action='store_true',
                        help=('Report the differences between the first\n'
                              'profile/region and all of the others.\n'))
    parser.add_argument('-m', '--max',
                        action='store',
                        type=int,
                        default=0,
                        help=('The maximum number of resources to collect.\n'
                              'This is mainly for debugging.\n'))
    parser.add_argument('-l', '--long',
                        action='store_true',
                        help=('The long report. Lists all of the ARNs.\n'
                              'For a complete report with the summary\n'
                              "and ARNs, specify '-ls'.\n"))
    parser.add_argument('-n', '--name-max',
                        action='store',
                        type=int,
                        default=0,
                        help=('The maximum number of characters\n'
                              'in a resource or service name.\n'
                              'This is mainly for debugging.\n'))
    parser.add_argument('-R', '--reject',
                        action='store',
                        default='',
                        help=('Reject ARN by pattern.\n'
                              'This is a regular expression.\n'
                              'The default is to reject no ARNs.\n'
                              "ex: -R '^arn:aws:lambda|^arn:aws:session'\n"))
    parser.add_argument('-s', '--summary',
                        action='store_true',
                        help=('Summary only.\n'
                              'This is the default if neither -l nor -s is specified.\n'
                              'Do not list all of the ARNs individually.\n'))
    parser.add_argument('-S', '--service-only',
                        action='store_true',
                        help=('Services only summary.\n'
                              'Do not list all of the resource-types.\n'))
    parser.add_argument('-v', '--verbose',
                        action='count',
                        default=0,
                        help='Increase the level of verbosity.\n')
    parser.add_argument('-V', '--version',
                        action='version',
                        # f-string so that the real version number is shown;
                        # %(prog)s is still expanded by argparse.
                        version=f'%(prog)s version {__version__}',
                        help="Show program's version number and exit.\n")
    parser.add_argument('ACCOUNTS',
                        nargs='*',
                        help=('The AWS account profile name and region separated\n'
                              'by a colon.\n'))
    opts = parser.parse_args()
    # With no explicit report selection, fall back to the summary report.
    if not opts.summary and not opts.long:
        opts.summary = True
    return opts
def parse_accounts(opts: argparse.Namespace) -> list:
    '''Populate the accounts with their profile, region and account id.

    Each ACCOUNTS argument has the form "profile:region". Either side
    may be empty (and the colon may be omitted when there is no
    region), in which case AWS_PROFILE / AWS_REGION supply the value.
    When no ACCOUNTS are given at all, a single account built entirely
    from those environment defaults is used. The account id is looked
    up through STS get_caller_identity.

    Args:
        opts: Command line options.

    Returns:
        accounts: The augmented list of accounts specified on the command
            line. Each entry is a dict with the keys 'profile', 'region'
            and 'aid'.
    '''
    accounts = []
    # An empty spec makes the environment defaults go through exactly the
    # same validation and account-id lookup as explicit arguments, so every
    # returned account carries an 'aid' key.
    specs = opts.ACCOUNTS or [':']
    for rec in specs:
        if rec.count(':') > 1:
            err(f'invalid profile argument, too many colons: {rec}')
        # partition() tolerates a missing colon; split() would raise
        # ValueError when unpacking a single field.
        profile, _, region = rec.partition(':')
        profile = profile or AWS_PROFILE
        if not profile:
            err(f'missing profile in "{rec}" and AWS_PROFILE is not defined')
        region = region or AWS_REGION
        if not region:
            err(f'missing region in "{rec}" and AWS_REGION is not defined')
        session = boto3.Session(profile_name=profile)
        sts = session.client('sts', region_name=region)
        aid = sts.get_caller_identity().get('Account')
        accounts.append({'profile': profile,
                         'region': region,
                         'aid': aid})
    return accounts
def _arn_service_and_type(arn: str) -> Tuple[str, str]:
    '''Return the (service, resource_type) pair encoded in an ARN.

    Handled layouts:
        arn:partition:service:region:account-id:resource-type:resource-id
        arn:partition:service:region:account-id:resource-type/resource-id
        arn:partition:service:region:account-id:resource-id

    When no resource type can be determined (no type field, or an empty
    one as in arn:aws:apigateway:eu-west-1::/usageplans/zghz7h) the
    service name is used as the resource type.
    '''
    flds = arn.split(':', 6)
    service = flds[2]
    resource_type = ''
    if len(flds) > 6 or (len(flds) > 5 and '/' in flds[5]):
        # Text before the first '/' (the whole field when there is none).
        resource_type = flds[5].split('/')[0]
    return service, resource_type or service


def collect(  # pylint: disable=too-many-branches,too-many-locals
        opts: argparse.Namespace,
        profile: str,
        region: str
) -> Tuple[dict, dict, int]:
    '''Collect the data.

    Pages through resourcegroupstaggingapi get_resources for the given
    profile and region and groups the accepted records by service name
    (with --service-only) or by "service:resource-type".

    Args:
        opts: Command line options.
        profile: The account profile name.
        region: The account region.

    Returns:
        arns: A dictionary of the collected ARN records.
        by_types: The ARNs organized by service name or resource type.
        total: The total number of records processed before filtering.
    '''
    by_types: Dict[str, Any] = {}
    arns: Dict[str, Any] = {}
    seen_tokens = set()
    accept = re.compile(opts.accept) if opts.accept else None
    reject = re.compile(opts.reject) if opts.reject else None

    if opts.verbose:
        info(f'collecting resource data from "{profile}" in "{region}"')
    session = boto3.Session(profile_name=profile)
    client = session.client('resourcegroupstaggingapi', region_name=region)

    total = 0
    hit_max = False
    resources = client.get_resources()
    while True:
        # Process the page BEFORE looking at the pagination token: the last
        # (or only) page carries an empty token but still holds records.
        for entry in resources.get('ResourceTagMappingList', []):
            # See if we hit the max (--max).
            if 0 < opts.max <= len(arns):
                hit_max = True
                break
            arn = entry['ResourceARN']
            if arn in arns:
                # Defensive: never count or store the same ARN twice.
                continue
            total += 1

            # Print out the verbose progress.
            # Note that len(arns) != total when accept/reject filtering
            # is used.
            if opts.verbose and RPD and (total % RPD) == 0:
                print('.', flush=True, end='')
                if DPL and (total % (RPD * DPL)) == 0:
                    print(f' {total:>8} {len(arns):>8}\n', flush=True)

            # Figure out the grouping key.
            service, resource_type = _arn_service_and_type(arn)
            if opts.service_only:
                rtype = f'{service}'
            else:
                rtype = f'{service}:{resource_type}'
            if opts.name_max:
                # Truncate to at most --name-max characters.
                rtype = rtype[:opts.name_max]

            # Filter.
            if accept and not accept.search(arn):
                continue
            if reject and reject.search(arn):
                continue

            # Update the collections.
            by_types.setdefault(rtype, []).append(entry)
            arns[arn] = entry

        token = resources.get('PaginationToken', '')
        # Stop on --max, on the last page (empty token) or if the service
        # ever hands back a token that was already used (loop guard).
        if hit_max or not token or token in seen_tokens:
            break
        seen_tokens.add(token)
        resources = client.get_resources(PaginationToken=token)

    if opts.verbose:
        # Terminates the line of progress dots with the final counts.
        print(f' {total:>8} {len(arns):>8}\n', flush=True)
    return arns, by_types, total
def collect_from_accounts(opts: argparse.Namespace, accounts: list) -> Tuple[list, dict]:
    '''Collect data for all accounts.

    Args:
        opts: Command line options.
        accounts: The accounts to process, as dicts with the keys
            'profile', 'region' and 'aid'. When None, the accounts are
            parsed from the command line options.

    Returns:
        recs: One record per account holding its profile, region,
            account id, column width, ARNs, ARNs by type and total.
        cols: Maps each service/resource type to the number of accounts
            in which it was found.
    '''
    # Honor the accounts handed in by the caller; re-parsing them here
    # would repeat the STS lookup for every account.
    if accounts is None:
        accounts = parse_accounts(opts)
    recs = []
    cols: dict = {}
    for account in accounts:
        profile = account['profile']
        region = account['region']
        # Tolerate an account without a resolved id.
        aid = account.get('aid') or ''
        arns, by_types, total = collect(opts, profile, region)
        # Column must fit every header cell and at least an 8 digit count.
        cwidth = max(len(profile), len(region), len(aid), 8)
        recs.append({
            'profile': profile,
            'region': region,
            'aid': aid,
            'cwidth': cwidth,
            'arns': arns,
            'by_types': by_types,
            'total': total,
        })
        for key in sorted(by_types, key=str.lower):
            cols[key] = cols.get(key, 0) + 1
        if opts.verbose:
            info(f'collected {len(arns)} resources for "{profile}" in "{region}" ({aid})')
    return recs, cols
def summary(  #pylint: disable=too-many-locals
        opts: argparse.Namespace,
        recs: list,
        cols: dict,
        today: str
):
    '''Summary report.

    Prints a table with one row per service/resource type and one
    column per account. With --diff every account after the first gets
    an extra 8 character column holding the difference to the first
    account.

    Args:
        opts: Command line options.
        recs: Collected, transformed records.
        cols: The column definitions.
        today: The report date.
    '''
    if opts.verbose:
        info('summary report')
    klen = max(9, max((len(x) for x in cols), default=0))

    def emit(label: str, cells: list, gaps: list, align: str) -> None:
        '''Print one table row.

        cells[i] is the value for account i, gaps[i] the content of the
        diff column in front of it (only shown for i > 0 with --diff).
        '''
        parts = [f'{label:<{klen}}']
        for i, rec in enumerate(recs):
            width = rec['cwidth']
            if i and opts.diff:
                parts.append(f' {gaps[i]:{align}8}')
            parts.append(f' {cells[i]:{align}{width}}')
        print(''.join(parts))

    print('')
    if opts.service_only:
        print(f'Account Service Summary Report {today}')
    else:
        print(f'Account Resource Summary Report {today}')
    if opts.accept:
        print(f'Accept Pattern: "{opts.accept}"')
    if opts.reject:
        print(f'Reject Pattern: "{opts.reject}"')
    print('')

    blanks = [''] * len(recs)
    # Header lines 1-3: account id, profile, region.
    emit('', [rec['aid'] for rec in recs], blanks, '<')
    emit('', [rec['profile'] for rec in recs], blanks, '<')
    emit('Service' if opts.service_only else 'Resource',
         [rec['region'] for rec in recs], ['diff'] * len(recs), '<')
    # Header line 4: separators.
    emit('=' * klen, ['=' * rec['cwidth'] for rec in recs],
         ['=' * 8] * len(recs), '<')

    # Count for each resource; diffs are relative to the first account.
    for key in sorted(cols, key=str.lower):
        counts = [len(rec['by_types'].get(key, [])) for rec in recs]
        diffs = [count - counts[0] for count in counts]
        emit(key, counts, diffs, '>')

    # Totals row.
    totals = [sum(len(b) for b in rec['by_types'].values()) for rec in recs]
    emit('TOTALS', totals, blanks, '>')
    print('')
def report_arns(opts: argparse.Namespace, recs: list, today: str):
    '''Print every distinct ARN found across all of the records.

    A blank line and a title line (service or resource flavor, chosen
    by opts.service_only) are followed by the ARNs, one per line,
    ordered case-insensitively.

    Args:
        opts: Command line options.
        recs: Collected, transformed records.
        today: The report date.
    '''
    if opts.verbose:
        info('arn report')

    heading = (f'Account Service ARN Report {today}\n'
               if opts.service_only
               else f'Account Resource ARN Report {today}\n')
    print('')
    print(heading)

    # De-duplicate while remembering first-seen order, so that the stable
    # sort below resolves case-insensitive ties deterministically.
    unique = dict.fromkeys(
        name
        for record in recs
        for name in sorted(record['arns'], key=str.lower)
    )
    for name in sorted(unique, key=str.lower):
        print(f'{name}')
def main():
    '''
    Program entry point.

    Timestamps the run, parses the command line, collects the data for
    every requested account and prints the selected reports.
    '''
    stamp = datetime.datetime.now().isoformat(timespec="seconds")
    options = getopts()
    account_list = parse_accounts(options)
    records, columns = collect_from_accounts(options, account_list)

    # The two reports are independent; '-ls' produces both.
    if options.summary:
        summary(options, records, columns, stamp)
    if options.long:
        report_arns(options, records, stamp)

    if options.verbose:
        info('done')


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment