SQLAlchemy入门

举报
kwan的解忧杂货铺 发表于 2024/08/30 23:11:13 2024/08/30
【摘要】 一.基础介绍SQLAlchemy 是一个 Python 的 SQL 工具包和对象关系映射(ORM)工具,它提供了一个高层的 ORM 以及底层的 SQL 表达式语言。SQLAlchemy 是开源的,并且可以在商业和非商业项目中免费使用。它支持多种数据库系统,包括 PostgreSQL、MySQL、SQLite 等。 1. SQLAlchemy 的起源SQLAlchemy 最初由 Michae...

一.基础介绍

SQLAlchemy 是一个 Python 的 SQL 工具包和对象关系映射(ORM)工具,它提供了一个高层的 ORM 以及底层的 SQL 表达式语言。SQLAlchemy 是开源的,并且可以在商业和非商业项目中免费使用。它支持多种数据库系统,包括 PostgreSQL、MySQL、SQLite 等。

1. SQLAlchemy 的起源

SQLAlchemy 最初由 Michael Bayer 在 2005 年创建,目的是提供一个全面的 SQL 工具包和 ORM 解决方案,以满足 Python 社区的需求。随着时间的推移,SQLAlchemy 不断发展和完善,成为了 Python 数据库编程领域中最受欢迎的库之一。

2. SQLAlchemy 的核心组件

2.1 核心 SQL 工具包

SQLAlchemy 的核心 SQL 工具包提供了构建 SQL 查询的功能,它允许开发者以 Pythonic 的方式编写 SQL 语句。这包括了对数据库表的创建、数据的增删改查等操作。

2.2 ORM 层

ORM(Object-Relational Mapping)层是 SQLAlchemy 的另一个重要组成部分,它允许开发者使用 Python 类和对象来表示数据库中的表和行。ORM 层抽象了数据库操作,使得开发者可以不必编写 SQL 语句,而是通过操作 Python 对象来间接地与数据库交互。

3. SQLAlchemy 的优势

3.1 灵活性

SQLAlchemy 提供了灵活的 SQL 构建工具,开发者可以自由地编写 SQL 语句,同时也可以利用 ORM 层提供的抽象来简化数据库操作。

3.2 跨数据库支持

SQLAlchemy 支持多种数据库系统,这意味着开发者可以使用相同的代码库来操作不同的数据库,而不需要为每种数据库编写特定的代码。

3.3 强大的社区支持

由于 SQLAlchemy 的流行,它拥有一个活跃的社区,开发者可以在社区中找到大量的资源和帮助,包括文档、教程和第三方库。

二.实战步骤

1.数据库配置

# 数据库
database:
  TYPE: mysql
  DATABASE_URL: mysql://root:xxx@xxxx:9306/test?serverTimezone=Asia/Shanghai
  USERNAME: root
  PASSWORD: xxx
  HOST: xxxx
  PORT: 9306
  DBNAME: test
  MAX_OVERFLOW: 60
  POOL_TIMEOUT: 120
  POOL_SIZE: 30
  URL_PROPERTY: ?charset=utf8
  ECHO: True

2.model

from datetime import datetime

import pytz
from sqlalchemy import String, Column, Text, DateTime, JSON
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, attributes


def get_beijing_now():
    # 获取当前系统时区
    return datetime.now(pytz.timezone('Asia/Shanghai'))


# 基类
class Base(AsyncAttrs, DeclarativeBase):
    id: Mapped[int] = mapped_column(primary_key=True)
    create_time = Column(DateTime, default=get_beijing_now, nullable=False)
    update_time = Column(DateTime, default=get_beijing_now, onupdate=get_beijing_now, nullable=False)

    def to_dict(self):
        """
        转为字典输出
        :return:
        """
        return {c.name: getattr(self, c.name) for c in self.__table__.columns}


@repr_generator
class AlchemyEntitySchemas(Base):
    __tablename__ = "entity_schemas"

    name = Column(String(255), nullable=False, comment='名称')

3.连接配置

from sqlalchemy.pool import QueuePool
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.sql import text
from base.config import get_config_key
from urllib.parse import quote_plus as urlquote
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, AsyncEngine, async_sessionmaker


class Database:

    def __init__(self, url, pool_size=30, pool_timeout=1200, max_overflow=60, echo=False):
        try:
            self.engine = create_engine(url, poolclass=QueuePool, pool_size=pool_size,
                                        max_overflow=max_overflow, pool_timeout=pool_timeout,
                                        echo=echo, pool_recycle=7200, pool_pre_ping=True, echo_pool=echo)
            self.Session = sessionmaker(bind=self.engine, expire_on_commit=False, autocommit=False, autoflush=False)
            print("Database connected successfully.")
        except SQLAlchemyError as e:
            print(f"Error connecting to the database: {e}")

    def get_session(self):
        return self.Session()

    @staticmethod
    def close_session(_session):
        _session.close()

    @staticmethod
    def execute_query(query, _session):
        try:
            result = _session.execute(query)
            return result.fetchall()
        except SQLAlchemyError as e:
            print(f"Error executing query: {e}")
            return None
        finally:
            Database.close_session(_session)


class SyncDatabase:
    async_engine: AsyncEngine = None
    async_session = None

    def __init__(self, url, pool_size=30, pool_timeout=1200, max_overflow=60, echo=False):
        self.url = url
        self.max_overflow = max_overflow
        self.pool_timeout = pool_timeout
        self.pool_size = pool_size
        self.echo = echo
        self.connect()

    def connect(self):
        """创建数据库引擎和会话类"""
        try:
            self.async_engine = create_async_engine(self.url, echo=self.echo, pool_size=pool_size,
                                                    max_overflow=max_overflow, pool_timeout=pool_timeout,
                                                    pool_recycle=7200,
                                                    pool_pre_ping=True, echo_pool=self.echo)
            self.async_session = async_sessionmaker(bind=self.async_engine, class_=AsyncSession, expire_on_commit=False,
                                                    autocommit=False, autoflush=False)
            print("Database connected successfully.")
        except SQLAlchemyError as e:
            print(f"Error connecting to the database: {e}")


def get_db_url():
    userName = get_config_key("database", "USERNAME")
    password = get_config_key("database", "PASSWORD")
    dbHost = get_config_key("database", "HOST")
    dbPort = get_config_key("database", "PORT")
    dbName = get_config_key("database", "DBNAME")
    urlProperty = get_config_key("database", "URL_PROPERTY")

    if dbName is None:
        return f'mysql+pymysql://{userName}:{urlquote(password)}@{dbHost}:{dbPort}{urlProperty}'
    else:
        return f'mysql+pymysql://{userName}:{urlquote(password)}@{dbHost}:{dbPort}/{dbName}{urlProperty}'


def get_sync_db_url():
    userName = get_config_key("database", "USERNAME")
    password = get_config_key("database", "PASSWORD")
    dbHost = get_config_key("database", "HOST")
    dbPort = get_config_key("database", "PORT")
    dbName = get_config_key("database", "DBNAME")
    urlProperty = get_config_key("database", "URL_PROPERTY")
    if dbName is None:
        return f'mysql+aiomysql://{userName}:{urlquote(password)}@{dbHost}:{dbPort}{urlProperty}'
    else:
        return f'mysql+aiomysql://{userName}:{urlquote(password)}@{dbHost}:{dbPort}/{dbName}{urlProperty}'


url = get_db_url()
max_overflow = get_config_key("database", "MAX_OVERFLOW")
pool_timeout = get_config_key("database", "POOL_TIMEOUT")
pool_size = get_config_key("database", "POOL_SIZE")
echo = get_config_key("database", "ECHO")

# sqlalchemy实际操作对象,导入的时候应该导入这个对象
get_sqlalchemy_db = Database(url, pool_size=pool_size, pool_timeout=pool_timeout, max_overflow=max_overflow, echo=echo)

# 异步的
SYNC_DB_URI = get_sync_db_url()
_async_engine = create_async_engine(SYNC_DB_URI, echo=echo, pool_size=pool_size,
                                    max_overflow=max_overflow, pool_timeout=pool_timeout, pool_recycle=7200,
                                    pool_pre_ping=True, echo_pool=echo)
# 异步IO的 sqlalchemy实际操作对象,导入的时候应该导入这个对象
async_session_factory = async_sessionmaker(bind=_async_engine, class_=AsyncSession, expire_on_commit=False,
                                           autocommit=False, autoflush=False)

4.调用 SQL

@staticmethod
async def find_by_name(name: str):
    """
    根据名称查询
    """
    db = get_sqlalchemy_db
    try:
        with Session(db.engine) as session:
            stmt = select(AlchemySchemas)
            if name:
                stmt = stmt.where(AlchemySchemas.name == name)
            schemas_infos = session.scalars(stmt).all()
            return [schemas_info.to_dict() for schemas_info in schemas_infos] if schemas_infos else None
    except SQLAlchemyError as e:
        print(f"An error occurred: {e}")
        return None
    finally:
        db.close_session(session)
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。