Can you clarify in your question whether you are attempting to read Parquet or CSV? In the code snippet you provided, you specify the format as Parquet: .option("cloudFiles.format", "parquet").

If you are trying to read CSV files with Auto Loader, the following setting in your code looks like it might be the cause: cloudFiles.inferColumnTypes is set to true, while its default is false, as specified in the Auto Loader documentation. Also note that checkpoint_path serves double duty here: it is both the schema location (where the inferred schema is stored) and the stream's checkpoint location. Referencing that documentation, the read would look something like this:
from pyspark.sql.functions import col as spark_col, current_timestamp, current_date

(spark.readStream
    .format("cloudFiles")                                    # Auto Loader source
    .option("cloudFiles.format", "csv")                      # source files are CSV
    .option("cloudFiles.schemaLocation", checkpoint_path)    # inferred schema is persisted here
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.inferColumnTypes", "true")           # infer real types instead of all-string columns
    .load(latest_file_location)
    .toDF(*new_columns)                                      # rename columns to new_columns
    .select(
        "*",
        spark_col("_metadata.file_path").alias("source_file"),
        current_timestamp().alias("processing_time"),
        current_date().alias("processing_date"),
    )
    .writeStream
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", "true")                           # let new columns evolve the Delta table schema
    .trigger(once=True)
    .toTable(table_name))
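One caveat worth knowing: because checkpoint_path also holds the persisted schema, flipping cloudFiles.inferColumnTypes after the stream has already run will not retype existing columns; Auto Loader keeps the schema it inferred on the first run. A minimal reset sketch for a development environment, assuming checkpoint_path is a location you can safely delete (this also discards the stream's progress, so all files will be reprocessed):

# Dev-only reset: removes both the persisted inferred schema and the
# stream checkpoint, so the next run re-infers column types from scratch.
dbutils.fs.rm(checkpoint_path, True)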