You didn't reveal a thing about the callback interface you're working with, so I'm just going to assume/hope/guess that the terms of that interface are, "you register a callback once, and then that callback will be occasionally invoked in the future, possibly from a different thread, until it's unregistered".
If that's the case, then try something like this on for size:
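import asyncio

async def _callback_iterator(register, unregister):
    loop = asyncio.get_running_loop()
    q = asyncio.Queue()
    # put_nowait is a plain (non-coroutine) method, so it can be scheduled
    # from another thread with call_soon_threadsafe.
    callback = lambda x: loop.call_soon_threadsafe(q.put_nowait, x)
    register(callback)
    try:
        while True:
            yield await q.get()
    finally:
        unregister(callback)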

def my_api_iterator():
    return _callback_iterator(
        _MY_API.register_callback,
        _MY_API.unregister_callback
    )

# Inside a coroutine:
async for message in my_api_iterator():
    ...
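To try the pattern end to end without a real API, here is a minimal sketch. `FakeApi`, `emit_from_thread`, and `collect` are hypothetical names invented for this illustration; the only assumption about your real interface is the one stated above (register once, callbacks fire later from another thread, until unregistered).

```python
import asyncio
import threading


class FakeApi:
    """Hypothetical stand-in for a callback API; fires callbacks from a worker thread."""

    def __init__(self):
        self._callbacks = []
        self._lock = threading.Lock()
        self._registered = threading.Event()

    def register_callback(self, cb):
        with self._lock:
            self._callbacks.append(cb)
        self._registered.set()

    def unregister_callback(self, cb):
        with self._lock:
            self._callbacks.remove(cb)

    def emit_from_thread(self, messages):
        def worker():
            # Wait until someone has registered, then deliver each message.
            self._registered.wait()
            for m in messages:
                with self._lock:
                    callbacks = list(self._callbacks)
                for cb in callbacks:
                    cb(m)

        t = threading.Thread(target=worker)
        t.start()
        return t


async def callback_iterator(register, unregister):
    loop = asyncio.get_running_loop()
    q = asyncio.Queue()

    def callback(x):
        # Safe to call from any thread; the put itself runs on the loop.
        loop.call_soon_threadsafe(q.put_nowait, x)

    register(callback)
    try:
        while True:
            yield await q.get()
    finally:
        unregister(callback)


async def collect(api, messages):
    thread = api.emit_from_thread(messages)
    received = []
    gen = callback_iterator(api.register_callback, api.unregister_callback)
    async for m in gen:
        received.append(m)
        if len(received) == len(messages):
            break
    # Breaking out of `async for` does not close the generator right away;
    # aclose() runs the `finally` block so the callback is unregistered now.
    await gen.aclose()
    thread.join()
    return received


if __name__ == "__main__":
    print(asyncio.run(collect(FakeApi(), [1, 2, 3])))
```

Note that the callback is only registered once iteration starts (the first `__anext__` call), and that if you leave the loop early you probably want an explicit `aclose()` so that `unregister` runs promptly rather than whenever the generator is finalized.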
Using a queue may seem excessive, but it embodies the least "spiky" answer to the question: if your asyncio event loop hasn't got around to reading a message by the time your API has a new one, what should happen? Should the callback you passed to your API block? If not (or if it should only block for a finite amount of time), should it silently drop the new message, or should it raise an exception? What if the code invoking the callback is some low-level, non-Python library that supports neither failure exit codes nor Python exceptions? An unbounded queue sidesteps all of these questions: the callback never blocks and nothing is dropped, at the cost of memory growing if the consumer falls behind.
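If you do want one of the other answers to those questions, a bounded queue lets you pick. The following is only a sketch of two possible policies; `make_dropping_callback` and `make_blocking_callback` are hypothetical helper names, and which policy is acceptable depends on what your API tolerates from its callbacks.

```python
import asyncio
import concurrent.futures


def make_dropping_callback(loop, q):
    """Never blocks the caller; silently drops a message when q is full."""

    def _put_or_drop(x):
        try:
            q.put_nowait(x)
        except asyncio.QueueFull:
            pass  # policy choice: discard the newest message

    return lambda x: loop.call_soon_threadsafe(_put_or_drop, x)


def make_blocking_callback(loop, q, timeout=None):
    """Blocks the calling thread until q has room (or the timeout expires).

    Must be called from a thread other than the one running the loop,
    otherwise it would deadlock waiting on itself.
    """

    def callback(x):
        future = asyncio.run_coroutine_threadsafe(q.put(x), loop)
        try:
            future.result(timeout)
        except concurrent.futures.TimeoutError:
            future.cancel()  # give up on the pending put
            raise

    return callback
```

Either callback can be passed to `register` in place of the lambda, with `q = asyncio.Queue(maxsize=N)` for some bound `N`. The blocking variant raises into the caller on timeout, which is exactly the case to avoid if the caller is non-Python code that cannot handle exceptions.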