zsyzhou 发表于 2015-4-27 06:26:54

对于python mysqldb 的一个简单的封装

  
  这是对mysqldb的一个简单封装,隐藏了cursor ,在小型的项目中可以使用,比较轻量,好定制,我再用scrapy爬取网页的时候就是使用这个 mysql 类的
  




1 #!/usr/bin/env python
2 #
3 # Copyright 2009 Facebook
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
8 #
9 #   http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
15 # under the License.
16
17 """A lightweight wrapper around MySQLdb."""
18
19 import copy
20 import MySQLdb.constants
21 import MySQLdb.converters
22 import MySQLdb.cursors
23 import itertools
24 import logging
25 import time
26
27 class Connection(object):
28   """A lightweight wrapper around MySQLdb DB-API connections.
29
30   The main value we provide is wrapping rows in a dict/object so that
31   columns can be accessed by name. Typical usage::
32
33         db = database.Connection("localhost", "mydatabase")
34         for article in db.query("SELECT * FROM articles"):
35             print article.title
36
37   Cursors are hidden by the implementation, but other than that, the methods
38   are very similar to the DB-API.
39
40   We explicitly set the timezone to UTC and the character encoding to
41   UTF-8 on all connections to avoid time zone and encoding errors.
42   """
43   def __init__(self, host, database, user=None, password=None,
44                  max_idle_time=7*3600):
45         self.host = host
46         self.database = database
47         self.max_idle_time = max_idle_time
48
49         args = dict(conv=CONVERSIONS, use_unicode=True, charset="utf8",
50                     db=database, init_command='SET time_zone = "+8:00"',
51                     sql_mode="TRADITIONAL")
52         if user is not None:
53             args["user"] = user
54         if password is not None:
55             args["passwd"] = password
56
57         # We accept a path to a MySQL socket file or a host(:port) string
58         if "/" in host:
59             args["unix_socket"] = host
60         else:
61             self.socket = None
62             pair = host.split(":")
63             if len(pair) == 2:
64               args["host"] = pair
65               args["port"] = int(pair)
66             else:
67               args["host"] = host
68               args["port"] = 3306
69
70         self._db = None
71         self._db_args = args
72         self._last_use_time = time.time()
73         try:
74             self.reconnect()
75         except Exception:
76             logging.error("Cannot connect to MySQL on %s", self.host,
77                           exc_info=True)
78
79   def __del__(self):
80         self.close()
81
82   def close(self):
83         """Closes this database connection."""
84         if getattr(self, "_db", None) is not None:
85             self._db.close()
86             self._db = None
87
88   def reconnect(self):
89         """Closes the existing database connection and re-opens it."""
90         self.close()
91         try:
92            from DBUtils import PooledDB
93
94            pool_con = PooledDB.PooledDB(creator=MySQLdb, mincached=1, maxcached=10, maxshared=10,
95   maxconnections=20, blocking=False, maxusage=100, **self._db_args)
96            self._db = pool_con.connection()
97            self._db.cursor().connection.autocommit(True)
98         except:
99            self._db = MySQLdb.connect(**self._db_args)
100            self._db.autocommit(True)
101
102   def iter(self, query, *parameters):
103         """Returns an iterator for the given query and parameters."""
104         self._ensure_connected()
105         cursor = MySQLdb.cursors.SSCursor(self._db)
106         try:
107             self._execute(cursor, query, parameters)
108             column_names = for d in cursor.description]
109             for row in cursor:
110               yield Row(zip(column_names, row))
111         finally:
112             cursor.close()
113
114   def query(self, query, *parameters):
115         """Returns a row list for the given query and parameters."""
116         cursor = self._cursor()
117         try:
118             self._execute(cursor, query, parameters)
119             column_names = for d in cursor.description]
120             return
121         finally:
122             cursor.close()
123
124   def get(self, query, *parameters):
125         """Returns the first row returned for the given query."""
126         rows = self.query(query, *parameters)
127         if not rows:
128             return None
129         elif len(rows) > 1:
130             raise Exception("Multiple rows returned for Database.get() query")
131         else:
132             return rows
133
134   # rowcount is a more reasonable default return value than lastrowid,
135   # but for historical compatibility execute() must return lastrowid.
136   def execute(self, query, *parameters):
137         """Executes the given query, returning the lastrowid from the query."""
138         return self.execute_lastrowid(query, *parameters)
139
140   def execute_lastrowid(self, query, *parameters):
141         """Executes the given query, returning the lastrowid from the query."""
142         cursor = self._cursor()
143
144         try:
145             self._execute(cursor, query, parameters)
146             return cursor.lastrowid
147         finally:
148             cursor.close()
149
150   def execute_rowcount(self, query, *parameters):
151         """Executes the given query, returning the rowcount from the query."""
152         cursor = self._cursor()
153         try:
154             self._execute(cursor, query, parameters)
155             return cursor.rowcount
156         finally:
157             cursor.close()
158
159   def executemany(self, query, parameters):
160         """Executes the given query against all the given param sequences.
161
162         We return the lastrowid from the query.
163         """
164         return self.executemany_lastrowid(query, parameters)
165
166   def executemany_lastrowid(self, query, parameters):
167         """Executes the given query against all the given param sequences.
168
169         We return the lastrowid from the query.
170         """
171         cursor = self._cursor()
172         try:
173             cursor.executemany(query, parameters)
174             return cursor.lastrowid
175         finally:
176             cursor.close()
177
178   def executemany_rowcount(self, query, parameters):
179         """Executes the given query against all the given param sequences.
180
181         We return the rowcount from the query.
182         """
183         cursor = self._cursor()
184         try:
185             cursor.executemany(query, parameters)
186             return cursor.rowcount
187         finally:
188             cursor.close()
189
190   def _ensure_connected(self):
191         # Mysql by default closes client connections that are idle for
192         # 8 hours, but the client library does not report this fact until
193         # you try to perform a query and it fails.Protect against this
194         # case by preemptively closing and reopening the connection
195         # if it has been idle for too long (7 hours by default).
196         if (self._db is None or
197             (time.time() - self._last_use_time > self.max_idle_time)):
198             self.reconnect()
199         self._last_use_time = time.time()
200
201   def _cursor(self):
202         self._ensure_connected()
203         return self._db.cursor()
204
205   def _execute(self, cursor, query, parameters):
206         try:
207             return cursor.execute(query, parameters)
208         except OperationalError:
209             logging.error("Error connecting to MySQL on %s", self.host)
210             self.close()
211             raise
212         finally:
213             cursor.close()
214
215
216 class Row(dict):
217   """A dict that allows for object-like property access syntax."""
218   def __getattr__(self, name):
219         try:
220             return self
221         except KeyError:
222             raise AttributeError(name)
223
224
225 # Fix the access conversions to properly recognize unicode/binary
226 FIELD_TYPE = MySQLdb.constants.FIELD_TYPE
227 FLAG = MySQLdb.constants.FLAG
228 CONVERSIONS = copy.copy(MySQLdb.converters.conversions)
229
230 field_types =
231 if 'VARCHAR' in vars(FIELD_TYPE):
232   field_types.append(FIELD_TYPE.VARCHAR)
233
234 for field_type in field_types:
235   CONVERSIONS = [(FLAG.BINARY, str)] + CONVERSIONS
236
237
238 # Alias some common MySQL exceptions
239 IntegrityError = MySQLdb.IntegrityError
240 OperationalError = MySQLdb.OperationalError
  使用的时候可以像下面这样



def db():
return database.Connection(host=MYSQL_HOST, database=MYSQL_DB, user=MYSQL_USER, password=MYSQL_PASSWORD)
db = db()
  这样就可以直接调用查询了
  插入数据的时候使用



db.execute("insert into `picdownload` (`hash`) values(%s)",hash)
  返回受影响的行数,
  查询的时候使用:



db.query("select `id` from `picdownload` where hash = %s",pichash)
  
  返回一个列表,其中包含 查询结果的一个字典集
  
  
  
页: [1]
查看完整版本: 对于python mysqldb 的一个简单的封装