/ / Spark Streaming Kafka reduz a velocidade de consumo direto do consumidor - scala, amazon-web-services, apache-spark, apache-kafka, spark-streaming

Spark Streaming Kafka queda direta na velocidade de consumo - scala, amazon-web-services, apache-faísca, apache-kafka, streaming de faísca

interface do usuário de streaming de faísca O consumidor direto Kafka começou a limitar as leituras a 450 eventos (partições 5 * 90) por lote (5 segundos); estava funcionando bem por 1 ou 2 dias antes (cerca de 5000 a 40000 eventos por lote)

Estou usando o cluster independente do spark (spark e spark-streaming-kafka versão 1.6.1) em execução na AWS e usando o bucket S3 para o diretório do ponto de verificação StreamingContext.getOrCreate(config.sparkConfig.checkpointDir, createStreamingContext), não há atrasos de planejamento e espaço em disco suficiente em cada nó do trabalhador.

Não alterou nenhum parâmetro de inicialização do cliente Kafka, com certeza a estrutura do kafka não foi alterada:

val kafkaParams = Map("metadata.broker.list" -> kafkaConfig.broker)
val topics = Set(kafkaConfig.topic)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)

Também não consigo entender por que, quando a descrição direta do consumidor diz The consumed offsets are by the stream itself Ainda preciso usar o diretório do ponto de verificação ao criar o contexto de streaming?

Respostas:

1 para resposta № 1

Geralmente, isso é resultado da ativação da contrapressão através da configuração spark.streaming.backpressure.enabled verdadeiro. Normalmente, quando o algoritmo de contrapressão vê que há mais dados chegando do que costumava, ele começa a limitar cada lote a um tamanho bastante pequeno até que possa se "estabilizar" novamente. Às vezes, isso tem falsos positivos e faz com que seu fluxo diminua a taxa de processamento.

Se você deseja ajustar um pouco a heurística, existem algumas sinalizações não documentadas que ela está usando (apenas verifique se você sabe o que está fazendo):

val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)

Se você quiser os detalhes sangrentos, PIDRateEstimator é o que você está procurando.