Ich versuche, einen generischen DataSet [T] -Reader zu erstellen, um dataframe.as [..] für jeden Leseraufruf zu vermeiden. Es gibt Unterstützung für primitive Typen und Fallklassen, also habe ich über Folgendes nachgedacht:
def read[T <: Product](sql : String): Dataset[T] = {
import sparkSession.implicits._
val sqlContext = sparkSession.sqlContext
val df: DataFrame = sqlContext.read.option("query", sql).load()
df.as[T]
}
Es wird jedoch ein Fehler angezeigt. Kann man so etwas tun?
Zweiter Zyklus:
def read[T <: Product](sql : String) : Dataset[T] = {
import sparkSession.implicits._
innerRead(sql)
}
private def innerRead[T <: Product : Encoder](sql : String): Dataset[T] = {
val sqlContext = sparkSession.sqlContext
val df: DataFrame = sqlContext.read.option("query", sql).load()
df.as[T]
}
endet mit Typenkonflikt (foudn Encoder [Nothing], erforderlicher Encoder [T])
Ich habe versucht, nur newProductEncoder zu importieren, beendete das gleiche.
Antworten:
3 für die Antwort № 1Um ein DataFrame
zu einem Dataset
Du musst eine haben Encoder
. Sie können dies tun, indem Sie einfach einen Kontext hinzufügen, der an und gebunden ist Encoder
zum T
:
def read[T <: Product : Encoder](sql : String): Dataset[T] = {
import sparkSession.implicits._
val sqlContext = sparkSession.sqlContext
val df: DataFrame = sqlContext.read.option("query", sql).load()
df.as[T]
}
Ein Kontext ist der syntaktische Zucker für Folgendes:
def read[T <: Product](sql : String)(implicit $ev: Encoder[T]): Dataset[T]
was bedeutet, dass Sie im impliziten Kontext eine (und nur eine) Instanz eines Encoder[T]
.
Dies ist notwendig, weil die as
Die Methode selbst erfordert diesen Kontext.
Spark selbst kann Ihnen das meiste bieten Encoder
s benötigen Sie (Primitive, String
s und case class
so weit), indem Sie (wie Sie es bereits getan haben) die Implikationen für Ihre SparkSession
. Diese müssen jedoch im impliziten Bereich auf der Anrufseite verfügbar sein, was bedeutet, dass das, was Sie haben möchten, wahrscheinlich eher das Folgende ist:
def read[T <: Product : Encoder](spark: SparkSession, sql: String): Dataset[T] = {
import spark.implicits._
val df: DataFrame = spark.sqlContext.read.option("query", sql).load()
df.as[T]
}
val spark: SparkSession = ??? // your SparkSession object
import spark.implicits._
val ds: Dataset[YourType] = read[YourType](spark, "select something from a_table")
1 für die Antwort № 2
In Ihrem zweiten Zyklus müssen Sie möglicherweise den type-Parameter für den Aufruf von innerRead angeben:
innerRead[T](sql)