Last active
April 21, 2019 19:33
-
-
Save electricessence/9bdac902d2a56d2804bc93ed11c0b832 to your computer and use it in GitHub Desktop.
Dataflow Extensions
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Generic; | |
using System.Diagnostics.Contracts; | |
using System.IO; | |
namespace System.Threading.Tasks.Dataflow | |
{ | |
public static class DataflowExtensions | |
{ | |
/// <summary>
/// Creates a <see cref="BatchBlock{T}"/> with the requested batch size and links the source to it,
/// propagating completion.
/// </summary>
/// <typeparam name="T">The item type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="batchSize">The batch size handed to the BatchBlock constructor.</param>
/// <param name="dataflowBlockOptions">Optional grouping options; when null the options-less constructor is used.</param>
/// <returns>The BatchBlock created.</returns>
public static BatchBlock<T> Batch<T>(this ISourceBlock<T> source,
    int batchSize,
    GroupingDataflowBlockOptions dataflowBlockOptions = null)
{
    BatchBlock<T> batcher;
    if (dataflowBlockOptions != null)
    {
        batcher = new BatchBlock<T>(batchSize, dataflowBlockOptions);
    }
    else
    {
        batcher = new BatchBlock<T>(batchSize);
    }

    source.LinkToWithCompletion(batcher);
    return batcher;
}
/// <summary>
/// Drains the source into a list: waits until output is available, receives everything
/// that can be received right now, and repeats until no more output will become available.
/// </summary>
/// <typeparam name="T">The item type.</typeparam>
/// <param name="source">The receivable source block to drain.</param>
/// <returns>All items received, in the order they were received.</returns>
public static async ValueTask<List<T>> ReadUntilComplete<T>(this IReceivableSourceBlock<T> source)
{
    var items = new List<T>();
    for (; ; )
    {
        var hasOutput = await source.OutputAvailableAsync();
        if (!hasOutput)
        {
            break;
        }

        // Pull everything that is ready before waiting again.
        T item;
        while (source.TryReceive(null, out item))
        {
            items.Add(item);
        }
    }
    return items;
}
/// <summary>
/// Reads lines from the reader until it returns null and offers each line to the target.
/// A line is first offered with Post; if that is refused it is offered again with SendAsync.
/// Stops early when the target refuses both.
/// </summary>
/// <param name="source">The reader to pull lines from.</param>
/// <param name="target">The block that receives each line.</param>
/// <returns>The number of lines the target accepted.</returns>
public static async ValueTask<int> AllLinesTo(this TextReader source,
    ITargetBlock<string> target)
{
    var sent = 0;
    while (true)
    {
        var text = await source.ReadLineAsync();
        if (text == null)
        {
            break;
        }

        var accepted = target.Post(text) || await target.SendAsync(text);
        if (!accepted)
        {
            break;
        }

        sent++;
    }
    return sent;
}
/// <summary>
/// Reads lines from the reader until it returns null, transforms each line, and offers the
/// result to the target. A value is first offered with Post; if that is refused it is offered
/// again with SendAsync. Stops early when the target refuses both.
/// </summary>
/// <typeparam name="T">The type produced by the transform and accepted by the target.</typeparam>
/// <param name="source">The reader to pull lines from.</param>
/// <param name="target">The block that receives each transformed line.</param>
/// <param name="transform">Converts a line of text into a value for the target.</param>
/// <returns>The number of transformed lines the target accepted.</returns>
/// <exception cref="NullReferenceException">The source is null.</exception>
/// <exception cref="ArgumentNullException">The target or transform is null.</exception>
public static async ValueTask<int> AllLinesTo<T>(this TextReader source,
    ITargetBlock<T> target,
    Func<string, T> transform)
{
    if (source == null)
        throw new NullReferenceException();
    // Validate the remaining arguments up front so a null is reported by name
    // rather than as a NullReferenceException part-way through the stream
    // (or not at all when the reader happens to be empty).
    if (target == null)
        throw new ArgumentNullException(nameof(target));
    if (transform == null)
        throw new ArgumentNullException(nameof(transform));
    Contract.EndContractBlock();

    var count = 0;
    string line;
    while ((line = await source.ReadLineAsync()) != null)
    {
        var e = transform(line);
        if (!target.Post(e) && !await target.SendAsync(e))
            break;
        count++;
    }
    return count;
}
/// <summary>
/// Pushes every lazy task through a TransformBlock limited to <paramref name="maxConcurrent"/>
/// concurrent operations. The block's delegate materializes each lazy value, awaits the
/// resulting task, and emits that task; the emitted tasks are collected into a list.
/// </summary>
/// <typeparam name="T">The task type.</typeparam>
/// <param name="source">The lazily created tasks.</param>
/// <param name="maxConcurrent">The value assigned to MaxDegreeOfParallelism.</param>
/// <param name="ensureOrdered">The value assigned to EnsureOrdered.</param>
/// <returns>The tasks read from the block once it has been drained.</returns>
/// <exception cref="NullReferenceException">The source is null.</exception>
public static ValueTask<List<T>> Throttle<T>(this IEnumerable<Lazy<T>> source, int maxConcurrent, bool ensureOrdered = false)
    where T : Task
{
    if (source == null)
    {
        throw new NullReferenceException();
    }
    Contract.EndContractBlock();

    var options = new ExecutionDataflowBlockOptions
    {
        EnsureOrdered = ensureOrdered,
        SingleProducerConstrained = true,
        MaxDegreeOfParallelism = maxConcurrent
    };

    // NOTE(review): awaiting a faulted task rethrows inside the block's delegate;
    // presumably that faults the block and cuts the result list short - confirm.
    var gate = new TransformBlock<Lazy<T>, T>(
        async pending =>
        {
            var task = pending.Value;
            await task;
            return task;
        },
        options);

    source.ToTargetBlock(gate);
    gate.Complete();
    return gate.ReadUntilComplete();
}
/// <summary>
/// Builds a fresh set of link options with PropagateCompletion switched on.
/// </summary>
static DataflowLinkOptions PropagateLink()
{
    var options = new DataflowLinkOptions();
    options.PropagateCompletion = true;
    return options;
}

/// <summary>
/// Links the source to the target using link options that propagate completion.
/// </summary>
/// <typeparam name="T">The item type.</typeparam>
/// <param name="source">The block to link from.</param>
/// <param name="target">The block to link to.</param>
/// <returns>The value returned by LinkTo.</returns>
public static IDisposable LinkToWithCompletion<T>(this ISourceBlock<T> source, ITargetBlock<T> target)
{
    return source.LinkTo(target, PropagateLink());
}
/// <summary>
/// Links the source to the target with completion propagation and hands the target back
/// so that further calls can be chained on it.
/// </summary>
/// <typeparam name="T">The item type.</typeparam>
/// <typeparam name="TTarget">The concrete target block type.</typeparam>
/// <param name="source">The block to link from.</param>
/// <param name="target">The block to link to.</param>
/// <returns>The same target that was passed in.</returns>
/// <exception cref="NullReferenceException">The source is null.</exception>
/// <exception cref="ArgumentNullException">The target is null.</exception>
public static TTarget PipeCompletion<T, TTarget>(this ISourceBlock<T> source, TTarget target)
    where TTarget : ITargetBlock<T>
{
    if (source == null)
    {
        throw new NullReferenceException();
    }
    if (target == null)
    {
        throw new ArgumentNullException(nameof(target));
    }
    Contract.EndContractBlock();

    var linkOptions = PropagateLink();
    source.LinkTo(target, linkOptions);
    return target;
}
/// <summary>
/// Posts each entry of the sequence to the target, stopping at the first entry the target refuses.
/// </summary>
/// <typeparam name="T">The item type.</typeparam>
/// <param name="source">The entries to post.</param>
/// <param name="target">The block to post to.</param>
/// <returns>The number of entries the target accepted.</returns>
/// <exception cref="NullReferenceException">The source is null.</exception>
/// <exception cref="ArgumentNullException">The target is null.</exception>
public static int ToTargetBlock<T>(this IEnumerable<T> source,
    ITargetBlock<T> target)
{
    if (source == null)
    {
        throw new NullReferenceException();
    }
    if (target == null)
    {
        throw new ArgumentNullException(nameof(target));
    }
    Contract.EndContractBlock();

    var posted = 0;
    using (var entries = source.GetEnumerator())
    {
        // Advance and post in one condition: the loop ends when the sequence
        // is exhausted or as soon as a Post is refused.
        while (entries.MoveNext() && target.Post(entries.Current))
        {
            posted++;
        }
    }
    return posted;
}
/// <summary>
/// Offers each entry of the sequence to the target. An entry is first offered with Post;
/// if that is refused it is offered again with SendAsync. Stops at the first entry the
/// target refuses both ways, or as soon as cancellation is requested.
/// </summary>
/// <typeparam name="T">The item type.</typeparam>
/// <param name="source">The entries to send.</param>
/// <param name="target">The block to send to.</param>
/// <param name="cancellationToken">
/// An optional cancellation token. It is checked before every entry and is also passed to
/// SendAsync so that a send waiting on the target is abandoned when cancellation is requested.
/// </param>
/// <returns>The number of entries the target accepted before the loop ended.</returns>
/// <exception cref="NullReferenceException">The source is null.</exception>
/// <exception cref="ArgumentNullException">The target is null.</exception>
public static ValueTask<int> ToTargetBlockAsync<T>(this IEnumerable<T> source,
    ITargetBlock<T> target,
    CancellationToken cancellationToken = default)
{
    if (source == null)
        throw new NullReferenceException();
    if (target == null)
        throw new ArgumentNullException(nameof(target));
    Contract.EndContractBlock();

    // Known-empty collections complete synchronously without an async state machine.
    if (source is ICollection<T> c && c.Count == 0)
        return new ValueTask<int>(0);

    return ToTargetBlockAsyncCore();

    async ValueTask<int> ToTargetBlockAsyncCore()
    {
        var count = 0;
        foreach (var entry in source)
        {
            if (cancellationToken.IsCancellationRequested)
                break;

            if (!target.Post(entry))
            {
                bool accepted;
                try
                {
                    // Pass the token along: without it a send that is waiting on a
                    // full target would never notice the cancellation request.
                    accepted = await target.SendAsync(entry, cancellationToken);
                }
                catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
                {
                    // Cancellation ends the loop quietly, the same way the check above does.
                    break;
                }

                if (!accepted)
                    break;
            }

            count++;
        }
        return count;
    }
}
/// <summary>
/// Creates a BufferBlock with the given capacity and cancellation token and feeds the
/// sequence into it with ToTargetBlockAsync. The buffer is completed once feeding has
/// finished; if feeding throws, the buffer is faulted with that exception instead.
/// </summary>
/// <typeparam name="T">The item type.</typeparam>
/// <param name="source">The entries to place in the buffer.</param>
/// <param name="capacity">The value assigned to BoundedCapacity.</param>
/// <param name="cancellationToken">An optional cancellation token, given to both the buffer and the feeding loop.</param>
/// <returns>The buffer, exposed as a source block.</returns>
/// <exception cref="NullReferenceException">The source is null.</exception>
public static ISourceBlock<T> AsBufferBlock<T>(this IEnumerable<T> source,
    int capacity = DataflowBlockOptions.Unbounded,
    CancellationToken cancellationToken = default)
{
    if (source == null)
        throw new NullReferenceException();
    Contract.EndContractBlock();

    var buffer = new BufferBlock<T>(new DataflowBlockOptions()
    {
        BoundedCapacity = capacity,
        CancellationToken = cancellationToken
    });

    var result = ToTargetBlockAsync(source, buffer, cancellationToken);
    if (result.IsCompletedSuccessfully)
    {
        // Everything was accepted synchronously; no continuation is needed.
        buffer.Complete();
    }
    else
    {
        // Fire and forget: the helper handles its own failures, so nothing is left unobserved.
        _ = CompleteWhenFinished();
    }
    return buffer;

    async ValueTask CompleteWhenFinished()
    {
        try
        {
            await result;
            buffer.Complete();
        }
        catch (Exception ex)
        {
            // Without this the buffer would never complete and anything waiting
            // on it would wait forever while the exception went unobserved.
            ((IDataflowBlock)buffer).Fault(ex);
        }
    }
}
/// <summary>
/// Creates an ActionBlock around the handler and links the source to it, propagating completion.
/// </summary>
/// <typeparam name="T">The input type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="handler">The handler function to apply.</param>
/// <param name="dataflowBlockOptions">Optional execution options; when null the options-less constructor is used.</param>
/// <returns>The ActionBlock created.</returns>
/// <exception cref="NullReferenceException">The source is null.</exception>
/// <exception cref="ArgumentNullException">The handler is null.</exception>
public static ActionBlock<T> Pipe<T>(this ISourceBlock<T> source,
    Action<T> handler,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (source == null)
    {
        throw new NullReferenceException();
    }
    if (handler == null)
    {
        throw new ArgumentNullException(nameof(handler));
    }
    Contract.EndContractBlock();

    ActionBlock<T> block;
    if (dataflowBlockOptions != null)
    {
        block = new ActionBlock<T>(handler, dataflowBlockOptions);
    }
    else
    {
        block = new ActionBlock<T>(handler);
    }

    source.LinkToWithCompletion(block);
    return block;
}
/// <summary>
/// Creates an ActionBlock around the async handler and links the source to it, propagating completion.
/// </summary>
/// <typeparam name="T">The input type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="handler">The async handler function to apply.</param>
/// <param name="dataflowBlockOptions">Optional execution options; when null the options-less constructor is used.</param>
/// <returns>The ActionBlock created.</returns>
/// <exception cref="NullReferenceException">The source is null.</exception>
/// <exception cref="ArgumentNullException">The handler is null.</exception>
public static ActionBlock<T> Pipe<T>(this ISourceBlock<T> source,
    Func<T, Task> handler,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (source == null)
    {
        throw new NullReferenceException();
    }
    if (handler == null)
    {
        throw new ArgumentNullException(nameof(handler));
    }
    Contract.EndContractBlock();

    ActionBlock<T> block;
    if (dataflowBlockOptions != null)
    {
        block = new ActionBlock<T>(handler, dataflowBlockOptions);
    }
    else
    {
        block = new ActionBlock<T>(handler);
    }

    source.LinkToWithCompletion(block);
    return block;
}
/// <summary>
/// Creates a TransformBlock around the transform and links the source to it, propagating completion.
/// </summary>
/// <typeparam name="TIn">The input type.</typeparam>
/// <typeparam name="TOut">The output type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="transform">The transform function to apply.</param>
/// <param name="dataflowBlockOptions">Optional execution options; when null the options-less constructor is used.</param>
/// <returns>The TransformBlock created.</returns>
/// <exception cref="NullReferenceException">The source is null.</exception>
/// <exception cref="ArgumentNullException">The transform is null.</exception>
public static TransformBlock<TIn, TOut> Pipe<TIn, TOut>(this ISourceBlock<TIn> source,
    Func<TIn, TOut> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (source == null)
    {
        throw new NullReferenceException();
    }
    if (transform == null)
    {
        throw new ArgumentNullException(nameof(transform));
    }
    Contract.EndContractBlock();

    TransformBlock<TIn, TOut> block;
    if (dataflowBlockOptions != null)
    {
        block = new TransformBlock<TIn, TOut>(transform, dataflowBlockOptions);
    }
    else
    {
        block = new TransformBlock<TIn, TOut>(transform);
    }

    source.LinkToWithCompletion(block);
    return block;
}
/// <summary>
/// Creates a TransformBlock around the async transform and links the source to it, propagating completion.
/// </summary>
/// <typeparam name="TIn">The input type.</typeparam>
/// <typeparam name="TOut">The output type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="transform">The async transform function to apply.</param>
/// <param name="dataflowBlockOptions">Optional execution options; when null the options-less constructor is used.</param>
/// <returns>The TransformBlock created.</returns>
/// <exception cref="NullReferenceException">The source is null.</exception>
/// <exception cref="ArgumentNullException">The transform is null.</exception>
public static TransformBlock<TIn, TOut> Pipe<TIn, TOut>(this ISourceBlock<TIn> source,
    Func<TIn, Task<TOut>> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (source == null)
    {
        throw new NullReferenceException();
    }
    if (transform == null)
    {
        throw new ArgumentNullException(nameof(transform));
    }
    Contract.EndContractBlock();

    TransformBlock<TIn, TOut> block;
    if (dataflowBlockOptions != null)
    {
        block = new TransformBlock<TIn, TOut>(transform, dataflowBlockOptions);
    }
    else
    {
        block = new TransformBlock<TIn, TOut>(transform);
    }

    source.LinkToWithCompletion(block);
    return block;
}
/// <summary>
/// Pipes the source into a single-producer-constrained ActionBlock limited to the given max concurrency.
/// </summary>
/// <typeparam name="T">The input type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="maxConcurrency">The maximum concurrency of the action block.</param>
/// <param name="handler">The handler function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The ActionBlock created.</returns>
public static ActionBlock<T> Parallel<T>(this ISourceBlock<T> source,
    int maxConcurrency,
    Action<T> handler,
    CancellationToken cancellationToken = default)
{
    var options = new ExecutionDataflowBlockOptions();
    options.MaxDegreeOfParallelism = maxConcurrency;
    options.SingleProducerConstrained = true;
    options.CancellationToken = cancellationToken;
    return source.Pipe(handler, options);
}
/// <summary>
/// Pipes the source into a single-producer-constrained ActionBlock limited to the given max concurrency,
/// using an async handler.
/// </summary>
/// <typeparam name="T">The input type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="maxConcurrency">The maximum concurrency of the action block.</param>
/// <param name="handler">The async handler function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The ActionBlock created.</returns>
public static ActionBlock<T> Parallel<T>(this ISourceBlock<T> source,
    int maxConcurrency,
    Func<T, Task> handler,
    CancellationToken cancellationToken = default)
{
    var options = new ExecutionDataflowBlockOptions();
    options.MaxDegreeOfParallelism = maxConcurrency;
    options.SingleProducerConstrained = true;
    options.CancellationToken = cancellationToken;
    return source.Pipe(handler, options);
}
/// <summary>
/// Pipes the source into a single-producer-constrained TransformBlock limited to the given max concurrency.
/// </summary>
/// <typeparam name="TIn">The input type.</typeparam>
/// <typeparam name="TOut">The output type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="maxConcurrency">The maximum concurrency of the transform block.</param>
/// <param name="transform">The transform function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The TransformBlock created.</returns>
public static TransformBlock<TIn, TOut> Parallel<TIn, TOut>(this ISourceBlock<TIn> source,
    int maxConcurrency,
    Func<TIn, TOut> transform,
    CancellationToken cancellationToken = default)
{
    var options = new ExecutionDataflowBlockOptions();
    options.MaxDegreeOfParallelism = maxConcurrency;
    options.SingleProducerConstrained = true;
    options.CancellationToken = cancellationToken;
    return source.Pipe(transform, options);
}
/// <summary>
/// Pipes the source data through a TransformBlock with the specified max concurrency, using an async transform.
/// </summary>
/// <typeparam name="TIn">The input type.</typeparam>
/// <typeparam name="TOut">The output type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="maxConcurrency">The maximum concurrency of the transform block.</param>
/// <param name="transform">The async transform function to apply.</param>
/// <param name="singleProducerConstrained">
/// The value assigned to the block's SingleProducerConstrained option. Defaults to true.
/// </param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The TransformBlock created.</returns>
public static TransformBlock<TIn, TOut> Parallel<TIn, TOut>(this ISourceBlock<TIn> source,
    int maxConcurrency,
    Func<TIn, Task<TOut>> transform,
    bool singleProducerConstrained = true,
    CancellationToken cancellationToken = default)
    => source.Pipe(transform, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxConcurrency,
        // Honor the caller's choice; this was previously hard-coded to true,
        // which made the parameter a no-op.
        SingleProducerConstrained = singleProducerConstrained,
        CancellationToken = cancellationToken
    });
/// <summary>
/// Pipes the source into a single-producer-constrained ActionBlock with the given bounded capacity and max concurrency.
/// </summary>
/// <typeparam name="T">The input type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="capacity">The bounded capacity of the action block.</param>
/// <param name="maxConcurrency">The maximum concurrency of the action block.</param>
/// <param name="handler">The handler function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The ActionBlock created.</returns>
public static ActionBlock<T> BoundedParallel<T>(this ISourceBlock<T> source,
    int capacity,
    int maxConcurrency,
    Action<T> handler,
    CancellationToken cancellationToken = default)
{
    var options = new ExecutionDataflowBlockOptions();
    options.BoundedCapacity = capacity;
    options.MaxDegreeOfParallelism = maxConcurrency;
    options.SingleProducerConstrained = true;
    options.CancellationToken = cancellationToken;
    return source.Pipe(handler, options);
}
/// <summary>
/// Pipes the source into a single-producer-constrained ActionBlock with the given bounded capacity and max concurrency,
/// using an async handler.
/// </summary>
/// <typeparam name="T">The input type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="capacity">The bounded capacity of the action block.</param>
/// <param name="maxConcurrency">The maximum concurrency of the action block.</param>
/// <param name="handler">The async handler function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The ActionBlock created.</returns>
public static ActionBlock<T> BoundedParallel<T>(this ISourceBlock<T> source,
    int capacity,
    int maxConcurrency,
    Func<T, Task> handler,
    CancellationToken cancellationToken = default)
{
    var options = new ExecutionDataflowBlockOptions();
    options.BoundedCapacity = capacity;
    options.MaxDegreeOfParallelism = maxConcurrency;
    options.SingleProducerConstrained = true;
    options.CancellationToken = cancellationToken;
    return source.Pipe(handler, options);
}
/// <summary>
/// Pipes the source into a single-producer-constrained TransformBlock with the given bounded capacity and max concurrency.
/// </summary>
/// <typeparam name="TIn">The input type.</typeparam>
/// <typeparam name="TOut">The output type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="capacity">The bounded capacity of the transform block.</param>
/// <param name="maxConcurrency">The maximum concurrency of the transform block.</param>
/// <param name="transform">The transform function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The TransformBlock created.</returns>
public static TransformBlock<TIn, TOut> BoundedParallel<TIn, TOut>(this ISourceBlock<TIn> source,
    int capacity,
    int maxConcurrency,
    Func<TIn, TOut> transform,
    CancellationToken cancellationToken = default)
{
    var options = new ExecutionDataflowBlockOptions();
    options.BoundedCapacity = capacity;
    options.MaxDegreeOfParallelism = maxConcurrency;
    options.SingleProducerConstrained = true;
    options.CancellationToken = cancellationToken;
    return source.Pipe(transform, options);
}
/// <summary>
/// Pipes the source into a single-producer-constrained TransformBlock with the given bounded capacity and max concurrency,
/// using an async transform.
/// </summary>
/// <typeparam name="TIn">The input type.</typeparam>
/// <typeparam name="TOut">The output type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="capacity">The bounded capacity of the transform block.</param>
/// <param name="maxConcurrency">The maximum concurrency of the transform block.</param>
/// <param name="transform">The async transform function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The TransformBlock created.</returns>
public static TransformBlock<TIn, TOut> BoundedParallel<TIn, TOut>(this ISourceBlock<TIn> source,
    int capacity,
    int maxConcurrency,
    Func<TIn, Task<TOut>> transform,
    CancellationToken cancellationToken = default)
{
    var options = new ExecutionDataflowBlockOptions();
    options.BoundedCapacity = capacity;
    options.MaxDegreeOfParallelism = maxConcurrency;
    options.SingleProducerConstrained = true;
    options.CancellationToken = cancellationToken;
    return source.Pipe(transform, options);
}
/// <summary>
/// Pipes the source into a single-producer-constrained ActionBlock with the given bounded capacity.
/// </summary>
/// <typeparam name="T">The input type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="capacity">The bounded capacity of the action block.</param>
/// <param name="handler">The handler function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The ActionBlock created.</returns>
public static ActionBlock<T> Bounded<T>(this ISourceBlock<T> source,
    int capacity,
    Action<T> handler,
    CancellationToken cancellationToken = default)
{
    var options = new ExecutionDataflowBlockOptions();
    options.BoundedCapacity = capacity;
    options.SingleProducerConstrained = true;
    options.CancellationToken = cancellationToken;
    return source.Pipe(handler, options);
}
/// <summary>
/// Pipes the source into a single-producer-constrained ActionBlock with the given bounded capacity,
/// using an async handler.
/// </summary>
/// <typeparam name="T">The input type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="capacity">The bounded capacity of the action block.</param>
/// <param name="handler">The async handler function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The ActionBlock created.</returns>
public static ActionBlock<T> Bounded<T>(this ISourceBlock<T> source,
    int capacity,
    Func<T, Task> handler,
    CancellationToken cancellationToken = default)
{
    var options = new ExecutionDataflowBlockOptions();
    options.BoundedCapacity = capacity;
    options.SingleProducerConstrained = true;
    options.CancellationToken = cancellationToken;
    return source.Pipe(handler, options);
}
/// <summary>
/// Pipes the source into a single-producer-constrained TransformBlock with the given bounded capacity.
/// </summary>
/// <typeparam name="TIn">The input type.</typeparam>
/// <typeparam name="TOut">The output type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="capacity">The bounded capacity of the transform block.</param>
/// <param name="transform">The transform function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The TransformBlock created.</returns>
public static TransformBlock<TIn, TOut> Bounded<TIn, TOut>(this ISourceBlock<TIn> source,
    int capacity,
    Func<TIn, TOut> transform,
    CancellationToken cancellationToken = default)
{
    var options = new ExecutionDataflowBlockOptions();
    options.BoundedCapacity = capacity;
    options.SingleProducerConstrained = true;
    options.CancellationToken = cancellationToken;
    return source.Pipe(transform, options);
}
/// <summary>
/// Pipes the source into a single-producer-constrained TransformBlock with the given bounded capacity,
/// using an async transform.
/// </summary>
/// <typeparam name="TIn">The input type.</typeparam>
/// <typeparam name="TOut">The output type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="capacity">The bounded capacity of the transform block.</param>
/// <param name="transform">The async transform function to apply.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The TransformBlock created.</returns>
public static TransformBlock<TIn, TOut> Bounded<TIn, TOut>(this ISourceBlock<TIn> source,
    int capacity,
    Func<TIn, Task<TOut>> transform,
    CancellationToken cancellationToken = default)
{
    var options = new ExecutionDataflowBlockOptions();
    options.BoundedCapacity = capacity;
    options.SingleProducerConstrained = true;
    options.CancellationToken = cancellationToken;
    return source.Pipe(transform, options);
}
/// <summary>
/// Attaches an acceptor to the source: each offered item is run through the acceptor function.
/// When the acceptor returns true the item counts as accepted and is taken from the source;
/// otherwise it is declined.
/// </summary>
/// <typeparam name="T">The item type.</typeparam>
/// <param name="source">The source block to receive from.</param>
/// <param name="acceptor">The function that processes an item and decides whether it was accepted.</param>
/// <returns>The original source block, so that more acceptors or filters can be applied.</returns>
public static ISourceBlock<T> AcceptOrPass<T>(this ISourceBlock<T> source,
    Func<T, bool> acceptor)
{
    ITargetBlock<T> gate = new AcceptOrPassBlock<T>(acceptor);
    var linkOptions = PropagateLink();
    source.LinkTo(gate, linkOptions);
    return source;
}
} | |
/// <summary>
/// A target block that lets a handler decide, per message, whether the message is accepted
/// (handler returns true) or declined (handler returns false). Completion and faulting are
/// delegated to an inner ActionBlock that never receives any messages itself.
/// </summary>
/// <typeparam name="T">The message type.</typeparam>
sealed class AcceptOrPassBlock<T> : ITargetBlock<T>
{
    private readonly Func<T, bool> _handler;

    // Used only for its completion state machine (Complete / Fault / Completion).
    private readonly ITargetBlock<T> _completer;

    private AcceptOrPassBlock()
    {
        _completer = new ActionBlock<T>(e => { });
        Completion = _completer.Completion;
    }

    /// <summary>Creates the block around the deciding handler.</summary>
    /// <param name="handler">Processes a message and returns true if it was accepted.</param>
    /// <exception cref="ArgumentNullException">The handler is null.</exception>
    public AcceptOrPassBlock(Func<T, bool> handler) : this()
    {
        _handler = handler ?? throw new ArgumentNullException(nameof(handler));
    }

    /// <summary>
    /// Offers a message to the handler.
    /// </summary>
    /// <param name="messageHeader">The header of the offered message.</param>
    /// <param name="messageValue">The offered value; this is what the handler sees.</param>
    /// <param name="source">The offering source; required when <paramref name="consumeToAccept"/> is true.</param>
    /// <param name="consumeToAccept">
    /// When true, the message must be consumed from the source before it may be reported as accepted.
    /// </param>
    /// <returns>
    /// DecliningPermanently once the block has completed; Declined when the handler returns false;
    /// NotAvailable when the handler accepted but the source could not hand the message over;
    /// otherwise Accepted.
    /// </returns>
    /// <exception cref="ArgumentException">
    /// <paramref name="consumeToAccept"/> is true but no source was supplied.
    /// </exception>
    public DataflowMessageStatus OfferMessage(
        DataflowMessageHeader messageHeader,
        T messageValue, ISourceBlock<T> source,
        bool consumeToAccept)
    {
        if (consumeToAccept && source == null)
            throw new ArgumentException("A source is required when consumeToAccept is true.", nameof(consumeToAccept));

        if (_completer.Completion.IsCompleted)
            return DataflowMessageStatus.DecliningPermanently;

        if (!_handler(messageValue))
            return DataflowMessageStatus.Declined;

        if (consumeToAccept)
        {
            // The handler has to decide first (a consumed message cannot be handed back),
            // so the message is claimed from the source only after it said yes.
            source.ConsumeMessage(messageHeader, this, out var consumed);
            if (!consumed)
                return DataflowMessageStatus.NotAvailable;
        }

        return DataflowMessageStatus.Accepted;
    }

    /// <summary>Signals completion through the inner block.</summary>
    public void Complete()
        => _completer.Complete();

    /// <summary>Faults the inner block with the given exception.</summary>
    /// <param name="exception">The exception to fault with.</param>
    public void Fault(Exception exception)
        => _completer.Fault(exception);

    /// <summary>The completion task of the inner block.</summary>
    public Task Completion { get; }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment