Skip to content

Instantly share code, notes, and snippets.

@electricessence
Last active April 21, 2019 19:33
Show Gist options
  • Save electricessence/9bdac902d2a56d2804bc93ed11c0b832 to your computer and use it in GitHub Desktop.
Save electricessence/9bdac902d2a56d2804bc93ed11c0b832 to your computer and use it in GitHub Desktop.
Dataflow Extensions
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.IO;
namespace System.Threading.Tasks.Dataflow
{
public static class DataflowExtensions
{
public static BatchBlock<T> Batch<T>(this ISourceBlock<T> source,
int batchSize,
GroupingDataflowBlockOptions dataflowBlockOptions = null)
{
var batchBlock = dataflowBlockOptions == null
? new BatchBlock<T>(batchSize)
: new BatchBlock<T>(batchSize, dataflowBlockOptions);
source.LinkToWithCompletion(batchBlock);
return batchBlock;
}
public static async ValueTask<List<T>> ReadUntilComplete<T>(this IReceivableSourceBlock<T> source)
{
var result = new List<T>();
while (await source.OutputAvailableAsync())
{
while (source.TryReceive(null, out var e))
{
result.Add(e);
}
}
return result;
}
public static async ValueTask<int> AllLinesTo(this TextReader source,
ITargetBlock<string> target)
{
var count = 0;
string line;
while ((line = await source.ReadLineAsync()) != null)
{
if (!target.Post(line) && !await target.SendAsync(line))
break;
count++;
}
return count;
}
public static async ValueTask<int> AllLinesTo<T>(this TextReader source,
ITargetBlock<T> target,
Func<string, T> transform)
{
if (source == null)
throw new NullReferenceException();
Contract.EndContractBlock();
var count = 0;
string line;
while ((line = await source.ReadLineAsync()) != null)
{
var e = transform(line);
if (!target.Post(e) && !await target.SendAsync(e))
break;
count++;
}
return count;
}
public static ValueTask<List<T>> Throttle<T>(this IEnumerable<Lazy<T>> source, int maxConcurrent, bool ensureOrdered = false)
where T : Task
{
if (source == null)
throw new NullReferenceException();
Contract.EndContractBlock();
var throttler = new TransformBlock<Lazy<T>, T>(
async lazy =>
{
var t = lazy.Value;
await t;
return t;
},
new ExecutionDataflowBlockOptions()
{
EnsureOrdered = ensureOrdered,
SingleProducerConstrained = true,
MaxDegreeOfParallelism = maxConcurrent
});
source.ToTargetBlock(throttler);
throttler.Complete();
return throttler.ReadUntilComplete();
}
static DataflowLinkOptions PropagateLink() =>
new DataflowLinkOptions() { PropagateCompletion = true };
public static IDisposable LinkToWithCompletion<T>(this ISourceBlock<T> source, ITargetBlock<T> target)
=> source.LinkTo(target, PropagateLink());
public static TTarget PipeCompletion<T, TTarget>(this ISourceBlock<T> source, TTarget target)
where TTarget : ITargetBlock<T>
{
if (source == null)
throw new NullReferenceException();
if (target == null)
throw new ArgumentNullException(nameof(target));
Contract.EndContractBlock();
source.LinkTo(target, PropagateLink());
return target;
}
public static int ToTargetBlock<T>(this IEnumerable<T> source,
ITargetBlock<T> target)
{
if (source == null)
throw new NullReferenceException();
if (target == null)
throw new ArgumentNullException(nameof(target));
Contract.EndContractBlock();
var count = 0;
foreach (var entry in source)
{
if (!target.Post(entry))
break;
count++;
}
return count;
}
public static ValueTask<int> ToTargetBlockAsync<T>(this IEnumerable<T> source,
ITargetBlock<T> target,
CancellationToken cancellationToken = default)
{
if (source == null)
throw new NullReferenceException();
if (target == null)
throw new ArgumentNullException(nameof(target));
Contract.EndContractBlock();
if (source is ICollection<T> c && c.Count == 0)
return new ValueTask<int>(0);
return ToTargetBlockAsyncCore();
async ValueTask<int> ToTargetBlockAsyncCore()
{
var count = 0;
foreach (var entry in source)
{
if (cancellationToken.IsCancellationRequested
|| !target.Post(entry) && !await target.SendAsync(entry))
break;
count++;
}
return count;
}
}
public static ISourceBlock<T> AsBufferBlock<T>(this IEnumerable<T> source,
int capacity = DataflowBlockOptions.Unbounded,
CancellationToken cancellationToken = default)
{
if (source == null)
throw new NullReferenceException();
Contract.EndContractBlock();
var buffer = new BufferBlock<T>(new DataflowBlockOptions()
{
BoundedCapacity = capacity,
CancellationToken = cancellationToken
});
var result = ToTargetBlockAsync(source, buffer, cancellationToken);
if (result.IsCompletedSuccessfully)
{
buffer.Complete();
}
else
{
_ = CallCompleteWhenFinished();
async ValueTask CallCompleteWhenFinished()
{
await result;
buffer.Complete();
}
}
return buffer;
}
public static ActionBlock<T> Pipe<T>(this ISourceBlock<T> source,
Action<T> handler,
ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
if (source == null)
throw new NullReferenceException();
if (handler == null)
throw new ArgumentNullException(nameof(handler));
Contract.EndContractBlock();
var receiver = dataflowBlockOptions == null
? new ActionBlock<T>(handler)
: new ActionBlock<T>(handler, dataflowBlockOptions);
source.LinkToWithCompletion(receiver);
return receiver;
}
public static ActionBlock<T> Pipe<T>(this ISourceBlock<T> source,
Func<T, Task> handler,
ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
if (source == null)
throw new NullReferenceException();
if (handler == null)
throw new ArgumentNullException(nameof(handler));
Contract.EndContractBlock();
var receiver = dataflowBlockOptions == null
? new ActionBlock<T>(handler)
: new ActionBlock<T>(handler, dataflowBlockOptions);
source.LinkToWithCompletion(receiver);
return receiver;
}
public static TransformBlock<TIn, TOut> Pipe<TIn, TOut>(this ISourceBlock<TIn> source,
Func<TIn, TOut> transform,
ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
if (source == null)
throw new NullReferenceException();
if (transform == null)
throw new ArgumentNullException(nameof(transform));
Contract.EndContractBlock();
var receiver = dataflowBlockOptions == null
? new TransformBlock<TIn, TOut>(transform)
: new TransformBlock<TIn, TOut>(transform, dataflowBlockOptions);
source.LinkToWithCompletion(receiver);
return receiver;
}
public static TransformBlock<TIn, TOut> Pipe<TIn, TOut>(this ISourceBlock<TIn> source,
Func<TIn, Task<TOut>> transform,
ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
if (source == null)
throw new NullReferenceException();
if (transform == null)
throw new ArgumentNullException(nameof(transform));
Contract.EndContractBlock();
var receiver = dataflowBlockOptions == null
? new TransformBlock<TIn, TOut>(transform)
: new TransformBlock<TIn, TOut>(transform, dataflowBlockOptions);
source.LinkToWithCompletion(receiver);
return receiver;
}
/// <summary>
/// Pipes the source data through a single producer constrained ActionBlock with the specified max concurrency.
/// </summary>
/// <typeparam name="T">The input type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="maxConcurrency">The maximum concurrency of the action block</param>
/// <param name="handler">The handler function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The ActionBlock created.</returns>
public static ActionBlock<T> Parallel<T>(this ISourceBlock<T> source,
int maxConcurrency,
Action<T> handler,
CancellationToken cancellationToken = default)
=> source.Pipe(handler, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxConcurrency,
SingleProducerConstrained = true,
CancellationToken = cancellationToken
});
/// <summary>
/// Pipes the source data through a single producer constrained ActionBlock with the specified max concurrency.
/// </summary>
/// <typeparam name="T">The input type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="maxConcurrency">The maximum concurrency of the action block</param>
/// <param name="handler">The async handler function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The ActionBlock created.</returns>
public static ActionBlock<T> Parallel<T>(this ISourceBlock<T> source,
int maxConcurrency,
Func<T, Task> handler,
CancellationToken cancellationToken = default)
=> source.Pipe(handler, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxConcurrency,
SingleProducerConstrained = true,
CancellationToken = cancellationToken
});
/// <summary>
/// Pipes the source data through a single producer constrained TransformBlock with the specified max concurrency.
/// </summary>
/// <typeparam name="TIn">The input type.</typeparam>
/// <typeparam name="TOut">The output type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="maxConcurrency">The maximum concurrency of the transform block</param>
/// <param name="transform">The transform function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The TransformBlock created.</returns>
public static TransformBlock<TIn, TOut> Parallel<TIn, TOut>(this ISourceBlock<TIn> source,
int maxConcurrency,
Func<TIn, TOut> transform,
CancellationToken cancellationToken = default)
=> source.Pipe(transform, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxConcurrency,
SingleProducerConstrained = true,
CancellationToken = cancellationToken
});
/// <summary>
/// Pipes the source data through a single producer constrained TransformBlock with the specified max concurrency.
/// </summary>
/// <typeparam name="TIn">The input type.</typeparam>
/// <typeparam name="TOut">The output type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="maxConcurrency">The maximum concurrency of the transform block</param>
/// <param name="transform">The async transform function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The TransformBlock created.</returns>
public static TransformBlock<TIn, TOut> Parallel<TIn, TOut>(this ISourceBlock<TIn> source,
int maxConcurrency,
Func<TIn, Task<TOut>> transform,
bool singleProducerConstrained = true,
CancellationToken cancellationToken = default)
=> source.Pipe(transform, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxConcurrency,
SingleProducerConstrained = true,
CancellationToken = cancellationToken
});
/// <summary>
/// Pipes the source data through a single producer constrained ActionBlock with the specified bounded capacity and max concurrency.
/// </summary>
/// <typeparam name="T">The input type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="capacity">The bounded capacity of the transform block</param>
/// <param name="maxConcurrency">The maximum concurrency of the action block</param>
/// <param name="handler">The handler function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The ActionBlock created.</returns>
public static ActionBlock<T> BoundedParallel<T>(this ISourceBlock<T> source,
int capacity,
int maxConcurrency,
Action<T> handler,
CancellationToken cancellationToken = default)
=> source.Pipe(handler, new ExecutionDataflowBlockOptions
{
BoundedCapacity = capacity,
MaxDegreeOfParallelism = maxConcurrency,
SingleProducerConstrained = true,
CancellationToken = cancellationToken
});
/// <summary>
/// Pipes the source data through a single producer constrained ActionBlock with the specified bounded capacity and max concurrency.
/// </summary>
/// <typeparam name="T">The input type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="capacity">The bounded capacity of the transform block</param>
/// <param name="maxConcurrency">The maximum concurrency of the action block</param>
/// <param name="handler">The async handler function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The ActionBlock created.</returns>
public static ActionBlock<T> BoundedParallel<T>(this ISourceBlock<T> source,
int capacity,
int maxConcurrency,
Func<T, Task> handler,
CancellationToken cancellationToken = default)
=> source.Pipe(handler, new ExecutionDataflowBlockOptions
{
BoundedCapacity = capacity,
MaxDegreeOfParallelism = maxConcurrency,
SingleProducerConstrained = true,
CancellationToken = cancellationToken
});
/// <summary>
/// Pipes the source data through a single producer constrained TransformBlock with the specified bounded capacity and max concurrency.
/// </summary>
/// <typeparam name="TIn">The input type.</typeparam>
/// <typeparam name="TOut">The output type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="capacity">The bounded capacity of the transform block</param>
/// <param name="maxConcurrency">The maximum concurrency of the transform block</param>
/// <param name="transform">The transform function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The TransformBlock created.</returns>
public static TransformBlock<TIn, TOut> BoundedParallel<TIn, TOut>(this ISourceBlock<TIn> source,
int capacity,
int maxConcurrency,
Func<TIn, TOut> transform,
CancellationToken cancellationToken = default)
=> source.Pipe(transform, new ExecutionDataflowBlockOptions
{
BoundedCapacity = capacity,
MaxDegreeOfParallelism = maxConcurrency,
SingleProducerConstrained = true,
CancellationToken = cancellationToken
});
/// <summary>
/// Pipes the source data through a single producer constrained TransformBlock with the specified bounded capacity and max concurrency.
/// </summary>
/// <typeparam name="TIn">The input type.</typeparam>
/// <typeparam name="TOut">The output type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="capacity">The bounded capacity of the transform block</param>
/// <param name="maxConcurrency">The maximum concurrency of the transform block</param>
/// <param name="transform">The async transform function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The TransformBlock created.</returns>
public static TransformBlock<TIn, TOut> BoundedParallel<TIn, TOut>(this ISourceBlock<TIn> source,
int capacity,
int maxConcurrency,
Func<TIn, Task<TOut>> transform,
CancellationToken cancellationToken = default)
=> source.Pipe(transform, new ExecutionDataflowBlockOptions
{
BoundedCapacity = capacity,
MaxDegreeOfParallelism = maxConcurrency,
SingleProducerConstrained = true,
CancellationToken = cancellationToken
});
/// <summary>
/// Pipes the source data through a single producer constrained ActionBlock with the specified bounded capacity.
/// </summary>
/// <typeparam name="T">The input type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="capacity">The bounded capacity of the transform block</param>
/// <param name="handler">The handler function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The ActionBlock created.</returns>
public static ActionBlock<T> Bounded<T>(this ISourceBlock<T> source,
int capacity,
Action<T> handler,
CancellationToken cancellationToken = default)
=> source.Pipe(handler, new ExecutionDataflowBlockOptions
{
BoundedCapacity = capacity,
SingleProducerConstrained = true,
CancellationToken = cancellationToken
});
/// <summary>
/// Pipes the source data through a single producer constrained ActionBlock with the specified bounded capacity.
/// </summary>
/// <typeparam name="T">The input type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="capacity">The bounded capacity of the transform block</param>
/// <param name="handler">The async handler function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The ActionBlock created.</returns>
public static ActionBlock<T> Bounded<T>(this ISourceBlock<T> source,
int capacity,
Func<T, Task> handler,
CancellationToken cancellationToken = default)
=> source.Pipe(handler, new ExecutionDataflowBlockOptions
{
BoundedCapacity = capacity,
SingleProducerConstrained = true,
CancellationToken = cancellationToken
});
/// <summary>
/// Pipes the source data through a single producer constrained TransformBlock with the specified bounded capacity.
/// </summary>
/// <typeparam name="TIn">The input type.</typeparam>
/// <typeparam name="TOut">The output type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="capacity">The bounded capacity of the transform block</param>
/// <param name="transform">The transform function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The TransformBlock created.</returns>
public static TransformBlock<TIn, TOut> Bounded<TIn, TOut>(this ISourceBlock<TIn> source,
int capacity,
Func<TIn, TOut> transform,
CancellationToken cancellationToken = default)
=> source.Pipe(transform, new ExecutionDataflowBlockOptions
{
BoundedCapacity = capacity,
SingleProducerConstrained = true,
CancellationToken = cancellationToken
});
/// <summary>
/// Pipes the source data through a single producer constrained TransformBlock with the specified bounded capacity.
/// </summary>
/// <typeparam name="TIn">The input type.</typeparam>
/// <typeparam name="TOut">The output type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="capacity">The bounded capacity of the transform block</param>
/// <param name="transform">The async transform function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The TransformBlock created.</returns>
public static TransformBlock<TIn, TOut> Bounded<TIn, TOut>(this ISourceBlock<TIn> source,
int capacity,
Func<TIn, Task<TOut>> transform,
CancellationToken cancellationToken = default)
=> source.Pipe(transform, new ExecutionDataflowBlockOptions
{
BoundedCapacity = capacity,
SingleProducerConstrained = true,
CancellationToken = cancellationToken
});
/// <summary>
/// Processes an item through an acceptor function.
/// If the acceptor returns true, then it was accepted and subsequently received/taken from the source.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="acceptor">The function to process the item and decide if accepted.</param>
/// <returns>The original source block to allow for more acceptors or filters to be applied.</returns>
public static ISourceBlock<T> AcceptOrPass<T>(this ISourceBlock<T> source,
Func<T, bool> acceptor)
{
var receiver = new AcceptOrPassBlock<T>(acceptor);
source.LinkTo(receiver, PropagateLink());
return source;
}
}
sealed class AcceptOrPassBlock<T> : ITargetBlock<T>
{
private readonly Func<T, bool> _handler;
private readonly ITargetBlock<T> _completer;
private AcceptOrPassBlock()
{
_completer = new ActionBlock<T>(e => { });
Completion = _completer.Completion;
}
public AcceptOrPassBlock(Func<T, bool> handler) : this()
{
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
}
public DataflowMessageStatus OfferMessage(
DataflowMessageHeader messageHeader,
T messageValue, ISourceBlock<T> source,
bool consumeToAccept)
{
if (_completer.Completion.IsCompleted)
return DataflowMessageStatus.DecliningPermanently;
return _handler(messageValue)
? DataflowMessageStatus.Accepted
: DataflowMessageStatus.Declined;
}
public void Complete()
=> _completer.Complete();
public void Fault(Exception exception)
=> _completer.Fault(exception);
public Task Completion { get; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment