What happens in Kubernetes when you create a Deployment?
1. Deployment, ReplicaSet, and Pod: differences and relationships
- Pod: a Pod is the smallest deployable unit in a Kubernetes cluster; it is a logical group of one or more containers.
- ReplicaSet: ReplicaSet is the official replacement for ReplicationController. A ReplicationController ensures that a workload's Pod replica count always matches the desired value, automatically creating new Pod instances or destroying surplus ones to converge on that count; this is handled by the replicaset controller goroutine inside the kube-controller-manager process (pkg/controller/replicaset).
ReplicaSet is not fundamentally different from ReplicationController; the difference is that the ReplicaSet controller supports set-based selectors: in addition to equality-based key-value selection, it supports the matchExpressions field with operators such as In, NotIn, Exists, and DoesNotExist. Both ReplicaSet (RS) and ReplicationController (RC) match Pods through the label selector, and the corresponding controller manages the matched Pods. The selector portions of their YAML manifests compare as shown in the sketch below.
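A minimal, illustrative comparison (the names, labels, and image below are assumptions for the sketch, not taken from the original post):

```yaml
# ReplicationController: equality-based (key-value) selector only
apiVersion: v1
kind: ReplicationController
metadata:
  name: nginx-rc
spec:
  replicas: 3
  selector:
    app: nginx            # plain key-value matching
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:1.25
---
# ReplicaSet: additionally supports set-based matchExpressions
apiVersion: apps/v1
kind: ReplicaSet
metadata:
  name: nginx-rs
spec:
  replicas: 3
  selector:
    matchLabels:
      app: nginx
    matchExpressions:     # In, NotIn, Exists, DoesNotExist
    - key: env
      operator: In
      values: ["prod", "test"]
  template:
    metadata:
      labels:
        app: nginx
        env: prod
    spec:
      containers:
      - name: nginx
        image: nginx:1.25
```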
- Deployment: although the ReplicaSet controller can already manage Pod replica counts, Kubernetes introduced the Deployment controller to better handle application upgrades and rollbacks. A Deployment does not manage Pods directly; it manages them indirectly through ReplicaSets: the Deployment manages the ReplicaSet, and the ReplicaSet manages the Pods.
The main functions of a Deployment are:
- Ensure that the Deployment workload always has exactly N Pod instances running in the cluster.
- Scale the workload's replicas in or out by adjusting the replicas field.
- Drive (Recreate or RollingUpdate) upgrades and rollbacks of the workload's Pods by changing the image name in the Pod template, as in the manifest sketch below.
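A minimal Deployment manifest that exercises these knobs (all names and the nginx image are illustrative assumptions):

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-deployment
spec:
  replicas: 3              # adjust to scale the workload in or out
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:1.25  # changing the image triggers a new rollout
```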
2. Deployment creation flow
Within the cluster, no component other than the kube-apiserver communicates with etcd directly; everything else talks to the apiserver. Components communicate with the kube-apiserver through the list-watch mechanism: List performs periodic full resynchronization, while watch keeps a non-blocking, long-lived connection to the apiserver for incremental updates (a minimal client-go sketch follows this list). The flow is as follows:
- A user submits a request to create a Deployment object to the apiserver via the kubectl client.
- The apiserver authenticates and authorizes the requesting user and runs admission control, then writes the new Deployment object to etcd.
- The deployment controller watches Deployment objects in real time over a non-blocking, long-lived connection; whenever a Deployment in the cluster changes (create, update, or delete), it obtains the relevant Deployment object from etcd through the apiserver.
- The apiserver returns the Deployment object over the deployment controller's long-lived watch connection.
- The deployment controller, which manages the Deployment's lifecycle, generates the corresponding ReplicaSet object and sends the ReplicaSet creation event back to the apiserver.
- On receiving the ReplicaSet creation event from the deployment controller, the apiserver writes the creation request to etcd.
- Likewise, the replicaset controller watches ReplicaSet objects in real time over a non-blocking, long-lived connection; whenever a ReplicaSet in the cluster changes (create, update, or delete), it obtains the relevant ReplicaSet object from etcd through the apiserver.
- The apiserver returns the ReplicaSet object over the replicaset controller's long-lived watch connection.
- The replicaset controller generates Pod objects from the ReplicaSet's Pod template and submits the Pod creation requests to the apiserver, which writes them to etcd.
- The apiserver persists the creation events for the pending, to-be-scheduled Pods to etcd.
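The watch half of the list-watch mechanism can be sketched with client-go. This is a minimal illustration rather than the informer machinery the controllers actually use; the kubeconfig path and the default namespace are assumptions:

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from the local kubeconfig (assumption: ~/.kube/config).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// List once to obtain the full current state ...
	list, err := client.AppsV1().Deployments("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}

	// ... then watch from the returned resourceVersion over a
	// non-blocking, long-lived connection for incremental events.
	w, err := client.AppsV1().Deployments("default").Watch(context.TODO(), metav1.ListOptions{
		ResourceVersion: list.ResourceVersion,
	})
	if err != nil {
		panic(err)
	}
	defer w.Stop()

	for event := range w.ResultChan() {
		fmt.Printf("Deployment event: %s\n", event.Type) // ADDED / MODIFIED / DELETED
	}
}
```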
3. Updating a Deployment
A Deployment currently supports two upgrade strategies: Recreate (tear down, then rebuild) and RollingUpdate (rolling upgrade), selected via the strategy field as sketched below.
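A minimal sketch of the strategy section (the values are illustrative; the selector and Pod template are omitted for brevity):

```yaml
spec:
  replicas: 10
  strategy:
    type: RollingUpdate    # or: Recreate
    rollingUpdate:         # only meaningful for RollingUpdate
      maxSurge: 25%
      maxUnavailable: 25%
```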
3.1 Deployment Recreate (destroy-and-rebuild) upgrade flow
- Get all of the Deployment's ReplicaSets (old and new) and sync their revision numbers.
- Scale the old ReplicaSets down.
- Check whether any old Pods are still running; if so, wait for them to stop.
- If a new ReplicaSet needs to be created, create it and scale it up to the replica count specified in the Deployment.
- Clean up the old ReplicaSets.
- Update the Deployment's status.
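These steps correspond to the controller's Recreate rollout routine. The following is a condensed paraphrase of rolloutRecreate from pkg/controller/deployment/recreate.go (abridged for readability, not the verbatim source; exact details vary across Kubernetes versions):

```go
// Condensed sketch of the Recreate rollout (paraphrased, not verbatim).
func (dc *DeploymentController) rolloutRecreate(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error {
	// Step 1: get all ReplicaSets and sync revisions; don't create a new RS yet.
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)

	// Step 2: scale the old ReplicaSets down to zero.
	scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(ctx, oldRSs, d)
	if err != nil {
		return err
	}
	if scaledDown {
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	// Step 3: wait until no old Pods are running before bringing up new ones.
	if oldPodsRunning(newRS, oldRSs, podMap) {
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	// Step 4: create the new ReplicaSet if needed and scale it up to spec.replicas.
	newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
	if err != nil {
		return err
	}
	allRSs = append(oldRSs, newRS)
	if _, err := dc.scaleUpNewReplicaSetForRecreate(ctx, newRS, d); err != nil {
		return err
	}

	// Step 5: once the rollout is complete, clean up old ReplicaSets.
	if util.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
			return err
		}
	}

	// Step 6: sync the Deployment status.
	return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}
```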
3.2 Deployment RollingUpdate (smooth rolling) upgrade flow
The walkthrough below quotes the relevant parts of the open-source Kubernetes source code.
- Get all of the Deployment's ReplicaSets, both the new one and the old ones, via the getAllReplicaSetsAndSyncRevision method:
```go
func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, []*apps.ReplicaSet, error) {
	_, allOldRSs := deploymentutil.FindOldReplicaSets(d, rsList)

	newRS, err := dc.getNewReplicaSet(ctx, d, rsList, allOldRSs, createIfNotExisted)
	if err != nil {
		return nil, nil, err
	}

	return newRS, allOldRSs, nil
}
```
- Call the reconcileNewReplicaSet method to try to scale up the new ReplicaSet; if it scaled, update the Deployment's status:
```go
func (dc *DeploymentController) reconcileNewReplicaSet(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	if *(newRS.Spec.Replicas) == *(deployment.Spec.Replicas) {
		return false, nil
	}
	if *(newRS.Spec.Replicas) > *(deployment.Spec.Replicas) {
		scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, *(deployment.Spec.Replicas), deployment)
		return scaled, err
	}
	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
	if err != nil {
		return false, err
	}
	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, newReplicasCount, deployment)
	return scaled, err
}
```
- First, if the new ReplicaSet's replica count equals the count specified in the Deployment, nothing needs to be done: the desired state has already been reached, so return false and nil.
- If the new ReplicaSet's replica count is greater than the Deployment's, the new ReplicaSet has overshot the desired value and must be scaled down.
- If the new ReplicaSet's replica count is less than the Deployment's, compute how many replicas to add, honoring the maxSurge and maxUnavailable constraints.
- Call the reconcileOldReplicaSets method to try to scale down the old ReplicaSets; if it scaled down, update the Deployment's status:
```go
func (dc *DeploymentController) reconcileOldReplicaSets(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	logger := klog.FromContext(ctx)
	oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
	if oldPodsCount == 0 {
		return false, nil
	}

	allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
	logger.V(4).Info("New replica set", "replicaSet", klog.KObj(newRS), "availableReplicas", newRS.Status.AvailableReplicas)
	maxUnavailable := deploymentutil.MaxUnavailable(*deployment)
	minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
	newRSUnavailablePodCount := *(newRS.Spec.Replicas) - newRS.Status.AvailableReplicas
	maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
	if maxScaledDown <= 0 {
		return false, nil
	}

	oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(ctx, oldRSs, deployment, maxScaledDown)
	if err != nil {
		return false, nil
	}
	logger.V(4).Info("Cleaned up unhealthy replicas from old RSes", "count", cleanupCount)

	allRSs = append(oldRSs, newRS)
	scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(ctx, allRSs, oldRSs, deployment)
	if err != nil {
		return false, nil
	}
	logger.V(4).Info("Scaled down old RSes", "deployment", klog.KObj(deployment), "count", scaledDownCount)

	totalScaledDown := cleanupCount + scaledDownCount
	return totalScaledDown > 0, nil
}
```
- Get the Pod count of the old ReplicaSets; if it is 0, there is nothing left to scale down, so return false.
- Get the Pod count across all ReplicaSets and check whether scale-down is possible (old replicas can be removed when they are unhealthy or when enough new-ReplicaSet replicas are already running normally).
- Compute the maximum number of Pods that can be scaled down, factoring in maxUnavailable, the new ReplicaSet's unavailable Pod count, and any surge Pods. For example, with spec.replicas=10 and maxUnavailable=2 (so minAvailable=8), 13 Pods across all ReplicaSets, and 1 unavailable new-ReplicaSet Pod, maxScaledDown = 13 - 8 - 1 = 4. If this number is less than or equal to 0, no scale-down is possible, so return false.
- Once all ReplicaSets have been updated, i.e. DeploymentComplete returns true, call the cleanupDeployment method to clean up the old ReplicaSets:
```go
func (dc *DeploymentController) cleanupDeployment(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) error {
	logger := klog.FromContext(ctx)
	if !deploymentutil.HasRevisionHistoryLimit(deployment) {
		return nil
	}

	aliveFilter := func(rs *apps.ReplicaSet) bool {
		return rs != nil && rs.ObjectMeta.DeletionTimestamp == nil
	}
	cleanableRSes := controller.FilterReplicaSets(oldRSs, aliveFilter)

	diff := int32(len(cleanableRSes)) - *deployment.Spec.RevisionHistoryLimit
	if diff <= 0 {
		return nil
	}

	sort.Sort(deploymentutil.ReplicaSetsByRevision(cleanableRSes))
	logger.V(4).Info("Looking to cleanup old replica sets for deployment", "deployment", klog.KObj(deployment))

	for i := int32(0); i < diff; i++ {
		rs := cleanableRSes[i]
		if rs.Status.Replicas != 0 || *(rs.Spec.Replicas) != 0 || rs.Generation > rs.Status.ObservedGeneration || rs.DeletionTimestamp != nil {
			continue
		}
		logger.V(4).Info("Trying to cleanup replica set for deployment", "replicaSet", klog.KObj(rs), "deployment", klog.KObj(deployment))
		if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
			return err
		}
	}

	return nil
}
```
- Finally, call the syncRolloutStatus method to sync the Deployment's status:
```go
func (dc *DeploymentController) syncRolloutStatus(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
	newStatus := calculateStatus(allRSs, newRS, d)

	if !util.HasProgressDeadline(d) {
		util.RemoveDeploymentCondition(&newStatus, apps.DeploymentProgressing)
	}

	currentCond := util.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
	isCompleteDeployment := newStatus.Replicas == newStatus.UpdatedReplicas && currentCond != nil && currentCond.Reason == util.NewRSAvailableReason

	if util.HasProgressDeadline(d) && !isCompleteDeployment {
		switch {
		case util.DeploymentComplete(d, &newStatus):
			msg := fmt.Sprintf("Deployment %q has successfully progressed.", d.Name)
			if newRS != nil {
				msg = fmt.Sprintf("ReplicaSet %q has successfully progressed.", newRS.Name)
			}
			condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, util.NewRSAvailableReason, msg)
			util.SetDeploymentCondition(&newStatus, *condition)

		case util.DeploymentProgressing(d, &newStatus):
			msg := fmt.Sprintf("Deployment %q is progressing.", d.Name)
			if newRS != nil {
				msg = fmt.Sprintf("ReplicaSet %q is progressing.", newRS.Name)
			}
			condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, util.ReplicaSetUpdatedReason, msg)
			if currentCond != nil {
				if currentCond.Status == v1.ConditionTrue {
					condition.LastTransitionTime = currentCond.LastTransitionTime
				}
				util.RemoveDeploymentCondition(&newStatus, apps.DeploymentProgressing)
			}
			util.SetDeploymentCondition(&newStatus, *condition)

		case util.DeploymentTimedOut(ctx, d, &newStatus):
			msg := fmt.Sprintf("Deployment %q has timed out progressing.", d.Name)
			if newRS != nil {
				msg = fmt.Sprintf("ReplicaSet %q has timed out progressing.", newRS.Name)
			}
			condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionFalse, util.TimedOutReason, msg)
			util.SetDeploymentCondition(&newStatus, *condition)
		}
	}

	if replicaFailureCond := dc.getReplicaFailures(allRSs, newRS); len(replicaFailureCond) > 0 {
		// There will be only one ReplicaFailure condition on the replica set.
		util.SetDeploymentCondition(&newStatus, replicaFailureCond[0])
	} else {
		util.RemoveDeploymentCondition(&newStatus, apps.DeploymentReplicaFailure)
	}

	if reflect.DeepEqual(d.Status, newStatus) {
		// Requeue the deployment if required.
		dc.requeueStuckDeployment(ctx, d, newStatus)
		return nil
	}

	newDeployment := d
	newDeployment.Status = newStatus
	_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{})
	return err
}
```
How maxSurge and maxUnavailable constrain the rollout: maxSurge is the maximum number of Pods that may exist above the desired replica count during a rolling update, and maxUnavailable is the maximum number of Pods that may be unavailable during it.
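A minimal runnable sketch of the rounding rules, using the apimachinery helper that performs this scaling (the replica count of 10 and the 25% values are assumptions for illustration; the controller rounds maxSurge up and maxUnavailable down):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	replicas := 10
	maxSurge := intstr.FromString("25%")
	maxUnavailable := intstr.FromString("25%")

	// maxSurge rounds up: ceil(10 * 0.25) = 3, so up to 13 Pods may exist.
	surge, _ := intstr.GetScaledValueFromIntOrPercent(&maxSurge, replicas, true)

	// maxUnavailable rounds down: floor(10 * 0.25) = 2, so at least 8 Pods stay available.
	unavailable, _ := intstr.GetScaledValueFromIntOrPercent(&maxUnavailable, replicas, false)

	fmt.Printf("surge=%d unavailable=%d\n", surge, unavailable) // surge=3 unavailable=2
}
```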