Created
June 17, 2025 19:54
-
-
Save aodin/bf6bd99d2bc1b1e871a910bdafbb2eb5 to your computer and use it in GitHub Desktop.
Parse nginx logs spread across multiple child directories into a single log file for a month
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
import argparse | |
import calendar | |
from dataclasses import dataclass | |
from datetime import datetime | |
import glob | |
import gzip | |
import os | |
import re | |
@dataclass(eq=True, frozen=True)
class YearMonth:
    """An immutable, hashable (year, month) pair identifying one calendar month."""

    year: int
    month: int

    @property
    def name(self) -> str:
        """Month name looked up in ``calendar.month_name`` (e.g. "June" for 6)."""
        return calendar.month_name[self.month]

    def next(self) -> YearMonth:
        """Return the month that follows this one, rolling December into January."""
        if self.month != 12:
            return YearMonth(self.year, self.month + 1)
        # December: wrap around to January of the following year.
        return YearMonth(self.year + 1, 1)
# Regular expression to extract IP, timestamp, and request.
# Assumes the default nginx ("combined") log format:
#   $remote_addr - $remote_user [$time_local] "$request" $status ...
# e.g. 127.0.0.1 - - [14/Mar/2025:13:55:36 +0000] "GET /index.html HTTP/1.1" 200 612 ...
# The third field is $remote_user: it is "-" for anonymous requests but holds a
# user name for authenticated ones, so it is matched as any non-space token
# rather than a literal "-" (which would silently drop authenticated requests).
LOG_PATTERN = re.compile(
    r'^(?P<ip>(?:\d{1,3}(?:\.\d{1,3}){3}|[0-9a-fA-F:]+))'  # IPv4 or IPv6 address
    r'\s+-\s+\S+\s+'  # literal "-" followed by $remote_user
    r'\[(?P<timestamp>[^\]]+)\]\s+'  # [$time_local]
    r'"(?P<request>[^"]+)"'  # "METHOD path PROTOCOL"
)
def get_log_directories(target_month: YearMonth, base_path: str = ".") -> list[str]:
    """
    List the dated child directories of base_path that may hold logs for target_month.

    A child directory qualifies when its name parses as ``%Y_%m_%d`` and that date
    falls in the target month or in the month right after it (logs for a month
    might be collected into a directory dated in the following month).

    Returns the matching directory names (not joined with base_path), ordered by
    the date in their name so callers process them in a stable order.
    """
    next_month = target_month.next()
    wanted = {
        (target_month.year, target_month.month),
        (next_month.year, next_month.month),
    }

    dated = []
    for name in os.listdir(base_path):
        if not os.path.isdir(os.path.join(base_path, name)):
            continue
        try:
            dt = datetime.strptime(name, "%Y_%m_%d")
        except ValueError:
            continue  # Skip names that don't match the format
        if (dt.year, dt.month) in wanted:
            dated.append((dt, name))

    # os.listdir() returns entries in arbitrary order; sort by date (then name)
    # so the result does not depend on the filesystem.
    dated.sort()
    return [name for _, name in dated]
def parse_line(line):
    """
    Split one nginx access-log line into its identifying fields.

    Returns (ip, timestamp, path), where path is the second whitespace-separated
    token of the quoted request. Returns None when the line does not match
    LOG_PATTERN or the request has fewer than two tokens.
    """
    found = LOG_PATTERN.match(line)
    if not found:
        return None

    # The request looks like "GET /index.html HTTP/1.1"; the path is token 2.
    tokens = found.group("request").split()
    if len(tokens) < 2:
        return None

    return (found.group("ip"), found.group("timestamp"), tokens[1])
def process_file(target, filename, seen, entries):
    """
    Process a single log file (plain text or gzip compressed).

    Every non-blank line is parsed with parse_line(). A line is kept when its
    (ip, timestamp, path) key has not been seen before and its timestamp falls
    in target's year and month.

    Args:
        target: object exposing ``year`` and ``month`` attributes to filter on.
        filename: path of the log file; names ending in ".gz" are read via gzip.
        seen: set of (ip, timestamp, path) keys already kept; updated in place.
        entries: list of kept raw log lines; appended to in place.
    """
    # Choose the opener from the file extension; both are read as text.
    opener = gzip.open if filename.endswith(".gz") else open

    with opener(filename, "rt") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue

            parsed = parse_line(line)
            if not parsed or parsed in seen:
                continue

            # Expected timestamp format example: "14/Mar/2025:13:55:36 +0000"
            try:
                dt = datetime.strptime(parsed[1], "%d/%b/%Y:%H:%M:%S %z")
            except ValueError as e:
                # A malformed timestamp only invalidates this line; report it
                # and keep going. Other exception types indicate programming
                # errors and are deliberately not swallowed.
                print(f"{filename}: skipping line with unparseable timestamp: {e}")
                continue

            # Only keep requests made during the target month.
            if dt.month != target.month or dt.year != target.year:
                continue

            seen.add(parsed)
            entries.append(line)
def main(target: YearMonth):
    """
    Merge the unique access-log entries for target into a single file.

    Scans the directories returned by get_log_directories(target), feeds every
    access log found there to process_file(), prints the number of entries
    kept, and writes them to "<lowercased month name>.log" in the current
    directory.
    """
    # Accepts access.log, access.log.1, access.log.2.gz, etc.
    filename_regex = re.compile(r"access\.log(\.\d+)?(\.gz)?$")

    # seen: (ip, timestamp, path) keys already kept; entries: the raw lines.
    seen = set()
    entries = []

    for d in get_log_directories(target):
        if not os.path.isdir(d):
            print(f"Directory not found: {d}")
            continue

        # The glob is broader than the regex, so re-check each base name.
        candidates = glob.glob(os.path.join(d, "access.log*"))
        for file in candidates:
            if filename_regex.match(os.path.basename(file)):
                process_file(target, file, seen, entries)

    print(f"Unique {target.name} log entries: {len(entries)}")

    # One entry per line, in the order the entries were collected.
    with open(f"{target.name.lower()}.log", "w") as outfile:
        outfile.writelines(entry + "\n" for entry in entries)
if __name__ == "__main__":
    # Both options default to the current year and month.
    today = datetime.now()

    cli = argparse.ArgumentParser(description="Parse logs for a year and month.")
    cli.add_argument("--year", type=int, default=today.year, help="Year (e.g., 2025)")
    month_options = dict(
        type=int,
        default=today.month,
        choices=range(1, 13),
        help="Month (1-12)",
    )
    cli.add_argument("--month", **month_options)

    options = cli.parse_args()
    main(YearMonth(options.year, options.month))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment