/ / Spark escribe en postgres lento - apache-spark, dataframe, apache-spark-sql

Spark escribe en postgres lento - apache-spark, dataframe, apache-spark-sql

Estoy escribiendo datos (aproximadamente 83 millones de registros) desde un marco de datos en postgresql y es un poco lento. Toma 2.7 horas para completar la escritura en db.

Mirando a los ejecutores, solo hay una tarea activa ejecutándose en un solo ejecutor. ¿Hay alguna manera de que pueda paralelizar las escrituras en db usando todos los ejecutores en Spark?

...
val prop = new Properties()
prop.setProperty("user", DB_USER)
prop.setProperty("password", DB_PASSWORD)
prop.setProperty("driver", "org.postgresql.Driver")



salesReportsDf.write
.mode(SaveMode.Append)
.jdbc(s"jdbc:postgresql://$DB_HOST:$DB_PORT/$DATABASE", REPORTS_TABLE, prop)

Gracias

Respuestas

5 para la respuesta № 1

Entonces descubrí el problema. Básicamente, volver a particionar mi marco de datos aumenta el rendimiento de escritura de la base de datos en un 100%

def srcTable(config: Config): Map[String, String] = {

val SERVER             = config.getString("db_host")
val PORT               = config.getInt("db_port")
val DATABASE           = config.getString("database")
val USER               = config.getString("db_user")
val PASSWORD           = config.getString("db_password")
val TABLE              = config.getString("table")
val PARTITION_COL      = config.getString("partition_column")
val LOWER_BOUND        = config.getString("lowerBound")
val UPPER_BOUND        = config.getString("upperBound")
val NUM_PARTITION      = config.getString("numPartitions")

Map(
"url"     -> s"jdbc:postgresql://$SERVER:$PORT/$DATABASE",
"driver"  -> "org.postgresql.Driver",
"dbtable" -> TABLE,
"user"    -> USER,
"password"-> PASSWORD,
"partitionColumn" -> PARTITION_COL,
"lowerBound" -> LOWER_BOUND,
"upperBound" -> UPPER_BOUND,
"numPartitions" -> NUM_PARTITION
)

}

1 para la respuesta № 2

Spark también tiene una opción llamada "batchsize" mientras escribe usando jdbc. El valor predeterminado es bastante bajo. (1000)

connectionProperties.put("batchsize", "100000")

Establecerlo en valores mucho más altos debería acelerar la escritura en bases de datos externas.