Last active
April 29, 2025 09:35
-
-
Save cilliemalan/7cc4d5e336cfc0157ed2aae40ee9e23b to your computer and use it in GitHub Desktop.
C# Work Queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
' .Net 1.1 and Later | |
Imports System.Threading | |
Public NotInheritable Class MessageSequencer | |
Implements IDisposable | |
Private _disposed As Integer = 0 | |
ReadOnly _waitThread As Thread | |
ReadOnly _mre As New ManualResetEvent(False) | |
ReadOnly _waitQueue As New Queue | |
Public Sub New() | |
_waitThread = New Thread(AddressOf WaitThreadProc) | |
_waitThread.IsBackground = True | |
_waitThread.Name = "MessageSequencer.WaitThreadProc" | |
_waitThread.Start() | |
End Sub | |
Private Class QueueData | |
Public ReadOnly Owner As Subscriber | |
Public ReadOnly Info As Callback | |
Public Sub New(owner As Subscriber, info As Callback) | |
Me.Owner = owner | |
Me.Info = info | |
End Sub | |
End Class | |
Friend Sub AddMessage(Owner As Subscriber, Info As Callback) | |
SyncLock Me | |
If _disposed <> 0 Then | |
Exit Sub | |
End If | |
_waitQueue.Enqueue(New QueueData(Owner, Info)) | |
_mre.Set() | |
End SyncLock | |
End Sub | |
Public Sub ReleaseNextMessage() | |
End Sub | |
Private Sub WaitThreadProc() | |
While _disposed <> 0 | |
_mre.WaitOne() | |
Dim operation As QueueData = Nothing | |
SyncLock Me | |
If _disposed Then | |
Exit While | |
End If | |
If _waitQueue.Count = 0 Then | |
Continue While | |
End If | |
operation = _waitQueue.Dequeue() | |
End SyncLock | |
' we need to run this on a new thread | |
' because the operations have a nonzero | |
' probability of calling Thread.CurrentThread.Abort() | |
' for some arcane reason. | |
Dim nt As New Thread( | |
Sub() | |
Try | |
operation.Owner.RaiseMessage(operation.Info) | |
Catch tae As ThreadAbortException | |
' do nothing. | |
Catch ex As Exception | |
Dim msgInfo = $"MessageSequencer: {My.Application.Info.ProductName}" & | |
$"-{operation.Info.Message.MessageID}: " & | |
ex.Message & vbCrLf & | |
ex.StackTrace & vbCrLf | |
EventLog.WriteEntry("Message Subscriber", msgInfo, EventLogEntryType.Error) | |
End Try | |
End Sub) | |
nt.IsBackground = True | |
nt.Name = "Message Sequencer Thread" | |
nt.Start() | |
nt.Join() | |
SyncLock Me | |
If _waitQueue.Count = 0 Then | |
_mre.Reset() | |
End If | |
End SyncLock | |
End While | |
End Sub | |
Public Sub Dispose() Implements IDisposable.Dispose | |
SyncLock Me | |
_disposed = 1 | |
_waitQueue.Clear() | |
End SyncLock | |
_mre.Set() | |
_waitThread.Join() | |
_mre.Dispose() | |
End Sub | |
End Class | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
' .Net 4.0 and Later | |
Option Infer On | |
Imports System.Threading | |
Imports System.Collections.Concurrent | |
Public NotInheritable Class MessageSequencer | |
Implements IDisposable | |
Private _disposed As Integer = 0 | |
ReadOnly _waitThread As Thread | |
ReadOnly _waitQueue As New BlockingCollection(Of QueueData) | |
Public Sub New() | |
_waitThread = New Thread(AddressOf WaitThreadProc) | |
_waitThread.IsBackground = True | |
_waitThread.Name = "MessageSequencer.WaitThreadProc" | |
_waitThread.Start() | |
End Sub | |
Private Class QueueData | |
Public ReadOnly Owner As Subscriber | |
Public ReadOnly Info As Callback | |
Public Sub New(owner As Subscriber, info As Callback) | |
Me.Owner = owner | |
Me.Info = info | |
End Sub | |
End Class | |
Friend Sub AddMessage(Owner As Subscriber, Info As Callback) | |
If Volatile.Read(_disposed) = 0 Then | |
_waitQueue.Add(New QueueData(Owner, Info)) | |
End If | |
End Sub | |
Public Sub ReleaseNextMessage() | |
End Sub | |
Private Sub WaitThreadProc() | |
For Each operation In _waitQueue.GetConsumingEnumerable() | |
' we need to run this on a new thread | |
' because the operations have a nonzero | |
' probability of calling Thread.CurrentThread.Abort() | |
' for some arcane reason. | |
Dim nt As New Thread( | |
Sub() | |
Try | |
operation.Owner.RaiseMessage(operation.Info) | |
Catch tae As ThreadAbortException | |
' do nothing. | |
Catch ex As Exception | |
Dim msgInfo = $"MessageSequencer: {My.Application.Info.ProductName}" & | |
$"-{operation.Info.Message.MessageID}: " & | |
ex.Message & vbCrLf & | |
ex.StackTrace & vbCrLf | |
EventLog.WriteEntry("Message Subscriber", msgInfo, EventLogEntryType.Error) | |
End Try | |
End Sub) | |
nt.IsBackground = True | |
nt.Name = "Message Sequencer Thread" | |
nt.Start() | |
nt.Join() | |
Next | |
End Sub | |
Public Sub Dispose() Implements IDisposable.Dispose | |
If Interlocked.Exchange(_disposed, 1) = 0 Then | |
_waitQueue.CompleteAdding() | |
_waitThread.Join() | |
_waitQueue.Dispose() | |
End If | |
End Sub | |
End Class | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment