Created
August 31, 2021 18:13
-
-
Save zpqrtbnk/07ca326fca1a0a77ed5fe56f59915a26 to your computer and use it in GitHub Desktop.
HazelcastIssue466
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved. | |
// | |
// Licensed under the Apache License, Version 2.0 (the "License"); | |
// you may not use this file except in compliance with the License. | |
// You may obtain a copy of the License at | |
// | |
// http://www.apache.org/licenses/LICENSE-2.0 | |
// | |
// Unless required by applicable law or agreed to in writing, software | |
// distributed under the License is distributed on an "AS IS" BASIS, | |
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
// See the License for the specific language governing permissions and | |
// limitations under the License. | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Hazelcast.Core; | |
using Hazelcast.Serialization; | |
using Hazelcast.Testing; | |
using Hazelcast.Testing.Logging; | |
using Microsoft.Extensions.Logging; | |
using NUnit.Framework; | |
namespace Hazelcast.Tests.Sandbox | |
{ | |
// define extension methods, but don't leak them | |
using SupportTests_; | |
namespace SupportTests_ | |
{ | |
public static class Extensions | |
{ | |
public static SerializationOptions AddSerializer<TSerialized, TSerializer>(this SerializationOptions options) | |
where TSerializer : ISerializer, new() | |
{ | |
options.Serializers.Add(new SerializerOptions { SerializedType = typeof(TSerialized), Creator = () => new TSerializer() }); | |
return options; | |
} | |
} | |
} | |
[TestFixture] | |
public class SupportTests : SingleMemberRemoteTestBase | |
{ | |
// items for the queue | |
private class QueueItem { } | |
// dummy serializer for the queue items | |
private class QueueItemSerializer : IStreamSerializer<QueueItem> | |
{ | |
public void Dispose() | |
{ } | |
public int TypeId { get; } = 666; | |
public QueueItem Read(IObjectDataInput input) | |
=> new QueueItem(); | |
public void Write(IObjectDataOutput output, QueueItem obj) | |
{ } | |
} | |
// log to HConsole | |
protected override ILoggerFactory CreateLoggerFactory() => | |
Microsoft.Extensions.Logging.LoggerFactory.Create(builder => builder.AddHConsole()); | |
[Test] | |
[Timeout(20_000)] | |
public async Task TryToReproduceIssue() | |
{ | |
var queueName = "queue" + CreateUniqueName(); | |
var queueSubscribed = 0; | |
var queueEventsCount = 0; | |
using var _ = HConsole.Capture(o => o | |
.ClearAll() | |
.Configure<HConsoleLoggerProvider>().SetPrefix("LOG").SetMaxLevel() | |
.Configure().SetMinLevel() | |
.Configure(this).SetPrefix("TEST").SetMaxLevel() | |
); | |
var options = new HazelcastOptionsBuilder() | |
.With(o => | |
{ | |
// our test environment provides a cluster, and we need to configure the client accordingly | |
o.ClusterName = RcCluster.Id; | |
// our cluster lives on localhost | |
o.Networking.Addresses.Clear(); | |
o.Networking.Addresses.Add("127.0.0.1:5701"); | |
// fail fast, default timeout is infinite | |
o.Networking.ConnectionRetry.ClusterConnectionTimeoutMilliseconds = 4000; | |
// our test environment provides a logger factory | |
o.LoggerFactory.Creator = () => LoggerFactory; | |
// we need to be able to (de)serialize queue items | |
o.Serialization.AddSerializer<QueueItem, QueueItemSerializer>(); | |
// subscribe | |
o.AddSubscriber(events => events | |
.StateChanged(async (c, a) => | |
{ | |
if (a.State == ClientState.Connected) | |
{ | |
if (Interlocked.CompareExchange(ref queueSubscribed, 1, 0) == 1) return; // only once! | |
HConsole.WriteLine(this, "State == Connected"); | |
var q = await c.GetQueueAsync<QueueItem>(queueName).ConfigureAwait(false); | |
await q.SubscribeAsync(queueEvents => queueEvents | |
.ItemAdded((xq, xa) => | |
{ | |
HConsole.WriteLine(this, "Received item"); | |
Interlocked.Increment(ref queueEventsCount); | |
})).ConfigureAwait(false); | |
} | |
})); | |
}) | |
.Build(); | |
HConsole.WriteLine(this, "Start new client..."); | |
var client = await HazelcastClientFactory.StartNewClientAsync(options).ConfigureAwait(false); | |
HConsole.WriteLine(this, "Get queue..."); | |
var queue = await client.GetQueueAsync<QueueItem>(queueName).ConfigureAwait(false); | |
HConsole.WriteLine(this, "Offer items..."); | |
const int itemsCount = 12; | |
for (var i = 0; i < itemsCount; i++) | |
{ | |
Assert.That(await queue.OfferAsync(new QueueItem()).ConfigureAwait(false), Is.True); | |
} | |
// eventually, the event count will match | |
HConsole.WriteLine(this, "Count events..."); | |
await AssertEx.SucceedsEventually(() => Assert.That(queueEventsCount, Is.EqualTo(itemsCount)), 4000, 200).ConfigureAwait(false); | |
HConsole.WriteLine(this, "Success!"); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment