【Python使用】嘿马头条项目从到完整开发教程第10篇:APScheduler定时任务,定时修正统计数据【附代码文档】

举报
程序员一诺python 发表于 2025/09/17 10:08:41 2025/09/17
【摘要】 1.APScheduler任务调度涵盖安装配置、使用方式、调度器Scheduler、执行器executors、触发器Trigger等核心组件。2. RPC远程过程调用包括RPC概念、背景用途、优缺点分析。3. Protocol Buffers数据序列化涉及文档结构、注释语法、数据类型、枚举类型、消息类型(字段编号、字段规则、嵌套类型、保留字段、默认值)。4. 客户端开发包含头条

🏆🏆🏆教程全知识点简介:1.APScheduler任务调度涵盖安装配置、使用方式、调度器Scheduler、执行器executors、触发器Trigger等核心组件。2. RPC远程过程调用包括RPC概念、背景用途、优缺点分析。3. Protocol Buffers数据序列化涉及文档结构、注释语法、数据类型、枚举类型、消息类型(字段编号、字段规则、嵌套类型、保留字段、默认值)。4. 客户端开发包含头条首页新闻推荐接口编写。5. 即时通讯技术涵盖需求场景、传统推送实现、Socket.IO(Python服务器端开发、事件处理)。6. Elasticsearch搜索引擎包括简介原理、倒排索引、分析器、相关性排序、集群概念、IK中文分析器、索引类型、文档操作(索引文档、获取文档、判断存在、更新删除)、Logstash数据导入、查询(基本查询、高级查询)、全文检索实现、Python客户端使用、联想提示(拼写纠错、自动补全)。7. 单元测试涵盖测试分类、基本写法、测试必要性。8. 服务器部署包括Gunicorn、Supervisor配置管理。9. 项目开发流程涉及产品介绍、原型图UI图、技术架构、开发环境(ToutiaoWeb虚拟机、Pycharm远程开发)。10. 数据库技术包含ORM理解、SQLAlchemy映射构建、数据库连接设置、模型类字段选项。11. 分布式系统涵盖分布式ID方案选择、Twitter Snowflake算法(64位ID划分、最大取值计算、移位偏移计算、序号循环掩码、时间戳处理)。12. Redis数据库包括Redis持久化机制。13. Git工作流涵盖Gitflow工作流(工作方式、历史分支、功能分支、发布分支、维护分支)、调试方法。14. 身份认证技术包含JWT、JWS、JWE概念、Python库使用、项目封装实施方案。15. 对象存储涉及OSS对象存储、七牛云存储服务。16. 缓存系统包括缓存架构、缓存数据保存方式、缓存有效期TTL、缓存淘汰策略、缓存问题(缓存穿透、缓存雪崩)、头条项目缓存设计(User Cache、Article Cache、Announcement Cache)、持久存储设计(阅读历史、搜索历史、统计数据)。


📚📚👉👉👉本站这篇博客:   https://bbs.huaweicloud.com/blogs/458350    中查看

📚📚👉👉👉本站这篇博客:   https://bbs.huaweicloud.com/blogs/460740    中查看

📚📚👉👉👉本站这篇博客:   https://bbs.huaweicloud.com/blogs/459524    中查看

✨ 本教程项目亮点

🧠 知识体系完整:覆盖从基础原理、核心方法到高阶应用的全流程内容
💻 全技术链覆盖:完整前后端技术栈,涵盖开发必备技能
🚀 从零到实战:适合 0 基础入门到提升,循序渐进掌握核心能力
📚 丰富文档与代码示例:涵盖多种场景,可运行、可复用
🛠 工作与学习双参考:不仅适合系统化学习,更可作为日常开发中的查阅手册
🧩 模块化知识结构:按知识点分章节,便于快速定位和复习
📈 长期可用的技术积累:不止一次学习,而是能伴随工作与项目长期参考


🎯🎯🎯全教程总章节


🚀🚀🚀本篇主要内容

APScheduler定时任务

定时修正统计数据

toutiao-backend/toutiao/__init__.py中添加APScheduler调度器对象

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor

def create_app(config, enable_config_file=False):
    ...

    # 添加定时任务APScheduler
    executors = {
        'default': ThreadPoolExecutor(10)
    }
    app.scheduler = BackgroundScheduler(executors=executors)

    from .schedule.statistic import fix_statistics

    # 每天3点执行
    app.scheduler.add_job(fix_statistics, 'cron', hour=3, args=[app])
    # 立即执行,用于测试
    # app.scheduler.add_job(fix_statistics, 'date', args=[app])

    app.scheduler.start()

    ...

toutiao-backend/toutiao中新建schedule目录用于存放定时任务

toutiao-backend/toutiao/schedule/statistics.py

from cache import statistic as cache_statistic

def fix_process(count_storage_cls):
    """
    修复处理方法
    """
    # 进行数据库查询
    ret = count_storage_cls.db_query()
    # 设置redis数据
    count_storage_cls.reset(ret)

def fix_statistics(flask_app):
    """
    修正统计数据
    """
    with flask_app.app_context():
        fix_process(cache_statistic.UserArticlesCountStorage)
        fix_process(cache_statistic.UserFollowingsCountStorage)

common/cache/statistic.py

class CountStorageBase(object):
    """
    统计数量存储的父类
    """
    ...

    @classmethod
    def reset(cls, db_query_ret):
        """
        由定时任务调用的重置数据方法
        """
        # 设置redis的存储记录
        pl = current_app.redis_master.pipeline()
        pl.delete(cls.key)

        # zadd(key, score1, val1, score2, val2, ...)
        # 方式一
        # for data_id, count in db_query_ret:
        #     pl.zadd(cls.key, count, data_id)

        # 方式二
        redis_data = []
        for data_id, count in db_query_ret:
            redis_data.append(count)
            redis_data.append(data_id)

        # redis_data = [count1, data_id1, count2, data_id2, ..]
        pl.zadd(cls.key, *redis_data)
        # pl.zadd(cls.key, count1, data_id1, count2, data_id2, ..]

        pl.execute()

class UserArticlesCountStorage(CountStorageBase):
    """
    用户文章数量
    """
    key = 'count:user:arts'

    @staticmethod
    def db_query():
        ret = db.session.query(Article.user_id, func.count(Article.id)) \
            .filter(Article.status == Article.STATUS.APPROVED).group_by(Article.user_id).all()
        return ret


class UserFollowingsCountStorage(CountStorageBase):
    """
    用户关注数量
    """
    key = 'count:user:followings'

    @staticmethod
    def db_query():
        ret = db.session.query(Relation.user_id, func.count(Relation.target_user_id)) \
            .filter(Relation.relation == Relation.RELATION.FOLLOW)\
            .group_by(Relation.user_id).all()
        return ret

[Jupyter 文档]

APScheduler定时任务

RPC简介

1. 什么是RPC

远程过程调用(英语:Remote Procedure Call,缩写为 RPC,也叫远程程序调用)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用远程方法调用

2. 背景与用途

在单台计算机中, 可以通过程序调用来传递控制和数据;或者说通过程序调用, 可以将多个程序组成一个整体来实现某个功能。

[aiohttp 文档]

如果将这种调用机制推广到多台彼此间可以进行网络通讯的计算机,由多台计算机中的多个程序组成一个整体来实现某个功能,这也是可以的。调用的一方(发起远程过程调用,然后调用这方的环境挂起,参数通过网络传递给被调用方,被调用的一方执行程序,当程序执行完成后,产生的结果再通过网络回传给调用的一方,调用的一方恢复继续执行。这样一种原型思想,就是 所说的RPC远程过程调用。

RPC这种思想最早可以追溯到1976年,RPC的发展到今天已经40年有余了。

如今的计算机应用中,单机性能上很难承受住产品的压力,需要不断扩充多台机器来提升整体的性能。同时为了充分利用这些集群里的计算机,需要对其从架构上进行划分,以提供不同的服务,服务间相互调用完成整个产品的功能。RPC就能帮助 解决这些服务间的信息传递和调用。

3. 概念说明

关于RPC的概念, 可以从广义和狭义来分别进行理解。

广义

可以将所有通过网络来进行通讯调用的实现统称为RPC。

按照这样来理解的话,那 发现HTTP其实也算是一种RPC实现。

狭义

区别于HTTP的实现方式,在传输的数据格式上和传输的控制上独立实现。比如在机器间通讯传输的数据不采用HTTP协议的方式(分为起始行、header、body三部份),而是使用自定义格式的二进制方式。

更多时候谈到的RPC都是指代这种狭义上的理解。

[aiofiles 文档]

4. 优缺点

[pydantic 文档]

相比于传统HTTP的实现而言:

优点

  • 效率高
  • 发起RPC调用的一方,在编写代码时可忽略RPC的具体实现,如同编写本地函数调用一样

缺点

  • 通用性不如HTTP好 因为传输的数据不是HTTP协议格式,所以调用双方需要专门实现的通信库,对于不同的编程开发语言,都要有相关实现。而HTTP作为一个标准协议,大部分的语言都已有相关的实现,通用性更好。

HTTP更多的面向用户与产品服务器的通讯。

RPC更多的面向产品内部服务器间的通讯。 thrift

[invoke 文档]

RPC结构

RPC的设计思想是力图使远程调用中的通讯细节对于使用者透明,调用双方无需关心网络通讯的具体实现。因而实现RPC要进行一定的封装。

RPC原理上是按如下结构流程进行实现的。

流程:

  1. 调用者(Caller, 也叫客户端、Client)以本地调用的方式发起调用;
  2. Client stub(客户端存根,可理解为辅助助手)收到调用后,负责将被调用的方法名、参数等打包编码成特定格式的能进行网络传输的消息体;
  3. Client stub将消息体通过网络发送给对端(服务端)
  4. Server stub(服务端存根,同样可理解为辅助助手)收到通过网络接收到消息后按照相应格式进行拆包解码,获取方法名和参数;
  5. Server stub根据方法名和参数进行本地调用;
  6. 被调用者(Callee,也叫Server)本地调用执行后将结果返回给server stub;
  7. Server stub将返回值打包编码成消息,并通过网络发送给对端(客户端);
  8. Client stub收到消息后,进行拆包解码,返回给Client;
  9. Client得到本次RPC调用的最终结果。

gRPC

简介

  • gRPC是由Google公司开源的高性能RPC框架。

  • gRPC支持多语言

gRPC原生使用C、Java、Go进行了三种实现,而C语言实现的版本进行封装后又支持C++、C#、Node、ObjC、 Python、Ruby、PHP等开发语言

  • gRPC支持多平台

支持的平台包括:Linux、Android、iOS、MacOS、Windows

  • gRPC的消息协议使用Google自家开源的Protocol Buffers协议机制(proto3) 序列化

  • gRPC的传输使用HTTP/2标准,支持双向流和连接多路复用

架构

C语言实现的gRPC支持多语言,其架构如下

![](https://fileserver.developer.huaweicloud.com/FileServer/getFile/communitytemp/20250917/community/383/461/571/0001696944383461571.20250917020810.34675145661312839976280291861952:20250917030816:2415:D007E8FEF5432DD36C21D6689F283D53805544C8DE952C2A7B

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。