当创建一个deployment后,kubernetes会发生什么?

举报
可以交个朋友 发表于 2023/12/04 20:34:07 2023/12/04
【摘要】 当创建deployment资源后,kubernetes各个组件如何协同

一、Deployment、ReplicaSet和Pod的区别与联系

  • Pod资源  pod是kubernetes集群内部部署的最基本的单位,是一组容器逻辑上的集合。
  • ReplicaSet资源     ReplicaSet资源是官方用于替换ReplicationController资源,ReplicationController用来确保工作负载pod的副本数始终保持与期望值相同,会自动创建新的Pod实例或销毁多余实例来对齐工作负载期望副本数。具体由kube-controller进程中replicaset controller 协程管理(pkg/controller/replicaset)

        ReplicaSet 跟ReplicationController 没有本质的不同,只是ReplicaSet控制器在标签选择器上支持集合式selector上,除了可以定义键值对的选择形式,还支持 matchExpressions 字段,可以提供多种选择,如:In、NotIn、Exists、DoesNotExist操作字符等。ReplicaSet(RS)与 ReplicationController(RC)资源,都是通过标签选择器selector来匹配pod的标签,由对应的controller管理被匹配到的pod的,ReplicationController和ReplicaSet的yaml文件对比如下:

  • Deployment资源    虽然ReplicaSet控制器已经能够管理pod的副本数量了,但是为了更好的解决应用升级、回滚问题,kubernetes引入了Deployment控制器。Deployment控制器并不直接管理Pod,而是通过管理ReplicaSet来间接管理Pod,即:Deployment管理ReplicaSet,ReplicaSet管理Pod。

        Deployment的主要功能如下:

  1. 确保当前集群中Deployment工作负载有且只有N个Pod实例。
  2. 通过调整属性值replicas实现工作负载副本实例的伸缩。
  3. 通过更改模板中镜像名称来控制业务Pod实例的(重建/滚动)升级和回退

       

二、Deployment创建流程图

集群内(除API-server外)其他组件不直接参与与etcd通信,都是与api-server直接通信,kubenetes集群内各组件与kube-apiser通信方式采用List-watch机制实现,其中采用List定期做全量同步更新,而watch机制采用非阻塞式长连接方式与api-server保持通信,具体流程如下:

  1. 用户使用通过kubectl客户端发起创建Deployment资源对象请求至apiserver。

  2. apiserve对请求用户鉴权、准入控制操作,然后将该请求资源事件写入到etcd存储中。

  3. 考虑到Deployment-controller采用非阻塞式长连接watch机制实时获取Deployment资源对象信息,一旦集群中有Deployment变化(包括创建、更新、删除),则通过apiserver获取etcd中相关Deployment资源对象。

  4. apiserver将Deployment资源信息返回给Deployment-controller的watch接口长连接。

  5. Deployment-controller维护Deployment的生命周期,控制生成ReplicaSet资源对象,将ReplicaSet创建事件信息返回api-server。

  6. api-server接收到Deployment-controller返回的创建ReplicaSet事件信息后,然后将创建请求事件写入到etcd存储中。

  7. 考虑到ReplicaSet-controller采用非阻塞式长连接watch机制实时获取ReplicaSet资源对象信息,一旦集群中有ReplicaSet变化(包括创建、更新、删除),则通过apiserver获取etcd中相关ReplicaSet资源对象。

  8. api-server将相关ReplicaSet资源信息返回给ReplicaSet-controller的watch接口长连接。

  9. ReplicaSet-controller通过获取的ReplicaSet资源信息生成对应的Pod资源对象模板,并将Pod创建信息通过调用api-server写入etcd

  10. api-server将待调度Pod创建事件信息写入到etcd做持久化存储

三、更新Deployment

Deployment目前存在两种升级模式:销毁重建(Recreate)和滚动升级(RollingUpdate

3.1、Deployment销毁重建方式升级示意图

  1. 获取 Deployment 的所有 ReplicaSet(包括旧的和新的)并同步它们的版本号。

  2. 缩小旧 ReplicaSet 的规模。

  3. 检查是否有旧的 Pod 仍在运行,如果有则等待它们停止运行。

  4. 如果需要创建新的 ReplicaSet,则创建它并扩大它的规模,使其等于 Deployment 中指定的副本数

  5. 清理旧的 ReplicaSet。

  6. 更新 Deployment 的状态。

3.2、Deployment平滑滚动方式升级示意图

以下部分引用开源kubernetes源码部分讲解流程

  1. 通过 getAllReplicaSetsAndSyncRevision函数 方法获取 Deployment 的所有 ReplicaSet,包括新的ReplicaSet 和旧的 ReplicaSet。

    func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, []*apps.ReplicaSet, error) {
    	_, allOldRSs := deploymentutil.FindOldReplicaSets(d, rsList)
    	newRS, err := dc.getNewReplicaSet(ctx, d, rsList, allOldRSs, createIfNotExisted)
    	if err != nil {
    		return nil, nil, err
    	}
    	return newRS, allOldRSs, nil
    }
  2. 调用 reconcileNewReplicaSet 方法尝试增加新的 ReplicaSet 的副本数,如果成功增加,则更新 Deployment 的状态。

    func (dc *DeploymentController) reconcileNewReplicaSet(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
    	if *(newRS.Spec.Replicas) == *(deployment.Spec.Replicas) {
    		return false, nil
    	}
    	if *(newRS.Spec.Replicas) > *(deployment.Spec.Replicas) {
    		scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, *(deployment.Spec.Replicas), deployment)
    		return scaled, err
    	}
    	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
    	if err != nil {
    		return false, err
    	}
    	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, newReplicasCount, deployment)
    	return scaled, err
    }
    1. 首先判断新的ReplicaSet的副本数是否与Deployment对象中指定的副本数相同,如果相同,则不需要进行任何操作,直接返回false和nil。说明已经达到期望状态

    2. 如果新的ReplicaSet的副本数大于Deployment对象中指定的副本数,则需要进行缩容操作.说明 newRS 副本数已经超过期望值

    3. 如果新的ReplicaSet的副本数小于Deployment对象中指定的副本数,则需要计算需要创建的新副本数。计算原则遵守 maxSurgemaxUnavailable 的约束。

  3. 调用 reconcileOldReplicaSets 方法尝试减少旧的 ReplicaSet 的副本数,如果成功减少,则更新 Deployment 的状态。

    func (dc *DeploymentController) reconcileOldReplicaSets(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
    	logger := klog.FromContext(ctx)
    	oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
    	if oldPodsCount == 0 {
    		return false, nil
    	}
    	allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
    	logger.V(4).Info("New replica set", "replicaSet", klog.KObj(newRS), "availableReplicas", newRS.Status.AvailableReplicas)
    	maxUnavailable := deploymentutil.MaxUnavailable(*deployment)
    	minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
    	newRSUnavailablePodCount := *(newRS.Spec.Replicas) - newRS.Status.AvailableReplicas
    	maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
    	if maxScaledDown <= 0 {
    		return false, nil
    	}
    	oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(ctx, oldRSs, deployment, maxScaledDown)
    	if err != nil {
    		return false, nil
    	}
    	logger.V(4).Info("Cleaned up unhealthy replicas from old RSes", "count", cleanupCount)
    	allRSs = append(oldRSs, newRS)
    	scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(ctx, allRSs, oldRSs, deployment)
    	if err != nil {
    		return false, nil
    	}
    	logger.V(4).Info("Scaled down old RSes", "deployment", klog.KObj(deployment), "count", scaledDownCount)
    	totalScaledDown := cleanupCount + scaledDownCount
    	return totalScaledDown > 0, nil
    }
    1. 获取了旧 ReplicaSet 中的 Pod 数量,如果 Pod 数量为 0,则无法再进行缩容,返回 false。

    2. 获取了所有 ReplicaSet 的 Pod 数量,并检查是否可以进行缩容。(对于旧ReplicaSet副本不健康或者新的ReplicaSet副本已经正常运行则可以进行缩容)

    3. 计算了最大可以缩容的 Pod 数量,考虑最大不可用 Pod 数量、新 ReplicaSet 不可用的 Pod 数量以及 Surge Pod 的数量等因素。如果最大可以缩容的 Pod 数量小于等于 0,则无法进行缩容,返回 false。

  4. 如果所有 ReplicaSet 都已经更新完毕,即 DeploymentComplete 返回 true,则调用 cleanupDeployment 方法清理旧的 ReplicaSet。

    func (dc *DeploymentController) cleanupDeployment(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) error {
    	logger := klog.FromContext(ctx)
    	if !deploymentutil.HasRevisionHistoryLimit(deployment) {
    		return nil
    	}
    	aliveFilter := func(rs *apps.ReplicaSet) bool {
    		return rs != nil && rs.ObjectMeta.DeletionTimestamp == nil
    	}
    	cleanableRSes := controller.FilterReplicaSets(oldRSs, aliveFilter)
    	diff := int32(len(cleanableRSes)) - *deployment.Spec.RevisionHistoryLimit
    	if diff <= 0 {
    		return nil
    	}
    	sort.Sort(deploymentutil.ReplicaSetsByRevision(cleanableRSes))
    	logger.V(4).Info("Looking to cleanup old replica sets for deployment", "deployment", klog.KObj(deployment))
    	for i := int32(0); i < diff; i++ {
    		rs := cleanableRSes[i]
    		if rs.Status.Replicas != 0 || *(rs.Spec.Replicas) != 0 || rs.Generation > rs.Status.ObservedGeneration || rs.DeletionTimestamp != nil {
    			continue
    		}
    		logger.V(4).Info("Trying to cleanup replica set for deployment", "replicaSet", klog.KObj(rs), "deployment", klog.KObj(deployment))
    		if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
    			return err
    		}
    	}
    	return nil}
  5. 最后,调用 syncRolloutStatus 方法同步 Deployment 的状态。

    func (dc *DeploymentController) syncRolloutStatus(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
    	newStatus := calculateStatus(allRSs, newRS, d)
    	if !util.HasProgressDeadline(d) {
    		util.RemoveDeploymentCondition(&newStatus, apps.DeploymentProgressing)
    	}
    	currentCond := util.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
    	isCompleteDeployment := newStatus.Replicas == newStatus.UpdatedReplicas && currentCond != nil && currentCond.Reason == util.NewRSAvailableReason
    	if util.HasProgressDeadline(d) && !isCompleteDeployment {
    		switch {
    		case util.DeploymentComplete(d, &newStatus):
    			msg := fmt.Sprintf("Deployment %q has successfully progressed.", d.Name)
    			if newRS != nil {
    				msg = fmt.Sprintf("ReplicaSet %q has successfully progressed.", newRS.Name)
    			}
    			condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, util.NewRSAvailableReason, msg)
    			util.SetDeploymentCondition(&newStatus, *condition)
    		case util.DeploymentProgressing(d, &newStatus):
    			msg := fmt.Sprintf("Deployment %q is progressing.", d.Name)
    			if newRS != nil {
    				msg = fmt.Sprintf("ReplicaSet %q is progressing.", newRS.Name)
    			}
    			condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, util.ReplicaSetUpdatedReason, msg)
    			if currentCond != nil {
    				if currentCond.Status == v1.ConditionTrue {
    					condition.LastTransitionTime = currentCond.LastTransitionTime
    				}
    				util.RemoveDeploymentCondition(&newStatus, apps.DeploymentProgressing)
    			}
    			util.SetDeploymentCondition(&newStatus, *condition)
    		case util.DeploymentTimedOut(ctx, d, &newStatus):
    			msg := fmt.Sprintf("Deployment %q has timed out progressing.", d.Name)
    			if newRS != nil {
    				msg = fmt.Sprintf("ReplicaSet %q has timed out progressing.", newRS.Name)
    			}
    			condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionFalse, util.TimedOutReason, msg)
    			util.SetDeploymentCondition(&newStatus, *condition)
    		}
    	}
    	if replicaFailureCond := dc.getReplicaFailures(allRSs, newRS); len(replicaFailureCond) > 0 {
    		// There will be only one ReplicaFailure condition on the replica set.
    		util.SetDeploymentCondition(&newStatus, replicaFailureCond[0])
    	} else {
    		util.RemoveDeploymentCondition(&newStatus, apps.DeploymentReplicaFailure)
    	}
    	if reflect.DeepEqual(d.Status, newStatus) {
    		// Requeue the deployment if required.
    		dc.requeueStuckDeployment(ctx, d, newStatus)
    		return nil
    	}
    	newDeployment := d
    	newDeployment.Status = newStatus
    	_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{})
    	return err
    }

        maxSurgemaxUnavailable 计算原则:maxSurge每次滚动升级允许超出所需规模的最大实例数maxUnavailable每次滚动升级允许的最大无效实例数

qrcode_for_gh_0412fa5a12f4_344 (5).jpg

订阅本文作者或关注容器魔方

获取更多云原生技术资讯

本文评论区回复“云原生”,即可添加小助手微信k8s2222

领取应季云原生资料一份

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。