J'essaie d'écrire une fonction inline dans le cadre spark avec scala, qui prend une entrée de chaîne, exécute une instruction sql et me renvoie une valeur de chaîne
val testfunc: (String=>String)= (arg1:String) =>
{val k = sqlContext.sql("""select c_code from r_c_tbl where x_nm = "something" """)
k.head().getString(0)
}
J'enregistre cette fonction scala en tant que fichier UDF
val testFunc_test = udf(testFunc)
J'ai une base de données sur une table ruche
val df = sqlContext.table("some_table")
Ensuite, j'appelle le fichier UDF dans une colonne withColumn et tente de l'enregistrer dans un nouveau cadre de données.
val new_df = df.withColumn("test", testFunc_test($"col1"))
Mais chaque fois que j'essaie de faire cela, j'obtiens une erreur
16/08/10 21:17:08 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 10.0.1.5): java.lang.NullPointerException
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:41)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
at org.apache.spark.sql.DataFrame.foreach(DataFrame.scala:1434)
Je suis relativement nouveau pour susciter et scala. Mais je ne sais pas pourquoi ce code ne devrait pas être exécuté. Toute idée ou un travail sera hautement apprécié.
Veuillez noter que je n'ai pas collé toute la pile d'erreur. S'il vous plaît laissez-moi savoir si c'est nécessaire.
Réponses:
1 pour la réponse № 1Vous ne pouvez pas utiliser sqlContext
dans votre UDF - Les UDF doivent être sérialisables pour êtreexpédiés aux exécuteurs, et le contexte (qui peut être considéré comme une connexion au cluster) ne peut pas être sérialisé et envoyé au nœud - uniquement l'application de pilote (où le fichier UDF est défini, mais non réalisé) peut utiliser le sqlContext
.
Il semblerait que votre cas d'utilisation (effectuez une sélection dans le tableau X par enregistrement dans le tableau Y) serait mieux réalisé en utilisant un join
.