Traffic report for load balancers
#!/usr/bin/env python3
"""
lb_traffic_report.py (human-friendly)

Enumerate all ALBs, NLBs, and CLBs in the current AWS account & region and fetch key
CloudWatch metrics for a given time window. Outputs a summary table (stdout) and
optionally writes a CSV.

Human-friendly improvements:
- Byte values shown using dynamic units (B/KB/MB/GB/TB) with 2 decimal places.
- Integer values printed with thousands separators.
- CSV includes both raw bytes and human-readable bytes.

Usage examples:
  python lb_traffic_report.py --region us-east-1 --start 2025-08-22T03:45:00Z --end 2025-08-22T04:15:00Z
  python lb_traffic_report.py --period 60 --minutes 30 --csv lb_metrics.csv

Requires:
- boto3, botocore, python-dateutil, tabulate (pip install boto3 python-dateutil tabulate)
- AWS credentials with permission to: elbv2:DescribeLoadBalancers, elb:DescribeLoadBalancers,
  cloudwatch:GetMetricData, sts:GetCallerIdentity
"""
import argparse
import csv
import sys
import time
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Tuple, Optional

import boto3
from botocore.config import Config
from botocore.exceptions import BotoCoreError, ClientError
from dateutil import parser as dtparser
from tabulate import tabulate

ISO_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
def parse_args() -> argparse.Namespace:
    ap = argparse.ArgumentParser(description="Report LB traffic & errors for ALB/NLB/CLB (human-friendly)")
    g_time = ap.add_mutually_exclusive_group()
    g_time.add_argument("--minutes", type=int, default=30, help="Lookback window in minutes (default: 30)")
    g_time.add_argument("--start", type=str, help="ISO8601 start time, e.g., 2025-08-22T03:45:00Z")
    ap.add_argument("--end", type=str, help="ISO8601 end time, default: now")
    ap.add_argument("--period", type=int, default=60, help="CloudWatch period in seconds (default: 60)")
    ap.add_argument("--region", type=str, help="AWS region (defaults to env/profile)")
    ap.add_argument("--profile", type=str, help="AWS profile name")
    ap.add_argument("--csv", type=str, help="Write results to CSV path")
    ap.add_argument("--max-queries", type=int, default=450, help="Batch size for GetMetricData (default: 450)")
    return ap.parse_args()
def iso(dt: datetime) -> str:
    return dt.astimezone(timezone.utc).strftime(ISO_FORMAT)
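# e.g. iso(datetime(2025, 8, 22, 4, 15, tzinfo=timezone.utc)) -> "2025-08-22T04:15:00Z"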
def resolve_window(args: argparse.Namespace) -> Tuple[datetime, datetime]:
    end = dtparser.isoparse(args.end).astimezone(timezone.utc) if args.end else datetime.now(timezone.utc)
    if args.start:
        start = dtparser.isoparse(args.start).astimezone(timezone.utc)
    else:
        start = end - timedelta(minutes=args.minutes or 30)
    if start >= end:
        raise ValueError("Start must be earlier than end")
    return start, end
def make_session(args: argparse.Namespace) -> boto3.Session:
    if args.profile:
        return boto3.Session(profile_name=args.profile, region_name=args.region)
    return boto3.Session(region_name=args.region)
def backoff_sleep(attempt: int):
    time.sleep(min(2 ** attempt, 10))
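# Exponential backoff capped at 10 seconds:
# backoff_sleep(1) -> 2s, backoff_sleep(3) -> 8s, backoff_sleep(4) and up -> 10s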
def list_elbv2_load_balancers(elbv2) -> List[Dict]:
    lbs = []
    paginator = elbv2.get_paginator('describe_load_balancers')
    for page in paginator.paginate():
        lbs.extend(page.get("LoadBalancers", []))
    return lbs
def list_elb_classic(elb) -> List[Dict]:
    lbs = []
    paginator = elb.get_paginator('describe_load_balancers')
    for page in paginator.paginate():
        lbs.extend(page.get("LoadBalancerDescriptions", []))
    return lbs
def arn_to_elbv2_metric_name(arn: str) -> str:
    # ARN: arn:aws:elasticloadbalancing:region:acct:loadbalancer/app/name/hash
    # Metric dimension "LoadBalancer" uses the "app/name/hash" or "net/name/hash" part.
    return arn.split("loadbalancer/")[-1]
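# e.g. (hypothetical ARN):
# arn_to_elbv2_metric_name("arn:aws:elasticloadbalancing:us-east-1:123456789012:loadbalancer/app/my-alb/50dc6c495c0c9188")
# -> "app/my-alb/50dc6c495c0c9188"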
def build_metric_queries_for_alb(lb_name: str, period: int) -> List[Dict]:
    dim = [{"Name": "LoadBalancer", "Value": lb_name}]
    ns = "AWS/ApplicationELB"
    return [
        {"Id": f"alb_req_{_safe_id(lb_name)}", "MetricStat": {"Metric": {"Namespace": ns, "MetricName": "RequestCount", "Dimensions": dim}, "Period": period, "Stat": "Sum"}},
        {"Id": f"alb_bytes_{_safe_id(lb_name)}", "MetricStat": {"Metric": {"Namespace": ns, "MetricName": "ProcessedBytes", "Dimensions": dim}, "Period": period, "Stat": "Sum"}},
        {"Id": f"alb_5xx_{_safe_id(lb_name)}", "MetricStat": {"Metric": {"Namespace": ns, "MetricName": "HTTPCode_ELB_5XX_Count", "Dimensions": dim}, "Period": period, "Stat": "Sum"}},
        {"Id": f"alb_rt_{_safe_id(lb_name)}", "MetricStat": {"Metric": {"Namespace": ns, "MetricName": "TargetResponseTime", "Dimensions": dim}, "Period": period, "Stat": "Average"}},
    ]
def build_metric_queries_for_nlb(lb_name: str, period: int) -> List[Dict]:
    dim = [{"Name": "LoadBalancer", "Value": lb_name}]
    ns = "AWS/NetworkELB"
    queries = [
        {"Id": f"nlb_bytes_{_safe_id(lb_name)}", "MetricStat": {"Metric": {"Namespace": ns, "MetricName": "ProcessedBytes", "Dimensions": dim}, "Period": period, "Stat": "Sum"}},
        {"Id": f"nlb_flows_{_safe_id(lb_name)}", "MetricStat": {"Metric": {"Namespace": ns, "MetricName": "ActiveFlowCount", "Dimensions": dim}, "Period": period, "Stat": "Average"}},
    ]
    # Optional metric (not published in all regions/accounts); if absent, it simply returns no values.
    queries.append({"Id": f"nlb_rst_{_safe_id(lb_name)}", "MetricStat": {"Metric": {"Namespace": ns, "MetricName": "TCP_Client_Reset_Count", "Dimensions": dim}, "Period": period, "Stat": "Sum"}, "ReturnData": True})
    return queries
def build_metric_queries_for_clb(lb_name: str, period: int) -> List[Dict]:
    dim = [{"Name": "LoadBalancerName", "Value": lb_name}]
    ns = "AWS/ELB"
    return [
        {"Id": f"clb_req_{_safe_id(lb_name)}", "MetricStat": {"Metric": {"Namespace": ns, "MetricName": "RequestCount", "Dimensions": dim}, "Period": period, "Stat": "Sum"}},
        {"Id": f"clb_lat_{_safe_id(lb_name)}", "MetricStat": {"Metric": {"Namespace": ns, "MetricName": "Latency", "Dimensions": dim}, "Period": period, "Stat": "Average"}},
        {"Id": f"clb_5xx_{_safe_id(lb_name)}", "MetricStat": {"Metric": {"Namespace": ns, "MetricName": "HTTPCode_ELB_5XX", "Dimensions": dim}, "Period": period, "Stat": "Sum"}},
    ]
def _safe_id(s: str) -> str:
    # GetMetricData Ids allow only letters, digits, and underscores and must start
    # with a lowercase letter; callers prefix these with "alb_"/"nlb_"/"clb_".
    return s.lower().replace("/", "_").replace("-", "_").replace(":", "_")[:255]
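# e.g. (hypothetical name): _safe_id("app/my-alb/50dc6c495c0c9188") -> "app_my_alb_50dc6c495c0c9188"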
def chunked(iterable, size):
    chunk = []
    for item in iterable:
        chunk.append(item)
        if len(chunk) >= size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk
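# e.g. list(chunked(range(5), 2)) -> [[0, 1], [2, 3], [4]]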
def get_metric_data(cw, queries: List[Dict], start: datetime, end: datetime, max_queries: int):
    # CloudWatch GetMetricData allows max 500 queries per request; use a little buffer.
    results = []
    for group in chunked(queries, max_queries):
        attempt = 0
        while True:
            try:
                resp = cw.get_metric_data(
                    MetricDataQueries=group,
                    StartTime=start,
                    EndTime=end,
                    ScanBy="TimestampAscending"
                )
                results.extend(resp.get("MetricDataResults", []))
                token = resp.get("NextToken")
                while token:
                    resp = cw.get_metric_data(
                        MetricDataQueries=group,
                        StartTime=start, EndTime=end,
                        NextToken=token,
                        ScanBy="TimestampAscending"
                    )
                    results.extend(resp.get("MetricDataResults", []))
                    token = resp.get("NextToken")
                break
            except (BotoCoreError, ClientError):
                attempt += 1
                if attempt > 5:
                    raise
                backoff_sleep(attempt)
    return results
def summarize_results(kind: str, names: List[str], results: List[Dict]) -> List[Dict]:
    # Results come back keyed by Id; map them onto per-LB totals and averages.
    summary = {n: {"kind": kind, "name": n, "sum_req": 0.0, "sum_bytes": 0.0, "sum_5xx": 0.0,
                   "avg_latency": None, "avg_tg_resp": None, "avg_flows": None, "sum_resets": 0.0}
               for n in names}
    # Only consider results whose Id matches this kind's prefix ("alb_", "nlb_", "clb_");
    # otherwise the fallback below would credit other kinds' metrics to the first LB.
    prefix = kind.lower() + "_"
    for r in results:
        id_ = r["Id"]
        if not id_.startswith(prefix):
            continue
        values = r.get("Values", [])
        if not values:
            continue
        total = sum(values)
        avg = sum(values) / len(values)
        # Id convention: <kind>_<metric>_<safe_lb_name>
        parts = id_.split("_")
        metric_key = "_".join(parts[:2])  # alb_req, alb_bytes, nlb_flows, clb_req, etc.
        lb_frag = "_".join(parts[2:])
        target = None
        for n in names:
            if _safe_id(n).endswith(lb_frag):
                target = n
                break
        # Best-effort fallback; can misattribute if sanitized names collide.
        if not target and names:
            target = names[0]
        if metric_key in ("alb_req", "clb_req"):
            summary[target]["sum_req"] += total
        if metric_key in ("alb_bytes", "nlb_bytes"):
            summary[target]["sum_bytes"] += total
        if metric_key in ("alb_5xx", "clb_5xx"):
            summary[target]["sum_5xx"] += total
        if metric_key == "alb_rt":
            summary[target]["avg_tg_resp"] = avg
        if metric_key == "clb_lat":
            summary[target]["avg_latency"] = avg
        if metric_key == "nlb_flows":
            summary[target]["avg_flows"] = avg
        if metric_key == "nlb_rst":
            summary[target]["sum_resets"] += total
    return list(summary.values())
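# Example element (hypothetical values):
# {"kind": "ALB", "name": "app/my-alb/50dc6c495c0c9188", "sum_req": 1200.0,
#  "sum_bytes": 3500000.0, "sum_5xx": 2.0, "avg_latency": None,
#  "avg_tg_resp": 0.042, "avg_flows": None, "sum_resets": 0.0}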
def human_bytes(num: float) -> str:
    """Render bytes using 1024-based steps, labeled with the common B/KB/MB/GB/TB/PB units."""
    if num is None or num == 0:
        return "0 B"
    step = 1024.0
    units = ["B", "KB", "MB", "GB", "TB", "PB"]
    idx = 0
    n = float(num)
    while n >= step and idx < len(units) - 1:
        n /= step
        idx += 1
    return f"{n:.2f} {units[idx]}"
def fmt_int(n: Optional[float]) -> str:
    # Blank only for missing values; zeros print as "0". The fallback returns the
    # raw value rather than retrying the int() call that just failed.
    if n is None:
        return ""
    try:
        return f"{int(n):,}"
    except (TypeError, ValueError):
        return str(n)
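# e.g. fmt_int(1234567.0) -> "1,234,567"; fmt_int(0) -> "0"; fmt_int(None) -> ""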
def main():
    args = parse_args()
    start, end = resolve_window(args)
    session = make_session(args)
    cfg = Config(retries={"max_attempts": 10, "mode": "adaptive"})
    elbv2 = session.client("elbv2", config=cfg)
    elb = session.client("elb", config=cfg)
    cw = session.client("cloudwatch", config=cfg)
    sts = session.client("sts", config=cfg)
    acct = sts.get_caller_identity().get("Account")
    region = session.region_name or "unknown"
    # Gather LBs
    alb_nlb = list_elbv2_load_balancers(elbv2)
    clb = list_elb_classic(elb)
    alb_names = [arn_to_elbv2_metric_name(lb["LoadBalancerArn"]) for lb in alb_nlb if lb.get("Type") == "application"]
    nlb_names = [arn_to_elbv2_metric_name(lb["LoadBalancerArn"]) for lb in alb_nlb if lb.get("Type") == "network"]
    clb_names = [lb["LoadBalancerName"] for lb in clb]
    queries = []
    for name in alb_names:
        queries += build_metric_queries_for_alb(name, args.period)
    for name in nlb_names:
        queries += build_metric_queries_for_nlb(name, args.period)
    for name in clb_names:
        queries += build_metric_queries_for_clb(name, args.period)
    if not queries:
        print("No load balancers found in this account/region.", file=sys.stderr)
        return 1
    # Clamp the batch size to CloudWatch's hard limit of 500 queries per GetMetricData call.
    results = get_metric_data(cw, queries, start, end, min(max(args.max_queries, 1), 500))
    alb_summary = summarize_results("ALB", alb_names, results) if alb_names else []
    nlb_summary = summarize_results("NLB", nlb_names, results) if nlb_names else []
    clb_summary = summarize_results("CLB", clb_names, results) if clb_names else []
    combined = alb_summary + nlb_summary + clb_summary
    # Sort by bytes, then requests, descending
    combined.sort(key=lambda x: (x.get("sum_bytes", 0.0), x.get("sum_req", 0.0)), reverse=True)
    # Pretty print
    print(f"\nAccount: {acct} Region: {region} Window: {iso(start)} → {iso(end)} Period: {args.period}s\n")
    headers = ["Kind", "Name (metric key)", "Requests", "Bytes", "5XX", "Avg Target RT (s)", "Avg CLB Lat (s)", "Avg NLB Flows", "TCP Client Resets"]
    rows = []
    for s in combined:
        rows.append([
            s["kind"],
            s["name"],
            fmt_int(s["sum_req"]),
            human_bytes(s["sum_bytes"]),
            fmt_int(s["sum_5xx"]),
            f"{s['avg_tg_resp']:.4f}" if s.get("avg_tg_resp") is not None else "",
            f"{s['avg_latency']:.4f}" if s.get("avg_latency") is not None else "",
            f"{s['avg_flows']:.2f}" if s.get("avg_flows") is not None else "",
            fmt_int(s.get("sum_resets")),
        ])
    print(tabulate(rows, headers=headers, tablefmt="github"))
    # CSV output (includes both raw bytes and human-readable)
    if args.csv:
        with open(args.csv, "w", newline="") as f:
            w = csv.writer(f)
            w.writerow(["account", "region", "start", "end",
                        "kind", "name", "sum_req", "sum_bytes_raw", "sum_bytes_human",
                        "sum_5xx", "avg_target_response_time_s",
                        "avg_clb_latency_s", "avg_nlb_flows", "tcp_client_resets"])
            for s in combined:
                w.writerow([
                    acct, region, iso(start), iso(end),
                    s["kind"], s["name"],
                    int(s["sum_req"]) if s["sum_req"] else 0,
                    int(s["sum_bytes"]) if s["sum_bytes"] else 0,
                    human_bytes(s["sum_bytes"]),
                    int(s["sum_5xx"]) if s["sum_5xx"] else 0,
                    f"{s['avg_tg_resp']:.6f}" if s.get("avg_tg_resp") is not None else "",
                    f"{s['avg_latency']:.6f}" if s.get("avg_latency") is not None else "",
                    f"{s['avg_flows']:.6f}" if s.get("avg_flows") is not None else "",
                    int(s["sum_resets"]) if s.get("sum_resets") else 0
                ])
        print(f"\nWrote CSV: {args.csv}")
    return 0
if __name__ == "__main__":
    try:
        sys.exit(main())
    except Exception as e:
        print(f"ERROR: {e}", file=sys.stderr)
        sys.exit(2)