using System; using System.Threading.Tasks; using NServiceBus; using NServiceBus.Extensibility; using NServiceBus.Pipeline; using NServiceBus.Routing; using NServiceBus.Transport; namespace TestRawProcessing { public delegate Task<ErrorHandleResult> OnError(IncomingMessage messageContext, IDispatchMessages dispatcher); class Program { static void Main(string[] args) { Start().GetAwaiter().GetResult(); } static async Task Start() { var config = new EndpointConfiguration("RawEndpoint"); config.UseTransport<LearningTransport>(); config.EnableInstallers(); config.SendFailedMessagesTo("error"); config.Pipeline.Register(b => new RawProcessingBehavior(OnMessage, OnError, b.Build<IDispatchMessages>()), "Process raw messages"); config.Recoverability().CustomPolicy((recoverabilityConfig, context) => RecoverabilityAction.ImmediateRetry()); var endpoint = await Endpoint.Start(config); Console.WriteLine("Press enter to exit"); Console.ReadLine(); await endpoint.Stop(); } static Task<ErrorHandleResult> OnError(IncomingMessage messagecontext, IDispatchMessages dispatcher) { return Task.FromResult(ErrorHandleResult.RetryRequired); } static Task OnMessage(IncomingMessage message, IDispatchMessages dispatcher) { var outgoingMessage = new OutgoingMessage(message.MessageId, message.Headers, message.Body); var transportOperations = new TransportOperation(outgoingMessage, new UnicastAddressTag("Receiver")); return dispatcher.Dispatch(new TransportOperations(transportOperations), new TransportTransaction(), new ContextBag()); } } class RawProcessingBehavior : Behavior<ITransportReceiveContext> { readonly Func<IncomingMessage, IDispatchMessages, Task> onMessage; readonly OnError onError; readonly IDispatchMessages dispatcher; public RawProcessingBehavior(Func<IncomingMessage, IDispatchMessages, Task> onMessage, OnError onError, IDispatchMessages dispatcher) { this.onMessage = onMessage; this.onError = onError; this.dispatcher = dispatcher; } public override async Task Invoke(ITransportReceiveContext context, Func<Task> next) { try { await onMessage(context.Message, dispatcher); } catch (Exception e) { var result = await onError(context.Message, dispatcher); if (result == ErrorHandleResult.RetryRequired) { throw new Exception("Retry required", e); } } } } }