Can you remove: inclusiveEndAt=end_time.isoformat()
I believe setting an end time bounds the read, which is causing the pipeline to run in batch mode when you want it to run in streaming mode.
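
If a bounded run is still needed occasionally (a backfill, say), one option is to make the end bound optional instead of always passing it. This is only a rough sketch, assuming the arguments are collected in a dict before being passed to the read step; `build_read_kwargs` and the `inclusiveStartAt` key are placeholders of mine, not names taken from your code:

```python
from datetime import datetime, timezone
from typing import Optional


def build_read_kwargs(start_time: datetime, end_time: Optional[datetime] = None) -> dict:
    """Hypothetical helper: collect the keyword arguments for the read step."""
    # `inclusiveStartAt` is an assumed parameter name, used here for illustration.
    kwargs = {"inclusiveStartAt": start_time.isoformat()}
    # Only set an end bound when one is explicitly requested (e.g. a backfill).
    # Leaving it out keeps the read open-ended.
    if end_time is not None:
        kwargs["inclusiveEndAt"] = end_time.isoformat()
    return kwargs


# Streaming case: no end_time, so no inclusiveEndAt key is produced.
streaming_kwargs = build_read_kwargs(datetime(2024, 1, 1, tzinfo=timezone.utc))
```

Whether omitting the argument is enough to switch the pipeline to streaming depends on the connector, so it is worth confirming with a test run.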