# zipWithIndex for precise batching
#
# Splits `df` into consecutive DataFrames of at most `batch_size` rows each.
# Expects `spark`, `df` and `batch_size` to be defined by the surrounding code.
# NOTE: `collect()` below pulls every row onto the driver, so this only suits
# datasets that fit in driver memory.
if batch_size <= 0:
    # Fail early on the driver instead of dividing by zero inside a Spark task.
    raise ValueError("batch_size must be a positive integer")

# Pair every row with its position: elements are (row, index).
rdd = df.rdd.zipWithIndex()

# Key each row by its batch number, keeping the row index alongside the row so
# the original order can be restored after the shuffle: groupByKey() guarantees
# neither the order of keys nor the order of values inside a group.
batched_rdds = (
    rdd.map(lambda x: (x[1] // batch_size, (x[1], x[0])))
    .groupByKey()
    .sortByKey()
    .map(lambda kv: [row for _, row in sorted(kv[1], key=lambda pair: pair[0])])
)

# One DataFrame per batch, in batch order, reusing the source schema.
batched_dfs = [
    spark.createDataFrame(batch, schema=df.schema)
    for batch in batched_rdds.collect()
]