Created
March 5, 2025 23:31
-
-
Save to11mtm/6cfeda1e956f42fa4f007e82fe4c2983 to your computer and use it in GitHub Desktop.
WIP/POC for an Akka.Streams UnfoldAsync stage placed in the middle of a flow.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// NOTE: This is probably not quite working in a few ways; it is just a conversation starter at this point.
/// <summary>
/// INTERNAL API
/// <para>
/// Flow stage that threads a piece of state through asynchronous processing of a stream.
/// For every element pushed from upstream the user function is invoked with the current
/// state and that element; the task it returns yields an optional next state and the
/// element that is pushed downstream.
/// </para>
/// </summary>
/// <typeparam name="TIn">Type of the elements received from upstream.</typeparam>
/// <typeparam name="TUnfoldState">Type of the state handed from one invocation of the user function to the next.</typeparam>
/// <typeparam name="TOut">Type of the elements emitted downstream.</typeparam>
[InternalApi]
public sealed class UnfoldAsync<TIn, TUnfoldState, TOut> : GraphStage<FlowShape<TIn, TOut>>
{
    #region internal classes

    private sealed class Logic : InAndOutGraphStageLogic
    {
        /// <summary>
        /// Buffer slot for one in-flight invocation of the user function. It remembers the
        /// triggering message and, once the task has finished, its result.
        /// </summary>
        private sealed class Holder
        {
            private readonly Action<Holder> _callback;

            public Holder(object message, Result<(Option<TUnfoldState>, TOut)> element, Action<Holder> callback)
            {
                _callback = callback;
                Message = message;
                Element = element;
            }

            /// <summary>Result of the task, or the placeholder passed to the constructor while it is pending.</summary>
            public Result<(Option<TUnfoldState>, TOut)> Element { get; private set; }

            /// <summary>The upstream element that triggered this invocation (kept for logging).</summary>
            public object Message { get; }

            /// <summary>
            /// Stores <paramref name="result"/>, replacing a successful result whose output
            /// element is null with a failure, since null must not be pushed downstream.
            /// </summary>
            public void SetElement(Result<(Option<TUnfoldState>, TOut)> result)
            {
                // Only the emitted element (Item2) is subject to the null check; the state
                // option is never pushed downstream.
                Element = result.IsSuccess && result.Value.Item2 is null
                    ? Result.Failure<(Option<TUnfoldState>, TOut)>(ReactiveStreamsCompliance.ElementMustNotBeNullException)
                    : result;
            }

            /// <summary>Stores the result and then notifies the stage through the callback.</summary>
            public void Invoke(Result<(Option<TUnfoldState>, TOut)> result)
            {
                SetElement(result);
                _callback(this);
            }
        }

        // Sentinel marking a holder whose task has not completed yet.
        private static readonly Result<(Option<TUnfoldState>, TOut)> NotYetThere =
            Result.Failure<(Option<TUnfoldState>, TOut)>(new Exception());

        private readonly UnfoldAsync<TIn, TUnfoldState, TOut> _stage;
        private readonly Decider _decider;
        private IBuffer<Holder> _buffer;
        private readonly Action<Holder> _taskCallback;

        // State passed to the next invocation of the user function; starts as default(TUnfoldState).
        private TUnfoldState _state = default;

        public Logic(Attributes inheritedAttributes, UnfoldAsync<TIn, TUnfoldState, TOut> stage) : base(stage.Shape)
        {
            _stage = stage;

            // Fall back to the stopping decider when no supervision strategy attribute is set.
            var attr = inheritedAttributes.GetAttribute<ActorAttributes.SupervisionStrategy>(null);
            _decider = attr != null ? attr.Decider : Deciders.StoppingDecider;

            _taskCallback = GetAsyncCallback<Holder>(HolderCompleted);

            SetHandlers(stage.In, stage.Out, this);
        }

        /// <summary>
        /// Invokes the user function for the grabbed element together with the current state
        /// and enqueues a holder for its result.
        /// </summary>
        public override void OnPush()
        {
            var message = Grab(_stage.In);
            try
            {
                var task = _stage._mapFunc((_state, message));
                var holder = new Holder(message, NotYetThere, _taskCallback);
                _buffer.Enqueue(holder);

                // We dispatch the task if it's ready to optimize away
                // scheduling it to an execution context
                if (task.IsCompleted)
                {
                    holder.SetElement(Result.FromTask(task));
                    HolderCompleted(holder);
                }
                else
                {
                    task.ContinueWith(t => holder.Invoke(Result.FromTask(t)),
                        TaskContinuationOptions.ExecuteSynchronously);
                }
            }
            catch (Exception e)
            {
                var strategy = _decider(e);
                Log.Error(e, "An exception occurred inside UnfoldAsync while processing message [{0}]. Supervision strategy: {1}", message, strategy);
                switch (strategy)
                {
                    case Directive.Stop:
                        FailStage(e);
                        break;

                    case Directive.Resume:
                    case Directive.Restart:
                        // The element is dropped and processing continues.
                        break;

                    default:
                        throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", e);
                }
            }

            if (Todo < _stage._parallelism && !HasBeenPulled(_stage.In))
            {
                TryPull(_stage.In);
            }
        }

        /// <summary>Completes immediately when nothing is in flight; otherwise <see cref="PushOne"/> completes once the buffer drains.</summary>
        public override void OnUpstreamFinish()
        {
            if (Todo == 0)
            {
                CompleteStage();
            }
        }

        public override void OnPull() => PushOne();

        // Number of holders currently buffered (pending or completed but not yet pushed).
        private int Todo => _buffer.Used;

        public override void PreStart() => _buffer = Buffer.Create<Holder>(_stage._parallelism, Materializer);

        /// <summary>
        /// Pushes the oldest completed result downstream, skipping failed results that the
        /// supervision strategy allows to be dropped, and pulls upstream when there is capacity.
        /// </summary>
        private void PushOne()
        {
            var inlet = _stage.In;
            while (true)
            {
                if (_buffer.IsEmpty)
                {
                    if (IsClosed(inlet))
                    {
                        CompleteStage();
                    }
                    else if (!HasBeenPulled(inlet))
                    {
                        Pull(inlet);
                    }
                }
                else if (_buffer.Peek().Element == NotYetThere)
                {
                    // Head of the buffer is still pending: nothing to emit yet.
                    if (Todo < _stage._parallelism && !HasBeenPulled(inlet))
                    {
                        TryPull(inlet);
                    }
                }
                else
                {
                    var holder = _buffer.Dequeue();
                    var result = holder.Element;
                    if (!result.IsSuccess)
                    {
                        // this could happen if we are looping in PushOne and end up on a failed Task before the
                        // HolderCompleted callback has run
                        var strategy = _decider(result.Exception);
                        Log.Error(result.Exception, "An exception occurred inside UnfoldAsync while processing message [{0}]. Supervision strategy: {1}", holder.Message, strategy);
                        switch (strategy)
                        {
                            case Directive.Stop:
                                FailStage(result.Exception);
                                return;

                            case Directive.Resume:
                            case Directive.Restart:
                                break;

                            default:
                                throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", result.Exception);
                        }

                        // Failed element dropped; look at the next buffered holder.
                        continue;
                    }

                    Push(_stage.Out, result.Value.Item2);

                    if (Todo < _stage._parallelism && !HasBeenPulled(inlet))
                    {
                        TryPull(inlet);
                    }
                }

                break;
            }
        }

        /// <summary>
        /// Runs when a holder's task has finished: on success stores the new state and tries
        /// to emit; on failure applies the supervision strategy.
        /// </summary>
        private void HolderCompleted(Holder holder)
        {
            var element = holder.Element;
            if (element.IsSuccess)
            {
                // NOTE(review): Item1.Value is read even when the option is empty, which
                // presumably resets the state to default(TUnfoldState) - confirm that an
                // empty option is meant to reset rather than keep the state or complete the stage.
                _state = element.Value.Item1.Value;

                if (IsAvailable(_stage.Out))
                {
                    PushOne();
                }

                return;
            }

            var exception = element.Exception;
            var strategy = _decider(exception);
            Log.Error(exception, "An exception occurred inside UnfoldAsync while executing Task. Supervision strategy: {0}", strategy);
            switch (strategy)
            {
                case Directive.Stop:
                    FailStage(exception);
                    break;

                case Directive.Resume:
                case Directive.Restart:
                    // PushOne dequeues the failed holder and moves on.
                    if (IsAvailable(_stage.Out))
                    {
                        PushOne();
                    }

                    break;

                default:
                    throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", exception);
            }
        }

        public override string ToString() => $"UnfoldAsync.Logic(buffer={_buffer})";
    }

    #endregion

    // Fixed at 1: each invocation receives the state stored by the previous completion,
    // and the result buffer is sized from this value.
    private readonly int _parallelism = 1;

    private readonly Func<(TUnfoldState, TIn), Task<(Option<TUnfoldState>, TOut)>> _mapFunc;

    /// <summary>
    /// Inlet receiving the upstream elements.
    /// </summary>
    public readonly Inlet<TIn> In = new("UnfoldAsync.in");

    /// <summary>
    /// Outlet emitting the elements produced by the user function.
    /// </summary>
    public readonly Outlet<TOut> Out = new("UnfoldAsync.out");

    /// <summary>
    /// Creates the stage.
    /// </summary>
    /// <param name="mapFunc">
    /// Function invoked with the current state and the incoming element; its task yields the
    /// optional next state together with the element to emit downstream.
    /// </param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="mapFunc"/> is null.</exception>
    public UnfoldAsync(Func<(TUnfoldState, TIn), Task<(Option<TUnfoldState>, TOut)>> mapFunc)
    {
        _mapFunc = mapFunc ?? throw new ArgumentNullException(nameof(mapFunc));
        Shape = new FlowShape<TIn, TOut>(In, Out);
    }

    /// <summary>
    /// Default attributes of the stage (its name).
    /// </summary>
    protected override Attributes InitialAttributes { get; } = Attributes.CreateName("unfoldAsync");

    /// <summary>
    /// The flow shape made of <see cref="In"/> and <see cref="Out"/>.
    /// </summary>
    public override FlowShape<TIn, TOut> Shape { get; }

    /// <summary>
    /// Creates the stage logic for one materialization.
    /// </summary>
    /// <param name="inheritedAttributes">Attributes in effect for this stage; used to look up the supervision strategy.</param>
    /// <returns>A new logic instance bound to this stage.</returns>
    protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
        => new Logic(inheritedAttributes, this);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment