// Gist MartinSeeler/594eedb7a44c5204e120: "Akka stream synchronization"
// Created August 18, 2015 08:20.
// Note: GitHub flagged this file as containing hidden or bidirectional Unicode text that may be
// interpreted or compiled differently than it appears; review it in an editor that reveals hidden
// Unicode characters.
import java.util.concurrent.atomic.{AtomicReference, AtomicInteger, AtomicBoolean} | |
import akka.actor.Actor.Receive | |
import akka.actor._ | |
import akka.event.LoggingReceive | |
import akka.stream.actor.ActorSubscriberMessage.{OnNext, OnError, OnComplete} | |
import akka.stream.actor._ | |
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request} | |
import akka.stream.{Outlet, Inlet, Attributes, ActorMaterializer} | |
import akka.stream.Supervision.Directive | |
import akka.stream.scaladsl._ | |
import akka.stream.stage._ | |
import org.reactivestreams.{Processor, Publisher, Subscriber} | |
import scala.collection | |
import scala.collection.parallel.mutable | |
/**
 * Demo entry point: wires a source of the integers 0 to 100 through a pair of
 * synchronizer actors placed around a filter stage, and prints whatever the
 * downstream actor publishes.
 *
 * Topology built below:
 * {{{
 *   source ~> pre actor (subscriber side)
 *   pre actor (publisher side) ~> filter(_ % 5 == 0) ~> post actor (subscriber side)
 *   post actor (publisher side) ~> println
 * }}}
 *
 * Both actors are exposed to the stream twice: once as a `Subscriber` and once
 * as a `Publisher` backed by the same `ActorRef`.
 */
object AkkaStreamSync extends App {

  implicit val sys: ActorSystem = ActorSystem("AkkaStreamSync")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  val source: Source[Int, Unit] = Source(0 to 100)

  // The pre actor needs the post actor's ref so the two can exchange SynchronizerEvents directly.
  val postRef = sys.actorOf(Props[SynchronizerActorPost], "post-ref")
  val preRef = sys.actorOf(Props(classOf[SynchronizerActorPre], postRef), "pre-ref")

  val preSubscriber: Subscriber[Int] = ActorSubscriber[Int](preRef)
  val preProducer: Publisher[Int] = ActorPublisher[Int](preRef)
  val postSubscriber: Subscriber[Int] = ActorSubscriber[Int](postRef)
  // SynchronizerActorPost is an ActorPublisher[(Int, Option[Int])]: it emits
  // (original element, optional filtered element) pairs, so the publisher must be typed accordingly.
  val postProducer: Publisher[(Int, Option[Int])] = ActorPublisher[(Int, Option[Int])](postRef)

  FlowGraph.closed() { implicit b =>
    import akka.stream.scaladsl.FlowGraph.Implicits._

    val preIn: Inlet[Int] = b.add(Sink(preSubscriber))
    val preOut: Outlet[Int] = b.add(Source(preProducer))
    val postIn: Inlet[Int] = b.add(Sink(postSubscriber))
    val postOut: Outlet[(Int, Option[Int])] = b.add(Source(postProducer))

    source ~> preIn
    // Input buffers are pinned to a single element on the stages between and after the actors.
    preOut ~> Flow[Int].filter(_ % 5 == 0).withAttributes(Attributes.inputBuffer(1, 1)) ~> postIn
    postOut ~> Sink.foreach(println).withAttributes(Attributes.inputBuffer(1, 1))
  }.run()
}
/** Messages exchanged directly between the pre and post synchronizer actors. */
sealed trait SynchronizerEvent

/** Pre -> post: an element has arrived at the pre actor and is about to be emitted. */
final case class AnnounceNext[A](next: A) extends SynchronizerEvent

/** Post -> pre: the announcement for `next` was received; the element may be emitted. */
final case class AckNext[A](next: A) extends SynchronizerEvent

/** Post -> pre: `elem` did arrive at the post actor. */
final case class AnnounceNotMissing[A](elem: A) extends SynchronizerEvent

/** Pre -> post: new demand reached the pre actor, so `next` is announced as possibly missing. */
final case class AnnounceMissing[A](next: A) extends SynchronizerEvent

/** Post -> pre: `next` is accepted as missing. */
final case class AckMissing[A](next: A) extends SynchronizerEvent

/** Post -> pre: not yet accepted as missing; announce `next` as missing again. */
final case class RetryMissing[A](next: A) extends SynchronizerEvent

/** Post -> pre: `next` is not missing (it had already arrived at the post actor). */
final case class RejectMissing[A](next: A) extends SynchronizerEvent
/**
 * Upstream half of the synchronizer pair. It is both the subscriber of the
 * original source and the publisher feeding the stages that sit between the
 * two synchronizer actors.
 *
 * One element is in flight at a time. The actor cycles through these behaviours:
 *
 *  1. `waitForNext`     - pulls one element per downstream `Request` and announces it to `post`.
 *  2. `announceNext`    - waits for `post` to acknowledge the announcement, then emits the element.
 *  3. `awaitReqOrAck`   - waits for the next downstream `Request`, then announces the element
 *                         as missing.
 *  4. `awaitAckMissing` - waits for `post` to accept, reject, or ask for a retry of that
 *                         announcement; on accept or reject it pulls the next element.
 *
 * NOTE(review): `OnComplete` is only matched in `awaitAckMissing`, and `AnnounceNotMissing`
 * is not matched by any behaviour here; in those cases the message is not handled by this actor.
 * `Stash` is mixed in but no stash operations are used in this class.
 *
 * @param post the downstream synchronizer actor that receives the announcements
 */
class SynchronizerActorPre(post: ActorRef) extends ActorSubscriber with ActorPublisher[Int]
  with ActorLogging with Stash {

  // Demand towards the upstream source is signalled manually through request(1).
  protected def requestStrategy: RequestStrategy = ZeroRequestStrategy

  def receive: Receive = waitForNext

  /** Idle: any downstream demand triggers a pull of exactly one element from upstream. */
  def waitForNext: Receive = LoggingReceive.withLabel("waitForNext") {
    case Request(_) =>
      request(1)
    case OnNext(value: Int) =>
      context.become(announceNext(value))
      post ! AnnounceNext(value)
  }

  /** `elem` has been announced; it is emitted once `post` acknowledges that same element. */
  def announceNext(elem: Int): Receive = LoggingReceive.withLabel("announceNext") {
    case AckNext(acked: Int) if acked == elem =>
      context.become(awaitReqOrAck(elem))
      onNext(elem)
  }

  /** `elem` has been emitted; the next downstream `Request` makes this actor announce it as missing. */
  def awaitReqOrAck(elem: Int): Receive = LoggingReceive.withLabel("awaitReqOrAck") {
    case Request(_) =>
      context.become(awaitAckMissing(elem))
      post ! AnnounceMissing(elem)
  }

  /** `elem` has been announced as missing; wait for the verdict of `post`. */
  def awaitAckMissing(elem: Int): Receive = LoggingReceive.withLabel("awaitAckMissing") {
    case OnComplete =>
      onComplete()
    case RetryMissing(again: Int) if again == elem =>
      post ! AnnounceMissing(elem)
    case AckMissing(accepted: Int) if accepted == elem =>
      pullNext()
    case RejectMissing(rejected: Int) if rejected == elem =>
      pullNext()
  }

  /** Returns to the idle behaviour and asks upstream for one more element. */
  private def pullNext(): Unit = {
    context.become(waitForNext)
    request(1)
  }
}
/**
 * Downstream half of the synchronizer pair. It subscribes to the output of the
 * stages that sit between the two synchronizer actors and publishes
 * `(element, Option[result])` pairs: `Some(result)` when something arrived for
 * the announced element, `None` when the element was accepted as missing.
 *
 * Behaviours:
 *
 *  - `waitForNext`        - waits for the pre actor to announce the next element.
 *  - `awaitNextOrMissing` - waits for either the element itself (`OnNext`) or for the pre actor
 *                           to announce it as missing; a missing announcement is only accepted
 *                           on the [[MaxMissingRounds]]-th round, earlier rounds answer
 *                           `RetryMissing`.
 *  - `awaitNotMissingAck` - the element arrived; the following missing announcement is rejected.
 *
 * Results are buffered in `queue` and flushed by `sendQueue()` when the next element is
 * announced or when upstream completes.
 */
class SynchronizerActorPost extends ActorSubscriber with ActorPublisher[(Int, Option[Int])] with ActorLogging {

  /** Round on which an `AnnounceMissing` is finally accepted instead of retried. */
  private val MaxMissingRounds = 5

  // Demand towards upstream is signalled manually through request(1).
  protected def requestStrategy: RequestStrategy = ZeroRequestStrategy

  /** Results waiting for downstream demand. */
  val queue = collection.mutable.Queue[(Int, Option[Int])]()

  def receive: Receive = waitForNext

  // Ask for the first element as soon as the actor is constructed.
  request(1)

  /** Emits queued results while this publisher is active and downstream demand remains. */
  def sendQueue(): Unit = while (isActive && totalDemand > 0 && queue.nonEmpty) {
    onNext(queue.dequeue())
  }

  /** Idle: no element is currently announced. */
  def waitForNext: Receive = LoggingReceive.withLabel("waitForNext") {
    case OnComplete =>
      sendQueue()
      onCompleteThenStop()
    case OnNext(n: Int) =>
      // An element without a preceding announcement violates the protocol.
      onErrorThenStop(new Exception(s"Received element $n after Ack"))
    case AnnounceNext(x: Int) =>
      sendQueue()
      context become awaitNextOrMissing(x, sender())
      sender() ! AckNext(x)
  }

  /**
   * `elem` has been announced by `ref`; wait for it to arrive or to be declared missing.
   *
   * @param elem         the announced element
   * @param ref          the actor that announced `elem`
   * @param round        how many missing announcements have been seen for `elem`, starting at 1
   * @param upstreamDone whether upstream completed while `elem` was still unresolved
   */
  def awaitNextOrMissing(elem: Int, ref: ActorRef, round: Int = 1, upstreamDone: Boolean = false): Receive =
    LoggingReceive.withLabel("awaitNextOrMissing") {
      case OnComplete =>
        // Only remember the completion here. Completing the publisher right away would make
        // isActive false, so sendQueue() could no longer emit the result for `elem` and the
        // final entry would be lost. Completion is signalled once `elem` is resolved below.
        // NOTE(review): this defers downstream completion until the missing announcement for
        // `elem` reaches round MaxMissingRounds.
        context become awaitNextOrMissing(elem, ref, round, upstreamDone = true)
      case OnNext(n: Int) =>
        context become awaitNotMissingAck(elem)
        queue.enqueue((elem, Some(n)))
        ref ! AnnounceNotMissing(elem)
      case AnnounceMissing(x: Int) if x == elem =>
        if (round == MaxMissingRounds) {
          queue.enqueue((elem, None))
          if (upstreamDone) {
            // Upstream is finished: flush what demand allows, then complete downstream and stop.
            sendQueue()
            onCompleteThenStop()
          } else {
            request(1)
            context become waitForNext
          }
          sender() ! AckMissing(elem)
        } else {
          context become awaitNextOrMissing(elem, ref, round + 1, upstreamDone)
          sender() ! RetryMissing(elem)
        }
    }

  /** `elem` arrived; reject the pending missing announcement and pull the next element. */
  def awaitNotMissingAck(elem: Int): Receive = LoggingReceive.withLabel("awaitNotMissingAck") {
    case AnnounceMissing(x: Int) if x == elem =>
      request(1)
      context become waitForNext
      sender() ! RejectMissing(elem)
  }
}
// (GitHub page footer captured with the file: "Sign up for free to join this conversation on GitHub.
// Already have an account? Sign in to comment".)