Created
August 4, 2016 17:56
-
-
Save shekhargulati/07efca9d0a6a9daee7e8724228c2a91d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package playground | |
import akka.actor.{Actor, ActorLogging, ActorRef, Props} | |
import playground.Parent.{BulkTasks, Finish, TaskCompleted, TaskFailed} | |
import scala.util.{Failure, Success, Try} | |
trait Task { | |
val id: String | |
def execute() | |
} | |
class OkTask(val id: String = "OK") extends Task { | |
override def execute(): Unit = println(s"Executed task $id") | |
} | |
class FailureTask(val id: String = "FailureTask", message: String) extends Task { | |
override def execute(): Unit = throw new RuntimeException(message) | |
} | |
object Parent { | |
def props(receiver: ActorRef) = Props(new Parent(receiver)) | |
case class BulkTasks(tasks: List[Task]) | |
case class TaskCompleted(task: Task) | |
case class TaskFailed(task: Task, exception: Throwable) | |
case class Finish(statuses: Map[String, String]) | |
} | |
class Parent(receiver: ActorRef) extends Actor with ActorLogging { | |
var totalTasks = -1 | |
var statuses = Map[String, String]() | |
override def receive: Receive = { | |
case BulkTasks(Nil) => log.info("No task to process") | |
case BulkTasks(task :: remainingTasks) => | |
if (totalTasks == -1) { | |
totalTasks = remainingTasks.size + 1 | |
} | |
val taskActor = context.actorOf(Props[Child], task.id) | |
taskActor ! task | |
self ! BulkTasks(remainingTasks) | |
case TaskCompleted(task) => | |
totalTasks -= 1 | |
log.info(s"Completed task ${task.id}") | |
statuses = statuses + (task.id -> "OK") | |
checkStateAndNotify() | |
case TaskFailed(task, throwable) => | |
totalTasks -= 1 | |
log.info(s"Task failed ${task.id}") | |
statuses = statuses + (task.id -> throwable.getMessage) | |
checkStateAndNotify() | |
} | |
private def checkStateAndNotify(): Unit = { | |
if (totalTasks == 0) { | |
log.info(s"Processed all messages...Sending finish message to sender") | |
receiver ! Finish(statuses) | |
} else { | |
log.info(s"Still $totalTasks task(s) remaining") | |
} | |
} | |
} | |
class Child extends Actor with ActorLogging { | |
override def receive: Receive = { | |
case task: Task => | |
log.info(s"Executing task ${task.id}") | |
Try { | |
task.execute() | |
} match { | |
case Success(_) => | |
context.parent ! TaskCompleted(task) | |
case Failure(e) => | |
context.parent ! TaskFailed(task, e) | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package playground | |
import akka.actor.ActorSystem | |
import akka.testkit.{ImplicitSender, TestKit} | |
import org.scalatest.{MustMatchers, WordSpecLike} | |
import playground.Parent.{BulkTasks, Finish} | |
class ParentChildTaskProcessingExampleSpec extends TestKit(ActorSystem("testsystem")) | |
with WordSpecLike | |
with MustMatchers | |
with StopSystemAfterAll { | |
"Parent must" must { | |
"process all Ok BulkMessage and stop" in { | |
val actor = system.actorOf(Parent.props(testActor)) | |
actor ! BulkTasks(List(new OkTask("ok1"), new OkTask("ok2"))) | |
expectMsg(Finish(Map("ok1" -> "OK", "ok2" -> "OK"))) | |
} | |
"process Ok and Failure BulkMessage and stop" in { | |
val actor = system.actorOf(Parent.props(testActor)) | |
actor ! BulkTasks(List(new OkTask("ok1"), new FailureTask("fa1", "Awesome Failure!!"))) | |
expectMsg(Finish(Map("ok1" -> "OK", "fa1" -> "Awesome Failure!!"))) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment