Created
August 13, 2013 16:22
-
-
Save engelju/6222894 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import concurrent.futures
import datetime
import json
import operator
import re
import time

import requests
# Search endpoint queried by get_newest_episode(); it is called with the
# query parameters "id" (search term) and "top" (result limit).
API_URL = "http://apify.ifc0nfig.com/tpb/search"
def get_newest_episode(name_):
    """Look up the newest trusted torrent for the series *name_*.

    The series name is stripped and lowercased, sent to ``API_URL`` and the
    JSON answer is filtered down to entries whose ``"trusted"`` value is
    truthy and whose ``"name"`` contains a recognisable season/episode tag.
    The entry with the highest ``(season, episode)`` pair wins.

    Returns:
        A ``(name_, torrent)`` tuple.  ``torrent`` is the winning result
        dict, extended with an ``"episode"`` key (``(season, episode)``
        tuple of ints) and a ``"date"`` key (``time.struct_time`` derived
        from its ``"uploaded"`` string).  ``torrent`` is ``None`` when the
        request fails, the answer cannot be decoded, or no trusted entry
        carries an episode tag.
    """

    def episode_of(s):
        """Return ``(season, episode)`` parsed from *s*, or ``None``."""
        # Two naming schemes are recognised: "S01E02" and "1x02".  Both
        # require a non-word character right after the episode number.
        patterns = (r"S(\d{1,2})E(\d{1,2})[^\w]",
                    r"[^\w](\d{1,2})x(\d{1,2})[^\w]")
        for pattern in patterns:
            match = re.search(pattern, s)
            if match:
                season, episode = map(int, match.groups())
                return season, episode
        return None

    def convertdate(s):
        """Convert an upload-time string to a ``time.struct_time``.

        Falls back to the current UTC time when no known format matches.
        """
        s = s.lower()

        # Relative stamps carry only a clock time; the calendar date is
        # taken from the local "today", shifted back by the given days.
        relative_formats = (("today %H:%M", 0),
                            ("y-day %H:%M", 1))
        for fmt, days_ago in relative_formats:
            try:
                clock = datetime.datetime.strptime(s, fmt).time()
            except ValueError:
                continue
            day = datetime.date.today() - datetime.timedelta(days=days_ago)
            return datetime.datetime.combine(day, clock).timetuple()

        # Absolute stamps are returned exactly as strptime parses them
        # (fields missing from the format keep strptime's defaults).
        # NOTE(review): "%Y %H:%M" looks unusual for an upload date --
        # confirm against the strings the API actually returns.
        absolute_formats = ("%m-%d %H:%M",
                            "%Y %H:%M")
        for fmt in absolute_formats:
            try:
                return time.strptime(s, fmt)
            except ValueError:
                pass
        return time.gmtime()

    name = name_.strip().lower()
    params = {"id": name, "top": 20}

    # Any transport-level failure is reported the same way as a non-2xx
    # answer: the caller gets (name_, None).
    try:
        req = requests.get(API_URL, params=params, timeout=10)
    except requests.RequestException:
        return (name_, None)
    if not req.ok:
        return (name_, None)

    # UnicodeDecodeError and json.JSONDecodeError are both ValueErrors.
    try:
        results = json.loads(req.content.decode())
    except ValueError:
        return (name_, None)

    torrents = [i for i in results if i["trusted"]]
    for i in torrents:
        i["episode"] = episode_of(i["name"])
    torrents = [i for i in torrents if i["episode"]]
    if not torrents:
        # max() would raise ValueError on an empty sequence.
        return (name_, None)

    # Tuples compare lexicographically, so this picks the latest season
    # first and the latest episode within it second.
    m = max(torrents, key=operator.itemgetter("episode"))
    m["date"] = convertdate(m["uploaded"])
    return (name_, m)
# Series titles to look up; each one is passed to get_newest_episode().
series = [
    "How I Met Your Mother",
    "Doctor Who",
    "The Simpsons",
    "Two and a Half Men",
    "Elementary",
    "Bones",
    "The Big Bang Theory",
    "Revolution",
]
episodes = []

# The lookups are network-bound, so they are fanned out over a small thread
# pool and reported in completion order (not in the order of `series`).
with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
    # Remember which title each future belongs to so that a crashed lookup
    # can still be reported by name.
    fs = {executor.submit(get_newest_episode, s): s for s in series}
    for f in concurrent.futures.as_completed(fs):
        try:
            name, d = f.result()
        except Exception as exc:
            # Top-level boundary: one failing series must not abort the
            # report for the remaining ones.
            print("Error when searching for %s: %s" % (fs[f], exc))
            print()
            continue
        if d:
            # Both season and episode are zero-padded to two digits.
            print(name, "S%02dE%02d" % d["episode"], "-", d["size"], "-", d["uploaded"],
                  "(%s)" % d["name"], "[%s/%s]" % (d["leechers"], d["seeders"]))
            print(d["magnet"])
        else:
            print("Connection error when searching for %s." % name)
        print()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment