I had to combine Craig Kelly's response with this one: https://stackoverflow.com/a/77911668/3776765
@pytest.fixture(scope='session')
def event_loop():
    """Provide a single asyncio event loop shared by the whole test session.

    Reuses the loop already running in the current thread when there is one;
    otherwise creates a new loop. On teardown, tasks that are still unfinished
    on the loop are awaited, the loop is run for one more second, and the loop
    is closed.

    Yields:
        The event loop to be used by the session's tests.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so create one for the session.
        loop = asyncio.new_event_loop()
    yield loop
    try:
        # Let tasks that tests left unfinished run to completion before the
        # loop goes away. Errors from those tasks are NOT suppressed: they
        # propagate out of teardown so the failure stays visible.
        pending = asyncio.all_tasks(loop)
        if pending:
            loop.run_until_complete(asyncio.gather(*pending))
        # Run the loop for one extra second before closing. Presumably this
        # gives transports/callbacks time to finish shutting down --
        # NOTE(review): confirm the delay is still required.
        loop.run_until_complete(asyncio.sleep(1))
    finally:
        # Always release the loop, even when draining above raised; otherwise
        # a failing leftover task would leak an unclosed event loop.
        loop.close()