/ / Extraindo valor das colunas no dataframe da ignição - scala, apache-spark, apache-spark-sql, spark-dataframe, spark-streaming

Extraindo valor das colunas no dataframe da ignição - scala, apache-spark, apache-spark-sql, spark-dataframe, spark-streaming

Eu tenho uma exigência, onde preciso filtrarrows from spark dataframe em que o valor de uma determinada coluna (digamos, "price") precisa ser correspondido com os valores presentes em um mapa scala. A chave do scala map é o valor de outra coluna (digamos "id"). Meu dataframe contém duas colunas: id e price. Eu preciso filtrar todas as colunas onde o preço não corresponde ao preço mencionado no mapa scala.

Meu código se parece com isso:

object obj1{
// This method returns value price for items as per their id
getPrice(id:String):String {
//lookup in a map and return the price
}
}

object Main{
val validIds = Seq[String]("1","2","3","4")
val filteredDf = baseDataframe.where(baseDataframe("id").in(validIDs.map(lit(_)): _*) &&
baseDataframe("price") === (obj1.getPrice(baseDataframe("id").toString())))

// But this line send string "id" to obj1.getPrice() function
// rather than value of id column
}
}

Eu não sou capaz de passar valor de colunas de id para funcionar obj1.getPrice (). Alguma sugestão de como conseguir isso?

Obrigado,

Respostas:

0 para resposta № 1

Você pode escrever um udf para fazer isso:

val checkPrice(id: String, price: String) = validIds.exists(_ == id) && obj1.getPrice(id) == price
val checkPriceUdf = udf(checkPrice)

baseDataFrame.where(checkPriceUdf($"id", $"price"))

Ou outra solução é converter o Map do id -> preço para um quadro de dados e, em seguida, fazer uma junção interna com baseDataFrame no id e price colunas.