/ / INNER unisce due dataframe pyspark alle funzioni definite dall'utente [next Date] - python, pyspark, spark-dataframe, inner-join, pyspark-sql

INNER unisce due dataframe pyspark alle funzioni definite dall'utente [next Date] - python, pyspark, spark-dataframe, inner-join, pyspark-sql

Ecco i miei due pyspark Dataframe

a = sc.parallelize([["2017-05-14", "foo" , 24 , "abc"],
["2017-05-16", "user1", 26, "mno"],
["2017-05-17", "user2", 26, "mno"],
["2017-05-19", "user2", 27, "mno"],
["2017-05-19", "user3", 28, "mno"]])
.toDF(["A_Date", "user", "id","info"])

b = sc.parallelize([["2017-05-15", "foo", 24, "def"],
["2017-05-22", "user2", 27, "mno"],
["2017-05-20", "user3", 28, "mno"]])
.toDF(["B_Date", "user", "id","info"])

inserisci la descrizione dell'immagine qui

e voglio unire due dataframe che uniscono alcuni dataframe e la data di dataframe in un dato unito dovrebbero essere appena inferiori a dataframe b come mostrato di seguito.

c = sc.parallelize([["2017-05-15", "foo", 24, "def", "2017-05-14"],
["2017-05-22", "user2", 27, "mno", "2017-05-19"],
["2017-05-20", "user3", 28,"mno","2017-05-19"]])
.toDF(["B_Date", "user", "id","info", "A_Date"])

inserisci la descrizione dell'immagine qui

risposte:

0 per risposta № 1

Potresti usare il seguente approccio ..

b.join(a,((a.A_Date<b.B_Date)  & (a.user == b.user)))
.select(b.B_Date,b.user,b.id,b.info,a.A_Date)
.groupby("B_Date","user","id","info")
.agg(F.max("A_Date").alias("A_Date"))
.sort("B_Date")
.show()

Ciò comporta l'output richiesto:

+----------+-----+---+----+----------+
|    B_Date| user| id|info|    A_Date|
+----------+-----+---+----+----------+
|2017-05-15|  foo| 24| def|2017-05-14|
|2017-05-20|user3| 28| mno|2017-05-19|
|2017-05-22|user2| 27| mno|2017-05-19|
+----------+-----+---+----+----------+

Questo potrebbe essere relativamente lento a causa del cross join.

In alternativa puoi usare una funzione finestra:

a_lagged = a.withColumn("prev_A_Date", F.lag(a["A_Date"]).over(windowSpec))

b.join(a_lagged,((a_lagged.A_Date<b.B_Date) & ((a.A_Date>a_lagged.prev_A_Date) | a_lagged.prev_A_Date.isNull() )  & (a_lagged.user == b.user)))
.select(b.B_Date,b.user,b.id,b.info,a_lagged.A_Date)
.sort("B_Date")
.show()

Questo dà anche:

+----------+-----+---+----+----------+
|    B_Date| user| id|info|    A_Date|
+----------+-----+---+----+----------+
|2017-05-15|  foo| 24| def|2017-05-14|
|2017-05-20|user3| 28| mno|2017-05-19|
|2017-05-22|user2| 27| mno|2017-05-19|
+----------+-----+---+----+----------+

0 per risposta № 2

Se si guarda il codice sorgente di join

def join(self, other, on=None, how=None):
"""Joins with another :class:`DataFrame`, using the given join expression.

:param other: Right side of the join
:param on: a string for the join column name, a list of column names,
a join expression (Column), or a list of Columns.
If `on` is a string or a list of strings indicating the name of the join column(s),
the column(s) must exist on both sides, and this performs an equi-join.
:param how: str, default "inner".
One of `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.

È chiaro il on parametro può essere una qualsiasi condizione. Quindi puoi fare quanto segue per verificare le date durante l'adesione.

b.join(a, [a.user == b.user, a.id == b.id, a.A_Date < b.B_Date]).select(b.B_Date, b.user, b.id, a.A_Date)

Dovresti avere l'output desiderato dataframe