Python:Peewee实践记录

举报
彭世瑜 发表于 2021/08/13 23:02:50 2021/08/13
【摘要】 文档:http://docs.peewee-orm.com/ 安装 $ pip install peewee 1 将已有数据表转为Model # 导出数据表为Model $ python -m pwiz -e mysql -H localhost -p 3306 -u root -P -o -i -t user data > user.py 12 打印执...

文档:http://docs.peewee-orm.com/

安装

$ pip install peewee

  
 
  • 1

将已有数据表转为Model

# 导出数据表为Model
$ python -m pwiz -e mysql -H localhost -p 3306 -u root -P -o -i -t user data > user.py

  
 
  • 1
  • 2

打印执行SQL


import logging

# 打印日志
logger = logging.getLogger('peewee')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
logger.propagate = False  # 不向上传播

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

modle和dict转换

from playhouse.shortcuts import model_to_dict, dict_to_model

# 快捷方式类
class ShortcutModel(object): @classmethod def from_dict(cls, data, ignore_unknown=False): return dict_to_model(cls, data=data, ignore_unknown=ignore_unknown) def to_dict(self, recurse=True, backrefs=False, only=None, exclude=None, seen=None, extra_attrs=None, fields_from_query=None, max_depth=None, manytomany=False): return model_to_dict( self, recurse=recurse, backrefs=backrefs, only=only, exclude=exclude, seen=seen, extra_attrs=extra_attrs, fields_from_query=fields_from_query, max_depth=max_depth, manytomany=manytomany )

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

示例

class BaseModel(Model, ShortcutModel): class Meta: database = db

  
 
  • 1
  • 2
  • 3

外键关联操作

1、建表

from datetime import datetime

from peewee import *


db = SqliteDatabase('people.db')

class BaseModel(Model): class Meta: database = db class Person(BaseModel): id = IntegerField(primary_key=True) name = CharField() birthday = DateField() class Meta: table_name = 'person'


class Pet(BaseModel): id = IntegerField(primary_key=True) # 一对多: 一个Person -> 多个Pet owner = ForeignKeyField(Person, backref='pets') name = CharField() create_time = DateTimeField(default=datetime.now) update_time = DateTimeField(default=datetime.now) class Meta: table_name = 'pet'

# 创建表
db.create_tables([Person, Pet])

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

2、初始化数据

def init_data(): person1 = Person.create(name='Tom', birthday='2020-01-01') Pet.create(owner=person1, name='Dog') Pet.create(owner=person1, name='Cat') person2 = Person.create(name='Jack', birthday='2020-01-02') Pet.create(owner=person2, name='Dog') Pet.create(owner=person2, name='Cat')


init_data()

"""
sqlite> select * from person;
id name birthday  
----------  ----------  ----------
1 Tom 2020-01-01
2 Jack 2020-01-02

sqlite> select * from pet;
id owner_id name create_time update_time ----------  ----------  ----------  -------------------  -------------------
1 1 Dog 2021-03-02 10:16:07  2021-03-02 10:16:07
2 1 Cat 2021-03-02 10:16:07  2021-03-02 10:16:07
3 2 Dog 2021-03-02 10:36:01  2021-03-02 10:36:01
4 2 Cat 2021-03-02 10:36:01  2021-03-02 10:36:01


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

3、N+1问题

3-1、一对多,取列表


for row in Person.select(): print(row.name) for pet in row.pets: print(pet.name)

"""
取N+1次 : 先取一次person列表;然后一个Person对象,取一次pets列表
('SELECT "t1"."id", "t1"."name", "t1"."birthday" FROM "person" AS "t1"', [])
('SELECT "t1"."id", "t1"."owner_id", "t1"."name", "t1"."create_time", "t1"."update_time" FROM "pet" AS "t1" WHERE ("t1"."owner_id" = ?)', [1])
('SELECT "t1"."id", "t1"."owner_id", "t1"."name", "t1"."create_time", "t1"."update_time" FROM "pet" AS "t1" WHERE ("t1"."owner_id" = ?)', [2])
"""

# 优化后方法

users = Person.select()
pets = Pet.select()
users_with_pets = prefetch(users, pets)
for row in users_with_pets: print(row.name) for pet in row.pets: print(pet.name)

"""
固定取两次:一次person列表;一次pet列表
('SELECT "t1"."id", "t1"."owner_id", "t1"."name", "t1"."create_time", "t1"."update_time" FROM "pet" AS "t1" WHERE ("t1"."owner_id" IN (SELECT "t2"."id" FROM "person" AS "t2"))', [])
('SELECT "t1"."id", "t1"."name", "t1"."birthday" FROM "person" AS "t1"', [])
"""

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

3-2、一对一查询,取列表

pets = Pet.select()
for pet in pets: print(pet.name, pet.owner.name)

"""
N+1次查询:首先取pet列表;逐个取pet对应的person
('SELECT "t1"."id", "t1"."owner_id", "t1"."name", "t1"."create_time", "t1"."update_time" FROM "pet" AS "t1"', [])
('SELECT "t1"."id", "t1"."name", "t1"."birthday" FROM "person" AS "t1" WHERE ("t1"."id" = ?) LIMIT ? OFFSET ?', [1, 1, 0])
('SELECT "t1"."id", "t1"."name", "t1"."birthday" FROM "person" AS "t1" WHERE ("t1"."id" = ?) LIMIT ? OFFSET ?', [1, 1, 0])
('SELECT "t1"."id", "t1"."name", "t1"."birthday" FROM "person" AS "t1" WHERE ("t1"."id" = ?) LIMIT ? OFFSET ?', [2, 1, 0])
('SELECT "t1"."id", "t1"."name", "t1"."birthday" FROM "person" AS "t1" WHERE ("t1"."id" = ?) LIMIT ? OFFSET ?', [2, 1, 0])
"""

# 优化方法

pets = Pet.select(Pet, Person).join(Person)
for pet in pets: print(pet.name, pet.owner.name)
"""
固定取1次
('SELECT 
"t1"."id", "t1"."owner_id", "t1"."name", "t1"."create_time", "t1"."update_time", 
"t2"."id", "t2"."name", "t2"."birthday" 
FROM "pet" AS "t1" INNER JOIN "person" AS "t2" 
ON ("t1"."owner_id" = "t2"."id")', [])
"""

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

打印SQL执行耗时

from functools import wraps
import time
import logging
from peewee import MySQLDatabase, SENTINEL, Model


logger = logging.getLogger('peewee')

# 计时器
def timer(func): @wraps(func) def wrapper(*args, **kwargs): start_time = time.time() ret = func(*args, **kwargs) end_time = time.time() logger.debug("time: %.2f s" % (end_time - start_time)) return ret return wrapper


class CustomMySQLDatabase(MySQLDatabase): """ 打印sql执行时间 see: https://github.com/coleifer/peewee/issues/2370 """ @timer def execute_sql(self, sql, params=None, commit=SENTINEL): return super().execute_sql(sql, params, commit)


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

支持原样查询返回dict字典对象

使用cator 模块

pip install cator

  
 
  • 1

doc: https://github.com/mouday/cator

from peewee import MySQLDatabase
from cator import DatabaseProxy

db = MySQLDatabase(**config)
db_proxy = DatabaseProxy(db)


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

单个模型数据转dict字典

from copy import deepcopy

class BaseModel(Model): def to_dict(self): """model to dict""" return deepcopy(self.__data__) class Meta: database = db


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

文档:avoiding-the-n-1-problem

文章来源: pengshiyu.blog.csdn.net,作者:彭世瑜,版权归原作者所有,如需转载,请联系作者。

原文链接:pengshiyu.blog.csdn.net/article/details/114281093

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。