/ / Spark 2.1: Konvertieren Sie RDD mit benutzerdefinierten Spalten mithilfe der toDS () - Funktion - scala, apache-spark, apache-spark-sql, apache-spark-2.0

Spark 2.1: Konvertieren Sie RDD in Dataset mit benutzerdefinierten Spalten mit Hilfe der Funktion toDS () - Scala, Apache-Spark, Apache-Spark-SQL, Apache-Spark-2.0

Ich möchte ein RDD in ein Dataset umwandeln benutzerdefinierte Spalten Verwenden der systemeigenen Spark SQL-Funktion toDS ().

Ich habe keine Fehler beim Kompilieren, aber zur Laufzeit habe ich den Fehler erhalten No Encoder found for java.time.LocalDate.
Unten, das vollständige Stack-Trace-Protokoll:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "_1")
- root class: "scala.Tuple3"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:587)
at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:49)
at observatory.Extraction$.locationYearlyAverageRecords(Extraction.scala:114)
at observatory.Extraction$.processExtraction(Extraction.scala:28)
at observatory.Main$.delayedEndpoint$observatory$Main$1(Main.scala:18)
at observatory.Main$delayedInit$body.apply(Main.scala:7)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at observatory.Main$.main(Main.scala:7)
at observatory.Main.main(Main.scala)

Die Struktur meiner RDD besteht aus drei Spalten, die auf Tuple3 basieren, wobei die Signatur lautet:

type TemperatureRecord = (LocalDate, Location, Double)

Feld LokalesDatum ist das Java-Objekt, das aus einem Paket stammt java.time.LocalDate.
Feld Ort ist ein benutzerdefinierter Typ, der aus zwei Double (GPS-Koordinaten) mit dieser Signatur besteht:

Fallklasse Ort (lat: Doppel, lon: Doppel)

Unten eine Beispielzeile:

(1975-01-01, Standort (70.933, -8.667), -4.888888888888889)


Einige Details zu meiner Anwendung / Umgebung:

  • Scala: 2.11.8
  • Zündkern: 2.1.1
  • Spark SQL: 2.1.1
  • Linux Ubuntu: 16.04 LTS

Ich habe aus diesem Artikel gelesen Wie werden benutzerdefinierte Objekte in Dataset gespeichert? dass ich benutzerdefinierte Encoder definieren muss, aber ich habe keine Ahnung :(.

Antworten:

0 für die Antwort № 1

Das Problem ist, dass Spark keinen Encoder für reguläre Klassen findet. Bis heute erlaubt Spark nur die Verwendung primitiver Typen für Encoder, und benutzerdefinierte Klassen werden nicht gut unterstützt.

Wie für Ihren Fall, wenn Ihre "benutzerdefinierte" Klasse ein Datum darstellt, das Sie verwenden können java.sql.date stattdessen java.time.LocalDate. Der Vorteil ist, dass Sie bereits von Spark bereitgestellte Encoder nutzen können.

import java.sql.Date
case class TempRow(date: Date, loc: Location, temp: Double)

val ds = Seq(TempRow(java.sql.Date.valueOf("2017-06-01"),
Location(1.4,5.1), 4.9), TempRow(java.sql.Date.valueOf("2014-04-05"),
Location(1.5,2.5), 5.5))
.toDS

ds.show()

+----------+---------+----+
|      date|      loc|temp|
+----------+---------+----+
|2017-06-01|[1.4,5.1]| 4.9|
|2014-04-05|[1.5,2.5]| 5.5|
+----------+---------+----+

Überprüfen Sie das Schema:

ds.printSchema()

root
|-- date: date (nullable = true)
|-- loc: struct (nullable = true)
|    |-- i: double (nullable = false)
|    |-- j: double (nullable = false)
|-- temp: double (nullable = false)

Für allgemeinere Fälle gibt es einen TrickSie können die meisten benutzerdefinierten Klassen in einem Spark-Dataset speichern. Beachten Sie, dass dies nicht in allen Fällen funktioniert, da Sie eine Zeichenfolge als Zwischendarstellung Ihres benutzerdefinierten Objekts verwenden müssen. Ich hoffe, dass dieses Problem in Zukunft gelöst wird, weil es wirklich schmerzhaft ist.

Hier finden Sie eine Lösung für Ihren Fall:

case class Location(val i: Double, val j: Double)
class TempRecord(val date: java.time.LocalDate, val loc: Location, val temp: Double)
type TempSerialized = (String, Location, Double)

implicit def fromSerialized(t: TempSerialized): TempRecord = new TempRecord(java.time.LocalDate.parse(t._1), t._2, t._3)
implicit def toSerialized(t: TempRecord): TempSerialized = (t.date.toString, t.loc, t.temp)

// Finally we can create datasets
val d = spark.createDataset(Seq[TempSerialized](
new TempRecord(java.time.LocalDate.now, Location(1.0,2.0), 3.0),
new TempRecord(java.time.LocalDate.now, Location(5.0,4.0), 4.0) )
).toDF("date", "location", "temperature").as[TempSerialized]

d.show()

+----------+---------+-----------+
|      date| location|temperature|
+----------+---------+-----------+
|2017-07-11|[1.0,2.0]|        3.0|
|2017-07-11|[5.0,4.0]|        4.0|
+----------+---------+-----------+

d.printSchema()

root
|-- date: string (nullable = true)
|-- location: struct (nullable = true)
|    |-- i: double (nullable = false)
|    |-- j: double (nullable = false)
|-- temperature: double (nullable = false)