This is not expected behavior, of course.
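I've never used the Python Kafka clients, but
consumer.commit(message=msg)
What are you trying to commit here? In kafka-python the parameter should be a dict of {TopicPartition: OffsetAndMetadata}. (confluent-kafka's Consumer.commit() does accept message=msg, so check which client you are actually using.)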
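To illustrate the shape of that dict, here is a minimal sketch. TopicPartition, OffsetAndMetadata and Record are local namedtuple stand-ins that I define here so the snippet runs without a Kafka library (the real classes may have more fields), and offsets_to_commit is a hypothetical helper name. The one thing to take from it is that the committed offset is the offset of the next message to read, i.e. last processed offset + 1.

```python
from collections import namedtuple

# Local stand-ins for the client's classes (assumption: the real ones may differ).
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
OffsetAndMetadata = namedtuple("OffsetAndMetadata", ["offset", "metadata"])
# Hypothetical shape of a consumed record, reduced to what matters here.
Record = namedtuple("Record", ["topic", "partition", "offset"])


def offsets_to_commit(processed):
    """Build {TopicPartition: OffsetAndMetadata} from processed records.

    For each partition, keep the highest processed offset + 1, because the
    committed offset marks the next record to consume, not the last one read.
    """
    offsets = {}
    for rec in processed:
        tp = TopicPartition(rec.topic, rec.partition)
        next_offset = rec.offset + 1
        current = offsets.get(tp)
        if current is None or next_offset > current.offset:
            offsets[tp] = OffsetAndMetadata(next_offset, "")
    return offsets
```

With a real client you would pass a dict of this shape, built from the client's own classes, to its commit call.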
Also, you call commit() in a finally block, but that block is not guaranteed to run. In the JVM, for example, it can be skipped when the process is stopped by a signal such as SIGTERM or SIGINT (Ctrl+C).
Usually the consumer is closed from a shutdown hook that calls .wakeup() and sets an atomic flag, because the consumer is not thread-safe and can't be closed directly from another thread.
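A rough Python equivalent of that pattern might look like the sketch below. It is not taken from any client's documentation: consumer is assumed to be an object with poll(timeout), commit() and close() methods, where poll returns one message or None, and handle is a hypothetical callback. The signal handler only sets a flag, and the polling thread itself does the final commit and close.

```python
import signal
import threading

# Flag shared between the signal handler and the polling loop.
stop_requested = threading.Event()


def request_stop(signum, frame):
    """Signal handler: only set the flag, never touch the consumer here."""
    stop_requested.set()


def install_signal_handlers():
    """Call once from the main thread before starting the loop."""
    signal.signal(signal.SIGTERM, request_stop)
    signal.signal(signal.SIGINT, request_stop)


def consume_until_stopped(consumer, handle, stop=stop_requested):
    """Poll until a stop is requested, then commit and close in this thread.

    Assumption: consumer.poll(timeout) returns one message or None.
    """
    try:
        while not stop.is_set():
            msg = consumer.poll(1.0)  # short timeout so the flag is rechecked
            if msg is None:
                continue
            handle(msg)
    finally:
        consumer.commit()
        consumer.close()
```

Because SIGTERM is turned into a flag instead of killing the process, the loop exits normally and the finally block gets a chance to run.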
To check your committed offsets, you can run the consumer groups tool script and describe your group:
kafka-consumer-groups.sh --bootstrap-server broker1:30903,broker2:30448,broker3:30805 --describe --group {your group name}
Hope this gives you some clue.