Created
April 11, 2016 15:01
-
-
Save jayhilden/2078872a53c7df0fe45d661861ed2d45 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using RabbitMQ.Client.Events; | |
using Verify.Platform.Infrastructure.RabbitMQ.Producers; | |
namespace Verify.Platform.Infrastructure.RabbitMQ | |
{ | |
/// <summary>
/// Retry support for RabbitMQ deliveries: reads the retry counter carried in a
/// delivery's headers and republishes the delivery for another attempt.
/// </summary>
public interface IRabbitUtilityService
{
    /// <summary>
    /// Reads how many times the delivery has already been retried.
    /// </summary>
    /// <param name="deliverEventArgs">The delivery whose headers are inspected.</param>
    /// <returns>The retry count stored in the headers, or 0 when none can be read.</returns>
    int GetCurrentRetryCount(BasicDeliverEventArgs deliverEventArgs);

    /// <summary>
    /// Increments the delivery's retry counter and publishes it again.
    /// </summary>
    /// <param name="deliverEventArgs">The delivery to retry.</param>
    void AttemptRetry(BasicDeliverEventArgs deliverEventArgs);
}
/// <summary>
/// Retries a rejected RabbitMQ delivery by publishing it back to the queue recorded in its
/// <c>x-death</c> header, counting attempts in the <c>RetryCount</c> header.
/// </summary>
internal class RabbitUtilityService : IRabbitUtilityService
{
    private readonly IRabbitMessageBusProducerInternal _messageBusProducer;

    /// <summary>Number of retries after which <see cref="AttemptRetry"/> refuses to republish.</summary>
    public const ushort MaxRetryCount = 10;

    /// <summary>Name of the message header that stores the retry counter.</summary>
    public const string HeaderKeyRetryCount = "RetryCount";

    /// <summary>Creates the service.</summary>
    /// <param name="messageBusProducer">Producer used to publish the retried message.</param>
    public RabbitUtilityService(IRabbitMessageBusProducerInternal messageBusProducer)
    {
        _messageBusProducer = messageBusProducer;
    }

    /// <summary>
    /// Increments the delivery's retry counter header and publishes the message back to the
    /// queue it was rejected from. Acknowledging the original delivery is left to the caller.
    /// </summary>
    /// <param name="deliverEventArgs">The delivery to retry.</param>
    /// <exception cref="RabbitRetryLimitReachedException">
    /// The delivery has already been retried <see cref="MaxRetryCount"/> times or more.
    /// </exception>
    /// <exception cref="RabbitRetryFailedException">
    /// The originating queue could not be determined from the delivery's headers.
    /// </exception>
    public void AttemptRetry(BasicDeliverEventArgs deliverEventArgs)
    {
        // Get count of previous retry attempts
        var retryCount = GetCurrentRetryCount(deliverEventArgs);
        if (retryCount >= MaxRetryCount)
        {
            Console.WriteLine("Max retry reached");
            throw new RabbitRetryLimitReachedException();
        }

        string originatingQueueName;
        if (!TryGetOriginatingQueue(deliverEventArgs, out originatingQueueName))
        {
            throw new RabbitRetryFailedException();
        }

        // Retry
        retryCount++;
        deliverEventArgs.BasicProperties.Headers[HeaderKeyRetryCount] = retryCount;
        Console.WriteLine("{0} = {1}", HeaderKeyRetryCount, retryCount);
        Console.WriteLine("publishing back into queue {0}", originatingQueueName);

        // NOTE(review): presumably "" selects the default exchange so the queue name acts as the
        // routing key -- confirm against IRabbitMessageBusProducerInternal.Publish.
        _messageBusProducer.Publish("", originatingQueueName, deliverEventArgs.BasicProperties, deliverEventArgs.Body);
    }

    /// <summary>
    /// Reads the retry counter from the delivery's <see cref="HeaderKeyRetryCount"/> header.
    /// </summary>
    /// <param name="deliverEventArgs">The delivery to inspect; may be null.</param>
    /// <returns>
    /// The parsed counter, or 0 when the delivery, its properties, its headers or the header
    /// value is missing, or when the value is not an integer.
    /// </returns>
    public virtual int GetCurrentRetryCount(BasicDeliverEventArgs deliverEventArgs)
    {
        var headers = deliverEventArgs?.BasicProperties?.Headers;
        object rawValue;
        if (headers == null || !headers.TryGetValue(HeaderKeyRetryCount, out rawValue) || rawValue == null)
        {
            return 0;
        }

        // A counter written as a string comes back as UTF-8 bytes (like the x-death entries
        // below); calling ToString() on the array would never parse and reset the count to 0.
        var rawBytes = rawValue as byte[];
        var retryHeader = rawBytes != null
            ? ConvertFromUTF8ToString(rawBytes)
            : rawValue.ToString();

        int retryCount;
        return int.TryParse(retryHeader, out retryCount)
            ? retryCount
            : 0;
    }

    #region Helpers

    /// <summary>
    /// Decodes a UTF-8 byte array into a string.
    /// </summary>
    /// <param name="bytes">The UTF-8 bytes; may be null.</param>
    /// <returns>The decoded string, or null when <paramref name="bytes"/> is null.</returns>
    internal virtual string ConvertFromUTF8ToString(byte[] bytes)
    {
        // A raw Buffer.BlockCopy of the bytes into a char[] was tried previously and does not
        // work properly for how rabbit encodes the data, hence the explicit UTF-8 decode.
        return bytes == null
            ? null
            : Encoding.UTF8.GetString(bytes);
    }

    /// <summary>
    /// Returns the elements of type <typeparamref name="T"/> found in a list-valued header.
    /// </summary>
    /// <typeparam name="T">Element type to keep; elements of other types are skipped.</typeparam>
    /// <param name="deliverEventArgs">The delivery whose headers are read.</param>
    /// <param name="key">Name of the header to read.</param>
    /// <returns>The matching elements, lazily filtered.</returns>
    /// <exception cref="ArgumentException">
    /// The delivery has no headers or the headers do not contain <paramref name="key"/>.
    /// </exception>
    /// <exception cref="NotSupportedException">The header value is not a <c>List&lt;object&gt;</c>.</exception>
    internal virtual IEnumerable<T> ParseRabbitHeaderValue<T>(BasicDeliverEventArgs deliverEventArgs, string key)
    {
        var headers = deliverEventArgs?.BasicProperties?.Headers;
        object val;
        if (headers == null || !headers.TryGetValue(key, out val))
        {
            throw new ArgumentException($"missing {key} key", nameof(deliverEventArgs));
        }

        var list = val as List<object>;
        if (list == null)
        {
            // val itself may be null, so do not call GetType() on it unconditionally.
            var typeName = val == null ? "null" : val.GetType().ToString();
            throw new NotSupportedException($"type {typeName} is not yet supported");
        }

        return list.OfType<T>();
    }

    /// <summary>
    /// Looks up the queue the delivery was rejected from, using the first <c>x-death</c> entry
    /// whose <c>reason</c> is <c>rejected</c>.
    /// </summary>
    /// <param name="deliverEventArgs">The delivery to inspect.</param>
    /// <param name="originatingQueueName">
    /// Receives the queue name on success; null otherwise.
    /// </param>
    /// <returns>
    /// True when a rejected entry with a non-empty queue name was found; false when the
    /// <c>x-death</c> header is absent, has no rejected entry, or the entry has no usable queue.
    /// </returns>
    internal virtual bool TryGetOriginatingQueue(BasicDeliverEventArgs deliverEventArgs, out string originatingQueueName)
    {
        const string headerKeyDeadLetter = "x-death";
        const string headerKeyReason = "reason";
        const string headerKeyQueue = "queue";
        const string reasonRejected = "rejected";

        originatingQueueName = null;

        IEnumerable<IDictionary<string, object>> deathHeaders;
        try
        {
            deathHeaders = ParseRabbitHeaderValue<IDictionary<string, object>>(deliverEventArgs, headerKeyDeadLetter);
        }
        catch (ArgumentException)
        {
            // No x-death header: an expected "not found" outcome for a Try method, so report
            // failure instead of letting the exception escape.
            return false;
        }

        var rejectedDeathHeader = deathHeaders
            .FirstOrDefault(header => ReadHeaderString(header, headerKeyReason) == reasonRejected);
        if (rejectedDeathHeader == null)
        {
            return false;
        }

        var queueName = ReadHeaderString(rejectedDeathHeader, headerKeyQueue);
        if (string.IsNullOrEmpty(queueName))
        {
            return false;
        }

        originatingQueueName = queueName;
        return true;
    }

    /// <summary>
    /// Reads a string entry from a header table, decoding UTF-8 bytes when necessary.
    /// Returns null when the entry is missing or is neither a byte array nor a string.
    /// </summary>
    private string ReadHeaderString(IDictionary<string, object> table, string key)
    {
        object raw;
        if (!table.TryGetValue(key, out raw))
        {
            return null;
        }

        var bytes = raw as byte[];
        return bytes != null
            ? ConvertFromUTF8ToString(bytes)
            : raw as string;
    }

    #endregion
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment