Created
June 7, 2011 21:29
-
-
Save enebo/1013217 to your computer and use it in GitHub Desktop.
pi.rb
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# JRuby/Java bridge plus the Scala and Akka jars, which are looked up in
# ./lib relative to this file.
require "java"
$: << File.join(File.dirname(__FILE__), 'lib')
require 'scala-library'
require 'akka/akka-actor-1.1.2'

# Akka actor and routing classes used below.
java_import 'akka.actor.Actors'
java_import 'akka.actor.ActorRef'
java_import 'akka.actor.UntypedActor'
java_import 'akka.actor.UntypedActorFactory'
java_import 'akka.routing.CyclicIterator'
java_import 'akka.routing.InfiniteIterator'
java_import 'akka.routing.Routing'
java_import 'akka.routing.UntypedLoadBalancer'

# JDK classes.
java_import java.lang.System
java_import java.util.concurrent.CountDownLatch
# Builds an actor through Actors.actorOf from a Ruby block.
#
# The block is wrapped in an instance of an anonymous class that includes
# UntypedActorFactory; that instance's #create forwards all of its arguments
# to the block and returns whatever the block returns.
#
# @yield [*args] called each time the factory's #create is invoked
# @return whatever Actors.actorOf returns for the factory
def actorOf(&code)
  factory_class = Class.new do
    include UntypedActorFactory

    define_method(:create) { |*args| code.call(*args) }
  end

  Actors.actorOf(factory_class.new)
end
# Message that makes the master hand out all of the work.
class Calculate
  # Sends master.nMessages PiWorker messages to master.router, then tells
  # the workers and the router to shut down.
  #
  # Message i covers the half-open index range
  # [i * nElements, (i + 1) * nElements).
  #
  # @param master object exposing router, nMessages, nElements and context
  def perform(master)
    router = master.router
    chunk = master.nElements

    # schedule work; master.context is passed along with each message
    master.nMessages.times do |i|
      router.sendOneWay PiWorker.new(i * chunk, (i + 1) * chunk), master.context
    end

    # send a PoisonPill to all workers telling them to shut themselves down
    router.sendOneWay(Routing::Broadcast.new(Actors.poisonPill))

    # send a PoisonPill to the router, telling it to shut itself down
    router.sendOneWay Actors.poisonPill
  end
end
# Calculate part of Pi: one slice of the alternating series
# 4/1 - 4/3 + 4/5 - 4/7 + ...
class PiWorker
  # @param first [Integer] index of the first term (inclusive)
  # @param last [Integer] index one past the final term (exclusive)
  def initialize(first, last)
    @first, @last = first, last
  end

  # Sums the terms with index in first...last.
  #
  # @return [Float] the partial sum; 0.0 when the range is empty
  def perform
    (@first...@last).inject(0.0) do |acc, i|
      # even-indexed terms are added, odd-indexed terms are subtracted
      numerator = i.even? ? 4.0 : -4.0
      acc + numerator / (2 * i + 1)
    end
  end
end
# Message carrying one partial sum to be folded into the master's total.
class Result
  # @param value the partial sum to add to master.pi
  def initialize(value)
    @value = value
  end

  # Adds the value to master.pi, increments master.nResults, and calls
  # master.context.stop once nResults equals master.nMessages.
  #
  # @param master object exposing pi, nResults, nMessages and context
  def perform(master)
    master.pi += @value
    master.nResults += 1
    master.context.stop if master.nResults == master.nMessages
  end
end
# Actor that runs whatever work message it receives and replies with the
# outcome wrapped in a Result.
class Worker < UntypedActor
  # needed by Actors.actorOf
  def self.create(*args)
    new(*args)
  end

  # message handler
  #
  # @param work any object responding to #perform (called with no arguments)
  # @raise [ArgumentError] when the message does not respond to #perform
  def onReceive(work)
    unless work.respond_to? :perform
      raise ArgumentError, "Unknown message: #{work}"
    end

    # perform the work and reply with the result
    context.reply_unsafe Result.new(work.perform)
  end
end
# Load balancer over the worker actors.
class PiRouter < UntypedLoadBalancer
  # The CyclicIterator built over the workers in #initialize.
  # NOTE(review): presumably this is the `seq` that UntypedLoadBalancer
  # asks for when picking the next actor; confirm against the Akka docs.
  attr_reader :seq

  # @param workers the worker actors to cycle through
  def initialize(workers)
    super()
    @seq = CyclicIterator.new(workers)
  end
end
# Coordinating actor: starts the workers and their router, runs every
# message it receives against itself, and reports the total when stopped.
class Master < UntypedActor
  attr_accessor :pi, :nResults
  attr_reader :router, :nElements, :nMessages

  # @param nWorkers [Integer] number of Worker actors to start
  # @param nMessages [Integer] number of work messages / expected results
  # @param nElements [Integer] number of series terms per work message
  # @param latch object whose #countDown is called from #postStop
  def initialize(nWorkers, nMessages, nElements, latch)
    super()
    @nMessages, @nElements, @latch = nMessages, nElements, latch
    @nResults, @pi = 0, 0.0

    workers = Array.new(nWorkers) { Actors.actorOf(Worker).start }

    # wrap them with a load-balancing router
    @router = actorOf { PiRouter.new(workers) }.start
  end

  # message handler
  #
  # @param message any object responding to #perform; it is called with
  #   this master as its argument
  # @raise [ArgumentError] when the message does not respond to #perform
  def onReceive(message)
    unless message.respond_to? :perform
      raise ArgumentError, "Unknown message [#{message}]"
    end

    message.perform self
  end

  # Records the start time used by #postStop.
  def preStart
    @start = Time.now
  end

  # Prints the accumulated value of pi and the elapsed time, then counts the
  # latch down.
  def postStop
    # Time#- yields seconds; convert so the value matches the "ms" label.
    elapsed_ms = ((Time.now - @start) * 1000).round
    puts "\n\tPi: \t\t#{@pi}\n\tTime taken: \t#{elapsed_ms} ms"
    @latch.countDown
  end
end
# Entry point that wires up the master actor and blocks until it finishes.
class Pi
  # Creates and starts a Master, sends it a Calculate message, and waits on
  # a one-count latch that the master is given.
  #
  # Note that Master.new takes nMessages before nElements, the reverse of
  # this method's own parameter order.
  #
  # @param nWorkers [Integer] number of workers the master starts
  # @param nElements [Integer] number of series terms per work message
  # @param nMessages [Integer] number of work messages
  def self.calculate(nWorkers, nElements, nMessages)
    latch = CountDownLatch.new 1

    master = Actors.actorOf { Master.new nWorkers, nMessages, nElements, latch }.start
    master.send_one_way Calculate.new # start the calculation

    latch.await # wait for master to shut down
  end
end
# Run with 4 workers, 10,000 terms per message and 10,000 messages.
Pi.calculate(4, 10_000, 10_000)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment