Skip to content

Instantly share code, notes, and snippets.

@ruxo
Last active September 2, 2023 23:10
Show Gist options
  • Save ruxo/29eabd3926ba97e1524360ebabc4562e to your computer and use it in GitHub Desktop.
Save ruxo/29eabd3926ba97e1524360ebabc4562e to your computer and use it in GitHub Desktop.
Simple mailbox processor (Single-thread apartment)
using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
namespace RZ.Foundation;
/// <summary>
/// A mailbox processor that allows some concurrent dispatching.
/// </summary>
/// <param name="logger">Microsoft Logger</param>
/// <param name="concurrentLimit">Maximum number of concurrent threads allowed to process the mailbox</param>
/// <param name="handler">Message handler</param>
/// <typeparam name="TData">Type of the data item</typeparam>
public sealed class ConcurrentMailBox<TData>(ILogger logger, int concurrentLimit, Func<TData,CancellationToken,ValueTask> handler) : IDisposable
{
readonly ConcurrentQueue<TData> queue = new();
readonly CancellationTokenSource queueCancel = new();
readonly SemaphoreSlim limiter = new(0, concurrentLimit);
int consumerQuit;
public void Post(in TData data) {
if (queueCancel.IsCancellationRequested)
logger.LogError("MailBox<{Name}> is shutting down. Message is skipped", typeof(TData).Name);
else {
queue.Enqueue(data);
Interlocked.Increment(ref consumerQuit);
Task.Factory.StartNew(Dispatch, queueCancel.Token, TaskCreationOptions.LongRunning, TaskScheduler.Current)
.ContinueWith(_ => Interlocked.Decrement(ref consumerQuit));
}
}
async Task Dispatch() {
logger.LogDebug("Start a consumer thread {Id} for MailBox<{Name}>", Thread.GetCurrentProcessorId(), typeof(TData).Name);
retry:
if (!await limiter.WaitAsync(1, queueCancel.Token)) return;
try {
await RunConsumerLoop();
}
finally {
limiter.Release();
}
if (!queueCancel.IsCancellationRequested && !queue.IsEmpty)
goto retry;
logger.LogDebug("Consumer thread {Id} for MailBox<{Name}> is exiting the loop", Thread.GetCurrentProcessorId(), typeof(TData).Name);
}
async ValueTask RunConsumerLoop() {
while (!queueCancel.IsCancellationRequested && queue.TryDequeue(out var item)) {
try {
await handler(item, queueCancel.Token);
}
catch (Exception e) {
logger.LogError(e, "Unhandled exception detected! Message may have been skipped for item: {@Item}", item);
}
}
}
public void Dispose() {
queueCancel.Cancel();
using var spinCancel = new CancellationTokenSource(MailBox.LoopWaitTime);
var wait = new SpinWait();
while (!spinCancel.Token.IsCancellationRequested && consumerQuit > 0)
wait.SpinOnce(sleep1Threshold: 10_000);
if (consumerQuit > 0)
logger.LogError("Bug: {ConsumerCount} background consumers are still working during MailBox<{Name}> exit!", consumerQuit, typeof(TData).Name);
queueCancel.Dispose();
}
}
using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
namespace RZ.Foundation;
static class MailBox
{
public static readonly TimeSpan LoopWaitTime = TimeSpan.FromSeconds(30);
}
public sealed class MailBox<TData>(ILogger logger, Func<TData,CancellationToken,ValueTask> handler) : IDisposable
{
readonly ConcurrentQueue<TData> queue = new();
readonly CancellationTokenSource queueCancel = new();
int consumerQuit;
public void Post(TData data) {
if (queueCancel.IsCancellationRequested)
logger.LogError("MailBox<{Name}> is shutting down. Message is skipped", typeof(TData).Name);
else {
queue.Enqueue(data);
if (Interlocked.Increment(ref consumerQuit) == 1) // Note, rare condition to cause another thread spawn if the counter is wrapped.
Task.Factory.StartNew(Dispatch, queueCancel.Token, TaskCreationOptions.LongRunning, TaskScheduler.Current);
}
}
async Task Dispatch() {
logger.LogDebug("Start a consumer thread for MailBox<{Name}>", typeof(TData).Name);
await RunConsumerLoop();
while (!queueCancel.IsCancellationRequested && !queue.IsEmpty && Interlocked.Increment(ref consumerQuit) == 1)
await RunConsumerLoop();
logger.LogDebug("Consumer thread for MailBox<{Name}> is exiting the loop", typeof(TData).Name);
}
async ValueTask RunConsumerLoop() {
try {
while (!queueCancel.IsCancellationRequested && queue.TryDequeue(out var item)) {
try {
await handler(item, queueCancel.Token);
}
catch (Exception e) {
logger.LogError(e, "Unhandled exception detected! Message may have been skipped for item: {@Item}", item);
}
}
}
finally {
Interlocked.Exchange(ref consumerQuit, 0);
logger.LogDebug("Reset the counter of MailBox<{Name}>", typeof(TData).Name);
}
}
public void Dispose() {
queueCancel.Cancel();
using var spinCancel = new CancellationTokenSource(MailBox.LoopWaitTime);
var wait = new SpinWait();
while (!spinCancel.Token.IsCancellationRequested && consumerQuit > 0)
wait.SpinOnce(sleep1Threshold: 10_000);
if (consumerQuit > 0)
logger.LogError("Bug: the background consumer is still working during MailBox<{Name}> exit!", typeof(TData).Name);
queueCancel.Dispose();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment