Created
April 18, 2009 02:27
-
-
Save topia/97396 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# $URL$ | |
# $Id$ | |
"""Tiny Python Console for TwitterIrcGateway. | |
This script requires the pure-Python part of the Python standard library, as
bundled with IronPython 2.0 or CPython 2.5 (not 2.6):
codeop, traceback, __future__, os, os.path (and ntpath), linecache, | |
stat, errno, copy_reg, types, UserDict | |
""" | |
__docformat__ = 'reStructuredText' | |
__author__ = 'Topia <[email protected]>' | |
__version__ = '$Revision$' | |
__date__ = '$Date$' | |
__credits__ = """copyright (C) 2009 Topia <[email protected]>. all rights reserved.""" | |
import sys | |
import clr | |
from Misuzilla.Net.Irc import * | |
sys.path.append(r"Libraries\IronPython") | |
from codeop import CommandCompiler | |
import traceback | |
## TODO: | |
## * better multiline handling | |
class IRCWriter(object):
    """File-like sink that relays written text to IRC, one line per message.

    Text is accumulated until a newline arrives.  Every completed line is
    wrapped in a message object built as ``msgclass(target, line)`` and
    handed to ``session.Send``.
    """

    def __init__(self, session, prefix, target, notice=True):
        """Remember where and how lines are delivered.

        session -- object whose ``Send(message)`` receives each line
        prefix  -- value stored in every message's ``Sender`` attribute
        target  -- first argument passed to the message constructor
        notice  -- true selects ``NoticeMessage``, false ``PrivMsgMessage``
        """
        self.session = session
        self.prefix = prefix
        self.target = target
        self.msgclass = NoticeMessage if notice else PrivMsgMessage
        # Text received so far that has not been terminated by a newline.
        self.buffer = ''

    def write(self, text):
        """Append ``text`` and send every line completed by it.

        Trailing whitespace is stripped from each line before sending.
        Whatever follows the last newline stays buffered, also when sending
        fails part-way through.
        """
        pending = self.buffer + text
        try:
            while "\n" in pending:
                head, pending = pending.split("\n", 1)
                message = self.msgclass(self.target, head.rstrip())
                message.Sender = self.prefix
                self.session.Send(message)
        finally:
            # Keep the unsent remainder even if Send() raised.
            self.buffer = pending

    def flush(self):
        """Send any buffered partial line as if it had been terminated."""
        if not self.buffer:
            return
        self.write("\n")
class ConsoleHandler(object):
    """Interactive Python console driven by messages sent to an IRC channel.

    Every PRIVMSG/NOTICE addressed to ``channel_name`` is treated as one line
    of console input.  Lines are buffered until they form a complete
    statement, which is then executed in ``self.locals`` while ``sys.stdout``
    and ``sys.stderr`` are redirected to ``self.stdout`` / ``self.stderr``.
    """

    # Channel whose incoming messages are interpreted as console input.
    channel_name = '#pyconsole'

    def __init__(self, locals=None):
        """Create a console that is not yet attached to a session.

        locals -- namespace the entered code runs in.  When omitted, a fresh
                  dict holding ``__name__``, ``__doc__`` and ``console``
                  (this handler) is used.
        """
        self.compile = CommandCompiler()
        self.resetbuffer()
        if locals is None:
            locals = {"__name__": "__console__", "__doc__": None,
                      "console": self}
        self.locals = locals
        # Not attached yet; this also lets detach() be called at any time.
        self.session = self.server = None
        self.stdout = self.stderr = None

    def attach(self, session=None, server=None):
        """Hook the console into ``session`` and create its output writers.

        ``session`` and ``server`` default to the host-provided globals
        ``Session`` and ``Server``.  They are looked up when attach() is
        called rather than when the class is defined.
        """
        if session is None:
            session = Session
        if server is None:
            server = Server
        self.server = server
        self.session = session
        self.session.PreMessageReceived += self.onPreMessageReceived
        self.session.PostMessageReceived += self.onPostMessageReceived
        chname = self.channel_name
        self.userhost = 'twitter@' + server.ServerName
        self.stdout = IRCWriter(session, 'stdout!' + self.userhost, chname)
        self.stderr = IRCWriter(session, 'stderr!' + self.userhost, chname)

    def detach(self):
        """Unhook from the session; a no-op when not currently attached."""
        if self.session is None:
            return
        self.session.PreMessageReceived -= self.onPreMessageReceived
        self.session.PostMessageReceived -= self.onPostMessageReceived
        self.session = self.server = None
        self.stdout = self.stderr = None

    def send_notify(self, nick, msg):
        """Send ``msg`` to the console channel as a NOTICE from ``nick``."""
        ircmsg = NoticeMessage(self.channel_name, msg)
        ircmsg.SenderHost = self.userhost
        ircmsg.SenderNick = nick
        self.session.Send(ircmsg)

    def onPreMessageReceived(self, sender, eventargs):
        """Intercept messages for the console channel and run them as input.

        Messages of other types or for other receivers are left alone.
        """
        msg = eventargs.Message
        if not isinstance(msg, (PrivMsgMessage, NoticeMessage)):
            return
        if msg.Receiver != self.channel_name:
            return
        # The line is consumed here; cancel the event for this message.
        eventargs.Cancel = True
        self.process_line(msg.Content)

    def onPostMessageReceived(self, sender, eventargs):
        """Flag the console channel's group as special once it is joined."""
        msg = eventargs.Message
        if not isinstance(msg, JoinMessage):
            return
        if msg.Channel != self.channel_name:
            return
        # Use the attached session, not the global one: attach() may have
        # been given a different session.
        self.session.Groups[self.channel_name].IsSpecial = True

    def resetbuffer(self):
        """Forget pending input lines and leave multi-line mode."""
        self.buffer = []
        self.inmultiline = False

    def process_line(self, content):
        """Feed one received line to the console.

        A whitespace-only line counts as an empty line, which is how a
        multi-line block is terminated.  On entering multi-line mode a hint
        about that convention is sent once.
        """
        if not content.strip():
            content = ''
        more = self.push(content)
        if more and not self.inmultiline:
            self.send_notify('...', 'when insert blank (normally last) line, enter space only line')
            self.inmultiline = True

    def push(self, content):
        """Append a line to the buffer and try to run the buffered source.

        Returns a true value when more input is required; otherwise the
        buffer is reset.
        """
        self.buffer.append(content)
        source = "\n".join(self.buffer)
        more = self.runsource(source)
        if not more:
            self.resetbuffer()
        return more

    def runsource(self, source):
        """Compile ``source`` and run it when it is complete.

        Returns True when the source is incomplete, False after running it,
        and None when compilation failed (the error is reported to the
        channel).
        """
        if source[:1] in (' ', '\t'):
            # Let input that starts indented compile by giving it a block
            # to belong to.
            source = 'if 1:\n%s' % source
        try:
            code = self.compile(source)
        except (OverflowError, SyntaxError, ValueError, TypeError):
            self.send_notify('failure-source', repr(source))
            for line in traceback.format_exc().splitlines():
                self.send_notify('failure', line)
            return None
        if code is None:
            return True
        self.runcode(code)
        return False

    def runcode(self, code):
        """Execute ``code`` in ``self.locals`` with redirected std streams.

        Exceptions raised by the code are reported to the channel as
        'failure' notices.  If nothing was displayed by the code (builtin
        ``_`` is still None) while in multi-line mode, '(accepted)' is sent.
        The original std streams are always restored.
        """
        # ``__builtins__`` is the builtin module in some contexts and that
        # module's dict in others; get the dict in both cases.
        if isinstance(__builtins__, dict):
            builtin_ns = __builtins__
        else:
            builtin_ns = vars(__builtins__)
        # The interactive display hook stores the last displayed value in
        # the builtin ``_``; clear it to detect whether this run showed one.
        builtin_ns['_'] = None
        saved_streams = sys.stdin, sys.stdout, sys.stderr
        try:
            sys.stdin = None
            sys.stdout = self.stdout
            sys.stderr = self.stderr
            try:
                try:
                    # Tuple form: same as ``exec code in self.locals``.
                    exec(code, self.locals)
                finally:
                    # The executed code may have replaced the streams.
                    if sys.stdout:
                        sys.stdout.flush()
                    if sys.stderr:
                        sys.stderr.flush()
            except Exception:
                for line in traceback.format_exc().splitlines():
                    self.send_notify('failure', line)
            else:
                if builtin_ns.get('_') is None and self.inmultiline:
                    self.send_notify('success', '(accepted)')
        finally:
            sys.stdin, sys.stdout, sys.stderr = saved_streams
# Namespace in which code typed into the console channel is executed.
console_locals = {"__name__": "__console__", "__doc__": None}
console = ConsoleHandler(console_locals)
# The handler keeps a reference to this same dict, so names added after
# construction are visible to console code as well.
console_locals.update(console=console, Session=Session, Server=Server)
# Expose every entry of sys.modules under its own name.
console_locals.update(sys.modules)
# Start listening on the host-provided session.
console.attach(Session, Server)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment