Skip to content

Instantly share code, notes, and snippets.

@vimholic
Last active March 3, 2026 01:46
Show Gist options
  • Select an option

  • Save vimholic/6322397dbd6d5a632693e5ee03aed4bb to your computer and use it in GitHub Desktop.

Select an option

Save vimholic/6322397dbd6d5a632693e5ee03aed4bb to your computer and use it in GitHub Desktop.
SJVA Agent WebP Patch - agent_base.py + module_ktv.py + module_yaml_base.py
# -*- coding: utf-8 -*-
import os, traceback, json, urllib, re, unicodedata, time, urllib2, io, subprocess
from io import open
from functools import wraps
import yaml
"""
class MetadataSearchResult(XMLObject):
def __init__(self, core, id, name=None, year=None, score=0, lang=None, thumb=None):
XMLObject.__init__(self, core, id=id, thumb=thumb, name=name, year=year, score=score, lang=lang)
self.tagName = "SearchResult"
"""
def d(data):
    """Pretty-print helper for log output.

    Dicts and lists are rendered as indented JSON (non-ASCII preserved),
    anything else falls back to str().
    """
    # isinstance replaces the old type(...) comparison; the redundant
    # function-local `import json` is dropped (json is imported at file top).
    if isinstance(data, (dict, list)):
        return '\n' + json.dumps(data, indent=4, ensure_ascii=False)
    return str(data)
class AgentBase(object):
    """Shared base class for the SJVA Plex metadata agents.

    NOTE(review): relies on Plex plug-in sandbox globals (Log, Prefs, JSON,
    XML, HTTP, Datetime, Proxy, *Object extra classes) that are injected at
    runtime, not imported here.
    """
    # Maps each SJVA agent identifier to the single-character module key
    # used by the SJVA server API (see get_key()).
    key_map = {
        'com.plexapp.agents.sjva_agent_jav_censored' : 'C', # C : censored dvd
        'com.plexapp.agents.sjva_agent_jav_censored_ama' : 'D', # D : censored ama
        'com.plexapp.agents.sjva_agent_jav_uncensored' : 'E', # E : uncensored
        # W : western
        'com.plexapp.agents.sjva_agent_jav_fc2' : 'L', # L : fc2
        'com.plexapp.agents.sjva_agent_ktv' : 'K', # K : Korean TV
        'com.plexapp.agents.sjva_agent_ftv' : 'F', # F : foreign TV
        # F : FTV
        # A : ani
        'com.plexapp.agents.sjva_agent_ott_show' : 'P',
        'com.plexapp.agents.sjva_agent_movie' : 'M', # M : movie
        'com.plexapp.agents.sjva_agent_music_normal' : 'S', # S : Melon album, artist
        #'com.plexapp.agents.sjva_agent_music_folder' : 'T', # T : folder structure
        # audiobook?
        'com.plexapp.agents.sjva_agent_audiobook' : 'B', # B : audiobook
        'com.plexapp.agents.sjva_agent_yaml' : 'Y', # Y : yaml
    }
    # Maps lowercase extra-video type names to the Plex framework extra
    # object classes (provided by the sandbox); used by set_data_extras().
    extra_map = {
        'trailer' : TrailerObject,
        'deletedscene' : DeletedSceneObject,
        'behindthescenes' : BehindTheScenesObject,
        'interview' : InterviewObject,
        'sceneorsample' : SceneOrSampleObject,
        'featurette' : FeaturetteObject,
        'short' : ShortObject,
        'other' : OtherObject,
        'musicvideo' : MusicVideoObject,
        'livemusicvideo' : LiveMusicVideoObject,
        'lyricmusicvideo' : LyricMusicVideoObject,
        'concertvideo' : ConcertVideoObject,
    }
    # Cached X-Plex auth token, lazily populated by get_token().
    token = None
def search_result_line(self):
    """Return the fixed '=' separator line used between search results in logs."""
    return '  ' + '=' * 79 + ' '
def try_except(original_function):
    """Decorator: run the wrapped callable and swallow any exception,
    logging it through the plug-in Log() instead of propagating."""
    @wraps(original_function)
    def _guarded(*args, **kwargs):
        try:
            return original_function(*args, **kwargs)
        except Exception as exc:
            Log('Exception:%s', exc)
            Log(traceback.format_exc())
    return _guarded
def send_search(self, module_name, keyword, manual, year=''):
    """Query the SJVA server's search endpoint for a module.

    Returns the parsed JSON response, or None when the request fails
    (the exception is logged and swallowed).
    """
    try:
        param = ''
        # music_normal_artist / music_normal_album share one server module;
        # the artist/album discriminator travels as `param`.
        if module_name in ['music_normal_artist', 'music_normal_album']:
            param = module_name.split('_')[-1]
            module_name = 'music_normal'
        # Per-module prefs override the global server/apikey when non-empty.
        module_prefs = self.get_module_prefs(module_name)
        sjva_mod_url = '/metadata/api/{module_name}'.format(module_name=module_name)
        #url = '{ddns}/metadata/api/{module_name}/search?keyword={keyword}&manual={manual}&year={year}&call=plex&apikey={apikey}'.format(
        url = '{ddns}{sjva_mod_url}/search?keyword={keyword}&manual={manual}&year={year}&call=plex&apikey={apikey}&param={param}'.format(
            ddns=Prefs['server'] if module_prefs['server'] == '' else module_prefs['server'],
            sjva_mod_url=sjva_mod_url,
            module_name=module_name,
            keyword=urllib.quote(keyword.encode('utf8')),
            manual=manual,
            year=year,
            apikey=Prefs['apikey'] if module_prefs['apikey'] == '' else module_prefs['apikey'],
            param=param,
        )
        Log(url)
        return AgentBase.my_JSON_ObjectFromURL(url)
    except Exception as e:
        Log('Exception:%s', e)
        Log(traceback.format_exc())
def send_info(self, module_name, code, title=None):
    """Fetch full metadata for `code` from the SJVA server module.

    `title` is forwarded as an extra query parameter when given.
    Returns the parsed JSON response, or None on failure (logged).
    """
    try:
        param = ''
        # Same artist/album multiplexing as send_search().
        if module_name in ['music_normal_artist', 'music_normal_album']:
            param = module_name.split('_')[-1]
            module_name = 'music_normal'
        module_prefs = self.get_module_prefs(module_name)
        sjva_mod_url = '/metadata/api/{module_name}'.format(module_name=module_name)
        #url = '{ddns}/metadata/api/{module_name}/info?code={code}&call=plex&apikey={apikey}'.format(
        url = '{ddns}{sjva_mod_url}/info?code={code}&call=plex&apikey={apikey}&param={param}'.format(
            ddns=Prefs['server'] if module_prefs['server'] == '' else module_prefs['server'],
            sjva_mod_url=sjva_mod_url,
            module_name=module_name,
            code=urllib.quote(code.encode('utf8')),
            apikey=Prefs['apikey'] if module_prefs['apikey'] == '' else module_prefs['apikey'],
            param=param,
        )
        if title is not None:
            url += '&title=' + urllib.quote(title.encode('utf8'))
        Log(url)
        return AgentBase.my_JSON_ObjectFromURL(url)
    except Exception as e:
        Log('Exception:%s', e)
        Log(traceback.format_exc())
def send_episode_info(self, module_name, code):
    """Fetch episode-level metadata for `code` from the SJVA server.

    Returns parsed JSON, or None on failure (logged and swallowed).
    """
    try:
        module_prefs = self.get_module_prefs(module_name)
        # Per-module prefs win over the global ones when non-empty.
        server = Prefs['server'] if module_prefs['server'] == '' else module_prefs['server']
        apikey = Prefs['apikey'] if module_prefs['apikey'] == '' else module_prefs['apikey']
        url = '%s/metadata/api/%s/episode_info?code=%s&call=plex&apikey=%s' % (
            server, module_name, urllib.quote(code.encode('utf8')), apikey)
        Log(url)
        return AgentBase.my_JSON_ObjectFromURL(url)
    except Exception as e:
        Log('Exception:%s', e)
        Log(traceback.format_exc())
def change_html(self, text):
    """Replace a handful of HTML entities with their literal characters.

    Returns None when text is None.
    NOTE(review): '&#39;' maps to a LEFT SINGLE QUOTATION MARK (U+2018),
    not a plain apostrophe — looks deliberate; confirm before changing.
    """
    if text is None:
        return None
    for entity, literal in (
        ('&nbsp;', ' '),
        ('&lt;', '<'),
        ('&gt;', '>'),
        ('&amp;', '&'),
        ('&quot;', '"'),
        ('&#35;', '#'),
        ('&#39;', "‘"),
    ):
        text = text.replace(entity, literal)
    return text
def get_module_prefs(self, module):
    """Read the per-module agent preference XML from Plug-in Support/Preferences.

    Returns a dict with keys 'server', 'apikey', 'end_noti_filepath',
    'include_time_info' (all defaulting to ''), plus any other tags found
    in the preferences file.  On any error the defaults collected so far
    are returned.

    Fixes: `ret` is now initialised before the try block (previously a
    very early failure could reach `return ret` with `ret` unbound), and
    the preferences file is closed via a with-statement.
    """
    ret = {'server':'', 'apikey':'', 'end_noti_filepath':'', 'include_time_info':''}
    try:
        # Strip the Windows extended-length path prefix (\\?\) if present.
        CURRENT_PATH = re.sub(r'^\\\\\?\\', '', os.getcwd())
        pref_filepath = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(CURRENT_PATH))), 'Plug-in Support', 'Preferences', 'com.plexapp.agents.sjva_agent_%s.xml' % module)
        if os.path.exists(pref_filepath):
            with open(pref_filepath, encoding='utf8') as tfile:
                text = tfile.read()
            if text is not None:
                prefs = XML.ElementFromString(text)
                for child in prefs.getchildren():
                    ret[child.tag] = '' if child.text is None else child.text
    except Exception as e:
        Log('Exception:%s', e)
        Log(traceback.format_exc())
    return ret
@staticmethod
def get_key(media):
    """Resolve the SJVA single-character module key for a media item.

    Looks up the item's library section via the local Plex HTTP API, then
    matches the section's agent identifier against key_map.
    Returns None when the lookup fails (logged).
    """
    try:
        Log('...............................')
        data = AgentBase.my_JSON_ObjectFromURL('http://127.0.0.1:32400/library/metadata/%s' % media.id)
        section_id = str(data['MediaContainer']['librarySectionID'])
        data = AgentBase.my_JSON_ObjectFromURL('http://127.0.0.1:32400/library/sections')
        for item in data['MediaContainer']['Directory']:
            if item['key'] == section_id:
                Log("GET_KEY: %s", item)
                return AgentBase.key_map[item['agent']]
    except Exception as e:
        Log('Exception:%s', e)
        Log(traceback.format_exc())
@staticmethod
def my_JSON_ObjectFromURL(url, timeout=None, retry=3):
    """JSON.ObjectFromURL wrapper with retry.

    timeout defaults to the 'timeout' preference.  On failure it sleeps
    one second and recurses with retry-1; after the retries are exhausted
    it logs a critical message and returns None.
    """
    try:
        if timeout is None:
            timeout = int(Prefs['timeout'])
        Log('my_JSON_ObjectFromURL retry : %s, url : %s', retry, url)
        return JSON.ObjectFromURL(url, timeout=timeout)
    except Exception as e:
        Log('Exception:%s', e)
        Log(traceback.format_exc())
        if retry > 0:
            time.sleep(1)
            Log('RETRY : %s', retry)
            return AgentBase.my_JSON_ObjectFromURL(url, timeout, retry=(retry-1))
        else:
            Log('CRITICAL my_JSON_ObjectFromURL error')
def get_keyword_from_file(self, media):
    """Derive a search keyword from the item's first media file name.

    Returns the basename without extension, or None on failure (logged).
    """
    try:
        data = AgentBase.my_JSON_ObjectFromURL('http://127.0.0.1:32400/library/metadata/%s' % media.id)
        filepath = data['MediaContainer']['Metadata'][0]['Media'][0]['Part'][0]['file']
        basename = os.path.basename(filepath)
        return os.path.splitext(basename)[0]
    except Exception as e:
        Log('Exception:%s', e)
        Log(traceback.format_exc())
def get_token(self):
    """Return (and cache on self.token) the server's X-Plex auth token.

    Reads /myplex/account on the local server once; subsequent calls hit
    the cache.  Returns None on failure (logged).
    """
    try:
        if self.token is None:
            account = JSON.ObjectFromURL('http://127.0.0.1:32400/myplex/account')
            self.token = account['MyPlex']['authToken']
        return self.token
    except Exception as e:
        Log('Exception:%s', e)
        Log(traceback.format_exc())
def get_json_filepath(self, media):
    """Compute where the sidecar info JSON for `media` lives on disk.

    Queries the local Plex HTTP API for the item's file/folder locations
    and picks a filename per module: movies → info.json next to the file,
    JAV modules → <code>.json or info.json (pref 'filename_json' gates
    this per section), audiobooks → audio.json, music → album.json /
    artist.json (walking above CD/DISC sub-folders), shows → info.json in
    the show folder.  Returns None when no location applies or on error.

    NOTE(review): `json_filename` is assigned but never used afterwards —
    looks vestigial.  If the module_name chain matches nothing while
    'Media' is present, `ret` is unbound and the final Log raises into
    the except handler, returning None.
    """
    try:
        json_filename = 'info.json'
        data = AgentBase.my_JSON_ObjectFromURL('http://127.0.0.1:32400/library/metadata/%s?includeChildren=1' % media.id)
        section_id = str(data['MediaContainer']['librarySectionID'])
        #Log(self.d(data))
        if data['MediaContainer']['Metadata'][0]['type'] == 'album':
            #Log(d(data))
            Log('타입 : 앨범')
            if self.module_name in ['music_normal_album'] and 'Location' in data['MediaContainer']['Metadata'][0]:
                folderpath = data['MediaContainer']['Metadata'][0]['Location'][0]['path']
                return os.path.join(folderpath, 'album.json')
            # No folder location: fall through using the album's children (tracks).
            data = AgentBase.my_JSON_ObjectFromURL('http://127.0.0.1:32400/library/metadata/%s/children' % media.id)
            #Log(self.d(data))
        elif data['MediaContainer']['Metadata'][0]['type'] == 'artist':
            Log('타입 : 아티스트')
            """
            # 이거 너무 상위 폴더로 가버림.
            if self.module_name in ['music_normal_artist'] and 'Location' in data['MediaContainer']['Metadata'][0]:
                folderpath = data['MediaContainer']['Metadata'][0]['Location'][0]['path']
                return os.path.join(folderpath, 'artist.json')
            """
            json_filename = 'artist.json'
            # Fetch the first album's children (tracks) to get a real file path.
            data = AgentBase.my_JSON_ObjectFromURL('http://127.0.0.1:32400/library/metadata/%s/children' % data['MediaContainer']['Metadata'][0]['Children']['Metadata'][0]['ratingKey'])
        if 'Media' in data['MediaContainer']['Metadata'][0]:
            filename = data['MediaContainer']['Metadata'][0]['Media'][0]['Part'][0]['file']
            if self.module_name in ['movie']:
                ret = os.path.join(os.path.dirname(filename), 'info.json')
            elif self.module_name in ['jav_censored', 'jav_censored_ama', 'jav_fc2', 'jav_uncensored']:
                # Pref 'filename_json' ('all' or csv of section ids) switches
                # to per-title <code>.json sidecars.
                section_id_list = []
                if Prefs['filename_json'] is not None:
                    section_id_list = Prefs['filename_json'].split(',')
                if Prefs['filename_json'] == 'all' or section_id in section_id_list:
                    tmp = os.path.splitext(os.path.basename(filename))
                    code = tmp[0].split(' ')[0]
                    # Strip a trailing 'cdN' disc marker from the code.
                    if code[-2] == 'd' and code [-3] == 'c':
                        code = code[:-3].strip(' .-')
                    ret = os.path.join(os.path.dirname(filename), '%s.json' % code)
                else:
                    ret = os.path.join(os.path.dirname(filename), 'info.json')
            elif self.module_name in ['book']:
                ret = os.path.join(os.path.dirname(filename), 'audio.json')
            elif self.module_name in ['music_normal_album']:
                # Multi-disc albums keep tracks in CD<N>/DISC<N> sub-folders.
                parent = os.path.split(os.path.dirname(filename))[1]
                match = re.match('(CD|DISC)\s?(?P<disc>\d+)', parent, re.IGNORECASE)
                if match:
                    ret = os.path.join(os.path.dirname(os.path.dirname(filename)), 'album.json')
                else:
                    ret = os.path.join(os.path.dirname(filename), 'album.json')
            elif self.module_name in ['music_normal_artist']:
                parent = os.path.split(os.path.dirname(filename))[1]
                match = re.match('(CD|DISC)\s?(?P<disc>\d+)', parent, re.IGNORECASE)
                if match:
                    album_root = os.path.dirname(os.path.dirname(filename))
                else:
                    album_root = os.path.dirname(filename)
                album_basename = os.path.basename(album_root)
                # Folder 'Artist - Album' → artist.json beside it; otherwise one level up.
                if album_basename.count(' - ') == 1:
                    ret = os.path.join(album_root, 'artist.json')
                else:
                    ret = os.path.join(os.path.dirname(album_root), 'artist.json')
        elif 'Location' in data['MediaContainer']['Metadata'][0]:
            # Shows... ktv, ftv
            folderpath = data['MediaContainer']['Metadata'][0]['Location'][0]['path']
            ret = os.path.join(folderpath, 'info.json')
        else:
            ret = None
        Log('info.json 위치 : %s' % ret)
        return ret
    except Exception as e:
        Log('Exception:%s', e)
        Log(traceback.format_exc())
def save_info(self, media, info):
    """Write `info` as pretty-printed UTF-8 JSON to the item's sidecar path.

    Returns True on success, None when there is no sidecar path, False on
    error (logged).
    """
    try:
        ret = self.get_json_filepath(media)
        Log('세이브 : %s', ret)
        if ret is None:
            return
        import io
        with io.open(ret, 'w', encoding="utf-8") as outfile:
            data = json.dumps(info, ensure_ascii=False, indent=4)
            # Python 2: json.dumps may return a byte str; io.open wants unicode.
            if isinstance(data, str):
                data = data.decode("utf-8")
            outfile.write(data)
        return True
    except Exception as e:
        Log('Exception:%s', e)
        Log(traceback.format_exc())
        return False
def get_info_json(self, media):
    """Load the saved sidecar JSON for `media`; None when unavailable."""
    try:
        path = self.get_json_filepath(media)
        if path is None:
            return None
        return self.read_json(path)
    except Exception as e:
        Log('Exception:%s', e)
        Log(traceback.format_exc())
def read_json(self, filepath):
    """Parse a UTF-8 JSON file; returns None when the file does not exist."""
    if not os.path.exists(filepath):
        return None
    import io
    with io.open(filepath, 'r', encoding="utf-8") as handle:
        return json.loads(handle.read())
# Used by KTV: appended when present.
# Season info for ftv.
def append_info(self, media, key, info):
    """Merge `info` under `key` into the item's sidecar JSON and rewrite it.

    Returns True on success, None when there is no sidecar path, False on
    error (logged).

    Fix: the old code called data.decode('utf-8') unconditionally and then
    again when isinstance(data, str) — on Python 2 decoding an existing
    unicode object implicitly ASCII-encodes first and raises
    UnicodeEncodeError for non-ASCII content.  Now only byte strings are
    decoded, matching save_info().
    """
    try:
        ret = self.get_json_filepath(media)
        if ret is None:
            return
        all_data = self.get_info_json(media)
        if all_data is None:
            all_data = {}
        import io
        with io.open(ret, 'w', encoding="utf-8") as outfile:
            all_data[key] = info
            data = json.dumps(all_data, ensure_ascii=False, indent=4)
            # Python 2: json.dumps may return a byte str; io.open wants unicode.
            if isinstance(data, str):
                data = data.decode("utf-8")
            outfile.write(data)
        return True
    except Exception as e:
        Log('Exception:%s', e)
        Log(traceback.format_exc())
        return False
def remove_info(self, media):
    """Delete the item's sidecar JSON file, best-effort.

    NOTE(review): if get_json_filepath() itself raises, `ret` is unbound
    and the fallback's Log call fails into the bare except — the whole
    method then silently does nothing (apparently intentional best-effort).
    """
    try:
        ret = self.get_json_filepath(media)
        # On Google-Drive-backed mounts os.path.exists seems to fail due to
        # caching, so the existence check is deliberately disabled.
        if ret is not None: #and os.path.exists(ret):
            Log("info.json 삭제1 %s", ret)
            os.remove(ret)
        #time.sleep(2)
    except Exception as e:
        try:
            Log("info.json 삭제2 %s", ret)
            #os.system('rm %s' % ret)
            # 2021-11-27 by lapis https://sjva.me/bbs/board.php?bo_table=suggestions&wr_id=1978
            # NOTE(review): shell command with an interpolated path — quoting
            # helps but a path containing '"' would still break/inject.
            os.system('rm "%s"' % ret)
        except:
            pass
    #Log('Exception:%s', e)
    #Log(traceback.format_exc())
# --- Per-library-section feature gates -------------------------------------
# Each preference below holds 'all', ''/None, or a csv of section ids.
# The shared logic is centralised in two private helpers; the public
# methods keep their exact signatures and fallback returns.
def _media_section_id(self, media):
    """Return the library section id of `media` as a string (may raise)."""
    data = AgentBase.my_JSON_ObjectFromURL('http://127.0.0.1:32400/library/metadata/%s' % media.id)
    return str(data['MediaContainer']['librarySectionID'])
def _section_pref_enabled(self, media, pref_key):
    """True when Prefs[pref_key] is 'all' or lists media's section id;
    False when the pref is empty/None.  May raise on lookup failure."""
    value = Prefs[pref_key]
    if value == 'all':
        return True
    if value == '' or value is None:
        return False
    return self._media_section_id(media) in value.split(',')
def is_include_time_info(self, media):
    """Should dates keep their time component for this item's section?"""
    try:
        return self._section_pref_enabled(media, 'include_time_info')
    except Exception as e:
        Log('Exception:%s', e)
        Log(traceback.format_exc())
        return False
def is_read_json(self, media):
    """Is reading the sidecar JSON enabled for this item's section?"""
    try:
        return self._section_pref_enabled(media, 'read_json')
    except Exception as e:
        Log('Exception:%s', e)
        Log(traceback.format_exc())
        return False
def is_write_json(self, media):
    """Is writing the sidecar JSON enabled for this item's section?"""
    try:
        return self._section_pref_enabled(media, 'write_json')
    except Exception as e:
        Log('Exception:%s', e)
        Log(traceback.format_exc())
        return False
def is_show_extra(self, media):
    """Are extras (trailers etc.) enabled for this item's section?"""
    try:
        return self._section_pref_enabled(media, 'show_extra_enabled')
    except Exception as e:
        Log('Exception:%s', e)
        Log(traceback.format_exc())
        return False
def is_collection_append(self, media):
    """Should collections be appended?  Inverted gate: 'collection_disalbed'
    (key misspelled in the stored prefs — keep as-is) disables per section;
    defaults to True, including on error."""
    try:
        value = Prefs['collection_disalbed']
        if value == 'all':
            return False
        if value == '' or value is None:
            return True
        return self._media_section_id(media) not in value.split(',')
    except Exception as e:
        Log('Exception:%s', e)
        Log(traceback.format_exc())
        return True
def d(self, data):
    """Render `data` as pretty-printed JSON (4-space indent, non-ASCII kept)."""
    return json.dumps(data, ensure_ascii=False, indent=4)
# for YAML
def get(self, data, field, default):
    """dict.get variant that also treats '' (empty string) as missing."""
    value = data.get(field, None)
    if value is None or value == '':
        return default
    return value
def get_bool(self, data, field, default):
    """Like get(), but 'true'/'false' (any case) become bools.

    Any other non-empty string passes through unchanged.
    """
    value = data.get(field, None)
    if value is None or value == '':
        value = str(default)
    lowered = value.lower()
    if lowered == 'true':
        return True
    if lowered == 'false':
        return False
    return value
def webp_chunk(self, content):
    """Return the first chunk FourCC of a WebP byte string (e.g. b'VP8X').

    None when `content` is None, not a RIFF/WEBP container, or on error.
    """
    try:
        if content is None:
            return None
        has_riff = content[0:4] == b'RIFF'
        has_webp = content[8:12] == b'WEBP'
        if has_riff and has_webp:
            return content[12:16]
        return None
    except Exception:
        return None
def is_webp_vp8x(self, content):
    """True when `content` is an extended (VP8X) WebP image."""
    try:
        chunk = self.webp_chunk(content)
        return chunk == b'VP8X'
    except Exception:
        return False
def ffmpeg_convert_vp8_webp(self, content, ffmpeg_bin='/usr/local/bin/ffmpeg-static'):
    """Re-encode one-frame image bytes (WebP VP8X) to a plain VP8 WebP
    via ffmpeg stdin/stdout pipes.  Returns the converted bytes, or None
    on any failure (best-effort logging)."""
    try:
        cmd = [
            ffmpeg_bin,
            '-hide_banner', '-loglevel', 'error',
            '-i', 'pipe:0',
            '-map_metadata', '-1',
            '-frames:v', '1',
            '-vf', 'format=yuv420p',
            '-c:v', 'libwebp',
            '-lossless', '0',
            '-q:v', '75',
            '-f', 'webp',
            'pipe:1',
        ]
        proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = proc.communicate(content)
        if proc.returncode == 0 and stdout:
            return stdout
        try:
            Log('ffmpeg convert failed rc=%s err=%s', proc.returncode, stderr)
        except Exception:
            pass
        return None
    except Exception as e:
        try:
            Log('ffmpeg convert exception: %s', e)
        except Exception:
            pass
        return None
def safe_image_content(self, url):
    """Download image bytes for `url`.

    VP8X WebP content is converted to plain VP8 WebP; when fetching or
    conversion fails the image is skipped by returning None.
    """
    if url is None or url == '':
        return None
    try:
        content = HTTP.Request(url).content
    except Exception as e:
        try:
            Log('safe_image_content fetch exception: %s', e)
        except Exception:
            pass
        return None
    try:
        if not self.is_webp_vp8x(content):
            return content
        # VP8X crashes Plex's image pipeline — convert or drop.
        converted = self.ffmpeg_convert_vp8_webp(content)
        return converted
    except Exception as e:
        try:
            Log('safe_image_content exception: %s', e)
        except Exception:
            pass
        return None
def get_list(self, data, field):
    """Return data[field] as a list.

    Missing → []; a list passes through; anything else is treated as a
    comma-separated string and split into stripped parts.
    """
    value = data.get(field, None)
    if value is None:
        return []
    if type(value) is list:
        return value
    return [part.strip() for part in value.split(',')]
def get_person_list(self, data, field):
    """Return data[field] as a list of {'name': ...} dicts.

    Missing → []; a list passes through; a comma-separated string is split
    into one dict per stripped name.
    """
    value = data.get(field, None)
    if value is None:
        return []
    if type(value) is list:
        return value
    return [{'name': name.strip()} for name in value.split(',')]
def get_media_list(self, data, field):
    """Return data[field] as a list of {'url': ...} dicts.

    Missing → []; a list passes through.  A comma-separated string is
    split; a fragment that does not start with 'http' is treated as the
    continuation of the previous URL (URLs may legitimately contain
    commas) and re-joined onto it with the comma restored.

    Fix: the old code remembered `insert_index = idx` (the position in the
    *split string*), then indexed `tmp[insert_index]` — as soon as any
    continuation fragment preceded a later URL the indices diverged and a
    following continuation raised IndexError.  The merge target is now the
    index of the last element actually appended.
    """
    value = data.get(field, None)
    if value is None:
        return []
    if type(value) is not list:
        tmp = []
        insert_index = -1
        for piece in value.split(','):
            if piece.startswith('http'):
                tmp.append({'url': piece.strip()})
                insert_index = len(tmp) - 1
            elif insert_index > -1:
                # Re-attach the comma-split tail to the preceding URL.
                tmp[insert_index]['url'] = '%s,%s' % (tmp[insert_index]['url'], piece)
        return tmp
    return value
# (translated) Values arrive by value, not by reference; setattr is needed
# to write scalar class attributes.  Non-primitive attributes behave like
# references, so only set_data uses setattr — the other set_data_* helpers
# fetch the attribute object via getattr and mutate it in place.
def set_data(self, meta, data, field, is_primary):
    """Copy one scalar field from the source dict onto the Plex metadata
    object, converting dates/ratings/year; primary agents null out fields
    the source no longer provides."""
    try:
        Log('set_data : %s', field)
        value = self.get(data, field, None)
        if value is not None:
            if field == 'title_sort':
                # Normalize for consistent sort ordering.
                value = unicodedata.normalize('NFKD', value)
            elif field in ['originally_available_at', 'available_at']:
                value = Datetime.ParseDate(value).date()
            elif field in ['rating', 'audience_rating']:
                value = float(value)
            elif field == 'year':
                value = int(value)
            setattr(meta, field, value)
        elif is_primary:
            setattr(meta, field, None)
    except Exception as exception:
        Log('Exception:%s', exception)
        Log(traceback.format_exc())
def set_data_list(self, meta, data, field, is_primary):
    """Replace the Plex list attribute meta.<field> with the source values;
    primary agents clear it when the source has none."""
    try:
        target = getattr(meta, field)
        values = self.get_list(data, field)
        if len(values) > 0:
            target.clear()
            for item in values:
                target.add(item)
        elif is_primary:
            target.clear()
    except Exception as exception:
        Log('Exception:%s', exception)
        Log(traceback.format_exc())
def set_data_person(self, meta, data, field, is_primary):
    """Fill the Plex person container meta.<field> (name/role/photo) from
    the source; primary agents clear it when the source has none."""
    try:
        target = getattr(meta, field)
        people = self.get_person_list(data, field)
        if len(people) > 0:
            target.clear()
            for person in people:
                entry = target.new()
                entry.name = self.get(person, 'name', None)
                entry.role = self.get(person, 'role', None)
                entry.photo = self.get(person, 'photo', None)
        elif is_primary:
            target.clear()
    except Exception as exception:
        Log('Exception:%s', exception)
        Log(traceback.format_exc())
def set_data_media(self, meta, data, field, is_primary):
    """Populate an image container (posters/art/...) from URL entries.

    Each image is fetched through safe_image_content(), which converts or
    drops problematic VP8X WebP; skipped images are also removed from the
    container's valid key set.
    """
    try:
        meta = getattr(meta, field)
        value = self.get_media_list(data, field)
        if len(value) > 0:
            valid_names = []
            for idx, media in enumerate(value):
                # YAML entries may carry a separate preview URL under 'thumb'.
                src = media['thumb'] if 'thumb' in media else media['url']
                content = self.safe_image_content(src)
                if content is None:
                    # Unfetchable, or VP8X WebP that could not be converted — skip.
                    continue
                valid_names.append(media['url'])
                meta[media['url']] = Proxy.Preview(content, sort_order=idx+1)
            meta.validate_keys(valid_names)
        elif is_primary:
            meta.validate_keys([])
        Log(meta)
    except Exception as exception:
        Log('Exception:%s', exception)
        Log(traceback.format_exc())
def set_data_reviews(self, meta, data, field, is_primary):
    """Fill the Plex review container meta.<field> from the source dicts;
    primary agents clear it when the source has none."""
    try:
        target = getattr(meta, field)
        reviews = self.get(data, field, [])
        if len(reviews) > 0:
            target.clear()
            for review in reviews:
                entry = target.new()
                entry.author = self.get(review, 'author', None)
                entry.source = self.get(review, 'source', None)
                entry.image = self.get(review, 'image', None)
                entry.link = self.get(review, 'link', None)
                entry.text = self.get(review, 'text', None)
        elif is_primary:
            target.clear()
    except Exception as exception:
        Log('Exception:%s', exception)
        Log(traceback.format_exc())
def set_data_extras(self, meta, data, field, is_primary):
    """Add extra videos (trailers, featurettes, ...) to meta.<field>.

    Each entry's 'type' selects the Plex extra class via extra_map; the
    playback URL is an sjva:// pseudo-URL carrying mode and param for the
    URL service to resolve.  Existing extras are not cleared (see below).
    """
    try:
        meta = getattr(meta, field)
        value = self.get(data, field, [])
        if len(value) > 0:
            for extra in value:
                mode = self.get(extra, 'mode', None)
                extra_type = self.get(extra, 'type', 'trailer')
                extra_class = self.extra_map[extra_type.lower()]
                url = 'sjva://sjva.me/playvideo/%s|%s' % (mode, extra.get('param'))
                meta.add(
                    extra_class(
                        url=url,
                        title=self.change_html(extra.get('title', '')),
                        # Entries without a date sort together at 1900-12-31.
                        originally_available_at = Datetime.ParseDate(self.get(extra, 'originally_available_at', '1900-12-31')).date(),
                        thumb=self.get(extra, 'thumb', '')
                    )
                )
        elif is_primary:
            # Deliberately not cleared for primary agents (clear() disabled).
            #Log(meta)
            #meta.clear()
            pass
    except Exception as exception:
        Log('Exception:%s', exception)
        Log(traceback.format_exc())
def yaml_load(self, filepath):
    """Parse a YAML file with BaseLoader (every scalar stays a string).

    Tries UTF-8 first, then falls back to EUC-KR for legacy Korean files.

    Fixes: the file handles passed to yaml.load were never closed (leak);
    the bare `except:` also caught KeyboardInterrupt/SystemExit — both are
    addressed with with-statements and `except Exception`.
    """
    try:
        with io.open(filepath, encoding='utf-8') as stream:
            return yaml.load(stream, Loader=yaml.BaseLoader)
    except Exception:
        with io.open(filepath, encoding='euc-kr') as stream:
            return yaml.load(stream, Loader=yaml.BaseLoader)
def get_code_from_folderpath(self, media):
    """2024.09.23: extract a metadata code from the item's folder name.

    The folder carries the code in brackets/braces, e.g. '[tmdb-123]' or
    '{tvdb-456}'; tmdb/tvdb prefixes are mapped to the MT (movie) / FU
    (show) code prefixes.  Returns None when nothing matches or on error.

    Fix: the regex is now a raw string — '\\d'/'\\-' in a plain string are
    invalid escape sequences (DeprecationWarning, future SyntaxError).
    """
    try:
        jsonpath = self.get_json_filepath(media)
        foldername = os.path.basename(os.path.dirname(jsonpath))
        Log('폴더명: %s', foldername)
        match = re.search(r'[\[\{](?P<code>([a-zA-Z0-9]+)|(tmdb\-\d+)|(tvdb\-\d+))[\]\}]', foldername, re.IGNORECASE)
        if match:
            tmp = match.group('code')
            code = tmp.replace('tmdb-', 'MT').replace('tvdb-', 'FU')
            Log('get_code_from_folderpath: %s', code)
            return code
    except Exception as exception:
        Log('Exception:%s', exception)
        Log(traceback.format_exc())
class PutRequest(urllib2.Request):
    """urllib2.Request subclass that forces the HTTP verb to PUT
    (urllib2 natively issues only GET/POST)."""
    def __init__(self, *args, **kwargs):
        return urllib2.Request.__init__(self, *args, **kwargs)
    def get_method(self, *args, **kwargs):
        # urllib2 calls get_method() to choose the verb; always answer PUT.
        return 'PUT'
#!/usr/bin/env python3
"""
fix_vp8x.py — Plex WebP → JPEG 변환 스크립트 v4
Plex + SJVA 에이전트 환경에서 WebP 이미지(VP8/VP8L/VP8X)로 인한 크래시 방지
v4 변경사항:
- WebP → JPEG 변환 (VP8 재인코딩은 효과 없음 확인)
- ffmpeg-static -q:v 2 사용
v2 변경사항:
- VP8X뿐 아니라 VP8, VP8L 등 모든 WebP 포맷 탐지/변환
- 로그 메시지에 WebP 서브타입(VP8/VP8L/VP8X) 표시
사용법:
python3 fix_vp8x.py [옵션]
옵션:
-b, --base PATH Plex Metadata 폴더 경로 (기본: Synology 표준 경로)
-d, --days N 최근 N일 이내 수정된 파일만 스캔 (0=전체, 기본: 0)
-w, --workers N 병렬 처리 수 (기본: 4)
-q, --quality N JPEG 변환 품질 0-100 (기본: 90)
--discord-webhook URL 결과를 Discord 웹훅으로 전송
--dry-run 변환 없이 대상 파일만 출력
--quiet 진행 메시지 최소화 (cron용)
-v, --verbose 성공 파일도 출력
요구사항:
- Python 3.6+
  - ffmpeg-static (/usr/local/bin/ffmpeg-static — performs the actual conversion)
- sudo 권한 (Plex 파일 소유권 처리용, 없으면 일반 권한으로 시도)
스케줄링:
- Synology: DSM 제어판 → 작업 스케줄러 → 사용자 정의 스크립트 (root)
(/etc/crontab은 DSM 업데이트 시 초기화되므로 사용 금지)
"""
import os
import sys
import time
import sqlite3
import subprocess
import argparse
import logging
import json
import threading
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
# ── Defaults ──────────────────────────────────────────────────
# Candidate Plex Metadata roots on Synology volumes; the first existing
# one is used (see find_plex_base()).
DEFAULT_BASES = [
    "/volume1/PlexMediaServer/AppData/Plex Media Server/Metadata",
    "/volume2/PlexMediaServer/AppData/Plex Media Server/Metadata",
    "/volume3/PlexMediaServer/AppData/Plex Media Server/Metadata",
    "/volume4/PlexMediaServer/AppData/Plex Media Server/Metadata",
    "/volume5/PlexMediaServer/AppData/Plex Media Server/Metadata",
]
# Library type subdirectories scanned under the Metadata root.
LIBS = ["Movies", "TV Shows", "Albums", "Artists"]
# Scan both the agent-original and the Plex-active image directories.
IMAGE_DIRS = {"posters", "art", "thumb", "backgrounds", "seasons"}
# ── WebP detection ─────────────────────────────────────────────
def get_webp_type(fpath: str) -> "str | None":
    """Return the WebP subtype of a file, or None when it is not WebP.

    Returns:
        "VP8"  - lossy WebP
        "VP8L" - lossless WebP
        "VP8X" - extended WebP
        None   - not WebP (or unreadable)

    Fixes: the return annotation now admits None, and the redundant
    `PermissionError` (an OSError subclass) is dropped from the handler.
    """
    try:
        with open(fpath, "rb") as f:
            h = f.read(21)
        if len(h) < 12:
            return None
        # RIFF container + WEBP signature
        if h[0:4] != b"RIFF" or h[8:12] != b"WEBP":
            return None
        # First chunk FourCC at offset 12 identifies the subtype.
        if len(h) >= 16 and h[12:16] == b"VP8X":
            return "VP8X"
        if len(h) >= 16 and h[12:16] == b"VP8L":
            return "VP8L"
        if len(h) >= 15 and h[12:15] == b"VP8":
            return "VP8"
        # RIFF+WEBP with an unknown first chunk → still WebP.
        return "VP8"
    except OSError:
        return None
def is_webp(fpath: str) -> bool:
    """Detect every WebP flavour (VP8, VP8L, VP8X)."""
    return get_webp_type(fpath) is not None

# Backward compatibility: keep the original VP8X-only predicate.
def is_vp8x(fpath: str) -> bool:
    """Detect only extended (VP8X) WebP files."""
    return get_webp_type(fpath) == "VP8X"
# ── Plex DB ────────────────────────────────────────────────────
def find_plex_db(base: str) -> str:
    """Infer the Plex library DB path from a Metadata directory.

    Returns the path to com.plexapp.plugins.library.db, or None when the
    file does not exist.
    """
    candidate = os.path.join(
        os.path.dirname(base),
        "Plug-in Support", "Databases",
        "com.plexapp.plugins.library.db",
    )
    if os.path.isfile(candidate):
        return candidate
    return None
def extract_bundle_hash(fpath: str) -> str:
    """Derive the metadata_items.hash value from a bundle file path.

    Bundles live at <first-hash-char>/<rest-of-hash>.bundle, so the hash
    is the parent directory name concatenated with the bundle name
    (".bundle" suffix dropped).  None when no bundle component exists.
    """
    parts = fpath.split(os.sep)
    for idx, component in enumerate(parts):
        if not component.endswith(".bundle"):
            continue
        stem = component[:-7]
        return parts[idx - 1] + stem if idx > 0 else stem
    return None
def lookup_titles(db_path: str, hashes: set) -> dict:
    """Map bundle hashes to (title, type-icon) tuples via the Plex DB.

    Opens the library DB read-only (URI mode=ro) and returns
    {hash: (title, icon)}; {} on any failure or empty input.

    Fix: the connection is now closed in a finally block — previously a
    failing execute() leaked the open connection.
    """
    if not db_path or not hashes:
        return {}
    type_map = {1: "🎬", 2: "📺", 4: "📺", 8: "🎵", 9: "🎵", 10: "🎵", 18: "📺"}
    try:
        conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
        try:
            placeholders = ",".join("?" * len(hashes))
            rows = conn.execute(
                f"SELECT hash, title, metadata_type FROM metadata_items "
                f"WHERE hash IN ({placeholders})",
                list(hashes)
            ).fetchall()
        finally:
            conn.close()
        return {h: (title, type_map.get(mtype, "📁")) for h, title, mtype in rows}
    except Exception:
        return {}
# ── Discord webhook ────────────────────────────────────────────
def send_discord(webhook_url: str, ok: int, fail: int, elapsed: float,
                 titles: list, days: int, dry_run: bool, type_stats: dict = None):
    """Send the conversion result summary to a Discord webhook.

    `titles` is a list of (title, icon) tuples; only the first 20 are
    listed.  `type_stats` maps WebP subtype → count.  No-op when
    webhook_url is empty; delivery failures are only logged.
    """
    if not webhook_url:
        return
    action = "DRY-RUN" if dry_run else "변환 완료"
    days_str = f"최근 {days}일" if days > 0 else "전체"
    ts = datetime.now().strftime("%Y-%m-%d %H:%M")
    if titles:
        title_lines = "\n".join(f"{icon} {title}" for title, icon in titles[:20])
        if len(titles) > 20:
            title_lines += f"\n...외 {len(titles) - 20}개"
        content_section = f"\n**영향받은 콘텐츠 ({len(titles)}개)**\n```\n{title_lines}\n```"
    else:
        content_section = ""
    type_info = ""
    if type_stats:
        type_info = " | ".join(f"{k}:{v}" for k, v in sorted(type_stats.items()))
        type_info = f"\n포맷 : {type_info}"
    status_icon = "✅" if fail == 0 else "⚠️"
    total = ok + fail
    message = (
        f"{status_icon} **[Plex WebP Fix]** `{ts}` — {action}\n"
        f"```\n"
        f"범위 : {days_str}\n"
        f"변환 : {ok}개 성공 / {fail}개 실패 / 총 {total}개{type_info}\n"
        f"소요 : {elapsed:.1f}초\n"
        f"```"
        f"{content_section}"
    )
    # Nothing converted at all → replace the whole message with a short OK note.
    if total == 0:
        message = f"✅ **[Plex WebP Fix]** `{ts}` — WebP 파일 없음 (정상)"
    try:
        payload = json.dumps({"content": message}).encode()
        req = urllib.request.Request(
            webhook_url,
            data=payload,
            headers={
                "Content-Type": "application/json",
                "User-Agent": "DiscordBot (fix_vp8x, 2.0)",
            },
            method="POST"
        )
        urllib.request.urlopen(req, timeout=10)
    except Exception as e:
        logging.getLogger().warning(f"Discord 전송 실패: {e}")
# ── Helpers ───────────────────────────────────────────────────
def find_plex_base():
    """Return the first existing default Plex Metadata directory, or None."""
    return next((path for path in DEFAULT_BASES if os.path.isdir(path)), None)
def scan_library(lib_path: str, since_days: int = 0) -> list:
    """Walk a library tree and collect the paths of all WebP files.

    Only files whose directory (or its parent) is one of IMAGE_DIRS are
    considered — that covers the agent originals (posters/, art/, thumb/,
    backgrounds/, seasons/) and their subdirectories such as _stored/,
    _combined/, _uploaded/.  With since_days > 0 only files modified
    within that window are returned.  Symlinks are skipped.
    """
    hits = []
    cutoff = time.time() - since_days * 86400 if since_days > 0 else 0
    for root, _dirs, files in os.walk(lib_path):
        here = os.path.basename(root)
        above = os.path.basename(os.path.dirname(root))
        if here not in IMAGE_DIRS and above not in IMAGE_DIRS:
            continue
        for name in files:
            candidate = os.path.join(root, name)
            if os.path.islink(candidate):
                continue
            if cutoff > 0:
                try:
                    mtime = os.path.getmtime(candidate)
                except OSError:
                    continue
                if mtime < cutoff:
                    continue
            if is_webp(candidate):
                hits.append(candidate)
    return hits
# Static ffmpeg build used for the conversion (Synology has no system ffmpeg).
FFMPEG_STATIC = "/usr/local/bin/ffmpeg-static"
def convert_file(fpath: str, quality: int, dry_run: bool, use_sudo: bool) -> tuple:
    """Convert one WebP file to JPEG in place via ffmpeg-static.

    Returns (status, fpath, msg, webp_type) with status in
    {"OK", "FAIL", "SKIP"}.  The converted image is written to /tmp first,
    then copied over the original (with sudo when requested); the original
    file's owner/group/mode are restored best-effort.
    """
    webp_type = get_webp_type(fpath) or "?"
    if dry_run:
        return ("SKIP", fpath, "[dry-run]", webp_type)
    # Unique temp name per process/thread to allow parallel workers.
    tmp = f"/tmp/webp_fix_{os.getpid()}_{threading.get_ident()}_{os.path.basename(fpath)[:30]}.jpg"
    try:
        # ffmpeg's JPEG -q:v scale is ~1(best)..31; map the 0-100 quality
        # argument into the 1..3 (high-quality) band.
        r = subprocess.run(
            [FFMPEG_STATIC, "-y", "-i", fpath,
             "-q:v", str(max(1, min(quality // 33, 3))), tmp],
            capture_output=True, timeout=60
        )
        if r.returncode != 0:
            return ("FAIL", fpath, r.stderr.decode(errors="replace").strip()[:120], webp_type)
        if not os.path.exists(tmp) or os.path.getsize(tmp) == 0:
            return ("FAIL", fpath, "ffmpeg 출력 파일 없음", webp_type)
        # Preserve the original file's ownership/permissions.
        try:
            stat = os.stat(fpath)
            orig_uid, orig_gid, orig_mode = stat.st_uid, stat.st_gid, stat.st_mode
        except OSError:
            orig_uid = orig_gid = orig_mode = None
        cp_cmd = ["sudo", "cp", tmp, fpath] if use_sudo else ["cp", tmp, fpath]
        r2 = subprocess.run(cp_cmd, capture_output=True, timeout=10)
        if r2.returncode != 0:
            return ("FAIL", fpath, r2.stderr.decode(errors="replace").strip()[:120], webp_type)
        # Restore ownership/permissions (best-effort).
        if orig_uid is not None:
            try:
                chown_cmd = ["sudo", "chown", f"{orig_uid}:{orig_gid}", fpath] if use_sudo \
                    else ["chown", f"{orig_uid}:{orig_gid}", fpath]
                subprocess.run(chown_cmd, capture_output=True, timeout=5)
                chmod_cmd = ["sudo", "chmod", oct(orig_mode)[-3:], fpath] if use_sudo \
                    else ["chmod", oct(orig_mode)[-3:], fpath]
                subprocess.run(chmod_cmd, capture_output=True, timeout=5)
            except Exception:
                pass
        return ("OK", fpath, "", webp_type)
    except subprocess.TimeoutExpired:
        return ("FAIL", fpath, "timeout", webp_type)
    except Exception as e:
        return ("FAIL", fpath, str(e)[:120], webp_type)
    finally:
        try:
            os.remove(tmp)
        except OSError:
            pass
def check_dependencies() -> tuple:
    """Return (has_ffmpeg, has_sudo).

    has_ffmpeg: the static ffmpeg binary exists and is executable.
    has_sudo: passwordless sudo (`sudo -n true`) succeeds.

    Fix: when the sudo binary itself is absent, subprocess.run raised
    FileNotFoundError and crashed the script; that now just reports
    has_sudo=False.
    """
    has_ffmpeg = os.path.isfile(FFMPEG_STATIC) and os.access(FFMPEG_STATIC, os.X_OK)
    try:
        has_sudo = subprocess.run(["sudo", "-n", "true"], capture_output=True).returncode == 0
    except OSError:
        has_sudo = False
    return has_ffmpeg, has_sudo
# ── main ──────────────────────────────────────────────────────
def main():
    """CLI entry point: scan the Plex Metadata tree for WebP images and
    convert them to JPEG in place (works around UltraBlurProcessor crashes)."""
    parser = argparse.ArgumentParser(
        description="Plex WebP(VP8/VP8L/VP8X) → JPEG 변환 (UltraBlurProcessor 크래시 방지)"
    )
    parser.add_argument("-b", "--base", help="Plex Metadata 폴더 경로")
    parser.add_argument("-d", "--days", type=int, default=0,
                        help="최근 N일 이내 파일만 스캔 (0=전체)")
    parser.add_argument("-w", "--workers", type=int, default=4,
                        help="병렬 처리 수 (기본: 4)")
    parser.add_argument("-q", "--quality", type=int, default=90,
                        help="JPEG 품질 0-100 (기본: 90)")
    parser.add_argument("--discord-webhook", metavar="URL",
                        help="결과를 Discord 웹훅으로 전송")
    parser.add_argument("--dry-run", action="store_true",
                        help="변환 없이 대상 파일만 출력")
    parser.add_argument("--quiet", action="store_true",
                        help="진행 메시지 최소화 (cron용)")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="성공 파일도 출력")
    args = parser.parse_args()
    # --quiet keeps only the WARNING-level start/finish markers (cron-friendly).
    level = logging.WARNING if args.quiet else logging.INFO
    logging.basicConfig(format="%(message)s", level=level)
    log = logging.getLogger()
    has_ffmpeg, has_sudo = check_dependencies()
    if not has_ffmpeg:
        print(f"ERROR: ffmpeg-static이 없습니다: {FFMPEG_STATIC}", file=sys.stderr)
        sys.exit(1)
    if not has_sudo:
        log.warning("⚠ sudo 권한 없음 — 파일 소유권 문제 시 일부 실패할 수 있습니다.")
    base = args.base or find_plex_base()
    if not base or not os.path.isdir(base):
        print(f"ERROR: Plex Metadata 폴더를 찾을 수 없습니다.\n"
              f" --base 옵션으로 경로를 지정하세요.", file=sys.stderr)
        sys.exit(1)
    # The Plex DB is only needed to resolve bundle hashes into show titles.
    plex_db = find_plex_db(base)
    if plex_db:
        log.info(f" Plex DB : {plex_db}")
    else:
        log.warning(" Plex DB : 찾을 수 없음 — 콘텐츠 제목 조회 불가")
    mode = "[DRY-RUN] " if args.dry_run else ""
    days_str = f"최근 {args.days}일 이내" if args.days > 0 else "전체"
    ts_start = datetime.now().strftime("%Y-%m-%d %H:%M")
    log.warning(f"[WebP Fix] {ts_start} 시작 — 범위: {days_str}")
    log.info(f"{'='*60}")
    log.info(f"{mode}Plex WebP 변환 시작 {ts_start}")
    log.info(f" 경로 : {base}")
    log.info(f" 범위 : {days_str}")
    log.info(f" 대상 : VP8 + VP8L + VP8X (모든 WebP)")
    # NOTE(review): this mirrors the quality→q:v formula in convert_file; as
    # written a higher --quality yields a higher (worse) q:v — confirm intent.
    log.info(f" 품질 : JPEG q:v {max(1, min(args.quality // 33, 3))}")
    log.info(f" workers: {args.workers}")
    log.info(f"{'='*60}")
    # Scan every known library subfolder for WebP files.
    all_webp = []
    for lib in LIBS:
        lib_path = os.path.join(base, lib)
        if not os.path.isdir(lib_path):
            continue
        log.info(f" 스캔 중: {lib} ...")
        found = scan_library(lib_path, args.days)
        if found:
            # Per-subtype (VP8/VP8L/VP8X) count for this library only.
            type_counts = {}
            for f in found:
                t = get_webp_type(f) or "?"
                type_counts[t] = type_counts.get(t, 0) + 1
            type_str = ", ".join(f"{k}:{v}" for k, v in sorted(type_counts.items()))
            log.info(f" → WebP {len(found)}개 발견 ({type_str})")
            all_webp.extend(found)
    total = len(all_webp)
    # Overall subtype statistics across all libraries (used in the summary).
    type_stats = {}
    for f in all_webp:
        t = get_webp_type(f) or "?"
        type_stats[t] = type_stats.get(t, 0) + 1
    if total == 0:
        log.warning(f"[WebP Fix] {datetime.now().strftime('%Y-%m-%d %H:%M')} 완료 — WebP 없음 (정상)")
        log.info("\n✅ WebP 파일 없음 — 처리할 파일이 없습니다.")
        send_discord(args.discord_webhook, 0, 0, 0, [], args.days, args.dry_run)
        return
    # Collect bundle hashes, then resolve them to human-readable titles.
    hashes = {extract_bundle_hash(f) for f in all_webp if extract_bundle_hash(f)}
    title_map = lookup_titles(plex_db, hashes)
    if title_map:
        log.info(f" 콘텐츠 : {len(title_map)}개 식별됨")
    type_str = ", ".join(f"{k}:{v}" for k, v in sorted(type_stats.items()))
    log.info(f"\n총 {total}개 WebP 파일 ({type_str}) {'목록 출력' if args.dry_run else '변환'} 시작...\n")
    # Run the conversions in a thread pool.
    ok = fail = 0
    start = time.time()
    with ThreadPoolExecutor(max_workers=args.workers) as executor:
        futures = {
            executor.submit(convert_file, f, args.quality, args.dry_run, has_sudo): f
            for f in all_webp
        }
        for i, fut in enumerate(as_completed(futures), 1):
            status, fpath, msg, wtype = fut.result()
            short = os.path.basename(fpath)[:50]
            if status == "OK":
                ok += 1
                if args.verbose:
                    h = extract_bundle_hash(fpath)
                    title = title_map.get(h, ("", ""))[0] if h else ""
                    log.info(f" OK [{wtype}]: {title or short}")
            elif status == "SKIP":
                # dry-run entries count toward the "ok" total in the summary.
                ok += 1
                h = extract_bundle_hash(fpath)
                title = title_map.get(h, ("", ""))[0] if h else fpath
                print(f" [DRY-RUN] [{wtype}] {title or fpath}")
            else:
                fail += 1
                log.warning(f" FAIL [{wtype}]: {short}\n {msg}")
            # Progress line every 100 files unless --quiet.
            if not args.quiet and i % 100 == 0:
                elapsed = time.time() - start
                log.info(f" 진행: {i}/{total} ({ok}✓ {fail}✗) {elapsed:.0f}초")
    elapsed = time.time() - start
    action = "확인" if args.dry_run else "변환"
    log.info(f"\n{'='*60}")
    log.info(f"완료: {action} {ok}개 성공 / {fail}개 실패 / {elapsed:.1f}초")
    log.info(f"포맷: {type_str}")
    if title_map:
        log.info(f"\n영향받은 콘텐츠 ({len(title_map)}개):")
        for title, icon in sorted(title_map.values(), key=lambda x: x[0]):
            log.info(f" {icon} {title}")
    if fail > 0:
        log.info(f" 실패 파일은 Plex가 다음 예약 작업 시 재다운로드합니다.")
    log.info(f"{'='*60}")
    log.warning(f"[WebP Fix] {datetime.now().strftime('%Y-%m-%d %H:%M')} 완료 — 변환 {ok}개 성공 / {fail}개 실패 / {elapsed:.1f}초")
    # Push the summary to Discord when a webhook URL was given.
    titles_list = sorted(title_map.values(), key=lambda x: x[0]) if title_map else []
    send_discord(args.discord_webhook, ok, fail, elapsed,
                 titles_list, args.days, args.dry_run, type_stats)
    # Non-zero exit only when *everything* failed (useful for cron alerting).
    if fail > 0 and ok == 0:
        sys.exit(1)
if __name__ == "__main__":
    main()
# -*- coding: utf-8 -*-
import os, traceback, json, urllib, re, unicodedata, urllib2
from .agent_base import AgentBase, PutRequest
class ModuleKtv(AgentBase):
    """SJVA agent module for Korean TV shows ('ktv').

    Searches Daum plus OTT services (tving / wavve / watcha) through the SJVA
    server and maps the results onto Plex show / season / episode metadata.
    Runs inside the Plex plug-in framework (Python 2; Log, Regex, Datetime,
    Proxy, MetadataSearchResult, JSON, Request are framework globals).
    """
    module_name = 'ktv'
    def get_year(self, media):
        """Extract a 6-digit air-date string (yymmdd) from the first media
        file name of the show; returns None when no date is found."""
        try:
            data = AgentBase.my_JSON_ObjectFromURL('http://127.0.0.1:32400/library/metadata/%s/children' % media.id)
            # Season children of the show.
            Log(json.dumps(data, indent=4))
            filename = data['MediaContainer']['Metadata'][0]['Media'][0]['Part'][0]['file']
            ret = os.path.splitext(os.path.basename(filename))[0]
            match = Regex(r'(?P<date>\d{6})').search(ret)
            if match:
                return match.group('date')
        except Exception as e:
            Log('Exception:%s', e)
            Log(traceback.format_exc())
    def search(self, results, media, lang, manual):
        """Plex search hook: append MetadataSearchResult entries for *media*.

        Resolution order: explicit FT/K codes from the folder path or a manual
        "CODE|title" query, then cached/remote SJVA search data (Daum first,
        then the OTT services).  When cached results score below 85 the cache
        is dropped and the search is retried once against the server.
        """
        try:
            try:
                code = self.get_code_from_folderpath(media)
                # MT codes are an alias for FT (foreign TV) codes.
                if code != None and code.startswith('MT'):
                    code = code.replace('MT', 'FT')
                if code != None and code.startswith('F'):
                    meta = MetadataSearchResult(id=code, name=code, year=1900, score=100, thumb="", lang=lang)
                    results.Append(meta)
                    return
            except Exception as exception:
                Log('Exception:%s', exception)
                Log(traceback.format_exc())
            # 2021-12-13: manual search by raw FT code (e.g. FT105262).
            if manual and media.show is not None and media.show.startswith('FT'):
                code = media.show
                meta = MetadataSearchResult(id=code, name=code, year='', score=100, thumb="", lang=lang)
                results.Append(meta)
                return
            if manual and media.show is not None and media.show.startswith('K'):
                # 2022-11-18: manual "CODE|title" queries (e.g. KBS shows).
                try:
                    code, title = media.show.split('|')
                    if code != 'KTV':
                        meta = MetadataSearchResult(id=code, name=title, year='', score=100, thumb="", lang=lang)
                        results.Append(meta)
                        return
                except:
                    pass
            # Manual queries may be prefixed as "KTV|<show title>".
            Log('SEARCH 0: %s' % media.show)
            if manual and media.show is not None and media.show.startswith('KTV'):
                keyword = media.show.replace('KTV|', '')
            else:
                Log('SEARCH : %s' % media.show)
                keyword = media.show
            Log('>> %s : %s %s' % (self.module_name, keyword, manual))
            Log('KEYWORD : %s', keyword)
            use_json = False
            search_data = None
            search_key = u'search|%s' % keyword
            # Reuse cached search results (skipped for manual searches).
            if self.is_read_json(media) and manual == False:
                info_json = self.get_info_json(media)
                if info_json is not None and search_key in info_json:
                    search_data = info_json[search_key]
                    use_json = True
            if search_data is None:
                search_data = self.send_search(self.module_name, keyword, manual)
                if search_data is not None and self.is_write_json(media):
                    self.save_info(media, {search_key:search_data})
                    #self.append_info(media, search_key, search_data)
            if search_data is None:
                return
            #Log(json.dumps(search_data, indent=4))
            # 2021-07-07: while Daum was blocked, OTT-only search data got
            # cached with scores too low to match, so refreshes never fired;
            # max_score below drives the cache-invalidation retry at the end.
            max_score = 0
            daum_max_score = 100
            equal_max_score = 100
            if 'daum' in search_data:
                data = search_data['daum']
                flag_media_season = False
                if len(media.seasons) > 1:
                    for media_season_index in media.seasons:
                        if int(media_season_index) > 1:# and int(media_season_index) < 1900:
                            flag_media_season = True
                            break
                # Media has multiple seasons and the metadata has a series list.
                if flag_media_season and len(data['series']) > 1:
                    # Use the last (latest) season's ID as the show code.
                    results.Append(MetadataSearchResult(
                        id=data['series'][-1]['code'],
                        name=u'%s | 시리즈' % keyword,
                        year=data['series'][-1]['year'],
                        score=100, lang=lang)
                    )
                # Single media, metadata has multiple seasons.
                elif len(data['series']) > 1:
                    #reversed
                    for index, series in enumerate(reversed(data['series'])):
                        Log(index)
                        Log(series)
                        if series['year'] is not None:
                            # Newest seasons score highest; exact year match wins.
                            score = 95-(index*5)
                            if media.year == series['year']:
                                score = 100
                            if score < 20:
                                score = 20
                            # Penalize not-yet-aired seasons.
                            if 'status' in series and series['status'] == 0:
                                score = score -40
                            max_score = max(max_score, score)
                            results.Append(MetadataSearchResult(id=series['code'], name=series['title'], year=series['year'], score=score, lang=lang))
                # Single media & single meta, or seasoned media & single meta.
                else:
                    # 2019-05-23: preview episodes are common now, so shows that
                    # have not aired yet are selectable too.
                    #if data['status'] != 0:
                    # 2021-06-27: demote to 99 so a same-name entry with a
                    # matching year can still win at 100.
                    if 'equal_name' in data and len(data['equal_name']) > 0:
                        score = daum_max_score = 99
                        # Same name *and* same year (e.g. "My Mister").
                        if data['year'] == media.year:
                            score = daum_max_score = 100
                            equal_max_score = 99
                    else:
                        score = 100
                    meta = MetadataSearchResult(id=data['code'], name=data['title'], year=data['year'], thumb=data['image_url'], score=score, lang=lang)
                    tmp = data['extra_info'] + ' '
                    # Append the airing status to the result summary.
                    if data['status'] == 0:
                        tmp = tmp + u'방송예정'
                    elif data['status'] == 1:
                        tmp = tmp + u'방송중'
                    elif data['status'] == 2:
                        tmp = tmp + u'방송종료'
                    tmp = tmp + self.search_result_line() + data['desc']
                    meta.summary = tmp
                    meta.type = 'movie'
                    max_score = max(max_score, score)
                    results.Append(meta)
                    # Also offer the same-name candidates, year matches first.
                    if 'equal_name' in data:
                        for index, program in enumerate(data['equal_name']):
                            if program['year'] == media.year:
                                score = min(equal_max_score, 100 - (index))
                                max_score = max(max_score, score)
                                results.Append(MetadataSearchResult(id=program['code'], name='%s | %s' % (program['title'], program['studio']), year=program['year'], score=score, lang=lang))
                            else:
                                score = min(equal_max_score, 80 - (index*5))
                                max_score = max(max_score, score)
                                results.Append(MetadataSearchResult(id=program['code'], name='%s | %s' % (program['title'], program['studio']), year=program['year'], score=score, lang=lang))
            # Append OTT candidates, capped at the best Daum score.
            # NOTE(review): `return score` raises NameError for an empty
            # show_list (swallowed by the outer except) — confirm intended.
            def func(show_list):
                for idx, item in enumerate(show_list):
                    score = min(daum_max_score, item['score'])
                    meta = MetadataSearchResult(id=item['code'], name=item['title'], score=score, thumb=item['image_url'], lang=lang)
                    meta.summary = item['site'] + ' ' + item['studio']
                    meta.type = "movie"
                    results.Append(meta)
                return score
            if 'tving' in search_data:
                score = func(search_data['tving'])
                max_score = max(max_score, score)
            if 'wavve' in search_data:
                score = func(search_data['wavve'])
                max_score = max(max_score, score)
            if 'watcha' in search_data:
                score = func(search_data['watcha'])
                max_score = max(max_score, score)
            # Cached data scored too low: drop the cache and retry remotely.
            if use_json and max_score < 85:
                self.remove_info(media)
                self.search(results, media, lang, manual)
        except Exception as e:
            Log('Exception:%s', e)
            Log(traceback.format_exc())
    def update_info(self, metadata, metadata_season, meta_info):
        """Apply show-level info (studio, dates, genres, extras, ratings,
        cast, artwork, themes) from *meta_info* onto the show and the given
        season metadata objects."""
        #metadata.original_title = metadata.title
        #metadata.title_sort = unicodedata.normalize('NFKD', metadata.title)
        metadata.studio = meta_info['studio']
        try: metadata.originally_available_at = Datetime.ParseDate(meta_info['premiered']).date()
        except: pass
        metadata.content_rating = meta_info['mpaa']
        metadata.summary = meta_info['plot']
        metadata_season.summary = metadata.summary
        metadata.genres.clear()
        for tmp in meta_info['genre']:
            metadata.genres.add(tmp)
        module_prefs = self.get_module_prefs(self.module_name)
        # Extras (trailers, behind-the-scenes, ...), played through SJVA.
        for item in meta_info['extras']:
            url = 'sjva://sjva.me/playvideo/%s|%s' % (item['mode'], item['content_url'])
            metadata.extras.add(self.extra_map[item['content_type'].lower()](url=url, title=self.change_html(item['title']), originally_available_at=Datetime.ParseDate(item['premiered']).date(), thumb=item['thumb']))
        # Rating: only the tmdb value is used.
        for item in meta_info['ratings']:
            if item['name'] == 'tmdb':
                metadata.rating = item['value']
                metadata.audience_rating = 0.0
        # Cast/crew: actors, directors and credits all feed the roles list.
        #metadata.roles.clear()
        for item in ['actor', 'director', 'credits']:
            for item in meta_info[item]:
                actor = metadata.roles.new()
                actor.role = item['role']
                actor.name = item['name']
                actor.photo = item['thumb']
                Log('%s - %s'% (actor.name, actor.photo))
        # Artwork: posters / landscape art / banners, ordered by score.
        ProxyClass = Proxy.Preview
        valid_names = []
        season_valid_names = []
        poster_index = art_index = banner_index = 0
        for item in sorted(meta_info['thumb'], key=lambda k: k['score'], reverse=True):
            valid_names.append(item['value'])
            try:
                if item['aspect'] == 'poster':
                    # 'thumb' (when set) is a smaller preview of 'value'.
                    if item['thumb'] == '':
                        metadata.posters[item['value']] = ProxyClass(self.safe_image_content(item['value']), sort_order=poster_index+1)
                        metadata_season.posters[item['value']] = ProxyClass(self.safe_image_content(item['value']), sort_order=poster_index+1)
                    else:
                        metadata.posters[item['value']] = ProxyClass(self.safe_image_content(item['thumb']), sort_order=poster_index+1)
                        metadata_season.posters[item['value']] = ProxyClass(self.safe_image_content(item['thumb']), sort_order=poster_index+1)
                    season_valid_names.append(item['value'])
                    poster_index = poster_index + 1
                elif item['aspect'] == 'landscape':
                    if item['thumb'] == '':
                        metadata.art[item['value']] = ProxyClass(self.safe_image_content(item['value']), sort_order=art_index+1)
                        metadata_season.art[item['value']] = ProxyClass(self.safe_image_content(item['value']), sort_order=art_index+1)
                    else:
                        metadata.art[item['value']] = ProxyClass(self.safe_image_content(item['thumb']), sort_order=art_index+1)
                        metadata_season.art[item['value']] = ProxyClass(self.safe_image_content(item['thumb']), sort_order=art_index+1)
                    season_valid_names.append(item['value'])
                    art_index = art_index + 1
                elif item['aspect'] == 'banner':
                    if item['thumb'] == '':
                        metadata.banners[item['value']] = ProxyClass(self.safe_image_content(item['value']), sort_order=banner_index+1)
                    else:
                        metadata.banners[item['value']] = ProxyClass(self.safe_image_content(item['thumb']), sort_order=banner_index+1)
                    banner_index = banner_index + 1
            except Exception as e:
                Log('Exception:%s', e)
                #Log(traceback.format_exc())
        # Needs review: related to bundle removal.  If the season
        # validate_keys lines are re-enabled, only the last processed
        # season's artwork ends up on the show.
        #metadata.posters.validate_keys(valid_names)
        #metadata.art.validate_keys(valid_names)
        #metadata.banners.validate_keys(valid_names)
        #metadata_season.posters.validate_keys(season_valid_names)
        #metadata_season.art.validate_keys(season_valid_names)
        # Theme music listed directly in the metadata.
        # NOTE(review): already-present themes are not re-added to
        # valid_names, so validate_keys below may drop them — confirm.
        valid_names = []
        if 'themes' in meta_info['extra_info']:
            for tmp in meta_info['extra_info']['themes']:
                if tmp not in metadata.themes:
                    valid_names.append(tmp)
                    metadata.themes[tmp] = Proxy.Media(self.safe_image_content(tmp))
        # Theme fallback via TVDB.
        # Get the TVDB id from the Movie Database Agent.
        tvdb_id = None
        if 'tmdb_id' in meta_info['extra_info']:
            tvdb_id = Core.messaging.call_external_function(
                'com.plexapp.agents.themoviedb',
                'MessageKit:GetTvdbId',
                kwargs = dict(
                    tmdb_id = meta_info['extra_info']['tmdb_id']
                )
            )
        Log('TVDB_ID : %s', tvdb_id)
        THEME_URL = 'https://tvthemes.plexapp.com/%s.mp3'
        if tvdb_id and THEME_URL % tvdb_id not in metadata.themes:
            tmp = THEME_URL % tvdb_id
            try:
                metadata.themes[tmp] = Proxy.Media(self.safe_image_content(THEME_URL % tvdb_id))
                valid_names.append(tmp)
            except: pass
        metadata.themes.validate_keys(valid_names)
    def update_episode(self, show_epi_info, episode, media, info_json, is_write_json, frequency=None):
        """Apply episode-level info from *show_epi_info* (per-site dict with
        optional 'daum' / 'tving' / 'wavve' entries) onto *episode*.

        Daum data (title/summary/air date, detailed thumbs, extras) is
        preferred; OTT entries supply thumbnails, and supply everything when
        Daum data is absent.  *frequency* (episode number) prefixes the OTT
        title when given.
        """
        try:
            valid_names = []
            if 'daum' in show_epi_info:
                #if 'tving_id' in meta_info['extra_info']:
                #    param += ('|' + 'V' + meta_info['extra_info']['tving_id'])
                episode_info = None
                # Per-episode cache keyed by the Daum episode code.
                if info_json is not None and show_epi_info['daum']['code'] in info_json:
                    episode_info = info_json[show_epi_info['daum']['code']]
                if episode_info is None:
                    episode_info = self.send_episode_info(self.module_name, show_epi_info['daum']['code'])
                    if episode_info is not None and is_write_json:
                        info_json[show_epi_info['daum']['code']] = episode_info
                episode.originally_available_at = Datetime.ParseDate(episode_info['premiered']).date()
                episode.title = episode_info['title']
                episode.summary = episode_info['plot']
                # 2024.06.09 ott_match: OTT thumbnails win over Daum thumbs.
                ott_thumb = False
                for site in ['tving', 'wavve']:
                    if site in show_epi_info:
                        thumb_index = 20
                        valid_names.append(show_epi_info[site]['thumb'])
                        try:
                            episode.thumbs[show_epi_info[site]['thumb']] = Proxy.Preview(self.safe_image_content(show_epi_info[site]['thumb']), sort_order=thumb_index+1)
                            ott_thumb = True
                        except: pass
                if ott_thumb:
                    return
                thumb_index = 30
                ott_mode = 'only_thumb'
                for item in sorted(episode_info['thumb'], key=lambda k: k['score'], reverse=True):
                    valid_names.append(item['value'])
                    if item['thumb'] == '':
                        try: episode.thumbs[item['value']] = Proxy.Preview(self.safe_image_content(item['value']), sort_order=thumb_index+1)
                        except: pass
                    else:
                        try : episode.thumbs[item['value']] = Proxy.Preview(self.safe_image_content(item['thumb']), sort_order=thumb_index+1)
                        except: pass
                    thumb_index = thumb_index + 1
                    # Daum already provided thumbs; no OTT fill-in needed.
                    ott_mode = 'stop'
                # Episode extras, played through SJVA.
                module_prefs = self.get_module_prefs(self.module_name)
                for item in episode_info['extras']:
                    url = 'sjva://sjva.me/playvideo/%s|%s' % (item['mode'], item['content_url'])
                    episode.extras.add(self.extra_map[item['content_type'].lower()](url=url, title=self.change_html(item['title']), originally_available_at=Datetime.ParseDate(item['premiered']).date(), thumb=item['thumb']))
            else:
                # No Daum data: OTT entries provide everything.
                ott_mode = 'full'
            if ott_mode != 'stop':
                for site in ['tving', 'wavve']:
                    if site in show_epi_info:
                        if ott_mode == 'full':
                            episode.originally_available_at = Datetime.ParseDate(show_epi_info[site]['premiered']).date()
                            #episode.title = show_epi_info[site]['premiered']
                            episode.title = show_epi_info[site]['title'] if show_epi_info[site]['title'] != '' else show_epi_info[site]['premiered']
                            if frequency is not None:
                                episode.title = u'%s회 (%s)' % (frequency, episode.title)
                            episode.summary = show_epi_info[site]['plot']
                        if ott_mode in ['full', 'only_thumb']:
                            thumb_index = 20
                            valid_names.append(show_epi_info[site]['thumb'])
                            try: episode.thumbs[show_epi_info[site]['thumb']] = Proxy.Preview(self.safe_image_content(show_epi_info[site]['thumb']), sort_order=thumb_index+1)
                            except: pass
            #episode.thumbs.validate_keys(valid_names)
        except Exception as e:
            Log('Exception:%s', e)
            Log(traceback.format_exc())
    def update(self, metadata, media, lang):
        """Plex update hook: refresh the whole show.

        Resolves season codes from cached/remote search data, fills show and
        season info via update_info, updates every episode in parallel, and
        finally PUTs per-season titles/summaries straight to the Plex HTTP
        API (season fields are not reliably settable through the agent API).
        """
        #self.base_update(metadata, media, lang)
        try:
            is_write_json = self.is_write_json(media)
            module_prefs = self.get_module_prefs(self.module_name)
            flag_ending = False
            # True when the library item actually has numbered seasons > 1.
            flag_media_season = False
            if len(media.seasons) > 1:
                for media_season_index in media.seasons:
                    if int(media_season_index) > 1:# and int(media_season_index) < 1900:
                        flag_media_season = True
                        break
            search_data = None
            search_key = u'search|%s' % media.title
            info_json = {}
            if self.is_read_json(media):
                tmp = self.get_info_json(media)
                #Log(tmp)
                if tmp is not None and search_key in tmp:
                    search_data = tmp[search_key]
                    info_json = tmp
            if search_data is None:
                search_data = self.send_search(self.module_name, media.title, False)
                if search_data is not None and is_write_json:
                    #self.append_info(media, search_key, search_data)
                    info_json[search_key] = search_data
            index_list = [index for index in media.seasons]
            index_list = sorted(index_list)
            #for media_season_index in media.seasons:
            # 2021-11-05: roles accumulate across seasons otherwise.
            metadata.roles.clear()
            for media_season_index in index_list:
                Log('media_season_index is %s', media_season_index)
                # 2022-04-05: long indices (e.g. year-style) keep only the
                # last two digits for season matching.
                search_media_season_index = media_season_index
                if len(str(media_season_index)) > 2:
                    search_media_season_index = str(media_season_index)[-2:]
                if search_media_season_index in ['0', '00']:
                    continue
                #Log(self.d(search_data['daum']['series']))
                search_title = media.title.replace(u'[종영]', '')
                search_title = search_title.split('|')[0].strip()
                Log('search_title2 : %s', search_title)
                #Log('search_code2 : %s', search_code)
                # For a single-file show whose title ends in a number (e.g.
                # "... 3"), search_media_season_index would be 1 and wrongly
                # match season 1, so the series lookup is skipped for
                # single-media shows; multi-season copies should be tagged
                # with their real season number anyway.
                search_code = metadata.id
                only_season_title_show = False
                if flag_media_season and 'daum' in search_data and len(search_data['daum']['series']) > 1:
                    try: # The season index may not exist in the series list.
                        Log(len(search_data['daum']['series']))
                        search_title = search_data['daum']['series'][int(search_media_season_index)-1]['title']
                        search_code = search_data['daum']['series'][int(search_media_season_index)-1]['code']
                    except:
                        only_season_title_show = True
                Log('flag_media_season : %s', flag_media_season)
                Log('search_title : %s', search_title)
                Log('search_code : %s', search_code)
                Log('media_season_index : %s', media_season_index)
                Log('search_media_season_index: %s', search_media_season_index)
                Log('only_season_title_show : %s', only_season_title_show)
                #self.get_json_filepath(media)
                #self.get_json_filepath(media.seasons[media_season_index])
                if only_season_title_show == False:
                    meta_info = None
                    if info_json is not None and search_code in info_json:
                        # Only reuse cached info for ended shows (status 2);
                        # while airing, refetch so new episodes appear.
                        if info_json[search_code]['status'] == 2:
                            meta_info = info_json[search_code]
                    if meta_info is None:
                        meta_info = self.send_info(self.module_name, search_code, title=search_title)
                        if meta_info is not None and is_write_json:
                            #self.append_info(media, search_code, meta_info)
                            info_json[search_code] = meta_info
                            #self.save_info(media, info_json)
                    Log("SEARCH_CODE: %s", search_code)
                    Log("TITLE: %s", meta_info['title'])
                    Log("SUMMARY: %s", meta_info['plot'])
                    #Log(json.dumps(meta_info, indent=4))
                    if flag_media_season:
                        metadata.title = media.title.split('|')[0].strip()
                    else:
                        metadata.title = meta_info['title']
                    metadata.original_title = metadata.title
                    metadata.title_sort = unicodedata.normalize('NFKD', metadata.title)
                    # Optional "[ended]" title prefix for files under the
                    # configured end-notification paths.
                    if flag_media_season == False and meta_info['status'] == 2 and module_prefs['end_noti_filepath'] != '':
                        parts = media.seasons[media_season_index].all_parts()
                        end_noti_filepath = module_prefs['end_noti_filepath'].split('|')
                        for tmp in end_noti_filepath:
                            if parts[0].file.find(tmp) != -1:
                                metadata.title = u'[종영]%s' % metadata.title
                                break
                    metadata_season = metadata.seasons[media_season_index]
                    self.update_info(metadata, metadata_season, meta_info)
                    # Posters/art were applied inside update_info.
                    # Get episode data (framework-parallelized tasks).
                    @parallelize
                    def UpdateEpisodes():
                        for media_episode_index in media.seasons[media_season_index].episodes:
                            episode = metadata.seasons[media_season_index].episodes[media_episode_index]
                            @task
                            def UpdateEpisode(episode=episode, media_season_index=media_season_index, media_episode_index=media_episode_index, media=media):
                                frequency = False
                                show_epi_info = None
                                if media_episode_index in meta_info['extra_info']['episodes']:
                                    show_epi_info = meta_info['extra_info']['episodes'][media_episode_index]
                                    self.update_episode(show_epi_info, episode, media, info_json, is_write_json)
                                else:
                                    # No direct episode match — fall back to
                                    # matching by yyyy-mm-dd air date.
                                    match = Regex(r'\d{4}-\d{2}-\d{2}').search(media_episode_index)
                                    if match:
                                        for key, value in meta_info['extra_info']['episodes'].items():
                                            if ('daum' in value and value['daum']['premiered'] == media_episode_index) or ('tving' in value and value['tving']['premiered'] == media_episode_index) or ('wavve' in value and value['wavve']['premiered'] == media_episode_index):
                                                show_epi_info = value
                                                self.update_episode(show_epi_info, episode, media, info_json, is_write_json, frequency=key)
                                                break
                                if show_epi_info is None:
                                    return
                                episode.directors.clear()
                                episode.producers.clear()
                                episode.writers.clear()
                                for item in meta_info['credits']:
                                    meta = episode.writers.new()
                                    meta.role = item['role']
                                    meta.name = item['name']
                                    meta.photo = item['thumb']
                                for item in meta_info['director']:
                                    meta = episode.directors.new()
                                    meta.role = item['role']
                                    meta.name = item['name']
                                    meta.photo = item['thumb']
            # Season title / summary.
            # NOTE(review): only_season_title_show here is the value from the
            # *last* loop iteration (NameError if no season was processed,
            # swallowed by the outer except) — confirm intended.
            if is_write_json and only_season_title_show == False:
                self.save_info(media, info_json)
            # 2021-09-15: commented out to support arbitrary season splits.
            #if not flag_media_season:
            #    return
            url = 'http://127.0.0.1:32400/library/metadata/%s' % media.id
            data = JSON.ObjectFromURL(url)
            section_id = data['MediaContainer']['librarySectionID']
            token = Request.Headers['X-Plex-Token']
            # PUT season title/summary straight to the Plex HTTP API.
            for media_season_index in media.seasons:
                Log('media_season_index is %s', media_season_index)
                if media_season_index == '0':
                    continue
                try:
                    filepath = media.seasons[media_season_index].all_parts()[0].file
                    tmp = os.path.basename(os.path.dirname(filepath))
                    season_title = None
                    if tmp != metadata.title:
                        Log(tmp)
                        match = Regex(r'(?P<season_num>\d{1,8})\s*(?P<season_title>.*?)$').search(tmp)
                        Log('MATCH: %s' % match)
                        # Folder names like "시즌 2 <title>" / "Season 2 <title>"
                        # force a custom season title.
                        if match and (tmp.startswith(u'시즌 ') or tmp.startswith(u'Season ')):
                            Log('FORCE season_num : %s', match.group('season_num'))
                            Log('FORCE season_title : %s', match.group('season_title'))
                            Log('media_season_index : %s', media_season_index)
                            if int(match.group('season_num')) == int(media_season_index) and match.group('season_title') is not None:
                                season_title = match.group('season_title')
                    try:
                        Log("VAR season_title : %s" % season_title)
                        Log("VAR season_title : %s" % metadata_season.summary)
                    except: pass
                    metadata_season = metadata.seasons[media_season_index]
                    if season_title is None:
                        if metadata_season.summary != None:
                            url = 'http://127.0.0.1:32400/library/sections/%s/all?type=3&id=%s&summary.value=%s&X-Plex-Token=%s' % (section_id, media.seasons[media_season_index].id, urllib.quote(metadata_season.summary.encode('utf8')), token)
                    else:
                        if metadata_season.summary == None:
                            metadata_season.summary = ''
                        url = 'http://127.0.0.1:32400/library/sections/%s/all?type=3&id=%s&title.value=%s&summary.value=%s&X-Plex-Token=%s' % (section_id, media.seasons[media_season_index].id, urllib.quote(season_title.encode('utf8')), urllib.quote(metadata_season.summary.encode('utf8')), token)
                    # NOTE(review): when season_title and summary are both
                    # None, `url` keeps its previous value and is PUT again —
                    # confirm that is harmless.
                    Log('URL : %s' % url)
                    request = PutRequest(url)
                    response = urllib2.urlopen(request)
                except Exception as e:
                    Log('Exception:%s', e)
                    Log(traceback.format_exc())
        except Exception as e:
            Log('Exception:%s', e)
            Log(traceback.format_exc())
# -*- coding: utf-8 -*-
import json
import os
import re
import time
import traceback
import unicodedata
import urllib
from io import open
from .agent_base import AgentBase
class ModuleYamlBase(AgentBase):
def get_yaml_filepath(self, media, content_type):
    """Locate the user-supplied YAML metadata file(s) for *media*.

    Queries the local Plex HTTP API for the item's file locations, then
    probes well-known YAML file names next to the media files:
      movie  -> "<basename>.yaml", "<code>.yaml" (cdN suffix stripped),
                or "movie.yaml"; returns a single path or None.
      show   -> returns {'show': show.yaml-or-None, 'seasons': [season.yaml...]}.
      album  -> "album.yaml" in the album root (one level up for CD<n> dirs).
      artist -> "artist.yaml", its location depending on VA1/VA2 marker dirs.
    Returns None on any error (logged).
    """
    try:
        # *media* may be either a ratingKey string or a media object.
        metadata_key = media if type(media) == type('') else media.id
        data = AgentBase.my_JSON_ObjectFromURL('http://127.0.0.1:32400/library/metadata/%s?includeChildren=1' % metadata_key)
        section_id = str(data['MediaContainer']['librarySectionID'])
        #Log(self.d(data))
        """
        if data['MediaContainer']['Metadata'][0]['type'] == 'album':
            data = AgentBase.my_JSON_ObjectFromURL('http://127.0.0.1:32400/library/metadata/%s/children' % media.id)
            #Log(self.d(data))
        elif data['MediaContainer']['Metadata'][0]['type'] == 'artist':
            data = AgentBase.my_JSON_ObjectFromURL('http://127.0.0.1:32400/library/metadata/%s/children' % data['MediaContainer']['Metadata'][0]['Children']['Metadata'][0]['ratingKey'])
        """
        if content_type == 'movie':
            # Candidates: <filename>.yaml / <code>.yaml / movie.yaml
            folder_list = []
            if 'Media' in data['MediaContainer']['Metadata'][0]:
                for media in data['MediaContainer']['Metadata'][0]['Media']:
                    for part in media['Part']:
                        filepath = part['file']
                        folderpath = os.path.dirname(filepath)
                        filename = os.path.basename(filepath)
                        tmp = os.path.splitext(filename)
                        yaml_filepath = os.path.join(folderpath, '%s.yaml' % tmp[0])
                        if os.path.exists(yaml_filepath):
                            return yaml_filepath
                        # First token of the name is treated as the code;
                        # strip a trailing "cd<n>" disc suffix.
                        # NOTE(review): codes shorter than 3 chars would raise
                        # IndexError here (caught below) — confirm acceptable.
                        code = tmp[0].split(' ')[0]
                        if code[-2] == 'd' and code [-3] == 'c':
                            code = code[:-3].strip(' .-')
                        yaml_filepath = os.path.join(folderpath, '%s.yaml' % code)
                        if os.path.exists(yaml_filepath):
                            return yaml_filepath
                        yaml_filepath = os.path.join(folderpath, 'movie.yaml')
                        if os.path.exists(yaml_filepath):
                            return yaml_filepath
        elif content_type == 'show':
            # show.yaml in the show root plus season.yaml in each subfolder.
            filepath_list = {'show':None, 'seasons':[]}
            if 'Location' in data['MediaContainer']['Metadata'][0]:
                folderpath = data['MediaContainer']['Metadata'][0]['Location'][0]['path']
                yaml_filepath = os.path.join(folderpath, 'show.yaml')
                if os.path.exists(yaml_filepath):
                    filepath_list['show'] = yaml_filepath
                filelist = os.listdir(folderpath)
                for filename in filelist:
                    filepath = os.path.join(folderpath, filename)
                    if os.path.isdir(filepath):
                        season_yaml_filepath = os.path.join(filepath, 'season.yaml')
                        if os.path.exists(season_yaml_filepath):
                            filepath_list['seasons'].append(season_yaml_filepath)
            return filepath_list
        elif content_type == 'album':
            data = AgentBase.my_JSON_ObjectFromURL('http://127.0.0.1:32400/library/metadata/%s/children' % metadata_key)
            filename = data['MediaContainer']['Metadata'][0]['Media'][0]['Part'][0]['file']
            # Multi-disc albums keep tracks in CD<n> subfolders; the album
            # root is then one level up.
            parent = os.path.split(os.path.dirname(filename))[1]
            match = re.match('CD(?P<disc>\d+)', parent, re.IGNORECASE)
            if match:
                album_root = os.path.dirname(os.path.dirname(filename))
            else:
                album_root = os.path.dirname(filename)
            #yaml_filepath = os.path.join(os.path.dirname(filename), 'album.yaml')
            yaml_filepath = os.path.join(album_root, 'album.yaml')
            if os.path.exists(yaml_filepath):
                return yaml_filepath
        elif content_type == 'artist':
            data = AgentBase.my_JSON_ObjectFromURL('http://127.0.0.1:32400/library/metadata/%s/children' % data['MediaContainer']['Metadata'][0]['Children']['Metadata'][0]['ratingKey'])
            filename = data['MediaContainer']['Metadata'][0]['Media'][0]['Part'][0]['file']
            parent = os.path.split(os.path.dirname(filename))[1]
            match = re.match('CD(?P<disc>\d+)', parent, re.IGNORECASE)
            if match:
                album_root = os.path.dirname(os.path.dirname(filename))
            else:
                album_root = os.path.dirname(filename)
            album_basename = os.path.basename(album_root)
            # Dead branch kept intentionally ("False and ...").
            if False and album_basename.count(' - ') == 1:
                yaml_filepath = os.path.join(album_root, 'artist.yaml')
            else:
                # 2022-05-02
                # A VA marker directory means the layout is fixed as
                # category-album; otherwise it is category-artist-album
                # (OST / compilation cases).
                artist_root = os.path.dirname(album_root)
                cate_root = os.path.dirname(artist_root)
                va_flag = None
                if os.path.exists(os.path.join(cate_root, 'VA1')):
                    va_flag = "va_depth1"
                elif os.path.exists(os.path.join(cate_root, 'VA2')):
                    va_flag = "va_depth2_artist_dummy"
                elif os.path.exists(os.path.join(artist_root, 'VA2')):
                    va_flag = "va_depth1"
                if va_flag == None or va_flag == 'va_depth2_artist_dummy':
                    yaml_filepath = os.path.join(artist_root, 'artist.yaml')
                elif va_flag == 'va_depth1':
                    yaml_filepath = os.path.join(album_root, 'artist.yaml')
            if os.path.exists(yaml_filepath):
                return yaml_filepath
    except Exception as e:
        Log('Exception:%s', e)
        Log(traceback.format_exc())
def get(self, data, field, default):
    """Return data[field], falling back to *default* when the field is
    missing, None, or an empty string."""
    value = data.get(field)
    return default if value is None or value == '' else value
def get_bool(self, data, field, default):
    """Return data[field] interpreted as a boolean.

    Missing/empty values fall back to *default*.  'true'/'false' strings
    (case-insensitive) map to True/False; any other value is returned as-is
    (same contract as before).

    BUGFIX: YAML parses bare `true`/`false` into real bools, and the old
    code crashed on them with AttributeError (`bool.lower`); real bools are
    now returned directly, and other values are stringified before the
    comparison.
    """
    ret = data.get(field, None)
    if ret is None or ret == '':
        ret = str(default)
    if isinstance(ret, bool):
        return ret
    text = str(ret).lower()
    if text == 'true':
        return True
    elif text == 'false':
        return False
    return ret
def get_list(self, data, field):
    """Return data[field] as a list.

    A missing field yields an empty list; a list passes through unchanged;
    any other value is split on commas with each part stripped.
    """
    value = data.get(field)
    if value is None:
        return []
    if type(value) is list:
        return value
    return [part.strip() for part in value.split(',')]
def get_person_list(self, data, field):
    """Return data[field] as a list of person dicts.

    A missing field yields an empty list; a list (of dicts) passes through
    unchanged; a comma-separated string becomes [{'name': <stripped>}, ...].
    """
    value = data.get(field)
    if value is None:
        return []
    if type(value) is list:
        return value
    return [{'name': name.strip()} for name in value.split(',')]
def get_media_list(self, data, field):
    """Return data[field] as a list of media dicts ({'url': ...}).

    A missing field yields an empty list; a list passes through unchanged.
    A string is split on commas: each part starting with 'http' opens a new
    entry, and non-http parts are re-joined onto the previous entry's URL
    (URLs may legitimately contain commas).

    BUGFIX: the old code tracked the entry to extend with the enumerate
    index of the *split parts* (`insert_index = idx`), which drifts past the
    end of the result list as soon as one URL contains a comma, raising
    IndexError; the last appended entry is now extended directly.
    """
    ret = data.get(field, None)
    if ret is None:
        return []
    if type(ret) == type([]):
        return ret
    tmp = []
    for value in ret.split(','):
        if value.startswith('http'):
            tmp.append({'url': value.strip()})
        elif tmp:
            # Comma inside a URL: glue the fragment back onto the last entry.
            tmp[-1]['url'] = '%s,%s' % (tmp[-1]['url'], value)
    return tmp
# Assignment semantics note: plain assignment to a parameter only rebinds a
# local name, so scalar fields must be written with setattr.  Non-primitive
# attributes behave like references, therefore only set_data uses setattr —
# the list/person/media variants fetch the attribute object via getattr and
# mutate it in place.
def set_data(self, meta, data, field, is_primary):
    """Set the scalar metadata attribute *field* on *meta* from YAML *data*.

    Converts date/float/int fields and NFKD-normalizes 'title_sort'.  When
    the YAML omits the field, the attribute is reset to None only for the
    primary agent.  Errors are logged, never raised.
    """
    try:
        value = self.get(data, field, None)
        if value is None:
            # Missing in YAML: only the primary agent may clear the field.
            if is_primary:
                setattr(meta, field, None)
            return
        if field == 'title_sort':
            value = unicodedata.normalize('NFKD', value)
        elif field in ('originally_available_at', 'available_at'):
            value = Datetime.ParseDate(value).date()
        elif field in ('rating', 'audience_rating'):
            value = float(value)
        elif field == 'year':
            value = int(value)
        setattr(meta, field, value)
    except Exception as exc:
        Log('Exception:%s', exc)
        Log(traceback.format_exc())
def set_data_list(self, meta, data, field, is_primary):
    """Replace a list-type metadata field (genres, collections, ...) with
    the values from the YAML data.

    When the YAML omits the field, existing values are cleared only for the
    primary agent.  Errors are logged, never raised.
    """
    try:
        container = getattr(meta, field)
        values = self.get_list(data, field)
        if not values:
            if is_primary:
                container.clear()
            return
        container.clear()
        for entry in values:
            container.add(entry)
    except Exception as exc:
        Log('Exception:%s', exc)
        Log(traceback.format_exc())
def set_data_person(self, meta, data, field, is_primary):
    """Rebuild a person-set field (roles, directors, writers, ...).

    Each person entry gets name/role/photo; a relative photo path is
    prefixed with the SJVA server address from preferences. If that
    rewriting fails for any reason the raw photo value is kept.
    """
    try:
        container = getattr(meta, field)
        people = self.get_person_list(data, field)
        if len(people) > 0:
            container.clear()
            for person in people:
                entry = container.new()
                entry.name = self.get(person, 'name', None)
                entry.role = self.get(person, 'role', None)
                try:
                    photo = self.get(person, 'photo', None)
                    if photo != None and photo.startswith('http') == False:
                        # relative path -> absolute URL on the SJVA server
                        server = Prefs['server'].rstrip('/')
                        photo = server + '/' + photo.lstrip('/')
                    entry.photo = photo
                except Exception as e:
                    # best effort: fall back to the untouched value
                    entry.photo = self.get(person, 'photo', None)
        elif is_primary:
            container.clear()
    except Exception as exception:
        Log('Exception:%s', exception)
        Log(traceback.format_exc())
def set_data_media(self, meta, data, field, is_primary):
    """Populate an image field (posters / art / banners) on the metadata.

    Each media entry contributes one key: its 'thumb' if present,
    otherwise its 'url'. The original (pre-rewrite) value is recorded
    in valid_names so validate_keys() can drop keys no longer provided.
    Relative paths are prefixed with the SJVA server address; Discord
    CDN attachment URLs are rerouted through the SJVA server instead
    (presumably so the server can re-serve/convert them — TODO confirm,
    the gist title suggests a WebP workaround).
    """
    try:
        # from here on, `meta` is the image container, not the metadata root
        meta = getattr(meta, field)
        value = self.get_media_list(data, field)
        if len(value) > 0:
            valid_names = []
            for idx, media in enumerate(value):
                if 'thumb' in media:
                    tmp = media['thumb']
                    valid_names.append(media['thumb'])
                else:
                    tmp = media['url']
                    valid_names.append(media['url'])
                try:
                    if tmp != None and tmp.startswith('http') == False:
                        # relative path -> absolute URL on the SJVA server
                        ddns = Prefs['server'].rstrip('/')
                        tmp = tmp.lstrip('/')
                        tmp = ddns + '/' + tmp
                    elif tmp.startswith('https://cdn.discordapp.com/attachments'):
                        # reroute Discord CDN attachments through the SJVA server
                        ddns = Prefs['server'].rstrip('/')
                        tmp = tmp.replace('https://cdn.discordapp.com', ddns)
                    # sort_order is 1-based; a failure here only skips this image
                    meta[tmp] = Proxy.Preview(self.safe_image_content(tmp), sort_order=idx+1)
                except Exception as e:
                    Log('Exception:%s', e)
            meta.validate_keys(valid_names)
        elif is_primary:
            # primary agent with no images: drop everything
            meta.validate_keys([])
        #Log(meta)
    except Exception as exception:
        Log('Exception:%s', exception)
        Log(traceback.format_exc())
def set_data_reviews(self, meta, data, field, is_primary):
    """Rebuild the reviews collection from the data payload.

    Clears and refills the container when reviews are provided; a
    primary agent also clears it when none are.
    """
    try:
        container = getattr(meta, field)
        reviews = self.get(data, field, [])
        if len(reviews) > 0:
            container.clear()
            for review in reviews:
                entry = container.new()
                for attr in ('author', 'source', 'image', 'link', 'text'):
                    setattr(entry, attr, self.get(review, attr, None))
        elif is_primary:
            container.clear()
    except Exception as exception:
        Log('Exception:%s', exception)
        Log(traceback.format_exc())
def set_data_extras(self, meta, data, field, is_primary):
    """Attach extra videos (trailers, interviews, ...) to the metadata.

    Each extra is wrapped in its Plex extra class (looked up from
    extra_map by type, defaulting to trailer) with an sjva:// playback
    URL. Existing extras are intentionally never cleared.
    """
    try:
        container = getattr(meta, field)
        extras = self.get(data, field, [])
        if len(extras) > 0:
            for extra in extras:
                mode = self.get(extra, 'mode', None)
                kind = self.get(extra, 'type', 'trailer')
                extra_class = self.extra_map[kind.lower()]
                play_url = 'sjva://sjva.me/playvideo/%s|%s' % (mode, extra.get('param'))
                premiere = Datetime.ParseDate(self.get(extra, 'originally_available_at', '1900-12-31')).date()
                container.add(
                    extra_class(
                        url=play_url,
                        title=self.change_html(extra.get('title', '')),
                        originally_available_at=premiere,
                        thumb=self.get(extra, 'thumb', '')
                    )
                )
        elif is_primary:
            # deliberately a no-op: extras are not cleared even by the
            # primary agent
            pass
    except Exception as exception:
        Log('Exception:%s', exception)
        Log(traceback.format_exc())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment