79745212

Date: 2025-08-24 22:42:20
Score: 1.5
Natty:
Report link

Isn't this enough?

/// <summary>
/// Extension methods for consuming a <see cref="ChannelReader{T}"/> in batches.
/// </summary>
public static class ChannelReaderExtensions
{
    /// <summary>
    /// Reads items from <paramref name="reader"/> and groups them into arrays.
    /// A batch is emitted as soon as it holds <paramref name="maxBatchSize"/> items,
    /// when <paramref name="interval"/> has elapsed since the first item of the batch
    /// was read, or when the channel completes - whichever happens first.
    /// Empty batches are never emitted.
    /// </summary>
    /// <typeparam name="T">The type of the items carried by the channel.</typeparam>
    /// <param name="reader">The channel reader to consume.</param>
    /// <param name="maxBatchSize">The maximum number of items per batch; must be positive.</param>
    /// <param name="interval">
    /// The maximum time a non-empty batch is held back before it is emitted.
    /// The value is passed to <see cref="Task.Delay(TimeSpan, CancellationToken)"/>.
    /// </param>
    /// <param name="cancellationToken">Token used to stop the enumeration.</param>
    /// <returns>
    /// An asynchronous sequence of non-empty batches that ends once the channel
    /// has been completed and drained.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="reader"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxBatchSize"/> is zero or negative.
    /// </exception>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was cancelled. Items already collected
    /// are emitted as a final partial batch before the exception is thrown.
    /// </exception>
    /// <remarks>
    /// This is an async iterator, so argument validation runs on the first
    /// <c>MoveNextAsync</c> call rather than when the method is invoked.
    /// </remarks>
    public static async IAsyncEnumerable<T[]> Batch<T>(this ChannelReader<T> reader, 
        int maxBatchSize, TimeSpan interval,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(reader);
        if (maxBatchSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxBatchSize), maxBatchSize,
                "The maximum batch size must be greater than zero.");
        }

        var batch = new List<T>();

        // WaitToReadAsync yields false once the channel is completed and drained;
        // that is the normal end of the enumeration. Cancellation and a faulted
        // channel completion surface here as exceptions.
        while (await reader.WaitToReadAsync(cancellationToken))
        {
            if (!reader.TryRead(out var first))
            {
                // Another consumer took the item; wait again.
                continue;
            }

            batch.Add(first);

            // The batching window starts with the first item. The delay gets its own
            // linked source so the timer can be released as soon as the batch is
            // flushed instead of lingering for the rest of the interval.
            using var windowCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            var intervalTask = Task.Delay(interval, windowCts.Token);

            // Checking the size up front also covers maxBatchSize == 1,
            // where the first item alone already fills the batch.
            while (batch.Count < maxBatchSize)
            {
                if (reader.TryRead(out var item))
                {
                    batch.Add(item);
                    continue;
                }

                var readTask = reader.WaitToReadAsync(cancellationToken).AsTask();
                var completedTask = await Task.WhenAny(readTask, intervalTask);

                // Window elapsed, or the wait was cancelled/faulted: flush what we have.
                // Cancellation is rethrown after the flush; a channel fault is expected
                // to resurface from the outer WaitToReadAsync call.
                if (completedTask == intervalTask || !readTask.IsCompletedSuccessfully)
                {
                    break;
                }

                // Channel completed and drained: flush the final partial batch.
                if (!await readTask)
                {
                    break;
                }
            }

            windowCts.Cancel();

            yield return [.. batch];
            batch.Clear();

            // Guarantees "flush, then throw" even for readers that do not
            // themselves throw on an already-cancelled token.
            cancellationToken.ThrowIfCancellationRequested();
        }
    }
}
Reasons:
  • Long answer (-1):
  • Has code block (-0.5):
  • Ends in question mark (2):
  • Starts with a question (0.5): Isn't this
  • Low reputation (0.5):
Posted by: kamilz