これはスパーク/スカラーでの私の最初の実際の試みなので、穏やかにしてください。
testというファイルがあります。Sparkを使用して読み取りおよびインデックスを作成しようとしているHDFS上のjson。 saveToES()関数は実際にjsonで出力をフォーマットするのではなく、RDDの値フィールドを送信するだけだと考えています。
私は間違って何をしていますか?
Spark 1.2.0
Elasticsearch-hadoop 2.1.0.BUILD-20150217
test.json:
{"key":"value"}
スパークシェル:
import org.apache.spark.SparkContext._
import org.elasticsearch.spark._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val input = sqlContext.jsonFile("hdfs://nameservice1/user/mshirley/test.json")
input.saveToEs("mshirley_spark_test/test")
エラー:
<snip>
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [Bad Request(400) - Invalid JSON fragment received[["value"]][MapperParsingException[failed to parse]; nested: ElasticsearchParseException[Failed to derive xcontent from (offset=13, length=9): [123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 34, 118, 97, 108, 117, 101, 34, 93, 10]]; ]]; Bailing out..
<snip>
入力:
res2: org.apache.spark.sql.SchemaRDD =
SchemaRDD[6] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
PhysicalRDD [key#0], MappedRDD[5] at map at JsonRDD.scala:47
input.printSchema():
root
|-- key: string (nullable = true)
回答:
回答№1は2https://github.com/elastic/elasticsearch-hadoop/issues/382
かわった:
import org.elasticsearch.spark._
に:
import org.elasticsearch.spark.sql._