Ecco i miei due pyspark Dataframe
a = sc.parallelize([["2017-05-14", "foo" , 24 , "abc"],
["2017-05-16", "user1", 26, "mno"],
["2017-05-17", "user2", 26, "mno"],
["2017-05-19", "user2", 27, "mno"],
["2017-05-19", "user3", 28, "mno"]])
.toDF(["A_Date", "user", "id","info"])
b = sc.parallelize([["2017-05-15", "foo", 24, "def"],
["2017-05-22", "user2", 27, "mno"],
["2017-05-20", "user3", 28, "mno"]])
.toDF(["B_Date", "user", "id","info"])
e voglio unire due dataframe che uniscono alcuni dataframe e la data di dataframe in un dato unito dovrebbero essere appena inferiori a dataframe b come mostrato di seguito.
c = sc.parallelize([["2017-05-15", "foo", 24, "def", "2017-05-14"],
["2017-05-22", "user2", 27, "mno", "2017-05-19"],
["2017-05-20", "user3", 28,"mno","2017-05-19"]])
.toDF(["B_Date", "user", "id","info", "A_Date"])
risposte:
0 per risposta № 1Potresti usare il seguente approccio ..
b.join(a,((a.A_Date<b.B_Date) & (a.user == b.user)))
.select(b.B_Date,b.user,b.id,b.info,a.A_Date)
.groupby("B_Date","user","id","info")
.agg(F.max("A_Date").alias("A_Date"))
.sort("B_Date")
.show()
Ciò comporta l'output richiesto:
+----------+-----+---+----+----------+
| B_Date| user| id|info| A_Date|
+----------+-----+---+----+----------+
|2017-05-15| foo| 24| def|2017-05-14|
|2017-05-20|user3| 28| mno|2017-05-19|
|2017-05-22|user2| 27| mno|2017-05-19|
+----------+-----+---+----+----------+
Questo potrebbe essere relativamente lento a causa del cross join.
In alternativa puoi usare una funzione finestra:
a_lagged = a.withColumn("prev_A_Date", F.lag(a["A_Date"]).over(windowSpec))
b.join(a_lagged,((a_lagged.A_Date<b.B_Date) & ((a.A_Date>a_lagged.prev_A_Date) | a_lagged.prev_A_Date.isNull() ) & (a_lagged.user == b.user)))
.select(b.B_Date,b.user,b.id,b.info,a_lagged.A_Date)
.sort("B_Date")
.show()
Questo dà anche:
+----------+-----+---+----+----------+
| B_Date| user| id|info| A_Date|
+----------+-----+---+----+----------+
|2017-05-15| foo| 24| def|2017-05-14|
|2017-05-20|user3| 28| mno|2017-05-19|
|2017-05-22|user2| 27| mno|2017-05-19|
+----------+-----+---+----+----------+
0 per risposta № 2
Se si guarda il codice sorgente di join
def join(self, other, on=None, how=None):
"""Joins with another :class:`DataFrame`, using the given join expression.
:param other: Right side of the join
:param on: a string for the join column name, a list of column names,
a join expression (Column), or a list of Columns.
If `on` is a string or a list of strings indicating the name of the join column(s),
the column(s) must exist on both sides, and this performs an equi-join.
:param how: str, default "inner".
One of `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
È chiaro il on
parametro può essere una qualsiasi condizione. Quindi puoi fare quanto segue per verificare le date durante l'adesione.
b.join(a, [a.user == b.user, a.id == b.id, a.A_Date < b.B_Date]).select(b.B_Date, b.user, b.id, a.A_Date)
Dovresti avere l'output desiderato dataframe