Python:Peewee实践记录
【摘要】 文档:http://docs.peewee-orm.com/
安装
$ pip install peewee
1
将已有数据表转为Model
# 导出数据表为Model
$ python -m pwiz -e mysql -H localhost -p 3306 -u root -P -o -i -t user data > user.py
12
打印执...
文档:http://docs.peewee-orm.com/
安装
$ pip install peewee
- 1
将已有数据表转为Model
# 导出数据表为Model
$ python -m pwiz -e mysql -H localhost -p 3306 -u root -P -o -i -t user data > user.py
- 1
- 2
打印执行SQL
import logging
# 打印日志
logger = logging.getLogger('peewee')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
logger.propagate = False # 不向上传播
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
modle和dict转换
from playhouse.shortcuts import model_to_dict, dict_to_model
# 快捷方式类
class ShortcutModel(object): @classmethod def from_dict(cls, data, ignore_unknown=False): return dict_to_model(cls, data=data, ignore_unknown=ignore_unknown) def to_dict(self, recurse=True, backrefs=False, only=None, exclude=None, seen=None, extra_attrs=None, fields_from_query=None, max_depth=None, manytomany=False): return model_to_dict( self, recurse=recurse, backrefs=backrefs, only=only, exclude=exclude, seen=seen, extra_attrs=extra_attrs, fields_from_query=fields_from_query, max_depth=max_depth, manytomany=manytomany )
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
示例
class BaseModel(Model, ShortcutModel): class Meta: database = db
- 1
- 2
- 3
外键关联操作
1、建表
from datetime import datetime
from peewee import *
db = SqliteDatabase('people.db')
class BaseModel(Model): class Meta: database = db class Person(BaseModel): id = IntegerField(primary_key=True) name = CharField() birthday = DateField() class Meta: table_name = 'person'
class Pet(BaseModel): id = IntegerField(primary_key=True) # 一对多: 一个Person -> 多个Pet owner = ForeignKeyField(Person, backref='pets') name = CharField() create_time = DateTimeField(default=datetime.now) update_time = DateTimeField(default=datetime.now) class Meta: table_name = 'pet'
# 创建表
db.create_tables([Person, Pet])
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
2、初始化数据
def init_data(): person1 = Person.create(name='Tom', birthday='2020-01-01') Pet.create(owner=person1, name='Dog') Pet.create(owner=person1, name='Cat') person2 = Person.create(name='Jack', birthday='2020-01-02') Pet.create(owner=person2, name='Dog') Pet.create(owner=person2, name='Cat')
init_data()
"""
sqlite> select * from person;
id name birthday
---------- ---------- ----------
1 Tom 2020-01-01
2 Jack 2020-01-02
sqlite> select * from pet;
id owner_id name create_time update_time ---------- ---------- ---------- ------------------- -------------------
1 1 Dog 2021-03-02 10:16:07 2021-03-02 10:16:07
2 1 Cat 2021-03-02 10:16:07 2021-03-02 10:16:07
3 2 Dog 2021-03-02 10:36:01 2021-03-02 10:36:01
4 2 Cat 2021-03-02 10:36:01 2021-03-02 10:36:01
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
3、N+1问题
3-1、一对多,取列表
for row in Person.select(): print(row.name) for pet in row.pets: print(pet.name)
"""
取N+1次 : 先取一次person列表;然后一个Person对象,取一次pets列表
('SELECT "t1"."id", "t1"."name", "t1"."birthday" FROM "person" AS "t1"', [])
('SELECT "t1"."id", "t1"."owner_id", "t1"."name", "t1"."create_time", "t1"."update_time" FROM "pet" AS "t1" WHERE ("t1"."owner_id" = ?)', [1])
('SELECT "t1"."id", "t1"."owner_id", "t1"."name", "t1"."create_time", "t1"."update_time" FROM "pet" AS "t1" WHERE ("t1"."owner_id" = ?)', [2])
"""
# 优化后方法
users = Person.select()
pets = Pet.select()
users_with_pets = prefetch(users, pets)
for row in users_with_pets: print(row.name) for pet in row.pets: print(pet.name)
"""
固定取两次:一次person列表;一次pet列表
('SELECT "t1"."id", "t1"."owner_id", "t1"."name", "t1"."create_time", "t1"."update_time" FROM "pet" AS "t1" WHERE ("t1"."owner_id" IN (SELECT "t2"."id" FROM "person" AS "t2"))', [])
('SELECT "t1"."id", "t1"."name", "t1"."birthday" FROM "person" AS "t1"', [])
"""
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
3-2、一对一查询,取列表
pets = Pet.select()
for pet in pets: print(pet.name, pet.owner.name)
"""
N+1次查询:首先取pet列表;逐个取pet对应的person
('SELECT "t1"."id", "t1"."owner_id", "t1"."name", "t1"."create_time", "t1"."update_time" FROM "pet" AS "t1"', [])
('SELECT "t1"."id", "t1"."name", "t1"."birthday" FROM "person" AS "t1" WHERE ("t1"."id" = ?) LIMIT ? OFFSET ?', [1, 1, 0])
('SELECT "t1"."id", "t1"."name", "t1"."birthday" FROM "person" AS "t1" WHERE ("t1"."id" = ?) LIMIT ? OFFSET ?', [1, 1, 0])
('SELECT "t1"."id", "t1"."name", "t1"."birthday" FROM "person" AS "t1" WHERE ("t1"."id" = ?) LIMIT ? OFFSET ?', [2, 1, 0])
('SELECT "t1"."id", "t1"."name", "t1"."birthday" FROM "person" AS "t1" WHERE ("t1"."id" = ?) LIMIT ? OFFSET ?', [2, 1, 0])
"""
# 优化方法
pets = Pet.select(Pet, Person).join(Person)
for pet in pets: print(pet.name, pet.owner.name)
"""
固定取1次
('SELECT
"t1"."id", "t1"."owner_id", "t1"."name", "t1"."create_time", "t1"."update_time",
"t2"."id", "t2"."name", "t2"."birthday"
FROM "pet" AS "t1" INNER JOIN "person" AS "t2"
ON ("t1"."owner_id" = "t2"."id")', [])
"""
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
打印SQL执行耗时
from functools import wraps
import time
import logging
from peewee import MySQLDatabase, SENTINEL, Model
logger = logging.getLogger('peewee')
# 计时器
def timer(func): @wraps(func) def wrapper(*args, **kwargs): start_time = time.time() ret = func(*args, **kwargs) end_time = time.time() logger.debug("time: %.2f s" % (end_time - start_time)) return ret return wrapper
class CustomMySQLDatabase(MySQLDatabase): """ 打印sql执行时间 see: https://github.com/coleifer/peewee/issues/2370 """ @timer def execute_sql(self, sql, params=None, commit=SENTINEL): return super().execute_sql(sql, params, commit)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
支持原样查询返回dict字典对象
使用cator
模块
pip install cator
- 1
doc: https://github.com/mouday/cator
from peewee import MySQLDatabase
from cator import DatabaseProxy
db = MySQLDatabase(**config)
db_proxy = DatabaseProxy(db)
- 1
- 2
- 3
- 4
- 5
- 6
单个模型数据转dict字典
from copy import deepcopy
class BaseModel(Model): def to_dict(self): """model to dict""" return deepcopy(self.__data__) class Meta: database = db
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
文章来源: pengshiyu.blog.csdn.net,作者:彭世瑜,版权归原作者所有,如需转载,请联系作者。
原文链接:pengshiyu.blog.csdn.net/article/details/114281093
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)