79740099

Date: 2025-08-19 15:03:47
Score: 0.5
Natty:
Report link

You didn't reveal a thing about the callback interface you're working with, so I'm just going to assume/hope/guess that the terms of that interface are, "you register a callback once, and then that callback will be occasionally invoked in the future, possibly from a different thread, until it's unregistered".

If that's the case, then try something like this on for size:

async def _callback_iterator(register, unregister):
    loop = asyncio.get_running_loop()
    q = asyncio.Queue()
    callback = lambda x: loop.call_soon_threadsafe(q.put, x)

    register(callback)
    try:
        for x in q:
            yield x
    finally:
        unregister(callback)


def my_api_iterator():
    return _callback_iterator(
        _MY_API.register_callback,
        _MY_API.unregister_callback
    )


async for message in my_api_iterator(
    _MY_API.register_callback,
    _MY_API.unregister_callback
):
    ...

It may seem excessive to use a queue, but that queue embodies the least "spiky" answer to the question: if your asyncio event loop hasn't got around to reading a message by the time your API has a new message, what should happen? Should the callback you passed to your API block? If not, (or if it should only block for a finite amount of time,) then should it just silently drop the new message, or should it raise an exception? What if the API consumer is some low-level, non-Python library code that doesn't support either failure exit-codes or Python exceptions?

Reasons:
  • Long answer (-1):
  • Has code block (-0.5):
  • Ends in question mark (2):
Posted by: JamesTheAwesomeDude