/ / स्पार्क स्ट्रीमिंग काफ्का प्रत्यक्ष उपभोक्ता उपभोग गति ड्रॉप

स्पार्क स्ट्रीमिंग काफ्का प्रत्यक्ष उपभोक्ता खपत की गति ड्रॉप - स्कैला, अमेज़ॅन-वेब-सेवाएं, अपाचे-स्पार्क, अपाचे-काफ्का, स्पार्क-स्ट्रीमिंग

स्पार्क स्ट्रीमिंग यूआई कफ़्का प्रत्यक्ष उपभोक्ता ने प्रति बैच (5 सेकंड) में 450 घटनाओं (5 * 90 विभाजन) तक रीड्स को सीमित करना शुरू कर दिया, यह उससे 1 या 2 दिन पहले ठीक चल रहा था (लगभग 5000 से 40000 घटनाएं प्रति बैच)

मैं AWS में स्पार्क स्टैंडअलोन क्लस्टर (स्पार्क और स्पार्क-स्ट्रीमिंग-काफ्का संस्करण 1.6.1) का उपयोग कर रहा हूं और चेकपॉइंट निर्देशिका के लिए S3 बाल्टी का उपयोग कर रहा हूं StreamingContext.getOrCreate(config.sparkConfig.checkpointDir, createStreamingContext), प्रत्येक कार्यकर्ता नोड पर विलंब और पर्याप्त डिस्क स्थान शेड्यूल नहीं कर रहे हैं।

किसी भी काफ़्का ग्राहक आरंभीकरण मापदंडों को नहीं बदला, काफ़्का की संरचना में कोई बदलाव नहीं हुआ:

val kafkaParams = Map("metadata.broker.list" -> kafkaConfig.broker)
val topics = Set(kafkaConfig.topic)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)

यह भी समझ में नहीं आता है कि जब प्रत्यक्ष उपभोक्ता विवरण क्यों कहता है The consumed offsets are by the stream itself स्ट्रीमिंग संदर्भ बनाते समय मुझे अभी भी चेकपॉइंट निर्देशिका का उपयोग करने की आवश्यकता है?

उत्तर:

उत्तर № 1 के लिए 1

यह आमतौर पर सेटिंग के माध्यम से बैकस्पेस को सक्षम करने का परिणाम है spark.streaming.backpressure.enabled सच करने के लिए। आमतौर पर, जब बैकस्पेस एल्गोरिथ्म देखता है कि इसमें अधिक डेटा आ रहा है तो इसका उपयोग किया जाता है, यह प्रत्येक बैच को एक छोटे आकार में कैपिंग करना शुरू कर देता है जब तक कि यह फिर से "स्थिर" न हो जाए। इससे कभी-कभी गलत सकारात्मकता होती है और आपकी धारा प्रसंस्करण दर को धीमा कर देती है।

यदि आप उत्तराधिकारी को थोड़ा मोड़ना चाहते हैं, तो कुछ अविवादित झंडे हैं जो इसका उपयोग कर रहे हैं (बस सुनिश्चित करें कि आप जानते हैं कि आप "क्या कर रहे हैं"):

val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)

यदि आप विवरण चाहते हैं, PIDRateEstimator क्या आप "के लिए फिर से देख रहे हैं ।