Skip to content

Instantly share code, notes, and snippets.

@to11mtm
Created December 6, 2024 17:49
Show Gist options
  • Save to11mtm/8f51de404be5fbd5a5937423caefc844 to your computer and use it in GitHub Desktop.
Save to11mtm/8f51de404be5fbd5a5937423caefc844 to your computer and use it in GitHub Desktop.
NatsKVStore Example for Split GetEntryAsync
public class NatsKVStore : INatsKVStore
{
public ValueTask<NatsKVEntry<T>> GetEntryAsync<T>(string key, ulong revision = default, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default)
{
ValidateKey(key);
serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer<T>();
var request = new StreamMsgGetRequest();
var keySubject = $"$KV.{Bucket}.{key}";
if (revision == default)
{
request.LastBySubj = keySubject;
}
else
{
request.Seq = revision;
request.NextBySubj = keySubject;
}
if (_stream.Info.Config.AllowDirect)
{
return GetEntryDirectAsync(key, revision, serializer, request, keySubject, cancellationToken);
}
else
{
return GetEntryNonDirectAsync(key, revision, serializer, request, keySubject, cancellationToken);
}
}
private async ValueTask<NatsKVEntry<T>> GetEntryNonDirectAsync<T>(string key, ulong revision, INatsDeserialize<T> serializer, StreamMsgGetRequest request, string keySubject, CancellationToken cancellationToken)
{
var response = await _stream.GetAsync(request, cancellationToken);
if (revision != default)
{
if (string.Equals(response.Message.Subject, keySubject, StringComparison.Ordinal))
{
throw new NatsKVException("Unexpected subject");
}
}
T? data;
NatsDeserializeException? deserializeException = null;
if (response.Message.Data.Length > 0)
{
var buffer = new ReadOnlySequence<byte>(response.Message.Data);
try
{
data = serializer.Deserialize(buffer);
}
catch (Exception e)
{
deserializeException = new NatsDeserializeException(buffer.ToArray(), e);
data = default;
}
}
else
{
data = default;
}
return new NatsKVEntry<T>(Bucket, key)
{
Created = response.Message.Time,
Revision = response.Message.Seq,
Value = data,
UsedDirectGet = false,
Error = deserializeException,
};
}
private async ValueTask<NatsKVEntry<T>> GetEntryDirectAsync<T>(
string key,
ulong revision,
INatsDeserialize<T> serializer,
StreamMsgGetRequest request,
string keySubject,
CancellationToken cancellationToken)
{
var direct = await _stream.GetDirectAsync<T>(request, serializer, cancellationToken);
if (direct is { Headers: { } headers } msg)
{
var sequence = ValidateAndGetSequence(revision, headers, keySubject, out var timestamp, out var operation);
return new NatsKVEntry<T>(Bucket, key)
{
Bucket = Bucket,
Key = key,
Created = timestamp,
Revision = sequence,
Operation = operation,
Value = msg.Data,
Delta = 0,
UsedDirectGet = true,
Error = msg.Error,
};
}
else
{
throw new NatsKVException("Missing headers");
}
}
private static ulong ValidateAndGetSequence(ulong revision, NatsHeaders headers, string keySubject,
out DateTimeOffset timestamp, out NatsKVOperation operation)
{
if (headers.Code == 404)
throw new NatsKVKeyNotFoundException();
if (!headers.TryGetLastValue(NatsSubject, out var subject))
throw new NatsKVException("Missing sequence header");
if (revision != default)
{
if (!string.Equals(subject, keySubject, StringComparison.Ordinal))
{
throw new NatsKVException("Unexpected subject");
}
}
if (!headers.TryGetLastValue(NatsSequence, out var sequenceValue))
throw new NatsKVException("Missing sequence header");
if (!ulong.TryParse(sequenceValue, out var sequence))
throw new NatsKVException("Can't parse sequence header");
if (!headers.TryGetLastValue(NatsTimeStamp, out var timestampValue))
throw new NatsKVException("Missing timestamp header");
if (!DateTimeOffset.TryParse(timestampValue, out timestamp))
throw new NatsKVException("Can't parse timestamp header");
operation = NatsKVOperation.Put;
if (headers.TryGetValue(KVOperation, out var operationValues))
{
if (operationValues.Count != 1)
throw new NatsKVException("Unexpected number of operation headers");
if (!Enum.TryParse(operationValues[0], ignoreCase: true, out operation))
throw new NatsKVException("Can't parse operation header");
}
if (operation is NatsKVOperation.Del or NatsKVOperation.Purge)
{
throw new NatsKVKeyDeletedException(sequence);
}
return sequence;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment