/ / Korzystanie z funkcji okna Spark prowadzi do utworzenia kolumny w dataframe - scala, apache-spark, spark-dataframe

Korzystanie z funkcji okna Spark prowadzi do utworzenia kolumny w ramce danych - scala, apache-spark, spark-dataframe

Chciałbym utworzyć nową kolumnę z wartością poprzedniej daty (data pomniejszona o aktualną datę) dla grupy identyfikatorów dla poniższej ramki danych

+---+----------+-----+
| id|      date|value|
+---+----------+-----+
|  a|2015-04-11|  300|
|  a|2015-04-12|  400|
|  a|2015-04-12|  200|
|  a|2015-04-12|  100|
|  a|2015-04-11|  700|
|  b|2015-04-02|  100|
|  b|2015-04-12|  100|
|  c|2015-04-12|  400|
+---+----------+-----+

Próbowałem z funkcją okna ołowiu.

val df1=Seq(("a","2015-04-11",300),("a","2015-04-12",400),("a","2015-04-12",200),("a","2015-04-12",100),("a","2015-04-11",700),("b","2015-04-02",100),("b","2015-04-12",100),("c","2015-04-12",400)).toDF("id","date","value")

var w1=Window.partitionBy("id").orderBy("date".desc)
var leadc1=lead(df1("value"),1).over(w1)
val df2=df1.withColumn("nvalue",leadc1)

+---+----------+-----+------+
| id|      date|value|nvalue|
+---+----------+-----+------+
|  a|2015-04-12|  400|   200|
|  a|2015-04-12|  200|   100|
|  a|2015-04-12|  100|   300|
|  a|2015-04-11|  300|   700|
|  a|2015-04-11|  700|  null|
|  b|2015-04-12|  100|   100|
|  b|2015-04-02|  100|  null|
|  c|2015-04-12|  400|  null|
+---+----------+-----+------+

Ale jak widzimy, gdy mam tę samą datę w id „a” otrzymuję zły wynik. Wynik powinien być podobny

+---+----------+-----+------+
| id|      date|value|nvalue|
+---+----------+-----+------+
|  a|2015-04-12|  400|   300|
|  a|2015-04-12|  200|   300|
|  a|2015-04-12|  100|   300|
|  a|2015-04-11|  300|  null|
|  a|2015-04-11|  700|  null|
|  b|2015-04-12|  100|   100|
|  b|2015-04-02|  100|  null|
|  c|2015-04-12|  400|  null|
+---+----------+-----+------+

Mam już rozwiązanie wykorzystujące złączenie, chociaż szukam rozwiązania wykorzystującego funkcję okna.

Dzięki

Odpowiedzi:

0 dla odpowiedzi № 1

Problem polega na tym, że masz wiele wierszy z tą samą datą. lead zajmie value od następnego rząd w zestawie wyników, a nie w następnym data. Kiedy więc posortujesz wiersze według daty w porządku malejącym, następny wiersz może mieć tę samą datę.

Jak rozpoznajesz prawidłową wartość do użycia w określonym dniu? na przykład dlaczego bierzesz 300 z (id = a, data = 2015-04-11), a nie 700?

Aby to zrobić za pomocą funkcji okna, może być konieczne wykonanie wielu przebiegów - zajmie to ostatnie nvalue i zastosuj go do wszystkich wierszy w tej samej grupie identyfikatorów / dat - ale nie jestem pewien, jak początkowo są uporządkowane wiersze.

 val df1=Seq(("a","2015-04-11",300),("a","2015-04-12",400),("a","2015-04-12",200),("a","2015-04-12",100),("a","2015-04-11",700),("b","2015-04-02",100),("b","2015-04-12",100),("c","2015-04-12",400)).toDF("id","date","value")

var w1 = Window.partitionBy("id").orderBy("date".desc)
var leadc1 = lead(df1("value"),1).over(w1)
val df2 = df1.withColumn("nvalue",leadc1)
val w2 = Window.partitionBy("id", "date").orderBy("??? some way to distinguish row ordering")
val df3 = df1.withColumn("nvalue2", last_value("nvalue").over(w2))