79216422

Date: 2024-11-22 19:59:54
Score: 0.5
Natty:
Report link

So, I have a solution, but it's a little hacky. For reasons I don't understand, the publisher from Pekko needs a subscriber other than the ones from RSocket. This works, with the caveat that the sink will consume everything it can when no clients are connected (which is actually what I want in my use case).

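import io.rsocket.{Payload, SocketAcceptor}
import io.rsocket.core.RSocketServer
import io.rsocket.transport.netty.server.TcpServerTransport
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.Sink
import org.reactivestreams.{Publisher, Subscriber, Subscription}
import reactor.core.publisher.Flux

def serverSink: Sink[Payload, NotUsed] = {
  // fanout = true lets the publisher accept more than one subscriber
  val sink: Sink[Payload, Publisher[Payload]] = Sink.asPublisher(fanout = true)
  sink.mapMaterializedValue { pub =>
    // The extra (hacky) subscriber: requests everything and discards it
    pub.subscribe(new Subscriber[Payload] {
      override def onSubscribe(s: Subscription): Unit = s.request(Long.MaxValue)
      override def onNext(t: Payload): Unit = ()
      override def onError(t: Throwable): Unit = ()
      override def onComplete(): Unit = ()
    })
    // Every request-stream call from a client subscribes to the same publisher
    RSocketServer.create(
      SocketAcceptor.forRequestStream(payload => Flux.from(pub))
    ).bindNow(TcpServerTransport.create("localhost", 3141))
    NotUsed.notUsed()
  }
}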
Reasons:
  • Long answer (-0.5):
  • Has code block (-0.5):
  • Self-answer (0.5):
  • Low reputation (1):
Posted by: David Masters