Skip to content

Instantly share code, notes, and snippets.

@olegch
Last active April 29, 2025 20:19
Show Gist options
  • Save olegch/5911920dfa105232a5741c659d3a030a to your computer and use it in GitHub Desktop.
Save olegch/5911920dfa105232a5741c659d3a030a to your computer and use it in GitHub Desktop.
package queue
import (
"context"
"testing"
"time"
)
// TestLostMessageIssue reproduces a lost-message scenario: two subscribers
// block in Get, the first one cancels its context, and then a single message
// is Put. The test fails if the second subscriber's Get has not returned
// within the timeout.
func TestLostMessageIssue(t *testing.T) {
	// Upper bound for every wait in this test, so that a misbehaving queue
	// yields a descriptive failure instead of hanging until the global
	// `go test` timeout fires.
	const timeout = 5 * time.Second

	q := &Queue{}

	// There is no open API to check/guarantee that a subscriber is registered
	// with the queue, so an implementation detail is used here.
	numSub := func() int {
		q.mu.Lock()
		defer q.mu.Unlock()
		return len(q.subscribers)
	}

	// waitForSubscribers polls until at least want subscribers are registered.
	// It may call t.Fatalf, so it must only be called from the test goroutine.
	waitForSubscribers := func(want int) {
		deadline := time.Now().Add(timeout)
		for numSub() < want {
			if time.Now().After(deadline) {
				t.Fatalf("timed out waiting for %d subscriber(s) to register; have %d", want, numSub())
			}
			time.Sleep(time.Millisecond)
		}
	}

	// Goroutine for the first subscriber.
	ctx1, cancel1 := context.WithCancel(context.Background())
	defer cancel1()
	go func() {
		// Result deliberately ignored: this subscriber exists only to be cancelled.
		_, _ = q.Get(ctx1)
	}()

	// Wait for the first subscriber to start and register with the queue.
	waitForSubscribers(1)

	// Goroutine for the second subscriber; the channel is closed once its Get returns.
	secondReturned := make(chan struct{})
	ctx2, cancel2 := context.WithCancel(context.Background())
	// The deferred cancel also unblocks this goroutine if the test fails,
	// so it is not left waiting in Get after the test returns.
	defer cancel2()
	go func() {
		// Result deliberately ignored: only the fact that Get returned matters here.
		_, _ = q.Get(ctx2)
		close(secondReturned)
	}()

	// Wait for both subscribers to be registered with the queue.
	waitForSubscribers(2)

	// The first subscriber cancels ...
	cancel1()

	// ... and now a message is sent ...
	if err := q.Put(Message{Text: "message"}); err != nil {
		t.Fatalf("Put failed: %v", err)
	}

	// ... which the second subscriber is expected to receive.
	select {
	case <-secondReturned:
	case <-time.After(timeout):
		t.Errorf("second subscriber's Get did not return within %v after Put", timeout)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment