首先說下,由于最新的 0.8 版還是開發(fā)版本,因此我使用的是 0.79 版,API 也許會有些不同。 接著就從安裝開始介紹吧,以 Debian/Ubuntu 為例(請確保有管理員權(quán)限): 復(fù)制代碼 代碼如下: apt-get install mysql-server apt-get install mysql-client apt-get install libmysqlclient15-dev 2.python-mysqldb 復(fù)制代碼 代碼如下: apt-get install python-mysqldb 3.easy_install 復(fù)制代碼 代碼如下: wget http://peak./dist/ez_setup.py python ez_setup.py 4.MySQL-Python 復(fù)制代碼 代碼如下: easy_install MySQL-Python 5.SQLAlchemy 復(fù)制代碼 代碼如下: easy_install SQLAlchemy
如果是用其他操作系統(tǒng),遇到問題就 Google 一下吧。我是在 Mac OS X 上開發(fā)的,途中也遇到些問題,不過當(dāng)時沒記下來…… 裝好后就可以開始使用了: 復(fù)制代碼 代碼如下: from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker
這里的 DB_CONNECT_STRING 就是連接數(shù)據(jù)庫的路徑?!癿ysql+mysqldb”指定了使用 MySQL-Python 來連接,“root”和“123”分別是用戶名和密碼,“l(fā)ocalhost”是數(shù)據(jù)庫的域名,“ooxx”是使用的數(shù)據(jù)庫名(可省略),“charset”指定了連接時使用的字符集(可省略)。 create_engine() 會返回一個數(shù)據(jù)庫引擎,echo 參數(shù)為 True 時,會顯示每條執(zhí)行的 SQL 語句,生產(chǎn)環(huán)境下可關(guān)閉。 sessionmaker() 會生成一個數(shù)據(jù)庫會話類。這個類的實(shí)例可以當(dāng)成一個數(shù)據(jù)庫連接,它同時還記錄了一些查詢的數(shù)據(jù),并決定什么時候執(zhí)行 SQL 語句。由于 SQLAlchemy 自己維護(hù)了一個數(shù)據(jù)庫連接池(默認(rèn) 5 個連接),因此初始化一個會話的開銷并不大。對 Tornado 而言,可以在 BaseHandler 的 initialize() 里初始化: 復(fù)制代碼 代碼如下: class BaseHandler(tornado.web.RequestHandler): def initialize(self): self.session = models.DB_Session() def on_finish(self): 對其他 Web 服務(wù)器來說,可以使用 sqlalchemy.orm.scoped_session,它能保證每個線程獲得的 session 對象都是唯一的。不過 Tornado 本身就是單線程的,如果使用了異步方式,就可能會出現(xiàn)問題,因此我并沒使用它。 拿到 session 后,就可以執(zhí)行 SQL 了: 復(fù)制代碼 代碼如下: session.execute('create database abc') print session.execute('show databases').fetchall() session.execute('use abc') # 建 user 表的過程略 print session.execute('select * from user where id = 1').first() print session.execute('select * from user where id = :id', {'id': 1}).first() 不過這和直接使用 MySQL-Python 沒啥區(qū)別,所以就不介紹了;我還是喜歡 ORM 的方式,這也是我采用 SQLAlchemy 的唯一原因。 于是來定義一個表: 復(fù)制代碼 代碼如下: from sqlalchemy import Column
from sqlalchemy.types import CHAR, Integer, String from sqlalchemy.ext.declarative import declarative_base
def init_db(): def drop_db():
id = Column(Integer, primary_key=True) init_db() declarative_base() 創(chuàng)建了一個 BaseModel 類,這個類的子類可以自動與一個表關(guān)聯(lián)。 接著就開始使用這個表吧: 復(fù)制代碼 代碼如下: from sqlalchemy import func, or_, not_
query = session.query(User) query2 = session.query(User.name) print query2.filter(User.id == 1).scalar() # 如果有記錄,返回第一條記錄的第一個元素 query4 = session.query(User.id) print query4.count() query.filter(User.id == 1).update({User.name: 'c'}) user.name = 'd' session.delete(user) session.rollback() 增刪改查都涉及到了,自己看看輸出的 SQL 語句就知道了,于是基礎(chǔ)知識就介紹到此了。
如何批量插入大批數(shù)據(jù)? 復(fù)制代碼 代碼如下: session.execute( User.__table__.insert(), [{'name': `randint(1, 100)`,'age': randint(1, 100)} for i in xrange(10000)] ) session.commit() 上面我批量插入了 10000 條記錄,半秒內(nèi)就執(zhí)行完了;而 ORM 方式會花掉很長時間。 如何讓執(zhí)行的 SQL 語句增加前綴? 復(fù)制代碼 代碼如下: session.query(User.name).prefix_with('HIGH_PRIORITY').all()
session.execute(User.__table__.insert().prefix_with('IGNORE'), {'id': 1, 'name': '1'}) 如何替換一個已有主鍵的記錄? 復(fù)制代碼 代碼如下: user = User(id=1, name='ooxx') session.merge(user) session.commit() 或者使用 MySQL 的 INSERT … ON DUPLICATE KEY UPDATE,需要用到 @compiles 裝飾器,有點(diǎn)難懂,自己搜索看吧:《SQLAlchemy ON DUPLICATE KEY UPDATE》 和 sqlalchemy_mysql_ext。 如何使用無符號整數(shù)? 復(fù)制代碼 代碼如下: from sqlalchemy.dialects.mysql import INTEGER
id = Column(INTEGER(unsigned=True), primary_key=True) 模型的屬性名需要和表的字段名不一樣怎么辦? 復(fù)制代碼 代碼如下: from_ = Column('from', CHAR(10))
如何獲取字段的長度? 復(fù)制代碼 代碼如下: User.name.property.columns[0].type.length
如何指定使用 InnoDB,以及使用 UTF-8 編碼? 復(fù)制代碼 代碼如下: class User(BaseModel): __table_args__ = { 'mysql_engine': 'InnoDB', 'mysql_charset': 'utf8' } MySQL 5.5 開始支持存儲 4 字節(jié)的 UTF-8 編碼的字符了,iOS 里自帶的 emoji(如 ?? 字符)就屬于這種。 如果是對表來設(shè)置的話,可以把上面代碼中的 utf8 改成 utf8mb4,DB_CONNECT_STRING 里的 charset 也這樣更改。 如果對庫或字段來設(shè)置,則還是自己寫 SQL 語句比較方便,具體細(xì)節(jié)可參考《How to support full Unicode in MySQL databases》。 不建議全用 utf8mb4 代替 utf8,因?yàn)榍罢吒饕龝加酶嗫臻g。 如何設(shè)置外鍵約束? 復(fù)制代碼 代碼如下: from random import randint
from sqlalchemy import ForeignKey
id = Column(Integer, primary_key=True)
id = Column(Integer, primary_key=True)
for i in xrange(100): session.query(User).filter(User.age < 50).delete() 執(zhí)行這段代碼時,你應(yīng)該會遇到一個錯誤: 復(fù)制代碼 代碼如下: sqlalchemy.exc.IntegrityError: (IntegrityError) (1451, 'Cannot delete or update a parent row: a foreign key constraint fails (`ooxx`.`friendship`, CONSTRAINT `friendship_ibfk_1` FOREIGN KEY (`user_id1`) REFERENCES `user` (`id`))') 'DELETE FROM user WHERE user.age < %s' (50,) 原因是刪除 user 表的數(shù)據(jù),可能會導(dǎo)致 friendship 的外鍵不指向一個真實(shí)存在的記錄。在默認(rèn)情況下,MySQL 會拒絕這種操作,也就是 RESTRICT。InnoDB 還允許指定 ON DELETE 為 CASCADE 和 SET NULL,前者會刪除 friendship 中無效的記錄,后者會將這些記錄的外鍵設(shè)為 NULL。 除了刪除,還有可能更改主鍵,這也會導(dǎo)致 friendship 的外鍵失效。于是相應(yīng)的就有 ON UPDATE 了。其中 CASCADE 變成了更新相應(yīng)的外鍵,而不是刪除。 而在 SQLAlchemy 中是這樣處理的: 復(fù)制代碼 代碼如下: class Friendship(BaseModel):
__tablename__ = 'friendship' id = Column(Integer, primary_key=True) 如何連接表? 復(fù)制代碼 代碼如下: from sqlalchemy import distinct from sqlalchemy.orm import aliased
print session.query(User.id).join(Friendship, User.id == Friendship.user_id1).all() # 所有有朋友的用戶 這里我沒提到 relationship,雖然它看上去很方便,但需要學(xué)習(xí)的內(nèi)容實(shí)在太多,還要考慮很多性能上的問題,所以干脆自己 join 吧。 為什么無法刪除 in 操作查詢出來的記錄? 復(fù)制代碼 代碼如下: session.query(User).filter(User.id.in_((1, 2, 3))).delete() 拋出這樣的異常: 復(fù)制代碼 代碼如下: sqlalchemy.exc.InvalidRequestError: Could not evaluate current criteria in Python. Specify 'fetch' or False for the synchronize_session parameter. 但這樣是沒問題的: 復(fù)制代碼 代碼如下: session.query(User).filter(or_(User.id == 1, User.id == 2, User.id == 3)).delete() 搜了下找到《Sqlalchemy delete subquery》這個問題,提到了 delete 的一個注意點(diǎn):刪除記錄時,默認(rèn)會嘗試刪除 session 中符合條件的對象,而 in 操作估計(jì)還不支持,于是就出錯了。解決辦法就是刪除時不進(jìn)行同步,然后再讓 session 里的所有實(shí)體都過期: 復(fù)制代碼 代碼如下: session.query(User).filter(User.id.in_((1, 2, 3))).delete(synchronize_session=False) session.commit() # or session.expire_all() 此外,update 操作也有同樣的參數(shù),如果后面立刻提交了,那么加上 synchronize_session=False 參數(shù)會更快。
1.定義一個新類,將它的方法設(shè)置為基類的方法: 復(fù)制代碼 代碼如下: class ModelMixin(object): @classmethod def get_by_id(cls, session, id, columns=None, lock_mode=None): if hasattr(cls, 'id'): scalar = False if columns: if isinstance(columns, (tuple, list)): query = session.query(*columns) else: scalar = True query = session.query(columns) else: query = session.query(cls) if lock_mode: query = query.with_lockmode(lock_mode) query = query.filter(cls.id == id) if scalar: return query.scalar() return query.first() return None BaseModel.get_by_id = get_by_id @classmethod @classmethod @classmethod @classmethod @classmethod 雖然很拙劣,但確實(shí)能用。順便還附送了一些有用的玩意,你懂的。 2.設(shè)置 declarative_base() 的 cls 參數(shù): 復(fù)制代碼 代碼如下: BaseModel = declarative_base(cls=ModelMixin) 這種方法不需要執(zhí)行“BaseModel.get_by_id = get_by_id”之類的代碼。不足之處就是 PyCharm 仍然無法找到這些方法的位置。 3.設(shè)置 __abstract__ 屬性: 復(fù)制代碼 代碼如下: class BaseModel(BaseModel): __abstract__ = True __table_args__ = { # 可以省掉子類的 __table_args__ 了 'mysql_engine': 'InnoDB', 'mysql_charset': 'utf8' } # ... 這種方法最簡單,也可以繼承出多個類。 如何正確使用事務(wù)? 復(fù)制代碼 代碼如下: class User(BaseModel): __tablename__ = 'user' id = Column(Integer, primary_key=True) class TanseferLog(BaseModel): id = Column(Integer, primary_key=True) user = User(money=100) 然后開兩個 session,同時進(jìn)行兩次轉(zhuǎn)賬操作: 復(fù)制代碼 代碼如下: session1 = DB_Session() session2 = DB_Session() user1 = session1.query(User).get(1) user1 = session2.query(User).get(1) session1.commit() 現(xiàn)在看看結(jié)果: 復(fù)制代碼 代碼如下: >>> user1.money Decimal('0.00') >>> user2.money Decimal('100.00') >>> session.query(TanseferLog).count() 2L 兩次轉(zhuǎn)賬都成功了,但是只轉(zhuǎn)走了一筆錢,這明顯不科學(xué)。 可見 MySQL InnoDB 雖然支持事務(wù),但并不是那么簡單的,還需要手動加鎖。 復(fù)制代碼 代碼如下: user1 = session1.query(User).with_lockmode('read').get(1) user2 = session1.query(User).with_lockmode('read').get(2) if user1.money >= 100: user1.money -= 100 user2.money += 100 session1.add(TanseferLog(from_user=1, to_user=2, amount=100)) user1 = session2.query(User).with_lockmode('read').get(1) 現(xiàn)在在執(zhí)行 session1.commit() 的時候,因?yàn)?user1 和 user2 都被 session2 加了讀鎖,所以會等待鎖被釋放。超時以后,session1.commit() 會拋出個超時的異常,如果捕捉了的話,或者 session2 在另一個進(jìn)程,那么 session2.commit() 還是能正常提交的。這種情況下,有一個事務(wù)是肯定會提交失敗的,所以那些更改等于白做了。 接下來看看寫鎖,把上段代碼中的 'read' 改成 'update' 即可。這次在執(zhí)行 select 的時候就會被阻塞了: 那么什么時候用讀鎖呢?如果要保證事務(wù)運(yùn)行期間內(nèi),被讀取的數(shù)據(jù)不被修改,自己也不去修改,加讀鎖即可。 另外要注意的是,如果被查詢的字段沒有加索引的話,就會變成鎖整張表了: 復(fù)制代碼 代碼如下: session1.query(User).filter(User.id > 50).with_lockmode('update').all() session2.query(User).filter(User.id < 40).with_lockmode('update').all() # 不會被鎖,因?yàn)?id 是主鍵 session1.rollback() session1.query(User).filter(User.money == 50).with_lockmode('update').all() 要避免的話,可以這樣: 復(fù)制代碼 代碼如下: money = Column(DECIMAL(10, 2), index=True)
另一個注意點(diǎn)是子事務(wù)。 復(fù)制代碼 代碼如下: def step1(): # ... if success: session.commit() return True session.rollback() return False def step2(): session.begin_nested() 此外,rollback 一個子事務(wù),可以釋放這個子事務(wù)中獲得的鎖,提高并發(fā)性和降低死鎖概率。 如何對一個字段進(jìn)行自增操作? 復(fù)制代碼 代碼如下: user = session.query(User).with_lockmode('update').get(1) user.age += 1 session.commit() 如果不想多一次讀的話,這樣寫也是可以的: 復(fù)制代碼 代碼如下: session.query(User).filter(User.id == 1).update({
User.age: User.age + 1 }) session.commit() # 其實(shí)字段之間也可以做運(yùn)算: session.query(User).filter(User.id == 1).update({ User.age: User.age + User.id }) 您可能感興趣的文章:
|
|