Last active
April 27, 2022 06:35
-
-
Save amitayh/e4ad769652efee4c6dc5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.wixpress.quotes.common | |
import java.util.UUID | |
object CQRS { | |
//////////////////////////////////////////////////////////////////////////////// | |
// Example application | |
//////////////////////////////////////////////////////////////////////////////// | |
/**
 * Demo entry point.
 *
 * Wires the command bus, event bus, in-memory event store, repository,
 * command handler and the two event subscribers together, publishes a few
 * sample commands for two quotes and finally prints the list view.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  val commandBus = new CommandBus[QuoteCommand]
  val eventBus = new EventBus[QuoteEvent]
  val eventStore = new InMemoryEventStore[QuoteEvent]
  val repository = new QuoteRepository(eventStore)
  val handler = new QuoteCommandHandler(repository, eventBus)
  val listView = new ListViewDenormalizer
  val mailer = new Mailer(commandBus)

  // Commands go to the command handler; resulting events fan out to the
  // read-model denormalizer and the mailer.
  commandBus.subscribe(handler)
  eventBus.subscribe(listView)
  eventBus.subscribe(mailer)

  // First quote: two items and a discount.
  val quoteId1 = UUID.randomUUID.toString
  commandBus.publish(CreateNewQuote(quoteId1))
  commandBus.publish(AddItemToQuote(quoteId1, QuoteItem("something", 5.5)))
  commandBus.publish(AddItemToQuote(quoteId1, QuoteItem("something else", 6)))
  commandBus.publish(SetQuoteDiscount(quoteId1, 10))

  // Second quote: one item, then sent to a client.
  val quoteId2 = UUID.randomUUID.toString
  commandBus.publish(CreateNewQuote(quoteId2))
  commandBus.publish(AddItemToQuote(quoteId2, QuoteItem("foobar", 12.3)))
  // NOTE(review): the address literal below looks like an e-mail-obfuscation
  // placeholder rather than a real address; confirm the intended value.
  commandBus.publish(SendQuoteToClient(quoteId2, "[email protected]"))

  println("--------------------")
  listView.getAll.foreach(println)
}
//////////////////////////////////////////////////////////////////////////////// | |
// Types | |
//////////////////////////////////////////////////////////////////////////////// | |
/** Identifier of an aggregate; a plain `String` in this example. */
type AggregateId = String

/** Version counter of an aggregate. */
type AggregateVersion = Long

/** A sequence of messages of type `E`, represented as an immutable `List`. */
type EventStream[E] = List[E]
//////////////////////////////////////////////////////////////////////////////// | |
// Interfaces | |
//////////////////////////////////////////////////////////////////////////////// | |
/**
 * Storage abstraction for aggregates of type `T`.
 *
 * @tparam T the aggregate type handled by the repository
 */
trait AggregateRepository[T] {

  /** Persists the given aggregate. */
  def save(aggregate: T): Unit

  /**
   * Looks up the aggregate with the given id.
   *
   * @return the aggregate wrapped in an `Option`
   */
  def load(aggregateId: AggregateId): Option[T]
}
/**
 * Something that can react to events of type `E`, producing a `T`.
 *
 * The handler is exposed as a `PartialFunction`, so callers can ask via
 * `isDefinedAt` whether a given event is handled at all.
 *
 * @tparam E the event type
 * @tparam T the result of handling an event
 */
trait EventHandler[E, T] {

  /** The partial function applied to incoming events. */
  def handle: PartialFunction[E, T]
}
/**
 * Something that can execute commands of type `C`.
 *
 * Handling a command yields no value (`Unit`); the handler is a
 * `PartialFunction`, so it may be defined for only some commands.
 *
 * @tparam C the command type
 */
trait CommandHandler[C] {

  /** The partial function applied to incoming commands. */
  def handle: PartialFunction[C, Unit]
}
/** Minimal contract of an aggregate root: an identity and a version. */
trait AggregateRoot {

  /** Current version of this aggregate. */
  def version: AggregateVersion

  /** Identifier of this aggregate. */
  def id: AggregateId
}
/**
 * Storage of event streams, keyed by aggregate id.
 *
 * @tparam E the event type kept in the store
 */
trait EventStore[E] {

  /** Adds `events` to the stream stored for the aggregate `id`. */
  def appendEvents(id: AggregateId, events: EventStream[E]): Unit

  /** Returns the event stream stored for the aggregate `id`. */
  def getEventsForAggregate(id: AggregateId): EventStream[E]
}
/**
 * A bus that delivers messages of type `M` to subscribed handlers of type `H`.
 *
 * @tparam M the message type
 * @tparam H the handler type accepted by `subscribe`
 */
trait MessageBus[M, H] {

  /** Registers a handler with this bus. */
  def subscribe(handler: H): Unit

  /** Publishes a single message. */
  def publish(message: M): Unit

  /** Publishes every message of the stream, one by one, in list order. */
  def publish(messages: EventStream[M]): Unit = messages.foreach(publish)
}
//////////////////////////////////////////////////////////////////////////////// | |
// Event sourcing | |
//////////////////////////////////////////////////////////////////////////////// | |
/**
 * An aggregate root whose state evolves by handling events: applying an
 * event of type `E` yields another `EventSourcedAggregateRoot[E, T]`.
 *
 * @tparam E the event type
 * @tparam T type tag of the aggregate; not referenced by any member declared here
 */
trait EventSourcedAggregateRoot[E, T]
  extends AggregateRoot
  with EventHandler[E, EventSourcedAggregateRoot[E, T]]
/**
 * Repository that rebuilds aggregates by replaying their stored events and
 * persists changes by appending buffered events to the event store.
 *
 * NOTE: the buffer of not-yet-saved events is a single list owned by the
 * repository instance, not by an aggregate; `save` writes the whole buffer
 * under the id of whichever aggregate is passed in.
 *
 * @tparam E the event type
 * @tparam T type tag of the aggregate
 */
trait EventSourcedAggregateRepository[E, T]
  extends AggregateRepository[EventSourcedAggregateRoot[E, T]] {

  type AggregateType = EventSourcedAggregateRoot[E, T]

  /** The store events are read from and appended to. */
  def eventStore: EventStore[E]

  /** The initial aggregate state that replay starts from. */
  def empty: AggregateType

  // Events recorded through `appendEvents` that have not been saved yet.
  private var pendingEvents: EventStream[E] = Nil

  /**
   * Rebuilds the aggregate by folding its stored events over `empty`.
   *
   * @return `None` when the store holds no events for `aggregateId`
   */
  override def load(aggregateId: AggregateId): Option[AggregateType] = {
    val history = eventStore.getEventsForAggregate(aggregateId)
    if (history.isEmpty) None
    else Some(history.foldLeft(empty) { (state, event) => state.handle(event) })
  }

  /** Appends all buffered events under `aggregate.id`, then clears the buffer. */
  override def save(aggregate: AggregateType): Unit = {
    val toCommit = pendingEvents
    eventStore.appendEvents(aggregate.id, toCommit)
    pendingEvents = Nil
  }

  /** Adds `events` to the end of the buffer of not-yet-saved events. */
  def appendEvents(events: EventStream[E]): Unit =
    pendingEvents = pendingEvents ++ events
}
//////////////////////////////////////////////////////////////////////////////// | |
// Message buses and in-memory event store
//////////////////////////////////////////////////////////////////////////////// | |
/**
 * Bus that delivers each event to every subscribed handler that is defined
 * for it.
 *
 * Handlers are kept in a list that new subscribers are prepended to, so the
 * most recently subscribed handler is invoked first.
 *
 * @tparam E the event type
 */
class EventBus[E] extends MessageBus[E, EventHandler[E, _]] {

  type HandlerType = EventHandler[E, _]

  private var handlers: List[HandlerType] = Nil

  /** Registers `handler` at the front of the handler list. */
  override def subscribe(handler: HandlerType): Unit =
    handlers ::= handler

  /** Invokes every handler whose partial function accepts `event`. */
  override def publish(event: E): Unit = {
    // Select the interested handlers first, then run them.
    val interested = handlers.filter(h => h.handle.isDefinedAt(event))
    interested.foreach(h => h.handle(event))
  }
}
/**
 * Bus that delivers each command to at most one handler: the first one in
 * the handler list that is defined for it.
 *
 * New subscribers are prepended, so the most recently subscribed matching
 * handler wins. A command no handler accepts is dropped without any signal.
 *
 * @tparam C the command type
 */
class CommandBus[C] extends MessageBus[C, CommandHandler[C]] {

  type HandlerType = CommandHandler[C]

  private var handlers: List[HandlerType] = Nil

  /** Registers `handler` at the front of the handler list. */
  override def subscribe(handler: HandlerType): Unit =
    handlers ::= handler

  /** Runs the first handler that accepts `command`, if there is one. */
  override def publish(command: C): Unit =
    handlers.find(_.handle.isDefinedAt(command)) match {
      case Some(target) => target.handle(command)
      case None         => ()
    }
}
/**
 * Event store that keeps every stream in an immutable map held in a `var`.
 *
 * @tparam E the event type
 */
class InMemoryEventStore[E] extends EventStore[E] {

  private var streamsById: Map[AggregateId, EventStream[E]] = Map.empty

  /** Returns the stored stream, or an empty list when the id is unknown. */
  override def getEventsForAggregate(aggregateId: AggregateId): EventStream[E] =
    streamsById.get(aggregateId) match {
      case Some(stream) => stream
      case None         => Nil
    }

  /** Appends `newEvents` after the events already stored for `aggregateId`. */
  override def appendEvents(aggregateId: AggregateId, newEvents: EventStream[E]): Unit = {
    // TODO: concurrent modification validation
    val combined = getEventsForAggregate(aggregateId) ++ newEvents
    streamsById += (aggregateId -> combined)
  }
}
//////////////////////////////////////////////////////////////////////////////// | |
// Commands
//////////////////////////////////////////////////////////////////////////////// | |
/** A command addressed to a quote; every command carries the target quote id. */
sealed trait QuoteCommand {
  def quoteId: AggregateId
}

/** Requests creation of the quote `quoteId`. */
final case class CreateNewQuote(quoteId: AggregateId) extends QuoteCommand

/** Requests that `item` be added to the quote `quoteId`. */
final case class AddItemToQuote(quoteId: AggregateId, item: QuoteItem) extends QuoteCommand

/** Requests that the discount of the quote `quoteId` be set to `discount`. */
final case class SetQuoteDiscount(quoteId: AggregateId, discount: Double) extends QuoteCommand

/** Requests that the quote `quoteId` be sent to `email`. */
final case class SendQuoteToClient(quoteId: AggregateId, email: String) extends QuoteCommand

/** Requests that the quote `quoteId` be marked as sent. */
final case class MarkQuoteAsSent(quoteId: AggregateId) extends QuoteCommand
//////////////////////////////////////////////////////////////////////////////// | |
// Events
//////////////////////////////////////////////////////////////////////////////// | |
/** An event about a quote; every event carries the id of the quote it concerns. */
sealed trait QuoteEvent {
  def quoteId: AggregateId
}

/** The quote `quoteId` was created. */
final case class NewQuoteCreated(quoteId: AggregateId) extends QuoteEvent

/** `item` was added to the quote `quoteId`. */
final case class ItemAddedToQuote(quoteId: AggregateId, item: QuoteItem) extends QuoteEvent

/** The discount of the quote `quoteId` was set to `discount`. */
final case class QuoteDiscountSet(quoteId: AggregateId, discount: Double) extends QuoteEvent

/** The quote `quoteId` was sent to `email`. */
final case class QuoteSentToClient(quoteId: AggregateId, email: String) extends QuoteEvent

/** The quote `quoteId` was marked as sent. */
final case class QuoteMarkedAsSent(quoteId: AggregateId) extends QuoteEvent
//////////////////////////////////////////////////////////////////////////////// | |
// Aggregates | |
//////////////////////////////////////////////////////////////////////////////// | |
/**
 * The quote aggregate: an immutable snapshot that produces its successor
 * state when it handles an event.
 *
 * @param id       identifier of the quote
 * @param version  incremented by one for every handled event
 * @param items    items on the quote, in the order they were added
 * @param discount the most recently set discount
 */
case class Quote(
  id: AggregateId,
  version: AggregateVersion,
  items: List[QuoteItem],
  discount: Double
) extends EventSourcedAggregateRoot[QuoteEvent, Quote] {

  /**
   * Applies an event and returns the resulting quote. Events that carry no
   * state relevant to the aggregate still bump the version.
   */
  override def handle: PartialFunction[QuoteEvent, Quote] = {
    case created: NewQuoteCreated => nextVersion.copy(id = created.quoteId)
    case added: ItemAddedToQuote  => nextVersion.copy(items = items :+ added.item)
    case set: QuoteDiscountSet    => nextVersion.copy(discount = set.discount)
    case _                        => nextVersion
  }

  // This quote with only the version advanced by one.
  private def nextVersion: Quote = copy(version = version + 1)
}
/** Companion holding the initial state of a [[Quote]]. */
object Quote {

  /** A quote with an empty id, version zero, no items and no discount. */
  def empty: Quote =
    Quote(id = "", version = 0L, items = Nil, discount = 0.0)
}
/**
 * A single line item of a quote.
 *
 * @param name  display name of the item
 * @param price price of the item
 */
case class QuoteItem(
  name: String,
  price: Double
)
//////////////////////////////////////////////////////////////////////////////// | |
// Wiring | |
//////////////////////////////////////////////////////////////////////////////// | |
/**
 * Handles quote commands: translates each command into its event, applies
 * the event to the quote, persists it and publishes it on the event bus.
 *
 * @param repository repository used to load and save quotes
 * @param eventBus   bus on which the resulting event is published
 */
class QuoteCommandHandler(repository: EventSourcedAggregateRepository[QuoteEvent, Quote],
                          eventBus: EventBus[QuoteEvent])
  extends CommandHandler[QuoteCommand] {

  /**
   * Defined for exactly those commands `getEvent` is defined for.
   *
   * NOTE(review): when nothing is stored for the command's quote id, the
   * event is applied to `repository.empty`; whether the resulting aggregate
   * then carries the command's id depends on the aggregate's own handler.
   * Verify this for commands other than creation.
   */
  override def handle: PartialFunction[QuoteCommand, Unit] = {
    case command if getEvent.isDefinedAt(command) =>
      val quote = repository.load(command.quoteId).getOrElse(repository.empty)
      val event = getEvent(command)
      val updatedQuote = quote.handle(event)
      // Buffer the event, persist it, and only then notify subscribers.
      repository.appendEvents(List(event))
      repository.save(updatedQuote)
      eventBus.publish(event)
  }

  /** Maps every command to the event that records it. */
  def getEvent: PartialFunction[QuoteCommand, QuoteEvent] = {
    case CreateNewQuote(id)             => NewQuoteCreated(id)
    case AddItemToQuote(id, item)       => ItemAddedToQuote(id, item)
    case SetQuoteDiscount(id, discount) => QuoteDiscountSet(id, discount)
    case SendQuoteToClient(id, email)   => QuoteSentToClient(id, email)
    case MarkQuoteAsSent(id)            => QuoteMarkedAsSent(id)
  }
}
/**
 * Event-sourced repository for quotes, backed by the given event store.
 *
 * @param eventStore the store quote events are read from and appended to
 */
class QuoteRepository(override val eventStore: EventStore[QuoteEvent])
  extends EventSourcedAggregateRepository[QuoteEvent, Quote] {

  /** Replay starts from `Quote.empty`. */
  override def empty: AggregateType = Quote.empty
}
/**
 * Read-model row summarising one quote.
 *
 * @param quoteId id of the summarised quote
 * @param total   running total, `0` by default
 * @param status  status label, `"Draft"` by default
 */
case class QuoteSummaryDto(
  quoteId: AggregateId,
  total: Double = 0,
  status: String = "Draft"
)
/**
 * Read-model projection that keeps one [[QuoteSummaryDto]] per quote,
 * updated from quote events.
 *
 * Only creation, item-added and marked-as-sent events are handled; the
 * partial function is undefined for every other event.
 */
class ListViewDenormalizer extends EventHandler[QuoteEvent, Unit] {

  private var list: Map[AggregateId, QuoteSummaryDto] = Map.empty

  /**
   * Applies an event to the summary list.
   *
   * @throws NoSuchElementException if an update event refers to a quote for
   *                                which no summary exists
   */
  override def handle: PartialFunction[QuoteEvent, Unit] = {
    case NewQuoteCreated(quoteId) =>
      list += (quoteId -> QuoteSummaryDto(quoteId))
    case ItemAddedToQuote(quoteId, item) =>
      updateSummary(quoteId)(old => old.copy(total = old.total + item.price))
    case QuoteMarkedAsSent(quoteId) =>
      updateSummary(quoteId)(_.copy(status = "Sent"))
  }

  /** All quote summaries currently held by the projection. */
  def getAll: Iterable[QuoteSummaryDto] = list.values

  // Replaces the summary of `quoteId` with `change(current)`. Fails with the
  // same exception type a bare `Option.get` would raise, but names the quote.
  private def updateSummary(quoteId: AggregateId)(change: QuoteSummaryDto => QuoteSummaryDto): Unit = {
    val current = list.getOrElse(
      quoteId,
      throw new NoSuchElementException(s"No quote summary found for quote '$quoteId'")
    )
    list += (quoteId -> change(current))
  }
}
/**
 * Event subscriber that reacts to a quote being sent to a client: it prints
 * a mailing message and then issues a command marking the quote as sent.
 *
 * @param commandBus bus on which the follow-up command is published
 */
class Mailer(commandBus: CommandBus[QuoteCommand]) extends EventHandler[QuoteEvent, Unit] {

  /** Defined only for `QuoteSentToClient` events. */
  override def handle: PartialFunction[QuoteEvent, Unit] = {
    case sent: QuoteSentToClient =>
      val quoteId = sent.quoteId
      val email = sent.email
      println(s"Mailing quote $quoteId to $email...")
      val followUp = MarkQuoteAsSent(quoteId)
      commandBus.publish(followUp)
  }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment