Skip to content

Instantly share code, notes, and snippets.

@aodin
Created June 17, 2025 19:54
Show Gist options
  • Save aodin/bf6bd99d2bc1b1e871a910bdafbb2eb5 to your computer and use it in GitHub Desktop.
Save aodin/bf6bd99d2bc1b1e871a910bdafbb2eb5 to your computer and use it in GitHub Desktop.
Parse nginx logs spread across multiple child directories into a single log file for a month
from __future__ import annotations
import argparse
import calendar
from dataclasses import dataclass
from datetime import datetime
import glob
import gzip
import os
import re
@dataclass(frozen=True, eq=True)
class YearMonth:
    """
    An immutable (year, month) pair identifying one calendar month.
    """

    # Calendar year, e.g. 2025.
    year: int
    # Month number used to index calendar.month_name (1 = January).
    month: int

    @property
    def name(self) -> str:
        """
        The month's name as listed in calendar.month_name.
        """
        return calendar.month_name[self.month]

    def next(self) -> YearMonth:
        """
        Return the month after this one; December rolls over to January
        of the following year.
        """
        wraps_year = self.month == 12
        following_year = self.year + 1 if wraps_year else self.year
        following_month = 1 if wraps_year else self.month + 1
        return YearMonth(following_year, following_month)
# Regular expression that captures, from the start of an access-log line, the
# client address ("ip"), the bracketed timestamp ("timestamp") and the quoted
# request line ("request").
# Assumes the default nginx log format:
# 127.0.0.1 - - [14/Mar/2025:13:55:36 +0000] "GET /index.html HTTP/1.1" 200 612 ...
# The "ip" group accepts either a dotted quad or any run of hex digits and
# colons; neither form is range-checked.
# The two fields after the address must both be a literal "-", so a line with
# anything else there will not match.
# NOTE(review): that presumably excludes requests logged with an authenticated
# user name - confirm this is intended.
LOG_PATTERN = re.compile(
r'^(?P<ip>(?:\d{1,3}(?:\.\d{1,3}){3}|[0-9a-fA-F:]+))\s+-\s+-\s+\[(?P<timestamp>[^\]]+)\]\s+"(?P<request>[^"]+)"'
)
def get_log_directories(target_month: YearMonth, base_path: str = ".") -> list[str]:
    """
    Find the dated log directories that may hold entries for a month.

    Scans the immediate children of base_path for directories named
    YYYY_MM_DD and keeps those dated in target_month or in the month right
    after it, since logs might be collected for a previous month.

    Returns the matching directory names (not joined with base_path),
    ordered by the date in the name so the result does not depend on the
    arbitrary order in which os.listdir reports entries.
    """
    following_month = target_month.next()
    wanted = {
        (target_month.year, target_month.month),
        (following_month.year, following_month.month),
    }
    dated = []
    for name in os.listdir(base_path):
        if not os.path.isdir(os.path.join(base_path, name)):
            continue
        # Only the date parse can legitimately fail, so keep the try minimal.
        try:
            dt = datetime.strptime(name, "%Y_%m_%d")
        except ValueError:
            continue  # Skip names that don't match the format
        if (dt.year, dt.month) in wanted:
            dated.append((dt, name))
    # Sort chronologically; ties on the date fall back to the name.
    dated.sort()
    return [name for _, name in dated]
def parse_line(line):
    """
    Pull the client address, timestamp and requested path out of one log line.

    The line is matched against LOG_PATTERN. Returns a tuple
    (ip, timestamp, path), or None when the line does not match or its
    request has fewer than two whitespace-separated tokens.
    """
    found = LOG_PATTERN.match(line)
    if found is None:
        return None
    # The request looks like "GET /index.html HTTP/1.1"; the path is token two.
    tokens = found.group("request").split()
    if len(tokens) < 2:
        return None
    return (found.group("ip"), found.group("timestamp"), tokens[1])
def process_file(target, filename, seen, entries):
    """
    Process a single file (plain text or gzip compressed).

    Each non-blank line is parsed with parse_line. A line is kept when its
    (ip, timestamp, path) tuple has not been seen before and its timestamp
    falls in the year and month given by target. Kept lines are appended to
    entries (stripped of surrounding whitespace) and their tuples are added
    to seen; both containers are mutated in place and nothing is returned.

    Lines whose timestamp cannot be parsed are reported with print and
    skipped.
    """
    # Choose the appropriate opener based on file extension.
    opener = gzip.open if filename.endswith(".gz") else open
    # Decode explicitly rather than with the locale default, and escape
    # undecodable bytes instead of raising, so one bad byte in a log cannot
    # abort the whole run.
    with opener(filename, "rt", encoding="utf-8", errors="backslashreplace") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            parsed = parse_line(line)
            if parsed is None or parsed in seen:
                continue
            # Parse the timestamp string. Expected format example: "14/Mar/2025:13:55:36 +0000"
            try:
                dt = datetime.strptime(parsed[1], "%d/%b/%Y:%H:%M:%S %z")
            except ValueError as e:
                print(e)
                continue
            # Only save requests made during the target month.
            if dt.month != target.month or dt.year != target.year:
                continue
            seen.add(parsed)
            entries.append(line)
def main(target: YearMonth):
    """
    Merge the target month's access-log entries into a single file.

    Looks up the dated directories with get_log_directories (relative to the
    current working directory), feeds every access log found in them to
    process_file, prints the number of unique entries, and writes them to
    "<month name in lowercase>.log" in the current directory.
    """
    directories = get_log_directories(target)
    # This set holds tuples of (ip, timestamp, path) to ensure uniqueness.
    seen = set()
    entries = []
    # Regular expression to match valid access log filenames:
    # Matches: access.log, access.log.1, access.log.2.gz, etc.
    filename_regex = re.compile(r"access\.log(\.\d+)?(\.gz)?$")
    for d in directories:
        if not os.path.isdir(d):
            print(f"Directory not found: {d}")
            continue
        # Find files starting with "access.log" in the directory. glob returns
        # them in arbitrary order, so sort to make both the surviving
        # duplicate and the output order reproducible between runs.
        for file in sorted(glob.glob(os.path.join(d, "access.log*"))):
            if filename_regex.match(os.path.basename(file)):
                process_file(target, file, seen, entries)
    print(f"Unique {target.name} log entries: {len(entries)}")
    # Write unique entries to a file named after the month, e.g. "june.log".
    with open(f"{target.name.lower()}.log", "w") as outfile:
        outfile.writelines(f"{entry}\n" for entry in entries)
if __name__ == "__main__":
    # Command-line entry point: both options default to the current date.
    today = datetime.now()
    cli = argparse.ArgumentParser(description="Parse logs for a year and month.")
    cli.add_argument(
        "--year",
        type=int,
        default=today.year,
        help="Year (e.g., 2025)",
    )
    cli.add_argument(
        "--month",
        type=int,
        default=today.month,
        choices=range(1, 13),
        help="Month (1-12)",
    )
    options = cli.parse_args()
    main(YearMonth(options.year, options.month))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment