Skip to content

Instantly share code, notes, and snippets.

@eode
Created November 2, 2018 04:54

Revisions

  1. eode created this gist Nov 2, 2018.
    99 changes: 99 additions & 0 deletions execnet_shared.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,99 @@
    from collections import MutableMapping, namedtuple


    Message = namedtuple("Message", 'word, obj')


    class ExecNetSharedStorageClient(MutableMapping):
    def __init__(self, channel):
    super().__init__()
    self.ch = channel

    def _call(self, word, *args, **kwargs):
    self.ch.send((word, (args, kwargs)))
    response = Message(*self.ch.receive())
    if response.word == word:
    return response.obj
    elif response.word == 'KeyError':
    raise KeyError(*response.obj)
    elif response.word == 'ValueError':
    raise ValueError(*response.obj)
    elif response.word == 'Exception':
    raise Exception(*response.obj)
    raise RuntimeError("Unexpected response: {!r}".format(response))

    def __delitem__(self, key):
    return self._call('__delitem__', key)

    def __getitem__(self, key):
    return self._call('__getitem__', key)

    def __iter__(self):
    return iter(self._call('__iter__'))

    def __len__(self):
    return self._call('__len__')

    def __setitem__(self, key, value):
    return self._call('__setitem__', key, value)


    class ExecNetSharedStorageServer(dict):
    permitted_methods = {'__delitem__', '__getitem__', '__iter__', '__len__', '__setitem__'}
    def __init__(self, channel, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.ch = channel

    def listen(self):
    self.ch.setcallback(self._callback)

    def check(self, timeout=0):
    message = self.ch.receive(timeout)
    return self._callback(message)

    def _callback(self, message):
    word, obj = message
    if word in self.permitted_methods:
    func = getattr(self, word)
    args, kwargs = obj
    try:
    value = func(*args, **kwargs)
    if word == '__iter__':
    value = tuple(value)
    self.ch.send((word, value))
    except (KeyError, ValueError) as err:
    self.ch.send((type(err).__name__, err.args))
    else:
    ch.send(('Exception', ('not permitted', (word, obj))))

    def cycle(self, forever=False):
    # continue indefinitely
    if forever:
    while forever:
    self.check(timeout=None)
    # if not forever, only finish pending items
    while True:
    try:
    self.check()
    except self.ch.TimeoutError:
    break


    if __name__ == "__channelexec__":
    # this module was imported and then run via gw.remote_exec(module)
    # this is only done for testing. We'll act as the Mapping server.
    while True:
    servers = []
    message = Message(*channel.receive())
    if message.word == 'exit':
    exit()
    if message.word == 'new':
    server = ExecNetSharedStorageServer(message.obj)
    server.listen()
    servers.append(server)


    def test_client_server(gw, channel):
    client = ExecNetSharedStorageClient(gw.newchannel())
    channel.send(('new', client.ch))
    return client