It is now working; solved thanks to @miguel-grinberg!
First, I switched to gevent
instead of eventlet
and made some changes to my code: instead of running the pubsub listener in a background thread, I now rely on SocketIO's built-in mechanism (its `message_queue` option).
# extensions.py
# Shared SocketIO instance, created without an app; it is bound to the Flask
# app later through ``socketio.init_app(...)``.
socketio = SocketIO(
    path='/socket.io',
    async_mode='gevent',
    cors_allowed_origins='*',
    ping_timeout=15,
    ping_interval=60,
)
# __init__.py
# Application factory taking a configuration class (defaults to ``Config``).
def create_app(config_class=Config):
# The construction of ``app`` is elided in this snippet.
....
# Bind the shared ``socketio`` instance to the app, passing
# ``redis_db_static.redis_url`` as the message queue and the channel name
# read from the app's ``WEBSOCKET_CHANNEL`` config entry.
socketio.init_app(app, message_queue=redis_db_static.redis_url, channel=app.config.get("WEBSOCKET_CHANNEL"))
Then, in my Redis publish method, I made it work both with websockets and with other channels/services, while keeping my websocket dispatcher service class (keep in mind that this code runs in a Celery worker):
def publish(self, pubsub_message: "RedisPubSubMessage"):
    """Deliver a pub/sub message to a websocket service or to the Redis channel.

    Messages whose ``module`` is ``"RedisWS"`` are dispatched to the
    per-company service found in ``app.extensions["RedisWS"].ws_services`` by
    calling the method named in ``pubsub_message.method``. The payload decides
    the call shape:

    * ``None``          -> the method is called with no arguments
    * ``list``          -> the items are unpacked as positional arguments
    * ``dict`` / ``str`` -> the payload is passed as a single argument

    Any other module is serialised with ``model_dump_json()`` and published
    on ``self.channel`` through ``self.redis``.

    Args:
        pubsub_message: Message exposing ``module``, ``company_id``,
            ``method``, ``message``, ``model_dump()`` and
            ``model_dump_json()``.

    Returns:
        ``"emitted to sockets"`` after a websocket dispatch, the result of
        ``self.redis.publish`` on the generic path, or ``None`` when an error
        was caught and logged.
    """
    try:
        if pubsub_message.module != "RedisWS":
            # GENERIC PUBLISH
            return self.redis.publish(self.channel, pubsub_message.model_dump_json())

        ws_service = self.app.extensions.get("RedisWS").ws_services.get(pubsub_message.company_id)
        if ws_service is None:
            # Fail with an explicit message instead of an opaque
            # "'NoneType' object has no attribute ..." error.
            raise LookupError(
                f"no websocket service registered for company {pubsub_message.company_id!r}"
            )

        # TODO the response model should route to a WSService or to something different
        payload = pubsub_message.message
        if payload is None:
            call_args = ()
        elif isinstance(payload, list):
            call_args = tuple(payload)
        elif isinstance(payload, (dict, str)):
            call_args = (payload,)
        else:
            # Previously such payloads were silently dropped while the method
            # still reported success; surface them through the error path.
            raise TypeError(f"unsupported message payload type: {type(payload).__name__}")

        getattr(ws_service, pubsub_message.method)(*call_args)
        self.logger.debug(f"Event emitted in socketio {self.socketio}: {pubsub_message.model_dump()}")
        return "emitted to sockets"
    except Exception as e:
        # Best-effort: errors are logged, never propagated to the caller.
        # NOTE(review): this relies on ``self.logger.error(...)`` returning an
        # object with ``save()``; the stdlib logger returns None - confirm the
        # logger in use is the project's custom one.
        self.logger.error(f"Pubsub publish error: {e}").save("pubsub_published")
class WSService:
    """Websocket dispatcher that emits events through a SocketIO instance."""

    def __init__(self, company, socketio):
        """Store the SocketIO handle, the module-level logger and the version tag."""
        self.logger = logger
        self.socket = socketio
        self._version = '2.2'

    # Placeholder kept from the original snippet: further members are elided.
    ...

    def new_message(self, message):
        """Emit a ``new_message`` event to the room named after the message's user id.

        Messages whose ``tracking_status`` is ``"hidden"`` are not emitted.
        """
        if message.tracking_status == "hidden":
            return
        payload = message.to_dict()
        self.socket.emit('new_message', payload, room=message.user.id)