Last active
September 2, 2021 13:54
Untested Scala code to group com.twitter.concurrent.AsyncStream elements together. I wrote it thinking I needed it, but I was wrong. Maybe I will eventually need it. Maybe not. Maybe I will not remember this gist when I do. Maybe I got it all wrong.
import com.twitter.concurrent.AsyncStream
import com.twitter.util.Future

object AsyncStreamHelpers {
  implicit class RichAsyncStream[T](val stream: AsyncStream[T]) {
    /**
     * Partitions this stream into sequences of adjacent elements grouped according to some discriminator function.
     *
     * This method forces evaluation of this stream up to the first element that does not belong to the first group
     * (i.e. if the first group in the returned stream has size `n`, then the first `n + 1` elements of this stream
     * are evaluated). Accessing each subsequent element of the resulting stream evaluates another group of elements
     * from this stream.
     *
     * {{{
     * val stream = AsyncStream(("a", 1), ("a", 2), ("b", 1), ("a", 3), ("b", 2), ("b", 3))
     * stream.groupBy(_._1) // yields the equivalent of:
     * AsyncStream(
     *   "a" -> Seq(("a", 1), ("a", 2)),
     *   "b" -> Seq(("b", 1)),
     *   "a" -> Seq(("a", 3)),
     *   "b" -> Seq(("b", 2), ("b", 3))
     * )
     * }}}
     *
     * @param f the discriminator function.
     * @tparam K the type of keys returned by the discriminator function.
     */
    def groupBy[K](f: T => K): AsyncStream[(K, Seq[T])] = {
      // Resolve the first group: the head element plus every adjacent element sharing its key.
      val firstGroup = AsyncStream.fromFuture(stream.uncons.flatMap {
        case Some((first, tail)) =>
          val key = f(first)
          // Keep the head in the group so that seq.size counts every grouped element.
          AsyncStream.mk(first, tail().takeWhile(f(_) == key)).toSeq().map(seq => Option(key -> seq))
        case None => Future.None
      })
      firstGroup.flatMap {
        // Skip the elements already grouped, then lazily group the remainder.
        case Some((key, seq)) => AsyncStream.mk((key, seq), stream.drop(seq.size).groupBy(f))
        case None => AsyncStream.empty
      }
    }
  }
}
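
Since the code above is untested, a strict reference version of the same adjacent-grouping semantics can be handy for comparison. The following is only a sketch on plain `List`s using the standard library; `GroupAdjacentSketch` and `groupAdjacent` are hypothetical names that are not part of the gist or of any Twitter library, and the recursion is not stack-safe for very long inputs.

```scala
object GroupAdjacentSketch {
  // Groups runs of adjacent elements that map to the same key.
  // Strict (non-lazy, non-async) counterpart of the groupBy helper above.
  def groupAdjacent[T, K](xs: List[T])(f: T => K): List[(K, List[T])] = xs match {
    case Nil => Nil
    case head :: _ =>
      val key = f(head)
      // span splits the list at the first element whose key differs.
      val (group, rest) = xs.span(f(_) == key)
      (key -> group) :: groupAdjacent(rest)(f)
  }
}
```

For the input `List(("a", 1), ("a", 2), ("b", 1), ("a", 3), ("b", 2), ("b", 3))` and discriminator `_._1`, this produces four groups keyed `"a"`, `"b"`, `"a"`, `"b"`, which is the shape the asynchronous version is meant to produce.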