/ / So verwenden Sie Multithreading in Kafka - Java, Multithreading, Apache-Kafka

Wie man Multithreading in Kafka verwendet - Java, Multithreading, Apache-Kafka

Ich versuche, Kafka-Nachrichten vom Hersteller mit zu lesen Java Multithreading.

Annehmen, Kafka-Produzent Senden Sie mehrere Nachrichten an Kafka-Verbraucher. dann, wie man diese mehrfachen Mitteilungen einzeln liest ExecutorService in JAVA

Antworten:

2 für die Antwort № 1

Ich habe Ihren erwähnten Fall implementiert und werde Ihnen Schritte mitteilen, die Sie beachten sollten.

Erstellen Sie eine Consumer-Klasse, die eine Runnable-Schnittstelle implementieren muss. Sie muss eine Kafkaconsumer-Instanz als Klassenmitglied haben. Sie können Consumer-Eigenschaften in der Konstruktormethode konfigurieren.

public LogConsumer(List<String> topics, String group, String brokerList) {

Properties propsConsumer = new Properties();
propsConsumer.put("bootstrap.servers", brokerList);
propsConsumer.put("group.id", group);
propsConsumer.put("enable.auto.commit", "false");
propsConsumer.put("key.deserializer", StringDeserializer.class);
propsConsumer.put("value.deserializer", ByteArrayDeserializer.class);
propsConsumer.put("auto.offset.reset", "latest");

this.consumer = new KafkaConsumer(propsConsumer);
this.consumer.subscribe(topics);
}

In der Ausführungsmethode können Sie dann jede Kafka-Nachricht wie unten angegeben verbrauchen.

public void run() {
try {
while (!flagOfThread) {
ConsumerRecords<String, byte []> records = consumer.poll(10000);
for (ConsumerRecord<String, byte []> record : records) {
handleRecord(record);
}
// At least one
consumer.commitSync();
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!flagOfThread){
LOG.error("Log Consumer is shutting down.",e);
}

} finally {
consumer.close();
}
}

In Ihrer Application Runner-Klasse sollten Sie einen Threadpool erstellen Die Anzahl der Thread-Pools muss der Anzahl der Themenpartitionen entsprechen .

 ExecutorService executorService = Executors.newFixedThreadPool(parallelismCount);
for (int i = 0; i < parallelismCount; i++) {
ExecutionLogConsumer bean = new LogConsumer(/*parameters*/);
executorService.execute(bean);
}

Jetzt können Sie anfangen, Ihre Nachrichten von Kafka zu verbrauchen :)