Created
November 2, 2018 04:54
Revisions
-
eode created this gist
Nov 2, 2018 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,99 @@ from collections import MutableMapping, namedtuple Message = namedtuple("Message", 'word, obj') class ExecNetSharedStorageClient(MutableMapping): def __init__(self, channel): super().__init__() self.ch = channel def _call(self, word, *args, **kwargs): self.ch.send((word, (args, kwargs))) response = Message(*self.ch.receive()) if response.word == word: return response.obj elif response.word == 'KeyError': raise KeyError(*response.obj) elif response.word == 'ValueError': raise ValueError(*response.obj) elif response.word == 'Exception': raise Exception(*response.obj) raise RuntimeError("Unexpected response: {!r}".format(response)) def __delitem__(self, key): return self._call('__delitem__', key) def __getitem__(self, key): return self._call('__getitem__', key) def __iter__(self): return iter(self._call('__iter__')) def __len__(self): return self._call('__len__') def __setitem__(self, key, value): return self._call('__setitem__', key, value) class ExecNetSharedStorageServer(dict): permitted_methods = {'__delitem__', '__getitem__', '__iter__', '__len__', '__setitem__'} def __init__(self, channel, *args, **kwargs): super().__init__(*args, **kwargs) self.ch = channel def listen(self): self.ch.setcallback(self._callback) def check(self, timeout=0): message = self.ch.receive(timeout) return self._callback(message) def _callback(self, message): word, obj = message if word in self.permitted_methods: func = getattr(self, word) args, kwargs = obj try: value = func(*args, **kwargs) if word == '__iter__': value = tuple(value) self.ch.send((word, value)) except (KeyError, ValueError) as err: self.ch.send((type(err).__name__, err.args)) else: ch.send(('Exception', ('not permitted', (word, obj)))) def cycle(self, forever=False): # continue indefinitely if forever: while forever: self.check(timeout=None) # if not forever, only finish pending items while True: try: self.check() except self.ch.TimeoutError: break if __name__ == "__channelexec__": # this module was imported and then run via gw.remote_exec(module) # this is only done for testing. We'll act as the Mapping server. while True: servers = [] message = Message(*channel.receive()) if message.word == 'exit': exit() if message.word == 'new': server = ExecNetSharedStorageServer(message.obj) server.listen() servers.append(server) def test_client_server(gw, channel): client = ExecNetSharedStorageClient(gw.newchannel()) channel.send(('new', client.ch)) return client