Problem
In backend applications with Vapor, when we process large volumes of data concurrently, we face an optimization dilemma:
- Sequential processing with asyncMap: guarantees resource control, but is slow because it processes elements one at a time.
- Fully concurrent processing with concurrentMap: maximizes speed, but can saturate resources by launching thousands of simultaneous tasks.
For example, when processing 10,000 records with external API calls:
- asyncMap: 10,000 sequential calls → very slow but controlled.
- concurrentMap: 10,000 simultaneous calls → very fast but can exhaust connections/memory.
We need a solution that combines both approaches: divide the work into manageable groups and process each group concurrently, balancing speed and resource usage.
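To make the two extremes concrete, here is a self-contained sketch written with plain structured concurrency. The names `sequentialMap` and `fullyConcurrentMap` are illustrative stand-ins, not the actual asyncMap and concurrentMap extensions from the earlier posts:

```swift
// Extreme 1 — sequential: one in-flight call at a time.
// Resource-friendly, but latency adds up across every element.
func sequentialMap<T: Sendable, R: Sendable>(
    _ items: [T],
    _ transform: @Sendable (T) async throws -> R
) async rethrows -> [R] {
    var results: [R] = []
    for item in items {
        results.append(try await transform(item))
    }
    return results
}

// Extreme 2 — fully concurrent: every call launched at once.
// Fast, but with 10,000 items this means 10,000 simultaneous tasks.
func fullyConcurrentMap<T: Sendable, R: Sendable>(
    _ items: [T],
    _ transform: @escaping @Sendable (T) async throws -> R
) async throws -> [R] {
    try await withThrowingTaskGroup(of: (Int, R).self) { group in
        // Tag each child task with its index so we can restore input order.
        for (index, item) in items.enumerated() {
            group.addTask { (index, try await transform(item)) }
        }
        var results = [R?](repeating: nil, count: items.count)
        for try await (index, value) in group {
            results[index] = value
        }
        return results.compactMap { $0 }
    }
}
```

Both return results in input order; the only difference is how many transforms are in flight at once.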
Solution
We extend Collection with a function that combines chunking (division into groups) and concurrent processing, allowing configuration of chunk size and timeout per chunk.
extension Collection where Element: Sendable {
    func asyncConcurrentMap<T: Sendable>(
        chunkSize: Int? = nil,
        timeout: Double? = nil,
        _ transform: @escaping @Sendable (Element) async throws -> T
    ) async throws -> [T] {
        guard let chunkSize else {
            return try await concurrentMap(transform)
        }
        return try await chunks(ofCount: chunkSize)
            .asyncMap(timeout: timeout) {
                try await $0.concurrentMap(transform)
            }
            .flatMap { $0 }
    }
}
Key points:
- If chunkSize is not specified, uses pure concurrentMap (fully concurrent processing).
- If chunkSize is specified, divides the collection into groups with chunks(ofCount:).
- Processes the chunks sequentially with asyncMap (with optional timeout).
- Within each chunk, processes the elements concurrently with concurrentMap.
- Flattens the results with flatMap to return a unified array.
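For reference, the per-chunk timeout can be understood as racing the chunk's work against a deadline. This is only a hedged sketch of how such a helper might work, with a hypothetical `withTimeout` name and `TimeoutError` type, not the actual Async Map Timeout implementation:

```swift
struct TimeoutError: Error {}

// Hypothetical sketch: race a unit of work against a sleeping task.
// Whichever child finishes first decides the outcome; the loser is cancelled.
func withTimeout<R: Sendable>(
    _ seconds: Double?,
    _ work: @escaping @Sendable () async throws -> R
) async throws -> R {
    guard let seconds else { return try await work() }
    return try await withThrowingTaskGroup(of: R.self) { group in
        group.addTask { try await work() }
        group.addTask {
            try await Task.sleep(nanoseconds: UInt64(seconds * 1_000_000_000))
            throw TimeoutError()
        }
        // next() yields the first completed child, rethrowing its error.
        let result = try await group.next()!
        group.cancelAll()
        return result
    }
}
```

If the sleeping task wins, the group surfaces `TimeoutError` for that chunk, so a slow chunk fails fast instead of stalling the whole pipeline.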
Result
// Process 10,000 records in chunks of 100
// 100 concurrent tasks at a time, 100 times
let results = try await records.asyncConcurrentMap(
    chunkSize: 100,
    timeout: 30.0
) { record in
    try await apiClient.process(record)
}
Benefits of this approach:
⚡ Perfect balance: combines the speed of concurrent processing with the control of sequential processing by chunks.
🎯 Resource control: limits the number of simultaneous tasks to the chunk size, avoiding saturation.
⏱️ Timeout per chunk: detects and handles problematic chunks without blocking all processing.
🔧 Full flexibility: use chunking when you need it, or pure concurrent processing when you don't.
📈 Scalability: allows processing millions of records by adjusting the chunk size according to available resources.
If you need pure parallel execution, I covered that in Concurrent Map. And if your sequential chunks need rate limiting between them, I added that capability in Async Map Timeout.
This solution is the natural evolution of asyncMap and concurrentMap, combining the best of both worlds to optimize massive data processing in backend applications.
Keep coding, keep running 🏃‍♂️