/ / Spark UDF виключення при доступі до змінної широкомовлення - apache-spark, broadcast, udf, notserializableexception

Іскровий UDF виняток при доступі до трансляційної змінної - apache-іскри, трансляція, udf, nonserializableexception

Я маю труднощі з доступом до scala.collection.immutable.Map зсередини іскри УДФ.

Я "" транслюю карту

val browserLangMap = sc.broadcast (Source.fromFile(browserLangFilePath).getLines.map(_.split(,)).map(e => (e(0).toInt,e(1))).toMap)

створення UDF для доступу до карти

def addBrowserCode = udf((browserLang:Int) => if(browserLangMap.value.contains(browserLang)) browserLangMap.value(browserLang) else "")`

використовуючи UDF для додавання нового стовпця

val joinedDF = rawDF.join(broadcast(geoDF).as("GEO"), $"start_ip" === $"GEO.start_ip_num", "left_outer")
.withColumn("browser_code", addBrowserCode($"browser_language"))
.selectExpr(getSelectQuery:_*)

повна стека стеження -> https://www.dropbox.com/s/p1d5322fo9cxro6/stack_trace.txt?dl=0

org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetaDataSchema$
Serialization stack:
- object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetaDataSchema$, value: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetaDataSchema$@30b4ba52)
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: MetaDataSchema$module, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetaDataSchema$)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(browser_language#235))
- field (class: org.apache.spark.sql.catalyst.expressions.If, name: falseValue, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.If, if (isnull(browser_language#235)) null else UDF(browser_language#235))
- field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Alias, if (isnull(browser_language#235)) null else UDF(browser_language#235) AS browser_language#507)
- object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@5ae38c4e)
- writeObject data (class: scala.collection.immutable.$colon$colon)
- object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@5ae38c4e))
- field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 80 more

Я знаю його доступ до карти, що викликає це. Коли я знімаю посилання на те, що в УДФ немає виключення.

def addBrowserCode = udf((browserLang:Int) => browserLang.toString())  //Test UDF without accessing broadcast Map and it works

Версія Spark 1.6

Відповіді:

1 для відповіді № 1

Я знайшов, що це дивна поведінка з ": paste" в іскровій оболонці. Це відбувається лише тоді, коли я вставляю весь код у одну багаторядкову пасту з: paste.

Той же код працює відмінно, якщо спочатку вставити трансляцію і створення UDF, а потім вставити об'єднання + saveToFile в окрему: вставити.

Можливо, випуск оболонок scala. Не знаю.


0 для відповіді № 2

Основна причина пов'язана з оголошенням val sc: SparkContext = spark.sparkContext в коді для змінної широкомовної передачі. Якщо код виконується на іскровій оболонці, sc вже доступний за замовчуванням. Оголошення sc двічі (один за замовчуванням і один у коді) викликає цю проблему "Task not serializable". Тому, на відміну від попередньої заявленої відповіді, немає ніякої проблеми з іскровою оболонкою. Тільки тимчасово видалити декларацію SparkContext, поки в іскрі-оболонці, код буде ОК.