Created
October 30, 2013 12:13
-
-
Save dnauck/7231622 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class GetEventStoreSubscriptionDispatcher | |
{ | |
private const string EventClrTypeHeader = "EventClrTypeName"; | |
private readonly IBus bus; | |
private readonly IEventStoreConnection eventStoreConnection; | |
private readonly MongoDatabase mongoDatabase; | |
private static readonly JsonSerializerSettings SerializerSettings; | |
static GetEventStoreSubscriptionDispatcher() | |
{ | |
SerializerSettings = new JsonSerializerSettings {TypeNameHandling = TypeNameHandling.None}; | |
} | |
public GetEventStoreSubscriptionDispatcher(IBus bus, IEventStoreConnection eventStoreConnection, MongoServer mongoServer) | |
{ | |
this.bus = bus; | |
this.eventStoreConnection = eventStoreConnection; | |
mongoDatabase = mongoServer.GetDatabase(DbConfig.ReadModelStoreDatabase); | |
} | |
public void SubscribeToEventsForHandlerOfType<T>() where T : IHandlesEvent | |
{ | |
var eventsToSubscribe = from handler in typeof(T).GetInterfaces() | |
where handler.IsGenericType && | |
handler.GetGenericTypeDefinition() == typeof (IHandlesEvent<>) | |
select handler.GetGenericArguments().First(); | |
var readModelVersions = mongoDatabase.GetCollection<ReadModelVersion>(); | |
foreach (var @event in eventsToSubscribe) | |
{ | |
var storeVersionRecord = readModelVersions.FindOne(Query<ReadModelVersion>.EQ(v => v.Id, @event.Name)); | |
var storeVersion = storeVersionRecord != null ? new int?(storeVersionRecord.Version) : null; | |
var eventStream = string.Concat("$et-", @event.Name); | |
try | |
{ | |
Debug.WriteLine("Subscribe to Events from Stream " + eventStream + " from position " + (storeVersion ?? 0)); | |
eventStoreConnection.SubscribeToStreamFrom(eventStream, storeVersion, true, | |
EventAppeared, | |
LiveProcessingStarted, | |
SubscriptionDropped); | |
} | |
catch (Exception exception) | |
{ | |
//... | |
} | |
} | |
} | |
private void SubscriptionDropped(EventStoreCatchUpSubscription eventStoreCatchUpSubscription, SubscriptionDropReason subscriptionDropReason, Exception arg3) | |
{ | |
Debug.WriteLine("Dropped " + subscriptionDropReason + " on subscription " + | |
eventStoreCatchUpSubscription.StreamId + Environment.NewLine + arg3); | |
} | |
private void LiveProcessingStarted(EventStoreCatchUpSubscription eventStoreCatchUpSubscription) | |
{ | |
Debug.WriteLine("Live processing of " + eventStoreCatchUpSubscription.StreamId + " started.."); | |
} | |
private void EventAppeared(EventStoreCatchUpSubscription eventStoreCatchUpSubscription, ResolvedEvent resolvedEvent) | |
{ | |
var eventData = Encoding.UTF8.GetString(resolvedEvent.Event.Data); | |
var eventMetaData = Encoding.UTF8.GetString(resolvedEvent.Event.Metadata); | |
var eventHeaders = JsonConvert.DeserializeObject<IDictionary<string, object>>(eventMetaData, | |
SerializerSettings); | |
var eventType = Type.GetType(eventHeaders[EventClrTypeHeader].ToString()); | |
if (eventType == null) | |
return; | |
var @event = JsonConvert.DeserializeObject(eventData, eventType, SerializerSettings); | |
Debug.WriteLine("Handle Event of Type " + eventType.Name); | |
var readModelVersions = mongoDatabase.GetCollection<ReadModelVersion>(); | |
var storeVersionRecord = readModelVersions.FindOne(Query<ReadModelVersion>.EQ(v => v.Id, eventType.Name)) ?? | |
new ReadModelVersion { Id = eventType.Name }; | |
//use event number from $et-<Event> stream, not from aggregate | |
storeVersionRecord.Version = resolvedEvent.OriginalEventNumber; | |
var method = bus.GetType().GetMethod("Publish").MakeGenericMethod(eventType); | |
method.Invoke(bus, new[] { @event }); | |
// persist event id after successful event handling | |
readModelVersions.Save(storeVersionRecord); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment