/ / Kafka-Verbraucher tritt nicht der benutzerdefinierten groupId bei - java, spring, apache-kafka, spring-kafka

Kafka-Verbraucher schließt sich keiner benutzerdefinierten groupId an - Java, Frühling, Apache-Kafka, Frühling-Kafka

Ich habe Kafka ConsumerFactory nach Spring eingerichtetKafka-Dokumentation. Die groupId scheint jedoch nicht verwendet zu werden. Vielleicht verstehe ich das Ganze auch nur falsch, also wollte ich Sie wissen lassen, was ich erlebt habe.

Dies ist meine Konfiguration, die nicht zu funktionieren scheint:

@Bean
ConsumerFactory<String, KafkaEvent> kafkaEventConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(
getConsumerProperties(),
new StringDeserializer(),
new JsonDeserializer<>(KafkaEvent.class));
}

Map<String, Object> getConsumerProperties() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // TODO
props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);


props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);

props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 45000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 70000);

return props;
}

Und ich habe eine @KafkaEventListener konfiguriert wie folgt, ohne die groupId erneut explizit anzugeben:

@KafkaListener(topics = KafkaEventPublisher.ORDER_TOPIC)
public class KafkaEventListener {

@Autowired
private ConsumerFactory<String, KafkaEvent> consumerFactory;

@KafkaHandler
public void listenTo(@Payload KafkaEvent event) {
LOGGER.error(LogMarker.KAFKA, consumerFactory.getConfigurationProperties().toString());
}

}

Ich kann auch sehen, dass meine groupId "myGroupId" istim Fehlerprotokoll wie oben protokolliert enthalten. Was mich jedoch misstrauisch macht, ist die DEBUG-Protokollierung eines ConsumerCoordinators, der immer angibt, einer anderen groupId beizutreten, und ich bin ein bisschen besorgt, dass dies korrekt aussieht.

2017-09-04 15:28:13.904 (    ) INFO consumer.internals.AbstractCoordinator             - Successfully joined group org.springframework.kafka.KafkaListenerEndpointContainer#0 with generation 40
2017-09-04 15:28:13.904 (    ) INFO consumer.internals.AbstractCoordinator             - Successfully joined group org.springframework.kafka.KafkaListenerEndpointContainer#0 with generation 40
2017-09-04 15:28:13.906 (    ) INFO consumer.internals.ConsumerCoordinator             - Setting newly assigned partitions [] for group org.springframework.kafka.KafkaListenerEndpointContainer#0
2017-09-04 15:28:13.907 (    ) INFO consumer.internals.ConsumerCoordinator             - Setting newly assigned partitions [my-topic-0] for group org.springframework.kafka.KafkaListenerEndpointContainer#0

Auch beim Spring Startup wird die ConsumerConfig ausgegeben. Ich kann sehen, dass die groupId falsch ist, andere Attribute jedoch korrekt übernommen werden.

Soweit ich verstanden habe, kann ich die groupId global festlegen, indem ich sie in der ConsumerFactory oder in application.properties mithilfe von festlege spring.kafka.consumer.group-id. Beide Varianten funktionieren jedoch nicht.

Nur wenn ich die groupId über konfiguriere @KafkaListener Anmerkung Das LOG gibt an, dass der Consumer der richtigen Gruppe beigetreten ist:

2017-09-04 15:38:30.787 (    ) DEBUG consumer.internals.AbstractCoordinator             - Received successful JoinGroup response for group myGroupId: org.apache.kafka.common.requests.JoinGroupResponse@4c51c449

Mit dieser Konfiguration:

@KafkaListener(topics = KafkaEventPublisher.ORDER_TOPIC, groupId = "myGroupId")

Wir verwenden Spring Boot 2.0.0.M3 (also Spring Kafka 2.0.0.M3)

Antworten:

1 für die Antwort № 1

Es ist ein Fehler in M3; am Master fixiert (2.0.3.BUILD-SNAPSHOT) (und in 1.3.0.M2). Wir gehen davon aus, dass wir den Release-Kandidaten für 2.0.0.RC1 noch in dieser Woche veröffentlichen werden (und auf das Spring Framework RC4 warten werden).