package sample.eventdriven.scala

import akka.actor.{Actor, ActorRef, ActorSystem, Inbox, Props}
import akka.persistence.PersistentActor

import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.duration._

// ===============================================================
// Demo of an Event-driven Architecture in Akka and Scala.
//
// Show-casing:
//   1. Events-first Domain Driven Design using Commands and Events
//   2. Asynchronous communication through an Event Stream
//   3. Asynchronous Process Manager driving the workflow
//   4. Event-sourced Aggregates
//
// Used in my talk on 'How Events Are Reshaping Modern Systems'.
//
// NOTE: This is a very much simplified and dumbed down sample
//       that is by no means a template for production use.
//       F.e. in a real-world app I would not use Serializable
//       but a JSON, Protobuf, Avro, or some other good lib.
//       I would not use Akka's built in EventStream, but
//       probably Kafka or Kinesis. Etc.
// ===============================================================

object OrderManagement extends App {

  // =========================================================
  // Commands
  // =========================================================
  sealed trait Command
  case class CreateOrder(userId: Int, productId: Int) extends Command
  case class ReserveProduct(userId: Int, productId: Int) extends Command
  case class SubmitPayment(userId: Int, productId: Int) extends Command
  case class ShipProduct(userId: Int, txId: Int) extends Command

  // =========================================================
  // Events
  // =========================================================
  sealed trait Event
  case class ProductReserved(userId: Int, txId: Int) extends Event
  case class ProductOutOfStock(userId: Int, productId: Int) extends Event
  case class PaymentAuthorized(userId: Int, txId: Int) extends Event
  case class PaymentDeclined(userId: Int, txId: Int) extends Event
  case class ProductShipped(userId: Int, txId: Int) extends Event
  case class OrderCompleted(userId: Int, txId: Int) extends Event

  // =========================================================
  // Top-level service functioning as a Process Manager
  // Coordinating the workflow on behalf of the Client
  // =========================================================
  class Orders(client: ActorRef, inventory: ActorRef, payment: ActorRef) extends Actor {

    override def preStart = {
      system.eventStream.subscribe(self, classOf[ProductReserved])   // Subscribe to ProductReserved Events
      system.eventStream.subscribe(self, classOf[ProductOutOfStock]) // Subscribe to ProductOutOfStock Events
      system.eventStream.subscribe(self, classOf[ProductShipped])    // Subscribe to ProductShipped Events
      system.eventStream.subscribe(self, classOf[PaymentAuthorized]) // Subscribe to PaymentAuthorized Events
      system.eventStream.subscribe(self, classOf[PaymentDeclined])   // Subscribe to PaymentDeclined Events
    }

    def receive = {
      case cmd: CreateOrder =>                                          // 1. Receive CreateOrder Command
        inventory.tell(ReserveProduct(cmd.userId, cmd.productId), self) // 2. Send ReserveProduct Command to Inventory
        println(s"COMMAND:\t\t$cmd => ${self.path.name}")

      case evt: ProductReserved =>                                      // 3. Receive ProductReserved Event
        payment.tell(SubmitPayment(evt.userId, evt.txId), self)         // 4. Send SubmitPayment Command to Payment
        println(s"EVENT:\t\t\t$evt => ${self.path.name}")

      case evt: PaymentAuthorized =>                                    // 5. Receive PaymentAuthorized Event
        inventory.tell(ShipProduct(evt.userId, evt.txId), self)         // 6. Send ShipProduct Command to Inventory
        println(s"EVENT:\t\t\t$evt => ${self.path.name}")

      case evt: ProductShipped =>                                       // 7. Receive ProductShipped Event
        client.tell(OrderCompleted(evt.userId, evt.txId), self)         // 8. Send OrderCompleted Event back to Client
        println(s"EVENT:\t\t\t$evt => ${self.path.name}")
    }
  }

  // =========================================================
  // Event Sourced Aggregate
  // =========================================================
  class Inventory extends PersistentActor {
    val persistenceId = "Inventory"

    var nrOfProductsShipped = 0 // Mutable state, persisted in memory (AKA Memory Image)

    def reserveProduct(userId: Int, productId: Int): Event = {
      println(s"SIDE-EFFECT:\tReserving Product => ${self.path.name}")
      ProductReserved(userId, productId)
    }

    def shipProduct(userId: Int, txId: Int): Event = {
      nrOfProductsShipped += 1 // Update internal state
      println(s"SIDE-EFFECT:\tShipping Product => ${self.path.name}" + " - ProductsShipped: " + nrOfProductsShipped)
      ProductShipped(userId, txId)
    }

    def receiveCommand = {
      case cmd: ReserveProduct =>                                     // Receive ReserveProduct Command
        val productStatus = reserveProduct(cmd.userId, cmd.productId) // Try to reserve the product
        persist(productStatus) { event =>                             // Try to persist the Event
          context.system.eventStream.publish(event)                   // If successful, publish Event to Event Stream
        }
        println(s"COMMAND:\t\t$cmd => ${self.path.name}")

      case cmd: ShipProduct =>                                        // Receive ShipProduct Command
        val shippingStatus = shipProduct(cmd.userId, cmd.txId)        // Try to ship the product
        persist(shippingStatus) { event =>                            // Try to persist the Event
          context.system.eventStream.publish(event)                   // If successful, publish Event to Event Stream
        }
        println(s"COMMAND:\t\t$cmd => ${self.path.name}")
    }

    def receiveRecover = {
      case event: ProductReserved => // Replay the ProductReserved events
        println(s"EVENT (REPLAY):\t$event => ${self.path.name}")

      case event: ProductShipped =>  // Replay the ProductShipped events
        nrOfProductsShipped += 1
        println(s"EVENT (REPLAY):\t$event => ${self.path.name} - ProductsShipped: $nrOfProductsShipped")
    }
  }

  // =========================================================
  // Event Sourced Aggregate
  // =========================================================
  class Payment extends PersistentActor {
    val persistenceId = "Payment"

    var uniqueTransactionNr = 0 // Mutable state, persisted in memory (AKA Memory Image)

    def processPayment(userId: Int, txId: Int): Event = {
      uniqueTransactionNr += 1
      println(s"SIDE-EFFECT:\tProcessing Payment => ${self.path.name} - TxNumber: $uniqueTransactionNr")
      PaymentAuthorized(userId, uniqueTransactionNr)
    }

    def receiveCommand = {
      case cmd: SubmitPayment =>                                      // Receive SubmitPayment Command
        val paymentStatus = processPayment(cmd.userId, cmd.productId) // Try to pay product
        persist(paymentStatus) { event =>                             // Try to persist Event
          context.system.eventStream.publish(event)                   // If successful, publish Event to Event Stream
        }
        println(s"COMMAND:\t\t$cmd => ${self.path.name}")
    }


    def receiveRecover = {
      case evt: PaymentAuthorized => // Replay the PaymentAuthorized events
        uniqueTransactionNr += 1
        println(s"EVENT (REPLAY):\t$evt => ${self.path.name} - TxNumber: $uniqueTransactionNr")

      case evt: PaymentDeclined =>   // Replay the PaymentDeclined events
        println(s"EVENT (REPLAY):\t$evt => ${self.path.name}")
    }
  }

  // =========================================================
  // Running the Order Management simulation
  // =========================================================
  val system = ActorSystem("OrderManagement")

  // Plumbing for "client"
  val clientInbox = Inbox.create(system)
  val client = clientInbox.getRef()

  // Create the services (cheating with "DI" by exploiting enclosing object scope)
  val inventory = system.actorOf(Props(classOf[Inventory]), "Inventory")
  val payment   = system.actorOf(Props(classOf[Payment]), "Payment")
  val orders    = system.actorOf(Props(classOf[Orders], client, inventory, payment), "Orders")

  // Submit an order
  clientInbox.send(orders, CreateOrder(9, 1337)) // Send a CreateOrder Command to the Orders service
  clientInbox.receive(5.seconds) match {         // Wait for OrderCompleted Event
    case confirmation: OrderCompleted =>
      println(s"EVENT:\t\t\t$confirmation => Client")
  }

  system.terminate().foreach(_ => println("System has terminated"))
}