Last active
January 12, 2023 19:08
-
-
Save yapaxi/9526ca734c8cc30173276c616324d33b to your computer and use it in GitHub Desktop.
Possible fix for starvation in the Redis forced-reconnection code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using StackExchange.Redis; | |
namespace ConsoleApp38 | |
{ | |
class Program
{
    // Pause between loop iterations. Without it both loops spin on calls that usually complete
    // synchronously, pinning a thread-pool thread each. It also throttles retries after a failure.
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(100);

    /// <summary>
    /// Demo driver. A "health" loop calls <see cref="Redis.ConsiderReconnect"/> repeatedly
    /// until it reports that a reconnect was forced. Meanwhile a "connection" loop polls
    /// <see cref="Redis.GetConnection"/> until it returns a different multiplexer than the
    /// initial one. Ctrl+C cancels both loops.
    /// </summary>
    static async Task Main(string[] args)
    {
        // DEMO CODE
        // NOT FOR PRODUCTION PURPOSES
        var redis = new Redis("CONNECTION STRING");
        var initialConnection = await redis.GetConnection(CancellationToken.None);

        var cnsl = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            cnsl.Cancel();
            // Keep the process alive so the loops can observe cancellation and finish.
            e.Cancel = true;
        };

        Console.WriteLine("Running health-loop");
        var healthLoop = Task.Run(async () =>
        {
            while (!cnsl.IsCancellationRequested)
            {
                try
                {
                    await Task.Delay(PollInterval, cnsl.Token);

                    // true means the current multiplexer was dropped, so the demo is done here.
                    if (await redis.ConsiderReconnect(cnsl.Token))
                    {
                        return;
                    }
                }
                catch (OperationCanceledException) when (cnsl.IsCancellationRequested)
                {
                    break;
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Failure in health loop: {e}");
                }
            }
            Console.WriteLine("Health loop aborted");
        });

        Console.WriteLine("Running connection-loop");
        var connectionLoop = Task.Run(async () =>
        {
            while (!cnsl.IsCancellationRequested)
            {
                try
                {
                    await Task.Delay(PollInterval, cnsl.Token);

                    var currentConnection = await redis.GetConnection(cnsl.Token);
                    if (!ReferenceEquals(initialConnection, currentConnection))
                    {
                        Console.WriteLine("Reconnected");
                        return;
                    }
                }
                catch (OperationCanceledException) when (cnsl.IsCancellationRequested)
                {
                    break;
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Failure in connection loop: {e}");
                }
            }
            Console.WriteLine("Did not reconnect");
        });

        await healthLoop;
        await connectionLoop;
    }
}
/// <summary>
/// Lazily creates and caches one <see cref="ConnectionMultiplexer"/>. Callers that observe
/// errors can request a forced reconnect. Demo code, not for production use.
/// </summary>
public class Redis
{
    // config

    /// <summary>Minimum time between two forced reconnects.</summary>
    internal static readonly TimeSpan ReconnectMinFrequency = TimeSpan.FromSeconds(10);

    /// <summary>
    /// A reconnect is forced only if errors were first reported at least this long ago and
    /// the previous error report is no older than this.
    /// </summary>
    internal static readonly TimeSpan ReconnectErrorThreshold = TimeSpan.FromSeconds(5);

    /// <summary>How long <see cref="GetConnection"/> waits for the connection lock.</summary>
    internal static readonly TimeSpan ConnectionLockTimeout = TimeSpan.FromSeconds(10);

    /// <summary>Upper bound for connecting, and for waiting on the old multiplexer to close.</summary>
    internal static readonly TimeSpan ConnectionTimeout = TimeSpan.FromSeconds(20);

    private readonly string _connectionString;

    // state

    // Current multiplexer. It is null before the first connect and after a forced reconnect.
    // Only GetConnection (under _connectionLock) sets it non-null.
    // Only ConsiderReconnect (under _reconnectNoWaitLock) sets it to null.
    private volatile ConnectionMultiplexer? _multiplexer;

    // Serializes creation of a new multiplexer in GetConnection.
    private readonly SemaphoreSlim _connectionLock;

    // Held while ConsiderReconnect evaluates or performs a reconnect. It is acquired with a zero
    // timeout, so concurrent callers bail out instead of queueing behind it.
    private readonly SemaphoreSlim _reconnectNoWaitLock;

    // UTC ticks of the last forced reconnect; accessed via Interlocked.
    private long _lastReconnectTicks;

    // Error-window bookkeeping, only touched while _reconnectNoWaitLock is held.
    // DateTimeOffset.MinValue means "no error recorded yet".
    private DateTimeOffset _firstError;
    private DateTimeOffset _previousError;

    public Redis(string connectionString)
    {
        if (string.IsNullOrWhiteSpace(connectionString))
        {
            throw new ArgumentException($"'{nameof(connectionString)}' cannot be null or whitespace.", nameof(connectionString));
        }
        _connectionString = connectionString;
        _connectionLock = new SemaphoreSlim(1, 1);
        _reconnectNoWaitLock = new SemaphoreSlim(1, 1);
        _lastReconnectTicks = DateTimeOffset.MinValue.UtcTicks;
        _firstError = DateTimeOffset.MinValue;
        _previousError = DateTimeOffset.MinValue;
        _multiplexer = null;
    }

    /// <summary>
    /// Returns the cached multiplexer, connecting first if there is none.
    /// </summary>
    /// <param name="token">Cancels waiting for the connection lock and for the connect attempt.</param>
    /// <returns>The cached or newly created multiplexer.</returns>
    /// <exception cref="TimeoutException">
    /// The connection lock or the connect attempt timed out.
    /// </exception>
    /// <exception cref="OperationCanceledException"><paramref name="token"/> was cancelled while waiting.</exception>
    public async Task<ConnectionMultiplexer> GetConnection(CancellationToken token)
    {
        // Fast path: no locking once a multiplexer exists.
        var mr = _multiplexer;
        if (mr is not null)
        {
            return mr;
        }

        bool taken = false;
        try
        {
            taken = await _connectionLock.WaitAsync(ConnectionLockTimeout, token).ConfigureAwait(false);
            if (!taken)
            {
                throw new TimeoutException("Failed to get connection-lock: lock timeout");
            }

            // Another caller may have connected while we waited for the lock.
            mr = _multiplexer;
            if (mr is not null)
            {
                return mr;
            }

            mr = await ConnectWithTimeoutAsync(token).ConfigureAwait(false);
            _multiplexer = mr;
            return mr;
        }
        finally
        {
            if (taken)
            {
                _connectionLock.Release();
            }
        }
    }

    /// <summary>
    /// Records an observed error and forces a reconnect once errors have persisted long enough.
    /// </summary>
    /// <remarks>
    /// This method never waits for another caller that is already evaluating a reconnect; it
    /// returns <c>false</c> instead. A reconnect is forced only when all of these hold:
    /// <list type="bullet">
    /// <item>A multiplexer exists.</item>
    /// <item>At least <see cref="ReconnectMinFrequency"/> has passed since the last reconnect.</item>
    /// <item>The first error of the current window is at least <see cref="ReconnectErrorThreshold"/> old.</item>
    /// <item>The previous error is no older than that threshold.</item>
    /// </list>
    /// </remarks>
    /// <param name="token">
    /// Cancels waiting for the old multiplexer to close. The reconnect still completes.
    /// </param>
    /// <returns>
    /// <c>true</c> if the current multiplexer was dropped, so the next <see cref="GetConnection"/> reconnects.
    /// </returns>
    public async Task<bool> ConsiderReconnect(CancellationToken token)
    {
        if (_multiplexer is null)
        {
            return false;
        }

        // Cheap pre-check before touching the lock.
        var utcNow = DateTimeOffset.UtcNow;
        if (utcNow - ReadLastReconnect() < ReconnectMinFrequency)
        {
            return false;
        }

        bool taken = false;
        try
        {
            // Only tests whether a reconnect evaluation is already running; never waits for it.
            taken = await _reconnectNoWaitLock.WaitAsync(TimeSpan.Zero).ConfigureAwait(false);
            if (!taken)
            {
                return false;
            }

            utcNow = DateTimeOffset.UtcNow;

            // First error of a new window: just start tracking it.
            if (_firstError == DateTimeOffset.MinValue)
            {
                _firstError = utcNow;
                _previousError = utcNow;
                return false;
            }

            // Re-read under the lock. Another caller may have reconnected since the pre-check.
            if (utcNow - ReadLastReconnect() < ReconnectMinFrequency)
            {
                return false;
            }

            var shouldReconnect =
                utcNow - _firstError >= ReconnectErrorThreshold
                && utcNow - _previousError <= ReconnectErrorThreshold;
            _previousError = utcNow;
            if (!shouldReconnect)
            {
                return false;
            }

            _firstError = DateTimeOffset.MinValue;
            _previousError = DateTimeOffset.MinValue;

            // Detach before closing, so GetConnection stops handing out the multiplexer being
            // closed and can start creating a fresh one right away.
            var oldMultiplexer = _multiplexer;
            _multiplexer = null;
            if (oldMultiplexer is not null)
            {
                await CloseQuietlyAsync(oldMultiplexer, token).ConfigureAwait(false);
            }

            Interlocked.Exchange(ref _lastReconnectTicks, utcNow.UtcTicks);
            return true;
        }
        finally
        {
            if (taken)
            {
                _reconnectNoWaitLock.Release();
            }
        }
    }

    // Reads the time of the last forced reconnect (DateTimeOffset.MinValue if there was none).
    private DateTimeOffset ReadLastReconnect() =>
        new DateTimeOffset(Interlocked.Read(ref _lastReconnectTicks), TimeSpan.Zero);

    // Connects, giving up after ConnectionTimeout or when token is cancelled. ConnectAsync takes
    // no token, so the attempt is raced against a cancellable delay.
    private async Task<ConnectionMultiplexer> ConnectWithTimeoutAsync(CancellationToken token)
    {
        var connectionTask = ConnectionMultiplexer.ConnectAsync(_connectionString);
        using (var delayCts = CancellationTokenSource.CreateLinkedTokenSource(token))
        {
            var timeoutTask = Task.Delay(ConnectionTimeout, delayCts.Token);
            var winner = await Task.WhenAny(connectionTask, timeoutTask).ConfigureAwait(false);

            // Stop the delay's timer if the connection won the race.
            delayCts.Cancel();

            if (winner == connectionTask)
            {
                return await connectionTask.ConfigureAwait(false);
            }
        }

        // The abandoned attempt keeps running. Dispose a late success and observe a late failure.
        DisposeWhenCompleted(connectionTask);

        // The delay also completes when the caller cancels; report that as cancellation, not timeout.
        token.ThrowIfCancellationRequested();
        throw new TimeoutException("Failed to get connection: connection timeout");
    }

    // Disposes the multiplexer produced by an abandoned connect attempt once the attempt finishes.
    // A failed attempt's exception is read, so it is not reported as an unobserved task exception.
    private static void DisposeWhenCompleted(Task<ConnectionMultiplexer> connectionTask)
    {
        _ = connectionTask.ContinueWith(
            static t =>
            {
                if (t.Status == TaskStatus.RanToCompletion)
                {
                    t.Result.Dispose();
                }
                else
                {
                    _ = t.Exception;
                }
            },
            CancellationToken.None,
            TaskContinuationOptions.None,
            TaskScheduler.Default);
    }

    // Closes a detached multiplexer, waiting at most ConnectionTimeout or until token is cancelled.
    // This is best effort: the multiplexer is already detached, so failures are swallowed and the
    // reconnect proceeds regardless.
    private static async Task CloseQuietlyAsync(ConnectionMultiplexer multiplexer, CancellationToken token)
    {
        try
        {
            using var delayCts = CancellationTokenSource.CreateLinkedTokenSource(token);
            var closeTask = multiplexer.CloseAsync();
            await Task.WhenAny(closeTask, Task.Delay(ConnectionTimeout, delayCts.Token)).ConfigureAwait(false);

            // Stop the delay's timer if the close finished first.
            delayCts.Cancel();

            // Observe a close that fails after we stopped waiting for it.
            _ = closeTask.ContinueWith(
                static t => _ = t.Exception,
                CancellationToken.None,
                TaskContinuationOptions.OnlyOnFaulted,
                TaskScheduler.Default);
        }
        catch (Exception)
        {
            // Deliberately ignored: see the method comment.
        }
    }
}
} |
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment