79548155

Date: 2025-04-01 09:08:37
Score: 1

I think the issue here could be that the element of a stateful DoFn must be a (key, value) tuple. State is scoped per key and window, and since you're using the global window, that means one state per key.

The state declared with your BagStateSpec holds only the value part of the element, not the key. Since you're storing only a single value per key, you might want to switch to ReadModifyWriteStateSpec.

So first, unpack the element inside process: key, value = element

Consider also adding input type hints to your DoFn: @beam.typehints.with_input_types(KV[str, TimestampedValue])

More here: https://beam.apache.org/blog/stateful-processing/

Reasons:
  • Long answer (-0.5):
  • No code block (0.5):
  • Low reputation (1):
Posted by: halfspring