撒的阿坎努斯 发表于 2018-10-8 09:46:39

MySQL to Hbase 数据的抽取

  下面用一个简单粗暴的方法,实现从 MySQL 数据库抽取数据到 HBase:
  rowKey 直接使用 MySQL 表的主键 ID,作为 HBase 的行键(row key)。
  code:
  ####
  #!/usr/bin/env python
  #coding=utf-8
  import sys
  reload(sys)
  sys.setdefaultencoding('utf-8')
  import MySQLdb
  import datetime,time
  sys.path.append('/usr/lib/python2.6/site-packages/hbase')
  from thrift import Thrift
  from thrift.transport import TSocket
  from thrift.transport import TTransport
  from thrift.protocol import TBinaryProtocol
  from hbase import Hbase
  from hbase.ttypes import *
  import csv
  from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation, TRegionInfo
  from hbase.ttypes import IOError, AlreadyExists
  def client_conn(host="172.16.10.87", port=9090):
      """Open a buffered Thrift transport to an HBase Thrift server.

      Args:
          host: Address of the HBase Thrift server. Defaults to the
              address used in the original listing.
          port: TCP port of the Thrift server. Defaults to 9090.

      Returns:
          An ``Hbase.Client`` bound to an already-opened transport.

      Note:
          Only the client is returned, so the caller has no direct handle
          on the transport object created here.
      """
      sock = TSocket.TSocket(host, port)
      # Wrap the raw socket in a buffered transport before building the protocol.
      transport = TTransport.TBufferedTransport(sock)
      protocol = TBinaryProtocol.TBinaryProtocol(transport)
      client = Hbase.Client(protocol)
      transport.open()
      return client
  if __name__ == "__main__":
      # Copy rows of the MySQL table `ca_record` into the HBase table
      # `ca_record`, using the first selected column as the HBase row key.
      #
      # NOTE(review): the published listing lost every `[...]` span (row and
      # column subscripts, the value list, and the head of the Mutation list).
      # The positions used below -- column 0 as the key, columns 1-6 as the six
      # named fields -- are a reconstruction; confirm against the real
      # `ca_record` schema before running.
      # NOTE(review): connection credentials are hard-coded here; consider
      # moving them to configuration.
      client = client_conn()
      conn = MySQLdb.connect(host="172.161.110.10", user="dlan", passwd="root123", port=5029, db='coolqi', charset='utf8')
      try:
          cur = conn.cursor()
          try:
              sql = "select * from ca_record where ca_time>=STR_TO_DATE('20170720','%Y%m%d')"
              print(sql)
              cur.execute(sql)
              data = cur.fetchall()
          finally:
              # Release the cursor even if the query fails.
              cur.close()
      finally:
          # The result set is fully fetched, so the MySQL connection can go.
          conn.close()

      # Names used as the part before ':' in each HBase column name.
      # presumably each one must exist as a column family in the HBase
      # table -- TODO confirm.
      field_names = ["user_id", "ca_result", "ca_time", "real_name", "id_card", "sex"]

      for row in data:
          rowKey = row[0]
          print(rowKey)
          # The fields that follow the key, in the same order as field_names.
          values = row[1:1 + len(field_names)]
          # Column name is "<field name>:<position>", as in the original call.
          mutations = [
              Mutation(column=field_names[j] + ':' + str(j), value=str(value))
              for j, value in enumerate(values)
          ]
          # One mutateRow call per row instead of one call per cell.
          client.mutateRow('ca_record', str(rowKey), mutations)
  ### 注:遍历查询结果 data 时,可以用 enumerate() 或直接迭代,代替 for k in xrange(len(data)) 这样的下标循环。

页: [1]
查看完整版本: MySQL to Hbase 数据的抽取