Created
December 11, 2015 15:54
-
-
Save natewave/8ad4e74b8fa253bb1f71 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.concurrent.{ Future, ExecutionContext } | |
object GroupedAsync { | |
def sequencePar[K, V, R](fs: ((K, V)) => Future[(K, R)])(input: Seq[(K, V)])(implicit ec: ExecutionContext): Future[Seq[(K, R)]] = { | |
val result: Future[Seq[(K, R)]] = { | |
val grouped: Map[K, Seq[(K, V)]] = input.groupBy(_._1) | |
val futurePerKeyMap: Map[K, Future[Seq[(K, R)]]] = grouped.mapValues { elements => | |
val l: Future[Seq[(K, R)]] = elements.foldLeft(Future(Seq.empty[(K, R)])) { | |
(previousFuture, next) => | |
for { | |
previousResults <- previousFuture | |
next <- fs(next) | |
} yield previousResults :+ next | |
} | |
l | |
} | |
val future: Future[Seq[Seq[(K, R)]]] = Future.sequence(futurePerKeyMap.valuesIterator.toSeq) | |
val flat = future.map(_.flatten) | |
flat | |
} | |
result | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment