Created November 29, 2015 02:22
-
-
Save Grokzen/8109ef4c0b4fb149e8ee to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@@ -397,7 +400,8 @@ class StrictRedis(object): | |
charset=None, errors=None, | |
decode_responses=False, retry_on_timeout=False, | |
ssl=False, ssl_keyfile=None, ssl_certfile=None, | |
- ssl_cert_reqs=None, ssl_ca_certs=None): | |
+ ssl_cert_reqs=None, ssl_ca_certs=None, | |
+ max_connections=None): | |
if not connection_pool: | |
if charset is not None: | |
warnings.warn(DeprecationWarning( | |
@@ -415,7 +419,8 @@ class StrictRedis(object): | |
'encoding': encoding, | |
'encoding_errors': encoding_errors, | |
'decode_responses': decode_responses, | |
- 'retry_on_timeout': retry_on_timeout | |
+ 'retry_on_timeout': retry_on_timeout, | |
+ 'max_connections': max_connections | |
} | |
# based on input, setup appropriate connection args | |
if unix_socket_path is not None: | |
@@ -476,6 +481,7 @@ class StrictRedis(object): | |
""" | |
shard_hint = kwargs.pop('shard_hint', None) | |
value_from_callable = kwargs.pop('value_from_callable', False) | |
+ watch_delay = kwargs.pop('watch_delay', None) | |
with self.pipeline(True, shard_hint) as pipe: | |
while 1: | |
try: | |
@@ -485,6 +491,8 @@ class StrictRedis(object): | |
exec_value = pipe.execute() | |
return func_value if value_from_callable else exec_value | |
except WatchError: | |
+ if watch_delay is not None and watch_delay > 0: | |
+ time.sleep(watch_delay) | |
continue | |
def lock(self, name, timeout=None, sleep=0.1, blocking_timeout=None, | |
@@ -1799,12 +1832,12 @@ class StrictRedis(object): | |
"Adds the specified elements to the specified HyperLogLog." | |
return self.execute_command('PFADD', name, *values) | |
- def pfcount(self, name): | |
+ def pfcount(self, *sources): | |
""" | |
Return the approximated cardinality of | |
- the set observed by the HyperLogLog at key. | |
+ the set observed by the HyperLogLog at key(s). | |
""" | |
- return self.execute_command('PFCOUNT', name) | |
+ return self.execute_command('PFCOUNT', *sources) | |
def pfmerge(self, dest, *sources): | |
"Merge N different HyperLogLogs into a single one." | |
@@ -2142,10 +2175,10 @@ class PubSub(object): | |
# previously listening to | |
return command(*args) | |
- def parse_response(self, block=True): | |
+ def parse_response(self, block=True, timeout=0): | |
"Parse the response from a publish/subscribe command" | |
connection = self.connection | |
- if not block and not connection.can_read(): | |
+ if not block and not connection.can_read(timeout=timeout): | |
return None | |
return self._execute(connection, connection.read_response) | |
--- a/redis/connection.py | |
+++ b/redis/connection.py | |
@@ -79,7 +80,9 @@ class Token(object): | |
class BaseParser(object): | |
EXCEPTION_CLASSES = { | |
- 'ERR': ResponseError, | |
+ 'ERR': { | |
+ 'max number of clients reached': ConnectionError | |
+ }, | |
'EXECABORT': ExecAbortError, | |
'LOADING': BusyLoadingError, | |
'NOSCRIPT': NoScriptError, | |
@@ -91,7 +94,10 @@ class BaseParser(object): | |
error_code = response.split(' ')[0] | |
if error_code in self.EXCEPTION_CLASSES: | |
response = response[len(error_code) + 1:] | |
- return self.EXCEPTION_CLASSES[error_code](response) | |
+ exception_class = self.EXCEPTION_CLASSES[error_code] | |
+ if isinstance(exception_class, dict): | |
+ exception_class = exception_class.get(response, ResponseError) | |
+ return exception_class(response) | |
return ResponseError(response) | |
@@ -542,11 +548,12 @@ class Connection(object): | |
e = sys.exc_info()[1] | |
self.disconnect() | |
if len(e.args) == 1: | |
- _errno, errmsg = 'UNKNOWN', e.args[0] | |
+ errno, errmsg = 'UNKNOWN', e.args[0] | |
else: | |
- _errno, errmsg = e.args | |
+ errno = e.args[0] | |
+ errmsg = e.args[1] | |
raise ConnectionError("Error %s while writing to socket. %s." % | |
- (_errno, errmsg)) | |
+ (errno, errmsg)) | |
except: | |
self.disconnect() | |
raise | |
@@ -555,13 +562,14 @@ class Connection(object): | |
"Pack and send a command to the Redis server" | |
self.send_packed_command(self.pack_command(*args)) | |
- def can_read(self): | |
+ def can_read(self, timeout=0): | |
"Poll the socket to see if there's data that can be read." | |
sock = self._sock | |
if not sock: | |
self.connect() | |
sock = self._sock | |
- return bool(select([sock], [], [], 0)[0]) or self._parser.can_read() | |
+ return self._parser.can_read() or \ | |
+ bool(select([sock], [], [], timeout)[0]) | |
def read_response(self): | |
"Read the response from a previously sent command" | |
diff --git a/redis/sentinel.py b/redis/sentinel.py | |
index 2f30062..3fb89ce 100644 | |
--- a/redis/sentinel.py | |
+++ b/redis/sentinel.py | |
@@ -129,6 +129,8 @@ class SentinelConnectionPool(ConnectionPool): | |
self.disconnect() | |
self.reset() | |
self.__init__(self.service_name, self.sentinel_manager, | |
+ is_master=self.is_master, | |
+ check_connection=self.check_connection, | |
connection_class=self.connection_class, | |
max_connections=self.max_connections, | |
**self.connection_kwargs) |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.