Inserting data from Hive into MySQL, SQL Server, and HBase

The GenericUDF below writes query rows out to SQL Server over JDBC from inside a Hive query; as noted after the listing, the same approach can be adapted to MySQL (different driver and URL) or to HBase (client API instead of JDBC).
package com.hive.udf.test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.IntWritable;
public class GenericSQLServerDBOutput extends GenericUDF {
  private static final Log LOG = LogFactory.getLog(
      GenericSQLServerDBOutput.class.getName());

  private transient ObjectInspector[] argumentOI; // inspectors for all arguments
  private transient Connection connection = null;
  private String url;    // JDBC URL
  private String user;   // user name
  private String pass;   // password
  private String dbName; // database name
  private final IntWritable result = new IntWritable(-1); // status code returned to Hive
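
  /*
   * Call convention (as implemented below):
   *   dboutput(url, user, password, dbName, 'INSERT ... VALUES (?, ?)', col1, col2, ...)
   * Arguments 0-3 configure the connection, argument 4 is the SQL statement
   * with ? placeholders, and any further arguments are bound to those
   * placeholders in order. Status codes: 0 = success, 1 = SQL execution
   * error, 2 = connection failure, -1 = statement never attempted.
   */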
  // initialize: validate the argument types and fix the return type
  @Override
  public ObjectInspector initialize(ObjectInspector[] arguments)
      throws UDFArgumentTypeException {
    argumentOI = arguments;
    // the first four arguments (url, user, password, dbName) must be strings
    for (int i = 0; i < 4; i++) {
      if (arguments[i].getCategory() == ObjectInspector.Category.PRIMITIVE) {
        PrimitiveObjectInspector poi = (PrimitiveObjectInspector) arguments[i];
        if (!(poi.getPrimitiveCategory() ==
            PrimitiveObjectInspector.PrimitiveCategory.STRING)) {
          throw new UDFArgumentTypeException(i,
              "The argument of function should be \""
              + serdeConstants.STRING_TYPE_NAME
              + "\", but \"" + arguments[i].getTypeName()
              + "\" is found");
        }
      }
    }
    // the remaining arguments (SQL statement and bind values) must be primitive
    for (int i = 4; i < arguments.length; i++) {
      if (arguments[i].getCategory() != ObjectInspector.Category.PRIMITIVE) {
        throw new UDFArgumentTypeException(i,
            "The argument of function should be primitive"
            + ", but \"" + arguments[i].getTypeName()
            + "\" is found");
      }
    }
    return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
  }

  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    // arguments 0-3: connection settings
    url = ((StringObjectInspector) argumentOI[0])
        .getPrimitiveJavaObject(arguments[0].get());
    user = ((StringObjectInspector) argumentOI[1])
        .getPrimitiveJavaObject(arguments[1].get());
    pass = ((StringObjectInspector) argumentOI[2])
        .getPrimitiveJavaObject(arguments[2].get());
    dbName = ((StringObjectInspector) argumentOI[3])
        .getPrimitiveJavaObject(arguments[3].get());
    try {
      Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
      connection = DriverManager.getConnection(url + ";DatabaseName=" + dbName,
          user, pass);
    } catch (SQLException ex) {
      LOG.error("Driver loading or connection issue", ex);
      result.set(2);
    } catch (ClassNotFoundException e) {
      LOG.error("SQL Server JDBC driver not found on the classpath", e);
    }
    if (connection != null) {
      try {
        // argument 4 is the SQL statement; arguments 5+ fill its ? placeholders
        PreparedStatement ps =
            connection.prepareStatement(((StringObjectInspector) argumentOI[4])
                .getPrimitiveJavaObject(arguments[4].get()));
        for (int i = 5; i < arguments.length; ++i) {
          PrimitiveObjectInspector poi =
              (PrimitiveObjectInspector) argumentOI[i];
          ps.setObject(i - 4, poi.getPrimitiveJavaObject(arguments[i].get()));
        }
        ps.execute();
        ps.close();
        result.set(0);
      } catch (SQLException e) {
        LOG.error("Underlying SQL exception", e);
        result.set(1);
      } finally {
        try {
          connection.close();
        } catch (Exception ex) {
          LOG.error("Underlying SQL exception during close", ex);
        }
      }
    }
    return result;
  }

  // shown inside Hadoop tasks (and in EXPLAIN output) when the function is
  // used, to aid debugging
  @Override
  public String getDisplayString(String[] children) {
    StringBuilder sb = new StringBuilder();
    sb.append("dboutput(");
    if (children.length > 0) {
      sb.append(children[0]);
      for (int i = 1; i < children.length; i++) {
        sb.append(",");
        sb.append(children[i]);
      }
    }
    sb.append(")");
    return sb.toString();
  }
}
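
To call the UDF from Hive, register the jar and wrap it in a temporary function. A minimal usage sketch follows; the jar path, host, credentials, and table names are placeholders, not from the original post:

ADD JAR /path/to/hive-udf-test.jar;
CREATE TEMPORARY FUNCTION dboutput AS 'com.hive.udf.test.GenericSQLServerDBOutput';

SELECT dboutput('jdbc:sqlserver://dbhost:1433', 'user', 'pass', 'testdb',
                'INSERT INTO t_out (id, name) VALUES (?, ?)',
                id, name)
FROM source_table;

Note that evaluate opens and closes a fresh connection for every row, so this pattern suits modest result sets. Pointing the same class at MySQL would mean loading com.mysql.jdbc.Driver and building a jdbc:mysql://host:3306/dbName URL instead of appending ;DatabaseName=; an HBase variant would replace the JDBC calls with the HBase client API.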