Last active
October 29, 2019 22:59
-
-
Save ahoy-jon/915c87e1d2f857eb5b72049960e7680a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object ZioToSparkIterator { | |
def toIterator[E, A](q: UIO[stream.Stream[E, A]]): Iterator[Either[E, A]] = new Iterator[Either[E, A]] { | |
sealed trait State | |
case object Running extends State | |
sealed trait ValueOrClosed extends State | |
case object Closed extends ValueOrClosed | |
case class Value(value: Either[E, A]) extends ValueOrClosed | |
var state: State = Running | |
val queue: SynchronousQueue[ValueOrClosed] = new SynchronousQueue() | |
val streamToConcurrentQueue: ZIO[Any, Throwable, Unit] = for { | |
stream <- q | |
_ <- stream.either.foreach(x => ZIO.effect(queue.put(Value(x)))) | |
_ <- ZIO.effect(queue.put(Closed)) | |
} yield {} | |
val zioThread: Thread = new Thread { | |
override def run(): Unit = | |
new DefaultRuntime {}.unsafeRun(streamToConcurrentQueue) | |
} | |
zioThread.start() | |
private def pool(): ValueOrClosed = queue.take() | |
override def hasNext: Boolean = | |
state match { | |
case Closed => false | |
case _: Value => true | |
case Running => | |
state = pool() | |
state != Closed | |
} | |
private val undefinedBehavior = new Exception("called next on a closed Iterator") | |
override def next(): Either[E, A] = | |
state match { | |
case Value(value) => | |
state = Running | |
value | |
case Closed => throw undefinedBehavior | |
case Running => | |
pool() match { | |
case Closed => | |
state = Closed | |
throw undefinedBehavior | |
case Value(v) => | |
//stage = Running | |
v | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment