Last active
January 11, 2022 16:32
-
-
Save PhotonQuantum/76183008ba0ae13e473f0c668b275b0b to your computer and use it in GitHub Desktop.
A simple feeluown tui. [WIP]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/python | |
from blessed import Terminal | |
from concurrent.futures import ThreadPoolExecutor | |
from queue import SimpleQueue | |
from threading import Lock | |
import re | |
import socket | |
import subprocess | |
import time | |
term = Terminal() | |
fuo_lock = Lock() | |
def fuo_ask(fuo, cmd): | |
with fuo_lock: | |
fuo.sendall("".join((cmd, "\n")).encode('utf-8')) | |
b_msg = b'' | |
while True: | |
b_msg += fuo.recv(1024) | |
if b_msg[:3].decode("utf-8") == "ACK": | |
if b_msg[4:6].decode("utf-8").lower() == "ok": | |
pack_len = b_msg[7:16].decode("utf-8") | |
pack_len = int(pack_len[:pack_len.find('\n')]) | |
break | |
while True: | |
if len(b_msg) == pack_len + 11 + len(str(pack_len)): | |
break | |
b_msg += fuo.recv(1024) | |
return b_msg.decode('utf-8') | |
def nem_login(fuo): | |
login_script = ( | |
"from fuo_netease.login_controller import LoginController\n" | |
"user = LoginController.load()\n" | |
"if not user:\n" | |
" raise RuntimeError('No user info')\n" | |
"app.library.get('netease').auth(user)\n") | |
return fuo_ask(fuo, "exec <<EOF\n" + login_script + "\nEOF") | |
def re_get_params(src, param): | |
return re.findall("(?<=" + param + ").*", src) | |
def fetch_now_playing(sock): | |
str_status = fuo_ask(sock, "status") | |
if str_status.find("#") == -1: | |
song_name = "N/A" | |
song_pos = 0 | |
song_dur = 10 | |
else: | |
song_name = re_get_params(str_status, "#")[0].strip() | |
song_pos = re_get_params(str_status, "position:")[0].strip() | |
song_pos = 0 if song_pos == "None" else float(song_pos) | |
song_dur = re_get_params(str_status, "duration:")[0].strip() | |
song_dur = 1 if song_dur == "None" else float(song_dur) | |
song_state = re_get_params(str_status, "state:")[0].strip() | |
song_prog = float(song_pos) / song_dur | |
output_msg = "" | |
# with term.location(0, 4): | |
if song_state == "playing": | |
state = term.on_bright_green("PLAY") | |
elif song_state == "paused": | |
state = term.on_bright_red("PAUSE") | |
elif song_state == "stopped": | |
state = term.on_bright_red("STOP") | |
output_msg += term.center(term.bold((state + " " + song_name))) + "\n\n" | |
output_msg += term.center("[" + "=" * int(song_prog * 60) + | |
" " * int((1 - song_prog) * 60) + "]") | |
return output_msg | |
def get_song_list(sock): | |
str_list = fuo_ask(sock, "list") | |
uri_list = re.findall("fuo.\S*", str_list) | |
name_list = re.findall("(?<=# ).*", str_list) | |
return name_list, uri_list | |
def print_song_list(name_list, selitem, page_count): | |
output_msg = "" | |
name_list = ["N/A"] if not name_list else name_list | |
if selitem > len(name_list) - 1: | |
selitem = len(name_list) - 1 | |
elif selitem < 0: | |
selitem = 0 | |
page = selitem // page_count | |
for i in range(page * page_count, (page + 1) * page_count): | |
if i < len(name_list): | |
if i == selitem: | |
output_msg += term.center((term.underline_bright_magenta( | |
str(i + 1) + ". " + name_list[i].strip()))) | |
else: | |
output_msg += term.center( | |
str(i + 1) + ". " + name_list[i].strip()) | |
with term.location(0, 10): | |
print(output_msg + term.clear_eos()) | |
return selitem | |
def sync_current_song(reverse, uri_list, fuo_uri): | |
if reverse: | |
return len(uri_list) - uri_list[::-1].index(fuo_uri) - 1 | |
else: | |
return uri_list.index(fuo_uri) | |
def fetch_thread(sock, msg_queue): | |
global now_play | |
global name_list | |
global uri_list | |
global redraw | |
while True: | |
cmd = msg_queue.get() | |
if cmd == -1: | |
break | |
elif cmd == 0: # playlist update | |
name_list, uri_list = get_song_list(sock) | |
redraw = 2 | |
elif cmd == 1: # play status | |
now_play = fetch_now_playing(sock) | |
if redraw < 1: redraw = 1 | |
def timer_thread(msg_queue): | |
count = 0 | |
while not self_kill: | |
if not timer_suspend: | |
msg_queue.put(1) # update play status | |
if count == 10: | |
msg_queue.put(0) # update playlist | |
count = -1 | |
count += 1 | |
time.sleep(.5) | |
def lyric_thread(sock): | |
global lyrics | |
while True: | |
lyrics = sock.recv(4096).decode().replace('\n', '') | |
now_play = "" | |
name_list = "" | |
uri_list = "" | |
lyrics = "" | |
self_kill = False | |
timer_suspend = False | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
lyric_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
fuo_handler = None | |
try: | |
sock.connect(("127.0.0.1", 23333)) | |
except ConnectionRefusedError: | |
fuo_handler = subprocess.Popen(["fuo", "-nw"]) | |
if fuo_handler: | |
while sock.connect_ex(("127.0.0.1", 23333)) != 0: | |
time.sleep(0.5) | |
lyric_sock.connect(("127.0.0.1", 23334)) | |
sock.recv(4096) | |
lyric_sock.send("sub topic.live_lyric".encode()) | |
time.sleep(.5) | |
lyric_sock.recv(4096) | |
nem_login(sock) | |
print( | |
fuo_ask( | |
sock, | |
"exec <<EOF\nprint(app.library.get('netease')._user.identifier)\nEOF")) | |
print(term.clear()) | |
selitem = 0 | |
add_list = False | |
msg_queue = SimpleQueue() | |
redraw = 0 # 0: lyric 1: status 2: playlist | |
with ThreadPoolExecutor() as executor: | |
executor.submit(fetch_thread, sock, msg_queue) | |
msg_queue.put(1) | |
msg_queue.put(0) | |
executor.submit(timer_thread, msg_queue) | |
executor.submit(lyric_thread, lyric_sock) | |
with term.hidden_cursor(), term.fullscreen(): | |
while True: | |
if redraw > 0: | |
with term.location(0, 4): | |
print(now_play) | |
with term.location(0, 5): | |
print(term.center(term.cyan(lyrics))) | |
if redraw > 1: | |
selitem = print_song_list(name_list, selitem, 10) | |
redraw = 0 | |
with term.cbreak(): | |
val = term.inkey(timeout=.1) | |
if str(val) != "": | |
if str(val) == "q": | |
break | |
elif str(val) == " ": | |
fuo_ask(sock, "toggle") | |
msg_queue.put(1) | |
elif str(val) == "j": | |
selitem += 1 | |
redraw = 2 | |
elif str(val) == "k": | |
selitem -= 1 | |
redraw = 2 | |
elif str(val) == "d": | |
selitem += 10 | |
redraw = 2 | |
elif str(val) == "u": | |
selitem -= 10 | |
redraw = 2 | |
elif str(val) == "h": | |
fuo_ask(sock, "previous") | |
msg_queue.put(1) | |
elif str(val) == "l": | |
fuo_ask(sock, "next") | |
msg_queue.put(1) | |
elif str(val) == "c": | |
msg_queue.put(0) | |
selitem = sync_current_song( | |
False, uri_list, | |
re.findall("fuo\S*", fuo_ask(sock, "status"))[0]) | |
elif str(val) == "a": | |
add_list = True | |
elif val.name == "KEY_ENTER": | |
fuo_ask(sock, "play " + uri_list[selitem]) | |
msg_queue.put(1) | |
print(term.move(0, 0) + term.clear_eol()) | |
if add_list: | |
timer_suspend = True | |
with term.location(5, 20): | |
furi = input( | |
term.clear_eol + | |
"Adding new song [fuo://netease/playlists/71776406] > " | |
) | |
furi = "fuo://netease/playlists/71776406" if not furi else furi | |
print(term.clear_bol) | |
print(fuo_ask(sock, "add " + furi)) | |
msg_queue.put(0) | |
add_list = False | |
timer_suspend = False | |
self_kill = True | |
lyric_sock.shutdown(socket.SHUT_RDWR) | |
lyric_sock.close() | |
msg_queue.put(-1) | |
if fuo_handler: | |
fuo_handler.terminate() |
2020-02-13
feat:
- login netease before play for better song quality
- using thread to avoid unnecessary requests
- fetch lyrics from pubsub to get realtime lyrics
- partial redraw saves cpu usage
2020-02-16
fix:
- typo
The L22 should change from
if b_msg[4:6].decode("utf-8") == "ok":
to
if b_msg[4:6].decode("utf-8").lower() == "ok":
FeelUOwn did a unexpected break change in https://github.com/feeluown/FeelUOwn/commits/ed4931b00c9b1ac738c531b6f9f823833ca77ae6
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
2019-05-01
feat:
fix: