Skip to content

Instantly share code, notes, and snippets.

@amitayh
Last active July 23, 2019 18:43
Show Gist options
  • Save amitayh/90340b15233074a6e2ddc6fe11733b81 to your computer and use it in GitHub Desktop.
Save amitayh/90340b15233074a6e2ddc6fe11733b81 to your computer and use it in GitHub Desktop.
import scalaz.zio._
import scalaz.zio.clock.Clock
import scalaz.zio.console.Console
import scalaz.zio.duration._
/** A toy, in-memory imitation of a Kafka-style topic built on ZIO's `Ref` and `Promise`.
  *
  * Each [[KafkaClone.Topic]] keeps an append-only log of messages. Subscribers run on
  * their own fiber, replay the log from a chosen offset and then wait for new messages.
  * `run` demonstrates this with two topics that bounce an increasing counter back and forth.
  */
object KafkaClone extends App {

  /** A message handler: given one message of type `A`, yields an effect that needs
    * environment `R` and cannot fail. The value produced by the effect is ignored.
    */
  type Consumer[R, A] = A => ZIO[R, Nothing, _]

  /** An append-only log of messages of type `A` with fiber-based subscribers. */
  trait Topic[A] {

    /** Appends `message` to the log and wakes any subscriber waiting for the next message. */
    def produce(message: A): UIO[Unit]

    /** Starts a background fiber that feeds messages to `consume`, one at a time and in
      * log order, starting at index `initialOffset`.
      *
      * @param consume       handler invoked for every message from `initialOffset` onwards
      * @param initialOffset index of the first message to deliver (defaults to 0)
      * @return an effect that interrupts the subscriber fiber when run
      */
    def subscribe[R](consume: Consumer[R, A], initialOffset: Int = 0): ZIO[R, Nothing, Canceler]

    /** Returns a snapshot of every message produced so far, oldest first. */
    def produced: UIO[Vector[A]]
  }

  /** Immutable snapshot of a topic's state.
    *
    * @param log  all messages appended so far, oldest first
    * @param next promise that `Topic.produce` completes with the message appended
    *             right after this snapshot, i.e. the one stored at index `log.length`
    */
  final case class Messages[A](log: Vector[A], next: Promise[Nothing, A]) {

    /** Returns the message at `index` if it is already in the log; otherwise waits on
      * `next`.
      *
      * The awaited value is the message at index `log.length`, so the result is only the
      * message at `index` when `0 <= index <= log.length`. Callers must guarantee that.
      */
    def get(index: Int): UIO[A] =
      if (index < log.length) ZIO.succeed(log(index))
      else next.await
  }

  object Messages {

    /** Creates an empty snapshot: no messages and a fresh, uncompleted `next` promise. */
    def make[A]: UIO[Messages[A]] =
      Promise.make[Nothing, A].map(Messages(Vector.empty, _))
  }

  object Topic {

    /** Creates an empty topic whose state lives in a `Ref` holding a [[Messages]] snapshot. */
    def make[A]: UIO[Topic[A]] = for {
      empty <- Messages.make[A]
      state <- Ref.make(empty)
    } yield new Topic[A] {

      override def produce(message: A): UIO[Unit] = {
        val publish: UIO[Unit] = for {
          next <- Promise.make[Nothing, A]
          // Swap in the extended log together with a fresh promise, and get back the
          // promise that current waiters are parked on.
          prev <- state.modify { messages =>
                    (messages.next, Messages(messages.log :+ message, next))
                  }
          _ <- prev.succeed(message)
        } yield ()

        // The state update and the promise completion must happen together: if the
        // calling fiber were interrupted in between, subscribers awaiting `prev`
        // would never be woken up.
        publish.uninterruptible
      }

      override def subscribe[R](consume: Consumer[R, A], initialOffset: Int): ZIO[R, Nothing, Canceler] = {
        // Written with explicit flatMaps and the recursive call in tail position.
        // A for-comprehension ending in `yield` would wrap the never-returning
        // recursion in a `map`, piling up one pending continuation per message.
        def go(offset: Int): ZIO[R, Nothing, Nothing] =
          state.get.flatMap { messages =>
            if (offset > messages.log.length)
              // The requested offset is beyond the next message to be produced.
              // Wait for the log to grow and re-check, without consuming anything.
              messages.next.await.flatMap(_ => go(offset))
            else
              messages.get(offset).flatMap { message =>
                consume(message).flatMap(_ => go(offset + 1))
              }
          }

        // A negative offset cannot address the log; start from the beginning instead
        // of letting the subscriber fiber die on an out-of-bounds index.
        go(math.max(initialOffset, 0)).fork.map(_.interrupt)
      }

      override def produced: UIO[Vector[A]] =
        state.get.map(_.log)
    }
  }

  /** Demo: `ping` and `pong` topics whose subscribers print the received number, sleep
    * 200 ms and produce `i + 1` to the other topic. The exchange is seeded with `1` on
    * `ping`, left running for 5 seconds, and then both logs are printed.
    *
    * @return exit code 0
    */
  override def run(args: List[String]): ZIO[Environment, Nothing, Int] = for {
    ping <- Topic.make[Int]
    pong <- Topic.make[Int]
    // The cancelers returned by `subscribe` are intentionally discarded here.
    _ <- ping.subscribe[Clock with Console] { i =>
           console.putStrLn(s"PING $i") *>
             clock.sleep(200.millis) *>
             pong.produce(i + 1)
         }
    _ <- pong.subscribe[Clock with Console] { i =>
           console.putStrLn(s"PONG $i") *>
             clock.sleep(200.millis) *>
             ping.produce(i + 1)
         }
    _     <- ping.produce(1)
    _     <- clock.sleep(5.second)
    pings <- ping.produced
    pongs <- pong.produced
    _     <- console.putStrLn(s"PINGS: $pings")
    _     <- console.putStrLn(s"PONGS: $pongs")
  } yield 0
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment