私は文字列の入力を受け取り、SQL文を実行して文字列値を返すscalaを使ってsparkフレームワークにインライン関数を書くことを試みています
val testfunc: (String=>String)= (arg1:String) =>
{val k = sqlContext.sql("""select c_code from r_c_tbl where x_nm = "something" """)
k.head().getString(0)
}
私はこのスカラー関数をUDFとして登録しています
val testFunc_test = udf(testFunc)
私はハイブテーブル上にデータフレームを持っています
val df = sqlContext.table("some_table")
それから、withColumnでudfを呼び出し、新しいデータフレームに保存しようとしています。
val new_df = df.withColumn("test", testFunc_test($"col1"))
しかし、毎回私はこれをやってみるとエラーが出る
16/08/10 21:17:08 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 10.0.1.5): java.lang.NullPointerException
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:41)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
at org.apache.spark.sql.DataFrame.foreach(DataFrame.scala:1434)
私はスパークとスカラには比較的新しいです。しかし、なぜこのコードが動かないのかわかりません。どんな洞察力や回避策も高く評価されます。
私はエラースタック全体を貼り付けていないことに注意してください。必要であれば教えてください。
回答:
回答№1は1あなたは使用することはできません sqlContext
あなたのUDFで - UDFはシリアライズ可能でなければならないエグゼキュータに出荷され、コンテキスト(クラスタへの接続と考えることができる)は直列化されてノードに送信されません。ドライバアプリケーションのみ(UDFは 定義された、 だがしかし 実行された)は、 sqlContext
.
ユースケース(テーブルYのレコードごとにテーブルXの選択を実行する)のように見えるのは、 join
.