Skip to content

Instantly share code, notes, and snippets.

@filipeandre
Created August 22, 2025 15:17
Show Gist options
  • Save filipeandre/e580d83f6aad500c849cff5b25288455 to your computer and use it in GitHub Desktop.
Save filipeandre/e580d83f6aad500c849cff5b25288455 to your computer and use it in GitHub Desktop.
Traffic report for load balancers
#!/usr/bin/env python3
"""
lb_traffic_report.py (human-friendly)
Enumerate all ALBs, NLBs, and CLBs in the current AWS account & region and fetch key
CloudWatch metrics for a given time window. Outputs a summary table (stdout) and
optionally writes a CSV.
Human-friendly improvements:
- Byte values shown using dynamic units (B/KB/MB/GB/TB) with 2 decimal places.
- Integer values printed with thousands separators.
- CSV includes both raw bytes and human-readable bytes.
Usage examples:
python lb_traffic_report.py --region us-east-1 --start 2025-08-22T03:45:00Z --end 2025-08-22T04:15:00Z
python lb_traffic_report.py --period 60 --minutes 30 --csv lb_metrics.csv
Requires:
- boto3, botocore, python-dateutil, tabulate (pip install boto3 python-dateutil tabulate)
- AWS credentials with permission to: elbv2:DescribeLoadBalancers, elb:DescribeLoadBalancers,
cloudwatch:GetMetricData, sts:GetCallerIdentity
"""
import argparse
import csv
import sys
import time
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Tuple, Optional
import boto3
from botocore.config import Config
from botocore.exceptions import BotoCoreError, ClientError
from dateutil import parser as dtparser
from tabulate import tabulate
ISO_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
def parse_args() -> argparse.Namespace:
    """Parse command-line options for the load-balancer traffic report."""
    parser = argparse.ArgumentParser(description="Report LB traffic & errors for ALB/NLB/CLB (human-friendly)")
    # --minutes and --start are alternative ways to define the window start.
    window = parser.add_mutually_exclusive_group()
    window.add_argument("--minutes", type=int, default=30, help="Lookback window in minutes (default: 30)")
    window.add_argument("--start", type=str, help="ISO8601 start time, e.g., 2025-08-22T03:45:00Z")
    parser.add_argument("--end", type=str, help="ISO8601 end time, default: now")
    parser.add_argument("--period", type=int, default=60, help="CloudWatch period in seconds (default: 60)")
    parser.add_argument("--region", type=str, help="AWS region (defaults to env/profile)")
    parser.add_argument("--profile", type=str, help="AWS profile name")
    parser.add_argument("--csv", type=str, help="Write results to CSV path")
    parser.add_argument("--max-queries", type=int, default=450, help="Batch size for GetMetricData (default: 450)")
    return parser.parse_args()
def iso(dt: datetime) -> str:
    """Render *dt* as a UTC ISO8601 timestamp with a trailing 'Z'."""
    as_utc = dt.astimezone(timezone.utc)
    return as_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
def resolve_window(args: argparse.Namespace) -> Tuple[datetime, datetime]:
    """Resolve the (start, end) UTC query window from parsed CLI arguments.

    Raises ValueError when the resolved start is not strictly before end.
    """
    if args.end:
        end = dtparser.isoparse(args.end).astimezone(timezone.utc)
    else:
        end = datetime.now(timezone.utc)
    if args.start:
        start = dtparser.isoparse(args.start).astimezone(timezone.utc)
    else:
        lookback = args.minutes or 30
        start = end - timedelta(minutes=lookback)
    if start >= end:
        raise ValueError("Start must be earlier than end")
    return start, end
def make_session(args: argparse.Namespace) -> boto3.Session:
    """Build a boto3 Session honoring optional --profile and --region flags."""
    kwargs = {"region_name": args.region}
    if args.profile:
        kwargs["profile_name"] = args.profile
    return boto3.Session(**kwargs)
def backoff_sleep(attempt: int):
    """Sleep with exponential backoff capped at 10 seconds for retry *attempt*."""
    delay = min(2 ** attempt, 10)
    time.sleep(delay)
def list_elbv2_load_balancers(elbv2) -> List[Dict]:
    """Return every ALB/NLB description visible through the elbv2 client, fully paginated."""
    found: List[Dict] = []
    for page in elbv2.get_paginator('describe_load_balancers').paginate():
        found.extend(page.get("LoadBalancers", []))
    return found
def list_elb_classic(elb) -> List[Dict]:
    """Return every Classic ELB description visible through the elb client, fully paginated."""
    descriptions: List[Dict] = []
    paginator = elb.get_paginator('describe_load_balancers')
    for page in paginator.paginate():
        descriptions.extend(page.get("LoadBalancerDescriptions", []))
    return descriptions
def arn_to_elbv2_metric_name(arn: str) -> str:
    """Extract the CloudWatch "LoadBalancer" dimension value from an ELBv2 ARN.

    ARN form: arn:aws:elasticloadbalancing:region:acct:loadbalancer/app/name/hash
    The dimension value is the trailing "app/name/hash" (or "net/name/hash") part.
    Strings without the separator are returned unchanged.
    """
    _, _, suffix = arn.rpartition("loadbalancer/")
    return suffix if suffix else arn
def build_metric_queries_for_alb(lb_name: str, period: int) -> List[Dict]:
    """Build GetMetricData queries for one ALB: requests, bytes, 5xx, target response time."""
    dimensions = [{"Name": "LoadBalancer", "Value": lb_name}]
    namespace = "AWS/ApplicationELB"
    suffix = _safe_id(lb_name)

    def query(prefix: str, metric: str, stat: str) -> Dict:
        # One MetricDataQuery; the Id must be unique per metric per LB.
        return {
            "Id": f"{prefix}_{suffix}",
            "MetricStat": {
                "Metric": {"Namespace": namespace, "MetricName": metric, "Dimensions": dimensions},
                "Period": period,
                "Stat": stat,
            },
        }

    return [
        query("alb_req", "RequestCount", "Sum"),
        query("alb_bytes", "ProcessedBytes", "Sum"),
        query("alb_5xx", "HTTPCode_ELB_5XX_Count", "Sum"),
        query("alb_rt", "TargetResponseTime", "Average"),
    ]
def build_metric_queries_for_nlb(lb_name: str, period: int) -> List[Dict]:
    """Build GetMetricData queries for one NLB: bytes, active flows, TCP client resets."""
    dimensions = [{"Name": "LoadBalancer", "Value": lb_name}]
    namespace = "AWS/NetworkELB"
    suffix = _safe_id(lb_name)

    def query(prefix: str, metric: str, stat: str) -> Dict:
        # One MetricDataQuery; the Id must be unique per metric per LB.
        return {
            "Id": f"{prefix}_{suffix}",
            "MetricStat": {
                "Metric": {"Namespace": namespace, "MetricName": metric, "Dimensions": dimensions},
                "Period": period,
                "Stat": stat,
            },
        }

    queries = [
        query("nlb_bytes", "ProcessedBytes", "Sum"),
        query("nlb_flows", "ActiveFlowCount", "Average"),
    ]
    # Optional metric (not in all regions / accounts)
    reset_query = query("nlb_rst", "TCP_Client_Reset_Count", "Sum")
    reset_query["ReturnData"] = True
    queries.append(reset_query)
    return queries
def build_metric_queries_for_clb(lb_name: str, period: int) -> List[Dict]:
    """Build GetMetricData queries for one Classic ELB: requests, latency, 5xx."""
    dimensions = [{"Name": "LoadBalancerName", "Value": lb_name}]
    namespace = "AWS/ELB"
    suffix = _safe_id(lb_name)

    def query(prefix: str, metric: str, stat: str) -> Dict:
        # One MetricDataQuery; the Id must be unique per metric per LB.
        return {
            "Id": f"{prefix}_{suffix}",
            "MetricStat": {
                "Metric": {"Namespace": namespace, "MetricName": metric, "Dimensions": dimensions},
                "Period": period,
                "Stat": stat,
            },
        }

    return [
        query("clb_req", "RequestCount", "Sum"),
        query("clb_lat", "Latency", "Average"),
        query("clb_5xx", "HTTPCode_ELB_5XX", "Sum"),
    ]
def _safe_id(s: str) -> str:
    """Lowercase *s* and map '/', '-', ':' to '_' so it is usable in a GetMetricData Id.

    Truncated to 255 characters (the CloudWatch Id length limit).
    """
    table = str.maketrans("/-:", "___")
    return s.lower().translate(table)[:255]
def chunked(iterable, size):
    """Yield successive lists of at most *size* items from *iterable*, in order."""
    it = iter(iterable)
    while True:
        batch = []
        for element in it:
            batch.append(element)
            if len(batch) >= size:
                break
        if not batch:
            return
        yield batch
def get_metric_data(cw, queries: List[Dict], start: datetime, end: datetime, max_queries: int):
    """Fetch CloudWatch MetricDataResults for *queries* over [start, end].

    Splits *queries* into batches of *max_queries* (the GetMetricData API caps a
    single request at 500 queries, so callers pass a little buffer), follows
    NextToken pagination, and retries each batch up to 5 times with exponential
    backoff on AWS client errors.

    Returns the concatenated list of MetricDataResults dicts from all batches.
    """
    results: List[Dict] = []
    for group in chunked(queries, max_queries):
        attempt = 0
        while True:
            # Accumulate this batch separately and commit only once the whole
            # batch (all pages) succeeded. Bug fix: the previous version
            # extended `results` page-by-page inside the try, so a failure
            # mid-pagination followed by a retry duplicated the pages that
            # had already been collected.
            group_results: List[Dict] = []
            try:
                token = None
                while True:
                    kwargs = dict(
                        MetricDataQueries=group,
                        StartTime=start,
                        EndTime=end,
                        ScanBy="TimestampAscending",
                    )
                    if token:
                        kwargs["NextToken"] = token
                    resp = cw.get_metric_data(**kwargs)
                    group_results.extend(resp.get("MetricDataResults", []))
                    token = resp.get("NextToken")
                    if not token:
                        break
                results.extend(group_results)
                break
            except (BotoCoreError, ClientError):
                attempt += 1
                if attempt > 5:
                    raise
                backoff_sleep(attempt)
    return results
def summarize_results(kind: str, names: List[str], results: List[Dict]) -> List[Dict]:
    """Aggregate GetMetricData results into one summary row per load balancer.

    *kind* is "ALB"/"NLB"/"CLB"; *names* are the metric-dimension names used when
    the queries were built; *results* may contain results for ALL kinds — Ids
    follow the "<kind>_<metric>_<_safe_id(name)>" convention of the builders.

    Returns a list of per-LB dicts with summed counters and averaged gauges
    (the average is the unweighted mean of the returned datapoint values).
    """
    summary = {n: {"kind": kind, "name": n, "sum_req": 0.0, "sum_bytes": 0.0, "sum_5xx": 0.0,
                   "avg_latency": None, "avg_tg_resp": None, "avg_flows": None, "sum_resets": 0.0}
               for n in names}
    # Exact reverse map from the safe-id fragment back to the LB name.
    # Bug fix: the old suffix match (`_safe_id(n).endswith(lb_frag)`) plus the
    # `target = names[0]` fallback could attribute another load balancer's
    # metrics — including metrics from a different LB kind, since *results*
    # holds everything — to an arbitrary LB in *names*.
    frag_to_name = {_safe_id(n): n for n in names}
    for r in results:
        values = r.get("Values", [])
        if not values:
            continue
        total = sum(values)
        avg = total / len(values)
        parts = r["Id"].split("_")
        metric_key = "_".join(parts[:2])  # e.g. alb_req, nlb_flows, clb_lat
        lb_frag = "_".join(parts[2:])
        target = frag_to_name.get(lb_frag)
        if target is None:
            # Id belongs to a different kind (or an unknown query): skip it
            # rather than misattribute it.
            continue
        row = summary[target]
        if metric_key in ("alb_req", "clb_req"):
            row["sum_req"] += total
        elif metric_key in ("alb_bytes", "nlb_bytes"):
            row["sum_bytes"] += total
        elif metric_key in ("alb_5xx", "clb_5xx"):
            row["sum_5xx"] += total
        elif metric_key == "alb_rt":
            row["avg_tg_resp"] = avg
        elif metric_key == "clb_lat":
            row["avg_latency"] = avg
        elif metric_key == "nlb_flows":
            row["avg_flows"] = avg
        elif metric_key == "nlb_rst":
            row["sum_resets"] += total
    return list(summary.values())
def human_bytes(num: float) -> str:
    """Format a byte count with 1024-based steps and B/KB/MB/GB/TB/PB labels, 2 decimals."""
    if num is None or num == 0:
        return "0 B"
    value = float(num)
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if value < 1024.0:
            return f"{value:.2f} {unit}"
        value /= 1024.0
    return f"{value:.2f} PB"
def fmt_int(n: Optional[float]) -> str:
    """Format *n* as a thousands-separated integer string.

    Falsy values (None, 0) render as "" so zero cells show blank in the table.
    """
    if not n:
        return ""
    try:
        return f"{int(n):,}"
    except (TypeError, ValueError, OverflowError):
        # Bug fix: the old fallback called int(n) again inside the handler and
        # re-raised the same exception (e.g. for NaN or inf). Fall back to the
        # raw string representation instead.
        return str(n)
