Last active
January 31, 2021 13:49
-
-
Save sonthonaxrk/ecf2222a3e8f0035928467c36c443428 to your computer and use it in GitHub Desktop.
This IPython kernel can take GUI messages
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zmq | |
import sys | |
from typing import Any, Tuple | |
from tornado import gen | |
from ipykernel.ipkernel import IPythonKernel | |
from devtools import pformat | |
from ipykernel.kernelbase import ( | |
CONTROL_PRIORITY, SHELL_PRIORITY, ABORT_PRIORITY | |
) | |
class AsyncGUIKernel(IPythonKernel): | |
implementation = 'Async GUI' | |
banner = ( | |
'Async GUI - Allow Comm messages to be passed ' | |
'when other cells are running.' | |
) | |
# Since this is not explicitly defined in the parent class | |
comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ] | |
def _parse_message(self, msg) -> Tuple[Any, dict]: | |
"""dispatch control requests""" | |
idents, msg = self.session.feed_identities(msg, copy=False) | |
try: | |
msg = self.session.deserialize(msg, content=True, copy=False) | |
return (idents, msg) | |
except: | |
self.log.error("Invalid Message", exc_info=True) | |
return | |
@gen.coroutine | |
def dispatch_shell(self, stream, msg: dict, idents): | |
"""dispatch shell requests""" | |
msg_type = msg['header']['msg_type'] | |
self._publish_status('busy') | |
if self._aborting: | |
self._send_abort_reply(stream, msg, idents) | |
self._publish_status('idle') | |
# flush to ensure reply is sent before | |
# handling the next request | |
stream.flush(zmq.POLLOUT) | |
return | |
# Print some info about this message and leave a '--->' marker, so it's | |
# easier to trace visually the message chain when debugging. Each | |
# handler prints its message at the end. | |
self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type) | |
self.log.debug(' Content: %s\n --->\n ', msg['content']) | |
if not self.should_handle(stream, msg, idents): | |
return | |
handler = self.shell_handlers.get(msg_type, None) | |
if handler is None: | |
self.log.warning("Unknown message type: %r", msg_type) | |
else: | |
self.log.debug("%s: %s", msg_type, msg) | |
try: | |
self.pre_handler_hook() | |
except Exception: | |
self.log.debug("Unable to signal in pre_handler_hook:", exc_info=True) | |
try: | |
yield gen.maybe_future(handler(stream, idents, msg)) | |
except Exception: | |
self.log.error("Exception in message handler:", exc_info=True) | |
finally: | |
try: | |
self.post_handler_hook() | |
except Exception: | |
self.log.debug("Unable to signal in post_handler_hook:", exc_info=True) | |
sys.stdout.flush() | |
sys.stderr.flush() | |
self._publish_status('idle') | |
# flush to ensure reply is sent before | |
# handling the next request | |
stream.flush(zmq.POLLOUT) | |
@gen.coroutine | |
def dispatch_control(self, msg: dict, idents): | |
self.log.debug("Control received: %s", msg) | |
self._publish_status('busy') | |
if self._aborting: | |
self._send_abort_reply(self.control_stream, msg, idents) | |
self._publish_status('idle') | |
return | |
header = msg['header'] | |
msg_type = header['msg_type'] | |
handler = self.control_handlers.get(msg_type, None) | |
if handler is None: | |
self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type) | |
else: | |
try: | |
yield gen.maybe_future(handler(self.control_stream, idents, msg)) | |
except Exception: | |
self.log.error("Exception in control handler:", exc_info=True) | |
sys.stdout.flush() | |
sys.stderr.flush() | |
self._publish_status('idle') | |
# flush to ensure reply is sent | |
self.control_stream.flush(zmq.POLLOUT) | |
def schedule_dispatch(self, priority, dispatch, *args): | |
""" | |
Changes the schedule_dispatch dispatch method to | |
always dispatch comm events. | |
""" | |
# quick and dirty checks for message type | |
if priority == SHELL_PRIORITY: | |
stream, unparsed_msg = args | |
indent, msg = self._parse_message(unparsed_msg) | |
new_args = (stream, msg, indent) | |
if new_args and msg['header']['msg_type'] in self.comm_msg_types: | |
# Dispatch now | |
return dispatch(*new_args) | |
elif priority == CONTROL_PRIORITY: # CONTROL_PRIORITY | |
# One arg | |
(unparsed_msg,) = args | |
indent, msg = self._parse_message(unparsed_msg) | |
new_args = (msg, indent) | |
elif priority == ABORT_PRIORITY: # ABORT_PRIORITY | |
new_args = args | |
# Anything else, keep things the same. | |
idx = next(self._message_counter) | |
self.msg_queue.put_nowait( | |
( | |
priority, | |
idx, | |
dispatch, | |
new_args, | |
) | |
) | |
# ensure the eventloop wakes up | |
self.io_loop.add_callback(lambda: None) | |
def set_parent(self, ident, parent): | |
# The last message sent will set what cell output | |
# to use. We want to the awaiting future to print | |
# it's own output, not the cell which the comm is | |
# associated with. | |
# Don't change the output if the message is from a comm | |
if msg['header']['msg_type'] not in self.comm_msg_types: | |
super().set_parent(indent, parent) | |
if __name__ == '__main__': | |
from ipykernel.kernelapp import IPKernelApp | |
IPKernelApp.launch_instance(kernel_class=AsyncGUIKernel) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment