Python: Find conflicting tests with pytest (race conditions, dependency on order of tests)
""" | |
Runs repeatedly pytest with random ordering and tries to determine a minimal | |
set of tests that affect each other such that one of them fails because of | |
other tests being executed before it. | |
Usage: | |
$ python find_conflicting_tests.py path/to/tests/ [path/to/tests2/ ...] \ | |
[... other pytest initial args, such as --random-order-seed=123] | |
""" | |
from __future__ import annotations

import re
import sys
import time
import shlex
import signal
import textwrap
import subprocess
from enum import Enum
from typing import Optional
from collections.abc import Iterable
MAX_ATTEMPTS = 100

PYTEST_ARGS = [
    '--random-order',
    '--random-order-bucket=global',
    '-vvv',
]
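# NOTE: The --random-order* flags above come from the pytest-random-order
# plugin (pip install pytest-random-order); bucket=global shuffles all
# collected tests together instead of only within each module.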
class Color:
    # Reset.
    _ = '\033[0m'

    # Styles.
    BOLD = '\033[1m'
    ITALIC = '\033[3m'
    UNDERLINE = '\033[4m'
    UNDERLINE_THICK = '\033[21m'
    HIGHLIGHTED = '\033[7m'
    HIGHLIGHTED_BLACK = '\033[40m'
    HIGHLIGHTED_RED = '\033[41m'
    HIGHLIGHTED_GREEN = '\033[42m'
    HIGHLIGHTED_YELLOW = '\033[43m'
    HIGHLIGHTED_BLUE = '\033[44m'
    HIGHLIGHTED_PURPLE = '\033[45m'
    HIGHLIGHTED_CYAN = '\033[46m'
    HIGHLIGHTED_GREY = '\033[47m'
    HIGHLIGHTED_GREY_LIGHT = '\033[100m'
    HIGHLIGHTED_RED_LIGHT = '\033[101m'
    HIGHLIGHTED_GREEN_LIGHT = '\033[102m'
    HIGHLIGHTED_YELLOW_LIGHT = '\033[103m'
    HIGHLIGHTED_BLUE_LIGHT = '\033[104m'
    HIGHLIGHTED_PURPLE_LIGHT = '\033[105m'
    HIGHLIGHTED_CYAN_LIGHT = '\033[106m'
    HIGHLIGHTED_WHITE_LIGHT = '\033[107m'
    STRIKE_THROUGH = '\033[9m'
    MARGIN_1 = '\033[51m'

    # Colors.
    BLACK = '\033[30m'
    RED_DARK = '\033[31m'
    GREEN_DARK = '\033[32m'
    YELLOW_DARK = '\033[33m'
    BLUE_DARK = '\033[34m'
    PURPLE_DARK = '\033[35m'
    CYAN_DARK = '\033[36m'
    GREY = '\033[37m'
    GREY_DARK = '\033[90m'
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    PURPLE = '\033[95m'
    CYAN = '\033[96m'
    WHITE = '\033[97m'
def pretty(symbol: Symbol | str, *texts, indent=0, **kwargs):
    if isinstance(symbol, Symbol):
        # Use .value explicitly: on Python 3.11+, formatting a (str, Enum)
        # member would yield "Symbol.INFO" instead of its value.
        symbol_str = f"{Symbol.__COLORS__.get(symbol, '')}[{symbol.value}]{Color._}"
    else:
        symbol_str = f"[{symbol}]"
    print(f"{' ' * indent}{symbol_str}", *[*texts, Color._], **kwargs)
class Symbol(str, Enum):
    INFO = 'i'
    SHELL = '$'
    OK = '✓'
    BAD = '!'

    # Dunder name, so EnumMeta doesn't turn this dict into an enum member.
    __COLORS__ = {
        INFO: Color.BLUE,
        SHELL: Color.CYAN_DARK,
        OK: Color.GREEN_DARK,
        BAD: Color.RED,
    }
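# Example: pretty(Symbol.OK, "Done") prints a green "[✓]" tag followed by
# "Done"; pretty('out', "...", indent=1) is used below for raw pytest output.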
def run_pytest(
    test_args: Iterable[str],
    highlight_lines: Optional[dict[str, str]] = None,
):
"""Run pytest with the given arguments and return (success, output_lines, | |
relevant test names). | |
""" | |
    highlight_lines = highlight_lines or {}
    # We don't escape/quote test_args, which may contain characters like ', |,
    # or ", but Popen handles these just fine even when not escaped/quoted.
    cmd = ["pytest", *PYTEST_ARGS, *test_args]
    # We escape args only for printing.
    escaped_test_args = list(map(shlex.quote, test_args))
    args_str = ' '.join(["pytest", *PYTEST_ARGS, *escaped_test_args])
    pretty(Symbol.SHELL, f"{Color.GREEN}{textwrap.shorten(args_str, 1200)}")

    process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )

    collected_subset: list[str] = []
    output_lines: list[str] = []
    failed_index: int | None = None
    random_seed: int | None = None

    assert process.stdout is not None
    assert process.stderr is not None

    while True:
        line = process.stdout.readline()
        if not line:
            break
        line = line.rstrip()
        # If any of the keys in highlight_lines is present in the line,
        # highlight the text (the color is the key's associated value in the
        # highlight_lines dict).
        # NOTE: Use a separate out_line so we don't modify the line itself,
        # which we gather and expect to be "clean".
        out_line = line
        for needle in highlight_lines:
            if needle in line:
                out_line = f"{highlight_lines[needle]}{line}{Color._}"
                break
        pretty('out', f"{Color.GREY_DARK}{out_line}{Color._}", indent=1)
        output_lines.append(line)

        if "random-order-seed=" in line:
            random_seed = int(line.rpartition('=')[2])
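        # With -vvv, pytest prints each test result on its own line, in a
        # shape assumed to look like
        #   tests/test_mod.py::test_name PASSED [ 42%]
        # and the parsing below relies on that layout.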
if "::" in line and ("PASSED" in line or "FAILED" in line or "SKIPPED" in line): | |
m = re.search(r'(.*)\s+(PASSED|FAILED|SKIPPED)', line) | |
if m is not None: | |
test_name = m.groups()[0] | |
collected_subset.append(test_name) | |
if "FAILED" in line and failed_index is None: | |
failed_index = len(collected_subset) - 1 | |
break | |
    if failed_index is not None:
        pretty(Symbol.INFO, "Sending SIGINT to pytest")
        process.send_signal(signal.SIGINT)
    else:
        pretty(Symbol.INFO, "Waiting for pytest to finish")
    # Drain the remaining output first, then reap the process: waiting on a
    # child whose pipes are still full could deadlock.
    for line in process.stdout.readlines():
        pretty('out', line.strip(), indent=1)
    for line in process.stderr.readlines():
        pretty('err', line.strip(), indent=1)
    process.wait()
    success = failed_index is None
    failed_test = collected_subset[failed_index] if failed_index is not None else None
    return success, collected_subset, failed_test, random_seed
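# Illustration (hypothetical path): a call such as
#   success, collected, failed_test, seed = run_pytest(["tests/"])
# returns success=False plus the failing test's node ID as soon as a FAILED
# line is seen, or success=True and failed_test=None after a clean run.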
def shrink_test_set(
    tests: list[str],
    failed_test: str,
    aggressive_but_unreliable: bool = False,
):
    """Try to shrink the test list to the minimal set that reproduces the
    failure.
    """
    random_seed = None
    subset = tests
    pretty(Symbol.INFO, f"Shrinking initial set of {len(tests)} tests...")
    for attempt in range(MAX_ATTEMPTS):
        pretty(Symbol.INFO, f"Attempt {attempt + 1} ...")
        success, new_subset, now_failed_test, random_seed = run_pytest(
            subset,
            highlight_lines={failed_test: Color.YELLOW},
        )
        if not success:
            assert now_failed_test
            # Persist the knowledge about the current failing test for the
            # next runs.
            if failed_test != now_failed_test:
                pretty(
                    Symbol.INFO,
                    f"Found new target failing-test: {now_failed_test} "
                    f"(previously {failed_test})",
                )
                failed_test = now_failed_test
            pretty(
                Symbol.OK,
                f"Failure reproduced with {len(subset)} "
                f"tests (--random-order-seed={random_seed})",
            )
            old_len = len(subset)
            subset = new_subset
            new_len = len(subset)
            pretty(
                Symbol.INFO,
                f"Reducing set from {old_len} to {new_len}.",
                indent=1,
            )
            if len(subset) == 2:
                # Two tests are the smallest possible conflicting set.
                break
        else:
            pretty(Symbol.BAD, "No failure in this run.")
            if aggressive_but_unreliable:
                # At this point:
                # 1. There's a failing test we know about (otherwise we
                #    wouldn't be here in the shrinking phase).
                # 2. We can assume that all tests executed before the
                #    currently passing failing-test do not cause it to fail,
                #    so we can remove them.
                #
                # WARNING: This can speed things up, but cannot always be
                # trusted. Enable it (via --aggressive-but-unreliable) at
                # your own risk.
                #
                # Why is it unreliable? Imagine this situation:
                # 1. T1 ok, T2 ok, T3 ok, T4 fail.
                # 2. T1 ok, T2 ok, T4 ok, T3 ok.
                #
                # One would be tempted to eliminate T3 as a potential culprit
                # for T4 failing, but that might not be the case. It might be
                # that T3 broke T4 because T3 itself was somehow affected
                # (while still passing) by T1 or T2. In that case, to
                # reproduce the T4 failure we still need T3 and cannot
                # eliminate it.
                if failed_test in new_subset:
                    old_len = len(subset)
                    subset = new_subset[new_subset.index(failed_test):]
                    new_len = len(subset)
                    pretty(
                        Symbol.INFO,
                        f"Reducing set from {old_len} to {new_len}.",
                        indent=1,
                    )
        time.sleep(1)
    return subset, random_seed
def main(start_args: list[str], aggressive_but_unreliable: bool = False):
    pretty(Symbol.INFO, f"Starting search with args {start_args} ...")
    subset = []
    failed_test = None
    for attempt in range(MAX_ATTEMPTS):
        pretty(Symbol.INFO, f"Full run attempt {attempt + 1} ...")
        success, subset, failed_test, random_seed = run_pytest(start_args)
        if not success:
            pretty(
                Symbol.OK,
                f"Failure detected (--random-order-seed={random_seed})",
            )
            break
        time.sleep(1)
    if failed_test is None:
        # Every run passed: run_pytest() still returns the collected tests,
        # so we detect "no failure" via failed_test, not via an empty subset.
        pretty(Symbol.INFO, "No failures found after many attempts. Exiting")
        sys.exit(0)
    if len(subset) == 1:
        pretty(Symbol.INFO, "The first test failed")
pretty(Symbol.INFO, f"Found initial conflict set with {len(subset)} tests") | |
pretty( | |
Symbol.INFO, | |
f"Initial target failing-test: {failed_test}", | |
) | |
assert failed_test is not None | |
minimized, random_seed = shrink_test_set( | |
tests=subset, | |
failed_test=failed_test, | |
aggressive_but_unreliable=aggressive_but_unreliable, | |
) | |
pretty( | |
Symbol.OK, | |
f"Final minimized set of tests (--random-order-seed={random_seed}):", | |
) | |
for t in minimized: | |
print(f" {t}") | |
if len(minimized) == 2: | |
pretty(Symbol.OK, "Minimal pair identified") | |
if __name__ == "__main__": | |
args = sys.argv | |
if len(args) < 2: | |
print("Usage: python find_conflicting_tests.py " | |
"[--aggressive-but-unreliable] path/to/tests/") | |
sys.exit(1) | |
aggressive_but_unreliable = False | |
if args[1] == '--aggressive-but-unreliable': | |
pretty(Symbol.INFO, "aggressive_but_unreliable=True") | |
aggressive_but_unreliable = True | |
args = args[1:] | |
main(args[1:], aggressive_but_unreliable) |
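For reference, here is a minimal, hypothetical pair of order-dependent tests of the kind this script is designed to pin down. The two tests share module-level state, so the second one passes or fails depending on which runs first (file and names are invented for illustration):

# test_conflict_demo.py - hypothetical example of order-dependent tests.

_cache: dict[str, str] = {}

def test_writes_cache():
    # Mutates module-level state that leaks into later tests.
    _cache['user'] = 'alice'
    assert _cache['user'] == 'alice'

def test_reads_cache():
    # Passes when run before test_writes_cache, fails when run after it.
    assert 'user' not in _cache

Against a suite containing such a pair, python find_conflicting_tests.py tests/ should, after enough random-order runs, shrink the failure down to exactly these two tests.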