import org.springframework.cloud.stream.binder.kinesis.listener.CheckpointerAwareMessageHandler;
import org.springframework.stereotype.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
/**
 * Spring-managed {@link CheckpointerAwareMessageHandler} that logs every payload it is handed
 * and then checkpoints through the checkpointer supplied with that payload.
 *
 * <p>Any {@link Exception} raised while checkpointing is logged at ERROR level and is not
 * rethrown.
 *
 * <p>NOTE(review): when and how often {@code onMessage} is invoked is decided by the base
 * class, which is not visible here; presumably once per received message. Confirm against
 * {@code CheckpointerAwareMessageHandler}.
 */
@Component
public class KinesisCheckpointerHandler extends CheckpointerAwareMessageHandler {

    private static final Logger log = LoggerFactory.getLogger(KinesisCheckpointerHandler.class);

    // No explicit constructor is declared: the compiler-generated public no-arg
    // constructor invokes the superclass no-arg constructor, which is all that is needed.

    /**
     * Logs the payload at INFO level, then asks the checkpointer to checkpoint.
     *
     * @param payload      the message payload; only written to the log
     * @param checkpointer the checkpointer on which {@code checkpoint()} is called
     */
    @Override
    protected void onMessage(Object payload, ShardRecordProcessorCheckpointer checkpointer) {
        log.info("Processing message: {}", payload);

        try {
            checkpointer.checkpoint();
            log.info("Checkpoint successful.");
        } catch (Exception checkpointFailure) {
            // Best-effort: record the failure (message plus full stack trace) and carry on
            // without propagating it to the caller.
            log.error("Checkpoint failed: {}", checkpointFailure.getMessage(), checkpointFailure);
        }
    }
}