This is how my code looks right now:
package org.socgen.ibi.effectCalc.jdbcConn
import com.typesafe.config.Config
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import java.sql.{Connection, DriverManager, Statement}
import org.socgen.ibi.effectCalc.logger.EffectCalcLogger
import org.socgen.ibi.effectCalc.common.MsSqlJdbcConnectionInfo
class EffectCalcJdbcConnection(config: Config) {
private val microsoftSqlserverJDBCSpark = "com.microsoft.sqlserver.jdbc.spark"
val url: String = config.getString("ibi.db.jdbcURL")
val user: String = config.getString("ibi.db.user")
private val pwd: String = config.getString("ibi.db.password")
private val driverClassName: String = config.getString("ibi.db.driverClass")
private val databaseName: String = config.getString("ibi.db.stage_ec_sql")
private val dburl = s"$url;databasename=$databaseName"
private val dfMsqlWriteOptions = new MsSqlJdbcConnectionInfo(dburl, user, pwd)
private val connectionProperties = new java.util.Properties()
connectionProperties.setProperty("Driver", s"$driverClassName")
connectionProperties.setProperty("AutoCommit", "true")
connectionProperties.put("user", s"$user")
connectionProperties.put("password", s"$pwd")
Class.forName(s"$driverClassName")
private val conn: Connection = DriverManager.getConnection(dburl, user, pwd)
private var stmt: Statement = null
private def truncateTable(table: String): String = {
"TRUNCATE TABLE " + table + ";"
}
private def getTableColumns(
table: String,
connection: Connection
): List[String] = {
val columnStartingIndex = 1
val statement = s"SELECT TOP 0 * FROM $table"
val resultSetMetaData =
connection.createStatement().executeQuery(statement).getMetaData
println("Metadata" + resultSetMetaData)
val columnToFilter = List("TO ADD")
(columnStartingIndex to resultSetMetaData.getColumnCount).toList
.map(resultSetMetaData.getColumnName)
.filterNot(columnToFilter.contains(_))
}
def pushToResultsSQL(ResultsDf: DataFrame): Unit = {
val resultsTable = config.getString("ibi.db.stage_ec_sql_results_table")
try {
stmt = conn.createStatement()
stmt.executeUpdate(truncateTable(resultsTable))
EffectCalcLogger.info(
s" TABLE $resultsTable TRUNCATE ****",
this.getClass.getName
)
val numExecutors =
ResultsDf.sparkSession.conf.get("spark.executor.instances").toInt
val numExecutorsCores =
ResultsDf.sparkSession.conf.get("spark.executor.cores").toInt
val numPartitions = numExecutors * numExecutorsCores
EffectCalcLogger.info(
s"coalesce($numPartitions) <---> (numExecutors = $numExecutors) * (numExecutorsCores = $numExecutorsCores)",
this.getClass.getName
)
val String_format_list = List(
"accounttype", "baseliiaggregategrosscarryoffbalance", "baseliiaggregategrosscarryonbalance",
"baseliiaggregateprovoffbalance", "baseliiaggregateprovonbalance", "closingbatchid",
"closingclosingdate", "closingifrs9eligibilityflaggrosscarrying", "closingifrs9eligibilityflagprovision",
"closingifrs9provisioningstage", "contractid", "contractprimarycurrency", "effectivedate",
"exposurenature", "fxsituation", "groupproduct", "indtypprod", "issuingapplicationcode",
"openingbatchid", "openingclosingdate", "openingifrs9eligibilityflaggrosscarrying",
"openingifrs9eligibilityflagprovision", "openingifrs9provisioningstage", "reportingentitymagnitudecode",
"transfert", "closingdate", "frequency", "batchid"
)
val Decimal_format_list = List(
"alloctakeovereffect", "closinggrosscarryingamounteur", "closingprovisionamounteur",
"exchangeeureffect", "expireddealseffect", "expireddealseffect2", "newproductioneffect",
"openinggrosscarryingamounteur", "openingprovisionamounteur", "overallstageeffect",
"stages1s2effect", "stages1s3effect", "stages2s1effect", "stages2s3effect",
"stages3s1effect", "stages3s2effect"
)
val selectWithCast = ResultsDf.columns.map(column => {
if (String_format_list.contains(column.toLowerCase))
col(column).cast(StringType)
else if (Decimal_format_list.contains(column.toLowerCase))
col(column).cast(DoubleType).cast(DecimalType(30, 2))
else col(column)
})
print(s"This is selectWithCast for Results Table: $selectWithCast")
val ResultsDfWithLoadDateTime =
ResultsDf.withColumn("loaddatetime", current_timestamp())
print(
s"this is ResultsDfWithLoadDateTime: \n ${ResultsDfWithLoadDateTime.show(false) }"
)
val orderOfColumnsInSQL = getTableColumns(resultsTable, conn)
print(s"This is order of columns for results table: $orderOfColumnsInSQL")
EffectCalcLogger.info(
s" Starting writing to $resultsTable table ",
this.getClass.getName
)
ResultsDfWithLoadDateTime
  .select(selectWithCast: _*)
  .select(orderOfColumnsInSQL.map(col): _*)
  .coalesce(numPartitions)
  .write
  .mode(org.apache.spark.sql.SaveMode.Append)
  .format(microsoftSqlserverJDBCSpark)
  .options(dfMsqlWriteOptions.configMap ++ Map("dbTable" -> resultsTable))
  .save()
EffectCalcLogger.info(
s"Writing to $resultsTable table completed ",
this.getClass.getName
)
conn.close()
} catch {
case e: Exception =>
EffectCalcLogger.error(
s"Exception has been raised while pushing to $resultsTable: ${e.getMessage}",
this.getClass.getName
)
e.printStackTrace()
throw e
}
}
}
--------------------------------
Now, in the code above, I no longer want to add the loaddatetime column to ResultsDf; instead I want to exclude it from orderOfColumnsInSQL. Can you tell me how that can be done?
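One way this could look, as a minimal sketch rather than a drop-in patch: it assumes the loaddatetime column in the SQL results table either allows NULL or has a database-side default (e.g. GETDATE()), since Spark would no longer supply a value for it. Inside pushToResultsSQL, drop the withColumn("loaddatetime", ...) step entirely and filter the column out of the list read back from SQL Server:

// Sketch: skip the loaddatetime column entirely on the Spark side.
// The comparison is case-insensitive because getTableColumns returns the
// column names exactly as they are defined in the database.
val orderOfColumnsInSQL = getTableColumns(resultsTable, conn)
  .filterNot(_.equalsIgnoreCase("loaddatetime"))

ResultsDf
  .select(selectWithCast: _*)
  .select(orderOfColumnsInSQL.map(col): _*)
  .coalesce(numPartitions)
  .write
  .mode(org.apache.spark.sql.SaveMode.Append)
  .format(microsoftSqlserverJDBCSpark)
  .options(dfMsqlWriteOptions.configMap ++ Map("dbTable" -> resultsTable))
  .save()

Alternatively, the filtering could live in getTableColumns itself by adding "loaddatetime" to its columnToFilter list; then pushToResultsSQL only needs the withColumn call removed.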