Skip to content

Instantly share code, notes, and snippets.

@cenkbilgen
Last active March 26, 2025 19:51
Show Gist options
  • Save cenkbilgen/1acd23935cf8217c6650f4f987dd7f22 to your computer and use it in GitHub Desktop.
Save cenkbilgen/1acd23935cf8217c6650f4f987dd7f22 to your computer and use it in GitHub Desktop.
Swift Max Limit on Concurrent Tasks
/*
Ever try to user TaskGroups for working on a bunch of values in a collection concurrently?
It's fine but was having issues if I wantd to limit work to only "n" current tasks at a time.
Throttling is available in AsyncAlgoriths, but that is time based, not number of tasks.
Also tried keeping a counter/window type data struct.
But found a cleaner way with `.async.buffer(policy: .bounded(maxConcurrentTaskCount))`
NOTE: Your collection order will be lost, use a tuple with index to keep that information if you need it
*/
// add package github.com/apple/swift-async-algorithms.git
import AsyncAlgorithms
enum ConcurrentTaskSample {
// map values in InputType to OutputType
func concurrentStream<InputType, OutputType>(inputs: some Collection<InputType>,
maxConcurrentTaskCount: Int,
mapClosure: @escaping (InputType) async throws -> (OutputType)) -> AsyncStream<OutputType> {
let inputStream = inputs.async.buffer(policy: .bounded(maxConcurrentTaskCount))
let (stream, continuation) = AsyncStream.makeStream(of: OutputType.self)
Task {
for await input in inputStream {
try Task.checkCancellation()
let output = try await mapClosure(input)
continuation.yield(output)
}
continuation.finish()
}
return stream
}
func concurrentStreamCollected<InputType, OutputType>(inputs: some Collection<InputType>,
maxConcurrentTaskCount: Int,
mapClosure: @escaping (InputType) async throws -> (OutputType)) async throws -> [OutputType] {
let outputStream = concurrentStream(inputs: inputs,
maxConcurrentTaskCount: maxConcurrentTaskCount,
mapClosure: mapClosure)
var outputs: [OutputType] = []
outputs.reserveCapacity(inputs.count)
for await output in outputStream {
try Task.checkCancellation()
outputs.append(output)
}
return outputs
}
// Sample
struct User {}
func fetchUser(id: String) async throws -> User { User() }
func fetch15UsersAtATime(userIds: [String]) async throws -> [User] {
try await concurrentStreamCollected(inputs: userIds, maxConcurrentTaskCount: 15) { userId in
try await fetchUser(id: userId)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment