Isn't this enough?
/// <summary>
/// Extension methods for consuming a <see cref="ChannelReader{T}"/>.
/// </summary>
public static class ChannelReaderExtensions
{
    /// <summary>
    /// Reads items from <paramref name="reader"/> and groups them into arrays.
    /// A batch is emitted as soon as it holds <paramref name="maxBatchSize"/> items,
    /// or once <paramref name="interval"/> has elapsed since the first item of that
    /// batch was buffered, whichever happens first. Empty batches are never emitted.
    /// </summary>
    /// <typeparam name="T">The type of the items in the channel.</typeparam>
    /// <param name="reader">The channel reader to consume.</param>
    /// <param name="maxBatchSize">The maximum number of items per batch; must be positive.</param>
    /// <param name="interval">
    /// The longest time a non-empty batch is held back before being emitted.
    /// The value is passed to <see cref="Task.Delay(TimeSpan, CancellationToken)"/>.
    /// </param>
    /// <param name="cancellationToken">A token that stops the enumeration.</param>
    /// <returns>
    /// An asynchronous sequence of batches. The sequence ends after the reader reports
    /// that no more items will arrive; a partially filled batch is emitted first.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="reader"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxBatchSize"/> is zero or negative.</exception>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was canceled. Items already buffered are
    /// emitted as a final batch before the exception is thrown.
    /// </exception>
    /// <remarks>
    /// Because this is an async iterator, argument validation runs when enumeration
    /// starts, not when the method is called.
    /// </remarks>
    public static async IAsyncEnumerable<T[]> Batch<T>(this ChannelReader<T> reader,
        int maxBatchSize, TimeSpan interval,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (reader is null)
        {
            throw new ArgumentNullException(nameof(reader));
        }

        if (maxBatchSize <= 0)
        {
            throw new ArgumentOutOfRangeException(
                nameof(maxBatchSize), maxBatchSize, "The maximum batch size must be positive.");
        }

        var batch = new List<T>();

        // A timer is running exactly while the batch is non-empty: it is started when the
        // first item is buffered and stopped by Flush().
        var timerCts = default(CancellationTokenSource);
        var intervalTask = default(Task);

        // At most one wait is outstanding on the reader. A wait that loses the race against
        // the timer is kept and reused instead of being abandoned.
        var readTask = default(Task<bool>);

        // Snapshots the buffered items, empties the buffer and stops the batch timer so that
        // no delay stays pending after its batch has been emitted.
        T[] Flush()
        {
            T[] items = [.. batch];
            batch.Clear();
            timerCts?.Cancel();
            timerCts?.Dispose();
            timerCts = null;
            intervalTask = null;
            return items;
        }

        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                readTask ??= reader.WaitToReadAsync(cancellationToken).AsTask();

                if (intervalTask is not null)
                {
                    // Items are buffered: race the next read against the batch deadline.
                    var completedTask = await Task.WhenAny(readTask, intervalTask);
                    if (completedTask == intervalTask)
                    {
                        yield return Flush();
                        continue;
                    }

                    if (readTask.IsCanceled || readTask.IsFaulted)
                    {
                        // Leave the loop without awaiting so the buffered items can be
                        // emitted before the failure is rethrown below.
                        break;
                    }
                }

                // With an empty buffer there is no deadline, so simply wait for data.
                // Nothing is buffered on that path, so a throw here loses no items.
                var canRead = await readTask;
                readTask = null;
                if (!canRead)
                {
                    // The reader reported that no further items will ever be available.
                    break;
                }

                while (reader.TryRead(out var item))
                {
                    batch.Add(item);
                    if (batch.Count >= maxBatchSize)
                    {
                        yield return Flush();

                        // Return to the outer loop so cancellation is re-checked per batch.
                        break;
                    }

                    if (batch.Count == 1)
                    {
                        // First item of a new batch: start the deadline for this batch.
                        timerCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
                        intervalTask = Task.Delay(interval, timerCts.Token);
                    }
                }
            }

            // Emit what is left before reporting cancellation or a reader failure.
            if (batch.Count > 0)
            {
                yield return Flush();
            }

            cancellationToken.ThrowIfCancellationRequested();

            if (readTask is not null)
            {
                // Only reached with an already completed (canceled or faulted) wait;
                // awaiting it rethrows the stored exception.
                await readTask;
            }
        }
        finally
        {
            // Also runs when the consumer stops enumerating early.
            timerCts?.Cancel();
            timerCts?.Dispose();
        }
    }
}