Created
June 8, 2021 21:56
-
-
Save sgnn7/8e3a64f27e0770ae580ebc8b3cf13c02 to your computer and use it in GitHub Desktop.
Datadog Agent check that exercises the multiprocessing library
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import os | |
import sys | |
import time | |
import multiprocessing as mp | |
from multiprocessing import Process, Queue | |
try: | |
from datadog_checks.base import AgentCheck | |
except ImportError: | |
from checks import AgentCheck | |
# Options are "fork", "spawn", and "spawnserver" | |
# - Windows: "spawn" is default. "fork" is unavailable. | |
# - Darwin: "spawn" is default. | |
# - Linux: "fork" is default. | |
START_METHOD = 'spawn' | |
try: | |
from datadog_agent import log | |
except: | |
pass | |
__version__ = "1.0.0" | |
LOG_PATH = '/opt/datadog-agent/logs/log.txt' | |
if sys.platform == 'win32': | |
LOG_PATH = 'C:\ProgramData\Datadog\log.txt' | |
# We need a consistent way to log things | |
def create_logger(): | |
logger = mp.get_logger() | |
logger.setLevel(logging.NOTSET) | |
try: | |
os.remove(LOG_PATH) | |
except: | |
pass | |
formatter = logging.Formatter( \ | |
'[%(asctime)s| %(levelname)s| %(processName)s] %(message)s') | |
file_handler = logging.FileHandler(LOG_PATH) | |
file_handler.setFormatter(formatter) | |
class AgentHandler(logging.Handler): | |
def emit(self, record): | |
message = record.getMessage() | |
# This will not be available for `spawn`ed processes. Comment out if needed. | |
log(message, 0) | |
agent_handler = AgentHandler() | |
if not len(logger.handlers): | |
# We dump the output to both files and stdout | |
logger.addHandler(agent_handler) | |
logger.addHandler(file_handler) | |
return logger | |
mp_logger = create_logger() | |
# Code for the subprocesses/jobs that we will create | |
def runner(name, queue): | |
pid = os.getpid() | |
mp.get_logger().info("### Hello from {} (pid: {}) via module's logger".format(name, pid)) | |
logging_exception = None | |
try: | |
log("### Hello from {} (pid: {}) via Golang-injected 'log' method".format(name, pid), 0) | |
except Exception as e: | |
logging_exception = e | |
mp.get_logger().info("Failed to use Golang-injected 'log' method in {} (pid: {})".format(name, pid)) | |
# Queue is the main way to pass info between subrocessed back to the parent | |
queue.put("Hello from {} via queue (pid: {}). Injected-Golang logging exception: {}".format(name, pid, logging_exception)) | |
class HelloCheck(AgentCheck): | |
def check(self, instance): | |
mp.get_logger().info("*" * 50) | |
try: | |
log("Golang-injected logging from main thread works", 0) | |
except Exception as e: | |
mp.get_logger().info("Golang-injected logging from main thread DOES NOT WORK!") | |
if sys.version_info >= (3, 0): | |
mp.get_logger().info("Available start methods: {}".format(mp.get_all_start_methods())) | |
mp.get_logger().info("Current start method: {}".format(mp.get_start_method())) | |
# On at least 1 platform (Darwin), we have to force our context to use a fork | |
# See here for more info: https://bugs.python.org/issue33725 | |
# XXX: "fork" cannot be used on Windows | |
mp.set_start_method(START_METHOD, force=True) | |
mp.get_logger().info("New start method: {}".format(mp.get_start_method())) | |
queue = Queue() | |
mp.get_logger().info("Sys executable: {}".format(sys.executable)) | |
mp.get_logger().info("Exec prefix: {}".format(sys.exec_prefix)) | |
mp.get_logger().info("Sys prefix: {}".format(sys.prefix)) | |
if sys.version_info >= (3, 0): | |
from multiprocessing import spawn | |
mp.get_logger().info("Spawn exe: {}".format(spawn.get_executable())) | |
if hasattr(sys, 'argv'): | |
mp.get_logger().info("Argv: %s", sys.argv) | |
else: | |
mp.get_logger().info("Argv is empty!") | |
mp.get_logger().info("Starting check...") | |
# Run 5 jobs | |
jobs = [] | |
for index in range(5): | |
p = Process(target=runner, args=('testprocess_{}'.format(index), queue,)) | |
jobs.append(p) | |
mp.get_logger().info("Starting process #{}...".format(index)) | |
p.start() | |
mp.get_logger().info("Joining...") | |
# Coalesce the results | |
for index, job in enumerate(jobs): | |
mp.get_logger().info("Joining process #{}...".format(index)) | |
job.join() | |
mp.get_logger().info("Joined all") | |
# Print the results from the processes | |
mp.get_logger().info("*" * 50) | |
for index, _ in enumerate(jobs): | |
results = queue.get() | |
mp.get_logger().info(results) | |
mp.get_logger().info("*" * 50) | |
mp.get_logger().info("Done.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment