/ / Mit der Spark-Fensterfunktion können Sie Spalten im Datenrahmen erstellen - Scala, Apache-Spark, Spark-Datenrahmen

Mit der Spark-Fensterfunktion können Sie Spalten in Datenrahmen erstellen - Scala, Apache-Spark, Spark-Datenrahmen

Ich möchte eine neue Spalte mit dem Wert des vorherigen Datums (Datum abzüglich des aktuellen Datums) für die Gruppe von IDs für den folgenden Datenrahmen erstellen

+---+----------+-----+
| id|      date|value|
+---+----------+-----+
|  a|2015-04-11|  300|
|  a|2015-04-12|  400|
|  a|2015-04-12|  200|
|  a|2015-04-12|  100|
|  a|2015-04-11|  700|
|  b|2015-04-02|  100|
|  b|2015-04-12|  100|
|  c|2015-04-12|  400|
+---+----------+-----+

Ich habe es mit Bleifensterfunktion versucht.

val df1=Seq(("a","2015-04-11",300),("a","2015-04-12",400),("a","2015-04-12",200),("a","2015-04-12",100),("a","2015-04-11",700),("b","2015-04-02",100),("b","2015-04-12",100),("c","2015-04-12",400)).toDF("id","date","value")

var w1=Window.partitionBy("id").orderBy("date".desc)
var leadc1=lead(df1("value"),1).over(w1)
val df2=df1.withColumn("nvalue",leadc1)

+---+----------+-----+------+
| id|      date|value|nvalue|
+---+----------+-----+------+
|  a|2015-04-12|  400|   200|
|  a|2015-04-12|  200|   100|
|  a|2015-04-12|  100|   300|
|  a|2015-04-11|  300|   700|
|  a|2015-04-11|  700|  null|
|  b|2015-04-12|  100|   100|
|  b|2015-04-02|  100|  null|
|  c|2015-04-12|  400|  null|
+---+----------+-----+------+

Aber wie wir sehen können, wenn ich das gleiche Datum in der ID "a" habe, erhalte ich ein falsches Ergebnis. Das Ergebnis sollte so sein

+---+----------+-----+------+
| id|      date|value|nvalue|
+---+----------+-----+------+
|  a|2015-04-12|  400|   300|
|  a|2015-04-12|  200|   300|
|  a|2015-04-12|  100|   300|
|  a|2015-04-11|  300|  null|
|  a|2015-04-11|  700|  null|
|  b|2015-04-12|  100|   100|
|  b|2015-04-02|  100|  null|
|  c|2015-04-12|  400|  null|
+---+----------+-----+------+

Ich habe bereits eine Lösung mit Join, obwohl ich nach einer Lösung mit Window-Funktion suche.

Vielen Dank

Antworten:

0 für die Antwort № 1

Das Problem ist, dass Sie mehrere Zeilen mit demselben Datum haben. lead werde nehmen value vom nächsten Reihe in der Ergebnismenge, nicht die nächste termin. Wenn Sie also die Zeilen in absteigender Reihenfolge nach Datum sortieren, könnte die nächste Zeile dasselbe Datum haben.

Wie ermitteln Sie den richtigen Wert für ein bestimmtes Datum? Zum Beispiel, warum nehmen Sie 300 aus (id = a, date = 2015-04-11) und nicht 700?

Um dies mit Fensterfunktionen zu tun, müssen Sie möglicherweise mehrere Durchgänge ausführen - dies würde den letzten dauern nvalue und wende es auf alle Zeilen in derselben ID / Datums-Gruppierung an - aber ich bin nicht sicher, wie deine Zeilen anfänglich angeordnet sind.

 val df1=Seq(("a","2015-04-11",300),("a","2015-04-12",400),("a","2015-04-12",200),("a","2015-04-12",100),("a","2015-04-11",700),("b","2015-04-02",100),("b","2015-04-12",100),("c","2015-04-12",400)).toDF("id","date","value")

var w1 = Window.partitionBy("id").orderBy("date".desc)
var leadc1 = lead(df1("value"),1).over(w1)
val df2 = df1.withColumn("nvalue",leadc1)
val w2 = Window.partitionBy("id", "date").orderBy("??? some way to distinguish row ordering")
val df3 = df1.withColumn("nvalue2", last_value("nvalue").over(w2))