设为首页 收藏本站
查看: 707|回复: 0

[经验分享] python大数据分析代码案例

[复制链接]

尚未签到

发表于 2018-8-8 13:19:11 | 显示全部楼层 |阅读模式
  #查询用户余额代码案例
  import sys
  import MySQLdb
  import pandas as pd
  # Connection settings for the HBAODB database; sql_select() reads these keys.
  optmap = dict(
      dbuser='aduser',
      dbpass='123654',
      dbhost='192.168.10.14',
      dbport=3306,
      dbname='HBAODB',
  )
  def sql_select(reqsql):
      """Run one SQL statement against the database described by ``optmap``.

      Args:
          reqsql: Complete SQL text to execute.

      Returns:
          Every row from ``cursor.fetchall()``, or ``''`` when MySQLdb raises
          an error. The error is printed rather than propagated.
      """
      db_conn = None
      try:
          # connect(db=...) already selects the schema, so no separate
          # "use <db>" round trip is issued.
          db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'], host=optmap['dbhost'], port=optmap['dbport'], db=optmap['dbname'])
          db_cursor = db_conn.cursor()
          try:
              db_cursor.execute(reqsql)
              return db_cursor.fetchall()
          finally:
              db_cursor.close()
      except MySQLdb.Error as e:
          print("Mysql ERROR %d:%s" % (e.args[0], e.args[1]))
          return ''
      finally:
          # Always release the connection, on success and on failure alike;
          # the previous code named ``close`` without calling it.
          if db_conn is not None:
              db_conn.close()
  # Fetch one row for `userid` through sql_select() and return the first row.
  # NOTE(review): the SQL literal assigned to `reqsql` below is truncated and
  # unterminated in this listing, so the table and columns it queried cannot be
  # recovered here; restore the statement from the original source before use.
  # NOTE(review): `i` (userid modulo 10) presumably selected a sharded table in
  # the lost SQL text -- confirm.
  def getusercoin(userid):
  i = int(userid) % 10

  reqsql = "select>  #print reqsql
  ret = sql_select(reqsql) # calls sql_select() defined above
  #print ret
  return ret[0]
  def getall(userlist):
      """Build a (userid, coin) table for ``userlist`` and display it via Spark.

      For each user the row returned by getusercoin() is used: when its first
      field is not None, the row's id and its second field divided by 100.0
      are stored; otherwise the requested id is stored with a coin value of 0.

      Relies on a ``spark`` session object being available in the enclosing
      scope (it is not defined in this listing).
      """
      balances = pd.DataFrame(columns=('userid', 'coin'))
      for row_no, uid in enumerate(userlist):
          record = getusercoin(uid)  # calls getusercoin() defined above
          if record[0] is None:
              balances.loc[row_no] = (str(uid), 0)
          else:
              balances.loc[row_no] = (str(record[0]), record[1]/100.0)
      spark_df = spark.createDataFrame(balances)
      spark_df.show(50)
  #用户消费查询代码案例
  import sys
  import MySQLdb
  import pandas as pd
  import datetime
  import time
  # Connection settings for the JIESUANDB database; sql_select() reads these keys.
  optmap = dict(
      dbuser='aduser',
      dbpass='123654',
      dbhost='192.168.10.12',
      dbport=3306,
      dbname='JIESUANDB',
  )
  def sql_select(reqsql):
      """Execute ``reqsql`` on the database configured in ``optmap``.

      Args:
          reqsql: Complete SQL text to execute.

      Returns:
          The tuple of rows from ``cursor.fetchall()``; ``''`` if MySQLdb
          reports an error (the error is printed, not raised).
      """
      db_conn = None
      try:
          # The schema is chosen by connect(db=...); an extra "use <db>"
          # statement is unnecessary.
          db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'], host=optmap['dbhost'], port=optmap['dbport'], db=optmap['dbname'])
          db_cursor = db_conn.cursor()
          try:
              db_cursor.execute(reqsql)
              return db_cursor.fetchall()
          finally:
              db_cursor.close()
      except MySQLdb.Error as e:
          print("Mysql ERROR %d:%s" % (e.args[0], e.args[1]))
          return ''
      finally:
          # close() must actually be called (with parentheses) and must also
          # run when the query fails, otherwise connections pile up.
          if db_conn is not None:
              db_conn.close()
  #用户人民币消费
  def getuserconsume(userid, startday): # function that takes parameters
      """Return one user's summed consumption for ``startday``, divided by 100.

      Sums DUBIOPNUM in the per-day table ``DUBIJIESUANTONGJI_<yymmdd>`` for
      rows with DUBIOPTYPE=2 and OPTYPE in (1, 4, 17, 25).

      Args:
          userid: User id; converted with ``int()`` before being formatted
              into the SQL text.
          startday: Object with ``strftime`` selecting the per-day table.

      Returns:
          The sum as a float divided by 100.0, or 0 when there is no matching
          data or the query failed.
      """
      strdate = startday.strftime("%y%m%d")
      # gifts + guardian + song requests + stickers (per the original comment)
      # The filter column is DUBIOPTYPE, matching the other queries on this
      # table; the earlier spelling "DBIOPTYPE" does not appear anywhere else.
      reqsql = "select CONSUMERID,SUM(DUBIOPNUM) from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%d AND DUBIOPTYPE=2 AND (OPTYPE=1 OR OPTYPE=4 OR OPTYPE=17 OR OPTYPE=25)" % (strdate, int(userid))
      print(reqsql)
      ret = sql_select(reqsql)  # calls sql_select() defined above
      # sql_select() returns '' on a MySQL error, so check for emptiness
      # before indexing into the first row.
      if ret and ret[0][0] is not None:
          return float(ret[0][1])/100.0
      return 0
  #用户充值
  def getusercharge(userid, startday):
      """Return one user's summed recharge for ``startday``, divided by 100.

      Sums DUBIOPNUM in the per-day table ``DUBIJIESUANTONGJI_<yymmdd>`` for
      rows with DUBIOPTYPE=1 and OPTYPE in (1016, 1020, 1021).

      Args:
          userid: User id; converted with ``int()`` before being formatted
              into the SQL text.
          startday: Object with ``strftime`` selecting the per-day table.

      Returns:
          The sum as a float divided by 100.0, or 0 when there is no matching
          data or the query failed.
      """
      strdate = startday.strftime("%y%m%d")
      reqsql = "select CONSUMERID,SUM(DUBIOPNUM) from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%d AND DUBIOPTYPE=1 AND (OPTYPE=1016 OR OPTYPE=1020 OR OPTYPE=1021)" % (strdate, int(userid))
      print(reqsql)
      ret = sql_select(reqsql)  # calls sql_select() defined above
      print(ret)
      # sql_select() returns '' on a MySQL error; guard before indexing, the
      # same way getusercurcoin() does.
      if ret and ret[0][0] is not None:
          return float(ret[0][1])/100.0
      return 0
  #用户当天结余人民币
  def getusercurcoin(userid, startday):
      """Return the user's latest CURRENTNUM for ``startday``, divided by 100.

      Reads the row with the greatest OPTIME for the user from the per-day
      table ``DUBIJIESUANTONGJI_<yymmdd>``. Returns 0 when the query yields
      nothing.
      """
      day_tag = startday.strftime("%y%m%d")
      reqsql = "select CONSUMERID,CURRENTNUM from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%u ORDER BY OPTIME DESC LIMIT 1" % (day_tag, int(userid))
      print(reqsql)
      rows = sql_select(reqsql)
      print(rows)
      if not rows:
          return 0
      return float(rows[0][1])/100.0
  def getconsume():
      """Print a per-day charge / consume / remaining table for one user.

      Covers every day from 2017-01-01 through 2017-02-02 inclusive for the
      hard-coded user id, querying getuserconsume(), getusercharge() and
      getusercurcoin() once per day, then prints the last 100 rows.
      """
      first_day = datetime.date(2017, 1, 1)
      last_day = datetime.date(2017, 2, 2)
      userid = 3101011990
      report = pd.DataFrame(columns=('date','userid','charge', 'consume', 'dayleftcoin'))
      # Inclusive number of days in the range.
      day_count = (last_day - first_day).days + 1
      for offset in range(day_count):
          day = first_day + datetime.timedelta(days=1) * offset
          spent = getuserconsume(userid, day)
          charged = getusercharge(userid, day)
          remaining = getusercurcoin(userid, day)
          report.loc[offset] = (day.strftime("%Y-%m-%d"), str(userid), charged, spent, remaining)
      print(report.tail(100))
      return
  # Run the consumption report as soon as this snippet is executed.
  getconsume()
  #查询用户机器ID 代码案例
  import sys
  import MySQLdb
  import pandas as pd
  import datetime
  # Connection settings for the JIQIDB database; sql_select() reads these keys.
  optmap = dict(
      dbuser='aduser',
      dbpass='123654',
      dbhost='192.168.10.15',
      dbport=3306,
      dbname='JIQIDB',
  )
  def sql_select(reqsql):
      """Execute a SQL statement using the connection settings in ``optmap``.

      Args:
          reqsql: Complete SQL text to execute.

      Returns:
          All rows from ``cursor.fetchall()``. When MySQLdb raises an error
          the error is printed and ``''`` is returned instead.
      """
      db_conn = None
      try:
          # No explicit "use <db>" is needed: connect(db=...) selects it.
          db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'], host=optmap['dbhost'], port=optmap['dbport'], db=optmap['dbname'])
          db_cursor = db_conn.cursor()
          try:
              db_cursor.execute(reqsql)
              return db_cursor.fetchall()
          finally:
              db_cursor.close()
      except MySQLdb.Error as e:
          print("Mysql ERROR %d:%s" % (e.args[0], e.args[1]))
          return ''
      finally:
          # Close on every path; a bare ``db_conn.close`` attribute reference
          # does nothing and leaves the connection open.
          if db_conn is not None:
              db_conn.close()
  def getusermid(userid, months):
      """Return the (USERID, MACHINEID) rows recorded for ``userid``.

      The table name is ``LOGINHISTORY`` followed by ``months`` and by the
      user id modulo 50. The result is whatever sql_select() returns.
      """
      shard = int(userid) % 50
      reqsql = "select USERID,MACHINEID from LOGINHISTORY%s%u where USERID=%u group by MACHINEID" % (months, shard, int(userid))
      print(reqsql)
      return sql_select(reqsql)
  def getall(userlist):
      """Collect every (USERID, MACHINEID) pair for ``userlist`` and show it.

      Queries the current month's login-history table through getusermid()
      for each user, appends one output row per returned database row, and
      displays up to 1000 rows through Spark.

      Relies on a ``spark`` session object being available in the enclosing
      scope (it is not defined in this listing).
      """
      months = datetime.date.today().strftime("%Y%m")
      userdata = pd.DataFrame(columns=('USERID', 'MACHINEID'))
      index = 0
      for userid in userlist:
          # Each element is one result row; take its two columns. Indexing
          # the whole result with [0]/[1] would pick entire rows instead and
          # fail when only one row comes back. An empty result (including
          # the '' that sql_select() returns on error) adds nothing.
          for row in getusermid(userid, months):
              userdata.loc[index] = (str(row[0]), str(row[1]))
              index += 1
      df = spark.createDataFrame(userdata)
      df.show(1000)
  #人民币统计代码案例
  from pyspark.sql import Row
  from pyspark.sql.types import *
  from pyspark.sql.functions import udf
  import MySQLdb
  import mysql_op
  import datetime
  import time
  from mysql_op import MySQL
  import pandas as pd
  import numpy as np
  from fastparquet import ParquetFile
  from fastparquet import write
  def fromDayToDay(startdate, datelen, func):
      """Call ``func(day, next_day)`` for ``datelen`` consecutive days.

      Args:
          startdate: First day of the range.
          datelen: Number of days to visit; zero or less means no calls.
          func: Callback receiving each day and the day after it.
      """
      one_day = datetime.timedelta(days=1)
      day = startdate
      for _ in range(datelen):
          following = day + one_day
          func(day, following)
          day = following
      return
  def fromDayToEndDay(startdate, datelen, endday, func):
      """Call ``func(day, endday)`` for ``datelen`` consecutive days.

      Unlike fromDayToDay(), the second argument passed to ``func`` is always
      the fixed ``endday`` rather than the following day.
      """
      for offset in range(datelen):
          func(startdate + datetime.timedelta(days=1) * offset, endday)
      return
  # 获取人民币数据
  def saveDayPackageData(startday, endday):
      """Dump one day's ``DUBIJIESUANTONGJI_<yymmdd>`` table to a parquet file.

      Args:
          startday: Day whose table is read; also used (as YYYYMMDD) in the
              output file name.
          endday: Not used; accepted so the function matches the
              ``func(startday, endday)`` callback shape of fromDayToDay().

      The file ``/home/haoren/logstatis/billdata<YYYYMMDD>.parq`` is written
      only when the table returned at least one row.
      """
      # Database connection parameters.
      dbconfig = {'host':'192.168.10.12',
                  'port': 3306,
                  'user':'user',
                  'passwd':'123654',
                  'db':'JIESUANDB',
                  'charset':'utf8'}
      # Open the database connection.
      mysql_cn = MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])
      strday = startday.strftime("%Y%m%d")
      strdate = startday.strftime("%y%m%d")
      sql = "SELECT OPTIME,CONSUMERID,PRESERVENUM,CURRENTNUM,DUBIOPTYPE,DUBIOPNUM,OPTYPE,OPDETAIL,OPNUM FROM `DUBIJIESUANTONGJI_%s`" % (strdate)
      print(sql)
      try:
          pddf = pd.read_sql(sql, con=mysql_cn)
      finally:
          # Release the connection even when the query raises.
          mysql_cn.close()
      print(pddf.head(5))
      # NOTE(review): the listing's indentation was lost; describe() and
      # write() are both treated as conditional on a non-empty result.
      if len(pddf.index) > 0:
          print(pddf.describe())
          write("/home/haoren/logstatis/billdata"+strday+".parq", pddf)
      return
  def savePackageData():
      """Export the bill table for each day from 2016-12-28 to 2016-12-28."""
      first_day = datetime.date(2016, 12, 28)
      last_day = datetime.date(2016, 12, 28)
      # Inclusive day count of the range.
      day_count = (last_day - first_day).days + 1
      # Fetch the package data, one day at a time.
      fromDayToDay(first_day, day_count, saveDayPackageData)
  # 获取WiFi手机注册数据
  def saveDayWifiPhoneRegData(startday, endday):
      """Dump NEW_WEB_USER ids with TYPE=17 and TIME before ``endday`` to parquet.

      Args:
          startday: Day used (as YYYYMMDD) in the output file name.
          endday: Upper bound; converted with ``time.mktime`` and compared
              against the TIME column.

      The file ``/home/haoren/logstatis/wifiphonereg<YYYYMMDD>.parq`` is
      written only when the query returned at least one row.
      """
      # Database connection parameters.
      dbconfig = {'host':'192.168.10.15',
                  'port': 3306,
                  'user':'user',
                  'passwd':'123654',
                  'db':'AADB',
                  'charset':'utf8'}
      # Open the database connection.
      mysql_cn = MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])
      strday = startday.strftime("%Y%m%d")
      tsend = time.mktime(endday.timetuple())
      sql = "select USERID from NEW_WEB_USER where TIME< %d AND TYPE=17" % (tsend)
      print(sql)
      try:
          pddf = pd.read_sql(sql, con=mysql_cn)
      finally:
          # Release the connection even when the query raises.
          mysql_cn.close()
      print(pddf.head(5))
      # NOTE(review): the listing's indentation was lost; describe() and
      # write() are both treated as conditional on a non-empty result.
      if len(pddf.index) > 0:
          print(pddf.describe())
          write("/home/haoren/logstatis/wifiphonereg"+strday+".parq", pddf)
      return
  def saveWifiPhoneReg():
      """Export the registration ids for each day from 2016-12-01 to 2016-12-01."""
      first_day = datetime.date(2016, 12, 1)
      last_day = datetime.date(2016, 12, 1)
      # Inclusive day count of the range.
      day_count = (last_day - first_day).days + 1
      # Run the per-day export over the range.
      fromDayToDay(first_day, day_count, saveDayWifiPhoneRegData)
  # Display names keyed by OPTYPE code.
  OPTypeName = {
      0:"会员",
      1:"道具",
  }
  # Display names keyed by OPDETAIL code, used when OPTYPE is 19.
  OpDetailName19 = {
      1:"购物保存收益",
      2:"下注和返注",
      3:"发红包",
      4:"抢红包",
  }
  # Display names keyed by OPDETAIL code, used when OPTYPE is 22.
  OpDetailName22 = {
      1:"活动1收益到总账号",
      2:"活动2收益到总账号",
      3:"活动3收益到总账号",
  }
  # Display names keyed by OPDETAIL code, used when OPTYPE is 23.
  OpDetailName23 = {
      0:"购买会员",
      1:"购买道具",
      2:"扫雷",
  }
  # OPTYPE code -> the detail-name table that applies to it.
  _OP_DETAIL_TABLES = {
      19: OpDetailName19,
      22: OpDetailName22,
      23: OpDetailName23,
  }


  def _as_text(name):
      """Return ``name`` as text, decoding it from UTF-8 if it is a byte string."""
      if isinstance(name, bytes):
          return name.decode('utf8')
      return name


  def getOpTypeName(func):
      """Return the display name for OPTYPE code ``func``, or "" if unknown."""
      name = OPTypeName.get(func)
      if name is None:
          return ""
      return _as_text(name)


  def getOpDetailName(func, detail):
      """Return the display name for an (OPTYPE, OPDETAIL) pair.

      Args:
          func: OPTYPE code; only 19, 22 and 23 have detail names.
          detail: OPDETAIL code; ``None`` is treated as unknown.

      Returns:
          The matching name as text, or "" when nothing matches.
      """
      # For OPTYPE 19, two open ranges take precedence over the lookup table.
      # The None check keeps a missing detail from being compared to ints.
      if func == 19 and detail is not None:
          if 10000 < detail < 30000:
              return _as_text("包裹回滚")
          if 50000 < detail < 60000:
              return _as_text("红包接龙")
      table = _OP_DETAIL_TABLES.get(func)
      if table is None:
          return ""
      name = table.get(detail)
      if name is None:
          return ""
      return _as_text(name)
  def getDayPackageData(startday, endday):
      """Load one day's bill parquet file, add name columns, register a view.

      Reads ``/home/haoren/logstatis/billdata<YYYYMMDD>.parq`` through the
      ``spark`` session (not defined in this listing), adds the OPTYPENAME
      and DETAILNAME columns via UDFs, shows 10 rows after each step and
      registers the result as the temp view ``billdata``. ``endday`` is not
      used.
      """
      day_tag = startday.strftime("%Y%m%d")
      print(day_tag + '人民币数据')
      bills = spark.read.load("/home/haoren/logstatis/billdata"+day_tag+".parq")
      bills.show(10)
      type_name_udf = udf(getOpTypeName)
      typed = bills.withColumn('OPTYPENAME', type_name_udf(bills.OPTYPE))
      typed.show(10)
      detail_name_udf = udf(getOpDetailName)
      detailed = typed.withColumn('DETAILNAME', detail_name_udf(typed.OPTYPE, typed.OPDETAIL))
      detailed.show(10)
      detailed.createOrReplaceTempView('billdata')
      return
  def getPackageData():
      """Load and annotate the bill data for 2016-12-28 through 2016-12-28."""
      first_day = datetime.date(2016, 12, 28)
      last_day = datetime.date(2016, 12, 28)
      # Inclusive day count of the range.
      day_count = (last_day - first_day).days + 1
      # Process the package data day by day via getDayPackageData().
      fromDayToDay(first_day, day_count, getDayPackageData)
      print('getPackageData finish')
  # 获取充值数据
  def getChargeInfo(startday, endday):
      """Collect rows from the 20 USERCONSUMPTIONRECORD tables into one parquet file.

      Args:
          startday: Lower TIME bound (exclusive); also used (as YYYYMMDD) in
              the output file name.
          endday: Upper TIME bound (exclusive).

      Tables ``USERCONSUMPTIONRECORD0`` .. ``USERCONSUMPTIONRECORD19`` are
      queried in turn. The combined rows are written to
      ``/home/haoren/logstatis/register<YYYYMMDD>.parq`` when there is at
      least one row.
      """
      # Database connection parameters.
      dbconfig = {'host':'192.168.10.14',
                  'port': 3306,
                  'user':'user',
                  'passwd':'123654',
                  'db':'BAOIMDB',
                  'charset':'utf8'}
      # Open the database connection.
      mysql_cn = MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])
      strday = startday.strftime("%Y%m%d")
      tsstart = time.mktime(startday.timetuple())
      tsend = time.mktime(endday.timetuple())
      frames = []
      try:
          for i in range(0, 20):
              sql = "SELECT * FROM `USERCONSUMPTIONRECORD%d` where TIME > %d AND TIME < %d" % (i, tsstart, tsend)
              print(sql)
              pddf = pd.read_sql(sql, con=mysql_cn)
              if len(pddf.index) > 0:
                  frames.append(pddf)
      finally:
          # Release the connection even when one of the queries raises.
          mysql_cn.close()
      # Concatenate once instead of growing a frame inside the loop;
      # pd.concat() rejects an empty list, hence the fallback.
      if frames:
          regdata = pd.concat(frames, ignore_index=True)
      else:
          regdata = pd.DataFrame()
      # NOTE(review): the listing's indentation was lost; the tail preview is
      # printed once after all tables have been read.
      print(regdata.tail(5))
      if len(regdata.index) > 0:
          print(regdata.describe())
          write("/home/haoren/logstatis/register"+strday+".parq", regdata)
      return
  def pudf(x):
      """Return getOpTypeName() applied to the ``OPTYPE`` attribute of ``x``."""
      op_type = x.OPTYPE
      return getOpTypeName(op_type)
  def getMergeData(strday):
      """Join one day's bill and registration parquet files and return a Spark frame.

      Both files are read with fastparquet into pandas, merged on
      ``CONSUMERID`` == ``USERID``, given OPTYPENAME and DETAILNAME columns,
      and converted with the ``spark`` session (not defined in this listing).
      Ten rows are shown before the frame is returned.
      """
      bills = ParquetFile("/home/haoren/logstatis/billdata"+strday+".parq").to_pandas()
      wifi_users = ParquetFile("/home/haoren/logstatis/wifiphonereg"+strday+".parq").to_pandas()
      merged = pd.merge(bills, wifi_users, left_on='CONSUMERID', right_on='USERID')

      def _type_name(row):
          return getOpTypeName(row.OPTYPE)

      def _detail_name(row):
          return getOpDetailName(row.OPTYPE, row.OPDETAIL)

      merged['OPTYPENAME'] = merged.apply(_type_name, axis=1)
      merged['DETAILNAME'] = merged.apply(_detail_name, axis=1)
      spark_df = spark.createDataFrame(merged)
      spark_df.show(10)
      return spark_df
  def analyzeDayBillData(startday, endday):
      """Join one day's bill and registration data in Spark and save the result.

      Loads the two parquet files for ``startday``, joins them on
      ``CONSUMERID`` == ``USERID``, shows 10 rows and writes the join to
      ``/home/haoren/logstatis/analyze<YYYYMMDD>.parq``. ``endday`` is not
      used. Requires a ``spark`` session in the enclosing scope.
      """
      day_tag = startday.strftime("%Y%m%d")
      print(day_tag + '人民币数据')
      bills = spark.read.load("/home/haoren/logstatis/billdata"+day_tag+".parq")
      wifi_users = spark.read.load("/home/haoren/logstatis/wifiphonereg"+day_tag+".parq")
      join_condition = bills.CONSUMERID == wifi_users.USERID
      joined = bills.join(wifi_users, join_condition)
      joined.show(10)
      joined.write.parquet("/home/haoren/logstatis/analyze"+day_tag+".parq")
      return
  def analyzeDayBillData2(startday, endday):
      """Print the day banner and build the merged frame via getMergeData().

      Args:
          startday: Day to process; formatted as YYYYMMDD.
          endday: Not used; accepted so the function matches the
              ``func(startday, endday)`` callback shape of fromDayToDay().

      The merged frame is built (and shown by getMergeData()) but not kept.
      Statements that used to follow an unconditional ``return`` could never
      run and have been dropped: they re-added the OPTYPENAME / DETAILNAME
      columns, which getMergeData() already adds, and registered the temp
      view ``analyzebilldata``.
      """
      strday = startday.strftime("%Y%m%d")
      print(strday + '人民币数据')
      getMergeData(strday)
      return
  def analyzeBillData():
      """Run analyzeDayBillData2() for 2016-12-28 through 2016-12-28."""
      first_day = datetime.date(2016, 12, 28)
      last_day = datetime.date(2016, 12, 28)
      # Inclusive day count of the range.
      day_count = (last_day - first_day).days + 1
      # Analyze day by day via analyzeDayBillData2().
      fromDayToDay(first_day, day_count, analyzeDayBillData2)
      print('analyzeBillData finish')
  # Script entry: export the bill table to parquet, then load and annotate it.
  savePackageData()
  getPackageData()
  # The registration export and the bill analysis are currently disabled.
  #saveWifiPhoneReg()
  #analyzeBillData()

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦:870801961 群⑧:679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-548714-1-1.html 上篇帖子: Python环境安装 下篇帖子: [Python]使用smtplib类库发邮件
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表