/ / Spark 2.1:toDS()関数を使ってRDDをカスタムカラムでデータセットに変換する - scala、apache-spark、apache-spark-sql、apache-spark-2.0

Spark 2.1:toDS()関数(scala、apache-spark、apache-spark-sql、apache-spark-2.0)を使用してカスタム列を使用してRDDをデータセットに変換する

RDDをデータセットに変換したい カスタム列 Spark SQLネイティブ関数を使う toDS().

コンパイル時にエラーは発生しませんが、実行時にエラーが発生しました No Encoder found for java.time.LocalDate.
ベロー、フルスタックトレースログ:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "_1")
- root class: "scala.Tuple3"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:587)
at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:49)
at observatory.Extraction$.locationYearlyAverageRecords(Extraction.scala:114)
at observatory.Extraction$.processExtraction(Extraction.scala:28)
at observatory.Main$.delayedEndpoint$observatory$Main$1(Main.scala:18)
at observatory.Main$delayedInit$body.apply(Main.scala:7)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at observatory.Main$.main(Main.scala:7)
at observatory.Main.main(Main.scala)

私のRDDの構造は、署名が次のTuple3に基づいて3つの列で構成されています。

タイプTemperatureRecord =(LocalDate、Location、Double)

フィールド LocalDate パッケージから来るJavaオブジェクトです。 java.time.LocalDate.
フィールド ロケーション このシグネチャを持つ2つのDouble(GPS座標)で作られたカスタムタイプです。

ケースクラスLocation(緯度:ダブル、経度:ダブル)

以下に、サンプル行を1つ示します。

(1975-01-01、場所(70.933、-8.667)、-4.888888888888889)


私のアプリケーション/環境に関するいくつかの詳細:

  • スカラ:2.11.8
  • スパークコア:2.1.1
  • スパークSQL:2.1.1
  • Linux Ubuntu:16.04 LTS

この記事から読みました カスタムオブジェクトをデータセットに格納する方法 カスタムエンコーダを定義する必要がありますが、:(というアイデアはありません。

回答:

回答№1は0

問題は、Sparkが通常のクラス用のエンコーダを見つけられないことです。今日のところ、Sparkはエンコーダーのためにプリミティブ型を使うことを許すだけで、カスタムクラスのための良いサポートはありません。

あなたの場合に関しては、あなたの「カスタム」クラスがあなたが使用できる日付を表しているとすれば java.sql.date 代わりに java.time.LocalDate。利点は、Sparkがすでに提供しているエンコーダを利用できることです。

import java.sql.Date
case class TempRow(date: Date, loc: Location, temp: Double)

val ds = Seq(TempRow(java.sql.Date.valueOf("2017-06-01"),
Location(1.4,5.1), 4.9), TempRow(java.sql.Date.valueOf("2014-04-05"),
Location(1.5,2.5), 5.5))
.toDS

ds.show()

+----------+---------+----+
|      date|      loc|temp|
+----------+---------+----+
|2017-06-01|[1.4,5.1]| 4.9|
|2014-04-05|[1.5,2.5]| 5.5|
+----------+---------+----+

スキーマを確認してください。

ds.printSchema()

root
|-- date: date (nullable = true)
|-- loc: struct (nullable = true)
|    |-- i: double (nullable = false)
|    |-- j: double (nullable = false)
|-- temp: double (nullable = false)

より一般的なケースでは、1つのトリックがあります。Sparkデータセットにカスタムクラスの大部分を格納するために実行できます。カスタムオブジェクトの中間表現として文字列を使用する必要があるため、これがすべての場合に機能するわけではないことに注意してください。この問題は本当に苦痛なので、将来解決されることを願っています。

あなたのケースのための一つの解決策の下に見つけてください:

case class Location(val i: Double, val j: Double)
class TempRecord(val date: java.time.LocalDate, val loc: Location, val temp: Double)
type TempSerialized = (String, Location, Double)

implicit def fromSerialized(t: TempSerialized): TempRecord = new TempRecord(java.time.LocalDate.parse(t._1), t._2, t._3)
implicit def toSerialized(t: TempRecord): TempSerialized = (t.date.toString, t.loc, t.temp)

// Finally we can create datasets
val d = spark.createDataset(Seq[TempSerialized](
new TempRecord(java.time.LocalDate.now, Location(1.0,2.0), 3.0),
new TempRecord(java.time.LocalDate.now, Location(5.0,4.0), 4.0) )
).toDF("date", "location", "temperature").as[TempSerialized]

d.show()

+----------+---------+-----------+
|      date| location|temperature|
+----------+---------+-----------+
|2017-07-11|[1.0,2.0]|        3.0|
|2017-07-11|[5.0,4.0]|        4.0|
+----------+---------+-----------+

d.printSchema()

root
|-- date: string (nullable = true)
|-- location: struct (nullable = true)
|    |-- i: double (nullable = false)
|    |-- j: double (nullable = false)
|-- temperature: double (nullable = false)