This is what the error for my Spark application looks like ->
User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 31) (hludlx54.dns21.socgen executor 2): org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file hdfs://HDFS-LUDH01/fhml/uv/ibi_a8411/effect_calculation/uv_results_test/closingDate=20240630/frequency=Q/batchId=M-20240630-INIT_RWA-00607-P0001/part-00001-c41ee3a2-5ada-47c9-8e7d-fbb9b180ab81.c000.snappy.parquet. Column: [allocTakeoverEffect], Expected: float, Found: DOUBLE
at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedSchemaColumnConvertError(QueryExecutionErrors.scala:570)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:195)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:522)
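From what I understand, this error means the type Spark requests for the column (float, per the table schema) doesn't match the physical type stored in the Parquet file (double). A minimal sketch that I believe reproduces the same mismatch in spark-shell, with a made-up /tmp path ->
import org.apache.spark.sql.types.{StructType, StructField, FloatType}
// write a DOUBLE column, then read it back while requesting float (like the Hive DDL declares)
spark.range(1).selectExpr("cast(id as double) as allocTakeoverEffect")
  .write.mode("overwrite").parquet("/tmp/repro_double_parquet")
val requested = StructType(Seq(StructField("allocTakeoverEffect", FloatType)))
spark.read.schema(requested).parquet("/tmp/repro_double_parquet").show()
// => org.apache.spark.sql.execution.QueryExecutionException:
//    Parquet column cannot be converted ... Expected: float, Found: DOUBLE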
##############################
Here's the Scala function for it ->
// assumes: import org.apache.spark.sql.DataFrame
//          import org.apache.spark.sql.functions.{col, current_timestamp}
//          import org.apache.spark.sql.types.{DecimalType, StringType}
// (conn, stmt, config, numPartitions, etc. are members of the enclosing class)
def pushToResultsSQL(ResultsDf: DataFrame): Unit = {
val resultsTable = config.getString("ibi.db.stage_ec_sql_results_table")
try {
stmt = conn.createStatement()
stmt.executeUpdate(truncateTable(resultsTable))
EffectCalcLogger.info(
  s"TABLE $resultsTable TRUNCATED ****",
  this.getClass.getName
)
val String_format_list = List(
  "accounttype", "baseliiaggregategrosscarryoffbalance", "baseliiaggregategrosscarryonbalance",
  "baseliiaggregateprovoffbalance", "baseliiaggregateprovonbalance", "closingbatchid",
  "closingclosingdate", "closingifrs9eligibilityflaggrosscarrying", "closingifrs9eligibilityflagprovision",
  "closingifrs9provisioningstage", "contractid", "contractprimarycurrency", "effectivedate",
  "exposurenature", "fxsituation", "groupproduct", "indtypprod", "issuingapplicationcode",
  "openingbatchid", "openingclosingdate", "openingifrs9eligibilityflaggrosscarrying",
  "openingifrs9eligibilityflagprovision", "openingifrs9provisioningstage",
  "reportingentitymagnitudecode", "transfert", "closingdate", "frequency", "batchid"
)
val Decimal_format_list = List(
  "alloctakeovereffect", "closinggrosscarryingamounteur", "closingprovisionamounteur",
  "exchangeeureffect", "expireddealseffect", "expireddealseffect2", "newproductioneffect",
  "openinggrosscarryingamounteur", "openingprovisionamounteur", "overallstageeffect",
  "stages1s2effect", "stages1s3effect", "stages2s1effect", "stages2s3effect",
  "stages3s1effect", "stages3s2effect"
)
// cast each column to the type expected on the SQL side
val selectWithCast = ResultsDf.columns.map(column => {
if (String_format_list.contains(column.toLowerCase))
col(column).cast(StringType)
else if (Decimal_format_list.contains(column.toLowerCase))
col(column).cast(DecimalType(30, 2))
else col(column)
})
val ResultsDfWithLoadDateTime =
  ResultsDf.withColumn("loaddatetime", current_timestamp())
// show() prints to stdout and returns Unit, so it should not be interpolated into a string
println("this is ResultsDfWithLoadDateTime:")
ResultsDfWithLoadDateTime.show(false)
val orderOfColumnsInSQL = getTableColumns(resultsTable, conn)
print(s"This is order of columns for results table: $orderOfColumnsInSQL")
EffectCalcLogger.info(
s" Starting writing to $resultsTable table ",
this.getClass.getName
)
ResultsDfWithLoadDateTime
  .select(selectWithCast: _*)
  .select(orderOfColumnsInSQL.map(col): _*)
  .coalesce(numPartitions)
  .write
  .mode(org.apache.spark.sql.SaveMode.Append)
  .format(microsoftSqlserverJDBCSpark)
  .options(dfMsqlWriteOptions.configMap ++ Map("dbTable" -> resultsTable))
  .save()
EffectCalcLogger.info(
s"Writing to $resultsTable table completed ",
this.getClass.getName
)
conn.close()
} catch {
  case e: Exception =>
    // printStackTrace() returns Unit, so it must not be concatenated into the log message
    EffectCalcLogger.error(
      s"Exception has been raised while pushing to $resultsTable: ${e.getMessage}",
      this.getClass.getName
    )
    throw e
}
}
###################################
And here's the Hive CREATE TABLE statement (source side) ->
CREATE EXTERNAL TABLE `uv_results_test`(
`accounttype` string,
`alloctakeovereffect` float,
`baseliiaggregategrosscarryoffbalance` string,
`baseliiaggregategrosscarryonbalance` string,
`baseliiaggregateprovoffbalance` string,
...... rest of the similar columns
`stages3s2effect` float,
`transfert` string)
PARTITIONED BY (
`closingdate` string,
`frequency` string,
`batchid` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://HDFS-LUDH01/fhml/uv/ibi_a8411/effect_calculation/uv_results_test'
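Note: when Spark reads this table through the metastore, it requests the declared types, so the scan asks for float on alloctakeovereffect. A quick way to see the metastore schema, assuming the table is reachable in the current database ->
spark.table("uv_results_test").printSchema()
// alloctakeovereffect: float  (the declared metastore type, regardless of what the files contain)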
#############################
And this is the schema on the SQL side (sink) ->
CREATE TABLE [dbo].[effect_calculation_results](
[fxsituation] [varchar](500) NULL,
[openingclosingdate] [varchar](500) NULL,
[closingclosingdate] [varchar](500) NULL,
[contractid] [varchar](500) NULL,
[issuingApplicationCode] [varchar](500) NULL,
[exposureNature] [varchar](500) NULL,
[groupProduct] [varchar](500) NULL,
[contractPrimaryCurrency] [varchar](500) NULL,
[IndTypProd] [varchar](500) NULL,
[reportingentitymagnitudecode] [varchar](500) NULL,
[openingIfrs9EligibilityFlagGrossCarrying] [varchar](500) NULL,
[openingIfrs9EligibilityFlagProvision] [varchar](500) NULL,
[closingIfrs9EligibilityFlagGrossCarrying] [varchar](500) NULL,
[closingIfrs9EligibilityFlagProvision] [varchar](500) NULL,
[openingprovisionAmountEur] [decimal](30, 2) NULL,
[openinggrossCarryingAmountEur] [decimal](30, 2) NULL,
[closingprovisionAmountEur] [decimal](30, 2) NULL,
[closinggrossCarryingAmountEur] [decimal](30, 2) NULL,
[openingIfrs9ProvisioningStage] [varchar](500) NULL,
[closingifrs9ProvisioningStage] [varchar](500) NULL,
[effectiveDate] [varchar](500) NULL,
[baseliiAggregateGrossCarryOnBalance] [varchar](500) NULL,
[baseliiAggregateGrossCarryOffBalance] [varchar](500) NULL,
[baseliiAggregateProvOnBalance] [varchar](500) NULL,
[baseliiAggregateProvOffBalance] [varchar](500) NULL,
[Transfert] [varchar](500) NULL,
[exchangeEurEffect] [decimal](30, 2) NULL,
[newProductionEffect] [decimal](30, 2) NULL,
[expiredDealsEffect] [decimal](30, 2) NULL,
[allocTakeoverEffect] [decimal](30, 2) NULL,
[stageS1S2Effect] [decimal](30, 2) NULL,
[stageS2S1Effect] [decimal](30, 2) NULL,
[stageS1S3Effect] [decimal](30, 2) NULL,
[stageS3S1Effect] [decimal](30, 2) NULL,
[stageS2S3Effect] [decimal](30, 2) NULL,
[stageS3S2Effect] [decimal](30, 2) NULL,
[overallStageEffect] [decimal](30, 2) NULL,
[expiredDealsEffect2] [decimal](30, 2) NULL,
[loaddatetime] [datetime] NULL,
[openingbatchid] [varchar](500) NULL,
[closingbatchid] [varchar](500) NULL,
[accountType] [varchar](500) NULL
) ON [PRIMARY]
GO
So basically, the job takes data from the Hive table and writes it to the SQL table, but I am not sure why the error I showed at the beginning is popping up.
I looked at the Parquet schema of the data files under the HDFS path, and the column allocTakeoverEffect is of type double there.
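For reference, one way to confirm the physical type is to read the files directly, bypassing the metastore, something like ->
spark.read.parquet("hdfs://HDFS-LUDH01/fhml/uv/ibi_a8411/effect_calculation/uv_results_test").printSchema()
// allocTakeoverEffect: double  (the physical type written into the files)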
Please let me know how this issue can be fixed.
I tried running this