Sto cercando di leggere i messaggi kafka dal produttore utilizzando Multithreading Java.
Supponiamo, Produttore Kafka invia più messaggi a Kafka Consumer. quindi come leggere quei messaggi multipli separatamente usando Servizio Executor in JAVA
risposte:
2 per risposta № 1Ho implementato il tuo caso citato e condividerò i passaggi che dovresti seguire.
Creare una classe consumer che deve implementare l'interfaccia Runnable, deve avere un'istanza Kafkaconsumer come membro della classe. È possibile configurare le proprietà del consumatore nel metodo di costruzione.
public LogConsumer(List<String> topics, String group, String brokerList) {
Properties propsConsumer = new Properties();
propsConsumer.put("bootstrap.servers", brokerList);
propsConsumer.put("group.id", group);
propsConsumer.put("enable.auto.commit", "false");
propsConsumer.put("key.deserializer", StringDeserializer.class);
propsConsumer.put("value.deserializer", ByteArrayDeserializer.class);
propsConsumer.put("auto.offset.reset", "latest");
this.consumer = new KafkaConsumer(propsConsumer);
this.consumer.subscribe(topics);
}
Quindi, nel metodo di esecuzione puoi consumare ogni messaggio kafka come condiviso di seguito.
public void run() {
try {
while (!flagOfThread) {
ConsumerRecords<String, byte []> records = consumer.poll(10000);
for (ConsumerRecord<String, byte []> record : records) {
handleRecord(record);
}
// At least one
consumer.commitSync();
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!flagOfThread){
LOG.error("Log Consumer is shutting down.",e);
}
} finally {
consumer.close();
}
}
Nella classe del corridore dell'applicazione, è necessario creare un threadpool, questo il conteggio del pool di thread deve essere uguale al conteggio delle partizioni degli argomenti .
ExecutorService executorService = Executors.newFixedThreadPool(parallelismCount);
for (int i = 0; i < parallelismCount; i++) {
ExecutionLogConsumer bean = new LogConsumer(/*parameters*/);
executorService.execute(bean);
}
Ora puoi iniziare a consumare i tuoi messaggi da kafka :)