Skip to content

Instantly share code, notes, and snippets.

@cilliemalan
Last active April 29, 2025 09:35
Show Gist options
  • Save cilliemalan/7cc4d5e336cfc0157ed2aae40ee9e23b to your computer and use it in GitHub Desktop.
Save cilliemalan/7cc4d5e336cfc0157ed2aae40ee9e23b to your computer and use it in GitHub Desktop.
C# Work Queue
' .Net 1.1 and Later
Imports System.Threading
''' <summary>
''' Dispatches queued subscriber callbacks one at a time, in the order they were
''' added, from a dedicated background thread. Each callback is executed on its own
''' short-lived thread and the dispatcher waits for it to finish before taking the
''' next one.
''' </summary>
Public NotInheritable Class MessageSequencer
    Implements IDisposable

    ' Private monitor object. Locking on Me would let any outside code holding a
    ' reference to this instance contend for (or dead-lock on) the same monitor.
    Private ReadOnly _gate As New Object()

    ' 0 while the sequencer is live, 1 once Dispose has run. Only touched under _gate.
    Private _disposed As Integer = 0

    Private ReadOnly _waitThread As Thread

    ' Signalled while there may be work (or a shutdown request) for the dispatcher.
    ' Set and Reset are only ever called while holding _gate.
    Private ReadOnly _mre As New ManualResetEvent(False)

    ' Pending QueueData items; guarded by _gate.
    Private ReadOnly _waitQueue As New Queue

    ''' <summary>
    ''' Creates the sequencer and starts its background dispatcher thread.
    ''' </summary>
    Public Sub New()
        _waitThread = New Thread(AddressOf WaitThreadProc)
        _waitThread.IsBackground = True
        _waitThread.Name = "MessageSequencer.WaitThreadProc"
        _waitThread.Start()
    End Sub

    ''' <summary>
    ''' One pending callback: the subscriber to notify and the callback data to pass to it.
    ''' </summary>
    Private Class QueueData
        Public ReadOnly Owner As Subscriber
        Public ReadOnly Info As Callback

        Public Sub New(owner As Subscriber, info As Callback)
            Me.Owner = owner
            Me.Info = info
        End Sub
    End Class

    ''' <summary>
    ''' Queues a callback for sequential delivery. The call is silently ignored
    ''' once the sequencer has been disposed.
    ''' </summary>
    ''' <param name="Owner">Subscriber whose RaiseMessage will be invoked.</param>
    ''' <param name="Info">Callback data handed to the subscriber.</param>
    Friend Sub AddMessage(Owner As Subscriber, Info As Callback)
        SyncLock _gate
            If _disposed <> 0 Then
                Exit Sub
            End If

            _waitQueue.Enqueue(New QueueData(Owner, Info))
            _mre.Set()
        End SyncLock
    End Sub

    ''' <summary>
    ''' Currently a no-op.
    ''' </summary>
    Public Sub ReleaseNextMessage()
    End Sub

    ''' <summary>
    ''' Dispatcher loop: waits for work, removes one item at a time from the queue
    ''' and runs it to completion. Returns once Dispose has been called.
    ''' </summary>
    Private Sub WaitThreadProc()
        While True
            _mre.WaitOne()

            Dim operation As QueueData = Nothing
            SyncLock _gate
                ' The disposed check comes before any Reset so that the wake-up
                ' signalled by Dispose can never be lost.
                If _disposed <> 0 Then
                    Exit While
                End If

                If _waitQueue.Count = 0 Then
                    ' Nothing left to do: go back to sleep until AddMessage or
                    ' Dispose signals again. This is the only place the event is reset.
                    _mre.Reset()
                    Continue While
                End If

                operation = DirectCast(_waitQueue.Dequeue(), QueueData)
            End SyncLock

            RunOnOwnThread(operation)
        End While
    End Sub

    ''' <summary>
    ''' Runs a single operation on a fresh thread and blocks until it has finished.
    ''' </summary>
    ''' <param name="operation">The queued callback to deliver.</param>
    Private Shared Sub RunOnOwnThread(operation As QueueData)
        ' we need to run this on a new thread
        ' because the operations have a nonzero
        ' probability of calling Thread.CurrentThread.Abort()
        ' for some arcane reason.
        Dim nt As New Thread(
            Sub()
                Try
                    operation.Owner.RaiseMessage(operation.Info)
                Catch tae As ThreadAbortException
                    ' do nothing.
                Catch ex As Exception
                    Dim msgInfo As String = $"MessageSequencer: {My.Application.Info.ProductName}" &
                        $"-{operation.Info.Message.MessageID}: " &
                        ex.Message & vbCrLf &
                        ex.StackTrace & vbCrLf
                    EventLog.WriteEntry("Message Subscriber", msgInfo, EventLogEntryType.Error)
                End Try
            End Sub)
        nt.IsBackground = True
        nt.Name = "Message Sequencer Thread"
        nt.Start()
        nt.Join()
    End Sub

    ''' <summary>
    ''' Stops the dispatcher: discards any messages still queued, waits for the
    ''' dispatcher thread (and the callback it is currently running, if any) to
    ''' finish, then releases the wait handle. Calling it more than once is harmless.
    ''' </summary>
    Public Sub Dispose() Implements IDisposable.Dispose
        SyncLock _gate
            If _disposed <> 0 Then
                ' Already disposed (or being disposed); the wait handle must not be touched again.
                Exit Sub
            End If

            _disposed = 1
            _waitQueue.Clear()
            ' Signal inside the lock so the dispatcher cannot Reset the event
            ' between the flag being set and the wake-up being raised.
            _mre.Set()
        End SyncLock

        _waitThread.Join()
        _mre.Dispose()
    End Sub
End Class
' .Net 4.0 and Later
Option Infer On
Imports System.Threading
Imports System.Collections.Concurrent
''' <summary>
''' Dispatches queued subscriber callbacks one at a time, in the order they were
''' added, from a dedicated background thread. Each callback is executed on its own
''' short-lived thread and the dispatcher waits for it to finish before taking the
''' next one.
''' </summary>
Public NotInheritable Class MessageSequencer
    Implements IDisposable

    ' 0 while the sequencer is live, 1 once Dispose has been entered.
    Private _disposed As Integer = 0

    Private ReadOnly _waitThread As Thread

    ' Pending callbacks; the dispatcher thread blocks on this collection.
    Private ReadOnly _waitQueue As New BlockingCollection(Of QueueData)

    ''' <summary>
    ''' Creates the sequencer and starts its background dispatcher thread.
    ''' </summary>
    Public Sub New()
        _waitThread = New Thread(AddressOf WaitThreadProc)
        _waitThread.IsBackground = True
        _waitThread.Name = "MessageSequencer.WaitThreadProc"
        _waitThread.Start()
    End Sub

    ''' <summary>
    ''' One pending callback: the subscriber to notify and the callback data to pass to it.
    ''' </summary>
    Private Class QueueData
        Public ReadOnly Owner As Subscriber
        Public ReadOnly Info As Callback

        Public Sub New(owner As Subscriber, info As Callback)
            Me.Owner = owner
            Me.Info = info
        End Sub
    End Class

    ''' <summary>
    ''' Queues a callback for sequential delivery. The call is silently ignored
    ''' once the sequencer has been (or is being) disposed.
    ''' </summary>
    ''' <param name="Owner">Subscriber whose RaiseMessage will be invoked.</param>
    ''' <param name="Info">Callback data handed to the subscriber.</param>
    Friend Sub AddMessage(Owner As Subscriber, Info As Callback)
        If Volatile.Read(_disposed) <> 0 Then
            Exit Sub
        End If

        Try
            _waitQueue.Add(New QueueData(Owner, Info))
        Catch ex As InvalidOperationException
            ' Dispose ran between the check above and Add: the collection has been
            ' marked complete for adding (InvalidOperationException) or already
            ' disposed (ObjectDisposedException, which derives from it).
            ' Drop the message, exactly as if the disposed check had caught it.
        End Try
    End Sub

    ''' <summary>
    ''' Currently a no-op.
    ''' </summary>
    Public Sub ReleaseNextMessage()
    End Sub

    ''' <summary>
    ''' Dispatcher loop: takes items from the queue one at a time and runs each to
    ''' completion. Ends when the queue has been marked complete and is empty.
    ''' </summary>
    Private Sub WaitThreadProc()
        For Each operation As QueueData In _waitQueue.GetConsumingEnumerable()
            RunOnOwnThread(operation)
        Next
    End Sub

    ''' <summary>
    ''' Runs a single operation on a fresh thread and blocks until it has finished.
    ''' </summary>
    ''' <param name="operation">The queued callback to deliver.</param>
    Private Shared Sub RunOnOwnThread(operation As QueueData)
        ' we need to run this on a new thread
        ' because the operations have a nonzero
        ' probability of calling Thread.CurrentThread.Abort()
        ' for some arcane reason.
        Dim nt As New Thread(
            Sub()
                Try
                    operation.Owner.RaiseMessage(operation.Info)
                Catch tae As ThreadAbortException
                    ' do nothing.
                Catch ex As Exception
                    Dim msgInfo As String = $"MessageSequencer: {My.Application.Info.ProductName}" &
                        $"-{operation.Info.Message.MessageID}: " &
                        ex.Message & vbCrLf &
                        ex.StackTrace & vbCrLf
                    EventLog.WriteEntry("Message Subscriber", msgInfo, EventLogEntryType.Error)
                End Try
            End Sub)
        nt.IsBackground = True
        nt.Name = "Message Sequencer Thread"
        nt.Start()
        nt.Join()
    End Sub

    ''' <summary>
    ''' Stops accepting new messages, waits for the dispatcher thread to finish the
    ''' items already queued, then releases the queue. Only the first call does any
    ''' work; later calls return immediately.
    ''' </summary>
    Public Sub Dispose() Implements IDisposable.Dispose
        If Interlocked.Exchange(_disposed, 1) = 0 Then
            _waitQueue.CompleteAdding()
            _waitThread.Join()
            _waitQueue.Dispose()
        End If
    End Sub
End Class
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment