/ / kafka.cluster.BrokerEndPoint ne peut pas être converti en question kafka.cluster.Broker - scala, apache-spark, apache-kafka

kafka.cluster.BrokerEndPoint ne peut pas être converti en problème kafka.cluster.Broker - scala, apache-spark, apache-kafka

J'utilise kafka2.11-0.11.0.1, scala 2.11 et spark 2.2.0. J'ai ajouté les fichiers jars suivants au chemin de génération java de eclipse:

kafka-streams-0.11.0.1,
kafka-tools-0.11.0.1,
spark-streaming_2.11-2.2.0,
spark-streaming-kafka_2.11-1.6.3,
spark-streaming-kafka-0-10_2.11-2.2.0,
kafka_2.11-0.11.0.1.

Et mon code est ci-dessous:

import kafka.serializer.StringDecoder
import kafka.api._
import kafka.api.ApiUtils._
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.SparkContext._


object KafkaExample {

def main(args: Array[String]) {

val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))

val kafkaParams = Map("bootstrap.servers" -> "kafkaIP:9092")

val topics = List("logstash_log").toSet

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaParams,topics).map(_._2)

stream.print()

ssc.checkpoint("C:/checkpoint/")
ssc.start()
ssc.awaitTermination()
}
}

C'est un code très simple pour connecter simplement spark et kafka. Cependant, j'obtiens cette erreur:

Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
at scala.Option.map(Option.scala:146)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
at scala.util.Either$RightProjection.flatMap(Either.scala:522)
at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
at org.apache.spark.streaming.kafka.KafkaCluster.getLatestLeaderOffsets(KafkaCluster.scala:150)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:215)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
at scala.util.Either$RightProjection.flatMap(Either.scala:522)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at com.defne.KafkaExample$.main(KafkaExample.scala:28)
at com.defne.KafkaExample.main(KafkaExample.scala)

Où est-ce que je me trompe?

NOTE: J'ai essayé "metadata.broker.list" au lieu de "bootstrap.server" mais aucun changement.

Réponses:

0 pour la réponse № 1

Votre problème est que vous avez trop de dépendances Kafka chargées, et que celles ramassées lors de l'exécution ne sont pas compatibles avec la version attendue par Spark.

Votre réel le problème est le PartitionMetadata classe. Dans 0.8.2, cela ressemble à ceci (c'est ce que vous obtenez de spark-streaming-kafka_2.11-1.6.3):

case class PartitionMetadata(partitionId: Int,
val leader: Option[Broker],
replicas: Seq[Broker],
isr: Seq[Broker] = Seq.empty,
errorCode: Short = ErrorMapping.NoError) extends Logging

Et dans> 0.10.0.0 comme ceci:

case class PartitionMetadata(partitionId: Int,
leader: Option[BrokerEndPoint],
replicas: Seq[BrokerEndPoint],
isr: Seq[BrokerEndPoint] = Seq.empty,
errorCode: Short = Errors.NONE.code) extends Logging

Regardez comment leader changé de Option[Broker] à Option[BrokerEndPoint]? C'est ce que crie Spark.

Vous devez nettoyer vos dépendances, tout ce dont vous avez besoin est (si vous utilisez Spark 2.2), c’est:

spark-streaming_2.11-2.2.0,
spark-streaming-kafka-0-10_2.11-2.2.0