Created
February 5, 2020 10:18
-
-
Save ahoy-jon/284bff4495c7bd42c4a610fb26174b0c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def toIterator[R, E, A]( | |
stream: ZStream[R, E, A] | |
): ZManaged[R, Nothing, Iterator[Either[E, A]]] = { | |
/* | |
* Internal state of a Iterator pulling from a ZStream | |
* | |
* It starts as Running | |
* | |
* when Running , on pull (hasNext), pull the ZStream and switch to Closed or Value | |
* when Value , on consume (next), return the Value and switch to Running | |
* when Closed , on pull (hasNext), stays Closed | |
*/ | |
sealed trait State | |
case object Running extends State | |
sealed trait ValueOrClosed extends State | |
case object Closed extends ValueOrClosed | |
case class Value(value: Either[E, A]) extends ValueOrClosed | |
for { | |
state <- RefM.make[State](Running).toManaged_ | |
pull <- stream.process | |
runtime <- ZIO.runtime[R].toManaged_ | |
} yield { | |
val pool: URIO[R, ValueOrClosed] = | |
pull.fold({ | |
case None => Closed | |
case Some(e) => Value(Left(e)) | |
}, x => Value(Right(x))) | |
val _next: RIO[R, Either[E, A]] = { | |
def loop: State => URIO[R, (Either[Throwable, Either[E, A]], State)] = { | |
case Closed => UIO(Left(new NoSuchElementException("next on empty iterator")) -> Closed) | |
case Value(x) => UIO(Right(x) -> Running) | |
case Running => pool >>= loop | |
} | |
state | |
.modify(loop) | |
.absolve | |
} | |
val _hasNext: URIO[R, Boolean] = { | |
def loop: State => URIO[R, (Boolean, State)] = { | |
case Closed => UIO(false -> Closed) | |
case x: Value => UIO(true -> x) | |
case Running => pool >>= loop | |
} | |
state.modify(loop) | |
} | |
new Iterator[Either[E, A]] { | |
override def hasNext: Boolean = runtime.unsafeRun(_hasNext) | |
override def next(): Either[E, A] = runtime.unsafeRun(_next) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment