Last active
December 18, 2021 23:46
-
-
Save tmarkovski/916d982138ba05ca500d874883bed47e to your computer and use it in GitHub Desktop.
Protobuf serializer for Azure Cosmos DB
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Reflection;
using System.Text;
using System.Text.Json;
using Google.Protobuf;
using Google.Protobuf.Reflection;
using Microsoft.Azure.Cosmos;
/// <summary>
/// A <see cref="CosmosSerializer"/> that converts between JSON streams and
/// Protocol Buffers messages (<see cref="IMessage"/>) using the protobuf JSON
/// parser and formatter.
/// </summary>
/// <remarks>
/// Supported target types for <see cref="FromStream{T}"/> are: a <see cref="Stream"/>
/// (returned as-is), a single message type, or an array of a message type.
/// Unknown JSON fields are ignored while parsing.
/// </remarks>
public class ProtobufCosmosSerializer : CosmosSerializer
{
    // Configured to skip JSON fields that have no counterpart in the message definition.
    private readonly JsonParser _parser = new(JsonParser.Settings.Default.WithIgnoreUnknownFields(true));

    /// <summary>
    /// Deserializes a JSON payload into <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">
    /// A <see cref="Stream"/> type, a type implementing <see cref="IMessage"/>,
    /// or an array of such a type.
    /// </typeparam>
    /// <param name="stream">
    /// The JSON payload. It is disposed before returning unless
    /// <typeparamref name="T"/> is a stream type, in which case it is returned
    /// untouched and the caller becomes responsible for disposing it.
    /// </param>
    /// <returns>The parsed message, the parsed array of messages, or the stream itself.</returns>
    /// <exception cref="NotSupportedException">
    /// <typeparamref name="T"/> (or its element type) does not implement <see cref="IMessage"/>.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// The payload is a JSON array but <typeparamref name="T"/> is not an array type,
    /// or the message type exposes no usable static <c>Descriptor</c> property.
    /// </exception>
    public override T FromStream<T>(Stream stream)
    {
        // Pass-through must happen OUTSIDE the using block below; otherwise the
        // caller would receive a stream that has already been disposed.
        if (typeof(Stream).IsAssignableFrom(typeof(T)))
        {
            return (T)(object)stream;
        }

        using (stream)
        {
            var descriptor = GetDescriptor<T>();

            // JsonDocument is IDisposable; dispose it once the raw JSON text has been extracted.
            using var document = JsonDocument.Parse(stream);
            var root = document.RootElement;

            if (root.ValueKind != JsonValueKind.Array)
            {
                // Parse from the already-loaded document instead of rewinding the
                // stream, so non-seekable streams work as well.
                return (T)_parser.Parse(root.GetRawText(), descriptor);
            }

            var elementType = typeof(T).GetElementType()
                ?? throw new InvalidOperationException(
                    $"payload is a JSON array but requested type {typeof(T).Name} is not an array type");

            var messages = Array.CreateInstance(elementType, root.GetArrayLength());
            var index = 0;
            foreach (var element in root.EnumerateArray())
            {
                messages.SetValue(_parser.Parse(element.GetRawText(), descriptor), index++);
            }

            return (T)(object)messages;
        }
    }

    /// <summary>
    /// Serializes a protobuf message to a UTF-8 JSON stream (no byte-order mark).
    /// </summary>
    /// <typeparam name="T">The runtime value must implement <see cref="IMessage"/>.</typeparam>
    /// <param name="input">The message to serialize.</param>
    /// <returns>A readable <see cref="MemoryStream"/> positioned at the start of the JSON text.</returns>
    /// <exception cref="ArgumentException">
    /// <paramref name="input"/> is null or does not implement <see cref="IMessage"/>.
    /// </exception>
    public override Stream ToStream<T>(T input)
    {
        if (input is not IMessage message)
        {
            throw new ArgumentException("input message must be 'IMessage'", nameof(input));
        }

        var payload = new MemoryStream();

        // leaveOpen keeps the MemoryStream usable after the writer is disposed;
        // disposing the writer flushes any buffered text into the payload.
        using (var writer = new StreamWriter(
                   payload,
                   new UTF8Encoding(encoderShouldEmitUTF8Identifier: false, throwOnInvalidBytes: true),
                   bufferSize: 1024,
                   leaveOpen: true))
        {
            JsonFormatter.Default.Format(message, writer);
        }

        payload.Position = 0;
        return payload;
    }

    /// <summary>
    /// Resolves the <see cref="MessageDescriptor"/> for <typeparamref name="T"/>,
    /// or for its element type when <typeparamref name="T"/> is an array, by
    /// reading the type's public static <c>Descriptor</c> property via reflection.
    /// </summary>
    /// <exception cref="NotSupportedException">The message type does not implement <see cref="IMessage"/>.</exception>
    /// <exception cref="InvalidOperationException">
    /// The message type has no public static <c>Descriptor</c> property yielding a <see cref="MessageDescriptor"/>.
    /// </exception>
    private static MessageDescriptor GetDescriptor<T>()
    {
        var messageType = typeof(T).IsArray ? typeof(T).GetElementType()! : typeof(T);
        if (!typeof(IMessage).IsAssignableFrom(messageType))
        {
            throw new NotSupportedException($"requested type must be IMessage, found {messageType.Name}");
        }

        // Restrict the lookup to static members: the value is read with a null
        // target, which is only valid for a static property.
        var property = messageType.GetProperty("Descriptor", BindingFlags.Public | BindingFlags.Static);
        if (property?.GetValue(null) is not MessageDescriptor descriptor)
        {
            throw new InvalidOperationException(
                $"type {messageType.Name} does not expose a public static 'Descriptor' property of type MessageDescriptor");
        }

        return descriptor;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment