/ / Spark Streaming, kafka: java.lang.StackOverflowError - Scala, Apache-Kafka, Funken-Streaming

Spark-Streaming, kafka: java.lang.StackOverflowError - Scala, Apache-Kafka, Funken-Streaming

Ich werde beim Spark-Streaming unter dem FehlerAnwendung, ich verwende Kafka für den Eingabestrom. Als ich mit Socket arbeitete, funktionierte es gut. Aber wenn ich zu kafka gewechselt habe, gibt es einen Fehler. Jeder hat eine Ahnung, warum er einen Fehler auslöst. Muss ich meine Stapelzeit ändern und die Zeigereizeit überprüfen?

ERROR StreamingContext: Fehler beim Starten des Kontexts und Markierung als gestoppt java.lang.StackOverflowError

Mein Programm:

def main(args: Array[String]): Unit = {

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf().setAppName("HBaseStream")
val sc = new SparkContext(conf)
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sc, Seconds(5))
val brokers = args(0)
val topics= args(1)
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)

val inputStream = messages.map(_._2)
//    val inputStream = ssc.socketTextStream(args(0), args(1).toInt)
ssc.checkpoint(checkpointDirectory)
inputStream.print(1)
val parsedStream = inputStream
.map(line => {
val splitLines = line.split(",")
(splitLines(1), splitLines.slice(2, splitLines.length).map((_.trim.toLong)))
})
import breeze.linalg.{DenseVector => BDV}
import scala.util.Try

val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
(current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
prev.map(_ +: current).orElse(Some(current))
.flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
})

state.checkpoint(Duration(10000))
state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
}
}

Antworten:

0 für die Antwort № 1

Versuchen Sie, das Checkpoint-Verzeichnis zu löschen.

Ich bin nicht sicher, aber es scheint, dass Ihr Streaming-Kontext die Checkpoints nicht wiederherstellen kann.

trotzdem hat es für mich funktioniert.