/ / Spark Streaming, kafka: java.lang.StackOverflowError - скала, apache-kafka, стрийминг

Spark стрийминг, kafka: java.lang.StackOverflowError - скала, apache-kafka, искра

Попадам под грешката в искровото излъчванеприлагане, аз съм с kafka за входен поток. Когато правех с гнездо, тя работеше добре. Но когато се промени в kafka това е "даване на грешка. Всеки, който има представа защо е хвърляне на грешка, трябва ли да се промени времето ми партида и проверка на времето за насочване?

ERROR StreamingContext: Грешка при стартирането на контекста, маркирането му като спряно java.lang.StackOverflowError

Моята програма:

def main(args: Array[String]): Unit = {

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf().setAppName("HBaseStream")
val sc = new SparkContext(conf)
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sc, Seconds(5))
val brokers = args(0)
val topics= args(1)
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)

val inputStream = messages.map(_._2)
//    val inputStream = ssc.socketTextStream(args(0), args(1).toInt)
ssc.checkpoint(checkpointDirectory)
inputStream.print(1)
val parsedStream = inputStream
.map(line => {
val splitLines = line.split(",")
(splitLines(1), splitLines.slice(2, splitLines.length).map((_.trim.toLong)))
})
import breeze.linalg.{DenseVector => BDV}
import scala.util.Try

val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
(current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
prev.map(_ +: current).orElse(Some(current))
.flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
})

state.checkpoint(Duration(10000))
state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
}
}

Отговори:

0 за отговор № 1

Опитайте да изтриете директорията на контролните точки.

Не съм сигурен, но изглежда, че вашият контекст на стрийминг не може да се възстанови от контролните точки.

така или иначе, тя работи за мен.