Python--数据库的写法
在python中写sql语句的方法(在实际工程中,PostgreSQL数据库比Mysql数据库更常用、效率更高!这二者都是关系型数据库,都是用sql语句进行操作):
首先建立一个pg数据库类:
# *-*- coding: utf-8 -*-
from psycopg2 import pool
from trafficgo_nash.utils.config import CONF
from trafficgo_nash.pipeline.wcc_client import get_wcc_detype
from trafficgo_nash.offline.trafficgo.common.constants import WCC_TOOL_DEFAULT
class DataBaseService(object):
"""PostgreSQL数据库操作封装
"""
def __init__(self, minconn=2, maxconn=10):
# 通过pool,连接到云上的PostgreSQL数据库中
self.connectPool = pool.SimpleConnectionPool(
minconn,
maxconn,
host=CONF.pgdb_ip, # 云上PostgreSQL数据库的ip
port=CONF.pgdb_port, # 云上PostgreSQL数据库的端口号
user=CONF.pgdb_user, # 云上PostgreSQL数据库的用户名
password=CONF.pgdb_passwd, # 云上PostgreSQL数据库的登录密码
database=CONF.pgdb_database, # 要访问的数据库名
options=f"-c search_path={CONF.pgdb_schema}", # 要访问的Schema名(即文件夹名)
keepalives=1,
keepalives_idle=30,
keepalives_interval=10,
keepalives_count=5)
def getConnect(self):
conn = self.connectPool.getconn()
cursor = conn.cursor()
return conn, cursor
def closeConnect(self, conn, cursor):
cursor.close()
self.connectPool.putconn(conn)
def closeAll(self):
self.connectPool.closeall()
def execute(self, sql, value=None):
conn, cursor = self.getConnect()
try:
res = cursor.execute(sql, value)
conn.commit()
return res
except Exception as e:
conn.rollback()
raise e
finally:
self.closeConnect(conn, cursor)
def executemany(self, sql, value=None):
conn, cursor = self.getConnect()
try:
res = cursor.executemany(sql, value)
conn.commit()
return res
except Exception as e:
conn.rollback()
raise e
finally:
self.closeConnect(conn, cursor)
def fetchone(self, sql):
conn, cursor = self.getConnect()
cursor.execute(sql)
result = cursor.fetchone()
self.closeConnect(conn, cursor)
return result
def fetchall(self, sql):
conn, cursor = self.getConnect()
cursor.execute(sql)
result = cursor.fetchall()
self.closeConnect(conn, cursor)
return result
然后我们就可以运行以下代码,进行表中数据的插入了:
db_service = DataBaseService() # 初始化上面第2点中的类
# sql_insert_query中的语句就是sql语句
sql_insert_query = """ INSERT INTO training_plan (cross_id, origin_plan,
zero_plan, plan_crosses, create_time)
VALUES (%s,%s,%s,%s,%s)""" # 在插入的时候,即使插入的cross_id是int类型,但是也是用:%s
# records_to_insert是插入PostgreSQL数据库的一行数据
records_to_insert = [(cross_id,
str(original_plan).replace("\'", "\""), # original_plan是一个很大的json,这里是将其变为1个很长的string,存在PostgreSQL数据库的某行某列中!
str(zero_plan).replace("\'", "\""), # zero_plan是一个很大的json,这里是将其变为1个很长的string,存在PostgreSQL数据库的某行某列中!
','.join([get_system_id(one_id) for one_id in
CONF.all_cross]),
str(datetime.datetime.now()))]
# 特别注意:这里用.replace("\'", "\"")将string中的单引号,替换为双引号!注意单引号、双引号前面要加上转义符:'\'! 参考:https://www.runoob.com/python/python-strings.html
# 执行该sql语句!
db_service.executemany(sql_insert_query, records_to_insert)
同时,我们也可以运行以下语句,查询目前id的最大值:(id是表中每一行的行号(主键),从1开始编号,比如我们现在数据有10行,那么id就是从1~10,比如我把第8行删掉了,那么id=8这一行就不存在了,这时我再新插入一行,那么id就是从11开始的,不会回到8的!所以表中的行数不一定就等于最大的id号!)
db_service = DataBaseService() # 初始化上面第2点中的类
sql_inquire_query = """ SELECT MAX(id) FROM training_plan """
print(db_service.fetchone(sql_inquire_query)[0])
- 点赞
- 收藏
- 关注作者
评论(0)