Created
November 21, 2019 21:16
-
-
Save ahoy-jon/90cabce02b64aeae9f9c6b9ebbbfffc1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ToIterator private (runtime: Runtime[Any]) { | |
def unsafeCreate[V](q: UIO[stream.Stream[_,V]]): Iterator[V] = | |
new Iterator[V] { | |
import ToIterator._ | |
var state: State[V] = Running | |
val synchronousBlockQueue: BlockingQueue[ValueOrClosed[V]] = new SynchronousQueue[ValueOrClosed[V]] | |
private def put(value:ValueOrClosed[V]): Task[Unit] = IO.effect(synchronousBlockQueue.put(value)) | |
private val io = q >>= (_.map(Value.apply).foreachManaged(put).use_(put(Closed))) | |
private val thread: Thread = new Thread() { | |
override def run(): Unit = | |
runtime.unsafeRunAsync_(io) | |
} | |
var threadStarted = false | |
def checkStarted(): Unit = { | |
if(!threadStarted) thread.start() | |
threadStarted = true | |
} | |
private def pool(): ValueOrClosed[V] = { | |
checkStarted() | |
synchronousBlockQueue.take() | |
} | |
override def hasNext: Boolean = | |
state match { | |
case Closed => false | |
case Value(_) => true | |
case Running => | |
state = pool() | |
state != Closed | |
} | |
private val undefinedBehavior = new NoSuchElementException("next on empty iterator") | |
override def next(): V = | |
state match { | |
case Value(value) => | |
state = Running | |
value | |
case Closed => throw undefinedBehavior | |
case Running => | |
pool() match { | |
case Closed => | |
state = Closed | |
throw undefinedBehavior | |
case Value(v) => | |
//stage = Running | |
v | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment