/ / Sparkウィンドウ関数を使用してデータフレームに列を作成する-scala、apache-spark、spark-dataframe

Sparkウィンドウ関数を使用すると、dataframe - scala、apache-spark、spark-dataframeの列が作成されます

以下のデータフレームのIDのグループに対して、前の日付(現在の日付を差し引いた日付)の値で新しい列を作成したいと思います

+---+----------+-----+
| id|      date|value|
+---+----------+-----+
|  a|2015-04-11|  300|
|  a|2015-04-12|  400|
|  a|2015-04-12|  200|
|  a|2015-04-12|  100|
|  a|2015-04-11|  700|
|  b|2015-04-02|  100|
|  b|2015-04-12|  100|
|  c|2015-04-12|  400|
+---+----------+-----+

リードウィンドウ関数を試してみました。

val df1=Seq(("a","2015-04-11",300),("a","2015-04-12",400),("a","2015-04-12",200),("a","2015-04-12",100),("a","2015-04-11",700),("b","2015-04-02",100),("b","2015-04-12",100),("c","2015-04-12",400)).toDF("id","date","value")

var w1=Window.partitionBy("id").orderBy("date".desc)
var leadc1=lead(df1("value"),1).over(w1)
val df2=df1.withColumn("nvalue",leadc1)

+---+----------+-----+------+
| id|      date|value|nvalue|
+---+----------+-----+------+
|  a|2015-04-12|  400|   200|
|  a|2015-04-12|  200|   100|
|  a|2015-04-12|  100|   300|
|  a|2015-04-11|  300|   700|
|  a|2015-04-11|  700|  null|
|  b|2015-04-12|  100|   100|
|  b|2015-04-02|  100|  null|
|  c|2015-04-12|  400|  null|
+---+----------+-----+------+

しかし、ID「a」に同じ日付がある場合にわかるように、間違った結果が得られます。結果は次のようになります。

+---+----------+-----+------+
| id|      date|value|nvalue|
+---+----------+-----+------+
|  a|2015-04-12|  400|   300|
|  a|2015-04-12|  200|   300|
|  a|2015-04-12|  100|   300|
|  a|2015-04-11|  300|  null|
|  a|2015-04-11|  700|  null|
|  b|2015-04-12|  100|   100|
|  b|2015-04-02|  100|  null|
|  c|2015-04-12|  400|  null|
+---+----------+-----+------+

ウィンドウ関数を使用した解決策を探していますが、joinを使用した解決策はすでにあります。

ありがとう

回答:

回答№1は0

問題は、同じ日付の複数の行があることです。 lead とる value 次から 結果セットでは、次ではありません 日付。したがって、行を日付の降順で並べ替えると、次の行は同じ日付になる可能性があります。

特定の日付に使用する正しい値をどのように特定しますか?たとえば、700ではなく(id = a、date = 2015-04-11)から300を取得するのはなぜですか?

ウィンドウ関数でこれを行うには、複数のパスを実行する必要がある場合があります-これには最後の時間がかかります nvalue 同じID /日付グループ内のすべての行に適用しますが、行が最初にどのように順序付けられているかはわかりません。

 val df1=Seq(("a","2015-04-11",300),("a","2015-04-12",400),("a","2015-04-12",200),("a","2015-04-12",100),("a","2015-04-11",700),("b","2015-04-02",100),("b","2015-04-12",100),("c","2015-04-12",400)).toDF("id","date","value")

var w1 = Window.partitionBy("id").orderBy("date".desc)
var leadc1 = lead(df1("value"),1).over(w1)
val df2 = df1.withColumn("nvalue",leadc1)
val w2 = Window.partitionBy("id", "date").orderBy("??? some way to distinguish row ordering")
val df3 = df1.withColumn("nvalue2", last_value("nvalue").over(w2))