Created
May 26, 2020 13:41
-
-
Save impredicative/009837e475402403dac39fc2a82f8549 to your computer and use it in GitHub Desktop.
BufferingSMTPHandler
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime
import logging
import smtplib
import sys
import threading
import time
import traceback

try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3 renamed the module to ``queue``
class BufferingSMTPHandler(logging.Handler):
    """Set up a buffering SMTP logging handler.

    Formatted records emitted through any instance are collected in one
    class-level queue (``_Q``). Each instance starts a daemon thread that
    periodically moves a batch of records into the instance queue (``_q``)
    and emails the whole batch as a single message. The worker thread sends
    at most one email per ``_SEND_INTERVAL`` seconds (the time of the last
    send is tracked class-wide); ``close()`` flushes once immediately.
    """

    # Configurable parameters (seconds)
    _POLL_INTERVAL = 5  # Interval between checks for sets of new records.
    _POLL_DURATION_MAX = 10  # If a record is available, max time to continue
    # polling for more records.
    _SEND_INTERVAL = 2 * 60  # Interval between sends.

    # Class environment, shared by all instances
    _Q = Queue.Queue()
    _LOCK = threading.Lock()
    _LAST_SEND_TIME = float('-inf')  # -inf means no email has been sent yet

    def state(self):
        """Return a dict containing keys and values providing information
        about the state of the handler.

        Durations are ``datetime.timedelta`` objects, except that
        'Time since last email' is the string '(none sent yet)' before the
        first send.
        """
        now = time.time()
        last_send = self._LAST_SEND_TIME  # read once for a consistent view
        if last_send != float('-inf'):
            time_since_last = datetime.timedelta(seconds=now - last_send)
            # Never report a negative wait once the send interval has passed.
            remaining = max(last_send + self._SEND_INTERVAL - now, 0)
            time_until_next = datetime.timedelta(seconds=remaining)
        else:
            time_since_last = '(none sent yet)'
            time_until_next = datetime.timedelta(0)
        return {
            'Total number of unprocessed errors': self._Q.qsize() + self._q.qsize(),
            'Intervals': 'Poll: {}s, Send: {}s'.format(self._POLL_INTERVAL, self._SEND_INTERVAL),
            'Poll duration max': '{}s'.format(self._POLL_DURATION_MAX),
            'Time since last email': time_since_last,
            # This simplification doesn't account for _POLL_INTERVAL and
            # _POLL_DURATION_MAX, etc.
            'Time to next earliest possible email': 'at least {}'.format(time_until_next),
            'Recipients': self._header['toaddrs_str'],
        }

    def __init__(self, fromaddr, toaddrs, subject):
        """Initialise the handler and start its buffer-processing thread.

        fromaddr -- sender address, used for the envelope and 'From:' header.
        toaddrs  -- iterable of recipient addresses; a single address given
                    as a string is also accepted.
        subject  -- subject line used for every email.
        """
        # Initialise the base handler first so the instance is fully set up
        # before the worker thread can observe it.
        super(BufferingSMTPHandler, self).__init__()
        if isinstance(toaddrs, (str, type(u''))):
            # A bare string would otherwise be joined character by character.
            toaddrs = [toaddrs]
        # Setup instance environment
        self._active = True
        self._q = Queue.Queue()  # this is different from self._Q
        # Construct email header
        self._header = {
            'fromaddr': fromaddr,
            'toaddrs': toaddrs,
            'toaddrs_str': ','.join(toaddrs),
            'subject': subject,
        }
        self._header['header'] = (
            'From: {fromaddr}\r\nTo: {toaddrs_str}\r\nSubject: {subject}\r\n\r\n'
            .format(**self._header))
        # Start main buffer-processor thread
        thread_name = '{}Thread'.format(self.__class__.__name__)
        # Note: The class is intentionally not inherited from threading.Thread,
        # as doing so was found to result in the target thread not being
        # named correctly, possibly due to a namespace collision.
        thread = threading.Thread(target=self.run, name=thread_name)
        thread.daemon = True
        thread.start()

    def close(self):
        """Stop the worker loop and process some remaining records."""
        super(BufferingSMTPHandler, self).close()
        self._active = False
        # Shorten the polling window so closing does not wait long for more
        # records. This sets an instance attribute only; there is no need to
        # set self.__class__._POLL_DURATION_MAX.
        self._POLL_DURATION_MAX = min(0.25, self._POLL_DURATION_MAX)
        self._process_recordset()

    def emit(self, record):
        """Queue a record into the class queue so it can be emitted
        collectively.

        Formatting errors are routed to ``handleError`` instead of being
        raised into the code that made the logging call.
        """
        # This method can be called by various threads.
        try:
            self._Q.put(self.format(record))
        except Exception:
            self.handleError(record)

    def run(self):
        """Periodically flush the buffer (body of the worker thread)."""
        while self._active:
            sleep_time = self._POLL_INTERVAL
            with self._LOCK:  # protects _LAST_SEND_TIME and _q
                next_send_time = self._LAST_SEND_TIME + self._SEND_INTERVAL
                if time.time() > next_send_time:
                    try:
                        self._process_recordset()
                    except Exception:
                        # A failed delivery must not terminate the worker,
                        # otherwise later records would never be sent.
                        self._report_failure()
                else:
                    # Here next_send_time is finite: wait until it is reached.
                    sleep_time = max(next_send_time - time.time(), 0)
            time.sleep(sleep_time)

    def _report_failure(self):
        """Print the exception being handled to stderr, if logging allows."""
        if logging.raiseExceptions and sys.stderr:
            try:
                traceback.print_exc(file=sys.stderr)
            except EnvironmentError:
                pass  # stderr itself is unusable; nothing more can be done

    def _process_recordset(self):
        """Process a set of records buffered in class queue."""
        try:
            self._move_recordset_from_Q_to_q()
            if not self._q.empty():
                self._send_records_from_q()
                # Stored on the class so that all instances share it.
                self.__class__._LAST_SEND_TIME = time.time()
        except (KeyboardInterrupt, SystemExit):
            # Interrupts raised while batching or sending are ignored here.
            pass

    def _move_recordset_from_Q_to_q(self):
        """Move a set of records from class queue to instance queue.

        Returns at once when both queues are empty. Once the instance queue
        holds a record, keeps polling the class queue for more until
        ``_POLL_DURATION_MAX`` seconds have elapsed since the call started.
        """
        deadline = time.time() + self._POLL_DURATION_MAX
        while time.time() < deadline:
            try:
                record = self._Q.get_nowait()
            except Queue.Empty:
                if self._q.empty():
                    break
                time.sleep(0.1)
            else:
                self._q.put(record)
                self._Q.task_done()

    def _drain_q(self):
        """Remove and return all formatted records in the instance queue."""
        records = []
        try:
            while True:
                records.append(self._q.get_nowait())
                self._q.task_done()
        except (Queue.Empty, KeyboardInterrupt, SystemExit):
            # An interrupt stops the draining; what was collected is kept.
            pass
        return records

    def _compose_message(self, records):
        """Return the complete email text (header plus body) for *records*."""
        body = 'Included messages: {}\r\n'.format(len(records))
        num_pending_messages = self._Q.qsize() + self._q.qsize()
        if num_pending_messages > 0:
            body += 'Pending messages: {}\r\n'.format(num_pending_messages)
        # Add main content of message body
        body += '\r\n'
        body += '\r\n\r\n'.join(records)
        return self._header['header'] + body

    def _deliver(self, msg):
        """Send *msg* over SMTP using smtplib's default connection settings."""
        smtp = smtplib.SMTP()
        try:
            smtp.connect()
            smtp.sendmail(self._header['fromaddr'], self._header['toaddrs'], msg)
            smtp.quit()
        finally:
            # Release the socket even if connect(), sendmail() or quit()
            # raised; close() is harmless after a successful quit().
            smtp.close()

    def _send_records_from_q(self):
        """Send records that are in instance queue.

        Does nothing when the instance queue is empty. Exceptions raised
        while sending propagate to the caller.
        """
        records = self._drain_q()
        if records:
            self._deliver(self._compose_message(records))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment