/ / Spark Streaming Kafka direkte Verbrauchsbegrenzung - Scala, Amazon-Web-Services, Apache-Spark, Apache-Kafka, Funken-Streaming

Spark Streaming Kafka direkte Konsumkonsum Geschwindigkeitsabfall - Scala, Amazon-Web-Services, Apache-Spark, Apache-Kafka, Funken-Streaming

Funken Streaming-UI Kafka direct consumer begann, die Lesevorgänge auf 450 Ereignisse (5 * 90 Partitionen) pro Batch (5 Sekunden) zu beschränken, es lief für 1 oder 2 Tage davor gut (ca. 5000 bis 40000 Ereignisse pro Batch)

Ich benutze Funke Standalone-Cluster (Spark und Spark-Streaming-Kafka Version 1.6.1) in AWS läuft und S3 Bucket für Checkpoint-Verzeichnis verwenden StreamingContext.getOrCreate(config.sparkConfig.checkpointDir, createStreamingContext)Es gibt keine Verzögerungen bei der Planung und genügend Speicherplatz auf jedem Worker-Knoten.

Keine Kafka-Client-Initialisierungsparameter geändert, ziemlich sicher, dass sich die Struktur von kafka nicht geändert hat:

val kafkaParams = Map("metadata.broker.list" -> kafkaConfig.broker)
val topics = Set(kafkaConfig.topic)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)

Kann auch nicht verstehen, warum wenn direkte Verbraucherbeschreibung sagt The consumed offsets are by the stream itself Ich muss immer noch Checkpoint-Verzeichnis verwenden, wenn ich den Streaming-Kontext erstelle?

Antworten:

1 für die Antwort № 1

Dies ist normalerweise darauf zurückzuführen, dass über die Einstellung der Gegendruck aktiviert wird spark.streaming.backpressure.enabled um wahr zu sein. Wenn der Backpressure-Algorithmus feststellt, dass mehr Daten eintreffen als üblich, fängt er an, jeden Batch auf eine ziemlich kleine Größe zu beschränken, bis er sich wieder selbst "stabilisieren" kann. Dies hat manchmal falsche positive Ergebnisse und bewirkt, dass Ihr Stream die Verarbeitungsrate verlangsamt.

Wenn Sie die Heuristik ein wenig optimieren wollen, gibt es einige undokumentierte Flags, die es verwendet (stellen Sie nur sicher, dass Sie wissen, was Sie gerade tun):

val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)

Wenn du die blutigen Details willst, PIDRateEstimator ist was du suchst.