Skip to content

Instantly share code, notes, and snippets.

@gregoryyoung
Created October 28, 2015 14:02

Revisions

  1. gregoryyoung created this gist Oct 28, 2015.
    134 changes: 134 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,134 @@
    using System;
    using System.CodeDom;
    using System.Collections.Generic;
    using System.Linq;
    using System.Net;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using EventStore.ClientAPI;
    using EventStore.ClientAPI.Embedded;
    using EventStore.ClientAPI.SystemData;
    using EventStore.Core;
    using EventStore.Core.Bus;
    using EventStore.Core.Messages;

    namespace ReplicaSpike
    {
    class Program
    {
    static void Main(string[] args)
    {
    var port = int.Parse(args[0]);
    var otherport = int.Parse(args[1]);
    var node = StartEmbeddedEventStore(port);
    Console.WriteLine("Node up. waiting");
    Thread.Sleep(10000);
    Console.WriteLine("Setting up thread to write later.");
    Task.Factory.StartNew(() => WriteEvents(node, port));
    Console.WriteLine("Setting up replication");
    ReplicateFrom(node, otherport);
    Console.ReadLine();
    }

    private static void WriteEvents(ClusterVNode node, int myPort)
    {
    using (var conn = EmbeddedEventStoreConnection.Create(node, ConnectionSettings.Create().LimitRetriesForOperationTo(3)))
    {
    for(int i=0;i<500;i++)
    {
    Thread.Sleep(1000);
    Console.WriteLine("writing event to me");
    conn.AppendToStreamAsync("foo-" + myPort, ExpectedVersion.Any, BuildEvent(myPort)).Wait();
    }
    }
    }

    private static EventData BuildEvent(int port)
    {
    return new EventData(Guid.NewGuid(), "foo", true, new byte[500], Encoding.UTF8.GetBytes(port.ToString()));
    }

    private static void ReplicateFrom(ClusterVNode node, int otherport)
    {
    Console.WriteLine("Attempting to connect");
    //load up initial checksum from stream or even local checkpoint (local checkpoint may be better)
    using (var localConnection = EmbeddedEventStoreConnection.Create(node, "local connection"))
    {
    using (var connection = EventStoreConnection.Create(ConnectionSettings.Create()
    .KeepReconnecting()
    .KeepRetrying()
    .Build(), new IPEndPoint(IPAddress.Loopback, otherport)))
    {
    connection.ConnectAsync().Wait();
    Console.WriteLine("Successfully conencted to localhost:{0}", otherport);
    Console.WriteLine("Setting up subscription");
    var sub = connection.SubscribeToAllFrom(null, false,
    (s, ev) =>
    {
    var port = TryReadPortFromMetadata(ev.OriginalEvent.Metadata);
    if (port == otherport)
    {
    Console.WriteLine("writing event {0}/{1}", ev.OriginalStreamId, ev.OriginalEventNumber);
    localConnection.AppendToStreamAsync(ev.OriginalStreamId, ev.OriginalEventNumber -1,
    new UserCredentials("admin", "changeit"),
    new EventData(ev.OriginalEvent.EventId,
    ev.OriginalEvent.EventType,
    ev.OriginalEvent.IsJson,
    ev.OriginalEvent.Data,
    ev.OriginalEvent.Metadata)).Wait();
    }
    else
    {
    Console.WriteLine("Not writing {0}/{1}", ev.OriginalStreamId, ev.OriginalEventNumber);
    }
    },
    (s) => Console.WriteLine("live started`"),
    (s, e, ex) =>
    {
    Console.WriteLine("sub dropped {0} {1}", e, ex);
    },
    new UserCredentials("admin", "changeit"));
    Console.WriteLine("waiting.");
    Console.ReadLine();
    }
    }
    }

    private static int TryReadPortFromMetadata(byte[] metadata)
    {
    if (metadata == null) return 0;
    try
    {
    var str = Encoding.UTF8.GetString(metadata);
    return int.Parse(str);
    }
    catch (Exception ex)
    {
    return 0;
    }
    }

    static ClusterVNode StartEmbeddedEventStore(int port)
    {
    var embeddedEventStore = EmbeddedVNodeBuilder.AsSingleNode()
    .RunInMemory()
    .RunProjections(ProjectionsMode.None)
    .WithExternalTcpOn(new IPEndPoint(IPAddress.Loopback, port))
    .WithInternalTcpOn(new IPEndPoint(IPAddress.None, 1234))
    .WithInternalHttpOn(new IPEndPoint(IPAddress.None, 1234))
    .WithExternalHttpOn(new IPEndPoint(IPAddress.None, 1234))
    .Build();

    var startedEvent = new ManualResetEventSlim(false);
    embeddedEventStore.MainBus.Subscribe(
    new AdHocHandler<UserManagementMessage.UserManagementServiceInitialized>(m => startedEvent.Set()));

    embeddedEventStore.Start();
    Console.WriteLine("Waiting on node.");
    if (!startedEvent.Wait(60000))
    throw new TimeoutException("Embedded Event Store has not started in 60 seconds.");
    return embeddedEventStore;
    }
    }
    }