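Your message is missing Confluent's magic byte. Confluent's deserializers expect it to be the first byte of every message, followed by a 4-byte schema ID, and you'll get this error whenever it isn't there. This usually means the message was produced without a Confluent serializer.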
https://www.confluent.io/blog/how-to-fix-unknown-magic-byte-errors-in-apache-kafka/
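To see what the deserializer is looking for, here is a minimal sketch of the framing in plain Python: one magic byte (`0`), then the schema ID as a 4-byte big-endian integer, then the serialized payload. The function names and the schema ID `42` are made up for illustration; they are not part of any Confluent library. In practice you would normally let Confluent's own serializer write this header rather than build it by hand.

```python
import struct

MAGIC_BYTE = 0   # first byte of every Confluent-framed message
HEADER_SIZE = 5  # 1 magic byte + 4-byte schema ID


def frame(schema_id: int, payload: bytes) -> bytes:
    """Prepend the magic byte and big-endian schema ID (hypothetical helper)."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + payload


def has_magic_byte(message: bytes) -> bool:
    """Check whether a raw message starts with the expected header."""
    return len(message) >= HEADER_SIZE and message[0] == MAGIC_BYTE


def unframe(message: bytes) -> tuple[int, bytes]:
    """Split a framed message into (schema_id, payload)."""
    if not has_magic_byte(message):
        raise ValueError("Unknown magic byte: message is not Confluent-framed")
    _, schema_id = struct.unpack(">bI", message[:HEADER_SIZE])
    return schema_id, message[HEADER_SIZE:]


if __name__ == "__main__":
    framed = frame(42, b"serialized-record")  # 42 is a placeholder schema ID
    print(framed[:HEADER_SIZE].hex())
    print(has_magic_byte(b'{"plain": "json"}'))  # plain JSON starts with '{', not 0
```

Running a check like `has_magic_byte` against the raw bytes of a failing record is a quick way to confirm that the producer is the problem. Note that adding the header by hand only helps if the schema ID actually exists in your Schema Registry and the payload matches that schema.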