设为首页 收藏本站
查看: 917|回复: 0

[经验分享] hadoop读写mysql数据库

[复制链接]

尚未签到

发表于 2018-10-29 13:51:35 | 显示全部楼层 |阅读模式
  hadoop技术推出一度曾遭到关系数据库研究者的挑衅和批评,认为MapReduce不具有关系数据库中的结构化数据存储和处理能力。为此,hadoop社区和研究人员做了多的努力,在hadoop0.19版支持
  MapReduce访问关系数据库,如:MySQL、Mongodb、PostgreSQL、Oracle 等几个数据库系统。Hadoop 访问关系数据库主要通过DBInputFormat类实现的,包的位置在 org.apache.hadoop.mapred.lib.db。
  本课程我们以 Mysql为例来学习 MapReduce读写数据。
  [读数据]
  DBInputFormat 在 Hadoop 应用程序中通过数据库供应商提供的 JDBC接口来与数据库进行交互,并且可以使用标准的 SQL 来读取数据库中的记录。学习DBInputFormat首先必须知道二个条件。
  第一、在使用 DBInputFormat 之前,必须将要使用的 JDBC 驱动拷贝到分布式系统各个节点的$HADOOP_HOME/lib/目录下。
  第二、MapReduce访问关系数据库时,大量频繁的从MapReduce程序中查询和读取数据,这大大的增加了数据库的访问负载,因此,DBInputFormat接口仅仅适合读取小数据量的数据,而不适合处理数据仓库。
  提示:处理数据仓库的方法有:利用数据库的 Dump 工具将大量待分析的数据输出为文本,并上传到 HDFS 中进行理。
  下面我们来看看 DBInputFormat类的内部结构,DBInputFormat 类中包含以下三个内置类。

  1、protected>
  2、public static>  public void write(PreparedStatement statement) throwsSQLException;
  public void readFields(ResultSet result);

  3、protected static>  下面对怎样使用 DBInputFormat 读取数据库记录进行详细的介绍,具体步骤如下:
  步骤一、配置 JDBC 驱动、数据源和数据库访问的用户名和密码。代码如下。
  DBConfiguration.configureDB (Job job, StringdriverClass, String dbUrl, String userName, String passwd)
  MySQL 数据库的 JDBC 的驱动为“com.mysql.jdbc.Driver”,数据源为“jdbc:mysql://localhost/testDB”,其中testDB为访问的数据库。useName一般为“root”,passwd是你数据库的密码。
  步骤二、使用 setInput 方法操作 MySQL 中的表,setInput 方法的参数如下。

  DBInputFormat.setInput(Job job,>  这个方法的参数很容易看懂,inputClass实现DBWritable接口。string tableName表名, conditions表示查询的条件,orderby表示排序的条件,fieldNames是字段,这相当与把sql语句拆分的结果。当然也可以用sql语句进行重载,代码如下。

  setInput(Job job,>  步骤三、编写MapReduce函数,包括Mapper 类、Reducer 类、输入输出文件格式等,然后调用job.waitForCompletion(true)。
  我们通过示例程序来看看 MapReduce 是如何读数据的,假设 MySQL 数据库中有数据库 user,假设数据库中的字段有“uid”,“email”,“name"。
  第一步要实现DBwrite和Writable数据接口。代码如下:
  package com.dajiangtai.hadoop.advance;
  import java.io.DataInput;
  import java.io.DataOutput;
  import java.io.IOException;
  import java.sql.PreparedStatement;
  import java.sql.ResultSet;
  import java.sql.SQLException;
  import org.apache.hadoop.io.Writable;
  import org.apache.hadoop.mapred.lib.db.DBWritable;

  public>  int uid;
  String email;
  String name;
  /*
  *从数据库读取所需要的字段
  *
  */
  @Override
  public void readFields(ResultSet resultSet) throws SQLException {
  // TODO Auto-generated method stub
  this.uid = resultSet.getInt(1);
  this.email = resultSet.getString(2);
  this.name = resultSet.getString(3);
  }
  /*
  *向数据库写入数据
  *
  */
  @Override
  public void write(PreparedStatement statement) throws SQLException {
  // TODO Auto-generated method stub
  statement.setInt(1, this.uid);
  statement.setString(2, this.email);
  statement.setString(3, this.name);
  }
  /*
  *读取序列化数据
  *
  */
  @Override
  public void readFields(DataInput in) throws IOException {
  // TODO Auto-generated method stub
  this.uid = in.readInt();
  this.email = in.readUTF();
  this.name = in.readUTF();
  }
  /*
  *将数据序列化
  *
  */
  @Override
  public void write(DataOutput out) throws IOException {
  // TODO Auto-generated method stub
  out.writeInt(uid);
  out.writeUTF(email);
  out.writeUTF(name);
  }
  public String toString() {
  return new String(this.uid + " " + this.email + " " +this.name);
  }
  }
  第二步,实现Map和Reduce类

  public static>  public void map(LongWritable key,UserRecord values,Context context)
  throws IOException,InterruptedException {
  //从 mysql 数据库读取需要的数据字段
  context.write(new Text(values.uid+""), new Text(values.name +" "+values.email));
  }
  }

  public static>  public void reduce(Text key,Iterable< Text> values,Context context)
  throws IOException,InterruptedException {
  //将数据输出到HDFS中
  for(Iterator< Text> itr = values.iterator();itr.hasNext();) {
  context.write(key, itr.next());
  }
  }
  }
  第三步:主函数的实现
  /**
  * @function MapReduce 连接mysql数据库 读取数据
  * @author 小讲
  *
  */

  public>  public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  //输出路径
  Path output = new Path("hdfs://single.hadoop.dajiangtai.com:9000/advance/mysql/out");
  FileSystem fs = FileSystem.get(URI.create(output.toString()), conf);
  if (fs.exists(output)) {
  fs.delete(output);
  }
  //mysql的jdbc驱动
  DistributedCache.addFileToClassPath(new Path("hdfs://single.hadoop.dajiangtai.com:9000/advance/jar/mysql-connector-java-5.1.14.jar"), conf);
  //设置mysql配置信息   4个参数分别为: Configuration对象、mysql数据库地址、用户名、密码
  DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://hadoop.dajiangtai.com:3306/djtdb_www", "username", "password");
  Job job = new Job(conf,"test mysql connection");//新建一个任务
  job.setJarByClass(ConnMysql.class);//主类
  job.setMapperClass(ConnMysqlMapper.class);//Mapper
  job.setReducerClass(ConnMysqlReducer.class);//Reducer
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setInputFormatClass(DBInputFormat.class);//从数据库中读取数据
  FileOutputFormat.setOutputPath(job, output);
  //列名
  String[] fields = { "uid", "email","name" };
  //六个参数分别为:
  //1.Job;2.Class< extends DBWritable> 3.表名;4.where条件 5.order by语句;6.列名
  DBInputFormat.setInput(job, UserRecord.class,"user", null, null, fields);
  System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
  }
  第四步:运行命令如下。
  [hadoop@single-hadoop-dajiangtai-com djt]$ hadoop jar ConnMysql.jar  com.dajiangtai.hadoop.advance.ConnMysql
  第五步:查看结果如下所示。
  [hadoop@single-hadoop-dajiangtai-com djt]$ hadoop fs -text /advance/mysql/out/
  54 无情 589150803@qq.com
  55 冷血 222281364@qq.com
  提示 MapReduce 操作 MySQL 数据库在实际工作中比较常用,例如把 MySQL 中的数据迁移到 HDFS 中, 当然还有个很好的方法把 MySQL 或 Oracle 中的数据迁移到 HDFS 中,这个工具是 Pig,如果有这
  方面的需求建议使用 Pig。
  [写数据]
  数据处理结果的数据量一般不会太大,可能适合hadoop直接写入数据库中。hadoop提供了数据库接口,把 MapReduce 的结果直接输出到 MySQL、Oracle 等数据库。 主要的类如下所示。
  1、DBOutFormat: 提供数据库写入接口。
  2、DBRecordWriter:提供向数据库中写入的数据记录的接口。
  3、DBConfiguration:提供数据库配置和创建链接的接口。
  下面我们通过示例来看看 MapReduce 如何向数据库写数据,假设 MySQL 数据库中有数据库 test,假设数据库中的字段有“uid”,“email”,“name"。
  第一步同上定义UserRecord实现DBwrite和Writable数据接口。代码不再赘叙。
  第二步,实现Map和Reduce类,代码如下所示。

  public static>  {
  public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException
  {
  //读取 hdfs 中的数据
  String email = value.toString().split("\\s")[0];
  String name = value.toString().split("\\s")[1];
  context.write(new Text(email),new Text(name));
  }
  }

  public static>  {
  public void reduce(Text key,Iterable< Text> values,Context context)throws IOException,InterruptedException
  {
  //接收到的key value对即为要输入数据库的字段,所以在reduce中:
  //wirte的第一个参数,类型是自定义类型UserRecord,利用key和value将其组合成UserRecord,然后等待写入数据库
  //wirte的第二个参数,wirte的第一个参数已经涵盖了要输出的类型,所以第二个类型没有用,设为null
  for(Iterator< Text> itr = values.iterator();itr.hasNext();)
  {
  context.write(new UserRecord(key.toString(),itr.next().toString()),null);
  }
  }
  }
  第三步:主函数的实现,代码如下所示。
  /**
  * 将mapreduce的结果数据写入mysql中
  * @author 小讲
  */

  public>
  public static void main(String args[]) throws IOException, InterruptedException,>  {
  Configuration conf = new Configuration();
  //配置 JDBC 驱动、数据源和数据库访问的用户名和密码
  DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver","jdbc:mysql://hadoop.dajiangtai.com:3306/djtdb_www","username", "password");
  Job job = new Job(conf,"test mysql connection");//新建一个任务
  job.setJarByClass(WriteDataToMysql.class);//主类
  job.setMapperClass(ConnMysqlMapper.class); //Mapper
  job.setReducerClass(ConnMysqlReducer.class); //Reducer
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(DBOutputFormat.class);//向数据库写数据
  //输入路径
  FileInputFormat.addInputPath(job, new Path("hdfs://single.hadoop.dajiangtai.com:9000/advance/mysql/data/data.txt"));
  //设置输出到数据库    表名:test  字段:uid、email、name
  DBOutputFormat.setOutput(job, "test", "uid","email","name");
  System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
  }
  第四步:运行命令如下。
  [hadoop@single-hadoop-dajiangtai-com djt]$ hadoop jar WriteDataToMysql.jar  com.dajiangtai.hadoop.advance.WriteDataToMysql
  第五步:查看 MySQL 数据库记录。
  1364150803@qq.com        yangjun
  2yangjun13137367327@163.com  yangjun
  39458933@qq.com          binquan


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-628086-1-1.html 上篇帖子: ubuntu14.04 安装配置hadoop2.6-11426248 下篇帖子: Hadoop学习之第二章节:Hadoop命令
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表