Is the implementation of EventEmitter
under your control? or it is just a mock implementation of external service? If they're under your control I think you can make the emitNextFailureHandler
returns true
when encountering FAIL_OVERFLOW
or FAIL_TERMINATED
so that the EventEmitter
can retry emit.
Simple example:
Sinks.EmitFailureHandler emitNextFailureHandler = (signalType, emitResult) -> {
return Sinks.EmitResult.FAIL_OVERFLOW.equals(emitResult) || Sinks.EmitResult.FAIL_NON_SERIALIZED.equals(emitResult);
};
Also, I'd like to discuss this in another aspect.
I think the repository::remove
is a synchronized method that blocks your thread so that's why the queue is under pressure so hard. This might not be a best practice for reactive programming. A better way to do this might be to make the repository.remove()
return a Mono
or Flux
(in practice you may need to change the dependencies to Reactive
related dependencies), then use .flatMap(repository::remove)
instead of .doOnNext()