79483456

Date: 2025-03-04 11:03:47
Score: 3
Natty:
Report link
This is what my code looks like:

package org.socgen.ibi.effectCalc.jdbcConn

import com.typesafe.config.Config
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import java.sql.{Connection, DriverManager, Statement}
import org.socgen.ibi.effectCalc.logger.EffectCalcLogger
import org.socgen.ibi.effectCalc.common.MsSqlJdbcConnectionInfo

class EffectCalcJdbcConnection(config: Config) {

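  // Connection settings pulled from the application config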
  private val microsoftSqlserverJDBCSpark = "com.microsoft.sqlserver.jdbc.spark"
  val url: String = config.getString("ibi.db.jdbcURL")
  val user: String = config.getString("ibi.db.user")
  private val pwd: String = config.getString("ibi.db.password")
  private val driverClassName: String = config.getString("ibi.db.driverClass")
  private val databaseName: String = config.getString("ibi.db.stage_ec_sql")
  private val dburl = s"$url;databaseName=$databaseName"

  private val dfMsqlWriteOptions = new MsSqlJdbcConnectionInfo(dburl, user, pwd)

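  // JDBC properties plus a shared Connection, used for TRUNCATE statements and column-metadata lookups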
  private val connectionProperties = new java.util.Properties()
  connectionProperties.setProperty("Driver", driverClassName)
  connectionProperties.setProperty("AutoCommit", "true")
  connectionProperties.put("user", user)
  connectionProperties.put("password", pwd)
  Class.forName(driverClassName)
  private val conn: Connection = DriverManager.getConnection(dburl, user, pwd)
  private var stmt: Statement = null

  private def truncateTable(table: String): String = s"TRUNCATE TABLE $table;"

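  // Returns the target table's column names (via a zero-row SELECT), excluding loaddatetime,
  // so the DataFrame's columns can be reordered to match the SQL table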
  private def getTableColumns(table: String, connection: Connection): List[String] = {
    val columnStartingIndex = 1
    val statement = s"SELECT TOP 0 * FROM $table"
    val resultSetMetaData = connection.createStatement().executeQuery(statement).getMetaData
    println("Metadata: " + resultSetMetaData)
    val columnToFilter = List("loaddatetime")
    (columnStartingIndex to resultSetMetaData.getColumnCount).toList
      .map(resultSetMetaData.getColumnName)
      .filterNot(columnToFilter.contains)
  }

  def pushToResultsSQL(ResultsDf: DataFrame): Unit = {
    val resultsTable = config.getString("ibi.db.stage_ec_sql_results_table")

    try {
      stmt = conn.createStatement()
      stmt.executeUpdate(truncateTable(resultsTable))
      EffectCalcLogger.info(s"TABLE $resultsTable TRUNCATED ****", this.getClass.getName)

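      // Match the number of output partitions to the total executor cores so the JDBC write uses the whole cluster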
      val numExecutors = ResultsDf.sparkSession.conf.get("spark.executor.instances").toInt
      val numExecutorsCores = ResultsDf.sparkSession.conf.get("spark.executor.cores").toInt
      val numPartitions = numExecutors * numExecutorsCores
      EffectCalcLogger.info(s"coalesce($numPartitions) <---> (numExecutors = $numExecutors) * (numExecutorsCores = $numExecutorsCores)", this.getClass.getName)

      val String_format_list = List( "accounttype", "baseliiaggregategrosscarryoffbalance", "baseliiaggregategrosscarryonbalance", "baseliiaggregateprovoffbalance", "baseliiaggregateprovonbalance", "closingbatchid", "closingclosingdate", "closingifrs9eligibilityflaggrosscarrying", "closingifrs9eligibilityflagprovision", "closingifrs9provisioningstage", "contractid", "contractprimarycurrency", "effectivedate", "exposurenature", "fxsituation", "groupproduct", "indtypprod", "issuingapplicationcode", "openingbatchid", "openingclosingdate", "openingifrs9eligibilityflaggrosscarrying", "openingifrs9eligibilityflagprovision", "openingifrs9provisioningstage", "reportingentitymagnitudecode", "transfert", "closingdate", "frequency", "batchid")

      val Decimal_format_list = List( "alloctakeovereffect", "closinggrosscarryingamounteur", "closingprovisionamounteur", "exchangeeureffect", "expireddealseffect", "expireddealseffect2", "newproductioneffect", "openinggrosscarryingamounteur", "openingprovisionamounteur", "overallstageeffect", "stages1s2effect", "stages1s3effect", "stages2s1effect", "stages2s3effect", "stages3s1effect", "stages3s2effect")

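      // Cast each DataFrame column to the type the results table expects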
      val selectWithCast = ResultsDf.columns.map(column => {
        if (String_format_list.contains(column.toLowerCase))
          col(column).cast(StringType)
        else if (Decimal_format_list.contains(column.toLowerCase))
          col(column).cast(DoubleType).cast(DecimalType(30, 2))
        else col(column)
      })

      val orderOfColumnsInSQL = getTableColumns(resultsTable, conn)

      EffectCalcLogger.info(s"Starting writing to $resultsTable table", this.getClass.getName)

      ResultsDf
        .select(selectWithCast: _*)
        .select(orderOfColumnsInSQL.map(col): _*)
        .coalesce(numPartitions)
        .write
        .mode(org.apache.spark.sql.SaveMode.Append)
        .format("jdbc")
        .options(dfMsqlWriteOptions.configMap ++ Map("dbTable" -> resultsTable, "batchsize" -> "10000"))
        .save()

      EffectCalcLogger.info(s"Writing to $resultsTable table completed", this.getClass.getName)
      conn.close()
    } catch {
      case e: Exception =>
        // printStackTrace() returns Unit, so it cannot be concatenated into the log message
        e.printStackTrace()
        EffectCalcLogger.error(s"Exception has been raised while pushing to $resultsTable: ${e.getMessage}", this.getClass.getName)
        throw e
    }
  }

  def pushToStockSQL(StockDf: DataFrame): Unit = {
    val stockTable = config.getString("ibi.db.stage_ec_sql_stocks_table")

    try {
      stmt = conn.createStatement()
      stmt.executeUpdate(truncateTable(stockTable))

      EffectCalcLogger.info(s"TABLE $stockTable TRUNCATED ****", this.getClass.getName)

      val numExecutors = StockDf.sparkSession.conf.get("spark.executor.instances").toInt
      val numExecutorsCores = StockDf.sparkSession.conf.get("spark.executor.cores").toInt
      val numPartitions = numExecutors * numExecutorsCores
      EffectCalcLogger.info(s"coalesce($numPartitions) <---> (numExecutors = $numExecutors) * (numExecutorsCores = $numExecutorsCores)", this.getClass.getName)

      val Integer_format_list = List( "forbearancetype", "ifrs9eligibilityflaggrosscarrying", "ifrs9eligibilityflagprovision", "intercompanygroupid", "closingdate" )

      val String_format_list = List( "accountaggregategrosscarryoffbalance", "accountaggregategrosscarryonbalance", "accountaggregateprovoffbalance", "accountaggregateprovonbalance", "accounttype", "assetlocationcountryiso2code", "baseliiaggregategrosscarryoffbalance", "baseliiaggregategrosscarryoffbalancefinrep", "baseliiaggregategrosscarryoffbalancenote38", "baseliiaggregategrosscarryonbalance", "baseliiaggregategrosscarryonbalancefinrep", "baseliiaggregategrosscarryonbalancenote38", "baseliiaggregateprovoffbalance", "baseliiaggregateprovoffbalancefinrep", "baseliiaggregateprovoffbalancenote38", "baseliiaggregateprovonbalance", "baseliiaggregateprovonbalancefinrep", "baseliiaggregateprovonbalancenote38", "baselptfcode", "baselptfcodelabel", "businessunit", "businessunitlabel", "capitalisticgsname", "companyname", "contractid", "contractlineid", "contractprimarycurrency", "counterpartinternalratinglegalentity", "counterpartsectorfinrep", "countryinitialriskiso2code", "economicamountcurrencyprovision", "effectivedate", "essacc", "exposurenature", "forbonecontractindication", "groupproduct", "groupproductlabel", "groupthirdpartyid", "ifrs9implementationmethod", "ifrs9provisioningstage", "investmentcategorygrouping", "issuingapplicationcode", "libcountryriskgroup", "localthirdpartyid", "lreentitycountryiso2code", "lreid", "lreusualname", "monitoringstructuressbu", "monitoringstructuressbulabel", "nacecode", "natureoftherealeconomicactivitynaer", "originindication", "pole", "polelabel", "portfoliocode", "portfoliolabel", "reportingentitymagnitudecode", "situationtechnicalid", "stage", "subbusinessunit", "subbusinessunitlabel", "subpole", "subpolelabel", "subportfoliocode", "subportfoliolabel", "watchlist", "closingdate", "frequency", "batchid", "exchangerate", "ifrseligibilityflag" )

      val Decimal_format_list = List( "grosscarryingamounteur", "provisionamounteur")

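      // Cast each DataFrame column to the type the stock table expects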
      val selectWithCast = StockDf.columns.map(column => {
        if (String_format_list.contains(column.toLowerCase))
          col(column).cast(StringType)
        else if (Integer_format_list.contains(column.toLowerCase))
          col(column).cast(IntegerType)
        else if (Decimal_format_list.contains(column.toLowerCase))
          col(column).cast(DecimalType(30, 2))
        else col(column)
      })

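      // Stamp rows with a load timestamp (note: the selects below keep only StockDf's original
      // columns and getTableColumns filters loaddatetime out, so this column is dropped before the write)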
      val StockDfWithLoadDateTime =
        StockDf.withColumn("loaddatetime", current_timestamp())

      val orderOfColumnsInSQL = getTableColumns(stockTable, conn)

      EffectCalcLogger.info(s"Starting writing to $stockTable table", this.getClass.getName)

      StockDfWithLoadDateTime
        .select(selectWithCast: _*)
        .select(orderOfColumnsInSQL.map(col): _*)
        .coalesce(numPartitions)
        .write
        .mode(org.apache.spark.sql.SaveMode.Append)
        .format("jdbc")
        .options(dfMsqlWriteOptions.configMap ++ Map("dbTable" -> stockTable, "batchsize" -> "10000"))
        .save()

      EffectCalcLogger.info(s"Writing to $stockTable table completed", this.getClass.getName)
      conn.close()
    } catch {
      case e: Exception =>
        // printStackTrace() returns Unit, so it cannot be concatenated into the log message
        e.printStackTrace()
        EffectCalcLogger.error(s"Exception has been raised while pushing to $stockTable: ${e.getMessage}", this.getClass.getName)
        throw e
    }
  }
}

######

What the above code does: it reads data from two different Hive external tables (results and stock) and overwrites that data into the corresponding tables in MS SQL Server. What I want you to do is restructure the code a bit, because pushToResultsSQL and pushToStockSQL share a lot of common code (try to extract that common piece into a function that both of them call). Make sure the functionality doesn't change, but make the functions efficient and follow current Scala coding standards. Overall, I need you to turn this into standard code.
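
To make the intent concrete, here is a rough, untested sketch of the kind of shared helper I have in mind (the name pushToSql and the castColumn parameter are placeholders I made up; it would live inside the class so it can reuse conn, truncateTable, getTableColumns and dfMsqlWriteOptions):

  private def pushToSql(df: DataFrame, table: String, castColumn: String => org.apache.spark.sql.Column): Unit = {
    // shared body of pushToResultsSQL / pushToStockSQL: truncate, cast, reorder, coalesce, write
    conn.createStatement().executeUpdate(truncateTable(table))
    val spark = df.sparkSession
    val numPartitions = spark.conf.get("spark.executor.instances").toInt *
      spark.conf.get("spark.executor.cores").toInt
    df.select(df.columns.map(castColumn): _*)
      .select(getTableColumns(table, conn).map(col): _*)
      .coalesce(numPartitions)
      .write
      .mode(org.apache.spark.sql.SaveMode.Append)
      .format("jdbc")
      .options(dfMsqlWriteOptions.configMap ++ Map("dbTable" -> table, "batchsize" -> "10000"))
      .save()
  }

Each public method would then just build its own castColumn function from its format lists and call pushToSql with its DataFrame and table name.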

Please give me the complete updated code (if needed you can skip only the column names in the vals; this is to ensure I get everything else in the updated code).
Reasons:
  • RegEx Blacklisted phrase (2.5): please give me
  • RegEx Blacklisted phrase (1): I want
  • Long answer (-1):
  • Has code block (-0.5):
  • Low reputation (1):
Posted by: Arun Rathod