Last active
September 2, 2023 23:10
-
-
Save ruxo/29eabd3926ba97e1524360ebabc4562e to your computer and use it in GitHub Desktop.
Simple mailbox processor (Single-thread apartment)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Concurrent; | |
using Microsoft.Extensions.Logging; | |
namespace RZ.Foundation; | |
/// <summary> | |
/// A mailbox processor that allows some concurrent dispatching. | |
/// </summary> | |
/// <param name="logger">Microsoft Logger</param> | |
/// <param name="concurrentLimit">Maximum number of concurrent threads allowed to process the mailbox</param> | |
/// <param name="handler">Message handler</param> | |
/// <typeparam name="TData">Type of the data item</typeparam> | |
public sealed class ConcurrentMailBox<TData>(ILogger logger, int concurrentLimit, Func<TData,CancellationToken,ValueTask> handler) : IDisposable | |
{ | |
readonly ConcurrentQueue<TData> queue = new(); | |
readonly CancellationTokenSource queueCancel = new(); | |
readonly SemaphoreSlim limiter = new(0, concurrentLimit); | |
int consumerQuit; | |
public void Post(in TData data) { | |
if (queueCancel.IsCancellationRequested) | |
logger.LogError("MailBox<{Name}> is shutting down. Message is skipped", typeof(TData).Name); | |
else { | |
queue.Enqueue(data); | |
Interlocked.Increment(ref consumerQuit); | |
Task.Factory.StartNew(Dispatch, queueCancel.Token, TaskCreationOptions.LongRunning, TaskScheduler.Current) | |
.ContinueWith(_ => Interlocked.Decrement(ref consumerQuit)); | |
} | |
} | |
async Task Dispatch() { | |
logger.LogDebug("Start a consumer thread {Id} for MailBox<{Name}>", Thread.GetCurrentProcessorId(), typeof(TData).Name); | |
retry: | |
if (!await limiter.WaitAsync(1, queueCancel.Token)) return; | |
try { | |
await RunConsumerLoop(); | |
} | |
finally { | |
limiter.Release(); | |
} | |
if (!queueCancel.IsCancellationRequested && !queue.IsEmpty) | |
goto retry; | |
logger.LogDebug("Consumer thread {Id} for MailBox<{Name}> is exiting the loop", Thread.GetCurrentProcessorId(), typeof(TData).Name); | |
} | |
async ValueTask RunConsumerLoop() { | |
while (!queueCancel.IsCancellationRequested && queue.TryDequeue(out var item)) { | |
try { | |
await handler(item, queueCancel.Token); | |
} | |
catch (Exception e) { | |
logger.LogError(e, "Unhandled exception detected! Message may have been skipped for item: {@Item}", item); | |
} | |
} | |
} | |
public void Dispose() { | |
queueCancel.Cancel(); | |
using var spinCancel = new CancellationTokenSource(MailBox.LoopWaitTime); | |
var wait = new SpinWait(); | |
while (!spinCancel.Token.IsCancellationRequested && consumerQuit > 0) | |
wait.SpinOnce(sleep1Threshold: 10_000); | |
if (consumerQuit > 0) | |
logger.LogError("Bug: {ConsumerCount} background consumers are still working during MailBox<{Name}> exit!", consumerQuit, typeof(TData).Name); | |
queueCancel.Dispose(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Concurrent; | |
using Microsoft.Extensions.Logging; | |
namespace RZ.Foundation; | |
static class MailBox | |
{ | |
public static readonly TimeSpan LoopWaitTime = TimeSpan.FromSeconds(30); | |
} | |
public sealed class MailBox<TData>(ILogger logger, Func<TData,CancellationToken,ValueTask> handler) : IDisposable | |
{ | |
readonly ConcurrentQueue<TData> queue = new(); | |
readonly CancellationTokenSource queueCancel = new(); | |
int consumerQuit; | |
public void Post(TData data) { | |
if (queueCancel.IsCancellationRequested) | |
logger.LogError("MailBox<{Name}> is shutting down. Message is skipped", typeof(TData).Name); | |
else { | |
queue.Enqueue(data); | |
if (Interlocked.Increment(ref consumerQuit) == 1) // Note, rare condition to cause another thread spawn if the counter is wrapped. | |
Task.Factory.StartNew(Dispatch, queueCancel.Token, TaskCreationOptions.LongRunning, TaskScheduler.Current); | |
} | |
} | |
async Task Dispatch() { | |
logger.LogDebug("Start a consumer thread for MailBox<{Name}>", typeof(TData).Name); | |
await RunConsumerLoop(); | |
while (!queueCancel.IsCancellationRequested && !queue.IsEmpty && Interlocked.Increment(ref consumerQuit) == 1) | |
await RunConsumerLoop(); | |
logger.LogDebug("Consumer thread for MailBox<{Name}> is exiting the loop", typeof(TData).Name); | |
} | |
async ValueTask RunConsumerLoop() { | |
try { | |
while (!queueCancel.IsCancellationRequested && queue.TryDequeue(out var item)) { | |
try { | |
await handler(item, queueCancel.Token); | |
} | |
catch (Exception e) { | |
logger.LogError(e, "Unhandled exception detected! Message may have been skipped for item: {@Item}", item); | |
} | |
} | |
} | |
finally { | |
Interlocked.Exchange(ref consumerQuit, 0); | |
logger.LogDebug("Reset the counter of MailBox<{Name}>", typeof(TData).Name); | |
} | |
} | |
public void Dispose() { | |
queueCancel.Cancel(); | |
using var spinCancel = new CancellationTokenSource(MailBox.LoopWaitTime); | |
var wait = new SpinWait(); | |
while (!spinCancel.Token.IsCancellationRequested && consumerQuit > 0) | |
wait.SpinOnce(sleep1Threshold: 10_000); | |
if (consumerQuit > 0) | |
logger.LogError("Bug: the background consumer is still working during MailBox<{Name}> exit!", typeof(TData).Name); | |
queueCancel.Dispose(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment