/ / UDFからspark sqlクエリを実行しようとしています - scala、hadoop、apache-spark、apache-spark-sql、spark-dataframe

UDFからスパークSQLクエリを実行しようとしています - scala、hadoop、apache-spark、apache-spark-sql、spark-dataframe

私は文字列の入力を受け取り、SQL文を実行して文字列値を返すscalaを使ってsparkフレームワークにインライン関数を書くことを試みています

val testfunc: (String=>String)= (arg1:String) =>
{val k = sqlContext.sql("""select c_code from r_c_tbl where x_nm = "something" """)
k.head().getString(0)
}

私はこのスカラー関数をUDFとして登録しています

   val testFunc_test = udf(testFunc)

私はハイブテーブル上にデータフレームを持っています

    val df = sqlContext.table("some_table")

それから、withColumnでudfを呼び出し、新しいデータフレームに保存しようとしています。

    val new_df = df.withColumn("test", testFunc_test($"col1"))

しかし、毎回私はこれをやってみるとエラーが出る

16/08/10 21:17:08 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1,       10.0.1.5): java.lang.NullPointerException
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:41)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
at org.apache.spark.sql.DataFrame.foreach(DataFrame.scala:1434)

私はスパークとスカラには比較的新しいです。しかし、なぜこのコードが動かないのかわかりません。どんな洞察力や回避策も高く評価されます。

私はエラースタック全体を貼り付けていないことに注意してください。必要であれば教えてください。

回答:

回答№1は1

あなたは使用することはできません sqlContext あなたのUDFで - UDFはシリアライズ可能でなければならないエグゼキュータに出荷され、コンテキスト(クラスタへの接続と考えることができる)は直列化されてノードに送信されません。ドライバアプリケーションのみ(UDFは 定義された、 だがしかし 実行された)は、 sqlContext.

ユースケース(テーブルYのレコードごとにテーブルXの選択を実行する)のように見えるのは、 join.