Skip to content

Instantly share code, notes, and snippets.

@maxisoft
Last active February 18, 2025 14:15
Show Gist options
  • Save maxisoft/706cb18545b916ca2ca71c0286054457 to your computer and use it in GitHub Desktop.
Save maxisoft/706cb18545b916ca2ca71c0286054457 to your computer and use it in GitHub Desktop.
DaemonThreadPoolExecutor: A custom ThreadPoolExecutor that creates daemon threads by temporarily monkey patching threading.Thread.start and internal _threads_queues. This ensures background tasks don't block program exit. Use with caution!
import concurrent.futures
import importlib
import threading
import time
import weakref
from typing import Callable, TypeVar
__all__ = ["DaemonThreadPoolExecutor"]
_T = TypeVar('_T')
# Global reentrant lock to ensure that the monkey patch hook is applied only once in a controlled context.
_PATCH_LOCK = threading.RLock()
# Bounded semaphore to avoid unintended recursive calls to the hook.
_PATCH_SEMAPHORE = threading.BoundedSemaphore(1)
# A weak-key dictionary mapping threads to their work queues.
# This is used to temporarily override the internal _threads_queues in concurrent.futures.thread.
_CUSTOM_THREADS_QUEUES = weakref.WeakKeyDictionary()
class DaemonThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
"""
A variant of ThreadPoolExecutor whose worker threads are daemon threads.
In Python, daemon threads are abruptly stopped when the main program exits,
meaning that tasks in the thread pool may not complete if the program shuts down.
This executor is useful when you do not want background tasks to prevent the
process from terminating.
This class temporarily monkey patches threading.Thread.start to mark threads
as daemon before starting them. It also patches the internal _threads_queues attribute
in the module concurrent.futures.thread with a custom weak reference dictionary,
preventing the default exit handler from waiting on non-daemon threads.
CAUTION: Monkey patching standard library functions is fragile and may introduce
unexpected side effects. Use with caution and ensure extensive testing.
"""
@staticmethod
def _create_patched_start(original_start: Callable) -> Callable:
"""
Create a patched version of threading.Thread.start that sets the thread as daemon before starting.
Args:
original_start: The original threading.Thread.start method.
Returns:
A callable that wraps the original start method to enforce daemon status.
Raises:
RuntimeError: If the thread does not appear to be in its initial state
or if the hook lock cannot be acquired.
"""
assert callable(original_start), "provided function must be callable"
def patched_start(thread: threading.Thread):
"""
Patched version of threading.Thread.start.
Args:
thread: The thread instance that is about to be started.
Returns:
The result of the original start method.
Raises:
RuntimeError: If the thread's state is not initial, or if the patch lock is already held.
"""
# Check the thread is in its initial state (heuristic via its repr)
if "initial" not in repr(thread):
raise RuntimeError("Thread is not in initial state")
# Attempt to acquire the global patch lock; do not block if it is already held.
if not _PATCH_LOCK.acquire(blocking=False):
raise RuntimeError("Internal error: PATCH_LOCK is already locked")
thread.daemon = True # Enforce daemon attribute
# Call the original start method
return original_start(thread)
return patched_start
def submit(
self,
fn: Callable[..., _T],
/,
*args,
**kwargs,
) -> concurrent.futures.Future[_T]:
"""
Submit a callable to be executed with the given arguments.
This method temporarily monkey patches threading.Thread.start to ensure that
threads created by the executor are daemon threads. It also replaces the internal
_threads_queues variable in the concurrent.futures.thread module with a custom weakref
dictionary to prevent the default exit handler from waiting on worker threads. Both
changes are reverted after the task is submitted.
Args:
fn: The callable to be executed.
*args: Positional arguments to pass to the callable.
**kwargs: Keyword arguments to pass to the callable.
Returns:
A Future representing the asynchronous execution of the callable.
Raises:
RuntimeError: If the thread state is not initial or if the semaphore indicates a recursive call.
"""
# Save the original Thread.start method
original_start = threading.Thread.start
with _PATCH_LOCK:
# Prevent recursive or unintended hook calls.
if not _PATCH_SEMAPHORE.acquire(blocking=False):
raise RuntimeError("Internal error: PATCH_SEMAPHORE is locked, potential recursive call")
try:
# Temporarily replace Thread.start with the patched version.
threading.Thread.start = self._create_patched_start(original_start)
# Access the internal module for thread management.
concurrent_thread_mod = importlib.import_module("concurrent.futures.thread")
# Save the original _threads_queues and replace with our custom weakref dictionary.
original_threads_queues = getattr(concurrent_thread_mod, "_threads_queues")
setattr(concurrent_thread_mod, "_threads_queues", _CUSTOM_THREADS_QUEUES)
try:
# Submit the task using the superclass implementation.
return super().submit(fn, *args, **kwargs)
finally:
# Restore the original _threads_queues and Thread.start.
setattr(concurrent_thread_mod, "_threads_queues", original_threads_queues)
threading.Thread.start = original_start
finally:
_PATCH_SEMAPHORE.release()
def _main():
"""
Demonstrates the usage of DaemonThreadPoolExecutor.
Tasks submitted include quick print jobs and a longer sleep job to illustrate that
daemon threads do not block the program's exit. Even if a thread is sleeping, the program
will exit if the main thread finishes.
"""
executor = DaemonThreadPoolExecutor()
# Submit multiple tasks to print 'spam'
for _ in range(5):
executor.submit(print, "spam")
# Submit additional tasks: printing 'eggs' and a delayed sleep (10 seconds).
executor.submit(print, "eggs")
executor.submit(time.sleep, 10)
print("Note: Even if one thread is sleeping, the program will exit immediately on main thread completion.")
print("Done")
if __name__ == "__main__":
_main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment