Skip to content

Instantly share code, notes, and snippets.

@CasperTDK
Last active December 14, 2015 13:52
Show Gist options
  • Save CasperTDK/47dd221ae49739e1f6c5 to your computer and use it in GitHub Desktop.
Save CasperTDK/47dd221ae49739e1f6c5 to your computer and use it in GitHub Desktop.
public class MessageInfoAggregatorSaga : Saga<MessageAggregaterSagaData>,
IAmInitiatedBy<SendMessageInfo>,
IHandleMessages<DispatchMessageInfo>
{
private readonly IBus _bus;
private readonly IMessageService _messageService;
private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
private readonly MessageAggregator _messageAggregator = new MessageAggregator();
private readonly object _synchronizedSagaLock = new object();
public MessageInfoAggregatorSaga(IBus bus, IMessageService messageService)
{
_bus = bus;
_messageService = messageService;
}
/// <summary>
/// Allows messages to accumulate during this period
/// </summary>
private static TimeSpan DeferTime => TimeSpan.FromSeconds(10);
public override void ConfigureHowToFindSaga()
{
}
public void Handle(SendMessageInfo sendMessageInfo)
{
lock (_synchronizedSagaLock)
{
if (IsNew)
{
Data.MessagesToSend = new List<MessageInfo>();
}
Data.MessagesToSend.Add(sendMessageInfo.MessageInfo);
Log.Debug("Scheduled MessageInfo for defered dispatching");
if (!Data.DeferInProgress)
{
Data.DeferInProgress = true;
_bus.Defer(DeferTime, new DispatchMessageInfo());
Log.DebugFormat("Will dispatch scheduled messages in {0:g}", DeferTime);
}
}
}
public void Handle(DispatchMessageInfo message)
{
lock (_synchronizedSagaLock)
{
foreach (var messageInfo in _messageAggregator.Aggregate(Data.MessagesToSend))
{
_messageService.SendMessage(messageInfo);
}
Log.DebugFormat("Dispatched message info batch. MessageCount={0}", Data.MessagesToSend);
Data.MessagesToSend.Clear();
Data.DeferInProgress = false;
}
}
}
public class DispatchMessageInfo
{
}
public class MessageAggregaterSagaData : ISagaData
{
public Guid Id { get; set; }
public int Revision { get; set; }
/// <summary>
/// Batch
/// </summary>
public List<MessageInfo> MessagesToSend { get; set; }
public bool DeferInProgress { get; set; }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment