Created
December 6, 2024 17:49
-
-
Save to11mtm/8f51de404be5fbd5a5937423caefc844 to your computer and use it in GitHub Desktop.
NatsKVStore Example for Split GetEntryAsync
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Excerpt of the key/value store showing <c>GetEntryAsync</c> split into a
/// synchronous front-end and two asynchronous helpers: one for direct gets and
/// one for regular stream gets.
/// </summary>
/// <remarks>
/// Members referenced but not declared in this excerpt (<c>_stream</c>, <c>Bucket</c>,
/// <c>JetStreamContext</c>, <c>ValidateKey</c> and the header-name constants
/// <c>NatsSubject</c>, <c>NatsSequence</c>, <c>NatsTimeStamp</c>, <c>KVOperation</c>)
/// must be supplied by the rest of the class.
/// </remarks>
public class NatsKVStore : INatsKVStore
{
    /// <summary>
    /// Builds the stream "get message" request for <paramref name="key"/> and dispatches it
    /// to the direct or non-direct implementation depending on the stream's
    /// <c>AllowDirect</c> setting.
    /// </summary>
    /// <typeparam name="T">Type the stored payload is deserialized to.</typeparam>
    /// <param name="key">Key to look up; validated with <c>ValidateKey</c> before any I/O.</param>
    /// <param name="revision">
    /// When left at its default the request asks for the last message on the key subject;
    /// otherwise it asks for the next message on the key subject starting at this sequence.
    /// </param>
    /// <param name="serializer">
    /// Deserializer for the payload; when <c>null</c> one is taken from the connection's
    /// serializer registry.
    /// </param>
    /// <param name="cancellationToken">Token forwarded to the underlying stream call.</param>
    /// <returns>The entry built from the stream's response.</returns>
    /// <remarks>
    /// This method is intentionally not <c>async</c>: key validation and request setup run
    /// synchronously on the caller, and only the network round trip lives in the async helpers.
    /// </remarks>
    public ValueTask<NatsKVEntry<T>> GetEntryAsync<T>(string key, ulong revision = default, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default)
    {
        ValidateKey(key);
        serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer<T>();

        var keySubject = $"$KV.{Bucket}.{key}";
        var request = new StreamMsgGetRequest();

        if (revision == default)
        {
            request.LastBySubj = keySubject;
        }
        else
        {
            request.Seq = revision;
            request.NextBySubj = keySubject;
        }

        return _stream.Info.Config.AllowDirect
            ? GetEntryDirectAsync(key, revision, serializer, request, keySubject, cancellationToken)
            : GetEntryNonDirectAsync(key, revision, serializer, request, keySubject, cancellationToken);
    }

    /// <summary>
    /// Fetches the entry through the regular stream "get message" call and deserializes the
    /// raw payload locally.
    /// </summary>
    /// <param name="key">Key being looked up; copied onto the returned entry.</param>
    /// <param name="revision">Requested revision, or default for "latest".</param>
    /// <param name="serializer">Deserializer applied to a non-empty payload.</param>
    /// <param name="request">Pre-populated get request.</param>
    /// <param name="keySubject">Subject the returned message is expected to carry.</param>
    /// <param name="cancellationToken">Token forwarded to the stream call.</param>
    /// <returns>
    /// An entry with <c>UsedDirectGet = false</c>. A deserialization failure does not throw;
    /// it is captured in the entry's <c>Error</c> and <c>Value</c> is left at its default.
    /// </returns>
    /// <exception cref="NatsKVException">
    /// A specific revision was requested and the returned message belongs to a different subject.
    /// </exception>
    private async ValueTask<NatsKVEntry<T>> GetEntryNonDirectAsync<T>(string key, ulong revision, INatsDeserialize<T> serializer, StreamMsgGetRequest request, string keySubject, CancellationToken cancellationToken)
    {
        var response = await _stream.GetAsync(request, cancellationToken).ConfigureAwait(false);

        // A by-revision request uses NextBySubj, so the reply must be for the requested key.
        // The check was previously inverted (it threw when the subjects matched); it now
        // mirrors the direct-get validation in ValidateAndGetSequence.
        if (revision != default
            && !string.Equals(response.Message.Subject, keySubject, StringComparison.Ordinal))
        {
            throw new NatsKVException("Unexpected subject");
        }

        T? data = default;
        NatsDeserializeException? deserializeException = null;

        if (response.Message.Data.Length > 0)
        {
            var buffer = new ReadOnlySequence<byte>(response.Message.Data);
            try
            {
                data = serializer.Deserialize(buffer);
            }
            catch (Exception e)
            {
                // Deliberately broad: any deserializer failure is reported through the
                // entry's Error together with the raw bytes instead of being thrown.
                deserializeException = new NatsDeserializeException(buffer.ToArray(), e);
                data = default;
            }
        }

        return new NatsKVEntry<T>(Bucket, key)
        {
            Created = response.Message.Time,
            Revision = response.Message.Seq,
            Value = data,
            UsedDirectGet = false,
            Error = deserializeException,
        };
    }

    /// <summary>
    /// Fetches the entry through the direct-get call and builds the entry from the reply's
    /// headers and already-deserialized payload.
    /// </summary>
    /// <param name="key">Key being looked up; copied onto the returned entry.</param>
    /// <param name="revision">Requested revision, or default for "latest".</param>
    /// <param name="serializer">Deserializer handed to the direct-get call.</param>
    /// <param name="request">Pre-populated get request.</param>
    /// <param name="keySubject">Subject the returned message is expected to carry.</param>
    /// <param name="cancellationToken">Token forwarded to the stream call.</param>
    /// <returns>An entry with <c>UsedDirectGet = true</c>.</returns>
    /// <exception cref="NatsKVException">
    /// The reply has no headers, or a header is missing or malformed.
    /// </exception>
    /// <exception cref="NatsKVKeyNotFoundException">The reply headers carry code 404.</exception>
    /// <exception cref="NatsKVKeyDeletedException">The entry's operation is delete or purge.</exception>
    private async ValueTask<NatsKVEntry<T>> GetEntryDirectAsync<T>(
        string key,
        ulong revision,
        INatsDeserialize<T> serializer,
        StreamMsgGetRequest request,
        string keySubject,
        CancellationToken cancellationToken)
    {
        var direct = await _stream.GetDirectAsync<T>(request, serializer, cancellationToken).ConfigureAwait(false);

        if (direct is not { Headers: { } headers } msg)
        {
            throw new NatsKVException("Missing headers");
        }

        var sequence = ValidateAndGetSequence(revision, headers, keySubject, out var timestamp, out var operation);

        // Bucket and Key are supplied through the constructor, same as the non-direct path.
        // NOTE(review): assumes the NatsKVEntry constructor sets both properties - confirm.
        return new NatsKVEntry<T>(Bucket, key)
        {
            Created = timestamp,
            Revision = sequence,
            Operation = operation,
            Value = msg.Data,
            Delta = 0,
            UsedDirectGet = true,
            Error = msg.Error,
        };
    }

    /// <summary>
    /// Validates the headers of a direct-get reply and extracts sequence, timestamp and
    /// operation from them.
    /// </summary>
    /// <param name="revision">Requested revision; when not default the subject header must equal <paramref name="keySubject"/>.</param>
    /// <param name="headers">Headers of the direct-get reply.</param>
    /// <param name="keySubject">Subject expected for the requested key.</param>
    /// <param name="timestamp">Receives the parsed timestamp header.</param>
    /// <param name="operation">
    /// Receives the parsed operation header, or <c>NatsKVOperation.Put</c> when the header is absent.
    /// </param>
    /// <returns>The parsed sequence header.</returns>
    /// <exception cref="NatsKVKeyNotFoundException">The headers carry code 404.</exception>
    /// <exception cref="NatsKVException">A required header is missing, duplicated or unparsable, or the subject does not match.</exception>
    /// <exception cref="NatsKVKeyDeletedException">The operation is delete or purge.</exception>
    private static ulong ValidateAndGetSequence(ulong revision, NatsHeaders headers, string keySubject,
        out DateTimeOffset timestamp, out NatsKVOperation operation)
    {
        if (headers.Code == 404)
        {
            throw new NatsKVKeyNotFoundException();
        }

        if (!headers.TryGetLastValue(NatsSubject, out var subject))
        {
            // Previously reported as "Missing sequence header", which pointed at the wrong header.
            throw new NatsKVException("Missing subject header");
        }

        if (revision != default && !string.Equals(subject, keySubject, StringComparison.Ordinal))
        {
            throw new NatsKVException("Unexpected subject");
        }

        if (!headers.TryGetLastValue(NatsSequence, out var sequenceValue))
        {
            throw new NatsKVException("Missing sequence header");
        }

        // Header values are machine-generated, so parse them culture-invariantly rather
        // than with whatever culture the current thread happens to use.
        if (!ulong.TryParse(sequenceValue, System.Globalization.NumberStyles.Integer, System.Globalization.CultureInfo.InvariantCulture, out var sequence))
        {
            throw new NatsKVException("Can't parse sequence header");
        }

        if (!headers.TryGetLastValue(NatsTimeStamp, out var timestampValue))
        {
            throw new NatsKVException("Missing timestamp header");
        }

        if (!DateTimeOffset.TryParse(timestampValue, System.Globalization.CultureInfo.InvariantCulture, System.Globalization.DateTimeStyles.None, out timestamp))
        {
            throw new NatsKVException("Can't parse timestamp header");
        }

        // No operation header means a plain put.
        operation = NatsKVOperation.Put;
        if (headers.TryGetValue(KVOperation, out var operationValues))
        {
            if (operationValues.Count != 1)
            {
                throw new NatsKVException("Unexpected number of operation headers");
            }

            if (!Enum.TryParse(operationValues[0], ignoreCase: true, out operation))
            {
                throw new NatsKVException("Can't parse operation header");
            }
        }

        if (operation is NatsKVOperation.Del or NatsKVOperation.Purge)
        {
            throw new NatsKVKeyDeletedException(sequence);
        }

        return sequence;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment