Last active
January 6, 2021 02:39
-
-
Save adymitruk/92be6a19656d68070bc041e9e1e1509a to your computer and use it in GitHub Desktop.
simple event store in #0tech c#
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Configuration; | |
using System.IO; | |
using System.Text; | |
using System.Threading; | |
using System.Xml; | |
using System.Xml.Serialization; | |
using System.Linq; | |
using domain; | |
using Messages; | |
namespace TransactionStorage | |
{ | |
/// <summary>String helpers used by the event store.</summary>
public static class StringExt
{
    /// <summary>
    /// Returns <paramref name="orig"/> with its characters (UTF-16 code units) in reverse order.
    /// </summary>
    /// <param name="orig">The string to reverse; must not be null.</param>
    /// <returns>A new string containing the characters of <paramref name="orig"/> back to front.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="orig"/> is null.</exception>
    public static string Reverse(this string orig)
    {
        if (orig == null)
        {
            throw new ArgumentNullException("orig");
        }

        // Reverse inside a char buffer: linear time, instead of allocating a new
        // string for every character as repeated prepending does.
        char[] buffer = orig.ToCharArray();
        Array.Reverse(buffer);
        return new string(buffer);
    }
}
public static class Storage { | |
// Root location that every storage file path in this class is prefixed with.
private static readonly string _path;

// Number of bytes used for each file offset written to / read from the index files.
private const int LongSize = sizeof(long);

// Number of bytes written per GUID in the "eventindex" file.
private const int GUIDSize = 16;

/// <summary>Reads the storage root from the "path" application setting.</summary>
static Storage()
{
    _path = ConfigurationManager.AppSettings["path"];
}
/// <summary>
/// Creates a new <typeparamref name="T"/>, replays the stored events of the given
/// aggregate onto it and returns it as an <see cref="IHistory"/>.
/// </summary>
/// <typeparam name="T">Aggregate type; the instance is cast to <see cref="IHistory"/>.</typeparam>
/// <param name="aggregateId">Identifier whose events are loaded through <c>RetrieveFor</c>.</param>
/// <returns>The aggregate after all retrieved events have been replayed.</returns>
public static IHistory GetAggregate<T>(string aggregateId) where T : new()
{
    IHistory aggregate = (IHistory)new T();

    // Materialize once: the list is both replayed and counted.
    List<Event> stored = RetrieveFor(aggregateId).ToList();
    stored.ForEach(aggregate.Replay);

    // More than ten replayed events: store a fresh snapshot of the aggregate.
    bool snapshotDue = stored.Count > 10;
    if (snapshotDue)
    {
        Add(aggregate.GenerateSnapshot(aggregateId));
    }

    return aggregate;
}
/// <summary>
/// Appends <paramref name="event"/> to the history file of its aggregate and
/// records its position in the shared "eventindex" file.
/// </summary>
/// <remarks>
/// History record layout: a 4-byte length prefix followed by the XML envelope bytes.
/// Index record layout: the 16-byte GUID followed by the 8-byte offset of the record
/// inside the history file. When the event is a <c>Snapshot</c>, that offset is also
/// written at the start of the aggregate's ".snapshots" file.
/// </remarks>
/// <param name="event">The event to store; its <c>GUID</c> selects the target files.</param>
/// <exception cref="ArgumentNullException"><paramref name="event"/> is null.</exception>
public static void Add(Event @event)
{
    if (@event == null)
    {
        throw new ArgumentNullException("event");
    }

    string reversedId = @event.GUID.Reverse();
    string historyFile;
    string directory = GetDirectoryAndFileNameForEvents(reversedId, out historyFile);

    // CreateDirectory does nothing when the directory already exists.
    Directory.CreateDirectory(_path + Path.DirectorySeparatorChar + directory);

    // Serialize before taking the mutex so the critical section only does file I/O.
    byte[] contentBytes = BuildEnvelopeBytes(@event);
    byte[] lengthBytes = BitConverter.GetBytes(contentBytes.Length);

    // NOTE(review): the mutex is named after the event GUID, while "eventindex" is
    // shared by all GUIDs and opened with FileShare.None, so writers for different
    // GUIDs are not serialized by this mutex. Confirm whether that is intended.
    // The mutex wraps an OS handle, so it is disposed once the write completes.
    using (var mutex = new Mutex(false, @event.GUID))
    {
        mutex.WaitOne();
        try
        {
            using (var index = File.Open(_path + Path.DirectorySeparatorChar + "eventindex", FileMode.Append, FileAccess.Write, FileShare.None))
            {
                long position;
                using (var history = new FileStream(historyFile, FileMode.Append))
                {
                    // In append mode the stream starts at the end of the file,
                    // i.e. at the offset where this record will begin.
                    position = history.Position;
                    history.Write(lengthBytes, 0, lengthBytes.Length);
                    history.Write(contentBytes, 0, contentBytes.Length);
                }

                if (@event is Snapshot)
                {
                    string snapshotFile;
                    GetDirectoryAndFileNameForSnapshots(reversedId, out snapshotFile);

                    // OpenOrCreate positions at the start, so the stored offset is overwritten.
                    using (var snapshots = new FileStream(snapshotFile, FileMode.OpenOrCreate))
                    {
                        snapshots.Write(BitConverter.GetBytes(position), 0, LongSize);
                    }
                }

                index.Write(new Guid(@event.GUID).ToByteArray(), 0, GUIDSize);
                index.Write(BitConverter.GetBytes(position), 0, LongSize);
            }
        }
        finally
        {
            mutex.ReleaseMutex();
        }
    }
}

/// <summary>
/// Serializes <paramref name="event"/> to XML and wraps it in a "wrapper" element
/// that also carries the assembly-qualified name of the event's runtime type.
/// </summary>
/// <returns>The envelope encoded with <see cref="Encoding.Default"/>.</returns>
private static byte[] BuildEnvelopeBytes(Event @event)
{
    var settings = new XmlWriterSettings { OmitXmlDeclaration = true };
    var noNamespaces = new XmlSerializerNamespaces();
    noNamespaces.Add("", "");

    var payload = new StringBuilder();
    using (var payloadWriter = XmlWriter.Create(payload, settings))
    {
        new XmlSerializer(@event.GetType()).Serialize(payloadWriter, @event, noNamespaces);
    }

    // The payload is read only after its writer has been closed, so it is fully flushed.
    var envelope = new StringBuilder();
    using (var envelopeWriter = XmlWriter.Create(envelope, settings))
    {
        envelopeWriter.WriteStartElement("wrapper");
        envelopeWriter.WriteElementString("type", @event.GetType().AssemblyQualifiedName);
        envelopeWriter.WriteStartElement("event");
        envelopeWriter.WriteRaw(payload.ToString());
        envelopeWriter.WriteEndElement();
        envelopeWriter.WriteEndElement();
    }

    // Encoding.Default is kept on purpose: the reader decodes stored records with it.
    return Encoding.Default.GetBytes(envelope.ToString());
}
/// <summary>
/// Computes the ".history" file path for <paramref name="s"/> via <c>GetFileName</c>.
/// </summary>
/// <param name="s">Reversed aggregate identifier.</param>
/// <param name="filename">Receives the full path of the history file.</param>
/// <returns>The directory name returned by <c>GetFileName</c>.</returns>
private static string GetDirectoryAndFileNameForEvents(string s, out string filename)
{
    string directory = GetFileName(s, out filename, ".history");
    return directory;
}
/// <summary>
/// Computes the ".snapshots" file path for <paramref name="s"/> via <c>GetFileName</c>;
/// the directory name that helper returns is discarded.
/// </summary>
/// <param name="s">Reversed aggregate identifier.</param>
/// <param name="filename">Receives the full path of the snapshots file.</param>
private static void GetDirectoryAndFileNameForSnapshots(string s, out string filename)
{
    GetFileName(s, out filename, ".snapshots");
}
/// <summary>
/// Builds a storage file path from <paramref name="s"/>: the first two characters
/// become a sub-directory of the storage root, the remainder plus
/// <paramref name="extension"/> becomes the file name.
/// </summary>
/// <param name="s">Identifier string; needs at least two characters.</param>
/// <param name="filename">Receives the full file path.</param>
/// <param name="extension">Suffix appended to the path, e.g. ".history".</param>
/// <returns>The first two characters of <paramref name="s"/> (the sub-directory name).</returns>
private static string GetFileName(string s, out string filename, string extension)
{
    // Splitting after the second character turns "abrest" into "ab<sep>rest".
    string separator = Path.DirectorySeparatorChar + "";
    string relativePath = s.Insert(2, separator);
    filename = _path + Path.DirectorySeparatorChar + relativePath + extension;
    return s.Substring(0, 2);
}
/// <summary>
/// Lazily reads the events stored for <paramref name="id"/> from its history file,
/// starting at the offset reported by <c>GetStartingIndexFor</c>.
/// </summary>
/// <param name="id">Aggregate identifier; it is reversed to locate the files.</param>
/// <returns>The stored events in file order, deserialized one at a time while enumerating.</returns>
/// <exception cref="ApplicationException">
/// Fewer than four bytes remain where a length prefix is expected.
/// </exception>
public static IEnumerable<Event> RetrieveFor(string id)
{
    string reversedId = id.Reverse();
    long start = GetStartingIndexFor(reversedId);
    string historyFile;
    GetDirectoryAndFileNameForEvents(reversedId, out historyFile);

    using (var stream = new FileStream(historyFile, FileMode.Open, FileAccess.Read, FileShare.Read))
    {
        stream.Position = start;
        for (;;)
        {
            // Every record begins with a 4-byte length prefix.
            var lengthPrefix = new byte[4];
            int count = stream.Read(lengthPrefix, 0, 4);
            if (count == 0)
            {
                // Clean end of file: no more records.
                yield break;
            }
            if (count < 4)
            {
                throw new ApplicationException("event index integrity error");
            }
            yield return GetStoredEvent(stream, lengthPrefix);
        }
    }
}
/// <summary>
/// Returns the history-file offset stored in the ".snapshots" file for
/// <paramref name="s"/>, or 0 when that file does not exist.
/// </summary>
/// <param name="s">Reversed aggregate identifier.</param>
/// <returns>The 64-bit offset stored at the start of the snapshots file, or 0.</returns>
/// <exception cref="ApplicationException">The snapshots file holds fewer than eight bytes.</exception>
private static long GetStartingIndexFor(string s)
{
    string snapshotFile;
    GetDirectoryAndFileNameForSnapshots(s, out snapshotFile);
    if (!File.Exists(snapshotFile))
    {
        return 0;
    }

    using (var fs = new FileStream(snapshotFile, FileMode.Open, FileAccess.Read, FileShare.Read))
    {
        // The offset is written as an 8-byte long, so all eight bytes must be read back;
        // decoding only four bytes as an Int32 corrupts offsets of 2 GiB and above.
        var offsetBytes = new byte[LongSize];
        int filled = 0;
        while (filled < LongSize)
        {
            // Read may deliver fewer bytes than requested; zero means end of file.
            int read = fs.Read(offsetBytes, filled, LongSize - filled);
            if (read == 0)
            {
                throw new ApplicationException("snapshot index error");
            }
            filled += read;
        }
        return BitConverter.ToInt64(offsetBytes, 0);
    }
}
/// <summary>
/// Reads one event record body from <paramref name="fs"/> at its current position
/// and deserializes it.
/// </summary>
/// <param name="fs">Stream positioned directly after the record's length prefix.</param>
/// <param name="contentLengthBuffer">The 4-byte length prefix of the record.</param>
/// <returns>The event deserialized from the "wrapper/event" element of the record.</returns>
/// <exception cref="ApplicationException">
/// The length prefix is negative, the stream ends before the record is complete,
/// the envelope lacks its "type" or "event" element, or the stored type cannot be resolved.
/// </exception>
private static Event GetStoredEvent(FileStream fs, byte[] contentLengthBuffer)
{
    int contentLength = BitConverter.ToInt32(contentLengthBuffer, 0);
    if (contentLength < 0)
    {
        throw new ApplicationException("invalid event content length: " + contentLength);
    }

    // Read may return fewer bytes than requested, so loop until the record is
    // complete; only a zero-byte read means the file really ended early.
    var content = new byte[contentLength];
    int filled = 0;
    while (filled < contentLength)
    {
        int read = fs.Read(content, filled, contentLength - filled);
        if (read == 0)
        {
            throw new ApplicationException("incomplete event information retrieved");
        }
        filled += read;
    }

    var xmlDocument = new XmlDocument();
    using (var envelopeReader = new StringReader(Encoding.Default.GetString(content, 0, contentLength)))
    {
        xmlDocument.Load(envelopeReader);
    }

    XmlNode typeNode = xmlDocument.SelectSingleNode("wrapper/type");
    XmlNode eventNode = xmlDocument.SelectSingleNode("wrapper/event");
    if (typeNode == null || eventNode == null)
    {
        throw new ApplicationException("stored event is missing its type or event element");
    }

    // NOTE(review): the type name comes from the file content; if the storage files
    // can be written by an untrusted party, restrict which types may be loaded here.
    Type eventType = Type.GetType(typeNode.InnerText);
    if (eventType == null)
    {
        throw new ApplicationException("cannot resolve stored event type: " + typeNode.InnerText);
    }

    using (var payloadReader = new StringReader(eventNode.InnerXml))
    {
        return (Event)new XmlSerializer(eventType).Deserialize(payloadReader);
    }
}
/// <summary>
/// Loads the single event stored at <paramref name="indexOffset"/> in the history
/// file that belongs to <paramref name="id"/>.
/// </summary>
/// <param name="id">Aggregate GUID; its default string form is reversed to locate the file.</param>
/// <param name="indexOffset">Byte offset of the record's length prefix in the history file.</param>
/// <returns>The event deserialized by <c>GetStoredEvent</c>.</returns>
/// <exception cref="ApplicationException">Fewer than four bytes could be read at the offset.</exception>
private static Event RetrieveEvent(Guid id, long indexOffset)
{
    // NOTE(review): this relies on Guid.ToString() producing the same text as the
    // GUID string the event was stored under; confirm against the writer.
    string reversedId = id.ToString().Reverse();
    string historyFile;
    GetDirectoryAndFileNameForEvents(reversedId, out historyFile);

    using (var stream = new FileStream(historyFile, FileMode.Open, FileAccess.Read, FileShare.Read))
    {
        stream.Position = indexOffset;

        var lengthPrefix = new byte[4];
        int count = stream.Read(lengthPrefix, 0, 4);
        if (count < 4)
        {
            throw new ApplicationException("Event integrity problem");
        }

        return GetStoredEvent(stream, lengthPrefix);
    }
}
/// <summary>
/// Lazily walks the "eventindex" file and yields the event each entry points to.
/// </summary>
/// <remarks>
/// Each index entry is a 16-byte GUID followed by an 8-byte offset; the event itself
/// is loaded through <c>RetrieveEvent</c>.
/// </remarks>
/// <returns>The indexed events in index order, loaded one at a time while enumerating.</returns>
/// <exception cref="ApplicationException">An index entry is cut short.</exception>
public static IEnumerable<Event> RetrieveAll()
{
    string indexFile = _path + Path.DirectorySeparatorChar + "eventindex";
    using (var index = new FileStream(indexFile, FileMode.Open, FileAccess.Read, FileShare.Read))
    {
        for (;;)
        {
            var guidBytes = new byte[GUIDSize];
            var offsetBytes = new byte[LongSize];

            int got = index.Read(guidBytes, 0, GUIDSize);
            if (got == 0)
            {
                // Clean end of the index.
                yield break;
            }
            if (got != GUIDSize)
            {
                throw new ApplicationException("index integrity error");
            }

            got = index.Read(offsetBytes, 0, LongSize);
            if (got != LongSize)
            {
                throw new ApplicationException("index integrity error");
            }

            yield return RetrieveEvent(new Guid(guidBytes), BitConverter.ToInt64(offsetBytes, 0));
        }
    }
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment