Skip to content

Instantly share code, notes, and snippets.

@ruxo
Last active August 24, 2024 02:57
Show Gist options
  • Save ruxo/0a5b358caa7b6f80010011c45b350fd1 to your computer and use it in GitHub Desktop.
Save ruxo/0a5b358caa7b6f80010011c45b350fd1 to your computer and use it in GitHub Desktop.
using System.Collections.Frozen;
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Text.Json.Nodes;
using Akka.Actor;
using Akka.DependencyInjection;
using Akka.IO;
using Microsoft.Extensions.Logging;
using TCRB.CoreApp;
namespace TCRB.DontCall.Helpers;
public static class ActorExtension
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
static Props DependencyProps<T>(this ActorSystem sys, params object[] parameters) where T : ActorBase =>
DependencyResolver.For(sys).Props<T>(parameters);
public static IActorRef CreateActor<T>(this ActorSystem sys, string name, params object[] parameters) where T : ActorBase =>
sys.ActorOf(sys.DependencyProps<T>(parameters), name);
public static IActorRef CreateActor<T>(this IUntypedActorContext context, string name, params object[] parameters) where T : ActorBase =>
context.ActorOf(context.System.DependencyProps<T>(parameters), name);
public static void ReceiveTcp(this IActorRef self, FrozenDictionary<string, Type> deserializers, ILogger logger,
Tcp.Received received) {
var data = received.Data.ToString();
var (typeName, jsonObject) = JsonSerializer.Deserialize<(string, JsonObject)>(data);
if (deserializers.Get(typeName).IfSome(out var t))
self.Forward(jsonObject.Deserialize(t));
else
logger.LogError("Unexpected type: {Type}", typeName);
}
public static void TellTcp<T>(this IActorRef actor, T message) where T : notnull {
var package = JsonSerializer.Serialize((typeof(T).FullName, message), JsonStandardSerialization.Default);
actor.Tell(Tcp.Write.Create(ByteString.FromString(package)));
}
/// <summary>
/// Nicely wrap the actor's ask pattern into EitherAsync&lt;ErrorInfo,T&gt;.
/// </summary>
/// <param name="actor">Target actor</param>
/// <param name="message">A query message</param>
/// <typeparam name="T">A success type</typeparam>
/// <returns></returns>
public static EitherAsync<ErrorInfo,T> SafeAsk<T>(this ICanTell actor, object message)
=> TryCatch(async () => await actor.Ask<T>(message));
public static void SafeTell<T>(this ICanTell actor, Func<T> handler, IActorRef? from = default) {
var sender = from ?? ActorCell.GetCurrentSelfOrNoSender();
try {
actor.Tell(handler(), sender);
} catch (Exception e) {
actor.Tell(new Status.Failure(e is ErrorInfoException? e : new ErrorInfoException(ErrorFrom.Exception(e))), sender);
}
}
public static async ValueTask SafeTell<T>(this ICanTell actor, Func<ValueTask<T>> handler, IActorRef? from = default) {
var sender = from ?? ActorCell.GetCurrentSelfOrNoSender();
try {
actor.Tell(await handler(), sender);
} catch (Exception e) {
actor.Tell(new Status.Failure(e is ErrorInfoException? e : new ErrorInfoException(ErrorFrom.Exception(e))), sender);
}
}
}
using System.Collections.Frozen;
using System.Linq.Expressions;
using System.Reflection;
using System.Runtime.CompilerServices;
using Akka.Actor;
namespace RZ.AkkaNet;
[AttributeUsage(AttributeTargets.Class)]
public sealed class AkkaWrapperAttribute : Attribute;
public static class AkkaWrapper
{
static readonly FrozenDictionary<Type, Func<ActorSystem, IActorRef, object>> creators;
static AkkaWrapper() {
var wrapperTypes = Seq(from assembly in AppDomain.CurrentDomain.GetAssemblies()
where assembly.GetName().Name!.StartsWith("DontCall")
from type in assembly.GetTypes()
where type.GetCustomAttribute<AkkaWrapperAttribute>() is not null
select type);
var creatorList = from type in wrapperTypes
let interfaceType = type.GetInterfaces().Single()
let ctor = type.GetConstructor([typeof(ActorSystem), typeof(IActorRef)]) ??
throw new InvalidOperationException($"No public parameterless constructor found for {type}")
let systemParam = Expression.Parameter(typeof(ActorSystem), "system")
let actorRefParam = Expression.Parameter(typeof(IActorRef), "actor")
let creator = Expression.Lambda<Func<ActorSystem, IActorRef, object>>(Expression.New(ctor, systemParam, actorRefParam), systemParam, actorRefParam)
select KeyValuePair.Create(interfaceType, creator.Compile());
creators = creatorList.ToFrozenDictionary();
}
public static T Create<T>(ActorSystem system, IActorRef actor) where T : class =>
(T) Create(typeof(T), system, actor);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static object Create(Type interfaceType, ActorSystem system, IActorRef actor) =>
creators[interfaceType](system, actor);
}
using System.Reflection;
using Akka.Actor;
using LanguageExt.UnitsOfMeasure;
using RZ.Foundation;
using Seq = LanguageExt.Seq;
namespace RZ.AkkaNet.Blob;
public static class BlobDelivery
{
/// Maximum lifetime of blob sender actor. A retriever must try to retrieve all chunks within this time.
public static readonly TimeSpan BlobSenderLifetime = 1.Minutes();
/// <summary>
/// The size of blob chunk which must be less than overflow of Akka.NET message size (default = 128K).
/// </summary>
const int ChunkSize = 64 * 1024;
/// <summary>
/// Create a blob sender and initialize blob delivery by returning the delivery box.
/// </summary>
/// <param name="actorContext"></param>
/// <param name="boxId"></param>
/// <param name="data"></param>
/// <returns></returns>
public static BlobDeliveryBox Deliver<T>(this IUntypedActorContext actorContext, Guid boxId, Either<ErrorInfo,T> data) =>
Deliver(actorContext.System, actorContext.CreateActor<BlobSender>, boxId, data);
#region Handle call with blob
public static void HandleBlobCallParameter(this IActorRef self, IActorRef sender, ActorSystem system, BlobCallParameter call) =>
Task.Run(async () => await call.Parameter.Retrieve(system))
.PipeTo(self, sender, data => new BlobCallParameterRaw(data, call.Parameter.DataType, call.ReturnType));
public static void UnwrapBlobDeliveryBox(this IActorRef self, IActorRef sender, ActorSystem system, BlobCallParameterRaw call) {
if (call.Data.IfRight(out var blob, out var error)) {
var data = system.Serialization.FindSerializerForType(call.DataType).FromBinary(blob, call.DataType);
self.Tell(data, sender);
}
else {
// TODO improve for performance
var eitherType = typeof(Either<,>).MakeGenericType(typeof(ErrorInfo), call.ReturnType);
var either = eitherType.GetMethod("Left", BindingFlags.Static)!.Invoke(null, [error]);
sender.Tell(either);
}
}
#endregion
#region Retriever
/// <summary>
/// Initialize blob retrieval and convert it to the standard error for async operation.
/// </summary>
/// <param name="service"></param>
/// <param name="system"></param>
/// <param name="message"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static EitherAsync<ErrorInfo, T> RetrieveBlob<T>(this IActorRef service, ActorSystem system, object message) =>
from deliveryBox in TryCatch(async () => await service.Ask<BlobDeliveryBox>(message))
from result in deliveryBox.RetrieveBlob<T>(system)
select result;
public static EitherAsync<ErrorInfo, T> RetrieveBlob<T>(this BlobDeliveryBox deliveryBox, ActorSystem system) =>
from _ in guard(deliveryBox.DataType == typeof(Either<ErrorInfo, T>),
ErrorInfo.Of(ErrorCodes.InvalidResponse, $"Unexpected data type: {deliveryBox.DataType}"))
from accumulated in deliveryBox.Retrieve(system)
from result in Deserialize<T>(system, accumulated).ToAsync()
select result;
static Either<ErrorInfo, T> Deserialize<T>(ActorSystem system, byte[] data) =>
system.Serialization
.FindSerializerForType(typeof(Either<ErrorInfo,T>))
.FromBinary<Either<ErrorInfo, T>>(data);
public static EitherAsync<ErrorInfo, byte[]> Retrieve(this BlobDeliveryBox deliveryBox, ActorSystem system) {
var retriever = Inbox.Create(system);
retriever.Send(deliveryBox.BlobInitiator, new BlobMessages.InitializeBlobRetrieval(deliveryBox.Id));
var init = EitherAsync<ErrorInfo, Lst<BlobChunk>>.Right(Lst<BlobChunk>.Empty);
var accumulated = Seq.generate(deliveryBox.TotalChunks, _ => retriever.RetrieveData(deliveryBox.Id))
.Fold(init,
(current, value) => from list in current
from v in value
select list.Add(v))
.Bind(result => CollectChunk(result).ToAsync())
// Guarantee disposal of retriever. I know, it's not nice 😔
.BiMap(x => x.SideEffect(_ => retriever.Dispose()),
x => x.SideEffect(_ => retriever.Dispose()));
return accumulated;
}
#endregion
#region Sender
public static BlobCallParameter SendBlob<TReturn>(this ActorSystem system, Guid boxId, object message) =>
new(Deliver(system, system.CreateActor<BlobSender>, boxId, message), typeof(TReturn));
public static EitherAsync<ErrorInfo, TReturn> SendBlob<TReturn>(this IActorRef service, ActorSystem system, Guid boxId, object message) =>
service.SafeAsk<TReturn>(system.SendBlob<TReturn>(boxId, message));
#endregion
static BlobDeliveryBox Deliver(ActorSystem system, Func<string, object[], IActorRef> createBlobSender, Guid boxId, object data) {
var buffer = system.Serialization.FindSerializerFor(data).ToBinary(data);
var totalChunks = (buffer.Length + ChunkSize -1 ) / ChunkSize;
var chunks = Enumerable.Range(0, totalChunks).Map(n => {
var offset = n * ChunkSize;
var length = Math.Min(ChunkSize, buffer.Length - offset);
return new Memory<byte>(buffer, offset, length);
}).ToArray().ToSeq();
var initiator = createBlobSender(boxId.ToString(), [boxId, chunks]);
return new BlobDeliveryBox(boxId, data.GetType(), buffer.Length, totalChunks, initiator);
}
static Either<ErrorInfo, byte[]> CollectChunk(Lst<BlobChunk> chunks) {
var sequence = chunks.OrderBy(b => b.Index).ToSeq();
if (sequence.IsEmpty) return ErrorInfo.Of(ErrorCodes.InvalidResponse, "No chunk received");
var sequenceIsCorrect = Enumerable.Range(0, chunks.Count - 1).All(i => sequence[i].Index + 1 == sequence[i + 1].Index);
if (!sequenceIsCorrect) return ErrorInfo.Of(ErrorCodes.InvalidResponse, $"Chunk sequence is incorrect: {sequence.Map(b => b.Index.ToString()).Join(", ")}");
var buffer = new List<byte>(sequence.Sum(b => b.Data.Length));
sequence.Iter(b => buffer.AddRange(b.Data));
return buffer.ToArray();
}
static EitherAsync<ErrorInfo, BlobChunk> RetrieveData(this Inbox inbox, Guid chunkId) =>
TryAsync(() => inbox.ReceiveAsync(AkkaSettings.AskTimeout))
.Map(v => (Either<ErrorInfo, BlobChunk>) v)
.ToEither(ErrorFrom.Exception)
.Bind(v => v.ToAsync())
.Bind(v => v.Id == chunkId
? RightAsync<ErrorInfo, BlobChunk>(v)
: ErrorInfo.Of(ErrorCodes.InvalidResponse, $"Chunk ID is mismatched, possibly bug! Expect: {chunkId}, Got: {v.Id}"));
}
using Akka.Actor;
using RZ.CoreApp;
namespace RZ.AkkaNet.Blob;
public sealed class BlobSender(Guid boxId, Seq<Memory<byte>> chunks) : UntypedActor, IWithTimers
{
const string FailureTimerName = "timeout";
protected override void PreStart() {
Timers.StartSingleTimer(FailureTimerName, PoisonPill.Instance, BlobDelivery.BlobSenderLifetime);
}
protected override void OnReceive(object message) {
switch (message) {
case BlobMessages.InitializeBlobRetrieval m:
if (m.BlobId == boxId)
chunks.Iter((n, chunk) => Sender.Tell(Either<ErrorInfo, BlobChunk>.Right(new BlobChunk(boxId, n, chunk.ToArray()))));
else
Sender.Tell(Either<ErrorInfo, BlobChunk>.Left(ErrorInfo.Of(ErrorCodes.InvalidRequest, $"Incorrect Box ID {m.BlobId}")));
Self.Tell(PoisonPill.Instance);
break;
default:
Unhandled(message);
break;
}
}
public required ITimerScheduler Timers { get; set; }
}
using Akka.Actor;
namespace RZ.AkkaNet.Blob;
public sealed record BlobCallParameter(BlobDeliveryBox Parameter, Type ReturnType);
public sealed record BlobCallParameterRaw(Either<ErrorInfo, byte[]> Data, Type DataType, Type ReturnType);
public sealed record BlobDeliveryBox(Guid Id, Type DataType, int TotalBytes, int TotalChunks, IActorRef BlobInitiator);
public sealed record BlobChunk(Guid Id, int Index, byte[] Data);
public static class BlobMessages
{
public sealed record InitializeBlobRetrieval(Guid BlobId);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment