Skip to content

Instantly share code, notes, and snippets.

@CarloMicieli
Created March 29, 2016 19:10
Show Gist options
  • Save CarloMicieli/31be8627b29563e34be031835860df55 to your computer and use it in GitHub Desktop.
Save CarloMicieli/31be8627b29563e34be031835860df55 to your computer and use it in GitHub Desktop.
Fibonacci numbers with Akka
package io.github.carlomicieli.fibonacci
import java.util.concurrent.atomic.AtomicInteger
import akka.actor._
import akka.event.LoggingReceive
import scala.io.StdIn
object FibonacciApp1 {
/** Application entry point.
  *
  * Starts the `AkkaFibonacci1` actor system, asks the top-level `fib` actor
  * for `fib(50)` and then blocks until the user presses ENTER, at which point
  * the actor system is shut down.
  *
  * @param args command-line arguments (not used)
  */
def main(args: Array[String]): Unit = {
  println("Press ENTER to quit...")
  val actorSystem = ActorSystem("AkkaFibonacci1")
  try {
    val fibActor = actorSystem.actorOf(Fibonacci.props(), "fib")
    fibActor ! Fibonacci.Compute(50)
    // The answer is logged asynchronously by the `fib` actor; the main thread
    // only has to stay alive until the user decides to quit.
    StdIn.readLine()
  } finally {
    // Terminate even when actor creation or reading stdin fails, otherwise the
    // dispatcher threads of the actor system keep the JVM running.
    actorSystem.terminate()
  }
}
/** Front-end actor of the computation.
  *
  * On start it creates a [[FibonacciRouter]] child named `fib-router`.
  * A `Compute(n)` message is relayed to that child as `RequestFib(n)`;
  * a `Result(n, res)` message is written to the actor's log.
  */
class Fibonacci extends Actor with ActorLogging {
  import Fibonacci._
  import FibonacciRouter._

  // Reference to the child router; stays None until preStart has run.
  private var routerRef: Option[ActorRef] = None

  override def preStart() = {
    routerRef = Some(context.actorOf(FibonacciRouter.props(), "fib-router"))
  }

  def receive = LoggingReceive {
    case Compute(index) =>
      routerRef match {
        case Some(router) => router ! RequestFib(index)
        case None         => () // no router yet: the request is dropped
      }

    case Result(index, value) =>
      log.info(s"fib($index) is $value")
  }
}
/** Companion of the [[Fibonacci]] actor: its message protocol and `Props` factory. */
object Fibonacci {

  /** Asks the actor to compute the value for index `n`. */
  case class Compute(n: Int)

  /** Carries `res`, the value computed for index `n`. */
  case class Result(n: Int, res: Long)

  /** Creation recipe for a [[Fibonacci]] actor (no constructor arguments). */
  def props(): Props = Props(classOf[Fibonacci])
}
/** Routes computation requests to per-index [[FibonacciWorker]] children.
  *
  * Protocol handled by this actor:
  *  - `RequestFib(n)`: an external request. The sender is remembered and the
  *    computation is started by sending `ComputeFib(n)` to `self`. A negative
  *    `n` is rejected with `Status.Failure`.
  *  - `ComputeFib(n)`: looks up (or lazily creates) the worker for `n` and
  *    forwards it a `DoCompute`, so the worker replies to the original sender
  *    of the `ComputeFib` (this router, or another worker).
  *  - `FibComputed(_, n, res)`: a worker answered a computation this router
  *    started itself; every requester waiting on `n` receives `Result(n, res)`.
  */
class FibonacciRouter extends Actor {
  import Fibonacci._
  import FibonacciRouter._
  import FibonacciWorker._

  // Source of the correlation ids attached to DoCompute messages.
  private val id = new AtomicInteger(1)

  // Workers created so far, keyed by the index they compute.
  private var workers = Map.empty[Int, ActorRef]

  // External requesters still waiting for a Result, keyed by index.
  // Several requesters may wait on the same index at the same time, so each
  // key holds all of them; keeping a single ActorRef per key would let a
  // later request silently replace (and starve) an earlier one.
  private var requests = Map.empty[Int, Vector[ActorRef]]

  def receive = LoggingReceive {
    case RequestFib(n) if n < 0 =>
      // A negative index never reaches the base cases (0 and 1): every worker
      // would keep asking for smaller indices and spawn workers without end.
      sender() ! Status.Failure(
        new IllegalArgumentException(s"fib($n) is undefined: n must be >= 0"))

    case RequestFib(n) =>
      val waiting = requests.getOrElse(n, Vector.empty[ActorRef])
      requests = requests + (n -> (waiting :+ sender()))
      self ! ComputeFib(n)

    case ComputeFib(n) =>
      // getOrElse takes its default by name: a worker is only created on a miss.
      val workerRef = workers.getOrElse(n, newWorker(n))
      workerRef forward DoCompute(id.getAndIncrement())

    case FibComputed(_, n, res) =>
      requests.getOrElse(n, Vector.empty[ActorRef]).foreach(_ ! Result(n, res))
      requests = requests - n
  }

  /** Creates the child worker for index `n`, registers it and returns it. */
  private def newWorker(n: Int): ActorRef = {
    val workerRef = context.actorOf(FibonacciWorker.props(n), s"fib-worker-$n")
    workers = workers + (n -> workerRef)
    workerRef
  }
}
/** Companion of the [[FibonacciRouter]] actor: its message protocol and `Props` factory. */
object FibonacciRouter {

  /** External request for the value at index `n`; the sender is answered later. */
  case class RequestFib(n: Int)

  /** Asks the router to hand index `n` to the matching worker. */
  case class ComputeFib(n: Int)

  /** A worker's answer: `res` is the value for index `n`, `id` a correlation id. */
  case class FibComputed(id: Int, n: Int, res: Long)

  /** Creation recipe for a [[FibonacciRouter]] actor (no constructor arguments). */
  def props(): Props = Props(classOf[FibonacciRouter])
}
/** Computes (and then caches) the value for one fixed index `n`.
  *
  * Indices 0 and 1 are base cases and both answer 1. Any other worker asks its
  * parent for the values at `n - 1` and `n - 2` once, collects the two
  * `FibComputed` replies and answers every pending requester with their sum.
  * After that it switches to [[alreadyComputed]] and answers immediately.
  *
  * Values are `Long`s, so the sum silently wraps around once the true result
  * no longer fits into a `Long`.
  *
  * @param n the index this worker is responsible for
  */
class FibonacciWorker(n: Int) extends Actor {
  import FibonacciWorker._
  import FibonacciRouter._

  // (correlation id, requester) pairs waiting for the value of this worker.
  private var outstandingRequests = List.empty[(Int, ActorRef)]

  // Partial results: (value for n - 1, value for n - 2).
  private var results: (Option[Long], Option[Long]) = (None, None)

  // True once the two sub-computations have been requested from the parent.
  private var asked = false

  /** Initial behaviour: base-case workers answer directly, all others compute. */
  def receive: Receive =
    if (n == 0 || n == 1) baseCaseReceive else computeFibonacci

  /** Behaviour while the value is still being computed. */
  def computeFibonacci: Receive = LoggingReceive {
    case DoCompute(id) =>
      addRequest(id, sender())
      if (!asked) {
        val routerRef = context.parent
        routerRef ! ComputeFib(n - 1)
        routerRef ! ComputeFib(n - 2)
        asked = true
      }

    case FibComputed(_, i, res) =>
      // Only the two indices this worker asked for are accepted.
      if (i == n - 1)
        results = (Some(res), results._2)
      else if (i == n - 2)
        results = (results._1, Some(res))

      computed.foreach { total =>
        // Each requester gets its own correlation id back, not the id of the
        // sub-result that happened to arrive last.
        outstandingRequests.foreach { case (requestId, requester) =>
          requester ! FibComputed(requestId, n, total)
        }
        outstandingRequests = Nil
        context.become(alreadyComputed)
      }
  }

  /** Behaviour once the value is known: answer straight from the cached parts. */
  def alreadyComputed: Receive = LoggingReceive {
    case DoCompute(id) =>
      val requester = sender()
      computed.foreach(total => requester ! FibComputed(id, n, total))

    case FibComputed(_, _, _) => // late or duplicate sub-result: nothing to do
  }

  /** Behaviour of the base cases (index 0 and 1): the value is always 1. */
  def baseCaseReceive: Receive = {
    case DoCompute(id) =>
      sender() ! FibComputed(id, n, 1L)
  }

  /** The final value, available once both partial results have arrived. */
  private def computed: Option[Long] =
    for {
      a <- results._1
      b <- results._2
    } yield a + b

  /** Remembers `senderRef` together with the correlation id of its request. */
  private def addRequest(id: Int, senderRef: ActorRef): Unit = {
    outstandingRequests = (id -> senderRef) :: outstandingRequests
  }
}
/** Companion of the [[FibonacciWorker]] actor: its message protocol and `Props` factory. */
object FibonacciWorker {

  /** Asks a worker for its value; `id` is a correlation id echoed in the reply. */
  case class DoCompute(id: Int)

  /** Creation recipe for the worker responsible for index `n`.
    *
    * @param n index passed to the [[FibonacciWorker]] constructor
    */
  def props(n: Int): Props =
    Props(classOf[FibonacciWorker], n)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment