Last active
December 14, 2015 13:52
-
-
Save CasperTDK/47dd221ae49739e1f6c5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class MessageInfoAggregatorSaga : Saga<MessageAggregaterSagaData>, | |
IAmInitiatedBy<SendMessageInfo>, | |
IHandleMessages<DispatchMessageInfo> | |
{ | |
private readonly IBus _bus; | |
private readonly IMessageService _messageService; | |
private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); | |
private readonly MessageAggregator _messageAggregator = new MessageAggregator(); | |
private readonly object _synchronizedSagaLock = new object(); | |
public MessageInfoAggregatorSaga(IBus bus, IMessageService messageService) | |
{ | |
_bus = bus; | |
_messageService = messageService; | |
} | |
/// <summary> | |
/// Allows messages to accumulate during this period | |
/// </summary> | |
private static TimeSpan DeferTime => TimeSpan.FromSeconds(10); | |
public override void ConfigureHowToFindSaga() | |
{ | |
} | |
public void Handle(SendMessageInfo sendMessageInfo) | |
{ | |
lock (_synchronizedSagaLock) | |
{ | |
if (IsNew) | |
{ | |
Data.MessagesToSend = new List<MessageInfo>(); | |
} | |
Data.MessagesToSend.Add(sendMessageInfo.MessageInfo); | |
Log.Debug("Scheduled MessageInfo for defered dispatching"); | |
if (!Data.DeferInProgress) | |
{ | |
Data.DeferInProgress = true; | |
_bus.Defer(DeferTime, new DispatchMessageInfo()); | |
Log.DebugFormat("Will dispatch scheduled messages in {0:g}", DeferTime); | |
} | |
} | |
} | |
public void Handle(DispatchMessageInfo message) | |
{ | |
lock (_synchronizedSagaLock) | |
{ | |
foreach (var messageInfo in _messageAggregator.Aggregate(Data.MessagesToSend)) | |
{ | |
_messageService.SendMessage(messageInfo); | |
} | |
Log.DebugFormat("Dispatched message info batch. MessageCount={0}", Data.MessagesToSend); | |
Data.MessagesToSend.Clear(); | |
Data.DeferInProgress = false; | |
} | |
} | |
} | |
public class DispatchMessageInfo | |
{ | |
} | |
public class MessageAggregaterSagaData : ISagaData | |
{ | |
public Guid Id { get; set; } | |
public int Revision { get; set; } | |
/// <summary> | |
/// Batch | |
/// </summary> | |
public List<MessageInfo> MessagesToSend { get; set; } | |
public bool DeferInProgress { get; set; } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment