Last active
April 29, 2025 20:19
-
-
Save olegch/5911920dfa105232a5741c659d3a030a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package queue | |
import ( | |
"context" | |
"testing" | |
"time" | |
) | |
// TestLostMessageIssue | |
func TestLostMessageIssue(t *testing.T) { | |
q := &Queue{} | |
// there is no open API to check/guarantee that a subscriber is registered with the queue, | |
// so using an implementation detail here. | |
numSub := func() int { | |
q.mu.Lock() | |
defer q.mu.Unlock() | |
return len(q.subscribers) | |
} | |
// go proc for the first subscriber | |
ctx1, cancel1 := context.WithCancel(context.Background()) | |
defer cancel1() | |
go func() { | |
_, _ = q.Get(ctx1) | |
}() | |
// wait the first subscriber to start and register with the queue; | |
for numSub() < 1 { | |
time.Sleep(1 * time.Millisecond) | |
} | |
// go proc for the second subscriber | |
chSecondSubscriberReceivedMessage := make(chan (struct{})) | |
ctx2, cancel2 := context.WithCancel(context.Background()) | |
defer cancel2() | |
go func() { | |
_, _ = q.Get(ctx2) | |
close(chSecondSubscriberReceivedMessage) | |
}() | |
// wait for both subscribers to start and register with the queue; | |
for numSub() < 2 { | |
time.Sleep(1 * time.Millisecond) | |
} | |
// the first subscriber cancels | |
cancel1() | |
// now sending a messages | |
err := q.Put(Message{Text: "message"}) | |
if err != nil { | |
t.FailNow() | |
} | |
// ... which the second subscriber never receives | |
select { | |
case <-chSecondSubscriberReceivedMessage: | |
case <-time.After(5 * time.Second): | |
t.Fail() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment