a b c d
a b c d
aa bb cc dd
ee ff gg hh
foo foo quux labs foo bar quux
第一个脚本,叫做mapper.py
#!/bin/env python
import sys
def mapword(w):
print "%s\t%d"%(w,1)
for line in sys.stdin:
line = line.strip()
words = line.split()
m = map(mapword,words)
第二个脚本,叫做sort.py
import sys
def mapword(w):
print "%s\t%d"%(w,1)
m = []
for line in sys.stdin:
line = line.strip()
word,count = line.split('\t')
m.append((word,count))
m = sorted(m)
for i in m:
print "%s\t%s"%i
注意,实际上在分布式环境下,这个是sort by而不是order by。
也就是说在每个分区下是有序的,而并不是全局有序。
如果要做到全局有序,必须在一个reduce里面进行。
第三脚本,是reducer.py
import sys
current_word = None
current_count = 0
word = None
for line in sys.stdin:
line = line.strip()
word, count = line.split('\t', 1)
try:
count = int(count)
except ValueError:
continue
if current_word == word:
current_count += count
else:
if current_word:
print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
if current_word == word:
print '%s\t%s' % (current_word, current_count)