/ / Impossible d’indexer JSON à partir de HDFS avec SchemaRDD.saveToES () dans Elasticsearch-hadoop - json, scala, elasticsearch, apache-spark, apache-spark-sql

Impossible d'indexer JSON à partir de HDFS à l'aide de SchemaRDD.saveToES () dans Elasticsearch-hadoop - json, scala, elasticsearch, apache-spark, apache-spark-sql

C’est ma première véritable tentative d’étincelle / scala, alors soyez gentil.

J'ai un fichier appelé test.json sur HDFS que je "tente de lire et d’indexer avec Spark. Je peux lire le fichier via SQLContext.jsonFile () mais lorsque je tente d’utiliser SchemaRDD.saveToEs (), je reçois une erreur de fragment JSON non valide. Je pense que la fonction saveToES () n’est pas en train de formater la sortie en json, mais plutôt d’envoyer le champ de valeur du RDD.

Qu'est-ce que je fais mal?

Spark 1.2.0

Elasticsearch-hadoop 2.1.0.BUILD-20150217

test.json:

{"key":"value"}

coquille d'allumage:

import org.apache.spark.SparkContext._
import org.elasticsearch.spark._

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

val input = sqlContext.jsonFile("hdfs://nameservice1/user/mshirley/test.json")
input.saveToEs("mshirley_spark_test/test")

Erreur:

<snip>
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [Bad Request(400) - Invalid JSON fragment received[["value"]][MapperParsingException[failed to parse]; nested: ElasticsearchParseException[Failed to derive xcontent from (offset=13, length=9): [123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 34, 118, 97, 108, 117, 101, 34, 93, 10]]; ]]; Bailing out..
<snip>

contribution:

res2: org.apache.spark.sql.SchemaRDD =
SchemaRDD[6] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
PhysicalRDD [key#0], MappedRDD[5] at map at JsonRDD.scala:47

input.printSchema ():

root
|-- key: string (nullable = true)

Réponses:

2 pour la réponse № 1

https://github.com/elastic/elasticsearch-hadoop/issues/382

modifié:

import org.elasticsearch.spark._

à:

import org.elasticsearch.spark.sql._