/ / Spark Streaming Kafka calo della velocità di consumo diretto dei consumatori: scala, amazon-web-services, apache-spark, apache-kafka, spark-streaming

Spark Streaming Kafka riduce drasticamente la velocità di consumo dei consumatori - scala, amazon-web-services, apache-spark, apache-kafka, spark-streaming

scintilla streaming UI Il consumatore diretto di Kafka ha iniziato a limitare le letture a 450 eventi (5 * 90 partizioni) per batch (5 secondi), funzionava bene per 1 o 2 giorni prima (circa da 5000 a 40000 eventi per batch)

Uso il cluster spark standalone (spark e streaming-streaming-kafka versione 1.6.1) in esecuzione in AWS e utilizzando il bucket S3 per la directory checkpoint StreamingContext.getOrCreate(config.sparkConfig.checkpointDir, createStreamingContext), non ci sono ritardi di pianificazione e spazio su disco sufficiente su ciascun nodo di lavoro.

Non ha cambiato alcun parametro di inizializzazione del client Kafka, piuttosto sicuro che la struttura di kafka non è cambiata:

val kafkaParams = Map("metadata.broker.list" -> kafkaConfig.broker)
val topics = Set(kafkaConfig.topic)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)

Inoltre, non è possibile capire perché quando dice la descrizione del consumatore diretto The consumed offsets are by the stream itself Devo ancora usare la directory checkpoint quando creo il contesto di streaming?

risposte:

1 per risposta № 1

Questo di solito è il risultato dell'abilitazione della contropressione attraverso l'impostazione spark.streaming.backpressure.enabled al vero Di solito, quando l'algoritmo di contropressione vede che ci sono più dati in arrivo, è abituato a farlo, inizia a limitare ogni batch a dimensioni piuttosto piccole fino a quando non riesce a "stabilizzarsi" di nuovo. Questo a volte ha falsi positivi e fa sì che il tuo stream rallenti la velocità di elaborazione.

Se vuoi modificare un po 'l'euristica, ci sono alcune bandiere non documentate che sta usando (assicurati di sapere cosa stai facendo):

val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)

Se vuoi i dettagli cruenti, PIDRateEstimator è quello che stai cercando.