Jestem świadomy stwierdzenia Kafki, że KafkaConsumer nie jest bezpieczny dla wątków.
Więc zrobiłem to: (Scala)
val m = Map(new TopicPartition(msg.topic(), msg.partition()) -> new OffsetAndMetadata(msg.offset()))
consumer.synchronized{ consumer.commitSync(m) }
Kładę mój dostęp do konsumenta wewnątrz zsynchronizowanego bloku, ale nadal dostaję błędy ConcurrentModificationException w wierszu z konsumentem.commitSync (m).
Dlaczego i co mogę z tym zrobić?
Używam strumieni Akka, więc pod okładkami muszą znajdować się tajemnice wątków, ale czy nie powinien zająć się tym zsynchronizowany blok?
Odpowiedzi:
0 dla odpowiedzi № 1Jednym z podejść zawartych w dokumentacji jest utworzenie osobnego wątku, tylko dla KafkaConsumer, i komunikowanie się z pracą zewnętrzną za pomocą pewnego rodzaju współbieżnej kolejki.