【KEDA】基于第三方事件源控制kubernetes应用扩缩容案例
一、背景
云原生应用弹性伸缩的演进与挑战
随着云原生技术的快速发展,Kubernetes已成为容器编排领域的事实标准。然而,在现代应用架构中,传统基于资源指标的弹性伸缩机制逐渐暴露出其局限性,无法充分满足事件驱动型工作负载的弹性需求。
事件驱动架构的兴起
现代应用架构正朝着微服务和事件驱动方向演进,产生了全新的弹性伸缩需求,这些应用通常依赖于各种事件源,如消息队列(Kafka、RabbitMQ)、云服务事件或数据库变更。这种架构下的弹性伸缩需要更加精细化和响应迅速的机制,能够基于业务事件而非底层资源指标进行决策。事件驱动的工作负载通常具有突发性和不可预测性,需要系统能够快速从零扩展到多个实例,并在处理完成后迅速缩容。
二、KEDA简介
KEDA(Kubernetes-based Event-Driven Autoscaler)作为CNCF毕业项目,应运而生,填补了Kubernetes在事件驱动自动扩缩容方面的空白。它通过扩展而非替代原生HPA的方式,为Kubernetes带来了基于事件的智能弹性伸缩能力,KEDA的核心价值在于它将外部事件源与Kubernetes弹性伸缩机制无缝集成,支持超过60+种事件源,允许应用根据业务实际需求进行扩缩容。
三、Keda基于第三方事件源弹性伸缩源案例
- keda根据数据库指标弹性伸缩——参考:案例一
- keda根据消息队列(如kafka、rabbitmq)弹性伸缩——参考:案例二、案例五
- keda根据键值对数据库指标弹性伸缩——参考:案例三
- keda根据业务本身非系统指标弹性伸缩——参考:案例四
- keda根据云服务事件弹性伸缩——参考:【KEDA】基于ELB服务事件进行弹性伸缩
3.1、案例一:基于数据库事件创建job
1.准备一个mongoDB数据库,并创建对应的账户和表,操作方式如下:
mongo --eval 'db.createUser({user:"test_user",pwd:"******",roles:[{role:"readWrite",db:"test"}]})'
mongo --eval 'db.auth("test_user","******")'
mongo --eval 'db.createCollection("test_collection")'
2.为keda创建secret和TriggerAuthentication资源,用来连接mongo数据库
kind: Secret
apiVersion: v1
metadata:
name: mongodb-secret
data:
#base64解码后的内容为:mongodb://test_user:******@mongodb.default.svc.cluster.local:27017/test;
#格式为:mongodb://用户名:密码@数据库ip:端口/表名
connect: bW9uZ29kYjovL3*****C5zdmMuY2x1c3Rlci5sb2NhbDoyNzAxNy90ZXN0
type: Opaque
---
kind: TriggerAuthentication
apiVersion: keda.sh/v1alpha1
metadata:
name: mongodb-trigger
spec:
secretTargetRef:
- key: connect
name: mongodb-secret
parameter: connectionString
3.创建ScaledJob(功能类似于ScaledObject,但作用于批处理应用的自动扩展)
资源用来配置Job模板以及指定数据库和查询表达式
kind: ScaledJob
apiVersion: keda.sh/v1alpha1
metadata:
name: mongodb-job
spec:
failedJobsHistoryLimit: 10
jobTargetRef:
backoffLimit: 1
template:
metadata:
creationTimestamp: null
spec:
containers:
- args:
- '--connectStr=mongodb://test_user:******@mongodb:27017/test'
- '--dataBase=test'
- '--collection=test_collection'
image: registry.cn-hangzhou.aliyuncs.com/carsnow/mongo-update:v6 #该镜像功能就是将test表中state字段修改为finished
imagePullPolicy: IfNotPresent
name: mongodb-update
restartPolicy: Never
maxReplicaCount: 30
pollingInterval: 30
triggers:
- authenticationRef:
name: mongodb-trigger
metadata:
collection: test_collection
dbName: test
metricName: global-metric
query: '{"type":"mp4","state":"waiting"}' #查询表达式
queryValue: '1' #查询记录数大于1
type: mongodb
4.在数据库中插入数据
mongo --eval 'db.test_collection.insert([{"type":"mp4","state":"waiting"},])'

5.查看是否创建对应的job以及数据库字段是否更新


3.2、案例二:基于kafka Lag实现负载弹性伸缩
1. 在Kafka 中,“Lag”表示消费者相对于生产者的偏移量(位移)之差。它表示了消费者组在某一时刻消费到的消息在整个分区中的相对位置。当Lag过大时需要及时扩容业务,从而加快消息消费。
2. 这里部署一个nginx应用作为HPA对象。

3.创建nginx应用的HPA策略
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: keda-nginx
namespace: keda
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: nginx
pollingInterval: 15
minReplicaCount: 1
maxReplicaCount: 10
triggers:
- type: kafka
metadata:
bootstrapServers: 192.168.0.248:9092
consumerGroup: k8s
topic: test-log
lagThreshold: "10"
offsetResetPolicy: latest
4. 这里的triggers中是kafka相关配置,查看cce nginx负载弹性伸缩。

5. 测试验证,在集群中部署日志采集系统,如下图

6. 将采集到的日志信息推送到kafka服务,通过logstash进行过滤后推送到es进行存储。可以发现服务刚起来时kafka topic 出现Lag

7. 查看HPA策略

可以看到nginx应用已经开始扩容

3.3、案例三:监听redis队列长度弹性伸缩
1. 部署待被伸缩实例数为0的目标deploy

2. 创建dcs实例或集群部署redis,本测试采用集群helm部署redis
helm repo add bitnami https://charts.bitnami.com/bitnami
helm pull bitnami/redis
修改values.yaml
global:
storageClass: "csi-disk-topology"
redis:
password: "******"
commonConfiguration: |-
appendonly yes
save "300 10"
执行命令安装
helm -n redis install redis ./redis

- 创建伸缩对象ScaledObject
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: hpa-app
namespace: hoking
spec:
scaleTargetRef: # 扩容目标对象
name: redis-demo
kind: Deployment
apiVersion: apps/v1
minReplicaCount: 0 # 最小副本数(队列无/或空闲时deploy实例副本缩到0)
maxReplicaCount: 6 # 最大副本数
triggers: # 触发规则
- metadata:
address: redis-master.redis.svc.cluster.local:6379 # Redis地址(由于在都是部署在集群内部服务可直接使用内网svc访问redis)
listName: redis-demo # Redis的列表的key名称
listLength: "10" # 触发伸缩的队列长度
password: "******"
type: redis # 指标类型
-
向redis队列中添加数据

-
查看负载情况
可以看到redis-demo负载副本已经扩容到1(队例里只需要有一个数据就能触发keda 0-1的扩容,之后的扩容交给hpa)

-
继续增加redis-demo队列数据达到阈值以上触发hpa扩容

-
查看自动生成的外部指标数据
kubectl get --raw "/apis/external.metrics.k8s.io/v1beta1/namespaces/hoking/s0-redis-redis-demo(#hpa的指标名)?labelSelector=scaledobject.keda.sh%2Fname%3Dhpa-app(so的名)"|jq

查看负载情况
扩容到3副本符合预期

清理redis队例数据触发1-0

过5分钟后触发副本缩容到0


3.4、案例四:基于prometheus采集业务本身指标弹性伸缩 1->n
1.部署nginx测试负载

并通过nginx-prometheus-exporter暴露prometheus指标

2.创建一个prometheus指标的ScaledObjec
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: prometheus-scaledobject
namespace: test
spec:
scaleTargetRef:
name: ngiinx-test # ScaledObject 绑定的 Deployment 名字
pollingInterval: 5 # 检查每个触发器的时间间隔
minReplicaCount: 1 # 最小副本数,可以缩小到零
maxReplicaCount: 10 # 最大副本数
triggers:
- type: prometheus # 使用的事件源
metadata: # 触发器的属性
serverAddress: http://10.247.33.107:9090
metricName: nginx_http_request_total
threshold: '10'
query: sum(rate(nginx_http_requests_total[1m]))
3.验证扩缩容
while :; do curl 10.247.155.181; done #访问nginx svc
如下图触发扩容

停止压测,5分钟后查看hpa,进行了自动缩容

3.5、案例五:基于RabbitMQ自动扩缩容
1. 创建测试的工作负载
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx
labels:
app: nginx
spec:
replicas: 1
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: swr.cn-north-4.myhuaweicloud.com/cpaas/nginx:perl
ports:
- containerPort: 80
2. 安装rabbitmq
集群外准备一个ecs,安装docker,docker安装rabbitmq:
docker run -d --rm --name rabbitmq -p 5672:5672 -p 15672:15672 swr.cn-north-4.myhuaweicloud.com/cpaas/rabbitmq:3.13-management
登录rabbitmq: http://<ip>:15672, 账号/密码:guest/guest

创建一个队列:test

给guest用户授权

3. 部署Rabbitmq Scaler
为keda创建secret和TriggerAuthentication资源,用来连接rabbitmq
apiVersion: v1
kind: Secret
metadata:
name: keda-rabbitmq-secret
data:
host: <AMQP URI connection string> # base64 encoded value of format amqp://guest:password@localhost:5672/<virtual_host>, virtual_host默认为"/"
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
name: keda-trigger-auth-rabbitmq-conn
namespace: default
spec:
secretTargetRef:
- parameter: host
name: keda-rabbitmq-secret
key: host
4. 创建ScaledJob资源定义rabbitmq触发规则和扩缩容规则
ScaledJob资源定义参考:https://keda.sh/docs/2.14/concepts/scaling-deployments
triggers规则参考:https://keda.sh/docs/2.14/scalers/rabbitmq-queue/
扩容规则为:当前队列深度为100,配置为每30深度扩一个pod
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: rabbitmq-scaledobject
namespace: default
spec:
scaleTargetRef:
name: nginx
pollingInterval: 5 # 检查每个触发器的时间间隔
minReplicaCount: 1 # 最小副本数,可以缩小到零
maxReplicaCount: 10 # 最大副本数
triggers:
- type: rabbitmq
metadata:
protocol: amqp
queueName: test # 修改为上面创建的队列的名字
mode: QueueLength
value: "30" # 扩容阈值
authenticationRef:
name: keda-trigger-auth-rabbitmq-conn
5. 查看scaleobject, scaleobject已经ready说明连接上rabbitmq。

6. 测试验证
构造生产者, 队列中添加100个任务
import pika
import json
credentials = pika.PlainCredentials('guest','guest')#mq用户名和密码,用于认证
queque_name = 'test'
#虚拟队列需要指定参数virtual_host,如果是默认的可以不填
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.248',port=5672,virtual_host='/',credentials=credentials))
channel = connection.channel()# 创建一个AMQP信道
#声明队列名字,并设置durable为True,为了避免rabbitMq-server挂掉数据丢失,将durable设为True
channel.queue_declare(queue= queque_name,durable=True)
for i in range(100):
message = json.dumps({'OrderId':"1000%s"%i})
# exchange表示交换器,可以精确的指定消息应该发到哪个队列中,route_key设置队列的名称,body表示发送的内容
channel.basic_publish(exchange='',routing_key= queque_name,body=message)
print(message)
connection.close()

查看扩缩容结果,扩容出来3个pod

构建消费者
import pika
credentials = pika.PlainCredentials('guest','******')
queue_name = "test" # 队列名
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.0.248',
port=5672,
virtual_host='/',
credentials=credentials
))
channel = connection.channel()
#声明消息队列,消息在这个队列中传递,如果不存在,则创建队列
channel.queue_declare(queue=queue_name,durable=True)
# 定义一个回调函数来处理消息队列中消息,这里是打印出来
def callback(ch,method,properties,body):
ch.basic_ack(delivery_tag=method.delivery_tag)
print(body.decode())
#告诉rabbitmq,用callback来接收消息
channel.basic_consume(queue_name,callback)
#开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
channel.start_consuming()
查看缩容

- 点赞
- 收藏
- 关注作者



评论(0)