/ / Przeczytaj temat Kafki od początku w KStream - apache-kafka, kafka-consumer-api, apache-kafka-streams

Przeczytaj temat Kafki od początku w KStream - apache-kafka, kafka-consumer-api, apache-kafka-streams

Mój projekt wiosennego rozruchu ma aplikację, która demonstruje API Kafka Streams. Jestem w stanie skonsumować wszystkie wiadomości w temacie customer za pomocą polecenia

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic customer --from-beginning

Jakie jest podobne polecenie w Kafka Streams API do odbierania wiadomości za pomocą KStream lub KTable? Próbowałem

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); properties.put("auto.offset.reset", "earliest");

Oba nie działały, stworzyłem przypadek testowy do konsumpcji KafkaConsumer zamiast strumieni, to nie działa. Kod przesłany do Github na przykład. Każda pomoc byłaby świetna.

Odpowiedzi:

1 dla odpowiedzi № 1

Narzędzie bin/kafka-streams-application-reset.sh pozwala szukać od wersji 1.1.

Cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application