Eu tenho uma exigência, onde preciso filtrarrows from spark dataframe em que o valor de uma determinada coluna (digamos, "price") precisa ser correspondido com os valores presentes em um mapa scala. A chave do scala map é o valor de outra coluna (digamos "id"). Meu dataframe contém duas colunas: id e price. Eu preciso filtrar todas as colunas onde o preço não corresponde ao preço mencionado no mapa scala.
Meu código se parece com isso:
object obj1{
// This method returns value price for items as per their id
getPrice(id:String):String {
//lookup in a map and return the price
}
}
object Main{
val validIds = Seq[String]("1","2","3","4")
val filteredDf = baseDataframe.where(baseDataframe("id").in(validIDs.map(lit(_)): _*) &&
baseDataframe("price") === (obj1.getPrice(baseDataframe("id").toString())))
// But this line send string "id" to obj1.getPrice() function
// rather than value of id column
}
}
Eu não sou capaz de passar valor de colunas de id para funcionar obj1.getPrice (). Alguma sugestão de como conseguir isso?
Obrigado,
Respostas:
0 para resposta № 1Você pode escrever um udf para fazer isso:
val checkPrice(id: String, price: String) = validIds.exists(_ == id) && obj1.getPrice(id) == price
val checkPriceUdf = udf(checkPrice)
baseDataFrame.where(checkPriceUdf($"id", $"price"))
Ou outra solução é converter o Map
do id -> preço para um quadro de dados e, em seguida, fazer uma junção interna com baseDataFrame
no id
e price
colunas.