[Experience Sharing] python + hadoop = ?

Posted 2015-4-24
  http://jason204.iteye.com/blog/1677664
  http://blog.cloudera.com/blog/2013/01/a-guide-to-python-frameworks-for-hadoop/ (recommended)
  
  I recently joined Cloudera after working in computational biology/genomics for close to a decade. My analytical work is primarily performed in Python, along with its fantastic scientific stack. It was quite jarring to find out that the Apache Hadoop ecosystem is primarily written in/for Java. So my first order of business was to investigate some of the options that exist for working with Hadoop from Python.
  In this post, I will provide an unscientific, ad hoc review of my experiences with some of the Python frameworks that exist for working with Hadoop, including:
   
         
  • Hadoop Streaming
  • mrjob
  • dumbo
  • hadoopy
  • pydoop
  • and others
  Ultimately, in my analysis, Hadoop Streaming is the fastest and most transparent option, and the best one for text processing. mrjob is best for rapidly working on Amazon EMR, but incurs a significant performance penalty. dumbo is convenient for more complex jobs (objects as keys; multistep MapReduce) without incurring as much overhead as mrjob, but it’s still slower than Streaming.
  Read on for implementation details, performance comparisons, and feature comparisons.
    Toy Problem Definition
  To test out the different frameworks, we will not be doing “word count”. Instead, we will be transforming the Google Books Ngram data. An n-gram is a synonym for a tuple of n words. The n-gram data set provides counts for every single 1-, 2-, 3-, 4-, and 5-gram observed in the Google Books corpus grouped by year. Each row in the n-gram data set is composed of 3 fields: the n-gram, the year, and the number of observations. (You can explore the data interactively here.)
  We would like to aggregate the data to count the number of times any pair of words are observed near each other, grouped by year. This would allow us to determine if any pair of words are statistically near each other more often than we would expect by chance. Two words are “near” if they are observed within 4 words of each other. Or equivalently, two words are near each other if they appear together in any 2-, 3-, 4-, or 5-gram. So a row in the resulting data set would be comprised of a 2-gram, a year, and a count.
  There is one subtlety that must be addressed. The n-gram data set for each value of n is computed across the whole Google Books corpus. In principle, given the 5-gram data set, I could compute the 4-, 3-, and 2-gram data sets simply by aggregating over the correct n-grams. For example, if the 5-gram data set contains
(the, cat, in, the, hat)       1999     20
(the, cat, is, on, youtube)    1999     13
(how, are, you, doing, today)  1986   5000
  
  then we could aggregate this into 2-grams which would result in records like

(the, cat)  1999    33      // i.e., 20 + 13
  
  However, in practice, Google only includes an n-gram if it is observed more than 40 times across the entire corpus. So while a particular 5-gram may be too rare to meet the 40-occurrence threshold, the 2-grams it is composed of may be common enough to break the threshold in the Google-supplied 2-gram data. For this reason, we use the 2-gram data for words that are next to each other, the 3-gram data for pairs of words that are separated by one word, the 4-gram data for pairs of words that are separated by 2 words, etc. In other words, given the 2-gram data, the only additional information the 3-gram data provide are the outermost words of the 3-gram. In addition to being more sensitive to potentially rare n-grams, using only the outermost words of the n-grams helps ensure we avoid double counting. In total, we will be running our computation on the combination of 2-, 3-, 4-, and 5-gram data sets.
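  The equivalence between “within 4 words of each other” and “outermost words of some 2- to 5-gram” can be sanity-checked in a few lines of Python (the sentence here is made up for illustration):

```python
# Every pair of words within 4 positions of each other appears as the
# outermost words of exactly one 2-, 3-, 4-, or 5-gram in the window.
words = ["the", "cat", "in", "the", "hat"]

near_pairs = set()
for n in range(2, 6):                          # n-gram sizes 2 through 5
    for i in range(len(words) - n + 1):
        ngram = words[i:i + n]
        near_pairs.add((ngram[0], ngram[-1]))  # keep only the outermost words

print(sorted(near_pairs))
```

  Enumerating the pairs directly by position distance (1 through 4) yields the same set, which is why combining the 2- through 5-gram data sets with only their outermost words avoids double counting.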
  The MapReduce pseudocode to implement this solution would look like so:

def map(record):
    (ngram, year, count) = unpack(record)
    # ensure word1 is the lexicographically first word:
    (word1, word2) = sorted(ngram[first], ngram[last])
    key = (word1, word2, year)
    emit(key, count)

def reduce(key, values):
    emit(key, sum(values))
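
  Since all the implementations below express this same logic, it can be handy to check the pseudocode against a plain-Python simulation first, with a dict standing in for Hadoop's shuffle/sort phase (the sample records are illustrative, reusing the 5-grams from above):

```python
from collections import defaultdict

# Sample records: (ngram tuple, year, count). Illustrative, not real Ngram rows.
records = [
    (("the", "cat", "in", "the", "hat"), 1999, 20),
    (("the", "cat", "is", "on", "youtube"), 1999, 13),
    (("how", "are", "you", "doing", "today"), 1986, 5000),
]

def map_record(record):
    ngram, year, count = record
    # key on the outermost words, lexicographically ordered
    word1, word2 = sorted([ngram[0], ngram[-1]])
    yield (word1, word2, year), count

# "shuffle": group all mapped counts by key, as Hadoop's sort phase would
grouped = defaultdict(list)
for record in records:
    for key, count in map_record(record):
        grouped[key].append(count)

# reduce: sum the counts for each key
result = {key: sum(values) for key, values in grouped.items()}
print(result[("hat", "the", 1999)])  # 20
```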
  

Hardware
  These MapReduce jobs are executed on a ~20 GB random subset of the data. The full data set is split across 1500 files; we select a random subset of the files using this script. The filenames remain intact, which is important because the filename identifies the value of n in the n-grams for that chunk of data.
  The Hadoop cluster comprises five virtual nodes running CentOS 6.2 x64, each with 4 CPUs, 10 GB RAM, 100 GB disk, running CDH4. The cluster can execute 20 maps at a time, and each job is set to run with 10 reducers.
  The software versions I worked with on the cluster were as follows:


  • Hadoop: 2.0.0-cdh4.1.2
  • Python: 2.6.6
  • mrjob: 0.4-dev
  • dumbo: 0.21.36
  • hadoopy: 0.6.0
  • pydoop: 0.7 (PyPI) and the latest version on git repository
  • Java: 1.6

Implementations
  Most of the Python frameworks wrap Hadoop Streaming, while others wrap Hadoop Pipes or implement their own alternatives. Below, I will discuss my experience with a number of tools for using Python to write Hadoop jobs, along with a final comparison of performance and features. One of the features I am interested in is the ease of getting up and running, so I did not attempt to optimize the performance of the individual packages.
  As with every large data set, there are bad records. We check for a few kinds of errors in each record including missing fields and wrong n-gram size. For the latter case, we must know the name of the file that is being processed in order to determine the expected n-gram size.
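  For the wrong-n-gram-size check, the value of n is recoverable from the chunk's filename, since the Google-supplied names embed it (e.g. a `3gram` substring); a small sketch, using an illustrative filename:

```python
import os
import re

def expected_ngram_size(path):
    # the chunk filenames encode n, e.g. ".../googlebooks-eng-all-3gram-..."
    return int(re.findall(r'(\d+)gram', os.path.basename(path))[0])

print(expected_ngram_size("googlebooks-eng-all-3gram-20090715-0.csv"))  # 3
```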
  All the code is available in this GitHub repo.

Hadoop Streaming
  Hadoop Streaming is the canonical way of supplying any executable to Hadoop as a mapper or reducer, including standard Unix tools or Python scripts. The executable must read from stdin and write to stdout using agreed-upon semantics. One of the disadvantages of using Streaming directly is that while the inputs to the reducer are grouped by key, they are still iterated over line-by-line, and the boundaries between keys must be detected by the user.
  Here is the code for the mapper:

#! /usr/bin/env python
import os
import re
import sys

# determine value of n in the current block of ngrams by parsing the filename
input_file = os.environ['map_input_file']
expected_tokens = int(re.findall(r'([\d]+)gram', os.path.basename(input_file))[0])

for line in sys.stdin:
    data = line.split('\t')
    # perform some error checking
    if len(data) < 3:
        continue
    # unpack data
    ngram = data[0].split()
    year = data[1]
    count = data[2]
    # more error checking
    if len(ngram) != expected_tokens:
        continue
    # build key and emit
    pair = sorted([ngram[0], ngram[expected_tokens - 1]])
    print >>sys.stdout, "%s\t%s\t%s\t%s" % (pair[0], pair[1], year, count)
  
  And here is the reducer:

#! /usr/bin/env python
import sys

total = 0
prev_key = False
for line in sys.stdin:
    data = line.split('\t')
    curr_key = '\t'.join(data[:3])
    count = int(data[3])
    # found a boundary; emit current sum
    if prev_key and curr_key != prev_key:
        print >>sys.stdout, "%s\t%i" % (prev_key, total)
        prev_key = curr_key
        total = count
    # same key; accumulate sum
    else:
        prev_key = curr_key
        total += count
# emit last key
if prev_key:
    print >>sys.stdout, "%s\t%i" % (prev_key, total)
  
  Hadoop Streaming separates the key and value with a tab character by default. Because we also separate the fields of our key with tab characters, we must tell Hadoop that the first three fields are all part of the key by passing these options:

-jobconf stream.num.map.output.key.fields=3
-jobconf stream.num.reduce.output.key.fields=3
  
  The command to execute the Hadoop job is

hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.1.2.jar \
-input /ngrams \
-output /output-streaming \
-mapper mapper.py \
-combiner reducer.py \
-reducer reducer.py \
-jobconf stream.num.map.output.key.fields=3 \
-jobconf stream.num.reduce.output.key.fields=3 \
-jobconf mapred.reduce.tasks=10 \
-file mapper.py \
-file reducer.py
  
  Note that the files mapper.py and reducer.py must be specified twice on the command line: the first time points Hadoop at the executables, while the second time tells Hadoop to distribute the executables around to all the nodes in the cluster.
  Hadoop Streaming is clean and very obvious/precise about what is happening under the hood. In contrast, the Python frameworks all perform their own serialization/deserialization that can consume additional resources in a non-transparent way. Also, if there is a functioning Hadoop distribution, then Streaming should just work, without having to configure another framework on top of it. Finally, it’s trivial to send Unix commands and/or Java classes as mappers/reducers.
  The disadvantage of Streaming is that everything must be done manually. The user must decide how to encode objects as keys/values (e.g., as JSON objects). Also, support for binary data is not trivial. And as mentioned above, the reducer must keep track of key boundaries manually, which can be prone to errors.
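  For instance, to use a structured key with plain Streaming, the mapper has to flatten it to a single string by hand; one common approach is JSON, sketched here with made-up field names:

```python
import json

# a structured key (word pair + year); Streaming moves only flat text, so the
# mapper must serialize such a key to a single string itself
key = {"pair": ["cat", "the"], "year": 1999}
count = 33

# sort_keys gives a canonical encoding, so equal keys compare equal as strings
# and therefore group together correctly during the Streaming sort phase
line = "%s\t%d" % (json.dumps(key, sort_keys=True), count)

# the reducer reverses the process on each input line
encoded_key, value = line.split("\t")
decoded = json.loads(encoded_key)
```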

mrjob
  mrjob is an open-source Python framework that wraps Hadoop Streaming and is actively developed by Yelp. Since Yelp operates entirely inside Amazon Web Services, mrjob’s integration with EMR is incredibly smooth and easy (using the boto package).
  mrjob provides a pythonic API to work with Hadoop Streaming, and allows the user to work with any objects as keys and values. By default, these objects are serialized as JSON objects internally, but there is also support for pickled objects. There are no other binary I/O formats available out of the box, but there is a mechanism to implement a custom serializer.
  Significantly, mrjob appears to be very actively developed, and has great documentation.
  As with all the Python frameworks, the implementation looks like pseudocode:

#! /usr/bin/env python
import os
import re

from mrjob.job import MRJob
from mrjob.protocol import RawProtocol, ReprProtocol

class NgramNeighbors(MRJob):
    # mrjob allows you to specify input/intermediate/output serialization
    # default output protocol is JSON; here we set it to text
    OUTPUT_PROTOCOL = RawProtocol

    def mapper_init(self):
        # determine value of n in the current block of ngrams by parsing filename
        input_file = os.environ['map_input_file']
        self.expected_tokens = int(re.findall(r'([\d]+)gram', os.path.basename(input_file))[0])

    def mapper(self, key, line):
        data = line.split('\t')
        # error checking
        if len(data) < 3:
            return
        # unpack data
        ngram = data[0].split()
        year = data[1]
        count = int(data[2])
        # more error checking
        if len(ngram) != self.expected_tokens:
            return
        # generate key
        pair = sorted([ngram[0], ngram[self.expected_tokens - 1]])
        k = pair + [year]
        # note that the key is an object (a list in this case)
        # that mrjob will serialize as JSON text
        yield (k, count)

    def combiner(self, key, counts):
        # the combiner must be separate from the reducer because the input
        # and output must both be JSON
        yield (key, sum(counts))

    def reducer(self, key, counts):
        # the final output is encoded as text
        yield "%s\t%s\t%s" % tuple(key), str(sum(counts))

if __name__ == '__main__':
    # sets up a runner, based on command line options
    NgramNeighbors.run()
  
  mrjob is only required to be installed on the client node where the job is submitted. Here are the commands to run it:

export HADOOP_HOME="/usr/lib/hadoop-0.20-mapreduce"
./ngrams.py -r hadoop --hadoop-bin /usr/bin/hadoop --jobconf mapred.reduce.tasks=10 -o hdfs:///output-mrjob hdfs:///ngrams
  
  Writing MapReduce jobs is incredibly intuitive and simple. However, there is a significant cost incurred by the internal serialization scheme. A binary scheme would most likely need to be implemented by the user (e.g., to support typedbytes). There are also some built-in utilities for log file parsing. Finally, mrjob allows the user to write multi-step MapReduce workflows, where intermediate output from one MapReduce job is automatically used as input into another MapReduce job.
  (Note: The rest of the implementations are all highly similar, aside from package-specific implementation details. They can all be found here.)

dumbo
  dumbo is another Python framework that wraps Hadoop Streaming. It seems to enjoy relatively broad usage, but is not developed as actively as mrjob at this point. It is one of the earlier Python Hadoop APIs, and is very mature. However, its documentation is lacking, which makes it a bit harder to use.
  It performs serialization with typedbytes, which allows for more compact data transfer with Hadoop, and can natively read SequenceFiles or any other file type by specifying a Java InputFormat. In fact, dumbo enables the user to execute code from any Python egg or Java JAR file.
  In my experience, I had to manually install dumbo on each node of my cluster for it to work. It only worked if typedbytes and dumbo were built as Python eggs. Finally, it failed to run with a combiner, as it would terminate on MemoryErrors.
  The command to run the job with dumbo is

dumbo start ngrams.py \
-hadoop /usr \
-hadooplib /usr/lib/hadoop-0.20-mapreduce/contrib/streaming \
-numreducetasks 10 \
-input hdfs:///ngrams \
-output hdfs:///output-dumbo \
-outputformat text \
-inputformat text
  

hadoopy
  hadoopy is another Streaming wrapper that is compatible with dumbo. Similarly, it focuses on typedbytes serialization of data, and directly writes typedbytes to HDFS.
  It has a nice debugging feature, in which it can directly write messages to stdout/stderr without disrupting the Streaming process. It feels similar to dumbo, but the documentation is better. The documentation also mentions experimental Apache HBase integration.
  With hadoopy, there are two ways to launch jobs:


  • launch requires Python/hadoopy to be installed on each node in the cluster, but has very little overhead after that.
  • launch_frozen does not even require that Python is installed on the nodes, but it incurs a ~15 second penalty for PyInstaller to work. (It’s claimed that this can be somewhat mitigated by optimizations and caching tricks.)
  Jobs in hadoopy must be launched from within a Python program. There is no built-in command line utility.
  I launch hadoopy via the launch_frozen scheme using my own Python script:

python launch_hadoopy.py
  
  After running it with launch_frozen, I installed hadoopy on all nodes and used the launch method instead. The performance was not significantly different.

pydoop
  In contrast to the other frameworks, pydoop wraps Hadoop Pipes, which is a C++ API into Hadoop. The project claims that this enables a richer interface with Hadoop and HDFS, as well as better performance, but that is not clear to me. However, one advantage is the ability to implement a Python Partitioner, RecordReader, and RecordWriter. All input/output must be strings.
  Most importantly, I could not successfully build pydoop via pip or directly from source.

Others


  • happy is a framework for writing Hadoop jobs through Jython, but seems to be dead.
  • Disco is a full-blown non-Hadoop reimplementation of MapReduce. Its core is written in Erlang, with the primary API in Python. It is developed at Nokia, but is much less used than Hadoop.
  • octopy is a reimplementation of MapReduce purely in Python in a single source file. It is not intended for “serious” computation.
  • Mortar is another option for working with Python that was just recently launched. Through a web app, the user can submit Apache Pig or Python jobs to manipulate data sitting in Amazon S3.
  • There are several higher-level interfaces into the Hadoop ecosystem, such as Apache Hive and Pig. Pig provides the facility to write user-defined-functions with Python, but it appears to run them through Jython. Hive also has a Python wrapper called hipy.
  • (Added Jan. 7 2013) Luigi is a Python framework for managing multistep batch job pipelines/workflows. It is probably a bit similar to Apache Oozie but it has some built-in functionality for wrapping Hadoop Streaming jobs (though it appears to be a light wrapper). Luigi has a nice feature of extracting out a Python traceback if your Python code crashes a job, and also has nice command-line features. It has a great introductory README file but seems to lack comprehensive reference documentation. Luigi is actively developed and used at Spotify for running many jobs there.

Native Java
  Finally, I implemented the MR job using the new Hadoop Java API. After building it, I ran it like so:

hadoop jar /root/ngrams/native/target/NgramsComparison-0.0.1-SNAPSHOT.jar NgramsDriver hdfs:///ngrams hdfs:///output-native
  

A Note About Counters
  In my initial implementations of these MR jobs, I used counters to keep track of the number of bad records. In Streaming, this requires writing messages to stderr. It turns out this incurs a significant overhead: the Streaming job took 3.4x longer than the native Java job. The frameworks were similarly penalized.
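  For reference, Streaming picks up counter updates from specially formatted lines written to stderr; a minimal helper (the group and counter names here are illustrative):

```python
import sys

def increment_counter(group, counter, amount=1):
    # Hadoop Streaming scans stderr for lines of the form
    # "reporter:counter:<group>,<counter>,<amount>" and updates job counters;
    # every such write is a round trip to stderr, hence the overhead noted above
    sys.stderr.write("reporter:counter:%s,%s,%d\n" % (group, counter, amount))

# e.g., called from the mapper whenever a malformed record is skipped:
increment_counter("ngram_job", "bad_records")
```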

Performance Comparison
  The MapReduce job was also implemented in Java as a baseline for performance. All values for the Python frameworks are ratios relative to the corresponding Java performance.
[Figure: performance comparison — Python framework runtimes as ratios relative to native Java]

  Java is obviously the fastest, with Streaming taking 50% longer, and the Python frameworks taking substantially longer still. From a profile of the mrjob mapper, it appears a substantial amount of time is spent in serialization/deserialization. The binary formats in dumbo and hadoopy may ameliorate the problem. The dumbo implementation may have been faster if the combiner was allowed to run.

Feature Comparison
  Mostly gleaned from the respective packages’ documentation or code repositories.
[Figure: feature comparison of the Python frameworks]


Conclusions
  Streaming appears to be the fastest Python solution, without any magic under the hood. However, it requires care when implementing the reducer, and also when working with more complex objects.
  All the Python frameworks look like pseudocode, which is a huge plus.
  mrjob seems highly active, easy-to-use, and mature. It makes multistep MapReduce flows easy, and can easily work with complex objects. It also works seamlessly with EMR. But it appears to perform the slowest.
  The other Python frameworks appear to be somewhat less popular. Their main advantage appears to be built-in support for binary formats, but this is probably something that can be implemented by the user, if it matters.
  So for the time being:


  • Prefer Hadoop Streaming if possible. It’s easy enough, as long as care is taken with the reducer.
  • Prefer mrjob to rapidly get on Amazon EMR, at the cost of significant computational overhead.
  • Prefer dumbo for more complex jobs that may include complex keys and multistep MapReduce workflows; it’s slower than Streaming but faster than mrjob.
  If you have your own observations based on practice, or for that matter any errors to point out, please do so in comments.

Source thread: https://www.yunweiku.com/thread-60034-1-1.html