我的第一个python web开发框架(11)——工具函数包说明(二)

举报
AllEmpty 发表于 2020/03/10 14:45:38 2020/03/10
【摘要】   db_helper.py是数据库操作包,主要有两个函数,分别是read()数据库读操作函数和write()数据库写操作函数。#!/usr/bin/env python# coding=utf-8import psycopg2from common import log_helperfrom config import const# 初始化数据库参数db_name = const.DB_N...

  db_helper.py是数据库操作包,主要有两个函数,分别是read()数据库读操作函数和write()数据库写操作函数。

#!/usr/bin/env python
# coding=utf-8

import psycopg2
from common import log_helper
from config import const

# 初始化数据库参数
db_name = const.DB_NAME
db_host = const.DB_HOST
db_port = const.DB_PORT
db_user = const.DB_USER
db_pass = const.DB_PASS


def read(sql):
    """
    连接pg数据库并进行数据查询
    如果连接失败,会把错误写入日志中,并返回false,如果sql执行失败,也会把错误写入日志中,并返回false
    如果所有执行正常,则返回查询到的数据,这个数据是经过转换的,转成字典格式,方便模板调用,其中字典的key是数据表里的字段名
    """
    try:
        # 连接数据库
        conn = psycopg2.connect(database=db_name, user=db_user, password=db_pass, host=db_host, port=db_port)
        # 获取游标
        cursor = conn.cursor()
    except Exception as e:
        print(e.args)
        log_helper.error('连接数据库失败:' + str(e.args))
        return False
    try:
        # 执行查询操作
        cursor.execute(sql)
        # 将返回的结果转换成字典格式
        data = [dict((cursor.description[i][0], value) for i, value in enumerate(row)) for row in cursor.fetchall()]
    except Exception as e:
        print(e.args)
        log_helper.error('sql执行失败:' + str(e.args) + ' sql:' + str(sql))
        return False
    finally:
        # 关闭游标和数据库链接
        cursor.close()
        conn.close()
    # 返回结果(字典格式)
    return data


def write(sql, vars):
    """
    连接pg数据库并进行写的操作
    如果连接失败,会把错误写入日志中,并返回false,如果sql执行失败,也会把错误写入日志中,并返回false,如果所有执行正常,则返回true
    """
    try:
        # 连接数据库
        conn = psycopg2.connect(database=db_name, user=db_user, password=db_pass, host=db_host, port=db_port)
        # 获取游标
        cursor = conn.cursor()
    except Exception as e:
        print(e.args)
        log_helper.error('连接数据库失败:' + str(e.args))
        return False
    try:
        # 执行sql语句
        cursor.execute(sql, vars)
        # 提交事务
        conn.commit()
    except Exception as e:
        print(e.args)
        # 如果出错,则事务回滚
        conn.rollback()
        log_helper.error('sql执行失败:' + str(e.args) + ' sql:' + str(sql))
        return False
    else:
        # 获取数据
        try:
            data = [dict((cursor.description[i][0], value) for i, value in enumerate(row))
                         for row in cursor.fetchall()]
        except Exception as e:
            # 没有设置returning或执行修改或删除语句时,记录不存在
            data = None
    finally:
        # 关闭游标和数据库链接
        cursor.close()
        conn.close()

    # 如果写入数据后,将数据库返回的数据返回给调用者
    return data

  read(sql)是用来执行数据库查询操作,里面没有事务提交,所以用它来执行增删改操作时,虽然能提交成功,但执行后数据库记录也不会有什么变化,所以只能用它来执行select语句

  write(sql, data)是用来执行数据库写操作的,write函数执行后会返回下面几种状态:

  1.False状态(数据库链接失败、sql语句不正确、链接数据库操时等执行数据库出现异常时返回这个状态)

  2.None状态(sql语句没有添加RETURNING id代码指定sql语句执行结束后返回指定字段值时出现;你如果修改代码第80行,data = None为data = True,执行成功时则会返回True状态)

  3.[] (sql语句添加了returning函数,且执行修改或删除时,记录不存在)

  4.{'id': 1,}(sql语句添加了returning函数,执行成功后返回我们指定的字段值)

  PS:我们在执行新增的时候,如果想要获取新增的id,postgresql有一个非常好用的函数returning,只需要在增删改语句的后面添加returning idreturning id,namereturning *等你想要返回的字段名称,语句执行成功以后都会返回这些指定的字段值,大家可以尝试修改测试用例代码,看看返回的值是什么。

#!/usr/bin/evn python
# coding=utf-8

import unittest
from common import db_helper


class DbHelperTest(unittest.TestCase):
    """数据库操作包测试类"""

    def setUp(self):
        """初始化测试环境"""
        print('------ini------')

    def tearDown(self):
        """清理测试环境"""
        print('------clear------')

    def test(self):
        # 新增记录,不带return参数
        sql = """
            INSERT INTO product_class(
              name, is_enable)
            VALUES (%s, %s)
        """
        data = ('糖果', 1)
        result = db_helper.write(sql, data)
        print(result)

        # 新增记录,使用return参数返回新增id
        sql = """
            INSERT INTO product_class(
              name, is_enable)
            VALUES (%s, %s)
            RETURNING id;
        """
        data = ('饼干', 1)
        result = db_helper.write(sql, data)
        print(result)

        # 修改不存在的记录
        sql = """
            UPDATE product_class
               SET name=%s, is_enable=%s
            WHERE id=10000
            RETURNING id;
        """
        data = ('糖果', 1)
        result = db_helper.write(sql, data)
        print(result)

        # 查询记录
        sql = """
            SELECT * FROM product_class
        """
        result = db_helper.read(sql)
        print(result)

if __name__ == '__main__':
    unittest.main()


  执行结果

------ini------
None
[{'id': 2}]
[]
[{'id': 1, 'name': '糖果', 'add_time': datetime.datetime(2017, 10, 16, 14, 51, 49), 'is_enable': 1}, {'id': 2, 'name': '饼干', 'add_time': datetime.datetime(2017, 10, 16, 15, 30, 50), 'is_enable': 1}]
------clear------


  encrypt_helper.py是加密操作包,目前只有md5加密函数,其他加密函数以后有需要再添加进来

#!/usr/bin/evn python
# coding=utf-8

import hashlib

def md5(text):
    """md5加密函数"""
    md5 = hashlib.md5()
    if not isinstance(text, bytes):
        text = str(text).encode('utf-8')
    md5.update(text)
    return md5.hexdigest()


  md5()参数类型支持各种类型,如果参数为非bytes类型时会自动转换为str类型来进行操作,例如以下测试用例,大家也可以尝试用元组、字典或列表类型测试看看结果

#!/usr/bin/evn python
# coding=utf-8

import unittest
from common import encrypt_helper


class DbHelperTest(unittest.TestCase):
    """数据库操作包测试类"""

    def setUp(self):
        """初始化测试环境"""
        print('------ini------')

    def tearDown(self):
        """清理测试环境"""
        print('------clear------')

    def test(self):
        result = encrypt_helper.md5(1)
        print(result)
        self.assertEqual(result, 'c4ca4238a0b923820dcc509a6f75849b')

        result = encrypt_helper.md5('1')
        print(result)
        self.assertEqual(result, 'c4ca4238a0b923820dcc509a6f75849b')

        result = encrypt_helper.md5(b'1')
        print(result)
        self.assertEqual(result, 'c4ca4238a0b923820dcc509a6f75849b')

if __name__ == '__main__':
    unittest.main()


  执行结果

------ini------
c4ca4238a0b923820dcc509a6f75849b
c4ca4238a0b923820dcc509a6f75849b
c4ca4238a0b923820dcc509a6f75849b
------clear------


  except_helper.py包主要功能是获取代码当前位置的堆栈信息,它被log_helper.py包的error()错误日志记录函数调用,输出发生错误时的堆栈信息内容,方便开发人员分析代码异常。

#!/usr/bin/evn python
# coding=utf-8

import os
import sys

def detailtrace():
    """获取程序当前运行的堆栈信息"""
    retStr = ""
    f = sys._getframe()
    f = f.f_back        # first frame is detailtrace, ignore it
    while hasattr(f, "f_code"):
        co = f.f_code
        retStr = "%s(%s:%s)->"%(os.path.basename(co.co_filename),
                  co.co_name,
                  f.f_lineno) + retStr
        f = f.f_back
    return retStr


  json_helper.py包里只有一个日期格式化类。

#!/usr/bin/evn python
# coding=utf-8

import json
import datetime

class CJsonEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        elif isinstance(obj, datetime.date):
            return obj.strftime('%Y-%m-%d')
        else:
            return json.JSONEncoder.default(self, obj)


  python的json将时间类型转换为字符串时,它会处理不了出现异常,需要使用这个自定义类进行格式化处理

  比如我们如果直接这样对时间类型进行转换时,就会出现异常:

def test(self):
        js = {
            'test5': datetime.datetime.now(),
        }
        print(js)
        result = json.dumps(js)
        print(result)


  执行结果:

------ini------
{'test5': datetime.datetime(2017, 10, 16, 16, 56, 58, 654832)}
------clear------

Error
Traceback (most recent call last):
  File "E:\Python\simple\code\test\json_helper_test.py", line 26, in test
    result = json.dumps(js)
  File "C:\Users\Empty\AppData\Local\Programs\Python\Python35-32\lib\json\__init__.py", line 230, in dumps
    return _default_encoder.encode(obj)
  File "C:\Users\Empty\AppData\Local\Programs\Python\Python35-32\lib\json\encoder.py", line 198, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "C:\Users\Empty\AppData\Local\Programs\Python\Python35-32\lib\json\encoder.py", line 256, in iterencode
    return _iterencode(o, 0)
  File "C:\Users\Empty\AppData\Local\Programs\Python\Python35-32\lib\json\encoder.py", line 179, in default
    raise TypeError(repr(o) + " is not JSON serializable")
TypeError: datetime.datetime(2017, 10, 16, 16, 56, 58, 654832) is not JSON serializable


  改成下面代码的话就正常了

def test(self):
        js = {
            'test5': datetime.datetime.now(),
        }
        print(js)
        result = json.dumps(js, cls=json_helper.CJsonEncoder)
        print(result)


  执行结果

------ini------
{'test5': datetime.datetime(2017, 10, 16, 16, 59, 40, 756103)}
{"test5": "2017-10-16 16:59:40"}
------clear------


  本文对应的源码下载

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。