-
-
Save ivanopagano/febfa1f30b88d75d4351 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// sbt build definition for the Akka Persistence external-service example.
name := "external-service-integration-akka-persistence"

version := "1.0"

scalaVersion := "2.11.1"

// Persistence (experimental module) and remoting, both on the same Akka release.
libraryDependencies += "com.typesafe.akka" %% "akka-persistence-experimental" % "2.3.6"

libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.3.6"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Settings shared by both actor systems; each node section below is layered
# on top of this one with withFallback.
common {
  akka {
    loglevel = INFO
    actor.provider = "akka.remote.RemoteActorRefProvider"
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp.hostname = "127.0.0.1"
    }
  }
}

# Remoting port of the order processor node.
processor.akka.remote.netty.tcp.port = 2553

# Remoting port of the credit card validator node.
validator.akka.remote.netty.tcp.port = 2552
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.iainhull.akka | |
import scala.concurrent.duration._ | |
import akka.actor._ | |
import akka.event.Logging | |
import akka.pattern.ask | |
import akka.util.Timeout | |
import akka.persistence.{PersistentView, AtLeastOnceDelivery, PersistentActor} | |
import akka.persistence.AtLeastOnceDelivery.{UnconfirmedDelivery, UnconfirmedWarning} | |
import com.typesafe.config.ConfigFactory | |
// ------------------------------------ | |
// Domain object | |
// ------------------------------------ | |
/**
 * An order together with the state of its credit card validation.
 *
 * @param id                   order identifier; defaults to -1
 * @param details              free-text description of the order
 * @param creditCardNumber     card number submitted with the order
 * @param creditCardValidation validation state; defaults to [[Validation.Pending]]
 */
case class Order(
  id: Int = -1,
  details: String,
  creditCardNumber: String,
  creditCardValidation: Validation = Validation.Pending
)

/** State of an order's credit card validation. */
sealed trait Validation

object Validation {
  /** No validation result has been recorded yet. */
  case object Pending extends Validation

  /** The credit card was validated. */
  case object Success extends Validation

  /** The credit card validation failed. */
  case object Failure extends Validation
}
// ------------------------------------ | |
// Domain events | |
// ------------------------------------ | |
/** Requests handled by the order processor. */
sealed trait Command

/** Asks the processor to store `order` and start its credit card validation. */
case class OrderSubmitted(order: Order) extends Command

/** Events persisted by the order processor and replayed on recovery. */
sealed trait Event

/** `order` was stored; its credit card validation is requested next. */
case class OrderStored(order: Order) extends Event

/** `order` passed validation; `deliveryId` identifies the validation request being confirmed. */
case class OrderAccepted(order: Order, deliveryId: Long) extends Event

/** `order` was rejected; `deliveryId` identifies the validation request being confirmed. */
case class OrderRejected(order: Order, deliveryId: Long) extends Event
/** Request sent to the credit card validator for the card of order `orderId`. */
case class CreditCardValidationRequested(
  deliveryId: Long,
  orderId: Int,
  creditCardNumber: String
)

/** Validator reply: the card of order `orderId` is valid. */
case class CreditCardValidated(deliveryId: Long, orderId: Int)

/** Validator reply: the card of order `orderId` is not valid. */
case class CreditCardValidationFailed(deliveryId: Long, orderId: Int)
// ------------------------------------ | |
// Application commands/events | |
// ------------------------------------ | |
/** Tells the order processor where to send credit card validation requests. */
case class SetCreditCardValidator(validator: ActorPath)

// NOTE(review): the four messages below are not handled by any actor in this
// file; presumably left over from an earlier version — confirm before removing.

case class SetValidOrderDestination(destination: ActorRef)

case class SetInvalidOrderDestination(destination: ActorRef)

case class Recover(timeout: Timeout)

case object Recovered
// ------------------------------------ | |
// Eventsourced order processor | |
// ------------------------------------ | |
/**
 * Event-sourced order processor.
 *
 * Submitted orders are persisted as [[OrderStored]] and a
 * [[CreditCardValidationRequested]] is handed to `deliver` for the configured
 * validator. The validator's reply (or an unconfirmed-delivery warning) is
 * persisted as [[OrderAccepted]] / [[OrderRejected]], which also confirms the
 * outstanding delivery. State is rebuilt on recovery by replaying the events
 * through [[updateState]].
 */
class OrderProcessor extends PersistentActor with AtLeastOnceDelivery {
  import Validation._

  // Destination of validation requests; the dead-letters path until a
  // SetCreditCardValidator message arrives.
  // NOTE(review): SetCreditCardValidator is not persisted, so deliveries
  // registered while replaying OrderStored use whatever `validator` holds at
  // that point (initially dead letters) — confirm this is intended.
  var validator: ActorPath = context.system.deadLetters.path

  var orders = Map.empty[Int, Order] // state

  override def persistenceId = "order-processor"

  /**
   * Applies a persisted event to the in-memory state.
   *
   * Used both after a successful `persist` and during recovery.
   */
  def updateState(event: Event): Unit = event match {
    case OrderStored(order) =>
      orders = orders + (order.id -> order)
      deliver(validator, deliveryId => CreditCardValidationRequested(deliveryId, order.id, order.creditCardNumber))
    case OrderAccepted(order, deliveryId) =>
      orders = orders + (order.id -> order)
      confirmDelivery(deliveryId)
    case OrderRejected(order, deliveryId) =>
      orders = orders + (order.id -> order)
      confirmDelivery(deliveryId)
  }

  /** The stored order with this id, but only while its validation is still pending. */
  private def pendingOrder(orderId: Int): Option[Order] =
    orders.get(orderId).filter(_.creditCardValidation == Pending)

  val receiveCommand: Receive = {
    case OrderSubmitted(order) =>
      // The next id is the number of orders stored so far.
      val id = orders.size
      persist(OrderStored(order.copy(id = id))) { event =>
        updateState(event)
        sender() ! event
      }

    case CreditCardValidated(deliveryId, orderId) =>
      pendingOrder(orderId) foreach { order =>
        persist(OrderAccepted(order.copy(creditCardValidation = Success), deliveryId))(updateState)
      }

    case CreditCardValidationFailed(deliveryId, orderId) =>
      pendingOrder(orderId) foreach { order =>
        persist(OrderRejected(order.copy(creditCardValidation = Failure), deliveryId))(updateState)
      }

    case UnconfirmedWarning(unconfirmedDeliveries) =>
      // Reject orders whose validation request is still unconfirmed. Only
      // pending orders qualify: a validator reply may have been handled after
      // this warning was queued, and an already accepted or rejected order
      // must not be overwritten by a stale warning.
      for {
        UnconfirmedDelivery(deliveryId, _, CreditCardValidationRequested(_, orderId, _)) <- unconfirmedDeliveries
        order <- pendingOrder(orderId)
      } {
        persist(OrderRejected(order.copy(creditCardValidation = Failure), deliveryId))(updateState)
      }

    case SetCreditCardValidator(v) =>
      validator = v
  }

  val receiveRecover: Receive = {
    case event: Event => updateState(event)
  }
}
/**
 * Entry point of the order processor node.
 *
 * Starts the "example" actor system with the `processor` section of the
 * `order` configuration (falling back to `common`), creates the destination
 * view and the processor, points the processor at the remote validator and
 * submits two sample orders, logging the replies.
 */
object OrderProcessor extends App {
  val config = ConfigFactory.load("order")
  val configCommon = config.getConfig("common")

  // Node-specific settings layered over the shared ones; computed once.
  private val processorConfig = config.getConfig("processor").withFallback(configCommon)
  println(processorConfig)

  implicit val system = ActorSystem("example", processorConfig)
  implicit val timeout = Timeout(5.seconds)
  import system.dispatcher

  val log = Logging(system, this.getClass)

  val destination = system.actorOf(Props[OrderDestination], "destination")
  val processor = system.actorOf(Props[OrderProcessor], "processor")

  // Validator actor on the node configured in the `validator` section:
  // same system name and hostname, remoting port 2552.
  val validator = ActorPath.fromString("akka.tcp://example@127.0.0.1:2552/user/validator")
  processor ! SetCreditCardValidator(validator)

  val f1 = processor ? OrderSubmitted(Order(details = "jelly beans", creditCardNumber = "1234-5678-1234-5678"))
  val f2 = processor ? OrderSubmitted(Order(details = "jelly beans", creditCardNumber = "1234-5678-1234-0000"))

  // Log both replies once available; a failed ask (for example a timeout)
  // is reported instead of being dropped silently.
  (for (r1 <- f1; r2 <- f2) yield (r1, r2)).onComplete {
    case scala.util.Success((r1, r2)) =>
      log.info("Reply 1: {}", r1)
      log.info("Reply 2: {}", r2)
    case scala.util.Failure(cause) =>
      log.error(cause, "Order submission failed")
  }
}
// ------------------------------------ | |
// Local receiver of orders after | |
// credit card validation | |
// ------------------------------------ | |
/**
 * View over the "order-processor" event stream that logs every order once
 * its credit card validation has been accepted or rejected.
 */
class OrderDestination extends PersistentView with ActorLogging {
  override def persistenceId = "order-processor"

  override def viewId = "order-processor-view"

  def receive: Receive = {
    case accepted: OrderAccepted =>
      log.info("Received accepted order: {}", accepted.order)
    case rejected: OrderRejected =>
      log.info("Received rejected order: {}", rejected.order)
  }
}
// ------------------------------------ | |
// Remote credit card validator | |
// ------------------------------------ | |
/**
 * Validates credit card numbers on request.
 *
 * A number containing "0000" is answered with [[CreditCardValidationFailed]];
 * any other number is answered with [[CreditCardValidated]]. The reply carries
 * the delivery id and order id of the request and goes back to the sender.
 */
class CreditCardValidator extends Actor {
  def receive = {
    case request: CreditCardValidationRequested =>
      val verdict =
        if (request.creditCardNumber.contains("0000"))
          CreditCardValidationFailed(request.deliveryId, request.orderId)
        else
          CreditCardValidated(request.deliveryId, request.orderId)
      sender() ! verdict
  }
}
/**
 * Entry point of the validator node: starts the "example" actor system with
 * the `validator` section of the `order` configuration (falling back to
 * `common`) and creates the validator actor under the name "validator".
 */
object CreditCardValidator extends App {
  val config = ConfigFactory.load("order")
  val configCommon = config.getConfig("common")

  val system = {
    // Node-specific settings take precedence over the shared ones.
    val nodeConfig = config.getConfig("validator") withFallback configCommon
    ActorSystem("example", nodeConfig)
  }

  system.actorOf(Props[CreditCardValidator], "validator")
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment