/ / Spark 2.1: Konwertuj RDD na zestaw danych z niestandardowymi kolumnami za pomocą funkcji toDS () - scala, apache-spark, apache-spark-sql, apache-spark-2.0

Spark 2.1: Konwertuj RDD na zestaw danych z niestandardowymi kolumnami za pomocą funkcji toDS () - scala, apache-spark, apache-spark-sql, apache-spark-2.0

Chcę przekształcić RDD w zbiór danych z kolumny niestandardowe przy użyciu natywnej funkcji Spark SQL toDS ().

Nie mam żadnych błędów w czasie kompilacji, ale w czasie wykonywania otrzymałem błąd No Encoder found for java.time.LocalDate.
Poniżej znajduje się dziennik śledzenia pełnego stosu:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "_1")
- root class: "scala.Tuple3"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:587)
at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:49)
at observatory.Extraction$.locationYearlyAverageRecords(Extraction.scala:114)
at observatory.Extraction$.processExtraction(Extraction.scala:28)
at observatory.Main$.delayedEndpoint$observatory$Main$1(Main.scala:18)
at observatory.Main$delayedInit$body.apply(Main.scala:7)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at observatory.Main$.main(Main.scala:7)
at observatory.Main.main(Main.scala)

Struktura mojego RDD składa się z trzech kolumn, opartych na Tuple3, gdzie podpis to:

type TemperatureRecord = (LocalDate, Location, Double)

Pole LocalDate to obiekt Java pochodzący z pakietu java.time.LocalDate.
Pole Lokalizacja jest typem niestandardowym wykonanym z dwóch współrzędnych Double (GPS) posiadających następujący podpis:

klasa przypadku Lokalizacja (łac .: podwójne, lon: podwójne)

Poniżej przykładowy wiersz:

(01.01.1975, Lokalizacja (70.933, -8.667), -4.888888888888889)


Kilka szczegółów na temat mojej aplikacji / środowiska:

  • Scala: 2.11.8
  • Rdzeń iskrowy: 2.1.1
  • Spark SQL: 2.1.1
  • Linux Ubuntu: 16.04 LTS

Przeczytałem z tego artykułu Jak przechowywać obiekty niestandardowe w zestawie danych? że muszę zdefiniować niestandardowy koder, ale nie mam pojęcia :(.

Odpowiedzi:

0 dla odpowiedzi № 1

Problem polega na tym, że Spark nie znajduje kodera dla zwykłych klas. Na dzień dzisiejszy Spark pozwala tylko na używanie typów pierwotnych dla koderów i nie ma dobrego wsparcia dla klas niestandardowych.

Jeśli chodzi o Twój przypadek, biorąc pod uwagę, że Twoja „niestandardowa” klasa reprezentuje datę, której możesz użyć java.sql.date zamiast java.time.LocalDate. Zaletą jest to, że możesz skorzystać z koderów już dostarczonych przez Spark.

import java.sql.Date
case class TempRow(date: Date, loc: Location, temp: Double)

val ds = Seq(TempRow(java.sql.Date.valueOf("2017-06-01"),
Location(1.4,5.1), 4.9), TempRow(java.sql.Date.valueOf("2014-04-05"),
Location(1.5,2.5), 5.5))
.toDS

ds.show()

+----------+---------+----+
|      date|      loc|temp|
+----------+---------+----+
|2017-06-01|[1.4,5.1]| 4.9|
|2014-04-05|[1.5,2.5]| 5.5|
+----------+---------+----+

Sprawdź schemat:

ds.printSchema()

root
|-- date: date (nullable = true)
|-- loc: struct (nullable = true)
|    |-- i: double (nullable = false)
|    |-- j: double (nullable = false)
|-- temp: double (nullable = false)

W bardziej ogólnych przypadkach jest jedna sztuczkamożesz wykonać, aby przechowywać większość klas niestandardowych w zestawie danych Spark. Pamiętaj, że nie działa to we wszystkich przypadkach, ponieważ musisz użyć ciągu jako pośredniej reprezentacji obiektu niestandardowego. Mam nadzieję, że ten problem zostanie rozwiązany w przyszłości, ponieważ jest to naprawdę uciążliwe.

Znajdź poniżej jedno rozwiązanie dla swojego przypadku:

case class Location(val i: Double, val j: Double)
class TempRecord(val date: java.time.LocalDate, val loc: Location, val temp: Double)
type TempSerialized = (String, Location, Double)

implicit def fromSerialized(t: TempSerialized): TempRecord = new TempRecord(java.time.LocalDate.parse(t._1), t._2, t._3)
implicit def toSerialized(t: TempRecord): TempSerialized = (t.date.toString, t.loc, t.temp)

// Finally we can create datasets
val d = spark.createDataset(Seq[TempSerialized](
new TempRecord(java.time.LocalDate.now, Location(1.0,2.0), 3.0),
new TempRecord(java.time.LocalDate.now, Location(5.0,4.0), 4.0) )
).toDF("date", "location", "temperature").as[TempSerialized]

d.show()

+----------+---------+-----------+
|      date| location|temperature|
+----------+---------+-----------+
|2017-07-11|[1.0,2.0]|        3.0|
|2017-07-11|[5.0,4.0]|        4.0|
+----------+---------+-----------+

d.printSchema()

root
|-- date: string (nullable = true)
|-- location: struct (nullable = true)
|    |-- i: double (nullable = false)
|    |-- j: double (nullable = false)
|-- temperature: double (nullable = false)