This is what my code currently looks like:
package org.socgen.ibi.effectCalc.jdbcConn
import com.typesafe.config.Config
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import java.sql.{Connection, DriverManager, Statement}
import org.socgen.ibi.effectCalc.logger.EffectCalcLogger
import org.socgen.ibi.effectCalc.common.MsSqlJdbcConnectionInfo
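// Pushes the results / stock DataFrames into their SQL Server staging tables over a single JDBC connection.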
class EffectCalcJdbcConnection(config: Config) {
private val microsoftSqlserverJDBCSpark = "com.microsoft.sqlserver.jdbc.spark"
val url: String = config.getString("ibi.db.jdbcURL")
val user: String = config.getString("ibi.db.user")
private val pwd: String = config.getString("ibi.db.password")
private val driverClassName: String = config.getString("ibi.db.driverClass")
private val databaseName: String = config.getString("ibi.db.stage_ec_sql")
private val dburl = s"${url};databasename=${databaseName}"
private val dfMsqlWriteOptions = new MsSqlJdbcConnectionInfo(dburl, user, pwd)
private val connectionProperties = new java.util.Properties()
connectionProperties.setProperty("Driver", s"${driverClassName}")
connectionProperties.setProperty("AutoCommit", "true")
connectionProperties.put("user", s"${user}")
connectionProperties.put("password", s"${pwd}")
Class.forName(s"${driverClassName}")
private val conn: Connection = DriverManager.getConnection(dburl, user, pwd)
private var stmt: Statement = null
private def truncateTable(table: String): String = { "TRUNCATE TABLE " + table + ";" }
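// Fetches the target table's column names (in table order) from SQL Server metadata via SELECT TOP 0, excluding loaddatetime.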
private def getTableColumns(table: String, connection: Connection): List[String] = {
val columnStartingIndex = 1
val statement = s"SELECT TOP 0 * FROM $table"
val resultSetMetaData = connection.createStatement().executeQuery(statement).getMetaData
println("Metadata: " + resultSetMetaData)
val columnsToFilter = List("loaddatetime")
(columnStartingIndex to resultSetMetaData.getColumnCount).toList
  .map(resultSetMetaData.getColumnName)
  .filterNot(columnsToFilter.contains)
}
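// Truncates the results table, casts the DataFrame columns to the target SQL types, reorders them to match the table, and appends the rows over JDBC.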
def pushToResultsSQL(ResultsDf: DataFrame): Unit = {
val resultsTable = config.getString("ibi.db.stage_ec_sql_results_table")
try {
stmt = conn.createStatement()
stmt.executeUpdate(truncateTable(resultsTable))
EffectCalcLogger.info( s" TABLE $resultsTable TRUNCATE ****", this.getClass.getName )
val numExecutors = ResultsDf.sparkSession.conf.get("spark.executor.instances").toInt
val numExecutorsCores = ResultsDf.sparkSession.conf.get("spark.executor.cores").toInt
val numPartitions = numExecutors * numExecutorsCores
EffectCalcLogger.info( s"coalesce($numPartitions) <---> (numExecutors = $numExecutors) * (numExecutorsCores = $numExecutorsCores)", this.getClass.getName )
val String_format_list = List( "accounttype", "baseliiaggregategrosscarryoffbalance", "baseliiaggregategrosscarryonbalance", "baseliiaggregateprovoffbalance", "baseliiaggregateprovonbalance", "closingbatchid", "closingclosingdate", "closingifrs9eligibilityflaggrosscarrying", "closingifrs9eligibilityflagprovision", "closingifrs9provisioningstage", "contractid", "contractprimarycurrency", "effectivedate", "exposurenature", "fxsituation", "groupproduct", "indtypprod", "issuingapplicationcode", "openingbatchid", "openingclosingdate", "openingifrs9eligibilityflaggrosscarrying", "openingifrs9eligibilityflagprovision", "openingifrs9provisioningstage", "reportingentitymagnitudecode", "transfert", "closingdate", "frequency", "batchid")
val Decimal_format_list = List( "alloctakeovereffect", "closinggrosscarryingamounteur", "closingprovisionamounteur", "exchangeeureffect", "expireddealseffect", "expireddealseffect2", "newproductioneffect", "openinggrosscarryingamounteur", "openingprovisionamounteur", "overallstageeffect", "stages1s2effect", "stages1s3effect", "stages2s1effect", "stages2s3effect", "stages3s1effect", "stages3s2effect")
val selectWithCast = ResultsDf.columns.map(column => {
if (String_format_list.contains(column.toLowerCase))
col(column).cast(StringType)
else if (Decimal_format_list.contains(column.toLowerCase))
col(column).cast(DoubleType).cast(DecimalType(30, 2))
else col(column)
})
val orderOfColumnsInSQL = getTableColumns(resultsTable, conn)
EffectCalcLogger.info(s"Starting writing to $resultsTable table", this.getClass.getName)
ResultsDf
  .select(selectWithCast: _*)
  .select(orderOfColumnsInSQL.map(col): _*)
  .coalesce(numPartitions)
  .write
  .mode(org.apache.spark.sql.SaveMode.Append)
  .format("jdbc")
  .options(dfMsqlWriteOptions.configMap ++ Map("dbTable" -> resultsTable, "batchsize" -> "10000"))
  .save()
EffectCalcLogger.info(s"Writing to $resultsTable table completed", this.getClass.getName)
conn.close()
} catch {
case e: Exception =>
EffectCalcLogger.error(s"Exception has been raised while pushing to $resultsTable:" + e.printStackTrace(),this.getClass.getName)
throw e
}
}
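// Same pattern for the stock table: truncate, cast, stamp loaddatetime, reorder to the table's columns, and append over JDBC.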
def pushToStockSQL(StockDf: DataFrame): Unit = {
val stockTable = config.getString("ibi.db.stage_ec_sql_stocks_table")
try {
stmt = conn.createStatement()
stmt.executeUpdate(truncateTable(stockTable))
EffectCalcLogger.info(s" TABLE $stockTable TRUNCATE ****", this.getClass.getName)
val numExecutors = StockDf.sparkSession.conf.get("spark.executor.instances").toInt
val numExecutorsCores = StockDf.sparkSession.conf.get("spark.executor.cores").toInt
val numPartitions = numExecutors * numExecutorsCores
EffectCalcLogger.info( s"coalesce($numPartitions) <---> (numExecutors = $numExecutors) * (numExecutorsCores = $numExecutorsCores)", this.getClass.getName)
val Integer_format_list = List( "forbearancetype", "ifrs9eligibilityflaggrosscarrying", "ifrs9eligibilityflagprovision", "intercompanygroupid", "closingdate" )
val String_format_list = List( "accountaggregategrosscarryoffbalance", "accountaggregategrosscarryonbalance", "accountaggregateprovoffbalance", "accountaggregateprovonbalance", "accounttype", "assetlocationcountryiso2code", "baseliiaggregategrosscarryoffbalance", "baseliiaggregategrosscarryoffbalancefinrep", "baseliiaggregategrosscarryoffbalancenote38", "baseliiaggregategrosscarryonbalance", "baseliiaggregategrosscarryonbalancefinrep", "baseliiaggregategrosscarryonbalancenote38", "baseliiaggregateprovoffbalance", "baseliiaggregateprovoffbalancefinrep", "baseliiaggregateprovoffbalancenote38", "baseliiaggregateprovonbalance", "baseliiaggregateprovonbalancefinrep", "baseliiaggregateprovonbalancenote38", "baselptfcode", "baselptfcodelabel", "businessunit", "businessunitlabel", "capitalisticgsname", "companyname", "contractid", "contractlineid", "contractprimarycurrency", "counterpartinternalratinglegalentity", "counterpartsectorfinrep", "countryinitialriskiso2code", "economicamountcurrencyprovision", "effectivedate", "essacc", "exposurenature", "forbonecontractindication", "groupproduct", "groupproductlabel", "groupthirdpartyid", "ifrs9implementationmethod", "ifrs9provisioningstage", "investmentcategorygrouping", "issuingapplicationcode", "libcountryriskgroup", "localthirdpartyid", "lreentitycountryiso2code", "lreid", "lreusualname", "monitoringstructuressbu", "monitoringstructuressbulabel", "nacecode", "natureoftherealeconomicactivitynaer", "originindication", "pole", "polelabel", "portfoliocode", "portfoliolabel", "reportingentitymagnitudecode", "situationtechnicalid", "stage", "subbusinessunit", "subbusinessunitlabel", "subpole", "subpolelabel", "subportfoliocode", "subportfoliolabel", "watchlist", "closingdate", "frequency", "batchid", "exchangerate", "ifrseligibilityflag" )
val Decimal_format_list = List( "grosscarryingamounteur", "provisionamounteur")
val selectWithCast = StockDf.columns.map(column => {
if (String_format_list.contains(column.toLowerCase))
col(column).cast(StringType)
else if (Integer_format_list.contains(column.toLowerCase))
col(column).cast(IntegerType)
else if (Decimal_format_list.contains(column.toLowerCase))
col(column).cast(DecimalType(30, 2))
else col(column)
})
val StockDfWithLoadDateTime =
StockDf.withColumn("loaddatetime", current_timestamp())
val orderOfColumnsInSQL = getTableColumns(stockTable, conn)
EffectCalcLogger.info(s"Starting writing to $stockTable table", this.getClass.getName)
StockDfWithLoadDateTime
  .select(selectWithCast: _*)
  .select(orderOfColumnsInSQL.map(col): _*)
  .coalesce(numPartitions)
  .write
  .mode(org.apache.spark.sql.SaveMode.Append)
  .format("jdbc")
  .options(dfMsqlWriteOptions.configMap ++ Map("dbTable" -> stockTable, "batchsize" -> "10000"))
  .save()
EffectCalcLogger.info(s"Writing to $stockTable table completed", this.getClass.getName)
conn.close()
} catch {
case e: Exception =>
EffectCalcLogger.error( s"Exception has been raised while pushing to $stockTable:" + e.printStackTrace(),this.getClass.getName )
throw e
}
}
}
######
What the code above basically does is take data read from two different Hive external tables (results and stock) and overwrite their corresponding tables in SQL Server. What I want you to do is restructure the code a bit: pushToResultsSQL and pushToStockSQL share a lot of code, so create a function that holds the common piece and have both of them call it. Make sure the functionality doesn't change, keep the functions efficient, and follow current Scala coding standards; overall, make this standard, production-quality code.
Please give me the complete updated code (you can skip the column names in the vals only if needed; this is to ensure I get everything from the updated code).
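To make the ask concrete, something roughly like the sketch below is the shape I have in mind (writeToSql and castFor are placeholder names I am making up here, not existing code, and the per-table cast lists plus the stock loaddatetime column would still need to be handled by the callers):

// Hypothetical shared helper: truncate the target table, cast columns, align column order, write over JDBC.
private def writeToSql(df: DataFrame, table: String, castFor: String => Option[DataType]): Unit = {
  // truncate the target table before appending
  val statement = conn.createStatement()
  try statement.executeUpdate(truncateTable(table))
  finally statement.close()
  EffectCalcLogger.info(s"TABLE $table truncated", this.getClass.getName)

  // one partition per executor core, same as today
  val spark = df.sparkSession
  val numPartitions =
    spark.conf.get("spark.executor.instances").toInt * spark.conf.get("spark.executor.cores").toInt

  // castFor returns the SQL-side type for a column (from the per-table type lists), or None to leave it unchanged
  val casted = df.columns.map(c => castFor(c.toLowerCase).map(t => col(c).cast(t)).getOrElse(col(c)))
  val ordered = getTableColumns(table, conn).map(col)

  df.select(casted: _*)
    .select(ordered: _*)
    .coalesce(numPartitions)
    .write
    .mode(org.apache.spark.sql.SaveMode.Append)
    .format("jdbc")
    .options(dfMsqlWriteOptions.configMap ++ Map("dbTable" -> table, "batchsize" -> "10000"))
    .save()
  EffectCalcLogger.info(s"Writing to $table table completed", this.getClass.getName)
}

so that pushToResultsSQL and pushToStockSQL would only build their cast rules and then call this helper. If you see a cleaner way to factor it, that is fine too.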