|
这是对 MySQLdb 的一个简单封装,隐藏了 cursor,比较轻量、便于定制,适合在小型项目中使用。我在用 scrapy 爬取网页的时候,用的就是这个 mysql 类。
#!/usr/bin/env python
#
# Copyright 2009 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""A lightweight wrapper around MySQLdb."""

import copy
import itertools
import logging
import time

import MySQLdb.constants
import MySQLdb.converters
import MySQLdb.cursors

class Connection(object):
    """A lightweight wrapper around MySQLdb DB-API connections.

    The main value we provide is wrapping rows in a dict/object so that
    columns can be accessed by name. Typical usage::

        db = database.Connection("localhost", "mydatabase")
        for article in db.query("SELECT * FROM articles"):
            print(article.title)

    Cursors are hidden by the implementation, but other than that, the methods
    are very similar to the DB-API.

    Every connection is opened with the ``utf8`` character set, unicode
    results, the ``TRADITIONAL`` SQL mode and the session time zone set to
    ``+8:00`` (see ``init_command`` in ``__init__``).
    """

    def __init__(self, host, database, user=None, password=None,
                 max_idle_time=7*3600):
        """Stores the connection settings and tries to connect once.

        Args:
            host: Either a path to a MySQL unix socket (any value containing
                "/") or a ``host`` / ``host:port`` string. The port defaults
                to 3306.
            database: Name of the database to select.
            user: Optional user name.
            password: Optional password.
            max_idle_time: Seconds of inactivity after which the connection
                is re-opened before the next statement.

        A failed initial connection is logged, not raised; ``_db`` then stays
        ``None`` and the next statement triggers another connection attempt.
        """
        self.host = host
        self.database = database
        self.max_idle_time = max_idle_time

        args = dict(conv=CONVERSIONS, use_unicode=True, charset="utf8",
                    db=database, init_command='SET time_zone = "+8:00"',
                    sql_mode="TRADITIONAL")
        if user is not None:
            args["user"] = user
        if password is not None:
            args["passwd"] = password

        # We accept a path to a MySQL socket file or a host(:port) string
        if "/" in host:
            args["unix_socket"] = host
        else:
            self.socket = None
            pair = host.split(":")
            if len(pair) == 2:
                args["host"] = pair[0]
                args["port"] = int(pair[1])
            else:
                args["host"] = host
                args["port"] = 3306

        self._db = None
        self._db_args = args
        self._last_use_time = time.time()
        try:
            self.reconnect()
        except Exception:
            logging.error("Cannot connect to MySQL on %s", self.host,
                          exc_info=True)

    def __del__(self):
        self.close()

    def close(self):
        """Closes this database connection."""
        # getattr() guards against __del__ running on an instance whose
        # __init__ never got as far as assigning _db.
        if getattr(self, "_db", None) is not None:
            self._db.close()
            self._db = None

    def reconnect(self):
        """Closes the existing database connection and re-opens it.

        A DBUtils ``PooledDB`` connection is tried first. If that fails for
        any reason (for example DBUtils is not importable) we fall back to a
        plain ``MySQLdb.connect``. Autocommit is enabled on either path.

        Raises:
            Whatever ``MySQLdb.connect`` raises when the fallback fails too.
        """
        self.close()
        try:
            from DBUtils import PooledDB

            pool_con = PooledDB.PooledDB(
                creator=MySQLdb, mincached=1, maxcached=10, maxshared=10,
                maxconnections=20, blocking=False, maxusage=100,
                **self._db_args)
            self._db = pool_con.connection()
            # NOTE(review): presumably reaches the raw MySQLdb connection
            # behind the pool wrapper - confirm against the DBUtils version
            # in use.
            self._db.cursor().connection.autocommit(True)
        except Exception:
            # Deliberate best-effort fallback. ``except Exception`` (rather
            # than a bare ``except``) lets KeyboardInterrupt / SystemExit
            # propagate instead of being swallowed here.
            logging.debug("Pooled connection unavailable, falling back to "
                          "a direct MySQLdb connection", exc_info=True)
            self._db = MySQLdb.connect(**self._db_args)
            self._db.autocommit(True)

    def iter(self, query, *parameters):
        """Returns an iterator for the given query and parameters.

        This is a generator: rows are read from a ``MySQLdb.cursors.SSCursor``
        and yielded one ``Row`` at a time. The cursor is closed when the
        generator finishes or is closed.
        """
        self._ensure_connected()
        cursor = MySQLdb.cursors.SSCursor(self._db)
        try:
            self._execute(cursor, query, parameters)
            column_names = [d[0] for d in cursor.description]
            for row in cursor:
                yield Row(zip(column_names, row))
        finally:
            cursor.close()

    def query(self, query, *parameters):
        """Returns a row list for the given query and parameters."""
        cursor = self._cursor()
        try:
            self._execute(cursor, query, parameters)
            column_names = [d[0] for d in cursor.description]
            # Plain zip() (as in iter()) works on both Python 2 and 3,
            # unlike itertools.izip.
            return [Row(zip(column_names, row)) for row in cursor]
        finally:
            cursor.close()

    def get(self, query, *parameters):
        """Returns the first row returned for the given query.

        Returns ``None`` when the query matches nothing and raises
        ``Exception`` when it matches more than one row.
        """
        rows = self.query(query, *parameters)
        if not rows:
            return None
        elif len(rows) > 1:
            raise Exception("Multiple rows returned for Database.get() query")
        else:
            return rows[0]

    # rowcount is a more reasonable default return value than lastrowid,
    # but for historical compatibility execute() must return lastrowid.
    def execute(self, query, *parameters):
        """Executes the given query, returning the lastrowid from the query."""
        return self.execute_lastrowid(query, *parameters)

    def execute_lastrowid(self, query, *parameters):
        """Executes the given query, returning the lastrowid from the query."""
        cursor = self._cursor()
        try:
            self._execute(cursor, query, parameters)
            return cursor.lastrowid
        finally:
            cursor.close()

    def execute_rowcount(self, query, *parameters):
        """Executes the given query, returning the rowcount from the query."""
        cursor = self._cursor()
        try:
            self._execute(cursor, query, parameters)
            return cursor.rowcount
        finally:
            cursor.close()

    def executemany(self, query, parameters):
        """Executes the given query against all the given param sequences.

        We return the lastrowid from the query.
        """
        return self.executemany_lastrowid(query, parameters)

    def executemany_lastrowid(self, query, parameters):
        """Executes the given query against all the given param sequences.

        We return the lastrowid from the query.
        """
        cursor = self._cursor()
        try:
            cursor.executemany(query, parameters)
            return cursor.lastrowid
        finally:
            cursor.close()

    def executemany_rowcount(self, query, parameters):
        """Executes the given query against all the given param sequences.

        We return the rowcount from the query.
        """
        cursor = self._cursor()
        try:
            cursor.executemany(query, parameters)
            return cursor.rowcount
        finally:
            cursor.close()

    def _ensure_connected(self):
        # Mysql by default closes client connections that are idle for
        # 8 hours, but the client library does not report this fact until
        # you try to perform a query and it fails. Protect against this
        # case by preemptively closing and reopening the connection
        # if it has been idle for too long (7 hours by default).
        if (self._db is None or
                (time.time() - self._last_use_time > self.max_idle_time)):
            self.reconnect()
        self._last_use_time = time.time()

    def _cursor(self):
        """Returns a fresh cursor, reconnecting first if necessary."""
        self._ensure_connected()
        return self._db.cursor()

    def _execute(self, cursor, query, parameters):
        """Runs ``query`` on ``cursor`` and returns ``cursor.execute``'s result.

        On ``OperationalError`` the error is logged, the connection is closed
        (so the next statement reconnects via ``_ensure_connected``) and the
        exception is re-raised.

        The cursor is intentionally NOT closed here: every caller still has
        to read ``description``, the rows, ``lastrowid`` or ``rowcount`` from
        it and closes it in its own ``finally`` block.
        """
        try:
            return cursor.execute(query, parameters)
        except OperationalError:
            logging.error("Error connecting to MySQL on %s", self.host)
            self.close()
            raise


class Row(dict):
    """A dict that allows for object-like property access syntax.

    ``row.title`` is equivalent to ``row["title"]``. A missing key raises
    ``AttributeError`` rather than ``KeyError``, so ``getattr`` with a
    default and ``hasattr`` behave as they do for ordinary attributes.
    """

    def __getattr__(self, name):
        # __getattr__ is only consulted after normal attribute lookup has
        # failed, so real dict methods (keys, items, ...) always win over
        # a column that happens to share their name.
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)


# Fix the access conversions to properly recognize unicode/binary
FIELD_TYPE = MySQLdb.constants.FIELD_TYPE
FLAG = MySQLdb.constants.FLAG
# Work on a copy so MySQLdb's own module-level conversion table is left
# untouched; the entries changed below are rebuilt as new lists.
CONVERSIONS = copy.copy(MySQLdb.converters.conversions)

field_types = [FIELD_TYPE.BLOB, FIELD_TYPE.STRING, FIELD_TYPE.VAR_STRING]
# FIELD_TYPE.VARCHAR is only added when this MySQLdb build defines it.
if 'VARCHAR' in vars(FIELD_TYPE):
    field_types.append(FIELD_TYPE.VARCHAR)

for field_type in field_types:
    # Put a (FLAG.BINARY, str) rule in front of the existing converters for
    # each string-like column type.
    CONVERSIONS[field_type] = [(FLAG.BINARY, str)] + CONVERSIONS[field_type]


# Alias some common MySQL exceptions
IntegrityError = MySQLdb.IntegrityError
OperationalError = MySQLdb.OperationalError
使用的时候可以像下面这样
def db():
    """Open a new database.Connection from the MYSQL_* settings."""
    # MYSQL_HOST, MYSQL_DB, MYSQL_USER and MYSQL_PASSWORD are not defined in
    # this snippet; they must be provided by the surrounding project.
    return database.Connection(host=MYSQL_HOST, database=MYSQL_DB, user=MYSQL_USER, password=MYSQL_PASSWORD)
# Rebinds the name: from here on ``db`` is the connection, not the factory.
db = db()
这样就可以直接调用查询了
插入数据的时候使用
# execute() returns the cursor's lastrowid for this statement.
db.execute("insert into `picdownload` (`hash`) values(%s)",hash)
返回的是最后插入行的 id(lastrowid),而不是受影响的行数;如果需要受影响的行数,请使用 execute_rowcount。
查询的时候使用:
# query() returns a list with one Row per result row.
db.query("select `id` from `picdownload` where hash = %s",pichash)
返回一个列表,其中每个元素是一个 Row(dict 的子类),既可以用 row["id"] 也可以用 row.id 来访问字段。
|
|