O consumidor direto Kafka começou a limitar as leituras a 450 eventos (partições 5 * 90) por lote (5 segundos); estava funcionando bem por 1 ou 2 dias antes (cerca de 5000 a 40000 eventos por lote)
Estou usando o cluster independente do spark (spark e spark-streaming-kafka versão 1.6.1) em execução na AWS e usando o bucket S3 para o diretório do ponto de verificação StreamingContext.getOrCreate(config.sparkConfig.checkpointDir, createStreamingContext)
, não há atrasos de planejamento e espaço em disco suficiente em cada nó do trabalhador.
Não alterou nenhum parâmetro de inicialização do cliente Kafka, com certeza a estrutura do kafka não foi alterada:
val kafkaParams = Map("metadata.broker.list" -> kafkaConfig.broker)
val topics = Set(kafkaConfig.topic)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
Também não consigo entender por que, quando a descrição direta do consumidor diz The consumed offsets are by the stream itself
Ainda preciso usar o diretório do ponto de verificação ao criar o contexto de streaming?
Respostas:
1 para resposta № 1Geralmente, isso é resultado da ativação da contrapressão através da configuração spark.streaming.backpressure.enabled
verdadeiro. Normalmente, quando o algoritmo de contrapressão vê que há mais dados chegando do que costumava, ele começa a limitar cada lote a um tamanho bastante pequeno até que possa se "estabilizar" novamente. Às vezes, isso tem falsos positivos e faz com que seu fluxo diminua a taxa de processamento.
Se você deseja ajustar um pouco a heurística, existem algumas sinalizações não documentadas que ela está usando (apenas verifique se você sabe o que está fazendo):
val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)
Se você quiser os detalhes sangrentos, PIDRateEstimator
é o que você está procurando.