/ / Преобразуване на DataFrame на Spark в RDD [Vector] - скала, apache-spark, spark-dataframe, apache-spark-mllib

Конвертиране на DataFrame на Spark в RDD [Vector] - скала, apache-spark, spark-dataframe, apache-spark-mllib

Когато се опитвах да превърна Спарк DataFrame в РСР [org.apache.spark.mllib.linalg.Vector] използвайки следния код:

import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.Vectors

val df = sqlContext.createDataFrame(
Seq((0.1, 0.2, 0.4))
).toDF("t1", "t2", "t3")

df.rdd.map{ case Row(row: Seq[_]) =>
Vectors.dense(row.asInstanceOf[Seq[Double]].toArray)
}.collect

Получих съобщение за грешка по следния начин:

scala.MatchError: [0.1,0.2,0.4] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)

Тогава опитах друг метод:

df.content.rdd.map{ case row =>
Vectors.dense(row.toSeq.toArray.map{
x => x.asInstanceOf[Double]
})
}.collect

Тя работи добре.

Докато първият метод е въведен в официалната версия на Spark-2.2.0-SNAPSHOT при преобразуване ред в Array [Двойна], не проработи.

Може ли някой да разбере причината?

Отговори:

2 за отговор № 1

Тези два метода не правят същото. В първия случай се опитвате да се съпротивлявате Row с един ArrayType колона. Тъй като вашият вход съдържа три колони MatchException е очакван резултат. Това може да работи само ако събирате например колони като масив

df.select(array(df.columns.map(col(_)): _*)).rdd.map {
case Row(xs: Seq[Double @unchecked]) => xs
}

или

df.select(array(df.columns.map(col(_)): _*)).rdd.map(_.getSeq[Double](0))

Във втория случай преобразувате ред към Seq[Any] който ви дава последователност от полеви стойности.