Can you try one of the two options below?
spark.read.schema(schema).option("mode", "DROPMALFORMED").json("my_path")
spark.read.schema(schema).option("mode", "FAILFAST").json("my_path")
DROPMALFORMED: ignores the corrupted records.
FAILFAST: throws an exception when it meets corrupted records.
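
If it helps to see the difference between the two modes, here is a small plain-Python sketch of the behaviour. It does not call Spark at all: the helper name `read_json_lines` and the sample data are made up for illustration, and "corrupted" is assumed here to mean only "a line that is not valid JSON" (Spark's own checks against your schema are not reproduced).

```python
import json

SUPPORTED_MODES = {"DROPMALFORMED", "FAILFAST"}


def read_json_lines(lines, mode):
    """Hypothetical helper: emulates how the two modes treat bad lines.

    Illustration only, not Spark's implementation.
    """
    if mode not in SUPPORTED_MODES:
        raise ValueError(f"mode not covered by this sketch: {mode}")

    rows = []
    for line in lines:
        try:
            rows.append(json.loads(line))
        except json.JSONDecodeError as err:
            if mode == "FAILFAST":
                # Stop at the first corrupted record.
                raise ValueError(f"malformed record: {line!r}") from err
            # DROPMALFORMED: skip the corrupted record and keep going.
            continue
    return rows


# The second line is missing its closing brace, so it is corrupted.
sample = ['{"id": 1}', '{"id": 2', '{"id": 3}']

good_rows = read_json_lines(sample, "DROPMALFORMED")
print(good_rows)

try:
    read_json_lines(sample, "FAILFAST")
except ValueError as err:
    print("FAILFAST stopped:", err)
```

So DROPMALFORMED is the one to pick if you just want the good rows, and FAILFAST is handy when you want to find out that the file contains bad records at all.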