This is what the error for my Spark application looks like ->

User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 31) (hludlx54.dns21.socgen executor 2): org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file hdfs://HDFS-LUDH01/fhml/uv/ibi_a8411/effect_calculation/uv_results_test/closingDate=20240630/frequency=Q/batchId=M-20240630-INIT_RWA-00607-P0001/part-00001-c41ee3a2-5ada-47c9-8e7d-fbb9b180ab81.c000.snappy.parquet. Column: [allocTakeoverEffect], Expected: float, Found: DOUBLE
    at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedSchemaColumnConvertError(QueryExecutionErrors.scala:570)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:195)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:522)
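
From what I understand, the reader takes the expected type for allocTakeoverEffect from the Hive metastore (float), while the Parquet files on disk physically store the column as double, and Spark's Parquet reader refuses that narrowing conversion. A minimal sketch that I believe reproduces the same failure (hypothetical local path and names) ->

// Hypothetical repro: write a physically-DOUBLE column, then read it back
// declaring FLOAT, the way the Hive DDL does for this column
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{FloatType, StructField, StructType}

val spark = SparkSession.builder().appName("float-double-repro").master("local[*]").getOrCreate()
import spark.implicits._

val path = "/tmp/float_double_repro" // hypothetical path
Seq(1.0, 2.5).toDF("allocTakeoverEffect").write.mode("overwrite").parquet(path)

val declared = StructType(Seq(StructField("allocTakeoverEffect", FloatType)))
// I expect this to fail with the same "Parquet column cannot be converted ...
// Expected: float, Found: DOUBLE" error
spark.read.schema(declared).parquet(path).show()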

##############################

Here's the Scala function for it ->

  def pushToResultsSQL(ResultsDf: DataFrame): Unit = {
    val resultsTable = config.getString("ibi.db.stage_ec_sql_results_table")

    try {
      stmt = conn.createStatement()
      stmt.executeUpdate(truncateTable(resultsTable))

      EffectCalcLogger.info(
        s" TABLE $resultsTable TRUNCATE  ****",
        this.getClass.getName
      )

      val String_format_list = List(
        "accounttype", "baseliiaggregategrosscarryoffbalance", "baseliiaggregategrosscarryonbalance",
        "baseliiaggregateprovoffbalance", "baseliiaggregateprovonbalance", "closingbatchid",
        "closingclosingdate", "closingifrs9eligibilityflaggrosscarrying", "closingifrs9eligibilityflagprovision",
        "closingifrs9provisioningstage", "contractid", "contractprimarycurrency", "effectivedate",
        "exposurenature", "fxsituation", "groupproduct", "indtypprod", "issuingapplicationcode",
        "openingbatchid", "openingclosingdate", "openingifrs9eligibilityflaggrosscarrying",
        "openingifrs9eligibilityflagprovision", "openingifrs9provisioningstage",
        "reportingentitymagnitudecode", "transfert", "closingdate", "frequency", "batchid"
      )

      val Decimal_format_list = List(
        "alloctakeovereffect", "closinggrosscarryingamounteur", "closingprovisionamounteur",
        "exchangeeureffect", "expireddealseffect", "expireddealseffect2", "newproductioneffect",
        "openinggrosscarryingamounteur", "openingprovisionamounteur", "overallstageeffect",
        "stages1s2effect", "stages1s3effect", "stages2s1effect", "stages2s3effect",
        "stages3s1effect", "stages3s2effect"
      )

      // Add the load timestamp first, so the cast mapping below covers it too
      val ResultsDfWithLoadDateTime =
        ResultsDf.withColumn("loaddatetime", current_timestamp())

      // Cast every column to the type the SQL table expects; columns in
      // neither list are passed through unchanged
      val selectWithCast = ResultsDfWithLoadDateTime.columns.map { column =>
        if (String_format_list.contains(column.toLowerCase))
          col(column).cast(StringType)
        else if (Decimal_format_list.contains(column.toLowerCase))
          col(column).cast(DecimalType(30, 2))
        else col(column)
      }

      // show() prints the DataFrame itself and returns Unit, so it cannot be
      // interpolated into a string
      println("this is ResultsDfWithLoadDateTime:")
      ResultsDfWithLoadDateTime.show(false)

      val orderOfColumnsInSQL = getTableColumns(resultsTable, conn)

      print(s"This is order of columns for results table: $orderOfColumnsInSQL")

      EffectCalcLogger.info(
        s" Starting writing to $resultsTable table ",
        this.getClass.getName
      )

      ResultsDfWithLoadDateTime
        .select(selectWithCast: _*)
        .select(orderOfColumnsInSQL.map(col): _*)
        .coalesce(numPartitions)
        .write
        .mode(org.apache.spark.sql.SaveMode.Append)
        .format(microsoftSqlserverJDBCSpark)
        .options(dfMsqlWriteOptions.configMap ++ Map("dbTable" -> resultsTable))
        .save()

      EffectCalcLogger.info(
        s"Writing to $resultsTable table completed ",
        this.getClass.getName
      )
      conn.close()
    } catch {
      case e: Exception =>
        // e.printStackTrace() returns Unit, so it cannot be concatenated into
        // the log message; log the message and print the trace separately
        EffectCalcLogger.error(
          s"Exception has been raised while pushing to $resultsTable: ${e.getMessage}",
          this.getClass.getName
        )
        e.printStackTrace()
        throw e
    }

  }

###################################

And here is the Hive CREATE TABLE statement (source side) ->

CREATE EXTERNAL TABLE `uv_results_test`(
  `accounttype` string, 
  `alloctakeovereffect` float, 
  `baseliiaggregategrosscarryoffbalance` string, 
  `baseliiaggregategrosscarryonbalance` string, 
  `baseliiaggregateprovoffbalance` string, 
    ...... rest of the similar columns
  `stages3s2effect` float, 
  `transfert` string)
PARTITIONED BY ( 
  `closingdate` string, 
  `frequency` string, 
  `batchid` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://HDFS-LUDH01/fhml/uv/ibi_a8411/effect_calculation/uv_results_test'
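
Note that this DDL declares alloctakeovereffect (and the other effect columns) as float, while the files underneath apparently contain double. If aligning the metastore type with the files is the right fix, would something like this be correct? An untested sketch: CASCADE is meant to also update the schemas of the existing partitions, and if Spark SQL rejects the type change, the same statement could presumably be run in Hive/beeline ->

// Untested sketch: make the declared Hive type match the Parquet files
spark.sql(
  "ALTER TABLE uv_results_test CHANGE COLUMN alloctakeovereffect alloctakeovereffect DOUBLE CASCADE"
)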

#############################

And this is the schema on the SQL side (sink) ->

CREATE TABLE [dbo].[effect_calculation_results](
    [fxsituation] [varchar](500) NULL,
    [openingclosingdate] [varchar](500) NULL,
    [closingclosingdate] [varchar](500) NULL,
    [contractid] [varchar](500) NULL,
    [issuingApplicationCode] [varchar](500) NULL,
    [exposureNature] [varchar](500) NULL,
    [groupProduct] [varchar](500) NULL,
    [contractPrimaryCurrency] [varchar](500) NULL,
    [IndTypProd] [varchar](500) NULL,
    [reportingentitymagnitudecode] [varchar](500) NULL,
    [openingIfrs9EligibilityFlagGrossCarrying] [varchar](500) NULL,
    [openingIfrs9EligibilityFlagProvision] [varchar](500) NULL,
    [closingIfrs9EligibilityFlagGrossCarrying] [varchar](500) NULL,
    [closingIfrs9EligibilityFlagProvision] [varchar](500) NULL,
    [openingprovisionAmountEur] [decimal](30, 2) NULL,
    [openinggrossCarryingAmountEur] [decimal](30, 2) NULL,
    [closingprovisionAmountEur] [decimal](30, 2) NULL,
    [closinggrossCarryingAmountEur] [decimal](30, 2) NULL,
    [openingIfrs9ProvisioningStage] [varchar](500) NULL,
    [closingifrs9ProvisioningStage] [varchar](500) NULL,
    [effectiveDate] [varchar](500) NULL,
    [baseliiAggregateGrossCarryOnBalance] [varchar](500) NULL,
    [baseliiAggregateGrossCarryOffBalance] [varchar](500) NULL,
    [baseliiAggregateProvOnBalance] [varchar](500) NULL,
    [baseliiAggregateProvOffBalance] [varchar](500) NULL,
    [Transfert] [varchar](500) NULL,
    [exchangeEurEffect] [decimal](30, 2) NULL,
    [newProductionEffect] [decimal](30, 2) NULL,
    [expiredDealsEffect] [decimal](30, 2) NULL,
    [allocTakeoverEffect] [decimal](30, 2) NULL,
    [stageS1S2Effect] [decimal](30, 2) NULL,
    [stageS2S1Effect] [decimal](30, 2) NULL,
    [stageS1S3Effect] [decimal](30, 2) NULL,
    [stageS3S1Effect] [decimal](30, 2) NULL,
    [stageS2S3Effect] [decimal](30, 2) NULL,
    [stageS3S2Effect] [decimal](30, 2) NULL,
    [overallStageEffect] [decimal](30, 2) NULL,
    [expiredDealsEffect2] [decimal](30, 2) NULL,
    [loaddatetime] [datetime] NULL,
    [openingbatchid] [varchar](500) NULL,
    [closingbatchid] [varchar](500) NULL,
    [accountType] [varchar](500) NULL
) ON [PRIMARY]
GO

So basically, the job takes the data from the Hive table and writes it to the SQL table, but I am not sure why the error shown at the beginning pops up.

I looked at the Parquet schema of the data lying under the HDFS path: the column allocTakeoverEffect is of type double there, even though the Hive DDL declares it as float.
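
This is roughly how I checked, from spark-shell (reading the files directly, so the schema is inferred from the Parquet footers rather than taken from the metastore) ->

val path = "hdfs://HDFS-LUDH01/fhml/uv/ibi_a8411/effect_calculation/uv_results_test"
// Schema comes from the Parquet footers here, not from Hive
spark.read.parquet(path).printSchema()
// shows: allocTakeoverEffect: double (nullable = true)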

Please let me know how this issue can be fixed.
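
Would a workaround along these lines be reasonable: read the files directly (so the schema comes from the Parquet footers), cast the column to the sink's decimal type, and keep the rest of the job unchanged? A sketch, with the path taken from the error message ->

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DecimalType

// Sketch: bypass the metastore schema and cast explicitly for the SQL sink
val resultsDf = spark.read
  .parquet("hdfs://HDFS-LUDH01/fhml/uv/ibi_a8411/effect_calculation/uv_results_test")
  .withColumn("allocTakeoverEffect", col("allocTakeoverEffect").cast(DecimalType(30, 2)))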
