Wednesday, August 25, 2010

XML Processing in Hadoop

In this post, I will describe how to process XML files using Hadoop. XML files can be processed with Hadoop Streaming, but here we will use another approach that is more efficient than streaming. The details of streaming can be found at the following link:
http://hadoop.apache.org/common/docs/r0.17.2/streaming.html#How+do+I+parse+XML+documents+using+streaming%3F

We will use the Mahout XmlInputFormat class to process the XML files. For this, we need three things:

1- Driver class to run the program
2- Mapper class
3- XmlInputFormat class

I am not using reducers, to keep the example simple. Now let's do some programming to put these pieces together.

Driver Class:

Here is the code for the driver class, which is explained below.




import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ParserDriver {

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        try {
            runJob(args[0], args[1]);
        } catch (IOException ex) {
            Logger.getLogger(ParserDriver.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public static void runJob(String input, String output) throws IOException {

        Configuration conf = new Configuration();

        // Define the tags that delimit one record in the XML file.
        conf.set("xmlinput.start", "<startingTag>");
        conf.set("xmlinput.end", "</endingTag>");
        conf.set("io.serializations",
                "org.apache.hadoop.io.serializer.JavaSerialization,"
              + "org.apache.hadoop.io.serializer.WritableSerialization");

        Job job = new Job(conf, "jobName");

        FileInputFormat.setInputPaths(job, input);
        job.setJarByClass(ParserDriver.class);
        job.setMapperClass(MyParserMapper.class);
        job.setNumReduceTasks(0);
        job.setInputFormatClass(XmlInputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Delete the output directory if it already exists.
        Path outPath = new Path(output);
        FileOutputFormat.setOutputPath(job, outPath);
        FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
        if (dfs.exists(outPath)) {
            dfs.delete(outPath, true);
        }

        try {
            job.waitForCompletion(true);
        } catch (InterruptedException ex) {
            Logger.getLogger(ParserDriver.class.getName()).log(Level.SEVERE, null, ex);
        } catch (ClassNotFoundException ex) {
            Logger.getLogger(ParserDriver.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}
The code is mostly self-explanatory. You need to define the start and end tags that delimit a single record in the XML file; this is done in the following lines:

conf.set("xmlinput.start", "<startingTag>");
conf.set("xmlinput.end", "</endingTag>");

You also need to set the input and output paths (which I am taking as command-line arguments) and the mapper class.
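To make the delimiting concrete, here is a small standalone sketch, with hypothetical tag names and sample content not taken from the post, of how the record reader conceptually slices a file into records: everything from a start tag through its matching end tag becomes one map() value.

```java
import java.util.ArrayList;
import java.util.List;

public class RecordTagDemo {

    // Slices 'content' into records, where each record runs from a start tag
    // through the next end tag, inclusive. This mirrors what XmlInputFormat
    // does on a byte stream, but on an in-memory string for illustration.
    public static List<String> extractRecords(String content, String startTag, String endTag) {
        List<String> records = new ArrayList<String>();
        int pos = 0;
        while ((pos = content.indexOf(startTag, pos)) != -1) {
            int close = content.indexOf(endTag, pos);
            if (close == -1) break; // unterminated record: stop
            records.add(content.substring(pos, close + endTag.length()));
            pos = close + endTag.length();
        }
        return records;
    }

    public static void main(String[] args) {
        // Hypothetical input file containing two records.
        String fileContent = "<records><record>a</record><record>b</record></records>";
        for (String record : extractRecords(fileContent, "<record>", "</record>")) {
            System.out.println(record);
        }
        // prints: <record>a</record> then <record>b</record>
    }
}
```

Each string produced this way is what a single call to map() receives as its value.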

Next we will define our mapper.

Mapper:

To parse the XML records, you need a parser library. There are several ways to parse XML in Java, for example with a SAX or DOM parser. I have used the JDOM library. Here is the code for the mapper class, which is explained below.


import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.input.SAXBuilder;

public class MyParserMapper extends
        Mapper<LongWritable, Text, NullWritable, Text> {

    @Override
    public void map(LongWritable key, Text value1, Context context)
            throws IOException, InterruptedException {

        // value1 holds one complete XML record produced by XmlInputFormat.
        String xmlString = value1.toString();
        SAXBuilder builder = new SAXBuilder();
        Reader in = new StringReader(xmlString);

        try {
            Document doc = builder.build(in);
            Element root = doc.getRootElement();
            String tag1 = root.getChild("tag").getChild("tag1").getTextTrim();
            String tag2 = root.getChild("tag").getChild("tag1").getChild("tag2").getTextTrim();
            String value = tag1 + "," + tag2;
            context.write(NullWritable.get(), new Text(value));
        } catch (JDOMException ex) {
            Logger.getLogger(MyParserMapper.class.getName()).log(Level.SEVERE, null, ex);
        } catch (IOException ex) {
            Logger.getLogger(MyParserMapper.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

The code is very simple: the record arrives in value1, it is parsed, and the result is emitted with

context.write(NullWritable.get(), new Text(value));

I did not need a key, so I use NullWritable; the value contains the comma-delimited record after parsing.
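JDOM is not part of the JDK, so if you just want to try the parsing step in isolation, a similar extraction can be sketched with the JDK's built-in DOM parser. This is an illustrative alternative, not the post's method, and the record structure below is a hypothetical flat example rather than the nested tags used in the mapper above.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;

public class DomParseDemo {

    // Parses one XML record and returns the comma-delimited line the mapper
    // would emit. Tag names are hypothetical examples.
    public static String parseRecord(String xml) throws Exception {
        DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
        Document doc = builder.parse(new InputSource(new StringReader(xml)));
        Element root = doc.getDocumentElement();
        String tag1 = root.getElementsByTagName("tag1").item(0).getTextContent().trim();
        String tag2 = root.getElementsByTagName("tag2").item(0).getTextContent().trim();
        return tag1 + "," + tag2;
    }

    public static void main(String[] args) throws Exception {
        String record = "<record><tag1> hello </tag1><tag2> world </tag2></record>";
        System.out.println(parseRecord(record)); // prints: hello,world
    }
}
```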

Next, I am also providing the Mahout XmlInputFormat class code, which is compatible with the new Hadoop API.

Mahout XmlInputFormat (Compatible with the New Hadoop API):


import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

/**
 * Reads records that are delimited by a specific begin/end tag.
 */
public class XmlInputFormat extends TextInputFormat {

    public static final String START_TAG_KEY = "xmlinput.start";
    public static final String END_TAG_KEY = "xmlinput.end";

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit is, TaskAttemptContext tac) {
        return new XmlRecordReader();
    }

    public static class XmlRecordReader extends RecordReader<LongWritable, Text> {

        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;
        private DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable key = new LongWritable();
        private Text value = new Text();

        @Override
        public void initialize(InputSplit is, TaskAttemptContext tac) throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) is;
            startTag = tac.getConfiguration().get(START_TAG_KEY).getBytes("utf-8");
            endTag = tac.getConfiguration().get(END_TAG_KEY).getBytes("utf-8");

            start = fileSplit.getStart();
            end = start + fileSplit.getLength();
            Path file = fileSplit.getPath();

            FileSystem fs = file.getFileSystem(tac.getConfiguration());
            fsin = fs.open(fileSplit.getPath());
            fsin.seek(start);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (fsin.getPos() < end) {
                if (readUntilMatch(startTag, false)) {
                    try {
                        buffer.write(startTag);
                        if (readUntilMatch(endTag, true)) {
                            value.set(buffer.getData(), 0, buffer.getLength());
                            key.set(fsin.getPos());
                            return true;
                        }
                    } finally {
                        buffer.reset();
                    }
                }
            }
            return false;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return (fsin.getPos() - start) / (float) (end - start);
        }

        @Override
        public void close() throws IOException {
            fsin.close();
        }

        private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();
                // end of file:
                if (b == -1) return false;
                // save to buffer:
                if (withinBlock) buffer.write(b);

                // check if we're matching:
                if (b == match[i]) {
                    i++;
                    if (i >= match.length) return true;
                } else i = 0;
                // see if we've passed the stop point:
                if (!withinBlock && i == 0 && fsin.getPos() >= end) return false;
            }
        }
    }
}
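The heart of XmlRecordReader is readUntilMatch, which scans the stream one byte at a time and tracks how much of the target tag has been matched so far. The same matching logic can be exercised in isolation against an in-memory stream; this is a sketch for illustration only, with the file-split bounds check and buffering omitted.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class TagScanDemo {

    // Consumes bytes from 'in' until the sequence 'match' has been read;
    // returns true if found, false on end of stream. Same partial-match
    // logic as XmlRecordReader.readUntilMatch (without the split check).
    public static boolean readUntilMatch(InputStream in, byte[] match) throws IOException {
        int i = 0;
        while (true) {
            int b = in.read();
            if (b == -1) return false;      // end of stream
            if (b == match[i]) {
                i++;                        // one more byte of the tag matched
                if (i >= match.length) return true;
            } else {
                i = 0;                      // mismatch: restart matching
            }
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "noise<rec>payload</rec>".getBytes(StandardCharsets.UTF_8);
        InputStream in = new ByteArrayInputStream(data);
        System.out.println(readUntilMatch(in, "<rec>".getBytes(StandardCharsets.UTF_8))); // true
    }
}
```

After a successful call, the stream is positioned just past the tag, which is why nextKeyValue can write the start tag into the buffer and then continue scanning for the end tag.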

To run this code, include the necessary jar files (jdom.jar, hadoop-core.jar), and you also need to package everything into a single jar file. You can find instructions for building a single jar file at the following link:

http://java.sun.com/developer/technicalArticles/java_warehouse/single_jar/

Next, run the following command in the terminal to start the job.


hadoop jar MyParser.jar /user/root/Data/file.xml outputhere


Conclusion:

In this way, we can process large amounts of XML data using Hadoop and the Mahout XML input format.

7 comments:

  1. This is exactly what I needed. I have just saved lot of time translating mahout example to the new api.
    Thanks!!

  2. Hi Shuja,

    I have a doubt about XML processing. HDFS splits files into chunks of 64 MB, and your program is going to lose records divided between the end of one chunk and the start of the next.

  3. Can you help me resolve this

    11/12/30 05:34:45 INFO input.FileInputFormat: Total input paths to process : 1
    11/12/30 05:34:45 INFO mapred.JobClient: Running job: job_201112300438_0010
    11/12/30 05:34:46 INFO mapred.JobClient: map 0% reduce 0%
    11/12/30 05:34:56 INFO mapred.JobClient: Task Id : attempt_201112300438_0010_m_000000_0, Status : FAILED
    java.lang.NullPointerException
    at xmlhadoop.ParserDriver$MyParserMapper1.map(ParserDriver.java:65)
    at xmlhadoop.ParserDriver$MyParserMapper1.map(ParserDriver.java:46)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)

  4. for nested document tag --> improved code

    private boolean readUntilMatch(byte[] match, boolean withinBlock)
            throws IOException {
        int i = 0;
        int j = 0;
        int nestedTags = 0;
        while (true) {
            int b = fsin.read();
            // end of file:
            if (b == -1)
                return false;
            // save to buffer:
            if (withinBlock)
                buffer.write(b);

            // while searching for the end tag, also watch for another
            // occurrence of the start tag (a nested tag)
            if (withinBlock && b == startTag[j]) {
                j++;
                if (j >= startTag.length) {
                    nestedTags++;
                    j = 0;
                }
            } else {
                j = 0;
            }

            // check if we're matching:
            if (b == match[i]) {
                i++;
                if (i >= match.length) {
                    if (nestedTags == 0) // break the loop if there were no nested tags
                        return true;
                    else {
                        --nestedTags; // else decrement the count
                        i = 0;        // and reset the index
                    }
                }
            } else {
                i = 0;
            }
            // see if we've passed the stop point:
            if (!withinBlock && i == 0 && fsin.getPos() >= end)
                return false;
        }
    }
