Last active
August 24, 2024 02:57
-
-
Save ruxo/0a5b358caa7b6f80010011c45b350fd1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Frozen; | |
using System.Runtime.CompilerServices; | |
using System.Text.Json; | |
using System.Text.Json.Nodes; | |
using Akka.Actor; | |
using Akka.DependencyInjection; | |
using Akka.IO; | |
using Microsoft.Extensions.Logging; | |
using TCRB.CoreApp; | |
namespace TCRB.DontCall.Helpers; | |
public static class ActorExtension | |
{ | |
/// <summary>
/// Builds the <see cref="Props"/> for actor type <typeparamref name="T"/> through the
/// <see cref="DependencyResolver"/> attached to <paramref name="sys"/>.
/// </summary>
/// <param name="sys">Actor system whose dependency resolver is used.</param>
/// <param name="parameters">Extra arguments handed to the resolver as-is.</param>
/// <typeparam name="T">Actor type to create props for.</typeparam>
/// <returns>The props produced by the resolver.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
static Props DependencyProps<T>(this ActorSystem sys, params object[] parameters) where T : ActorBase {
    var resolver = DependencyResolver.For(sys);
    return resolver.Props<T>(parameters);
}
/// <summary>
/// Creates a top-level actor of type <typeparamref name="T"/> in <paramref name="sys"/>,
/// with props obtained from the system's dependency resolver.
/// </summary>
/// <param name="sys">Actor system that hosts the new actor.</param>
/// <param name="name">Name given to the new actor.</param>
/// <param name="parameters">Extra arguments forwarded to the props factory.</param>
/// <typeparam name="T">Actor type to create.</typeparam>
/// <returns>Reference to the created actor.</returns>
public static IActorRef CreateActor<T>(this ActorSystem sys, string name, params object[] parameters) where T : ActorBase {
    var props = sys.DependencyProps<T>(parameters);
    return sys.ActorOf(props, name);
}
/// <summary>
/// Creates an actor of type <typeparamref name="T"/> through <paramref name="context"/>,
/// with props obtained from the dependency resolver of the context's actor system.
/// </summary>
/// <param name="context">Actor context used to spawn the new actor.</param>
/// <param name="name">Name given to the new actor.</param>
/// <param name="parameters">Extra arguments forwarded to the props factory.</param>
/// <typeparam name="T">Actor type to create.</typeparam>
/// <returns>Reference to the created actor.</returns>
public static IActorRef CreateActor<T>(this IUntypedActorContext context, string name, params object[] parameters) where T : ActorBase {
    var props = context.System.DependencyProps<T>(parameters);
    return context.ActorOf(props, name);
}
/// <summary>
/// Decodes a TCP payload produced by <c>TellTcp</c> and forwards the contained message to <paramref name="self"/>.
/// The payload is a JSON-serialized tuple of (type name, message body); the type name is looked up in
/// <paramref name="deserializers"/> to find the CLR type used to materialize the body.
/// </summary>
/// <param name="self">Actor that receives the deserialized message via <c>Forward</c>.</param>
/// <param name="deserializers">Map from the type name found in the payload to the CLR type to deserialize into.</param>
/// <param name="logger">Logger used to report payloads that cannot be dispatched.</param>
/// <param name="received">The raw TCP data event.</param>
public static void ReceiveTcp(this IActorRef self, FrozenDictionary<string, Type> deserializers, ILogger logger,
                              Tcp.Received received) {
    // NOTE(review): assumes a single Tcp.Received carries exactly one complete JSON package — confirm framing with the caller.
    var data = received.Data.ToString();

    // The same options as TellTcp must be used here: the envelope is a ValueTuple, whose Item1/Item2 are fields
    // and are ignored by System.Text.Json unless the options opt in to field serialization.
    var (typeName, jsonObject) = JsonSerializer.Deserialize<(string, JsonObject)>(data, JsonStandardSerialization.Default);
    if (typeName is null || jsonObject is null) {
        logger.LogError("Malformed TCP package: missing type name or payload");
        return;
    }

    if (deserializers.Get(typeName).IfSome(out var t))
        self.Forward(jsonObject.Deserialize(t, JsonStandardSerialization.Default));
    else
        logger.LogError("Unexpected type: {Type}", typeName);
}
/// <summary>
/// Wraps <paramref name="message"/> in a (type name, message) envelope, serializes the envelope to JSON with
/// <c>JsonStandardSerialization.Default</c>, and tells <paramref name="actor"/> to write it as a TCP payload.
/// </summary>
/// <param name="actor">Actor that receives the <c>Tcp.Write</c> command.</param>
/// <param name="message">Message to send; its static type <typeparamref name="T"/> supplies the type name.</param>
/// <typeparam name="T">Static type of the message.</typeparam>
public static void TellTcp<T>(this IActorRef actor, T message) where T : notnull {
    var envelope = (typeof(T).FullName, message);
    var json = JsonSerializer.Serialize(envelope, JsonStandardSerialization.Default);
    var payload = ByteString.FromString(json);
    actor.Tell(Tcp.Write.Create(payload));
}
/// <summary>
/// Asks <paramref name="actor"/> with <paramref name="message"/> and exposes the outcome as
/// <c>EitherAsync&lt;ErrorInfo,T&gt;</c> by running the ask inside <c>TryCatch</c>.
/// </summary>
/// <param name="actor">Target actor</param>
/// <param name="message">A query message</param>
/// <typeparam name="T">A success type</typeparam>
/// <returns>The asynchronous either produced by <c>TryCatch</c> around the ask.</returns>
public static EitherAsync<ErrorInfo,T> SafeAsk<T>(this ICanTell actor, object message) {
    return TryCatch(async () => await actor.Ask<T>(message));
}
/// <summary>
/// Runs <paramref name="handler"/> and tells its result to <paramref name="actor"/>. If the handler (or the tell)
/// throws, a <c>Status.Failure</c> is told instead, carrying the exception itself when it already is an
/// <c>ErrorInfoException</c>, otherwise a new <c>ErrorInfoException</c> built from it.
/// </summary>
/// <param name="actor">Recipient of the result or the failure.</param>
/// <param name="handler">Produces the reply message.</param>
/// <param name="from">Sender to use; when null, the current actor (or no sender) is used.</param>
/// <typeparam name="T">Type of the reply message.</typeparam>
public static void SafeTell<T>(this ICanTell actor, Func<T> handler, IActorRef? from = default) {
    var sender = from ?? ActorCell.GetCurrentSelfOrNoSender();
    try {
        var reply = handler();
        actor.Tell(reply, sender);
    } catch (Exception e) {
        var cause = e as ErrorInfoException ?? new ErrorInfoException(ErrorFrom.Exception(e));
        actor.Tell(new Status.Failure(cause), sender);
    }
}
/// <summary>
/// Awaits <paramref name="handler"/> and tells its result to <paramref name="actor"/>. If the handler (or the tell)
/// throws, a <c>Status.Failure</c> is told instead, carrying the exception itself when it already is an
/// <c>ErrorInfoException</c>, otherwise a new <c>ErrorInfoException</c> built from it.
/// </summary>
/// <param name="actor">Recipient of the result or the failure.</param>
/// <param name="handler">Asynchronously produces the reply message.</param>
/// <param name="from">Sender to use; when null, the current actor (or no sender) is used.</param>
/// <typeparam name="T">Type of the reply message.</typeparam>
public static async ValueTask SafeTell<T>(this ICanTell actor, Func<ValueTask<T>> handler, IActorRef? from = default) {
    // The sender is resolved before the first await, while the calling actor's context is still current.
    var sender = from ?? ActorCell.GetCurrentSelfOrNoSender();
    try {
        var reply = await handler();
        actor.Tell(reply, sender);
    } catch (Exception e) {
        var cause = e as ErrorInfoException ?? new ErrorInfoException(ErrorFrom.Exception(e));
        actor.Tell(new Status.Failure(cause), sender);
    }
}
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Frozen; | |
using System.Linq.Expressions; | |
using System.Reflection; | |
using System.Runtime.CompilerServices; | |
using Akka.Actor; | |
namespace RZ.AkkaNet; | |
/// <summary>
/// Marks a class as an Akka wrapper. <c>AkkaWrapper</c> scans loaded assemblies for classes carrying this
/// attribute and registers a factory for each of them.
/// </summary>
[AttributeUsage(AttributeTargets.Class)]
public sealed class AkkaWrapperAttribute : Attribute
{
}
/// <summary>
/// Registry of wrapper factories. On first use it scans the loaded assemblies whose name starts with
/// "DontCall" for classes marked with <see cref="AkkaWrapperAttribute"/>, and compiles a factory delegate
/// for each, keyed by the single interface the class implements.
/// </summary>
public static class AkkaWrapper
{
    /// Maps a wrapper's interface type to a compiled call of the wrapper's (ActorSystem, IActorRef) constructor.
    static readonly FrozenDictionary<Type, Func<ActorSystem, IActorRef, object>> creators;

    static AkkaWrapper() {
        var wrapperTypes = Seq(from assembly in AppDomain.CurrentDomain.GetAssemblies()
                               // Assemblies without a simple name are skipped instead of failing the type initializer.
                               where assembly.GetName().Name?.StartsWith("DontCall", StringComparison.Ordinal) == true
                               from type in assembly.GetTypes()
                               where type.GetCustomAttribute<AkkaWrapperAttribute>() is not null
                               select type);

        creators = wrapperTypes.Select(type => BuildCreator(type)).ToFrozenDictionary();
    }

    /// <summary>
    /// Creates the wrapper registered for interface <typeparamref name="T"/>.
    /// </summary>
    /// <param name="system">Actor system passed to the wrapper's constructor.</param>
    /// <param name="actor">Actor reference passed to the wrapper's constructor.</param>
    /// <typeparam name="T">Interface implemented by the wrapper.</typeparam>
    /// <returns>A new wrapper instance cast to <typeparamref name="T"/>.</returns>
    /// <exception cref="KeyNotFoundException">No wrapper is registered for <typeparamref name="T"/>.</exception>
    public static T Create<T>(ActorSystem system, IActorRef actor) where T : class =>
        (T) Create(typeof(T), system, actor);

    /// <summary>
    /// Creates the wrapper registered for <paramref name="interfaceType"/>.
    /// </summary>
    /// <param name="interfaceType">Interface implemented by the wrapper.</param>
    /// <param name="system">Actor system passed to the wrapper's constructor.</param>
    /// <param name="actor">Actor reference passed to the wrapper's constructor.</param>
    /// <returns>A new wrapper instance.</returns>
    /// <exception cref="KeyNotFoundException">No wrapper is registered for <paramref name="interfaceType"/>.</exception>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public static object Create(Type interfaceType, ActorSystem system, IActorRef actor) =>
        creators.TryGetValue(interfaceType, out var create)
            ? create(system, actor)
            : throw new KeyNotFoundException($"No [AkkaWrapper] implementation is registered for {interfaceType}");

    /// Builds the (interface, factory) registration for one wrapper class.
    static KeyValuePair<Type, Func<ActorSystem, IActorRef, object>> BuildCreator(Type type) {
        var interfaces = type.GetInterfaces();
        if (interfaces.Length != 1)
            throw new InvalidOperationException($"{type} must implement exactly one interface, but implements {interfaces.Length}");

        var ctor = type.GetConstructor([typeof(ActorSystem), typeof(IActorRef)]) ??
                   throw new InvalidOperationException($"No public constructor ({nameof(ActorSystem)}, {nameof(IActorRef)}) found for {type}");

        var systemParam = Expression.Parameter(typeof(ActorSystem), "system");
        var actorRefParam = Expression.Parameter(typeof(IActorRef), "actor");
        var creator = Expression.Lambda<Func<ActorSystem, IActorRef, object>>(Expression.New(ctor, systemParam, actorRefParam),
                                                                              systemParam, actorRefParam);
        return KeyValuePair.Create(interfaces[0], creator.Compile());
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Reflection; | |
using Akka.Actor; | |
using LanguageExt.UnitsOfMeasure; | |
using RZ.Foundation; | |
using Seq = LanguageExt.Seq; | |
namespace RZ.AkkaNet.Blob; | |
public static class BlobDelivery | |
{ | |
/// <summary>
/// Maximum lifetime of a blob sender actor. A retriever must retrieve all chunks within this time.
/// </summary>
public static readonly TimeSpan BlobSenderLifetime = 1.Minutes(); | |
/// <summary> | |
/// Size in bytes of one blob chunk. It must stay below the Akka.NET maximum message size (default = 128K).
/// </summary> | |
const int ChunkSize = 64 * 1024; | |
/// <summary>
/// Starts a blob delivery from inside an actor: spawns a <c>BlobSender</c> through
/// <paramref name="actorContext"/> and returns the delivery box describing the blob.
/// </summary>
/// <param name="actorContext">Context used to create the blob sender actor.</param>
/// <param name="boxId">Identifier of the delivery box; also used as the sender actor's name.</param>
/// <param name="data">The result (or error) to deliver.</param>
/// <typeparam name="T">Success type of the delivered either.</typeparam>
/// <returns>The delivery box a retriever needs to fetch the blob.</returns>
public static BlobDeliveryBox Deliver<T>(this IUntypedActorContext actorContext, Guid boxId, Either<ErrorInfo,T> data) {
    var system = actorContext.System;
    Func<string, object[], IActorRef> spawnSender = actorContext.CreateActor<BlobSender>;
    return Deliver(system, spawnSender, boxId, data);
}
#region Handle call with blob | |
/// <summary>
/// Retrieves the blob referenced by <paramref name="call"/> on a background task and pipes the outcome back
/// to <paramref name="self"/> as a <c>BlobCallParameterRaw</c>, keeping <paramref name="sender"/> as the sender.
/// </summary>
/// <param name="self">Actor that receives the raw blob message.</param>
/// <param name="sender">Sender attached to the piped message.</param>
/// <param name="system">Actor system used for the retrieval.</param>
/// <param name="call">The call whose parameter is delivered as a blob.</param>
public static void HandleBlobCallParameter(this IActorRef self, IActorRef sender, ActorSystem system, BlobCallParameter call) {
    var retrieval = Task.Run(async () => await call.Parameter.Retrieve(system));
    retrieval.PipeTo(self, sender, data => new BlobCallParameterRaw(data, call.Parameter.DataType, call.ReturnType));
}
/// <summary>
/// Turns a retrieved raw blob back into a message. On success the bytes are deserialized as
/// <c>call.DataType</c> and told to <paramref name="self"/> on behalf of <paramref name="sender"/>.
/// On failure an <c>Either&lt;ErrorInfo, ReturnType&gt;</c> in the left state is told to <paramref name="sender"/>.
/// </summary>
/// <param name="self">Actor that receives the deserialized message.</param>
/// <param name="sender">Original requester; receives the error when retrieval failed.</param>
/// <param name="system">Actor system providing the serializers.</param>
/// <param name="call">Retrieved raw data together with the data type and the expected return type.</param>
public static void UnwrapBlobDeliveryBox(this IActorRef self, IActorRef sender, ActorSystem system, BlobCallParameterRaw call) {
    if (call.Data.IfRight(out var blob, out var error)) {
        var data = system.Serialization.FindSerializerForType(call.DataType).FromBinary(blob, call.DataType);
        self.Tell(data, sender);
    }
    else {
        // TODO improve for performance
        var eitherType = typeof(Either<,>).MakeGenericType(typeof(ErrorInfo), call.ReturnType);

        // BindingFlags.Static alone matches nothing: reflection lookups need a visibility flag (Public/NonPublic)
        // in addition to Static/Instance. The parameter type pins the lookup to the Left(ErrorInfo) factory.
        var leftFactory = eitherType.GetMethod("Left", BindingFlags.Public | BindingFlags.Static, [typeof(ErrorInfo)]) ??
                          throw new InvalidOperationException($"Cannot find the static Left factory on {eitherType}");
        var either = leftFactory.Invoke(null, [error]);
        sender.Tell(either);
    }
}
#endregion | |
#region Retriever | |
/// <summary>
/// Asks <paramref name="service"/> for a <c>BlobDeliveryBox</c> using <paramref name="message"/>, then
/// retrieves and deserializes the blob the box refers to. Failures of either step surface as <c>ErrorInfo</c>.
/// </summary>
/// <param name="service">Actor that answers the message with a delivery box.</param>
/// <param name="system">Actor system used for retrieval and deserialization.</param>
/// <param name="message">Request sent to the service.</param>
/// <typeparam name="T">Expected success type of the delivered data.</typeparam>
/// <returns>The delivered value, or the error of the first failing step.</returns>
public static EitherAsync<ErrorInfo, T> RetrieveBlob<T>(this IActorRef service, ActorSystem system, object message) {
    var deliveryBox = TryCatch(async () => await service.Ask<BlobDeliveryBox>(message));
    return deliveryBox.Bind(box => box.RetrieveBlob<T>(system));
}
/// <summary>
/// Retrieves the blob described by <paramref name="deliveryBox"/> and deserializes it, after checking that
/// the box declares its data type as <c>Either&lt;ErrorInfo, T&gt;</c>.
/// </summary>
/// <param name="deliveryBox">Box describing the blob to fetch.</param>
/// <param name="system">Actor system used for retrieval and deserialization.</param>
/// <typeparam name="T">Expected success type of the delivered data.</typeparam>
/// <returns>The delivered value, or an error (including a data-type mismatch).</returns>
public static EitherAsync<ErrorInfo, T> RetrieveBlob<T>(this BlobDeliveryBox deliveryBox, ActorSystem system) {
    var typeMismatch = ErrorInfo.Of(ErrorCodes.InvalidResponse, $"Unexpected data type: {deliveryBox.DataType}");
    return from _ in guard(deliveryBox.DataType == typeof(Either<ErrorInfo, T>), typeMismatch)
           from bytes in deliveryBox.Retrieve(system)
           from value in Deserialize<T>(system, bytes).ToAsync()
           select value;
}
/// Deserializes <paramref name="data"/> as <c>Either&lt;ErrorInfo, T&gt;</c> with the serializer the actor system
/// registers for that type.
static Either<ErrorInfo, T> Deserialize<T>(ActorSystem system, byte[] data) {
    var serializer = system.Serialization.FindSerializerForType(typeof(Either<ErrorInfo,T>));
    return serializer.FromBinary<Either<ErrorInfo, T>>(data);
}
/// <summary>
/// Fetches the raw bytes of the blob described by <paramref name="deliveryBox"/>. An inbox asks the blob
/// initiator to start sending, <c>TotalChunks</c> chunks are received one after another, and the chunks are
/// then assembled into a single byte array.
/// </summary>
/// <param name="deliveryBox">Box describing the blob to fetch.</param>
/// <param name="system">Actor system used to create the inbox.</param>
/// <returns>The assembled bytes, or the first error encountered.</returns>
public static EitherAsync<ErrorInfo, byte[]> Retrieve(this BlobDeliveryBox deliveryBox, ActorSystem system) {
    var inbox = Inbox.Create(system);
    inbox.Send(deliveryBox.BlobInitiator, new BlobMessages.InitializeBlobRetrieval(deliveryBox.Id));

    var noChunks = EitherAsync<ErrorInfo, Lst<BlobChunk>>.Right(Lst<BlobChunk>.Empty);
    var receives = Seq.generate(deliveryBox.TotalChunks, _ => inbox.RetrieveData(deliveryBox.Id));

    // Chain the receives so each chunk is appended to the list gathered so far.
    var received = receives.Fold(noChunks,
                                 (soFar, next) => from list in soFar
                                                  from chunk in next
                                                  select list.Add(chunk));
    var assembled = received.Bind(chunks => CollectChunk(chunks).ToAsync());

    // Both outcomes pass through a side effect that disposes the inbox, so it is released on success and on error.
    return assembled.BiMap(x => x.SideEffect(_ => inbox.Dispose()),
                           x => x.SideEffect(_ => inbox.Dispose()));
}
#endregion | |
#region Sender | |
/// <summary>
/// Prepares <paramref name="message"/> for blob delivery: spawns a <c>BlobSender</c> in <paramref name="system"/>
/// and wraps the resulting delivery box, together with the expected return type, in a <c>BlobCallParameter</c>.
/// </summary>
/// <param name="system">Actor system that hosts the blob sender.</param>
/// <param name="boxId">Identifier of the delivery box.</param>
/// <param name="message">Message to deliver as a blob.</param>
/// <typeparam name="TReturn">Return type the caller expects from the call.</typeparam>
/// <returns>The call parameter to send to the target service.</returns>
public static BlobCallParameter SendBlob<TReturn>(this ActorSystem system, Guid boxId, object message) {
    var deliveryBox = Deliver(system, system.CreateActor<BlobSender>, boxId, message);
    return new BlobCallParameter(deliveryBox, typeof(TReturn));
}
/// <summary>
/// Delivers <paramref name="message"/> as a blob and asks <paramref name="service"/> with the resulting
/// <c>BlobCallParameter</c>, expecting a <typeparamref name="TReturn"/> in response.
/// </summary>
/// <param name="service">Actor that handles the blob call.</param>
/// <param name="system">Actor system that hosts the blob sender.</param>
/// <param name="boxId">Identifier of the delivery box.</param>
/// <param name="message">Message to deliver as a blob.</param>
/// <typeparam name="TReturn">Expected response type.</typeparam>
/// <returns>The service's response, or an error.</returns>
public static EitherAsync<ErrorInfo, TReturn> SendBlob<TReturn>(this IActorRef service, ActorSystem system, Guid boxId, object message) {
    var call = system.SendBlob<TReturn>(boxId, message);
    return service.SafeAsk<TReturn>(call);
}
#endregion | |
/// <summary>
/// Serializes <paramref name="data"/>, slices the bytes into chunks of at most <c>ChunkSize</c>, creates the
/// blob sender actor that will serve those chunks, and returns the delivery box describing the blob.
/// </summary>
/// <param name="system">Actor system providing the serializer.</param>
/// <param name="createBlobSender">Factory for the sender actor; called with the actor name and constructor arguments.</param>
/// <param name="boxId">Identifier of the delivery box; its string form is the sender actor's name.</param>
/// <param name="data">Object to deliver.</param>
/// <returns>The delivery box with data type, byte count, chunk count and the sender actor reference.</returns>
static BlobDeliveryBox Deliver(ActorSystem system, Func<string, object[], IActorRef> createBlobSender, Guid boxId, object data) {
    var buffer = system.Serialization.FindSerializerFor(data).ToBinary(data);

    // An empty payload still travels as one zero-length chunk; with zero chunks the retrieving side
    // would reject the delivery with "No chunk received".
    var totalChunks = Math.Max(1, (buffer.Length + ChunkSize - 1) / ChunkSize);
    var chunks = Enumerable.Range(0, totalChunks).Map(n => {
        var offset = n * ChunkSize;
        var length = Math.Min(ChunkSize, buffer.Length - offset);
        // Each chunk is a view over the shared buffer; no bytes are copied here.
        return new Memory<byte>(buffer, offset, length);
    }).ToArray().ToSeq();

    var initiator = createBlobSender(boxId.ToString(), [boxId, chunks]);
    return new BlobDeliveryBox(boxId, data.GetType(), buffer.Length, totalChunks, initiator);
}
/// <summary>
/// Orders the received chunks by index, verifies the indexes are consecutive, and concatenates the chunk
/// payloads into one byte array.
/// </summary>
/// <param name="chunks">Chunks in the order they were received.</param>
/// <returns>The concatenated payload, or an error when there are no chunks or the indexes are not consecutive.</returns>
static Either<ErrorInfo, byte[]> CollectChunk(Lst<BlobChunk> chunks) {
    var sequence = chunks.OrderBy(b => b.Index).ToSeq();
    if (sequence.IsEmpty) return ErrorInfo.Of(ErrorCodes.InvalidResponse, "No chunk received");

    var sequenceIsCorrect = Enumerable.Range(0, chunks.Count - 1).All(i => sequence[i].Index + 1 == sequence[i + 1].Index);
    if (!sequenceIsCorrect) return ErrorInfo.Of(ErrorCodes.InvalidResponse, $"Chunk sequence is incorrect: {sequence.Map(b => b.Index.ToString()).Join(", ")}");

    // Copy each payload straight into its final position instead of staging the bytes in a List<byte>.
    var buffer = new byte[sequence.Sum(b => b.Data.Length)];
    var offset = 0;
    foreach (var chunk in sequence) {
        chunk.Data.CopyTo(buffer, offset);
        offset += chunk.Data.Length;
    }
    return buffer;
}
/// <summary>
/// Receives one message from <paramref name="inbox"/> (waiting up to <c>AkkaSettings.AskTimeout</c>), treats it as
/// an <c>Either&lt;ErrorInfo, BlobChunk&gt;</c>, and accepts the chunk only when its <c>Id</c> equals
/// <paramref name="chunkId"/>. Exceptions raised while receiving or casting are converted with <c>ErrorFrom.Exception</c>.
/// </summary>
/// <param name="inbox">Inbox the blob sender replies to.</param>
/// <param name="chunkId">Expected chunk <c>Id</c>; <c>Retrieve</c> passes the delivery box ID here.</param>
/// <returns>The received chunk, or an error.</returns>
static EitherAsync<ErrorInfo, BlobChunk> RetrieveData(this Inbox inbox, Guid chunkId) {
    var received = TryAsync(() => inbox.ReceiveAsync(AkkaSettings.AskTimeout))
                  .Map(v => (Either<ErrorInfo, BlobChunk>) v);
    var reply = received.ToEither(ErrorFrom.Exception)
                        .Bind(v => v.ToAsync());
    return reply.Bind(chunk => chunk.Id == chunkId
                                   ? RightAsync<ErrorInfo, BlobChunk>(chunk)
                                   : ErrorInfo.Of(ErrorCodes.InvalidResponse, $"Chunk ID is mismatched, possibly bug! Expect: {chunkId}, Got: {chunk.Id}"));
}
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Akka.Actor; | |
using RZ.CoreApp; | |
namespace RZ.AkkaNet.Blob; | |
/// <summary>
/// One-shot actor that serves the chunks of a single blob. On <c>InitializeBlobRetrieval</c> it sends every
/// chunk (or an error when the requested ID does not match) to the requester and then stops itself.
/// A timer also stops it after <c>BlobDelivery.BlobSenderLifetime</c>.
/// </summary>
public sealed class BlobSender(Guid boxId, Seq<Memory<byte>> chunks) : UntypedActor, IWithTimers
{
    const string FailureTimerName = "timeout";

    public required ITimerScheduler Timers { get; set; }

    /// Schedules a single PoisonPill for this actor once the sender lifetime elapses.
    protected override void PreStart() =>
        Timers.StartSingleTimer(FailureTimerName, PoisonPill.Instance, BlobDelivery.BlobSenderLifetime);

    protected override void OnReceive(object message) {
        if (message is not BlobMessages.InitializeBlobRetrieval request) {
            Unhandled(message);
            return;
        }

        var requester = Sender;
        if (request.BlobId == boxId) {
            // One message per chunk, tagged with the box ID and the chunk's position.
            chunks.Iter((index, chunk) => requester.Tell(Either<ErrorInfo, BlobChunk>.Right(new BlobChunk(boxId, index, chunk.ToArray()))));
        }
        else {
            requester.Tell(Either<ErrorInfo, BlobChunk>.Left(ErrorInfo.Of(ErrorCodes.InvalidRequest, $"Incorrect Box ID {request.BlobId}")));
        }

        // The actor stops after any retrieval request, whether or not the ID matched.
        Self.Tell(PoisonPill.Instance);
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Akka.Actor; | |
namespace RZ.AkkaNet.Blob; | |
/// <summary>
/// A call whose argument is delivered as a blob.
/// </summary>
/// <param name="Parameter">Delivery box describing the blob that holds the call argument.</param>
/// <param name="ReturnType">Type the caller expects in response.</param>
public sealed record BlobCallParameter(
    BlobDeliveryBox Parameter,
    Type ReturnType);
/// <summary>
/// Outcome of retrieving the blob of a <c>BlobCallParameter</c>, before deserialization.
/// </summary>
/// <param name="Data">The retrieved bytes, or the retrieval error.</param>
/// <param name="DataType">Type the bytes are deserialized into.</param>
/// <param name="ReturnType">Type the caller expects in response.</param>
public sealed record BlobCallParameterRaw(
    Either<ErrorInfo, byte[]> Data,
    Type DataType,
    Type ReturnType);
/// <summary>
/// Describes a blob that is ready to be retrieved from a blob sender actor.
/// </summary>
/// <param name="Id">Identifier of the delivery; chunks carry the same ID.</param>
/// <param name="DataType">Runtime type of the object that was serialized into the blob.</param>
/// <param name="TotalBytes">Length of the serialized data in bytes.</param>
/// <param name="TotalChunks">Number of chunks the retriever has to receive.</param>
/// <param name="BlobInitiator">Actor to which <c>InitializeBlobRetrieval</c> is sent to start the transfer.</param>
public sealed record BlobDeliveryBox(
    Guid Id,
    Type DataType,
    int TotalBytes,
    int TotalChunks,
    IActorRef BlobInitiator);
/// <summary>
/// One slice of a blob in transit.
/// </summary>
/// <param name="Id">Identifier of the delivery box the chunk belongs to.</param>
/// <param name="Index">Zero-based position of the chunk within the blob.</param>
/// <param name="Data">Bytes of this chunk.</param>
public sealed record BlobChunk(
    Guid Id,
    int Index,
    byte[] Data);
/// <summary>
/// Protocol messages understood by the blob sender actor.
/// </summary>
public static class BlobMessages
{
    /// <summary>
    /// Asks the blob sender to start sending the chunks of a blob to the requester.
    /// </summary>
    /// <param name="BlobId">Identifier of the requested blob; the sender compares it with its own box ID.</param>
    public sealed record InitializeBlobRetrieval(
        Guid BlobId);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment