Last active
March 26, 2025 19:51
-
-
Save cenkbilgen/1acd23935cf8217c6650f4f987dd7f22 to your computer and use it in GitHub Desktop.
Swift Max Limit on Concurrent Tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
Ever tried to use TaskGroups to work on a bunch of values in a collection concurrently?
It's fine, but I was having issues when I wanted to limit the work to only "n" concurrent tasks at a time.
Throttling is available in AsyncAlgorithms, but that is time-based, not based on the number of tasks.
Also tried keeping a counter/window type data struct. | |
But found a cleaner way with `.async.buffer(policy: .bounded(maxConcurrentTaskCount))` | |
NOTE: Your collection order will be lost, use a tuple with index to keep that information if you need it | |
*/ | |
// add package github.com/apple/swift-async-algorithms.git | |
import AsyncAlgorithms | |
/// Namespace for helpers that map a collection through an async closure while
/// keeping at most `maxConcurrentTaskCount` invocations in flight at once.
///
/// The enum has no cases and therefore cannot be instantiated, so every member
/// is `static`.
enum ConcurrentTaskSample {

    /// Maps `inputs` through `mapClosure` concurrently and streams the results.
    ///
    /// Results are yielded in completion order, not input order. Pair each
    /// input with its index if the original order matters.
    ///
    /// `AsyncStream` cannot carry an error, so if `mapClosure` throws (or the
    /// work is cancelled) the remaining child tasks are cancelled and the stream
    /// simply finishes early. Use `concurrentStreamCollected` when the failure
    /// must reach the caller.
    ///
    /// - Parameters:
    ///   - inputs: The values to map. They are copied up front.
    ///   - maxConcurrentTaskCount: Upper bound on simultaneously running
    ///     `mapClosure` calls. Values below 1 are treated as 1.
    ///   - mapClosure: The transformation applied to every input.
    /// - Returns: A stream of mapped values that always terminates.
    static func concurrentStream<InputType, OutputType>(
        inputs: some Collection<InputType>,
        maxConcurrentTaskCount: Int,
        mapClosure: @escaping (InputType) async throws -> (OutputType)
    ) -> AsyncStream<OutputType> {
        let (stream, continuation) = AsyncStream.makeStream(of: OutputType.self)
        let snapshot = Array(inputs)

        let producer = Task {
            // Finish on every exit path; otherwise a thrown error would leave
            // the consumer suspended forever.
            defer { continuation.finish() }
            do {
                try await Self.mapWithBoundedConcurrency(
                    inputs: snapshot,
                    maxConcurrentTaskCount: maxConcurrentTaskCount,
                    mapClosure: mapClosure
                ) { output in
                    continuation.yield(output)
                }
            } catch {
                // Intentionally not rethrown: the stream's element type has no
                // room for a failure, so the stream just ends here.
            }
        }

        // Stop the outstanding work once the consumer goes away.
        continuation.onTermination = { _ in producer.cancel() }
        return stream
    }

    /// Maps `inputs` through `mapClosure` concurrently and collects the results.
    ///
    /// - Parameters:
    ///   - inputs: The values to map.
    ///   - maxConcurrentTaskCount: Upper bound on simultaneously running
    ///     `mapClosure` calls. Values below 1 are treated as 1.
    ///   - mapClosure: The transformation applied to every input.
    /// - Returns: The mapped values in completion order (not input order).
    /// - Throws: The first error thrown by `mapClosure`, or `CancellationError`
    ///   if the calling task is cancelled. Remaining child tasks are cancelled.
    static func concurrentStreamCollected<InputType, OutputType>(
        inputs: some Collection<InputType>,
        maxConcurrentTaskCount: Int,
        mapClosure: @escaping (InputType) async throws -> (OutputType)
    ) async throws -> [OutputType] {
        var outputs: [OutputType] = []
        outputs.reserveCapacity(inputs.count)
        try await mapWithBoundedConcurrency(
            inputs: inputs,
            maxConcurrentTaskCount: maxConcurrentTaskCount,
            mapClosure: mapClosure
        ) { output in
            outputs.append(output)
        }
        return outputs
    }

    /// Runs `mapClosure` over `inputs` with a sliding window of child tasks and
    /// hands every result to `receive` as soon as it is available.
    private static func mapWithBoundedConcurrency<InputType, OutputType>(
        inputs: some Collection<InputType>,
        maxConcurrentTaskCount: Int,
        mapClosure: @escaping (InputType) async throws -> OutputType,
        receive: (OutputType) -> Void
    ) async throws {
        // A non-positive limit would never start any work; clamp it to 1.
        let limit = max(1, maxConcurrentTaskCount)

        try await withThrowingTaskGroup(of: OutputType.self, returning: Void.self) { group in
            var inFlight = 0
            for input in inputs {
                // Window is full: wait for one child to finish before adding
                // the next, so no more than `limit` children exist at a time.
                if inFlight >= limit, let output = try await group.next() {
                    inFlight -= 1
                    receive(output)
                }
                try Task.checkCancellation()
                group.addTask { try await mapClosure(input) }
                inFlight += 1
            }
            // Drain whatever is still running after the last input was queued.
            for try await output in group {
                receive(output)
            }
        }
    }

    // MARK: - Sample

    struct User {}

    static func fetchUser(id: String) async throws -> User { User() }

    static func fetch15UsersAtATime(userIds: [String]) async throws -> [User] {
        try await concurrentStreamCollected(inputs: userIds, maxConcurrentTaskCount: 15) { userId in
            try await fetchUser(id: userId)
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment