/ / Spark SQL - generischer Datensatzleser - Scala, Apache-Spark, Apache-Spark-SQL, Apache-Spark-Datensatz

Spark SQL - generischer Dataset-Reader - Scala, Apache-Spark, Apache-Spark-SQL, Apache-Spark-Datensatz

Ich versuche, einen generischen DataSet [T] -Reader zu erstellen, um dataframe.as [..] für jeden Leseraufruf zu vermeiden. Es gibt Unterstützung für primitive Typen und Fallklassen, also habe ich über Folgendes nachgedacht:

def read[T <: Product](sql : String): Dataset[T] = {
import sparkSession.implicits._
val sqlContext = sparkSession.sqlContext
val df: DataFrame = sqlContext.read.option("query", sql).load()
df.as[T]
}

Es wird jedoch ein Fehler angezeigt. Kann man so etwas tun?

Zweiter Zyklus:

def read[T <: Product](sql : String) : Dataset[T] = {
import sparkSession.implicits._
innerRead(sql)
}

private def innerRead[T <: Product : Encoder](sql : String): Dataset[T] = {
val sqlContext = sparkSession.sqlContext
val df: DataFrame = sqlContext.read.option("query", sql).load()
df.as[T]
}

endet mit Typenkonflikt (foudn Encoder [Nothing], erforderlicher Encoder [T])

Ich habe versucht, nur newProductEncoder zu importieren, beendete das gleiche.

Antworten:

3 für die Antwort № 1

Um ein DataFrame zu einem Dataset Du musst eine haben Encoder. Sie können dies tun, indem Sie einfach einen Kontext hinzufügen, der an und gebunden ist Encoder zum T:

def read[T <: Product : Encoder](sql : String): Dataset[T] = {
import sparkSession.implicits._
val sqlContext = sparkSession.sqlContext
val df: DataFrame = sqlContext.read.option("query", sql).load()
df.as[T]
}

Ein Kontext ist der syntaktische Zucker für Folgendes:

def read[T <: Product](sql : String)(implicit $ev: Encoder[T]): Dataset[T]

was bedeutet, dass Sie im impliziten Kontext eine (und nur eine) Instanz eines Encoder[T].

Dies ist notwendig, weil die as Die Methode selbst erfordert diesen Kontext.

Spark selbst kann Ihnen das meiste bieten Encoders benötigen Sie (Primitive, Strings und case classso weit), indem Sie (wie Sie es bereits getan haben) die Implikationen für Ihre SparkSession. Diese müssen jedoch im impliziten Bereich auf der Anrufseite verfügbar sein, was bedeutet, dass das, was Sie haben möchten, wahrscheinlich eher das Folgende ist:

def read[T <: Product : Encoder](spark: SparkSession, sql: String): Dataset[T] = {
import spark.implicits._
val df: DataFrame = spark.sqlContext.read.option("query", sql).load()
df.as[T]
}

val spark: SparkSession = ??? // your SparkSession object
import spark.implicits._
val ds: Dataset[YourType] = read[YourType](spark, "select something from a_table")

1 für die Antwort № 2

In Ihrem zweiten Zyklus müssen Sie möglicherweise den type-Parameter für den Aufruf von innerRead angeben:

innerRead[T](sql)