/ / Kafka Właściwy sposób odpytywania No Records - apache-kafka, kafka-consumer-api

Kafka Właściwa droga do ankiety No Records - apache-kafka, kafka-consumer-api

za utrzymanie mojego konsumenta przy życiu (bardzo długa zmiennaprzetwarzanie długości) Wdrażam puste wywołanie poll () w wątku w tle, które powstrzyma brokera przed ponowną równowagą, jeśli spędzę zbyt wiele czasu między ankietami (). Ustawiłem mój interwał sondowania na bardzo długi, ale nie „chcę ciągle zwiększać to na zawsze, aby dłużej i dłużej przetwarzać.

Jaki jest właściwy sposób ankietowania bez żadnych rekordów? Obecnie nazywam poll (), a następnie wracam do najwcześniejszych offsetu dla każdej partycji zwróconej w wywołaniu poll (), aby mogły być poprawnie odczytane przez główny wątek po zakończeniu przetwarzania poprzednich wiadomości.

ConsumerRecords<String, String> msgs = kafkaConsumer.poll(timeout);
Map<Integer, Long> partitionToOffsets = getEarliestPartitionOffsets(msgs); // helper method
seekToOffsets(partitionToOffsets);

Odpowiedzi:

3 dla odpowiedzi № 1

Właściwym sposobem radzenia sobie z długim czasem przetwarzania (i unikania ponownego wyważenia konsumenta) jest użycie KafkaConsumer.pause() / KafkaConsumer.resume() metody. Możesz przeczytać więcej na ten temat tutaj: