【KEDA】基于第三方事件源控制kubernetes应用扩缩容案例

举报
可以交个朋友 发表于 2025/12/18 20:15:03 2025/12/18
【摘要】 一、背景云原生应用弹性伸缩的演进与挑战随着云原生技术的快速发展,Kubernetes已成为容器编排领域的事实标准。然而,在现代应用架构中,传统基于资源指标的弹性伸缩机制逐渐暴露出其局限性,无法充分满足事件驱动型工作负载的弹性需求。 事件驱动架构的兴起现代应用架构正朝着微服务和事件驱动方向演进,产生了全新的弹性伸缩需求,这些应用通常依赖于各种事件源,如消息队列(Kafka、RabbitMQ)...

一、背景

云原生应用弹性伸缩的演进与挑战
随着云原生技术的快速发展,Kubernetes已成为容器编排领域的事实标准。然而,在现代应用架构中,传统基于资源指标的弹性伸缩机制逐渐暴露出其局限性,无法充分满足事件驱动型工作负载的弹性需求。
 事件驱动架构的兴起
现代应用架构正朝着微服务和事件驱动方向演进,产生了全新的弹性伸缩需求,这些应用通常依赖于各种事件源,如消息队列(Kafka、RabbitMQ)、云服务事件或数据库变更。这种架构下的弹性伸缩需要更加精细化和响应迅速的机制,能够基于业务事件而非底层资源指标进行决策。事件驱动的工作负载通常具有突发性和不可预测性,需要系统能够快速从零扩展到多个实例,并在处理完成后迅速缩容。

二、KEDA简介

KEDA(Kubernetes-based Event-Driven Autoscaler)作为CNCF毕业项目,应运而生,填补了Kubernetes在事件驱动自动扩缩容方面的空白。它通过扩展而非替代原生HPA的方式,为Kubernetes带来了基于事件的智能弹性伸缩能力,KEDA的核心价值在于它将外部事件源与Kubernetes弹性伸缩机制无缝集成,支持超过60+种事件源,允许应用根据业务实际需求进行扩缩容。

三、Keda基于第三方事件源弹性伸缩源案例

  • keda根据数据库指标弹性伸缩——参考:案例一
  • keda根据消息队列(如kafka、rabbitmq)弹性伸缩——参考:案例二、案例五
  • keda根据键值对数据库指标弹性伸缩——参考:案例三
  • keda根据业务本身非系统指标弹性伸缩——参考:案例四
  • keda根据云服务事件弹性伸缩——参考:【KEDA】基于ELB服务事件进行弹性伸缩

3.1、案例一:基于数据库事件创建job

1.准备一个mongoDB数据库,并创建对应的账户和表,操作方式如下:

mongo --eval 'db.createUser({user:"test_user",pwd:"******",roles:[{role:"readWrite",db:"test"}]})'
mongo --eval 'db.auth("test_user","******")'
mongo --eval 'db.createCollection("test_collection")'

2.为keda创建secret和TriggerAuthentication资源,用来连接mongo数据库

kind: Secret
apiVersion: v1
metadata:
  name: mongodb-secret
data:
  #base64解码后的内容为:mongodb://test_user:******@mongodb.default.svc.cluster.local:27017/test;
  #格式为:mongodb://用户名:密码@数据库ip:端口/表名
  connect: bW9uZ29kYjovL3*****C5zdmMuY2x1c3Rlci5sb2NhbDoyNzAxNy90ZXN0
type: Opaque
---
kind: TriggerAuthentication
apiVersion: keda.sh/v1alpha1
metadata:
  name: mongodb-trigger
spec:
  secretTargetRef:
    - key: connect
      name: mongodb-secret      
      parameter: connectionString

3.创建ScaledJob(功能类似于ScaledObject,但作用于批处理应用的自动扩展)

资源用来配置Job模板以及指定数据库和查询表达式

kind: ScaledJob
apiVersion: keda.sh/v1alpha1
metadata:
  name: mongodb-job
spec:
  failedJobsHistoryLimit: 10
  jobTargetRef:
    backoffLimit: 1
    template:
      metadata:
        creationTimestamp: null
      spec:
        containers:
          - args:
              - '--connectStr=mongodb://test_user:******@mongodb:27017/test'
              - '--dataBase=test'
              - '--collection=test_collection'
            image: registry.cn-hangzhou.aliyuncs.com/carsnow/mongo-update:v6   #该镜像功能就是将test表中state字段修改为finished
            imagePullPolicy: IfNotPresent
            name: mongodb-update
        restartPolicy: Never
  maxReplicaCount: 30
  pollingInterval: 30
  triggers:
    - authenticationRef:
        name: mongodb-trigger
      metadata:
        collection: test_collection
        dbName: test
        metricName: global-metric
        query: '{"type":"mp4","state":"waiting"}'  #查询表达式
        queryValue: '1'   #查询记录数大于1
      type: mongodb

4.在数据库中插入数据

mongo --eval 'db.test_collection.insert([{"type":"mp4","state":"waiting"},])'

image.png

5.查看是否创建对应的job以及数据库字段是否更新

image.png

image.png

3.2、案例二:基于kafka Lag实现负载弹性伸缩

1. 在Kafka 中,“Lag”表示消费者相对于生产者的偏移量(位移)之差。它表示了消费者组在某一时刻消费到的消息在整个分区中的相对位置。当Lag过大时需要及时扩容业务,从而加快消息消费。

2. 这里部署一个nginx应用作为HPA对象。

image.png

3.创建nginx应用的HPA策略

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: keda-nginx
  namespace: keda
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: nginx
  pollingInterval: 15
  minReplicaCount: 1
  maxReplicaCount: 10
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: 192.168.0.248:9092
        consumerGroup: k8s
        topic: test-log
        lagThreshold: "10"
        offsetResetPolicy: latest

4. 这里的triggers中是kafka相关配置,查看cce nginx负载弹性伸缩。

image.png

5. 测试验证,在集群中部署日志采集系统,如下图

image.png

6. 将采集到的日志信息推送到kafka服务,通过logstash进行过滤后推送到es进行存储。可以发现服务刚起来时kafka topic 出现Lag

image.png

7. 查看HPA策略

image.png

可以看到nginx应用已经开始扩容
image.png

3.3、案例三:监听redis队列长度弹性伸缩

1. 部署待被伸缩实例数为0的目标deploy

image.png

2. 创建dcs实例或集群部署redis,本测试采用集群helm部署redis

helm repo add bitnami https://charts.bitnami.com/bitnami
helm pull bitnami/redis

修改values.yaml

global:
  storageClass: "csi-disk-topology"
  redis:
    password: "******"

commonConfiguration: |-
  appendonly yes
  save "300 10"

执行命令安装

helm -n redis install redis ./redis

image.png

  1. 创建伸缩对象ScaledObject
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: hpa-app
  namespace: hoking
spec:
  scaleTargetRef:                           # 扩容目标对象
    name: redis-demo
    kind: Deployment
    apiVersion: apps/v1
  minReplicaCount: 0                        # 最小副本数(队列无/或空闲时deploy实例副本缩到0)
  maxReplicaCount: 6                        # 最大副本数
  triggers:                                 # 触发规则
    - metadata:
        address: redis-master.redis.svc.cluster.local:6379    # Redis地址(由于在都是部署在集群内部服务可直接使用内网svc访问redis)
        listName: redis-demo                                  # Redis的列表的key名称
        listLength: "10"                                      # 触发伸缩的队列长度
        password: "******"
      type: redis                                             # 指标类型

  1. 向redis队列中添加数据
    image.png

  2. 查看负载情况
    可以看到redis-demo负载副本已经扩容到1(队例里只需要有一个数据就能触发keda 0-1的扩容,之后的扩容交给hpa)
    image.png

  3. 继续增加redis-demo队列数据达到阈值以上触发hpa扩容
    image.png

  4. 查看自动生成的外部指标数据

 kubectl get --raw "/apis/external.metrics.k8s.io/v1beta1/namespaces/hoking/s0-redis-redis-demo(#hpa的指标名)?labelSelector=scaledobject.keda.sh%2Fname%3Dhpa-app(so的名)"|jq

image.png
查看负载情况
扩容到3副本符合预期
image.png

清理redis队例数据触发1-0
image.png

过5分钟后触发副本缩容到0
image.png

image.png

3.4、案例四:基于prometheus采集业务本身指标弹性伸缩 1->n

1.部署nginx测试负载

image.png
并通过nginx-prometheus-exporter暴露prometheus指标
image.png

2.创建一个prometheus指标的ScaledObjec

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: prometheus-scaledobject
  namespace: test
spec:
  scaleTargetRef:
    name: ngiinx-test  # ScaledObject 绑定的 Deployment 名字
  pollingInterval: 5 # 检查每个触发器的时间间隔
  minReplicaCount: 1  # 最小副本数,可以缩小到零
  maxReplicaCount: 10 # 最大副本数
  triggers:
  - type: prometheus  # 使用的事件源
    metadata:         # 触发器的属性
      serverAddress: http://10.247.33.107:9090
      metricName: nginx_http_request_total
      threshold: '10'
      query: sum(rate(nginx_http_requests_total[1m]))

3.验证扩缩容

while :; do curl 10.247.155.181; done #访问nginx svc

如下图触发扩容
image.png
停止压测,5分钟后查看hpa,进行了自动缩容
image.png

3.5、案例五:基于RabbitMQ自动扩缩容

1. 创建测试的工作负载

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx
  labels:
    app: nginx
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: swr.cn-north-4.myhuaweicloud.com/cpaas/nginx:perl
        ports:
        - containerPort: 80

2. 安装rabbitmq

集群外准备一个ecs,安装docker,docker安装rabbitmq:

docker run -d --rm --name rabbitmq -p 5672:5672 -p 15672:15672 swr.cn-north-4.myhuaweicloud.com/cpaas/rabbitmq:3.13-management

登录rabbitmq: http://<ip>:15672, 账号/密码:guest/guest

image.png

创建一个队列:test

image.png

给guest用户授权

image.png

3. 部署Rabbitmq Scaler

为keda创建secret和TriggerAuthentication资源,用来连接rabbitmq

apiVersion: v1
kind: Secret
metadata:
  name: keda-rabbitmq-secret
data:
  host: <AMQP URI connection string> # base64 encoded value of format amqp://guest:password@localhost:5672/<virtual_host>, virtual_host默认为"/"
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-rabbitmq-conn
  namespace: default
spec:
  secretTargetRef:
    - parameter: host
      name: keda-rabbitmq-secret
      key: host

4. 创建ScaledJob资源定义rabbitmq触发规则和扩缩容规则

ScaledJob资源定义参考:https://keda.sh/docs/2.14/concepts/scaling-deployments
triggers规则参考:https://keda.sh/docs/2.14/scalers/rabbitmq-queue/

扩容规则为:当前队列深度为100,配置为每30深度扩一个pod

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: rabbitmq-scaledobject
  namespace: default
spec:
  scaleTargetRef:
    name: nginx
  pollingInterval: 5 # 检查每个触发器的时间间隔
  minReplicaCount: 1  # 最小副本数,可以缩小到零
  maxReplicaCount: 10 # 最大副本数
  triggers:
  - type: rabbitmq
    metadata:
      protocol: amqp
      queueName: test    # 修改为上面创建的队列的名字
      mode: QueueLength
      value: "30"  # 扩容阈值
    authenticationRef:
      name: keda-trigger-auth-rabbitmq-conn

5. 查看scaleobject, scaleobject已经ready说明连接上rabbitmq。

image.png

6. 测试验证

构造生产者, 队列中添加100个任务

import pika
import json

credentials = pika.PlainCredentials('guest','guest')#mq用户名和密码,用于认证
queque_name = 'test'
#虚拟队列需要指定参数virtual_host,如果是默认的可以不填
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.248',port=5672,virtual_host='/',credentials=credentials))
channel = connection.channel()# 创建一个AMQP信道

#声明队列名字,并设置durable为True,为了避免rabbitMq-server挂掉数据丢失,将durable设为True
channel.queue_declare(queue= queque_name,durable=True)
for i in range(100): 
    message = json.dumps({'OrderId':"1000%s"%i})
    # exchange表示交换器,可以精确的指定消息应该发到哪个队列中,route_key设置队列的名称,body表示发送的内容
    channel.basic_publish(exchange='',routing_key= queque_name,body=message)
    print(message)
connection.close()

image.png

查看扩缩容结果,扩容出来3个pod
image.png

构建消费者

import pika
credentials = pika.PlainCredentials('guest','******')
queue_name = "test"  # 队列名
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.0.248',
    port=5672,
    virtual_host='/',
    credentials=credentials
))
channel = connection.channel()
#声明消息队列,消息在这个队列中传递,如果不存在,则创建队列
channel.queue_declare(queue=queue_name,durable=True)
# 定义一个回调函数来处理消息队列中消息,这里是打印出来
def callback(ch,method,properties,body):
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(body.decode())
#告诉rabbitmq,用callback来接收消息
channel.basic_consume(queue_name,callback)
#开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
channel.start_consuming()

查看缩容

image.png

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。