Skip to content

Instantly share code, notes, and snippets.

@to11mtm
Created March 5, 2025 23:31
Show Gist options
  • Save to11mtm/6cfeda1e956f42fa4f007e82fe4c2983 to your computer and use it in GitHub Desktop.
Save to11mtm/6cfeda1e956f42fa4f007e82fe4c2983 to your computer and use it in GitHub Desktop.
WIP/POC for an Akka.Streams UnfoldAsync stage that sits in the middle of a flow.
// NOTE: This probably does not quite work in a few ways yet; it is just a conversation starter at this point.
/// <summary>
/// INTERNAL API
/// <para>
/// A flow stage that threads a piece of state through an asynchronous function: for every
/// upstream element the function is called with the current state and that element, and the
/// task it returns yields the next state together with the element to push downstream.
/// </para>
/// </summary>
/// <typeparam name="TIn">Type of the elements consumed from upstream.</typeparam>
/// <typeparam name="TUnfoldState">Type of the state carried from one invocation to the next.</typeparam>
/// <typeparam name="TOut">Type of the elements emitted downstream.</typeparam>
[InternalApi]
public sealed class UnfoldAsync<TIn, TUnfoldState, TOut> : GraphStage<FlowShape<TIn, TOut>>
{
    #region internal classes

    /// <summary>
    /// Stage logic: buffers in-flight invocations, advances the state when one completes and
    /// pushes completed results downstream in arrival order.
    /// </summary>
    private sealed class Logic : InAndOutGraphStageLogic
    {
        /// <summary>
        /// Buffer slot for a single invocation: the upstream message that triggered it and the
        /// result, which stays <see cref="NotYetThere"/> until the task has completed.
        /// </summary>
        private sealed class Holder
        {
            private readonly Action<Holder> _callback;

            public Holder(object message, Result<(Option<TUnfoldState>, TOut)> element, Action<Holder> callback)
            {
                _callback = callback;
                Message = message;
                Element = element;
            }

            /// <summary>Current result of the invocation.</summary>
            public Result<(Option<TUnfoldState>, TOut)> Element { get; private set; }

            /// <summary>The upstream message this invocation was started for (used for logging).</summary>
            public object Message { get; }

            /// <summary>
            /// Stores <paramref name="result"/>, replacing a successful result whose output element
            /// is null with a failure, because null elements must not be pushed downstream.
            /// </summary>
            public void SetElement(Result<(Option<TUnfoldState>, TOut)> result)
            {
                // Only the output element (Item2) is pushed downstream, so it is the part that
                // has to be checked for null; for value-type TOut this test is always false.
                Element = result.IsSuccess && result.Value.Item2 is null
                    ? Result.Failure<(Option<TUnfoldState>, TOut)>(ReactiveStreamsCompliance.ElementMustNotBeNullException)
                    : result;
            }

            /// <summary>Stores the result and then hands this holder to the completion callback.</summary>
            public void Invoke(Result<(Option<TUnfoldState>, TOut)> result)
            {
                SetElement(result);
                _callback(this);
            }
        }

        // Sentinel marking a holder whose task has not completed yet.
        private static readonly Result<(Option<TUnfoldState>, TOut)> NotYetThere =
            Result.Failure<(Option<TUnfoldState>, TOut)>(new Exception());

        private readonly UnfoldAsync<TIn, TUnfoldState, TOut> _stage;
        private readonly Decider _decider;
        private IBuffer<Holder> _buffer;
        private readonly Action<Holder> _taskCallback;

        // State handed to the next invocation; starts out as default(TUnfoldState).
        private TUnfoldState _state = default;

        public Logic(Attributes inheritedAttributes, UnfoldAsync<TIn, TUnfoldState, TOut> stage) : base(stage.Shape)
        {
            _stage = stage;
            var attr = inheritedAttributes.GetAttribute<ActorAttributes.SupervisionStrategy>(null);
            _decider = attr != null ? attr.Decider : Deciders.StoppingDecider;
            _taskCallback = GetAsyncCallback<Holder>(HolderCompleted);
            SetHandlers(stage.In, stage.Out, this);
        }

        /// <summary>
        /// Starts an invocation for the grabbed element and enqueues its holder. An exception
        /// thrown while doing so is routed through the supervision decider.
        /// </summary>
        public override void OnPush()
        {
            var message = Grab(_stage.In);
            try
            {
                var task = _stage._mapFunc((_state, message));
                var holder = new Holder(message, NotYetThere, _taskCallback);
                _buffer.Enqueue(holder);

                // We dispatch the task if it's ready to optimize away
                // scheduling it to an execution context
                if (task.IsCompleted)
                {
                    holder.SetElement(Result.FromTask(task));
                    HolderCompleted(holder);
                }
                else
                    task.ContinueWith(t => holder.Invoke(Result.FromTask(t)),
                        TaskContinuationOptions.ExecuteSynchronously);
            }
            catch (Exception e)
            {
                var strategy = _decider(e);
                Log.Error(e, "An exception occurred inside UnfoldAsync while processing message [{0}]. Supervision strategy: {1}", message, strategy);
                switch (strategy)
                {
                    case Directive.Stop:
                        FailStage(e);
                        break;
                    case Directive.Resume:
                    case Directive.Restart:
                        break;
                    default:
                        throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", e);
                }
            }

            if (Todo < _stage._parallelism && !HasBeenPulled(_stage.In))
                TryPull(_stage.In);
        }

        /// <summary>Completes the stage once upstream has finished and nothing is left in the buffer.</summary>
        public override void OnUpstreamFinish()
        {
            if (Todo == 0)
                CompleteStage();
        }

        public override void OnPull() => PushOne();

        // Number of holders currently in the buffer.
        private int Todo => _buffer.Used;

        public override void PreStart() => _buffer = Buffer.Create<Holder>(_stage._parallelism, Materializer);

        /// <summary>
        /// Emits the result at the head of the buffer if it is ready. Failed results are dropped
        /// (or fail the stage, depending on the decider) and the next buffered holder is examined.
        /// </summary>
        private void PushOne()
        {
            var inlet = _stage.In;
            while (true)
            {
                if (_buffer.IsEmpty)
                {
                    if (IsClosed(inlet))
                        CompleteStage();
                    else if (!HasBeenPulled(inlet))
                        Pull(inlet);
                }
                else if (_buffer.Peek().Element == NotYetThere)
                {
                    // Head is still running: nothing to push yet, only request more input if there is room.
                    if (Todo < _stage._parallelism && !HasBeenPulled(inlet))
                        TryPull(inlet);
                }
                else
                {
                    var holder = _buffer.Dequeue();
                    var result = holder.Element;
                    if (!result.IsSuccess)
                    {
                        // this could happen if we are looping in PushOne and end up on a failed Task before the
                        // HolderCompleted callback has run
                        var strategy = _decider(result.Exception);
                        Log.Error(result.Exception, "An exception occurred inside UnfoldAsync while processing message [{0}]. Supervision strategy: {1}", holder.Message, strategy);
                        switch (strategy)
                        {
                            case Directive.Stop:
                                FailStage(result.Exception);
                                return;
                            case Directive.Resume:
                            case Directive.Restart:
                                break;
                            default:
                                throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", result.Exception);
                        }

                        // Skip the failed element and look at the next buffered holder.
                        continue;
                    }

                    Push(_stage.Out, result.Value.Item2);
                    if (Todo < _stage._parallelism && !HasBeenPulled(inlet))
                        TryPull(inlet);
                }

                break;
            }
        }

        /// <summary>
        /// Runs when a holder's task has finished: on success the state is advanced and, if
        /// downstream is waiting, the result is pushed; on failure the supervision decider decides.
        /// </summary>
        private void HolderCompleted(Holder holder)
        {
            var element = holder.Element;
            if (element.IsSuccess)
            {
                // NOTE(review): the Option is read without checking HasValue, so whatever Value
                // yields for an empty Option becomes the new state. Confirm whether an empty
                // Option should instead keep the previous state or complete the stage.
                _state = element.Value.Item1.Value;
                if (IsAvailable(_stage.Out))
                    PushOne();
                return;
            }

            var exception = element.Exception;
            var strategy = _decider(exception);
            Log.Error(exception, "An exception occurred inside UnfoldAsync while executing Task. Supervision strategy: {0}", strategy);
            switch (strategy)
            {
                case Directive.Stop:
                    FailStage(exception);
                    break;
                case Directive.Resume:
                case Directive.Restart:
                    if (IsAvailable(_stage.Out))
                        PushOne();
                    break;
                default:
                    throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", exception);
            }
        }

        public override string ToString() => $"UnfoldAsync.Logic(buffer={_buffer})";
    }

    #endregion

    // Size of the in-flight buffer and the limit used when deciding whether to pull more input.
    // NOTE(review): presumably fixed at 1 so that every invocation sees the state produced by
    // the previous one - confirm before making this configurable.
    private readonly int _parallelism = 1;

    private readonly Func<(TUnfoldState, TIn), Task<(Option<TUnfoldState>, TOut)>> _mapFunc;

    /// <summary>
    /// Inlet through which upstream elements arrive.
    /// </summary>
    public readonly Inlet<TIn> In = new("UnfoldAsync.in");

    /// <summary>
    /// Outlet through which results are emitted.
    /// </summary>
    public readonly Outlet<TOut> Out = new("UnfoldAsync.out");

    /// <summary>
    /// Creates the stage.
    /// </summary>
    /// <param name="mapFunc">
    /// Function called with the current state and the incoming element; the returned task
    /// produces the next state and the element to emit.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="mapFunc"/> is null.</exception>
    public UnfoldAsync(Func<(TUnfoldState, TIn), Task<(Option<TUnfoldState>, TOut)>> mapFunc)
    {
        _mapFunc = mapFunc ?? throw new ArgumentNullException(nameof(mapFunc));
        Shape = new FlowShape<TIn, TOut>(In, Out);
    }

    /// <summary>
    /// Default attributes of the stage (its name).
    /// </summary>
    protected override Attributes InitialAttributes { get; } = Attributes.CreateName("unfoldAsync");

    /// <summary>
    /// The flow shape formed by <see cref="In"/> and <see cref="Out"/>.
    /// </summary>
    public override FlowShape<TIn, TOut> Shape { get; }

    /// <summary>
    /// Creates the logic instance for one materialization of this stage.
    /// </summary>
    /// <param name="inheritedAttributes">Attributes in effect for the stage; the supervision strategy is read from them.</param>
    /// <returns>A new logic instance bound to this stage.</returns>
    protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
        => new Logic(inheritedAttributes, this);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment