Sunday, September 6, 2020

Batched Task.WhenAll

Task.WhenAll [1] awaits a collection of tasks that run concurrently, but running them all at once can exhaust an underlying resource (e.g. database connections). Instead, we can run the tasks in batches, limiting the size or concurrency of each batch to what the resource can handle.

Using SemaphoreSlim

The following implementation is a simplification of a StackOverflow answer [2]. It uses a SemaphoreSlim [3], which is a lightweight alternative [4] to Semaphore [5].

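public static async Task<IEnumerable<T>> WhenAllBatched<T>(IEnumerable<Task<T>> tasks, int batchSize)
{
    // At most batchSize callers may hold the semaphore at any one time.
    using var semaphore = new SemaphoreSlim(batchSize);

    var pendingTasks = tasks.Select(async task =>
    {
        // Wait asynchronously rather than blocking a thread, and acquire the
        // slot outside the try block so a failed wait never triggers a Release.
        await semaphore.WaitAsync();
        try
        {
            return await task;
        }
        finally
        {
            semaphore.Release();
        }
    });

    return await Task.WhenAll(pendingTasks);
}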
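
One caveat: if the tasks passed in have already been started (for example, because they were collected into a list beforehand), the semaphore cannot stop them from running concurrently. The following is a minimal sketch of a variant that takes task factories instead, so each operation starts only once a slot is free. The names `TaskBatching`, `WhenAllThrottled`, `taskFactories` and `maxConcurrency` are hypothetical and do not come from the StackOverflow answer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TaskBatching
{
    // Hypothetical variant: accepts factories, so no work begins before a slot is free.
    public static async Task<T[]> WhenAllThrottled<T>(
        IEnumerable<Func<Task<T>>> taskFactories, int maxConcurrency)
    {
        using var semaphore = new SemaphoreSlim(maxConcurrency);

        var pendingTasks = taskFactories.Select(async factory =>
        {
            // Acquire a slot asynchronously, outside the try block,
            // so a failed wait never triggers a Release.
            await semaphore.WaitAsync();
            try
            {
                // The operation is created and started only at this point.
                return await factory();
            }
            finally
            {
                semaphore.Release();
            }
        });

        // Results are returned in the same order as the factories.
        return await Task.WhenAll(pendingTasks);
    }
}
```

A caller would pass something like `ids.Select(id => new Func<Task<string>>(() => LoadAsync(id)))`, where `ids` and `LoadAsync` stand in for the real inputs and operation.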

Using a loop

An even cheaper alternative is to use Chunk [6] to split the tasks into batches, then loop over those batches, awaiting each one with Task.WhenAll [1]. Note that each batch must finish completely before the next one starts.

public static async Task<IEnumerable<T>> WhenAllBatched<T>(IEnumerable<Task<T>> tasks, int batchSize)
{
    var results = new List<T>();
    // Chunk splits the sequence into arrays of at most batchSize tasks.
    foreach (var batch in tasks.Chunk(batchSize))
    {
        // Wait for the whole batch to finish before moving on to the next one.
        var partialResults = await Task.WhenAll(batch);
        results.AddRange(partialResults);
    }
    return results;
}
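
Chunk enumerates its source lazily, so this loop only limits concurrency if the tasks are created as they are enumerated (for example, by a Select over the inputs) rather than stored in a list beforehand. The following is a minimal sketch that makes this explicit by chunking the inputs and starting each batch's operations inside the loop. `ChunkedBatching`, `SelectBatchedAsync`, `items` and `operation` are hypothetical names used for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ChunkedBatching
{
    // Hypothetical helper: runs operation over items, batchSize at a time.
    public static async Task<List<TOut>> SelectBatchedAsync<TIn, TOut>(
        IEnumerable<TIn> items, Func<TIn, Task<TOut>> operation, int batchSize)
    {
        var results = new List<TOut>();

        // Enumerable.Chunk requires .NET 6 or later.
        foreach (var batch in items.Chunk(batchSize))
        {
            // Only this batch's operations are started here; the next batch
            // begins after every task in this one has completed.
            var partialResults = await Task.WhenAll(batch.Select(operation));
            results.AddRange(partialResults);
        }

        return results;
    }
}
```

Compared with the semaphore approach, a single slow operation holds up its whole batch, so this trades some throughput for simplicity.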

Footnotes

  1. https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.task.whenall?view=net-6.0

  2. https://stackoverflow.com/questions/42511104/run-asynchronous-tasks-in-batch

  3. https://docs.microsoft.com/en-us/dotnet/api/system.threading.semaphoreslim?view=net-6.0

  4. https://docs.microsoft.com/en-us/dotnet/standard/threading/semaphore-and-semaphoreslim

  5. https://docs.microsoft.com/en-us/dotnet/api/system.threading.semaphore?view=net-6.0

  6. https://docs.microsoft.com/en-us/dotnet/api/system.linq.enumerable.chunk?view=net-6.0