Skip to content

Instantly share code, notes, and snippets.

@Dominik1123
Last active December 8, 2022 18:25
Poll emails from byom.de
import argparse
from datetime import datetime
import difflib
from functools import partial
import logging
import os.path
import requests
import time
def _non_negative_int(value):
    """Parse `value` as an integer and reject negative numbers.

    Used as the argparse ``type`` for ``--interval``. The interval is handed
    to ``time.sleep``, which raises ``ValueError`` for negative durations, so
    a bad value is reported at parse time instead of after the first poll.

    Raises:
        argparse.ArgumentTypeError: if `value` is not an integer or is negative.
    """
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f'invalid int value: {value!r}') from None
    if number < 0:
        raise argparse.ArgumentTypeError(f'must be non-negative, got {value!r}')
    return number


# Command-line interface of the poller.
parser = argparse.ArgumentParser(description='Poll a byom.de mailbox and trigger on new content.')
# The address is normalised to lower case before it is used in any request.
parser.add_argument('email', type=str.lower, help='Mailbox to poll (converted to lower case)')
parser.add_argument('--contains', nargs='+', help='Triggers if a poll contains any of the provided values')
parser.add_argument('--interval', default=60, type=_non_negative_int, help='Polling interval in seconds')
parser.add_argument('--save', action='store_true', help='Save new text from polls when triggered')
# Trigger checks, evaluated on every poll.
# Add custom functions here to trigger on new content from a poll.
# Each function is called with two positional arguments `old, current`, where
# `old` is the text from the previous poll (the empty string before the first
# poll) and `current` is the text from the current poll. A poll triggers as
# soon as any check returns a true value.
checks = []
# Trigger callbacks, run in list order whenever a poll triggers.
# Add custom functions here to react to a trigger from a poll.
# Each function is called with two positional arguments `old, current`, where
# `old` is the text from the previous poll and `current` is the text
# from the current poll. Return values are ignored.
callbacks = []
# Base request headers for the api.byom.de mail endpoint, imitating a Firefox
# browser session. The polling loop copies this dict for every poll before
# adding the per-session Cookie header, so the template itself is never mutated.
headers_template = {
    'Host': 'api.byom.de',
    'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:70.0) Gecko/20100101 Firefox/70.0',
    'Accept': '*/*',
    'Accept-Language': 'en-US,en;q=0.5',
    # Only advertise encodings that `requests` can always decode. Brotli
    # ('br') is decoded only when an optional Brotli package is installed;
    # advertising it unconditionally risks an undecoded response body.
    'Accept-Encoding': 'gzip, deflate',
    'Referer': 'https://www.byom.de/',
    'DNT': '1',
    'Connection': 'keep-alive',
    'Pragma': 'no-cache',
    'Cache-Control': 'no-cache',
}
def get_new_text(old, new):
    """Return the lines of `new` that a line-based diff reports as added.

    Both texts are split into lines (line endings kept) and compared with
    ``difflib.ndiff``; every line the diff marks as an addition is collected,
    in order, and the collected lines are concatenated into one string.

    Args:
        old: Text from the previous poll.
        new: Text from the current poll.

    Returns:
        The added lines joined together, or an empty string if nothing was added.
    """
    previous_lines = old.splitlines(keepends=True)
    current_lines = new.splitlines(keepends=True)

    added = []
    for entry in difflib.ndiff(previous_lines, current_lines):
        # ndiff prefixes every line with a two-character code; '+ ' marks
        # a line that exists only in the second sequence.
        if entry[:2] == '+ ':
            added.append(entry[2:])
    return ''.join(added)
def check_contains(old, new, *, strings):
    """Tell whether the text added since the previous poll contains any of `strings`.

    Only the newly added lines (as computed by ``get_new_text``) are searched,
    so content that was already present in `old` does not trigger again.

    Args:
        old: Text from the previous poll.
        new: Text from the current poll.
        strings: Iterable of substrings to look for (keyword-only).

    Returns:
        True if at least one substring occurs in the added text, else False.
    """
    added_text = get_new_text(old, new)
    for needle in strings:
        if needle in added_text:
            return True
    return False
def save_new_text(old, new):
    """Write the text added since the previous poll to a timestamped file.

    The file name is the path of this script without its extension, followed
    by ``_<YYYY-mm-dd_HH-MM-SS>.txt`` using the current local time.

    Args:
        old: Text from the previous poll.
        new: Text from the current poll.
    """
    now = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    f_name = f'{os.path.splitext(__file__)[0]}_{now}.txt'
    # Mail content can contain arbitrary Unicode. Without an explicit encoding
    # `open` falls back to the locale's default, which can raise
    # UnicodeEncodeError on non-UTF-8 systems.
    with open(f_name, 'w', encoding='utf-8') as fh:
        fh.write(get_new_text(old, new))
    logging.info('saving new text to %s', f_name)
if __name__ == '__main__':
    # Script entry point: parse the command line, register the built-in
    # check/callback requested by the flags, then poll the mailbox forever.
    logging.basicConfig(level=logging.INFO)
    args = parser.parse_args()

    # Built-in handlers are inserted at the front so they run before any
    # custom functions added to the module-level lists.
    if args.contains:
        checks.insert(0, partial(check_contains, strings=args.contains))
        logging.info('Adding contains check: %s', args.contains)
    if args.save:
        callbacks.insert(0, save_new_text)

    # Upper bound in seconds for each HTTP request. Without a timeout,
    # `requests` may block indefinitely on a stalled connection and the
    # poller would silently stop polling.
    request_timeout = 30

    # Text of the previous poll; empty before the first poll, so the first
    # response is treated entirely as new content.
    old = ''
    while True:
        logging.info('checking mails')
        try:
            # The first request is made only to obtain the session cookie.
            r = requests.post('https://www.byom.de/nachrichten/privatdetekteien',
                              data={'main-search': args.email},
                              timeout=request_timeout)
            headers = headers_template.copy()
            # Indexing the cookie jar raises KeyError when the cookie is not
            # set, so look it up defensively and forward it only if present.
            # NOTE(review): whether the API still answers without this cookie
            # cannot be verified from here - confirm against the live service.
            cfduid = r.cookies.get('__cfduid')
            if cfduid is not None:
                headers.update(Cookie=f'__cfduid={cfduid}')
            else:
                logging.debug('no __cfduid cookie received; polling without it')
            r = requests.get(f'https://api.byom.de/mails/{args.email}?alt=json-in-script&callback=angular.callbacks._a',
                             headers=headers,
                             timeout=request_timeout)
        except requests.RequestException:
            # A transient network failure must not terminate a long-running
            # poller: log it, keep `old` unchanged and try again next round.
            logging.exception('poll failed, retrying in %s seconds', args.interval)
            time.sleep(args.interval)
            continue

        logging.debug('poll text length: %d', len(r.text))
        if any(func(old, r.text) for func in checks):
            logging.info('new trigger')
            for func in callbacks:
                func(old, r.text)
        old = r.text
        time.sleep(args.interval)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment