最近在准备抽取数据的工作。有一个id集合200多M,要从另一个500GB的数据集合中抽取出所有id集合中包含的数据集。id数据集合中每一个行就是一个id的字符串(Reduce side join要在每行的行尾加“,”号,而Map side join不必,如果加了也可以处理掉),类似,500GB的数据集合中每一行是某一id对应的全记录,用“,”号分隔。
为什么不在hive或者pig下面搞这个操作呢?主要是因为Hive配置了Kerberos认证之后,还有一个问题没有解决,包含metastore的主机无法从namenode主机获取票据,所以就暂时放一放吧。用MapReduce来搞吧。在Hive下比较方便,但在MapReduce中实现就比较麻烦。
1、概述
在传统数据库(如:MySql)中,JOIN操作常常是非常耗时的。而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧。下面分别介绍MapReduce中的几种常见join,比如有最常见的 map side join,reduce side join,semi join(这些在Hive中都有) 等。Map side join在处理多个小表关联大表时非常有用,而 reduce join 在处理多表关联时是比较麻烦的,会造成大量的网络IO,效率低下,但在有些时候也是非常有用的。
2. 常见的join方法介绍
2.1 map side join
Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map须 task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。
为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口号)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
package com.unionpayadvisors;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.hsqldb.lib.StringUtil;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class AccountTableJoin extends Configured implements Tool {
private static WriteLog log = WriteLog.getInstance();
private static String parseRaw(String str) {
if (StringUtil.isEmpty(str)) {
return str;
}
str = str.trim();
if (str.startsWith("\"")) {
str = str.substring(1);
}
if (str.endsWith("\"")) {
str = str.substring(0, str.length() - 1);
}
return str.trim();
}
public static class MapClass extends
Mapper {
// 用于缓存 user_account 中的数据
private Set accountSet = new HashSet();
private Text accKey = new Text();
private NullWritable nullValue = NullWritable.get();
private String[] kv;
private Jedis jedis = new JedisPool("192.168.2.101", 6379).getResource();
// 此方法会在map方法执行之前执行
// @Override
protected void setup(Context context) throws
IOException,InterruptedException {
BufferedReader in = null;
try {
// 从当前作业中获取要缓存的文件
Path[] paths = DistributedCache.getLocalCacheFiles(context
.getConfiguration());
String accountLine = null;
for (Path path : paths) {
if (path.toString().contains("account")) {
in = new BufferedReader(new FileReader(path.toString()));
while (null != (accountLine = in.readLine())) {
log.logger("AccountTableJoin",
"accountSet="+parseRaw(accountLine.split(",", -1)[0]));
accountSet.add(parseRaw(accountLine.split(",", -1)[0]));
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (in != null) {
in.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
kv = value.toString().split(",");
//map join: 在map阶段过滤掉不需要的数据
if(kv.length==4&&accountSet.contains(parseRaw(kv[0]))){
accKey.set(value);
context.write(accKey, nullValue);
}
}
}
public int run(String[] args) throws Exception {
log.logger("XXXXXXXXX", "begin in");
Job job = new Job(getConf(), "AccountTableJoin");
job.setJobName("AccountTableJoin");
job.setJarByClass(AccountTableJoin.class);
job.setMapperClass(MapClass.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(),
args).getRemainingArgs();
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new AccountTableJoin(),
args);
System.exit(res);
}
/*
* hadoop jar AccountTableJoin.jar AccountTableJoin
* /user/he/sample_account.del /user/he/SAMPLE_SUM_2012070809101112.del
* /user/he/ACCOUNT_JOIN_RESULT
*/
}
WriteLog代码:
package com.unionpayadvisors;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintWriter;
import java.util.Calendar;
public class WriteLog {
/**写日志
* 写logString字符串到./log目录下的文件中
* @param logString 日志字符串
* @author tower
*/
private static WriteLog instance = null;
private WriteLog(){};
public static WriteLog getInstance() {
if( instance == null ) {
instance = new WriteLog();
}
return instance;
}
public void logger(String fileNameHead,String logString) {
try {
String logFilePathName=null;
Calendar cd = Calendar.getInstance();//日志文件时间
int year=cd.get(Calendar.YEAR);
String month=addZero(cd.get(Calendar.MONTH)+1);
String day=addZero(cd.get(Calendar.DAY_OF_MONTH));
String hour=addZero(cd.get(Calendar.HOUR_OF_DAY));
String min=addZero(cd.get(Calendar.MINUTE));
String sec=addZero(cd.get(Calendar.SECOND));
File fileParentDir=new File("./log");//判断log目录是否存在
if (!fileParentDir.exists()) {
fileParentDir.mkdir();
}
if (fileNameHead==null||fileNameHead.equals("")) {
logFilePathName="./log/"+year+month+day+hour+".log";//日志文件名
}else {
logFilePathName="./log/"+fileNameHead+year+month+day+hour+".log";//日志文件名
}
PrintWriter printWriter=new PrintWriter(new FileOutputStream(logFilePathName, true));//紧接文件尾写入日志字符串
String time="["+year+month+day+"-"+hour+":"+min+":"+sec+"] ";
printWriter.println(time+logString);
printWriter.flush();
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.getMessage();
}
}
/**整数i小于10则前面补0
* @param i
* @return
* @author tower
*/
public static String addZero(int i) {
if (i
运维网声明
1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网 享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com