Mein Spring-Boot-Projekt enthält eine Anwendung, die Kafka Streams API demonstriert. Ich kann alle Nachrichten im Thema konsumieren customer
Verwenden des Befehls
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic customer --from-beginning
Was ist der ähnliche Befehl in Kafka Streams API, um Nachrichten mit KStream oder KTable zu konsumieren? Ich habe es versucht
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
properties.put("auto.offset.reset", "earliest");
Beide haben nicht funktioniert. Ich habe einen Testfall erstellt, um mit zu konsumieren KafkaConsumer
Statt Streams funktionierte es nicht. Der Code wurde hochgeladen Github als Referenz. Jede Hilfe wäre großartig.
Antworten:
1 für die Antwort № 1Das Werkzeug bin/kafka-streams-application-reset.sh
erlaubt seit v1.1 zu suchen.