Spark master does not invoke the custom InputFormat - xml, hadoop, apache-spark

I am trying to explore Apache Spark, and as part of that I wanted to customize an InputFormat. In my case I want to read an XML file and turn every occurrence of <text> into a new record.

I wrote a custom TextInputFormat (XMLRecordInputFormat.java) which returns a custom **XMLRecordReader extends org.apache.hadoop.mapreduce.RecordReader**.
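
The XMLRecordReader class itself is not shown in the question. As a rough sketch only, a reader built on the new mapreduce API has the shape below; the field names and empty method bodies are placeholders, not the original code.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class XMLRecordReader extends RecordReader<LongWritable, Text> {

    private LongWritable currentKey = new LongWritable();
    private Text currentValue = new Text();

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // open the split and position the stream at the first <text> element
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // read everything up to the closing tag into currentValue,
        // return false once the split is exhausted
        return false;
    }

    @Override
    public LongWritable getCurrentKey() { return currentKey; }

    @Override
    public Text getCurrentValue() { return currentValue; }

    @Override
    public float getProgress() { return 0.0f; }

    @Override
    public void close() throws IOException { }
}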

But I don't understand why the Spark master does not invoke the custom input format (XMLRecordInputFormat.class). For some reason it keeps behaving like a plain line splitter.

Here is the code:

import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import scala.Tuple2;

public class CustomizedXMLReader {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("CustomizedXMLReader")
                .set("spark.executor.memory", "512m")
                .set("record.delimiter.regex", "</bermudaview>");

        JobConf jobConf = new JobConf(new Configuration(), CustomizedXMLReader.class);
        jobConf.setInputFormat(XMLRecordInputFormat.class);
        FileInputFormat.setInputPaths(jobConf, new Path(args[0]));

        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        JavaPairRDD<LongWritable, Text> lines = ctx.hadoopRDD(jobConf, XMLRecordInputFormat.class, LongWritable.class, Text.class);

        Function<Tuple2<LongWritable, Text>, XMLRecord> keyData =
                new Function<Tuple2<LongWritable, Text>, XMLRecord>() {
                    @Override
                    public XMLRecord call(Tuple2<LongWritable, Text> arg0) throws Exception {
                        System.out.println(arg0.toString());
                        XMLRecord record = new XMLRecord();
                        record.setPos(Long.getLong(arg0._1.toString()));
                        record.setXml(arg0._2.toString());
                        return record;
                    }
                };

        JavaRDD<XMLRecord> words = lines.map(keyData);

        List<XMLRecord> tupleList = words.collect();

        Iterator<XMLRecord> itr = tupleList.iterator();

        while (itr.hasNext()) {
            XMLRecord t = itr.next();
            System.out.println(t.getXml());
            System.out.println(t.getPos());
        }
    }
}

//following custom InputFormat implementation

public class XMLRecordInputFormat extends TextInputFormat {

    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit arg0, JobConf arg1, Reporter arg2) throws IOException {
        XMLRecordReader r = new XMLRecordReader();
        return r;
    }

}

Answers:

1 vote for answer № 1

I think I figured out how to do it.

I had mixed up the APIs: org.apache.hadoop.mapred.RecordReader (an interface) versus org.apache.hadoop.mapreduce.RecordReader (a class), and also which InputFormat to use with each.
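
One detail that helps explain the original behaviour: ctx.hadoopRDD only drives the old org.apache.hadoop.mapred interface, so a format whose only override is the new-API createRecordReader method is never asked for its reader and the inherited line reader is used instead. To keep a new-API (org.apache.hadoop.mapreduce) InputFormat, Spark has to be given it through newAPIHadoopFile. The snippet below is only a sketch of that route; it assumes XMLRecordInputFormat extends org.apache.hadoop.mapreduce.lib.input.TextInputFormat, which is not what the code above does.

// Sketch: keeping a new-API (org.apache.hadoop.mapreduce) input format and letting
// Spark drive it through newAPIHadoopFile instead of hadoopRDD.
// Assumes XMLRecordInputFormat extends org.apache.hadoop.mapreduce.lib.input.TextInputFormat
// and that sparkConf / args[0] come from the surrounding main method.
Configuration conf = new Configuration();
conf.set("record.delimiter.regex", "</name>");

JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaPairRDD<LongWritable, Text> records = ctx.newAPIHadoopFile(
        args[0],                     // input path
        XMLRecordInputFormat.class,  // new-API InputFormat
        LongWritable.class,          // key class
        Text.class,                  // value class
        conf);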

It turns out that FileInputFormat and org.apache.hadoop.mapred.RecordReader go hand in hand. Please find below the complete code for parsing XML into a JavaRDD.

In this example I am parsing out and extracting the <name>....</name> XML tag.
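
For reference, the input this example assumes is a document containing repeated <name> elements, each of which should come back as one record; something like the following (the surrounding elements are only illustrative):

<bermudaview>
    <name>first record</name>
    <name>second record</name>
</bermudaview>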

Main class

import java.io.Serializable;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import scala.Tuple2;

public class CustomizedXMLReader implements Serializable {

    private static final long serialVersionUID = 1L;

    public static void main(String[] args) {
        CustomizedXMLReader reader = new CustomizedXMLReader();
        reader.readUsingFileInputFormat(args);
    }

    /**
     * Does all the reading through the org.apache.hadoop.mapred.RecordReader interface. This works as expected.
     * @param args
     */
    public void readUsingFileInputFormat(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("CustomizedXMLReader")
                .set("spark.executor.memory", "512m")
                .set("record.delimiter.regex", "</name>");

        JobConf jobConf = new JobConf(new Configuration(), CustomizedXMLReader.class);
        jobConf.setInputFormat(XMLRecordFileInputFormat.class);
        FileInputFormat.setInputPaths(jobConf, new Path(args[0]));

        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        JavaPairRDD<Text, Text> lines = ctx.hadoopRDD(jobConf, XMLRecordFileInputFormat.class, Text.class, Text.class);

        Function<Tuple2<Text, Text>, XMLRecord> keyData =
                new Function<Tuple2<Text, Text>, XMLRecord>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public XMLRecord call(Tuple2<Text, Text> arg0) throws Exception {
                        System.out.println(arg0.toString());
                        XMLRecord record = new XMLRecord();
                        record.setPos(arg0._1.toString());
                        record.setXml(arg0._2.toString());
                        return record;
                    }
                };

        JavaRDD<XMLRecord> words = lines.map(keyData);

        List<XMLRecord> tupleList = words.collect();

        Iterator<XMLRecord> itr = tupleList.iterator();

        while (itr.hasNext()) {
            XMLRecord t = itr.next();
            System.out.println(t.getXml());
            System.out.println(t.getPos());
        }
    }
}
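
The XMLRecord bean used above is not part of the post; a minimal version that matches the calls made here (setPos(String), setXml(String), and Serializable so that collect() works) could look like this:

import java.io.Serializable;

public class XMLRecord implements Serializable {

    private static final long serialVersionUID = 1L;

    private String pos;  // position / sequence number of the record
    private String xml;  // the extracted XML fragment

    public String getPos() { return pos; }
    public void setPos(String pos) { this.pos = pos; }

    public String getXml() { return xml; }
    public void setXml(String xml) { this.xml = xml; }
}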

RecordReader

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.streaming.StreamXmlRecordReader;

public class XMLInterfaceRecordReader implements org.apache.hadoop.mapred.RecordReader<Text, Text> {

    private StreamXmlRecordReader in;
    private String delimiterRegex;
    private long start;
    private long pos;
    private long end;
    private static Long keyInt = 0L;

    public XMLInterfaceRecordReader(InputSplit split, JobConf arg1, Reporter rep) throws IOException {
        super();
        FileSplit fSplit = (FileSplit) split;

        this.delimiterRegex = "</name>";

        start = fSplit.getStart();
        end = start + fSplit.getLength();
        // StreamXmlRecordReader (from hadoop-streaming) reads these two properties
        // to know where each record begins and ends.
        arg1.set("stream.recordreader.begin", "<name>");
        arg1.set("stream.recordreader.end", delimiterRegex);

        final Path file = fSplit.getPath();
        FileSystem fs = file.getFileSystem(arg1);
        FSDataInputStream fileIn = fs.open(fSplit.getPath());

        boolean skipFirstLine = false;
        if (start != 0) {
            skipFirstLine = true;
            --start;
            fileIn.seek(start);
        }

        in = new StreamXmlRecordReader(fileIn, fSplit, rep, arg1, fs);

        this.pos = start;
    }

    @Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }

    @Override
    public Text createKey() {
        return new Text();
    }

    @Override
    public Text createValue() {
        return new Text();
    }

    @Override
    public long getPos() throws IOException {
        return pos;
    }

    @Override
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        } else {
            // fraction of the split consumed so far
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    @Override
    public boolean next(Text Key, Text Value) throws IOException {
        in.seekNextRecordBoundary();
        Text key = new Text();
        Text val = new Text();
        in.next(key, val);

        if (key.toString() != null && key.toString().length() > 0) {
            System.out.println(key.toString());
            System.out.println(val.toString());
            start += in.getPos();
            Key.set(new LongWritable(++keyInt).toString());
            Value.set(key.toString());
            return true;
        } else {
            return false;
        }
    }

}

FileInputFormat

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class XMLRecordFileInputFormat extends FileInputFormat<Text, Text> {

    XMLInterfaceRecordReader reader = null;

    public XMLRecordFileInputFormat() {
    }

    @Override
    public RecordReader<Text, Text> getRecordReader(InputSplit arg0,
            JobConf arg1, Reporter arg2) throws IOException {
        // The cached reader field is never assigned, so a fresh reader is created for every split.
        if (reader != null)
            return reader;
        else
            return new XMLInterfaceRecordReader(arg0, arg1, arg2);
    }

}