Created
January 25, 2018 02:13
-
-
Save wixb50/6a95a255f0f7aca32cbd718124a46ba7 to your computer and use it in GitHub Desktop.
zerorpc thread friendly client.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
# @Author: xiewenqian <int>
# @Date: 2016-09-12T09:13:01+08:00
# @Email: [email protected]
# @Last modified by: int
# @Last modified time: 2016-09-22T10:24:14+08:00
import os | |
import zmq | |
import logging | |
from zerorpc.events import Events | |
from zerorpc.context import Context | |
from zerorpc.exceptions import TimeoutExpired, RemoteError | |
from redis.connection import BlockingConnectionPool | |
# Patch msgpack so that numpy values can travel inside RPC payloads.
import msgpack_numpy as m

m.patch()

# Endpoint used by RPCSock when it builds its connection pool.
# NOTE(review): 120.0.0.1 is not a loopback address -- confirm this is
# intentional and not a typo for 127.0.0.1.
ZRPC_HOST_SOCK = 'tcp://120.0.0.1:8080'

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class RContext(Context):
    """zerorpc ``Context`` whose sockets are plain ``zmq.Socket`` objects.

    One shared instance is created lazily and cached on the class; fetch it
    through :meth:`get_instance`.
    """

    # Cached shared instance; stays ``None`` until get_instance() is called.
    _instance = None

    def __init__(self):
        super(RContext, self).__init__()

    @staticmethod
    def get_instance():
        """Return the shared ``RContext``, building it on first use."""
        shared = RContext._instance
        if shared is None:
            shared = RContext()
            RContext._instance = shared
        return shared

    def socket(self, socket_type):
        """Create a ``zmq.Socket`` of ``socket_type`` attached to this context.

        Raises:
            zmq.ZMQError: carrying ``zmq.ENOTSUP`` when the context has
                already been closed.
        """
        if not self.closed:
            return zmq.Socket(self, socket_type)
        raise zmq.ZMQError(zmq.ENOTSUP)
class Connection(Events):
    """One ``zmq.REQ`` channel to a zerorpc endpoint.

    Instances are callable: ``conn(method, *args, timeout=...)`` sends a
    request event and blocks until the matching reply (or a timeout).

    Args:
        connect_to: endpoint passed to ``connect`` and remembered so that
            :meth:`disconnect` can undo it.
        heartbeat: default reply timeout used when a call does not pass
            its own ``timeout`` keyword.
    """

    def __init__(self, connect_to, heartbeat=30):
        super(Connection, self).__init__(zmq.REQ, RContext.get_instance())
        self._heartbeat = heartbeat
        self.connect_to = connect_to
        self.connect(connect_to)
        # Process id at creation time.
        # NOTE(review): presumably read by the owning connection pool to
        # detect forks -- confirm against the pool implementation.
        self.pid = os.getpid()

    def _process_answer(self, context, req_event, rep_event,
                        handle_remote_error):
        """Turn a reply event into a return value or a raised exception.

        An ``ERR`` reply is converted by ``handle_remote_error`` and raised;
        any other reply yields its first argument. The client "after
        request" hook is invoked in both cases.
        """
        if rep_event.name == u'ERR':
            exception = handle_remote_error(rep_event)
            context.hook_client_after_request(req_event, rep_event, exception)
            raise exception
        context.hook_client_after_request(req_event, rep_event)
        return rep_event.args[0]

    def _handle_remote_error(self, event):
        """Build the exception object that represents an ``ERR`` event.

        The context hook gets the first chance to produce an exception;
        otherwise a ``RemoteError`` is built from the event arguments.
        """
        exception = self._context.hook_client_handle_remote_error(event)
        if not exception:
            if event.header.get(u'v', 1) >= 2:
                # Header version >= 2: args carry (name, message, traceback).
                (name, msg, traceback) = event.args
                exception = RemoteError(name, msg, traceback)
            else:
                # Older header version: args carry only the message.
                (msg,) = event.args
                exception = RemoteError('RemoteError', msg, None)
        return exception

    def process_response(self, request_event, timeout):
        """Wait for the reply to ``request_event`` and return its result.

        Raises:
            TimeoutExpired: when no reply arrives within ``timeout``; the
                "after request" hook is notified first.
            Exception: whatever :meth:`_process_answer` raises for an
                ``ERR`` reply.
        """
        try:
            reply_event = self.recv(timeout=timeout)
        except TimeoutExpired:
            error = TimeoutExpired(
                timeout,
                'calling remote method {0}'.format(request_event.name))
            self._context.hook_client_after_request(request_event, None, error)
            # NOTE(review): a REQ socket that sent a request without
            # receiving its reply normally refuses the next send; confirm
            # whether this connection is still usable after a timeout.
            raise error
        return self._process_answer(self._context, request_event,
                                    reply_event, self._handle_remote_error)

    def __call__(self, method, *args, **kargs):
        """Invoke remote ``method`` with ``args`` and return its result.

        Only the ``timeout`` keyword is read (defaulting to the heartbeat
        given at construction); other keywords are ignored.
        """
        if isinstance(method, bytes):
            method = method.decode('utf-8')
        timeout = kargs.get('timeout', self._heartbeat)
        request_event = self.new_event(method, args)
        # Pair the "after request" hook fired on every outcome with its
        # "before request" counterpart so middleware sees balanced calls.
        self._context.hook_client_before_request(request_event)
        self.emit_event(request_event)
        return self.process_response(request_event, timeout)

    def disconnect(self, resolve=True):
        """Disconnect the socket from every endpoint ``connect_to`` maps to.

        Returns:
            list: the status returned by each socket-level disconnect.
        """
        r = []
        for endpoint_ in self._resolve_endpoint(self.connect_to, resolve):
            r.append(self._socket.disconnect(endpoint_))
            logger.debug('disconnected from %s (status=%s)', endpoint_, r[-1])
        return r
class RPClient(object):
    """Pooled RPC client: ``client.method(*args)`` calls ``method`` remotely.

    Each call borrows a ``Connection`` from a ``BlockingConnectionPool``,
    performs the request on it and hands the connection back.

    Args:
        connect_to: endpoint forwarded to every pooled ``Connection``.
        heartbeat: forwarded to each ``Connection`` and also used as the
            pool's ``timeout``.
    """

    def __init__(self, connect_to, heartbeat=30):
        self.connection_pool = BlockingConnectionPool(
            connection_class=Connection,
            timeout=heartbeat,
            connect_to=connect_to,
            heartbeat=heartbeat,
        )

    def close(self):
        """Disconnect the connections held by the pool."""
        self.connection_pool.disconnect()

    def __getattr__(self, name):
        """Return a callable that invokes the remote method ``name``.

        Raises:
            AttributeError: for dunder names (``__x__``). Those are protocol
                probes from the interpreter or from modules such as ``copy``
                and ``pickle``; answering them with an RPC proxy would make
                ``hasattr`` lie and could fire a bogus remote call.
        """
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError(name)
        # Single-underscore names stay proxied on purpose: zerorpc's
        # introspection methods (e.g. ``_zerorpc_ping``) use that prefix.
        return lambda *args, **kwargs: self(name, *args, **kwargs)

    def __call__(self, name, *args, **kwargs):
        """Call remote method ``name`` on a pooled connection.

        The connection is released back to the pool whether the call
        returns or raises.
        """
        connection = self.connection_pool.get_connection('')
        try:
            return connection(name, *args, **kwargs)
        finally:
            self.connection_pool.release(connection)
class RPCSock(RPClient):
    """``RPClient`` bound to the module-level ``ZRPC_HOST_SOCK`` endpoint."""

    def __init__(self, heartbeat=60):
        super(RPCSock, self).__init__(
            connect_to=ZRPC_HOST_SOCK,
            heartbeat=heartbeat
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment