79450552

Date: 2025-02-19 07:44:52

I managed to solve the problem, so I'm posting the fix. Maybe someone will find it useful one day.

First, I added withIdleness to my watermark strategy, so it now looks like this:

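      // Allow events to arrive up to 1 second out of order.
      WatermarkStrategy<Message> watermarkStrategy = WatermarkStrategy
                .<Message>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner((event, timestamp) -> event.getApproximateCreationDateTime().getTime())
                // Mark an input as idle once it emits no events for idlenessTolerance,
                // so it stops holding back the watermark.
                .withIdleness(idlenessTolerance);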
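Why idleness helps can be shown with a simplified model (plain Java, not Flink's actual code): a downstream operator takes the minimum watermark over its parallel inputs, and inputs marked idle are left out of that minimum. The InputStatus record and the combinedWatermark helper below are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Simplified model of combining watermarks from parallel inputs:
// the result is the minimum over all inputs that are NOT idle.
public class WatermarkMinDemo {

    // Hypothetical per-input state: last watermark seen and whether it is idle.
    record InputStatus(long watermark, boolean idle) {}

    // Returns the combined watermark, or empty if every input is idle.
    static OptionalLong combinedWatermark(InputStatus... inputs) {
        return Arrays.stream(inputs)
                .filter(in -> !in.idle())
                .mapToLong(InputStatus::watermark)
                .min();
    }

    public static void main(String[] args) {
        InputStatus a = new InputStatus(10_000L, false);
        InputStatus b = new InputStatus(12_000L, false);
        InputStatus c = new InputStatus(11_000L, false);
        // An input that has never received an event stays at its initial
        // watermark (Long.MIN_VALUE in this model).
        InputStatus silent = new InputStatus(Long.MIN_VALUE, false);

        // Without idleness the silent input pins the combined watermark,
        // so event-time windows would never fire.
        System.out.println(combinedWatermark(a, b, c, silent).getAsLong()); // -9223372036854775808

        // Once the silent input is marked idle it is ignored and the watermark advances.
        InputStatus silentIdle = new InputStatus(Long.MIN_VALUE, true);
        System.out.println(combinedWatermark(a, b, c, silentIdle).getAsLong()); // 10000
    }
}
```

In this model, withIdleness plays the role of flipping the idle flag after idlenessTolerance passes without events.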

Then, with help from a colleague, I set the parallelism of the input stream equal to the number of shards in the Kinesis stream I'm working with (4 in my case), so that part of the code now looks like this:

      // Source parallelism matches the number of shards in the Kinesis stream (4).
      DataStream<Message> input = env.fromSource(source,
                watermarkStrategy,
                "Kinesis source",
                TypeInformation.of(Message.class))
                .setParallelism(4);
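One plausible reason the shard count matters (an assumption, since the earlier parallelism isn't stated): if the source runs with more subtasks than there are shards, some subtasks get no shard, never see events, and without idleness never advance their watermark. The sketch below models this with a simple round-robin assignment (shard i goes to subtask i % parallelism); that scheme and the assign/idleSubtasks helpers are hypothetical, and the real Kinesis connector uses its own assigner.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model: distribute shards over source subtasks round-robin.
public class ShardAssignmentDemo {

    // Returns, for each subtask index, the list of shard indices it reads.
    static List<List<Integer>> assign(int shardCount, int parallelism) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int s = 0; s < parallelism; s++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int shard = 0; shard < shardCount; shard++) {
            bySubtask.get(shard % parallelism).add(shard);
        }
        return bySubtask;
    }

    // Counts subtasks that receive no shard and would therefore see no events.
    static long idleSubtasks(int shardCount, int parallelism) {
        return assign(shardCount, parallelism).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        // 4 shards, parallelism 4: each subtask reads exactly one shard.
        System.out.println(assign(4, 4));       // [[0], [1], [2], [3]]
        // 4 shards, parallelism 8: half of the subtasks get nothing to read.
        System.out.println(idleSubtasks(4, 8)); // 4
    }
}
```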

It's a pity that Flink gives no obvious sign of this kind of misconfiguration, or at least it wasn't easy for me to detect.

Cheers!

Posted by: Andy