/ / Jak mogę naprawić błędy ConcurrentModificationException w Kafce? (0.9.0.1) - wielowątkowość, scala, apache-kafka

Jak mogę naprawić błędy ConcurrentModificationException w Kafka? (0.9.0.1) - wielowątkowość, scala, apache-kafka

Jestem świadomy stwierdzenia Kafki, że KafkaConsumer nie jest bezpieczny dla wątków.

Więc zrobiłem to: (Scala)

val m = Map(new TopicPartition(msg.topic(), msg.partition()) -> new OffsetAndMetadata(msg.offset()))
consumer.synchronized{ consumer.commitSync(m) }

Kładę mój dostęp do konsumenta wewnątrz zsynchronizowanego bloku, ale nadal dostaję błędy ConcurrentModificationException w wierszu z konsumentem.commitSync (m).

Dlaczego i co mogę z tym zrobić?

Używam strumieni Akka, więc pod okładkami muszą znajdować się tajemnice wątków, ale czy nie powinien zająć się tym zsynchronizowany blok?

Odpowiedzi:

0 dla odpowiedzi № 1

Jednym z podejść zawartych w dokumentacji jest utworzenie osobnego wątku, tylko dla KafkaConsumer, i komunikowanie się z pracą zewnętrzną za pomocą pewnego rodzaju współbieżnej kolejki.