79684820

Date: 2025-06-30 13:18:38
Score: 1.5
Natty:
Report link

now it is working and solved thanks to @miguel-grinberg!

First I switched to gevent instead of using eventlet and made some changes to my code, so instead of running the pubsub in the background as a thread I am running it with socketio default's.

# extensions.py
socketio = SocketIO(path='/socket.io', async_mode='gevent', cors_allowed_origins='*', ping_timeout=15, ping_interval=60)

# __init__.py
def create_app(config_class=Config):
....
    socketio.init_app(app, message_queue=redis_db_static.redis_url, channel=app.config.get("WEBSOCKET_CHANNEL"))

Then within my redis publish method I did so that it could work both with websockets or with other channels/services and keep my websocket dispatcher services class (think that this code is running in a celery worker):

    def publish(self, pubsub_message: RedisPubSubMessage):
        try:
            if pubsub_message.module == "RedisWS":
                WSS = self.app.extensions.get("RedisWS").ws_services.get(pubsub_message.company_id)
                # TODO the reponse model should route to a WSService or to something different
                if pubsub_message.message is not None:
                    if isinstance(pubsub_message.message, list):
                        getattr(WSS, pubsub_message.method)(*pubsub_message.message)
                    elif isinstance(pubsub_message.message, dict):
                        getattr(WSS, pubsub_message.method)(pubsub_message.message)
                    elif isinstance(pubsub_message.message, str):
                        getattr(WSS, pubsub_message.method)(pubsub_message.message)
                else:
                    getattr(WSS, pubsub_message.method)()
                self.logger.debug(f"Event emitted in socketio {self.socketio}: {pubsub_message.model_dump()}")
                return "emitted to sockets"
            else:
                # GENERIC PUBLISH
                return self.redis.publish(self.channel, pubsub_message.model_dump_json())
        except Exception as e:
            self.logger.error(f"Pubsub publish error: {e}").save("pubsub_published")

class WSService:
    def __init__(self, company, socketio):
        self._version = '2.2'
        self.socket = socketio
        self.logger = logger
...
    def new_message(self, message):
        if message.tracking_status != "hidden":
            message_payload = message.to_dict()
            self.socket.emit('new_message', message_payload, room=message.user.id)
           
Reasons:
  • Blacklisted phrase (0.5): thanks
  • Long answer (-1):
  • Has code block (-0.5):
  • User mentioned (1): @miguel-grinberg
  • Self-answer (0.5):
  • Low reputation (1):
Posted by: Aitor Gastaminza Unanue