/ / स्कैला - स्पार्क - एक स्ट्रिंग कॉलम वाले डेटाफ्रेम को एक डीएफ में रिग प्रकार के साथ कॉलम के साथ कैसे परिवर्तित करें? - जेसन, स्कैला, अपाचे-स्पार्क

स्कैला - स्पार्क - एक स्ट्रिंग कॉलम वाले डेटाफ्रेम को एक डीएफ में रिग प्रकार के साथ कॉलम के साथ कैसे परिवर्तित करें? - जेसन, स्कैला, अपाचे-स्पार्क

मैं वर्तमान में एक समस्या का सामना कर रहा हूं कि मैं हल करने में असमर्थ हूं। मैं स्पार्क 1.6 का उपयोग कर रहा हूँ।

मेरे पास एक कॉलम के साथ एक टेक्स्ट डेटाफ्रेम हैकई क्षेत्रों के साथ एक स्ट्रिंग JSON युक्त। कुछ फ़ील्ड को स्ट्रिंग के लिए अनुमानित किया जाना चाहिए, दूसरों को ऐरे और कुछ लांग तक, कुछ स्कीमा के अनुसार "मैंने एक सही जेसन से अनुमान लगाया है:

 {"eventid":"3bc1c5d2-c10f-48d6-8b35-05db8665415c","email":"test@test.com","prices_vat":["20295930","20295930"]}

मैं केवल इसे फ़ील्ड के स्ट्रिंग कॉलम के साथ एक डीएफ में बदलने में कामयाब रहा। मैं इसे सही प्रकार में बदलने में सक्षम नहीं था।

कामयाब स्कीमा df_schema में है। कॉलम "मान" में स्ट्रिंग JSON है जिसमें मुझे पार्स करने की आवश्यकता है। यहां मेरा कोड है:

     var b = sqlContext.createDataFrame(df_txt.rdd,df_schema)
val z= {
b.select( b.columns.map(c => get_json_object(b("value"), s"$$.$c").alias(c)): _*)
}
var c = sqlContext.createDataFrame(z.rdd,df_schema)
c.show(1)

मैं इस अपवाद के साथ समाप्त होता हूं क्योंकि "price_vat" फ़ील्ड में सरणी को स्ट्रिंग के रूप में समझा जाता है और dray_schema ine की तरह ऐरे के रूप में नहीं:

   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 38.0 failed 1 times, most recent failure: Lost task 0.0 in stage 38.0 (TID 32, localhost): scala.MatchError: ["20295930","20295930"] (of class java.lang.String)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:159)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:153)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

कृपया मेरी मदद करें !

उत्तर:

जवाब के लिए 3 № 1

सौभाग्य से स्पार्क ने JSON डेटा को संभालने के लिए कार्यक्षमता में कुछ बनाया है:

scala> val jsonRDD = sc.parallelize(
|      """{"eventid":"3bc1c5d2-c10f-48d6-8b35-05db8665415c","email":"test@test.com","prices_vat":["20295930","20295930"]}""" :: Nil)
jsonRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:27

scala> val df = sqlContext.read.json(jsonRDD)
df: org.apache.spark.sql.DataFrame = [email: string, eventid: string, prices_vat: array<string>]

scala> df.show
+-------------+--------------------+--------------------+
|        email|             eventid|          prices_vat|
+-------------+--------------------+--------------------+
|test@test.com|3bc1c5d2-c10f-48d...|[20295930, 20295930]|
+-------------+--------------------+--------------------+


scala> df.printSchema
root
|-- email: string (nullable = true)
|-- eventid: string (nullable = true)
|-- prices_vat: array (nullable = true)
|    |-- element: string (containsNull = true)

यह भी ध्यान रखें कि यदि आप स्पार्क को उन संख्याओं को पहचानना चाहते हैं prices_vat फ़ील्ड उन्हें तदनुसार स्वरूपित किया जाना चाहिए:

scala> val jsonRDD2 = sc.parallelize(
|      """{"eventid":"3bc1c5d2-c10f-48d6-8b35-05db8665415c","email":"test@test.com","prices_vat":[20295930,20295930]}""" :: Nil)
jsonRDD2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:27

scala> val df2 = sqlContext.read.json(jsonRDD2)
df2: org.apache.spark.sql.DataFrame = [email: string, eventid: string, prices_vat: array<bigint>]

scala> df2.show
+-------------+--------------------+--------------------+
|        email|             eventid|          prices_vat|
+-------------+--------------------+--------------------+
|test@test.com|3bc1c5d2-c10f-48d...|[20295930, 20295930]|
+-------------+--------------------+--------------------+


scala> df2.printSchema
root
|-- email: string (nullable = true)
|-- eventid: string (nullable = true)
|-- prices_vat: array (nullable = true)
|    |-- element: long (containsNull = true)

यदि आपके पास जेसन है DataFrame पहले से ही आप ऐसा कुछ कर सकते हैं:

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val df = sc.parallelize(
|      """{"eventid":"3bc1c5d2-c10f-48d6-8b35-05db8665415c","email":"test@test.com","prices_vat":[20295930,20295930]}""" :: Nil).toDF("json")
df: org.apache.spark.sql.DataFrame = [json: string]

scala> df.show
+--------------------+
|                json|
+--------------------+
|{"eventid":"3bc1c...|
+--------------------+


scala> val rdd = df.rdd.map{case Row(json: String) => json}
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[43] at map at <console>:30

scala> val outDF = sqlContext.read.json(rdd)
outDF: org.apache.spark.sql.DataFrame = [email: string, eventid: string, prices_vat: array<bigint>]

scala> outDF.show
+-------------+--------------------+--------------------+
|        email|             eventid|          prices_vat|
+-------------+--------------------+--------------------+
|test@test.com|3bc1c5d2-c10f-48d...|[20295930, 20295930]|
+-------------+--------------------+--------------------+

जवाब के लिए 0 № 2

Evan058 के लिए धन्यवाद, हमने यह पता लगाया कि इस समस्या से निपटने के लिए कैसे। इसे मेरे कोड में जोड़ने से काम लगता है:

var y= df_txt.select("value").rdd.map(r => r(0).asInstanceOf[String]).collect()
var o = sc.parallelize(y)
val r = sqlContext.read.json(o)