MySQL 回顾
【摘要】 1、控制语句 (1) 连接数据库: mysql -uroot -p123456 -P3306 -h127.0.0.1; mysql -uroot -p123456 -P3306 -h127.0.0.1 -e"show databases"; // 直接执行 SQL 语句 (2) 查看所有用户: select user, host from mysql.user; (3) 新增用户: create user test...
.*
#! /usr/bin/python3
import pymysql
def get_connection():
    """Open a new MySQL connection and create a dictionary cursor on it.

    The connection parameters are hard-coded below; the host value looks
    like a placeholder that has to be replaced before running.

    Returns:
        tuple: ``(connection, cursor)`` where the cursor was created with
        ``pymysql.cursors.DictCursor``.
    """
    settings = {
        "host": "xx.xx.xx.xx",
        "user": "root",
        "passwd": "password",
        "db": "test",
        "port": 3306,
        "charset": "utf8",
    }
    connection = pymysql.connect(**settings)
    cursor = connection.cursor(cursor=pymysql.cursors.DictCursor)
    return connection, cursor
def close_connection(conn, csr):
    """Close the cursor and then the connection, skipping falsy ones.

    Args:
        conn: Object exposing ``close()``, or ``None``.
        csr: Object exposing ``close()``, or ``None``.

    Raises:
        Any exception raised by ``csr.close()`` or ``conn.close()``. The
        connection is still closed when closing the cursor fails.
    """
    try:
        if csr:
            csr.close()
    finally:
        # Runs even if closing the cursor raised, so a cursor error can
        # no longer leave the connection open.
        if conn:
            conn.close()
def query_sql(sql, params=None):
    """Run a query on a fresh connection and return every fetched row.

    Args:
        sql: SQL text passed to ``cursor.execute``.
        params: Optional parameters passed alongside ``sql``.

    Returns:
        The value of ``cursor.fetchall()``, or ``None`` when executing or
        fetching raised an ``Exception`` (the error text is printed).
    """
    connection, cursor = get_connection()
    try:
        cursor.execute(sql, params)
        return cursor.fetchall()
    except Exception as exc:
        print(str(exc))
        return None
    finally:
        # Cursor and connection are released on every path.
        close_connection(connection, cursor)
def update_sql(sql, params=None):
    """Execute a data-modifying statement on a fresh connection and commit.

    Args:
        sql: SQL text passed to ``cursor.execute``.
        params: Optional parameters passed alongside ``sql``.

    Returns:
        The value returned by ``cursor.execute`` when the statement was
        executed and committed; ``None`` when executing or committing
        raised an ``Exception`` (the error text is printed and the
        transaction is rolled back).
    """
    conn, csr = get_connection()
    try:
        affected = csr.execute(sql, params)
        conn.commit()
        return affected
    except Exception as e:
        # Print first so the original error is reported even if the
        # rollback itself fails.
        print(str(e))
        conn.rollback()
        # Report failure as None even when execute() succeeded but
        # commit() did not; the rolled-back change must not look applied.
        return None
    finally:
        close_connection(conn, csr)
if __name__ == "__main__":
    # Example write (left disabled): insert one row through update_sql.
    # ex_sql = "insert into tb (id, type, num, price, visit) values (%s, %s, %s, %s, %s )"
    # res = update_sql(ex_sql, ("12348", "english", 50, 38.5, 1200))
    # print(res)
    ex_sql = "select * from tb where visit > %s"
    res = query_sql(ex_sql, (500,))
    # query_sql returns None when the query fails; iterate over nothing in
    # that case instead of raising TypeError.
    for r in res or ():
        print(r["id"], r["type"])
#! /usr/bin/python3
import pymysql
from dbutils.pooled_db import PooledDB
class DB(object):
    """Query/update helper backed by a ``PooledDB`` connection pool.

    The pool is stored on the class and created the first time a ``DB``
    is instantiated, so every instance uses the same pool object.
    """

    # Shared pool; stays None until the first DB() is created.
    __pool = None

    def __init__(self):
        self.pool = DB.__get_conn_pool()

    @staticmethod
    def __get_conn_pool():
        """Return the shared pool, creating it on first use."""
        if DB.__pool is None:
            DB.__pool = PooledDB(
                creator=pymysql,
                host="xx.xx.xx.xx",
                user="root",
                passwd="password",
                db="test",
                port=3306,
                charset="utf8",
                # NOTE(review): 10240 is far above MySQL's usual default
                # max_connections; confirm the server allows this many.
                maxconnections=10240,
            )
        return DB.__pool

    def _get_connection(self):
        """Take a connection from the pool and open a dict cursor on it.

        Returns:
            tuple: ``(connection, cursor)``.
        """
        conn = self.pool.connection()
        csr = conn.cursor(cursor=pymysql.cursors.DictCursor)
        return conn, csr

    def _close_connection(self, conn, csr):
        """Close the cursor and then the connection, skipping falsy ones.

        The connection is closed even when closing the cursor raises; the
        cursor's exception then propagates to the caller.
        """
        try:
            if csr:
                csr.close()
        finally:
            if conn:
                conn.close()

    def query_sql(self, sql, params=None):
        """Run a query and return every fetched row.

        Args:
            sql: SQL text passed to ``cursor.execute``.
            params: Optional parameters passed alongside ``sql``.

        Returns:
            The value of ``cursor.fetchall()``, or ``None`` when executing
            or fetching raised an ``Exception`` (the error is printed).
        """
        conn, csr = self._get_connection()
        try:
            csr.execute(sql, params)
            return csr.fetchall()
        except Exception as e:
            print(str(e))
            return None
        finally:
            self._close_connection(conn, csr)

    def update_sql(self, sql, params=None):
        """Execute a data-modifying statement and commit it.

        Args:
            sql: SQL text passed to ``cursor.execute``.
            params: Optional parameters passed alongside ``sql``.

        Returns:
            The value returned by ``cursor.execute`` when the statement
            was executed and committed; ``None`` when executing or
            committing raised an ``Exception`` (the error is printed and
            the transaction is rolled back).
        """
        conn, csr = self._get_connection()
        try:
            affected = csr.execute(sql, params)
            conn.commit()
            return affected
        except Exception as e:
            # Print first so the original error is reported even if the
            # rollback itself fails.
            print(str(e))
            conn.rollback()
            # A failed commit must not be reported with the row count of
            # the statement that was just rolled back.
            return None
        finally:
            self._close_connection(conn, csr)
if __name__ == "__main__":
    db = DB()
    # Example write (left disabled): insert one row through db.update_sql.
    # ex_sql = "insert into tb (id, type, num, price, visit) values (%s, %s, %s, %s, %s )"
    # res = db.update_sql(ex_sql, ("12348", "english", 50, 38.5, 1200))
    # print(res)
    ex_sql = "select * from tb where visit > %s"
    res = db.query_sql(ex_sql, (500,))
    # db.query_sql returns None when the query fails; iterate over nothing
    # in that case instead of raising TypeError.
    for r in res or ():
        print(r['id'], r['type'])
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)