设为首页 收藏本站
查看: 287|回复: 0

[经验分享] mapreduce处理结果向输出至mysql(直接插入/更新/追加式更新)

[复制链接]

尚未签到

发表于 2016-10-17 09:30:00 | 显示全部楼层 |阅读模式
mapreduce处理结果向输出至mysql(直接插入/更新/追加式更新)
mapreducemysqlupdate 

 
Java代码   DSC0000.png


  • package cn.m15.ipj.job.usergroup;  
  •   
  • Mapreduce处理结果向输出至mysql  
  •   
  • 1.写入mysql  
  •     <1>job中输出的配置:  
  •         DBOutputFormat.setOutput(this, MySQLConstant.MYSQL_FIX_USER,  
  •         MySQLConstant.MYSQL_FIX_OPEN_FIRST_FIELDS);  
  •         (DBOutputFormat为hadoop自带API,将输入插入数据库)  
  •         public final static String  
  •         MYSQL_FIX_USER =  
  •         "ipj_fix_user";  
  •         public final static String[]  
  •         MYSQL_FIX_OPEN_FIRST_FIELDS = {"app_id","version","imei","first_open","date"};  
  •            
  •     <2>reduce中写入:  
  •         private FixOpenAppFirstRecord record = new FixOpenAppFirstRecord();  
  •            
  •         record.setApp_id(Integer.parseInt(app_id));  
  •         record.setImei(imei);  
  •         record.setVersion(version);  
  •         record.setFirst_open(exactDate);  
  •         record.setDate(date);  
  •         context.write(record, NULL);  
  •            
  •     <3>FixOpenAppFirstRecord中字段的顺序配置(只列出一条):  
  •         @Override  
  •         public void write(PreparedStatement statement) throws SQLException {  
  •             statement.setInt(1, app_id);  
  •             statement.setString(2, version);  
  •             statement.setString(3, imei);  
  •             statement.setString(4, first_open);  
  •             statement.setString(5, date);  
  •         }  
  •     <4>注意:  
  •         1.reduce中record的set值的顺序无所谓,可以任意  
  •         2.job的mysql字段MYSQL_FIX_OPEN_FIRST_FIELDS的顺序一定要和类FixOpenAppFirstRecord中字段的配置顺序一致  
  • 2.更新mysql(改变值)  
  •     <1>job中输出的配置:  
  •         FixDBOutputFormat.setOutput(this, MySQLConstant.MYSQL_FIX_USER,  
  •         MySQLConstant.MYSQL_FIX_IS_TAOBAO_FIELDS);  
  •         (FixDBOutputFormat为自定义Format类,用于更新mysql)  
  •         public final static String  
  •         MYSQL_FIX_USER =  
  •         "ipj_fix_user";  
  •         public final static String[]  
  •         MYSQL_FIX_IS_WEIBO_FIELDS = {"is_weibo","app_id","version","imei"};  
  •     <2>reduce中写入:  
  •         private FixIsMallUserRecord record = new FixIsMallUserRecord();  
  •         record.setApp_id(Integer.parseInt(app_id));  
  •         record.setVersion(version);  
  •         record.setImei(imei);  
  •         record.setIs_taobao(is_taobao);  
  •         context.write(record, NULL);  
  •     <3>FixIsMallUserRecord 中字段的顺序配置(只列出一条):  
  •         @Override  
  •         public void readFields(ResultSet resultSet) throws SQLException {  
  •             this.is_taobao = resultSet.getInt(1);  
  •             this.app_id = resultSet.getInt(2);  
  •             this.version = resultSet.getString(3);  
  •             this.imei = resultSet.getString(4);  
  •         }  
  •     <4>FixDBOutputFormat中关键的拼接sql代码:  
  •         public String constructQuery(String table, String[] fieldNames) {  
  •         if (fieldNames == null) {  
  •             throw new IllegalArgumentException(  
  •             "Field names may not be null");  
  •         }  
  •            
  •         StringBuilder query = new StringBuilder();  
  •         query.append("UPDATE ").append(table);  
  •         if (fieldNames.length > 0 && fieldNames[0] != null  
  •         && fieldNames[1] != null&& fieldNames[2] != null  
  •         && fieldNames[3] != null) {  
  •             query.append(" SET ");  
  •             query.append(fieldNames[0] + " =?");  
  •             query.append(" WHERE ");  
  •             query.append(fieldNames[1] + " =?");  
  •             query.append(" AND ");  
  •             query.append(fieldNames[2] + " =?");  
  •             query.append(" AND ");  
  •             query.append(fieldNames[3] + " =?");  
  •             return query.toString();  
  •             }  
  •             return null;  
  •         }  
  •     <5>注意:  
  •         1.reduce中record的set值的顺序无所谓,可以任意  
  •         2.job的mysql字段MYSQL_FIX_IS_WEIBO_FIELDS的顺序一定要和类  
  •         3.2中的顺序也一定要和FixDBOutputFormat类中的更新顺序一致(第一个参数为要更新的值,第二三四个参数分别为条件参数)  
  • 3.更新mysql(值的追加)  
  •     <1>job中输出的配置:  
  •         FixAppendDBOutputFormat.setOutput(this, MySQLConstant.MYSQL_FIX_USER,  
  •         MySQLConstant.MYSQL_FIX_MALL_LOGIN_FIELDS);  
  •         (FixAppendDBOutputFormat自定义Format,用户更新mysql[追加])  
  •         public final static String  
  •         MYSQL_FIX_USER =  
  •         "ipj_fix_user";  
  •         public final static String[]  
  •         MYSQL_FIX_MALL_LOGIN_FIELDS = {"login_taobao_count","app_id","version","imei"};  
  •     <2>reduce中写入:  
  •         private FixMallTotalLoginRecord record = new FixMallTotalLoginRecord();  
  •         record.setApp_id(Integer.parseInt(app_id));  
  •         record.setVersion(version);  
  •         record.setImei(imei);  
  •         record.setLogin_taobao_count(num);  
  •         context.write(record, NULL);  
  •     <3>FixIsMallUserRecord 中字段的顺序配置(只列出一条):  
  •         @Override  
  •         public void write(PreparedStatement statement) throws SQLException {  
  •             statement.setInt(1, login_taobao_count);  
  •             statement.setInt(2, app_id);  
  •             statement.setString(3, version);  
  •             statement.setString(4, imei);  
  •         }  
  •     <4>FixAppendDBOutputFormat中关键的拼接sql代码  
  •         public String constructQuery(String table, String[] fieldNames) {  
  •             if (fieldNames == null) {  
  •                 throw new IllegalArgumentException  
  •                 ("Field names may not be null");  
  •             }  
  •             StringBuilder query = new StringBuilder();  
  •             query.append("UPDATE ").append(table);  
  •             if ( fieldNames.length > 0 &&  
  •             fieldNames[0] != null &&  
  •             fieldNames[1] != null &&  
  •             fieldNames[2] != null &&  
  •             fieldNames[3] != null) {  
  •                 query.append(" SET ");  
  •                 query.append(fieldNames[0] +  
  •                 " = "+fieldNames[0]+"+?");  
  •                 query.append(" WHERE ");  
  •                 query.append(fieldNames[1] + " =?");  
  •                 query.append(" AND ");  
  •                 query.append(fieldNames[2] + " =?");  
  •                 query.append(" AND ");  
  •                 query.append(fieldNames[3] + " =?");  
  •                 return query.toString();  
  •                 }  
  •             return null;  
  •         }  
  •     <5>注意:  
  •         1.reduce中record的set值的顺序无所谓,可以任意  
  •         2.job的mysql字段MYSQL_FIX_MALL_LOGIN_FIELDS的顺序一定要和类  
  •         3.2中的顺序也一定要和FixAppendDBOutputFormat类中的更新顺序一致(第一个参数为要更新的值[在原有基础上增加],第二三四个参数分别为条件参数)  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-287274-1-1.html 上篇帖子: 对付 MYSQL 的死连接,SLEEP的进程的来源探究[转] 下篇帖子: 终于解决了mysql的中文编码问题。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表