/ / Spark Streaming Kafka - priamy pokles spotreby spotrebiteľa - Scala, Amazon-Web-Services, apache-iskra, apache-kafka, iskra-streaming

Spark Streaming Kafka priama spotreba spotrebiteľov spotreba rýchlosti - scala, amazon-web-services, apache-spark, apache-kafka, jiskra

UI na streamovanie iskier Priamy spotrebiteľ spoločnosti Kafka začal obmedzovať čítanie na 450 udalostí (5 x 90 oddielov) na dávku (5 sekúnd), fungovalo to dobre 1 alebo 2 dni pred tým (približne 5 000 až 40000 udalostí na dávku)

Používam samostatný klaster (iskra a iskra streaming-kafka verzia 1.6.1) bežiaci v AWS a pomocou vedra S3 pre adresár kontrolných bodov StreamingContext.getOrCreate(config.sparkConfig.checkpointDir, createStreamingContext), na každom pracovnom uzle nie sú naplánované oneskorenia a dostatok miesta na disku.

Nezmenili sa žiadne inicializačné parametre klienta Kafka, je isté, že sa štruktúra kafky nezmenila:

val kafkaParams = Map("metadata.broker.list" -> kafkaConfig.broker)
val topics = Set(kafkaConfig.topic)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)

Taktiež nerozumiem, prečo sa hovorí priamy opis spotrebiteľa The consumed offsets are by the stream itself Pri vytváraní kontextu streamovania stále potrebujem adresár kontrolných bodov?

odpovede:

1 pre odpoveď č. 1

Toto je zvyčajne výsledkom povolenia protitlaku prostredníctvom nastavenia spark.streaming.backpressure.enabled pravda. Zvyčajne, keď algoritmus spätného tlaku zistí, že prichádzajú ďalšie údaje, na ktoré predtým zvykol, začne obmedzovať každú dávku na pomerne malú veľkosť, až kým sa sám znova „nestabilizuje“. To má niekedy falošné poplachy a spôsobuje to, že váš tok spomaľuje rýchlosť spracovania.

Ak chcete heuristiku trochu vylepšiť, existujú niektoré nezdokumentované príznaky, ktoré používa (uistite sa, že viete, čo robíte):

val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)

Ak chcete krvavé detaily, PIDRateEstimator je to, čo "re hľadajú.