The Hadoop streaming FAQ also describes one approach to parsing XML documents: http://hadoop.apache.org/common/docs/r0.17.2/streaming.html#How+do+I+parse+XML+documents+using+streaming%3F
We will use Mahout's XmlInputFormat class to process the XML files. For processing XML files, we need three things:
1- Driver class to run the program
2- Mapper class
3- XmlInputFormat class
I am not using reducers to keep the example simple. Now let's do some programming to work these things out.
Driver Class:
Here is the code for the driver class, which is explained below.
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 *
 * @author root
 */
public class ParserDriver {

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        try {
            runJob(args[0], args[1]);
        } catch (IOException ex) {
            Logger.getLogger(ParserDriver.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public static void runJob(String input, String output) throws IOException {
        Configuration conf = new Configuration();
        // the start and end tags that delimit one record in the input XML
        conf.set("xmlinput.start", "<startingTag>");
        conf.set("xmlinput.end", "</endingTag>");
        conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
        Job job = new Job(conf, "jobName");
        FileInputFormat.setInputPaths(job, input);
        job.setJarByClass(ParserDriver.class);
        job.setMapperClass(MyParserMapper.class);
        job.setNumReduceTasks(0);
        job.setInputFormatClass(XmlInputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        Path outPath = new Path(output);
        FileOutputFormat.setOutputPath(job, outPath);
        // delete the output directory if it already exists
        FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
        if (dfs.exists(outPath)) {
            dfs.delete(outPath, true);
        }
        try {
            job.waitForCompletion(true);
        } catch (InterruptedException ex) {
            Logger.getLogger(ParserDriver.class.getName()).log(Level.SEVERE, null, ex);
        } catch (ClassNotFoundException ex) {
            Logger.getLogger(ParserDriver.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}
conf.set("xmlinput.start", "<startingTag>");
conf.set("xmlinput.end", "</endingTag>");
Then you need to set input path, output path which i am taking as command line arguments, need to set mapper class.
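For example (a hypothetical input, not from the original post), if every record in the file is wrapped in <employee> ... </employee> tags, the driver would set:
conf.set("xmlinput.start", "<employee>");
conf.set("xmlinput.end", "</employee>");
and each call to map() would then receive one complete <employee> block, from the opening tag through the closing tag, as its value.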
Next, we will define our mapper.
Mapper:
To parse the XML, you need a parser library. There are many ways to parse an XML file in Java, for example with a SAX or DOM parser. I have used the JDOM library to parse the XML. Here is the code for the mapper class, which is explained below.
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.input.SAXBuilder;
/**
*
* @author root
*/
public class MyParserMapper extends
        Mapper<LongWritable, Text, NullWritable, Text> {

    @Override
    public void map(LongWritable key, Text value1, Context context)
            throws IOException, InterruptedException {
        // each value is one complete XML record, as delimited by xmlinput.start/end
        String xmlString = value1.toString();
        SAXBuilder builder = new SAXBuilder();
        Reader in = new StringReader(xmlString);
        String value = "";
        try {
            Document doc = builder.build(in);
            Element root = doc.getRootElement();
            String tag1 = root.getChild("tag").getChild("tag1").getTextTrim();
            String tag2 = root.getChild("tag").getChild("tag1").getChild("tag2").getTextTrim();
            value = tag1 + "," + tag2;
            context.write(NullWritable.get(), new Text(value));
        } catch (JDOMException ex) {
            Logger.getLogger(MyParserMapper.class.getName()).log(Level.SEVERE, null, ex);
        } catch (IOException ex) {
            Logger.getLogger(MyParserMapper.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}
The code is very simple: you get the record in value1, parse the data, and then emit it using
context.write(NullWritable.get(), new Text(value));
I did not need a key, so I use NullWritable; the value contains the comma-delimited record after parsing.
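For instance, assuming a record shaped like the placeholder tags used in the mapper (this input is purely illustrative):
<root><tag><tag1>abc<tag2>xyz</tag2></tag1></tag></root>
the mapper would write the single output line:
abc,xyz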
Next, I am also providing the Mahout XmlInputFormat class code, which is compatible with the new Hadoop API.
Mahout XmlInputFormat (Compatible with the New Hadoop API):
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
/**
 * Reads records that are delimited by a specific begin/end tag.
*/
public class XmlInputFormat extends TextInputFormat {

    public static final String START_TAG_KEY = "xmlinput.start";
    public static final String END_TAG_KEY = "xmlinput.end";

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit is, TaskAttemptContext tac) {
        return new XmlRecordReader();
    }

    public static class XmlRecordReader extends RecordReader<LongWritable, Text> {

        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;
        private DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable key = new LongWritable();
        private Text value = new Text();

        @Override
        public void initialize(InputSplit is, TaskAttemptContext tac) throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) is;
            startTag = tac.getConfiguration().get(START_TAG_KEY).getBytes("utf-8");
            endTag = tac.getConfiguration().get(END_TAG_KEY).getBytes("utf-8");
            start = fileSplit.getStart();
            end = start + fileSplit.getLength();
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(tac.getConfiguration());
            fsin = fs.open(fileSplit.getPath());
            fsin.seek(start);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (fsin.getPos() < end) {
                if (readUntilMatch(startTag, false)) {
                    try {
                        buffer.write(startTag);
                        if (readUntilMatch(endTag, true)) {
                            value.set(buffer.getData(), 0, buffer.getLength());
                            key.set(fsin.getPos());
                            return true;
                        }
                    } finally {
                        buffer.reset();
                    }
                }
            }
            return false;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return (fsin.getPos() - start) / (float) (end - start);
        }

        @Override
        public void close() throws IOException {
            fsin.close();
        }

        private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();
                // end of file:
                if (b == -1) return false;
                // save to buffer:
                if (withinBlock) buffer.write(b);
                // check if we're matching:
                if (b == match[i]) {
                    i++;
                    if (i >= match.length) return true;
                } else i = 0;
                // see if we've passed the stop point:
                if (!withinBlock && i == 0 && fsin.getPos() >= end) return false;
            }
        }
    }
}
To run this code, include the necessary jar files (jdom.jar, hadoop-core.jar) and package everything into a single jar file. You can find instructions for building a single jar file at the following link:
http://java.sun.com/developer/technicalArticles/java_warehouse/single_jar/
Next, run the following command in the terminal to launch the job.
hadoop jar MyParser.jar /user/root/Data/file.xml outputhere
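Once the job finishes, you can inspect the results with the standard HDFS shell commands (for a map-only job like this one, the output typically lands in files named part-m-00000 and so on inside the output directory):
hadoop fs -ls outputhere
hadoop fs -cat outputhere/part-m-00000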
Conclusion:
In this way, we can process large amounts of XML data using Hadoop and the Mahout XML input format.