Forked from shekhargulati/ParentChildTaskProcessingExample.scala
Last active
August 8, 2016 12:03
-
-
Save raboof/d78eae72a6c10cdafd99972cb382564d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package playground | |
import scala.concurrent.duration._ | |
import scala.concurrent.{ExecutionContext, Future} | |
import scala.util.{Failure, Success, Try} | |
import akka.actor.{Actor, ActorLogging, ActorRef, Props} | |
import akka.util.Timeout | |
import akka.pattern.{ask, pipe} | |
import playground.Parent.{BulkTasks, Finish, TaskCompleted, TaskFailed} | |
trait Task { | |
val id: String | |
def execute() | |
} | |
class OkTask(val id: String = "OK") extends Task { | |
override def execute(): Unit = println(s"Executed task $id") | |
} | |
class FailureTask(val id: String = "FailureTask", message: String) extends Task { | |
override def execute(): Unit = throw new RuntimeException(message) | |
} | |
object Parent { | |
def props(receiver: ActorRef) = Props(new Parent(receiver)) | |
case class BulkTasks(tasks: List[Task]) | |
case class TaskCompleted(task: Task) | |
case class TaskFailed(task: Task, exception: Throwable) | |
case class Finish(statuses: Map[String, String]) | |
} | |
class Parent(receiver: ActorRef) extends Actor with ActorLogging { | |
implicit val timeout: Timeout = 10 seconds | |
implicit val ec: ExecutionContext = context.dispatcher | |
override def receive: Receive = { | |
case BulkTasks(Nil) => log.info("No task to process") | |
case BulkTasks(tasks) => | |
val results: List[Future[(String, String)]] = tasks | |
.map(task => { | |
val actor = context.actorOf(Props(new Child(task)), task.id) | |
(actor ? Child.Start).map { | |
case _: TaskCompleted => (task.id -> "OK") | |
case TaskFailed(_, throwable) => (task.id -> throwable.getMessage) | |
case other => (task.id -> s"Invalid response: $other") | |
} | |
}) | |
Future.sequence(results) | |
.map(statuses => Finish(statuses.toMap)) | |
.pipeTo(receiver) | |
} | |
} | |
object Child { | |
case object Start | |
} | |
class Child(task: Task) extends Actor with ActorLogging { | |
import Child._ | |
override def receive: Receive = { | |
case Start => | |
log.info(s"Executing task ${task.id}") | |
Try { | |
task.execute() | |
} match { | |
case Success(_) => | |
sender() ! TaskCompleted(task) | |
case Failure(e) => | |
sender() ! TaskFailed(task, e) | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package playground | |
import akka.actor.ActorSystem | |
import akka.testkit.{ImplicitSender, TestKit} | |
import org.scalatest.{MustMatchers, WordSpecLike} | |
import playground.Parent.{BulkTasks, Finish} | |
class ParentChildTaskProcessingExampleSpec extends TestKit(ActorSystem("testsystem")) | |
with WordSpecLike | |
with MustMatchers | |
with StopSystemAfterAll { | |
"Parent must" must { | |
"process all Ok BulkMessage and stop" in { | |
val actor = system.actorOf(Parent.props(testActor)) | |
actor ! BulkTasks(List(new OkTask("ok1"), new OkTask("ok2"))) | |
expectMsg(Finish(Map("ok1" -> "OK", "ok2" -> "OK"))) | |
} | |
"process Ok and Failure BulkMessage and stop" in { | |
val actor = system.actorOf(Parent.props(testActor)) | |
actor ! BulkTasks(List(new OkTask("ok1"), new FailureTask("fa1", "Awesome Failure!!"))) | |
expectMsg(Finish(Map("ok1" -> "OK", "fa1" -> "Awesome Failure!!"))) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment