easy-hadoop - EasyHadoop, Rapid Hadoop Programming - Google Project Hosting
EasyHadoop is a collection of Ruby scripts for quickly accomplishing basic tasks such as extracting certain columns from a dataset, calculating frequencies, and normalizing strings. Each script can be customized through its command-line options. Below is an example of how these scripts can be used in a Hadoop streaming environment.

Scenario I: Frequency Count

Dataset: Suppose you have tab-delimited search engine logs with the following fields:
Request ID    User ID    Search Term  Time   Browser  IP Address      Country
89aASDF89SAD  234JKLKLJ  walmart      09:00  FF       201.342.345.23  USA
sdf234ljlk23  234JKLKLJ  safeway      13:34  FF       201.342.345.23  USA
Task: Identify all the different search terms issued by each user and their frequencies.

SubTasks: Let's list the steps needed to accomplish this task:
Extract the relevant columns: User ID and Search Term.
Apply normalization techniques to the search terms so that we do not differentiate "restaurants" from "restaurant" (note the missing "s").
Count frequencies.
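To make the three subtasks concrete before turning to the scripts themselves, here is a minimal plain-Ruby sketch that runs them in memory on the two sample rows. It is not how EasyHadoop is implemented; the function names (extract, normalize, count_terms) are hypothetical, and normalize is only a placeholder that trims and lowercases the term.

```ruby
# The two sample rows from the dataset (tab-delimited, 7 fields each).
ROWS = [
  "89aASDF89SAD\t234JKLKLJ\twalmart\t09:00\tFF\t201.342.345.23\tUSA",
  "sdf234ljlk23\t234JKLKLJ\tsafeway\t13:34\tFF\t201.342.345.23\tUSA",
]

# Subtask 1: keep only User ID (index 1) and Search Term (index 2).
def extract(row)
  fields = row.chomp.split("\t")
  [fields[1], fields[2]]
end

# Subtask 2: placeholder normalization (assumption: trim and lowercase only).
def normalize(term)
  term.strip.downcase
end

# Subtask 3: count how often each (user, term) pair occurs.
def count_terms(rows)
  counts = Hash.new(0)
  rows.each do |row|
    user, term = extract(row)
    counts[[user, normalize(term)]] += 1
  end
  counts
end

count_terms(ROWS).each { |(user, term), n| puts "#{user}\t#{term}\t#{n}" }
```

On a real cluster the same three steps are split between mappers and reducers, which is what the EasyHadoop scripts are for.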
EasyHadoop: Now let's see how to accomplish each of the above subtasks using EasyHadoop.

Extracting Columns: Use ColExtractor.rb to extract the relevant User ID (index 1) and Search Term (index 2) columns. Column indexes are zero-based.
ruby ColExtractor.rb --key 1 --value 2

Normalization: Use Normalizer.rb to normalize a particular field. In this case, we only want to normalize the search term. Let's assume we want to remove any non-alphanumeric characters and apply Porter stemming.
ruby Normalizer.rb --field 1:porter,alphanum
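As a rough illustration of what normalizing one field of a tab-delimited line involves, here is a small Ruby sketch. It is not Normalizer.rb's code: the RULES table and normalize_field are hypothetical, the rules are assumed to be applied left to right, and naive_stem is a crude stand-in that only strips a trailing "s". A real Porter stemmer handles many more suffixes.

```ruby
# Hypothetical rule table; each rule maps a string to a normalized string.
RULES = {
  # Drop every character that is not a letter, digit, or space.
  "alphanum" => ->(s) { s.gsub(/[^A-Za-z0-9 ]/, "") },
  # Crude stand-in for stemming (NOT Porter): strip one trailing "s"
  # from words longer than three characters.
  "naive_stem" => lambda do |s|
    s.split(" ").map { |w| w.length > 3 ? w.sub(/s\z/, "") : w }.join(" ")
  end,
}

# Apply the named rules, in order, to the field at the given zero-based index.
def normalize_field(line, index, rule_names)
  fields = line.chomp.split("\t", -1)
  fields[index] = rule_names.reduce(fields[index]) do |value, name|
    RULES.fetch(name).call(value)
  end
  fields.join("\t")
end

# Field 1 is the search term in a "UserID TAB SearchTerm" line.
puts normalize_field("234JKLKLJ\trestaurants!", 1, ["alphanum", "naive_stem"])
```

With these two rules, "restaurants!" and "restaurant" end up as the same string, which is the property the frequency count depends on.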
Note that the Normalizer.rb command uses column index 1 instead of 2 because the output of ColExtractor.rb has only two fields (UserID TAB SearchTerm). This output is the input to Normalizer.rb, so the column index of SearchTerm is now 1, not 2.

Count: Before we can count, we need to join UserID and SearchTerm so that together they form a single key. Use ColExtractor.rb again to do this. Assume we join the two fields with the pipe (|) character.
ruby ColExtractor.rb --key 0,1 --key_delimiter='|'
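The reason for joining the fields is that Hadoop streaming, by default, treats everything up to the first tab in a line as the key. The sketch below shows the join itself in plain Ruby. The helper name join_key is hypothetical, and emitting the key with no value is an assumption about the output shape, not a description of what ColExtractor.rb prints.

```ruby
# Build one composite key from the fields at the given zero-based indexes.
def join_key(line, indexes, delimiter)
  fields = line.chomp.split("\t", -1)
  fields.values_at(*indexes).join(delimiter)
end

# "UserID TAB SearchTerm" becomes a single tab-free key.
puts join_key("234JKLKLJ\twalmart", [0, 1], "|")
# prints "234JKLKLJ|walmart"
```

Because the joined key contains no tab, every record for the same user and search term reaches the same reducer as one key.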
Now we just need to count the number of times a particular key appears in our dataset. We can do this using Stat.rb as shown below:
ruby Stat.rb --operations:count

First Attempt: Hadoop streaming using EasyHadoop

Now let's convert the above steps into a Hadoop streaming job. Save the code below in an executable file (.sh):
EASY_HADOOP="../easy-hadoop"
YP_DEFAULTS="../defaults"
BASE="/user/example"
INPUT="$BASE/raw"
JOB1="$BASE/tmp1"
JOB2="$BASE/output"
hadoop dfs -rmr $JOB1
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.1-streaming.jar \
  -input $INPUT \
  -output $JOB1 \
  -mapper "ruby easyhadoop/Mapper/ColExtractor.rb --key 0,1 --key_delimiter |" \
  -reducer "easyhadoop/Reducer/Stat.rb --operations:count" \
  -jobconf mapred.map.tasks=50 \
  -jobconf mapred.reduce.tasks=50 \
  -jobconf mapred.output.compress=true \
  -jobconf stream.recordreader.compression=gzip \
  -jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
  -jobconf mapred.task.timeout=1800000 \
  -cacheArchive hdfs://hadoop.cluster:9000/user/easyhadoop/lib/easyhadoop.zip#easyhadoop
# The hadoop command above blocks until the job finishes; capture its exit status
RT=$?
[ $RT -ne 0 ] && echo "First Job Failed:$RT" && exit $RT
hadoop dfs -rmr $JOB2
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.1-streaming.jar