/ / Spark dataframe che aggiunge un nuovo numero di colonna - Streaming strutturato - apache-spark, spark-structured-streaming

Spark dataframe che aggiunge un nuovo numero di colonne - Streaming strutturato - apache-spark, spark-structured-streaming

Sto usando spark Structured streaming. Ho un Dataframe e aggiungo una nuova colonna "current_ts".

inpuDF.withColumn("current_ts", lit(System.currentTimeMillis()))

Questo non aggiorna ogni riga con l'epoca attualetempo. Aggiorna lo stesso tempo di epcoh quando il lavoro è stato trigilizzato, facendo sì che ogni riga in DF avesse gli stessi valori. Questo funziona bene con normali lavori di scintilla. Si tratta di un problema con lo streaming strutturato spark?

risposte:

1 per risposta № 1

La buona scintilla registra le tue trasformazioni come grafico di lignaggio e esegue il grafico solo quando viene chiamata un'azione. Quindi chiamerà

System.currentTimeMillis()

quando viene attivata un'azione. Quello che non capisco è che cosa trovi confuso o cosa stai cercando di ottenere. Grazie.


1 per risposta № 2

Spark ha un funzione per creare una colonna con timestamp corrente. Il tuo codice dovrebbe assomigliare a questo:

import org.apache.spark.sql.functions

// ...

inpuDF.withColumn("current_ts", functions.current_timestamp())