Skip to content

Instantly share code, notes, and snippets.

@d03n3rfr1tz3
Last active May 5, 2024 00:38
Show Gist options
  • Save d03n3rfr1tz3/292f30db4ba23c73c14cb8e50025241c to your computer and use it in GitHub Desktop.
Save d03n3rfr1tz3/292f30db4ba23c73c14cb8e50025241c to your computer and use it in GitHub Desktop.
IRC Relay for Games with RCon. Currently only tested on the following Games, but might also work on others (with tweaking the rconMagic variable): Quake3
#!/usr/bin/env python3
import asyncio, io, re, select, socket, time
# Debug Configuration
debugMode = 1
# IRC Configuration
ircHost = ""
ircPort = 6667
ircChannel = ""
ircNick = ""
ircPass = None
ircAuthType = None
ircAuthPass = None
ircIgnore = []
# Log Configuration
qlogFile = "/root/.q3a/baseq3/qconsole.log"
# RCon Configuration
rconHost = "127.0.0.1"
rconPort = 27960
rconPass = ""
rconMagic = b"\xFF\xFF\xFF\xFF"
# Main Function
async def main():
await asyncio.gather(irc_task(), qlog_task(), rcon_task())
# Task for handling IRC
async def irc_task(*args):
global ircHost, ircPort, ircChannel, ircNick, ircPass, ircAuthType, ircAuthPass, ircIgnore, ircPing, ircRetry, ircSocket
if ircHost == None: return
if ircChannel == None: return
if ircNick == None: return
debug_print("Initialising IRC", 2)
await asyncio.sleep(1)
while running:
try:
debug_print("Starting IRC", 2)
ircSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ircSocket.connect((ircHost, ircPort))
ircSocket.setblocking(False)
await asyncio.sleep(0.1)
# receive something before sending
await irc_recv("")
# send pass
if ircPass != None:
await irc_send("PASS " + ircPass)
# send nick
await irc_send("NICK " + ircNick)
# send user
await irc_send("USER ircrelay 0 * :Quake3 ircRelay");
# receive until PING
await irc_recv("PING");
# give the server an additional second
await asyncio.sleep(1)
# send auth
if ircAuthType == "AuthServ":
await irc_send("PRIVMSG AuthServ :AUTH " + ircNick + " " + ircAuthPass)
await irc_recv("")
if ircAuthType == "NickServ":
await irc_send("PRIVMSG NickServ :IDENTIFY " + ircAuthPass)
await irc_recv("")
if ircAuthType == "Q":
await irc_send("PRIVMSG [email protected] :AUTH " + ircNick + " " + ircAuthPass)
await irc_recv("")
# send join
await irc_send("JOIN #" + ircChannel);
while running:
await irc_recv(None)
except Exception as ex:
debug_print(ex, 1)
finally:
debug_print("Stopping IRC", 2)
ircSocket.close()
waitSeconds = 5
if ircPing > 5:
ircPing = 0
ircRetry = 0
for x in range(++ircRetry):
waitSeconds = waitSeconds * 2
await asyncio.sleep(waitSeconds)
# IRC event handler
async def irc_event(type, *args):
global ircSocket
if type == "connected":
player = await irc_colors(args[0])
await irc_send("PRIVMSG #" + ircChannel + " :" + player + "\x0f " + type)
if type == "disconnected":
player = await irc_colors(args[0])
await irc_send("PRIVMSG #" + ircChannel + " :" + player + "\x0f " + type)
if type == "was kicked":
player = await irc_colors(args[0])
await irc_send("PRIVMSG #" + ircChannel + " :" + player + "\x0f " + type)
if type == "say":
player = await irc_colors(args[0])
text = await irc_colors(args[1])
await irc_send("PRIVMSG #" + ircChannel + " :" + player + "\x0f: " + text)
# Receiving IRC data
async def irc_recv(until):
global ircSocket
found = False
while found == False:
ready = select.select([ircSocket], [], [], 0.2)
if ready[0]:
content = ircSocket.recv(1024).decode("utf-8", "ignore")
if content != None:
lines = content.split('\n')
for line in lines:
line = line.rstrip()
if line:
debug_print("<- " + line, 3)
# received chat message
if line.find(" PRIVMSG ") > 0:
startpos = line.find(":")
endpos = line.find("!")
username = line[startpos + 1:endpos]
startpos = line.find(":", endpos)
message = line[startpos + 1:]
if username not in ircIgnore:
await rcon_send("say ^7" + username + ": ^2" + message)
else:
debug_print("Message from " + username + " got ignored.", 3)
# received ping
if line.startswith("PING "):
await irc_send(line.replace("PING", "PONG"))
if until == "" or (until != None and line.startswith(until)):
found = True
else:
await asyncio.sleep(0.1)
if until == None: found = True
else:
await asyncio.sleep(0.1)
# Sending IRC data
async def irc_send(cmd):
global ircSocket
if ircSocket != None:
for x in range(5):
ready = select.select([], [ircSocket], [], 0.2)
if ready[1]:
ircSocket.send((cmd + "\n").encode("utf-8"))
debug_print("-> " + cmd, 3)
break
else:
await asyncio.sleep(0.1)
# IRC color translator
async def irc_colors(value):
global colorTranslation
result = value
for key, val in colorTranslation.items():
result = result.replace(key, val)
return result
# Task for handling Log
async def qlog_task(*args):
global qlogFile, qlogSocket
debug_print("Initialising QLOG", 2)
await asyncio.sleep(1)
while running:
try:
debug_print("Starting QLOG", 2)
qlogSocket = open(qlogFile, "rt", buffering=1024, encoding="utf-8")
qlogSocket.seek(0, io.SEEK_END)
while running:
line = qlogSocket.readline().rstrip()
if line:
await qlog_parse(line)
else:
await asyncio.sleep(0.1)
except Exception as ex:
debug_print(ex, 1)
finally:
debug_print("Stopping QLOG", 2)
if qlogSocket != None:
qlogSocket.close()
await asyncio.sleep(5)
# Parser for a Log line
async def qlog_parse(line):
matchSay = regexLogSay.match(line)
matchJoinLeave = regexLogJoinLeave.match(line)
if matchSay:
await irc_event("say", matchSay.group(1), matchSay.group(2))
if matchJoinLeave:
await irc_event(matchJoinLeave.group(2), matchJoinLeave.group(1))
# Task for handling RCon
async def rcon_task(*args):
global rconSocket
debug_print("Initialising RCON", 2)
await asyncio.sleep(1)
while running:
try:
debug_print("Starting RCON", 2)
# create and open socket connection
rconSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
rconSocket.connect((rconHost, rconPort))
rconSocket.setblocking(False)
# send "serverinfo" command
await rcon_send("serverinfo")
await asyncio.sleep(0.1)
while running:
# wait for socket readiness
ready = select.select([rconSocket], [], [], 0.2)
if ready[0]:
line = rconSocket.recv(1024).decode("utf-8", "ignore").rstrip()
if line:
debug_print(line, 3)
else:
await asyncio.sleep(0.1)
else:
await asyncio.sleep(0.1)
except Exception as ex:
debug_print(ex, 1)
finally:
debug_print("Stopping RCON", 2)
rconSocket.close()
await asyncio.sleep(5)
# Sending RCon data
async def rcon_send(cmd):
global rconSocket
if rconSocket != None:
for x in range(5):
ready = select.select([], [rconSocket], [], 0.2)
if ready[1]:
rconSocket.send(rconMagic + ("rcon " + rconPass + " " + cmd).encode("utf-8"))
break
else:
await asyncio.sleep(0.1)
def debug_print(text, mode):
global debugMode
if mode > debugMode: return
print(text, flush=True)
running = True
ircPing = 0
ircRetry = 0
ircSocket = None
qlogSocket = None
rconSocket = None
regexLogSay = re.compile('say: ([^:]+): (.*)', re.IGNORECASE)
regexLogJoinLeave = re.compile('broadcast: print "([^ ]+) (.*)\\\\n"', re.IGNORECASE)
colorTranslation = {
"^0": "\x03" + "14",
"^1": "\x03" + "04",
"^2": "\x03" + "03",
"^3": "\x03" + "08",
"^4": "\x03" + "02",
"^5": "\x03" + "11",
"^6": "\x03" + "13",
"^7": "\x03" + "15"
}
try:
time.sleep(5)
asyncio.run(main())
except KeyboardInterrupt:
print("Aborting...")
running = False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment