79475020

Date: 2025-02-28 09:22:25
Score: 3
Natty:
Report link
This is what my code looks like right now:

package org.socgen.ibi.effectCalc.jdbcConn

import com.typesafe.config.Config
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import java.sql.{Connection, DriverManager, Statement}
import org.socgen.ibi.effectCalc.logger.EffectCalcLogger
import org.socgen.ibi.effectCalc.common.MsSqlJdbcConnectionInfo

class EffectCalcJdbcConnection(config: Config) {

  // Data source name handed to DataFrameWriter.format(...) in pushToResultsSQL.
  private val microsoftSqlserverJDBCSpark = "com.microsoft.sqlserver.jdbc.spark"
  // Connection settings, all read from the injected Typesafe config.
  val url: String = config.getString("ibi.db.jdbcURL")
  val user: String = config.getString("ibi.db.user")
  private val pwd: String = config.getString("ibi.db.password")
  private val driverClassName: String = config.getString("ibi.db.driverClass")
  private val databaseName: String = config.getString("ibi.db.stage_ec_sql")
  // Base JDBC URL with the target database appended as a ";databasename=" property.
  private val dburl = s"$url;databasename=$databaseName"

  // Its configMap is merged into the Spark write options in pushToResultsSQL.
  private val dfMsqlWriteOptions = new MsSqlJdbcConnectionInfo(dburl, user, pwd)

  // NOTE(review): connectionProperties is populated here but is not referenced by
  // any of the members visible in this file section; confirm it is still needed.
  private val connectionProperties = new java.util.Properties()
  connectionProperties.setProperty("Driver", s"$driverClassName")
  connectionProperties.setProperty("AutoCommit", "true")
  connectionProperties.put("user", s"$user")
  connectionProperties.put("password", s"$pwd")
  // Load the configured driver class before asking DriverManager for a connection.
  Class.forName(s"$driverClassName")
  // Opened eagerly when the instance is constructed; pushToResultsSQL closes it
  // after a successful write.
  private val conn: Connection = DriverManager.getConnection(dburl, user, pwd)
  // Assigned inside pushToResultsSQL just before the TRUNCATE is executed.
  private var stmt: Statement = null

  /** Builds the SQL statement that empties `table`.
    *
    * @param table name of the table to truncate, used verbatim
    * @return the `TRUNCATE TABLE` statement, terminated with a semicolon
    */
  private def truncateTable(table: String): String =
    s"TRUNCATE TABLE $table;"

  /** Returns the column names of `table`, in the order the database reports them.
    *
    * Runs `SELECT TOP 0 * FROM table` so that only result-set metadata is
    * fetched, never rows. The statement and result set are always closed,
    * even when the query or the metadata lookup throws.
    *
    * @param table            table whose columns are listed; interpolated verbatim into the query
    * @param connection       open JDBC connection used to run the metadata query
    * @param columnsToExclude column names to drop from the result (exact, case-sensitive
    *                         match); pass e.g. `List("loaddatetime")` to leave a column
    *                         to the database. Defaults to the original placeholder value.
    * @return the remaining column names, in table order
    * @throws java.sql.SQLException if the query or metadata access fails
    */
  private def getTableColumns(
                               table: String,
                               connection: Connection,
                               columnsToExclude: List[String] = List("TO ADD")
                             ): List[String] = {
    val columnStartingIndex = 1 // JDBC column indexes are 1-based
    val query = s"SELECT TOP 0 * FROM $table"
    val statement = connection.createStatement()
    try {
      val resultSet = statement.executeQuery(query)
      try {
        val resultSetMetaData = resultSet.getMetaData
        println("Metadata" + resultSetMetaData)
        // Materialise the names into a List before the result set is closed.
        (columnStartingIndex to resultSetMetaData.getColumnCount).toList
          .map(resultSetMetaData.getColumnName)
          .filterNot(columnsToExclude.contains(_))
      } finally {
        resultSet.close()
      }
    } finally {
      statement.close()
    }
  }

  /** Truncates the configured results table and appends `ResultsDf` to it.
    *
    * Steps: truncate the target table, add a `loaddatetime` column holding the
    * current timestamp, cast the known string/decimal columns, reorder the
    * columns to match the SQL table, then write through the SQL Server Spark
    * connector in append mode. The class-level connection is closed once the
    * write has completed.
    *
    * NOTE(review): the table is truncated before the write starts, so a failed
    * write leaves it empty, and the connection is only closed on the success
    * path. Confirm whether either needs a rollback or cleanup strategy.
    *
    * @param ResultsDf data to push; column names are matched case-insensitively
    *                  against the cast lists below
    * @throws Exception any failure is logged and rethrown unchanged
    */
  def pushToResultsSQL(ResultsDf: DataFrame): Unit = {
    val resultsTable = config.getString("ibi.db.stage_ec_sql_results_table")
    val className = this.getClass.getName

    try {
      stmt = conn.createStatement()
      try {
        stmt.executeUpdate(truncateTable(resultsTable))
      } finally {
        // The statement is only needed for the TRUNCATE; release it right away.
        stmt.close()
      }

      EffectCalcLogger.info(s" TABLE $resultsTable TRUNCATE  ****", className)

      // Both settings must be set explicitly on the session, otherwise conf.get throws.
      val numExecutors =
        ResultsDf.sparkSession.conf.get("spark.executor.instances").toInt
      val numExecutorsCores =
        ResultsDf.sparkSession.conf.get("spark.executor.cores").toInt
      val numPartitions = numExecutors * numExecutorsCores
      EffectCalcLogger.info(
        s"coalesce($numPartitions)  <---> (numExecutors = $numExecutors) * (numExecutorsCores = $numExecutorsCores)",
        className
      )

      // Columns (lower-cased names) that are written as strings.
      val stringColumns = Set(
        "accounttype",
        "baseliiaggregategrosscarryoffbalance",
        "baseliiaggregategrosscarryonbalance",
        "baseliiaggregateprovoffbalance",
        "baseliiaggregateprovonbalance",
        "closingbatchid",
        "closingclosingdate",
        "closingifrs9eligibilityflaggrosscarrying",
        "closingifrs9eligibilityflagprovision",
        "closingifrs9provisioningstage",
        "contractid",
        "contractprimarycurrency",
        "effectivedate",
        "exposurenature",
        "fxsituation",
        "groupproduct",
        "indtypprod",
        "issuingapplicationcode",
        "openingbatchid",
        "openingclosingdate",
        "openingifrs9eligibilityflaggrosscarrying",
        "openingifrs9eligibilityflagprovision",
        "openingifrs9provisioningstage",
        "reportingentitymagnitudecode",
        "transfert",
        "closingdate",
        "frequency",
        "batchid"
      )

      // Columns (lower-cased names) that are written as DECIMAL(30, 2).
      val decimalColumns = Set(
        "alloctakeovereffect",
        "closinggrosscarryingamounteur",
        "closingprovisionamounteur",
        "exchangeeureffect",
        "expireddealseffect",
        "expireddealseffect2",
        "newproductioneffect",
        "openinggrosscarryingamounteur",
        "openingprovisionamounteur",
        "overallstageeffect",
        "stages1s2effect",
        "stages1s3effect",
        "stages2s1effect",
        "stages2s3effect",
        "stages3s1effect",
        "stages3s2effect"
      )

      val resultsDfWithLoadDateTime =
        ResultsDf.withColumn("loaddatetime", current_timestamp())

      // Build the projection from the frame that already carries loaddatetime.
      // Deriving it from ResultsDf.columns would silently drop that column, and
      // the reordering select below would then fail to resolve it.
      val selectWithCast = resultsDfWithLoadDateTime.columns.map { column =>
        val lowered = column.toLowerCase
        if (stringColumns.contains(lowered))
          col(column).cast(StringType)
        else if (decimalColumns.contains(lowered))
          // NOTE(review): going through DoubleType can lose precision before the
          // decimal cast; kept as-is to preserve the values currently written.
          col(column).cast(DoubleType).cast(DecimalType(30, 2))
        else col(column)
      }

      print(
        s"This is selectWithCast for Results Table: ${selectWithCast.mkString(", ")}"
      )

      // show() prints the frame itself and returns Unit, so it must not be interpolated.
      print("this is ResultsDfWithLoadDateTime: \n ")
      resultsDfWithLoadDateTime.show(false)

      val orderOfColumnsInSQL = getTableColumns(resultsTable, conn)

      print(s"This is order of columns for results table: $orderOfColumnsInSQL")

      EffectCalcLogger.info(
        s" Starting writing to $resultsTable table ",
        className
      )

      resultsDfWithLoadDateTime
        .select(selectWithCast: _*)
        .select(orderOfColumnsInSQL.map(col): _*)
        .coalesce(numPartitions)
        .write
        .mode(org.apache.spark.sql.SaveMode.Append)
        .format(microsoftSqlserverJDBCSpark)
        .options(dfMsqlWriteOptions.configMap ++ Map("dbTable" -> resultsTable))
        .save()

      EffectCalcLogger.info(
        s"Writing to $resultsTable table completed ",
        className
      )
      conn.close()
    } catch {
      case e: Exception =>
        // printStackTrace() returns Unit, so it must stay out of the message string.
        EffectCalcLogger.error(
          s"Exception has been raised while pushing to $resultsTable: $e",
          className
        )
        e.printStackTrace()
        throw e
    }

  }
  
  --------------------------------
  
  In the code above, I do not want to add the loaddatetime column to ResultsDf; instead, I want to exclude it from orderOfColumnsInSQL. How can this be done?
Reasons:
  • RegEx Blacklisted phrase (2.5): can you tell me how
  • RegEx Blacklisted phrase (1): I want
  • Long answer (-1):
  • Has code block (-0.5):
  • Low reputation (1):
Posted by: Arun Rathod