设为首页 收藏本站
查看: 1061|回复: 0

[经验分享] mapreduce读取mysql

[复制链接]

尚未签到

发表于 2016-9-11 07:20:51 | 显示全部楼层 |阅读模式
package com.sun.mysql;
import java.io.DataInput;  
import java.io.DataOutput;  
import java.io.IOException;  
import java.sql.PreparedStatement;  
import java.sql.ResultSet;  
import java.sql.SQLException;  
import java.util.Iterator;  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.io.Writable;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;  
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;  
import org.apache.hadoop.mapreduce.lib.db.DBWritable;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
/**
* 从mysql中读数据(结果存放在HDFS中)然后经mapreduce处理
* @author asheng
*/
public class ReadDataFromMysql {  
/**
* 重写DBWritable
* @author asheng
* TblsRecord需要从mysql读取数据
*/
public static class TblsRecord implements Writable, DBWritable
{  
String tbl_name;  
String tbl_type;  
public TblsRecord()
{  

}  
@Override  
public void write(PreparedStatement statement) throws SQLException
{
statement.setString(1, this.tbl_name);  
statement.setString(2, this.tbl_type);  
}  
@Override  
public void readFields(ResultSet resultSet) throws SQLException
{  
this.tbl_name = resultSet.getString(1);  
this.tbl_type = resultSet.getString(2);  
}  
@Override  
public void write(DataOutput out) throws IOException
{  
Text.writeString(out, this.tbl_name);  
Text.writeString(out, this.tbl_type);  
}  
@Override  
public void readFields(DataInput in) throws IOException
{  
this.tbl_name = Text.readString(in);  
this.tbl_type = Text.readString(in);  
}  
public String toString()
{  
return new String(this.tbl_name + " " + this.tbl_type);  
}  
}  
/**
* Mapper
* @author asheng
* 下面的类中的Mapper一定是包org.apache.hadoop.mapreduce.Mapper;下的
*/
public static class ConnMysqlMapper extends Mapper<LongWritable,TblsRecord,Text,Text>
//TblsRecord是自定义的类型,也就是上面重写的DBWritable类
{  
public void map(LongWritable key,TblsRecord values,Context context)throws IOException,
InterruptedException
{  
//只是将从数据库读取进来数据转换成Text类型然后输出给reduce
context.write(new Text(values.tbl_name), new Text(values.tbl_type));  
}  
}  
/**
* Reducer
* @author asheng
* 下面的类中的Reducer一定是包org.apache.hadoop.mapreduce.Reducer;下的
*/
public static class ConnMysqlReducer extends Reducer<Text,Text,Text,Text> {  
public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,
InterruptedException
{  
//循环遍历并写入相应的指定文件中
for(Iterator<Text> itr = values.iterator();itr.hasNext();)
{  
context.write(key, itr.next());  
}  
}  
}  
public static void main(String[] args) throws Exception
{  
Configuration conf = new Configuration();
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver","jdbc:mysql://127.0.0.1:3306/mapreduce_test",                                                                                                                                                "root", "root");   
Job job = new Job(conf,"test mysql connection");  
job.setJarByClass(ReadDataFromMysql.class);  
job.setMapperClass(ConnMysqlMapper.class);  
job.setReducerClass(ConnMysqlReducer.class);  
job.setOutputKeyClass(Text.class);  
job.setOutputValueClass(Text.class);  
job.setInputFormatClass(DBInputFormat.class);  
FileOutputFormat.setOutputPath(job, new Path("hdfs://127.0.0.1:9000/user/lxw/output/"));  
//对应数据库中的列名  
String[] fields = { "TBL_NAME", "TBL_TYPE" };   
//setInput方法六个参数分别的含义:  
//1.Job;2.Class<? extends DBWritable>按照什么类型读取的  
//3.表名;4.where条件  
//5.order by语句;6.列名所组成的数组
DBInputFormat.setInput(job, TblsRecord.class,"lxw_tabls", "TBL_NAME like 'lxy%'", "TBL_NAME", fields);   
System.exit(job.waitForCompletion(true) ? 0 : 1);
//本程序表示从mysql数据库mapreduce_test的表lxw_tabls中查询处列TAB_NAME为lxy开头的数据并放入hdfs中
//执行完后的查看bin/hadoop fs -cat /user/lxw/output/part-r-00000
/*结果
lxyae
lxyaccg
lxybf
*/
}  
}
/*
mysql> select * from lxw_tabls;
+----------+----------+
| TBL_NAME | TBL_TYPE |
+----------+----------+
| zhao     | a        |
| qian     | b        |
| sun      | c        |
| li       | d        |
| lxya     | e        |
| lxyb     | f        |
| lxyacc   | g        |
+----------+----------+
7 rows in set (0.00 sec)
*/
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-270493-1-1.html 上篇帖子: MySQL update嵌套 下篇帖子: mysql修改日期
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表