|
这是我写的一个用 Python 将数据文件导入 Oracle 数据库的程序。由于功底还不扎实,所以写得比较粗糙!
下面代码是一个公共的输出模块(msgOutput 函数),程序中所有的输出信息都通过它格式化后显示:
1 #!/usr/bin/env python
2 #coding:utf-8
3
4
5 try:
6 import time
7 except Exception, e:
8 raise e
9
10
def msgOutput(content, codeZT, codeType="UTF-8"):
    """Build a timestamped log line for ``content`` (the line is returned, not printed).

    :param content: message text, inserted with ``%s``.
    :param codeZT: 1 selects a normal "[Msg-Prompt]" line; any other value
        selects an "[Msg-Error ]" line wrapped in ANSI bold-red escape codes.
    :param codeType: accepted for compatibility; not used.
    :returns: the formatted string.
    """
    stamp = time.strftime("%Y/%m/%d-%X", time.localtime())
    if codeZT == 1:
        template = "[Msg-Prompt][%s] %s"
    else:
        # \033[1;31m switches the terminal to bold red, \033[0m resets it.
        template = "\033[1;31m[Msg-Error ][%s] %s\033[0m"
    return template % (stamp, content)
下面是 Oracle 数据库连接类(oracleDb),暂时还不完善:
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 """
5 oracle链接类
6 """
7 import sys
8 sys.path.append( "这里是上面输出类的路径" ) #msgOutput.py的路径
9 from msgOutput import msgOutput
10 import cx_Oracle,time
11
12
13 #***************************************************************************************************
14 #数据库类
15 #***************************************************************************************************
16
class oracleDb:
    """Thin wrapper around a single cx_Oracle connection.

    ``connectDB`` opens the connection and stores it on ``self.con``;
    ``close`` releases it and is also called from ``__del__`` as a fallback.
    """

    # Class-level default so close()/__del__ are safe even when connectDB()
    # was never called or failed before assigning a connection.
    con = None

    def __init__(self, username, password, dbname):
        """Remember the credentials; no connection is opened here.

        :param username: Oracle user name.
        :param password: Oracle password.
        :param dbname: passed as the third positional argument (the DSN /
            connect string) of ``cx_Oracle.connect``.
        """
        self.username = username
        self.password = password
        self.dbname = dbname

    def connectDB(self):
        """Open the connection, store it on ``self.con`` and return it.

        On failure an error line is printed and the process exits with a
        non-zero status.
        """
        try:
            self.con = cx_Oracle.connect(self.username, self.password, self.dbname)
        except Exception as e:
            print(msgOutput('数据库连接失败,原因: %s...' % str(e).replace("\n", ""), 0))
            # Non-zero status so shell/cron callers can detect the failure
            # (exit status 0 would report success).
            sys.exit(1)
        print(msgOutput('数据库已建立连接...', 1))
        return self.con

    def close(self):
        """Close the connection if one is open; calling it again is a no-op."""
        if self.con is None:
            return
        # Forget the handle first so a second close()/__del__ does not retry.
        con, self.con = self.con, None
        try:
            con.close()
            print(msgOutput('数据库已断开连接...', 1))
        except Exception as e:
            print(msgOutput('数据库关闭失败,原因: %s...' % str(e).replace("\n", ""), 0))

    def __del__(self):
        """Fallback cleanup when the wrapper is garbage collected."""
        self.close()
下面是文件入库的主程序:
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 """
5 @author:niklaus
6 @date:20150723
7 """
8
9 import sys,os
10 11 import time,linecache
12 import datetime
13 sys.path.append( "这里是下面模块导入的路径" )
14 from msgOutput import msgOutput
15 from oracleDb import oracleDb
# Program name shown in the start/finish messages printed by main().
nameReplace = "文件入库程序"
17
18 #***************************************************************************************************
19 #逻辑层
20 #***************************************************************************************************
def fileLoad(fileName, flag, tableName, *column_names):
    """Load one delimited data file into an Oracle table.

    Pipeline:
    1. Validate the data file.
    2. Connect to the database.
    3. Validate the table and the requested columns.
    4. Print a summary.
    5. TRUNCATE the table.
    6. Insert the file's rows.

    The validation steps print an error and exit the process on failure.

    :param fileName: path of the data file.
    :param flag: field delimiter used in the file.
    :param tableName: target table; it must be owned by ``username``.
    :param column_names: target column names, in the order of the fields in
        each line. They are upper-cased before being checked.
    """
    # Placeholder credentials as published; fill in real values (preferably
    # from configuration rather than source code).
    username = '数据库用户名'
    password = '数据库密码'
    dbname = '数据库库名'
    columnList = [item.upper() for item in column_names]
    # Check the local file before opening a database session: a wrong path is
    # the cheapest failure to detect and needs no connection.
    checkFileName(fileName)
    # db must stay referenced while conn is in use: oracleDb.__del__ closes
    # the connection.
    db = oracleDb(username, password, dbname)
    conn = db.connectDB()
    checkTableName(tableName, username, conn)
    checkColums(tableName, username, columnList, conn)
    loadInfos(fileName, flag, tableName, columnList)
    truncateTable(tableName, conn)
    loadDate(fileName, flag, tableName, columnList, conn)
34
35 #***************************************************************************************************
36 #检查文件是否在
37 #***************************************************************************************************
def checkFileName(fileName):
    """Exit the program unless ``fileName`` is an existing regular file.

    Returns None when the file exists. Otherwise prints an error and exits
    with a non-zero status.
    """
    # isfile() rather than exists(): a directory would pass exists() and only
    # fail later when the file is counted/read.
    if not os.path.isfile(fileName):
        print(msgOutput("数据文件[%s]不存在!请仔细检查..." % fileName, 0))
        sys.exit(1)
45
def loadInfos(fileName, flag, tableName, columnList):
    """Print a summary of the pending load: line count, table and columns.

    ``flag`` is not used here.
    """
    # Count lines in Python instead of shelling out to `wc -l`:
    # - the file name is never passed through a shell (no quoting/injection
    #   problems, works with spaces);
    # - it does not depend on `wc` being installed;
    # - a final line without a trailing newline is counted too (wc -l counts
    #   newline characters only).
    with open(fileName, 'rb') as dataFile:
        lenFile = sum(1 for _ in dataFile)
    print(msgOutput("数据文件[%s]共有 %d 条数据待入库..." % (fileName, lenFile), 1))
    print(msgOutput("数据库表[%s]..." % (tableName.upper()), 1))
    print(msgOutput("数据库表中字段%s..." % columnList, 1))
51
52 #***************************************************************************************************
53 #检查数据库表是否存在
54 #***************************************************************************************************
def checkTableName(tableName, username, conn):
    """Verify that ``username`` owns a table named ``tableName`` (case-insensitive).

    Both names are upper-cased and looked up in ALL_OBJECTS.

    :returns: True when the table exists.

    If the table is missing, prints an error and exits with a non-zero
    status. If the query itself fails, closes ``conn``, prints the error and
    exits with a non-zero status.
    """
    sqlTxt = '''select object_name from all_objects
                 where owner = :owner
                   and object_type = 'TABLE'
                   and object_name = :tname'''
    try:
        cursor = conn.cursor()
        try:
            # Bind variables instead of string interpolation: no quoting
            # problems and no SQL injection through the user/table name.
            cursor.execute(sqlTxt, {'owner': username.upper(), 'tname': tableName.upper()})
            gename = cursor.fetchall()
        finally:
            cursor.close()
    except Exception as e:
        print(msgOutput('数据库查询失败,原因: %s...' % str(e).replace("\n", ""), 0))
        conn.close()
        sys.exit(1)
    if not gename:
        print(msgOutput('数据库中表[%s]不存在!请检查确认...' % (tableName.upper()), 0))
        sys.exit(1)
    return True
74
75
76 #***************************************************************************************************
77 #检查数据库表字段与输入的是否一致
78 #***************************************************************************************************
def checkColums(tableName, username, columnList, conn):
    """Exit unless every name in ``columnList`` is a column of ``username.tableName``.

    The comparison against the data dictionary is case-sensitive, so
    ``columnList`` should already be upper-case (fileLoad upper-cases it).
    If a name is missing, prints an error and exits with a non-zero status.
    If the query fails, closes ``conn``, prints the error and exits with a
    non-zero status.
    """
    # ALL_TAB_COLUMNS instead of DBA_TAB_COLUMNS: the DBA_ views require extra
    # privileges. This also matches checkTableName, which uses ALL_OBJECTS.
    sqlTxt = '''select column_name
                  from all_tab_columns
                 where table_name = :tname
                   and owner = :owner'''
    try:
        cursor = conn.cursor()
        try:
            # Bind variables instead of string interpolation (no SQL injection).
            cursor.execute(sqlTxt, {'tname': tableName.upper(), 'owner': username.upper()})
            columnDb = set(row[0] for row in cursor.fetchall())
        finally:
            cursor.close()
    except Exception as e:
        print(msgOutput('数据库查询失败,原因: %s...' % str(e).replace("\n", ""), 0))
        conn.close()
        sys.exit(1)
    missing = [item for item in columnList if item not in columnDb]
    if missing:
        print(msgOutput('输入字段与数据库表中字段不一致!请仔细检查!...', 0))
        sys.exit(1)
111 #***************************************************************************************************
112 #清空将入库的表
113 #***************************************************************************************************
def truncateTable(tableName, conn):
    """Remove all rows from ``tableName`` with TRUNCATE TABLE.

    :returns: True on success.

    On failure, prints the error, closes ``conn`` and exits with a non-zero
    status. Continuing would insert into a table that was not emptied,
    through a closed connection.
    """
    # Identifiers cannot be bind variables, so the name is interpolated.
    # Callers are expected to validate it first (fileLoad runs checkTableName).
    sqlTxt = 'truncate table %s' % tableName
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(sqlTxt)
        finally:
            cursor.close()
    except Exception as e:
        print(msgOutput('数据库执行失败,原因: %s...' % str(e).replace("\n", ""), 0))
        conn.close()
        sys.exit(1)
    print(msgOutput('数据库表[%s]已经清空!...' % tableName.upper(), 1))
    return True
126
127 #***************************************************************************************************
128 #文件处理
129 #***************************************************************************************************
def loadDate(fileName, flag, tableName, columnList, conn, recordDefault=200000):
    """Insert the rows of ``fileName`` into ``tableName`` in batches.

    Row parsing:
    - The line terminator (``\\n`` or ``\\r\\n``) is stripped from each line.
    - Empty lines are skipped.
    - Each remaining line is split on ``flag`` and truncated to
      ``len(columnList)`` fields.

    Batches of at most ``recordDefault`` rows are passed to ``dataInput``,
    and a progress line is printed after each one. Other errors (unreadable
    file, invalid batch size) are printed and swallowed.
    """
    import itertools

    columnCount = len(columnList)
    try:
        if recordDefault < 1:
            raise ValueError('recordDefault must be a positive integer, got %r' % (recordDefault,))
        # Stream the file instead of linecache.getlines(): linecache keeps the
        # whole file cached in memory for the rest of the process.
        with open(fileName) as dataFile:
            # Strip the terminator so the last field does not end with "\n".
            stripped = (line.rstrip('\r\n') for line in dataFile)
            rows = (text.split(flag)[:columnCount] for text in stripped if text)
            while True:
                listInput = list(itertools.islice(rows, recordDefault))
                if not listInput:
                    break
                if dataInput(tableName, columnList, conn, listInput):
                    print(msgOutput('数据文件[%s]已经入库 %d 条数据...' % (fileName, len(listInput)), 1))
    except Exception as e:
        print(msgOutput('文件处理失败,原因: %s...' % str(e).replace("\n", ""), 0))
143
144 #***************************************************************************************************
145 #数据入库
146 #***************************************************************************************************
def dataInput(tableName, columnList, conn, listInput):
    """Insert one batch of rows into ``tableName`` and commit.

    :param columnList: target column names, in the same order as the values
        in each row of ``listInput``.
    :param listInput: list of row sequences, one value per column.
    :returns: True on success (including an empty batch).

    On failure, prints the error, closes ``conn`` and exits with a non-zero
    status.
    """
    if not listInput:
        return True
    # Name the target columns explicitly and bind positionally (:1, :2, ...).
    # Without a column list, Oracle maps the values onto *all* table columns
    # in table order, which breaks when the table has extra columns or a
    # different order than columnList. Positional binds also avoid invalid
    # bind names when a column is called e.g. DATE or NUMBER.
    sqlTxt = 'insert into %s (%s) values (%s)' % (
        tableName,
        ', '.join(columnList),
        ', '.join(':%d' % (i + 1) for i in range(len(columnList))),
    )
    try:
        cursor = conn.cursor()
        try:
            cursor.executemany(sqlTxt, listInput)
        finally:
            cursor.close()
        conn.commit()
    except Exception as e:
        print(msgOutput('数据文件入库失败,原因: %s...' % str(e).replace("\n", ""), 0))
        conn.close()
        sys.exit(1)
    return True
161
def main():
    """Script entry point: load one delimited data file into an Oracle table.

    The delimiter, file path and table name below are configuration values;
    the first two are still placeholder text in this listing.
    """
    # Placeholder: the text says to put the file's field delimiter here (e.g. '|').
    flag = '|(这里是文件中数据与数据之间的分割符)'
    # Placeholder: full path of the data file.
    fileName = '文件的名称(全路径)'
    tableName = 'temp_file_load_cs'
    print(msgOutput('%s开始运行...' % nameReplace, 1))
    fileLoad(fileName, flag, tableName,
             "mr_copyright", "mr_songid", "mr_songname", "mr_singerid", "mr_singer")
    # nameReplace already ends with the word for "program", so the message is
    # "<name>运行结束" (the original repeated the last character: "程序序").
    print(msgOutput('\033[1;32m%s运行结束...\033[0m' % nameReplace, 1))


if __name__ == '__main__':
    main()
下面是程序的运行结果:

可以看出,程序每批入库 20 万条数据(即 loadDate 中 recordDefault 的默认值 200000),360 多万条数据入库共用时 1 分 33 秒,总体来说速度还是不错的!
由于对面向对象还不熟练,这次主要采用 Python 的面向过程编程,下次将把该程序改成面向对象的写法!
ps:本文为博主原创文章,由于功底不好,请勿随意转载! |
|