/ / ¿Hay una forma estándar de restablecer las compensaciones al usar la anotación @KafkaListener? - apache-kafka, spring-kafka

¿Hay una forma estándar de restablecer las compensaciones al usar la anotación @KafkaListener? - apache-kafka, spring-kafka

Estoy usando la Interfaz KafkaListener en mi aplicación Spring Boot, que funciona bien. Las compensaciones son almacenadas por el propio Kafka.

Ahora digamos que uno de los consumidores de un tema implementa una nueva versión y arruina 2 horas de mensajes. Luego arreglan la aplicación y desean iniciar la nueva versión con un desplazamiento de hace dos horas.

Podría usar consumer.seek () con una llamada de consumer.offsetsForTimes () anterior, pero eso es sencillo cuando uso el mecanismo de sondeo, no cuando utilizo el Spring KafkaListener.

Intenté usar un ConsumerRebalanceListener, pero para configurar las compensaciones necesito que el consumidor esté activo, lo cual no tengo al crear una instancia del ConsumerRebalanceListener ...

¿Necesito cambiar al sondeo? ¿Puedo acceder al consumidor antes de que se invoque mi KafkaListener la primera vez, pero después de que se le hayan asignado las particiones al consumidor?

Respuestas

1 para la respuesta № 1

No concurrentemente el proximo Lanzamiento 2.0 introduce ConsumerAwareMessageListener, que soporta pasar el Consumer como un argumento en el @KafkaListener método.