def main():
    """Entry point: enumerate all LBs, fetch their CloudWatch metrics, print a
    summary table, and optionally write a CSV.

    Returns a process exit code: 0 on success, 1 when no load balancers exist
    in the account/region.
    """
    args = parse_args()
    start, end = resolve_window(args)
    session = make_session(args)
    # Adaptive boto3 client retries, on top of get_metric_data's own backoff.
    cfg = Config(retries={"max_attempts": 10, "mode": "adaptive"})
    elbv2 = session.client("elbv2", config=cfg)
    elb = session.client("elb", config=cfg)
    cw = session.client("cloudwatch", config=cfg)
    sts = session.client("sts", config=cfg)
    acct = sts.get_caller_identity().get("Account")
    region = session.region_name or "unknown"
    # Gather LBs: ALB/NLB via elbv2, Classic via elb.
    alb_nlb = list_elbv2_load_balancers(elbv2)
    clb = list_elb_classic(elb)
    alb_names = [arn_to_elbv2_metric_name(lb["LoadBalancerArn"]) for lb in alb_nlb if lb.get("Type") == "application"]
    nlb_names = [arn_to_elbv2_metric_name(lb["LoadBalancerArn"]) for lb in alb_nlb if lb.get("Type") == "network"]
    clb_names = [lb["LoadBalancerName"] for lb in clb]
    # Build one flat list of GetMetricData queries across all LB kinds.
    queries = []
    for name in alb_names:
        queries += build_metric_queries_for_alb(name, args.period)
    for name in nlb_names:
        queries += build_metric_queries_for_nlb(name, args.period)
    for name in clb_names:
        queries += build_metric_queries_for_clb(name, args.period)
    if not queries:
        print("No load balancers found in this account/region.", file=sys.stderr)
        return 1
    # Floor of 100 keeps the batch size sensible even if --max-queries is tiny.
    results = get_metric_data(cw, queries, start, end, max(args.max_queries, 100))
    alb_summary = summarize_results("ALB", alb_names, results) if alb_names else []
    nlb_summary = summarize_results("NLB", nlb_names, results) if nlb_names else []
    clb_summary = summarize_results("CLB", clb_names, results) if clb_names else []
    combined = alb_summary + nlb_summary + clb_summary
    # Sort by bytes then requests (busiest LBs first).
    combined.sort(key=lambda x: (x.get("sum_bytes", 0.0), x.get("sum_req", 0.0)), reverse=True)
    # Pretty print
    print(f"\nAccount: {acct} Region: {region} Window: {iso(start)} → {iso(end)} Period: {args.period}s\n")
    headers = ["Kind", "Name (metric key)", "Requests", "Bytes", "5XX", "Avg Target RT (s)", "Avg CLB Lat (s)", "Avg NLB Flows", "TCP Client Resets"]
    rows = []
    for s in combined:
        rows.append([
            s["kind"],
            s["name"],
            fmt_int(s["sum_req"]),
            human_bytes(s["sum_bytes"]),
            fmt_int(s["sum_5xx"]),
            f"{s['avg_tg_resp']:.4f}" if s.get("avg_tg_resp") is not None else "",
            f"{s['avg_latency']:.4f}" if s.get("avg_latency") is not None else "",
            f"{s['avg_flows']:.2f}" if s.get("avg_flows") is not None else "",
            fmt_int(s.get("sum_resets")),
        ])
    print(tabulate(rows, headers=headers, tablefmt="github"))
    # CSV output (includes both raw bytes and human-readable)
    if args.csv:
        with open(args.csv, "w", newline="") as f:
            w = csv.writer(f)
            w.writerow(["account", "region", "start", "end",
                        "kind", "name", "sum_req", "sum_bytes_raw", "sum_bytes_human",
                        "sum_5xx", "avg_target_response_time_s",
                        "avg_clb_latency_s", "avg_nlb_flows", "tcp_client_resets"])
            for s in combined:
                w.writerow([
                    acct, region, iso(start), iso(end),
                    s["kind"], s["name"],
                    int(s["sum_req"]) if s["sum_req"] else 0,
                    int(s["sum_bytes"]) if s["sum_bytes"] else 0,
                    human_bytes(s["sum_bytes"]),
                    int(s["sum_5xx"]) if s["sum_5xx"] else 0,
                    f"{s['avg_tg_resp']:.6f}" if s.get("avg_tg_resp") is not None else "",
                    f"{s['avg_latency']:.6f}" if s.get("avg_latency") is not None else "",
                    f"{s['avg_flows']:.6f}" if s.get("avg_flows") is not None else "",
                    int(s["sum_resets"]) if s.get("sum_resets") else 0
                ])
        print(f"\nWrote CSV: {args.csv}")
    return 0
if __name__ == "__main__":
    # Top-level boundary: surface any uncaught failure as exit code 2.
    try:
        sys.exit(main())
    except Exception as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        sys.exit(2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment