Can you try reading the file with one of the two options below?
spark.read.schema(schema).option("mode", "DROPMALFORMED").json("my_path")
spark.read.schema(schema).option("mode", "FAILFAST").json("my_path")
DROPMALFORMED: drops the corrupted records and keeps the rest.
FAILFAST: throws an exception as soon as it meets a corrupted record.
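
If it helps to see the difference without a cluster, here is a rough plain-Python sketch of what the two modes do. It is not Spark code: read_json_lines is a hypothetical helper I made up for illustration, the sample lines are invented, and it only treats lines that fail JSON parsing as corrupted.

```python
import json


def read_json_lines(lines, mode):
    """Hypothetical helper that imitates the two modes above. Not a Spark API."""
    if mode not in ("DROPMALFORMED", "FAILFAST"):
        raise ValueError(f"unsupported mode: {mode}")
    rows = []
    for line in lines:
        try:
            rows.append(json.loads(line))
        except json.JSONDecodeError as err:
            if mode == "DROPMALFORMED":
                continue  # skip the corrupted record and carry on
            # FAILFAST: stop at the first corrupted record
            raise RuntimeError(f"malformed record: {line!r}") from err
    return rows


# Invented sample: the second line is missing its closing brace.
sample = ['{"id": 1}', '{"id": 2', '{"id": 3}']

dropped = read_json_lines(sample, "DROPMALFORMED")
print(dropped)  # [{'id': 1}, {'id': 3}]

try:
    read_json_lines(sample, "FAILFAST")
    failed = False
except RuntimeError:
    failed = True
print(failed)  # True
```

So DROPMALFORMED is the one to use if you just want the good rows, and FAILFAST is handy for finding out whether the file has bad rows at all.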