目录
![《[经验] python下的消息队列选择-rq》](https://res-hd.hc-cdn.cn/ecology/9.3.187/v2_resources/ydcomm/libs/images/loading.gif)
项目代码所在: https://github.com/BruceDone/web_message_queue_app
背景
某个web api项目需要将api的每次调用情况都记录下来,统一收集之后存入数据库,然后每个小时进行汇总与统计。这样方便业务部门进行数据分析,逻辑流程如下
用户请求 -> web api(记录请求) -> 请求返回数据 并记录本次请求 -> 业务人员汇总数据
|
用户请求 -> web api(记录请求) -> 请求返回数据 并记录本次请求 -> 业务人员汇总数据
|
从上面的看出,api业务访问数据和记录本次的访问行为其实可以完全分开,对于用户的视角,只会关注本次的请求是否OK,而对于运营人员,更多的关注是本次访问的行为结果,所以,在逻辑上我们需要将两个动作分开,如果按照早期的做法,我们大可以做如下的动作
1.http 请求 返回数据 2.记录本次http请求成功与否到数据库中
|
1.http 请求 返回数据
2.记录本次http请求成功与否到数据库中
|
每次的请求都绑定了两个动作, 如果并发多了(对于http应用来说是常态),对于日志数据库来说就是一个压力了,当然,我们可以做读写分离了来减轻数据库层面的压力,但是这样的耦合度太高了,对于用户来说api请求时间也有消耗,所以这种类似的异步操作,我们可以使用消息队列完成,本次,我主要从三种常用消息队列中间件来介绍
准备
- python
- docker-compose
- python-rq
安装redis环境
使用docker-compose 起一个redis 环境
创建一个redis的文件夹,在里面写入名字docker-compose.yml
的文件,内容如下
version: '2' services: redis: image: redis command: redis-server --requirepass "pythonrq" restart: always ports: - "6666:6379"
|
version: '2'
services:
redis:
image: redis
command: redis-server --requirepass "pythonrq"
restart: always
ports:
- "6666:6379"
|
使用命令docker-compose up -d
然后使用命令docker-compose ps
我们可以看到如下的输出
Name Command State Ports ------------------------------------------------------------------------------- redis_redis_1 docker-entrypoint.sh redis ... Up 0.0.0.0:6666->6379/tcp
|
Name Command State Ports
-------------------------------------------------------------------------------
redis_redis_1 docker-entrypoint.sh redis ... Up 0.0.0.0:6666->6379/tcp
|
这样我们已经使用docker-compose 起了一个redis服务了
安装python-rq
OK,我们准备一个简单的web api 项目, 本次选用的web框架为vibora
准备web框架
核心代码
app_factory.py
代码
# -*- coding: utf-8 -*- from vibora import Vibora from vibora.router import RouterStrategy from src.views.api import bp_api from src.settings import app_config from src.log.config import logger from redis import StrictRedis from rq import Queue def create_app(): app = Vibora(router_strategy=RouterStrategy.CLONE) # app.configure_static_files() app.add_blueprint(bp_api) app.logger = logger job_queue = Queue(connection=StrictRedis.from_url(app_config.redis_conn)) app.components.add(job_queue) return app
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
# -*- coding: utf-8 -*-
from vibora import Vibora
from vibora.router import RouterStrategy
from src.views.api import bp_api
from src.settings import app_config
from src.log.config import logger
from redis import StrictRedis
from rq import Queue
def create_app():
app = Vibora(router_strategy=RouterStrategy.CLONE)
# app.configure_static_files()
app.add_blueprint(bp_api)
app.logger = logger
job_queue = Queue(connection=StrictRedis.from_url(app_config.redis_conn))
app.components.add(job_queue)
return app
|
我们在创建web app的时候,同时创建了一个组件,加入到了整个app runtime 中
api.py文件
# -*- coding: utf-8 -*- import time from vibora.responses import JsonResponse from vibora.blueprints import Blueprint from rq import Queue from src.log.config import logger from src.job.word import count_words bp_api = Blueprint() @bp_api.route('/time') async def get_time(job_queue: Queue): logger.info('get user request') job_queue.enqueue(count_words, str(time.time())) return JsonResponse({'time': str(time.time())})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
# -*- coding: utf-8 -*-
import time
from vibora.responses import JsonResponse
from vibora.blueprints import Blueprint
from rq import Queue
from src.log.config import logger
from src.job.word import count_words
bp_api = Blueprint()
@bp_api.route('/time')
async def get_time(job_queue: Queue):
logger.info('get user request')
job_queue.enqueue(count_words, str(time.time()))
return JsonResponse({'time': str(time.time())})
|
在每次请求的时候,我们都会将一个job通过queue 放入到我们的队列系统中(redis),然后能过起rq worker
这个命令,单独起一个进程来消费我们的任务
实践
由于我在git上已经写好了compose 文件以及dockerfile,所以你在装好docker-compose
之后,在docker-compose.yml
所在的目录,使用命令 docker-compose up -d
就可以看到整个应用了
使用命令
curl 127.0.0.1:5252/time
另外开一个窗口,使用命令
docker-compose logs -f
可以看到日志的滚动
![《[经验] python下的消息队列选择-rq》](https://res-hd.hc-cdn.cn/ecology/9.3.187/v2_resources/ydcomm/libs/images/loading.gif)
总结
本次我们从
三个思维的角度来初步的接触消息队列的意义,事实上,消息队列在解耦和中间件的角色里面,有至关重要的作用,当然,引入该组件也是当前系统不能够支撑业务数据了,系统的发展也是层层递进的,不用过早的优化与设计,下次我们来看rabbit mq 与我们的web系统结合
本次的代码已经放在git 上了:https://github.com/BruceDone/web_message_queue_app
文章来源: brucedone.com,作者:大鱼的鱼塘,版权归原作者所有,如需转载,请联系作者。
原文链接:brucedone.com/archives/1178
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
评论(0)