在前面的文章中已经介绍了 deployment controller 的设计与实现,deployment 控制的是 replicaset,而 replicaset 控制 pod 的创建与删除,deployment 通过控制 replicaset 实现了滚动更新、回滚等操作。而 replicaset 会直接控制 pod 的创建与删除,本文会继续从源码层面分析 replicaset 的设计与实现。
在分析源码前先考虑一下 replicaset 的使用场景,在平时的操作中其实我们并不会直接操作 replicaset,replicaset 也仅有几个简单的操作,创建、删除、更新等,但其地位是非常重要的,replicaset 的主要功能就是通过 add/del pod 来达到期望的状态。
ReplicaSetController 源码分析
kubernetes 版本: v1.16
启动流程
首先来看 replicaSetController 对象初始化以及启动的代码,在 startReplicaSetController 中有两个比较重要的变量:
- BurstReplicas:用来控制在一个 syncLoop 过程中 rs 最多能创建的 pod 数量,设置上限值是为了避免单个 rs 影响整个系统,默认值为 500;
- ConcurrentRSSyncs:指的是需要启动多少个 goroutine 处理 informer 队列中的对象,默认值为 5;
k8s.io/kubernetes/cmd/kube-controller-manager/app/apps.go:69
1
2
3
4
5
6
7
8
9
10
11
12func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
return nil, false, nil
}
go replicaset.NewReplicaSetController(
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
replicaset.BurstReplicas,
).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
return nil, true, nil
}
下面是 replicaSetController 初始化的具体步骤,可以看到其会监听 pod 以及 rs 两个对象的事件。
k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:109
1 | func NewReplicaSetController(......) *ReplicaSetController { |
replicaSetController 初始化完成后会调用 Run
方法启动 5 个 goroutine 处理 informer 队列中的事件并进行 sync 操作,kube-controller-manager 中每个 controller 的启动操作都是如下所示流程。
k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:177
1 | func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { |
EventHandler
初始化 replicaSetController 时,其中有一个 expectations
字段,这是 rs 中一个比较特殊的机制,为了说清楚 expectations,先来看一下 controller 中所注册的 eventHandler,replicaSetController 会 watch pod 和 replicaSet 两个对象,eventHandler 中注册了对这两种对象的 add、update、delete 三个操作。
addPod
- 1、判断 pod 是否处于删除状态;
- 2、获取该 pod 关联的 rs 以及 rsKey,入队 rs 并更新 rsKey 的 expectations;
- 3、若 pod 对象没体现出关联的 rs 则为孤儿 pod,遍历 rsList 查找匹配的 rs,若该 rs.Namespace == pod.Namespace 并且 rs.Spec.Selector 匹配 pod.Labels,则说明该 pod 应该与此 rs 关联,将匹配的 rs 入队;
k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:255
1 | func (rsc *ReplicaSetController) addPod(obj interface{}) { |
updatePod
- 1、如果 pod label 改变或者处于删除状态,则直接删除;
- 2、如果 pod 的 OwnerReference 发生改变,此时 oldRS 需要创建 pod,将 oldRS 入队;
- 3、获取 pod 关联的 rs,入队 rs,若 pod 当前处于 ready 并非 available 状态,则会再次将该 rs 加入到延迟队列中,因为 pod 从 ready 到 available 状态需要触发一次 status 的更新;
- 4、否则为孤儿 pod,遍历 rsList 查找匹配的 rs,若找到则将 rs 入队;
k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:298
1 | func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { |
deletePod
- 1、确认该对象是否为 pod;
- 2、判断是否为孤儿 pod;
- 3、获取其对应的 rs 以及 rsKey;
- 4、更新 expectations 中 rsKey 的 del 值;
- 5、将 rs 入队;
k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:372
1 | func (rsc *ReplicaSetController) deletePod(obj interface{}) { |
AddRS 和 DeleteRS
以上两个操作仅仅是将对应的 rs 入队。
#####UpdateRS
其实 updateRS 也仅仅是将对应的 rs 进行入队,不过多了一个打印日志的操作,如下所示:
k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:232
1 | func (rsc *ReplicaSetController) updateRS(old, cur interface{}) { |
至于 expectations 机制会在下文进行分析。
syncReplicaSet
syncReplicaSet 是 controller 的核心方法,它会驱动 controller 所控制的对象达到期望状态,主要逻辑如下所示:
- 1、根据 ns/name 获取 rs 对象;
- 2、调用 expectations.SatisfiedExpectations 判断是否需要执行真正的 sync 操作;
- 3、获取所有 pod list;
- 4、根据 pod label 进行过滤获取与该 rs 关联的 pod 列表,对于其中的孤儿 pod 若与该 rs label 匹配则进行关联,若已关联的 pod 与 rs label 不匹配则解除关联关系;
- 5、调用 manageReplicas 进行同步 pod 操作,add/del pod;
- 6、计算 rs 当前的 status 并进行更新;
- 7、若 rs 设置了 MinReadySeconds 字段则将该 rs 加入到延迟队列中;
k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:562
1 | func (rsc *ReplicaSetController) syncReplicaSet(key string) error { |
在 syncReplicaSet
方法中有几个重要的操作分别为:rsc.expectations.SatisfiedExpectations
、rsc.manageReplicas
、calculateStatus
,下面一一进行分析。
SatisfiedExpectations
该方法主要判断 rs 是否需要执行真正的同步操作,若需要 add/del pod 或者 expectations 已过期则需要进行同步操作。
k8s.io/kubernetes/pkg/controller/controller_utils.go:181
1 | func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool { |
manageReplicas
manageReplicas 是最核心的方法,它会计算 replicaSet 需要创建或者删除多少个 pod 并调用 apiserver 的接口进行操作,在此阶段仅仅是调用 apiserver 的接口进行创建,并不保证 pod 成功运行,如果在某一轮,未能成功创建的所有 Pod 对象,则不再创建剩余的 pod。一个周期内最多只能创建或删除 500 个 pod,若超过上限值未创建完成的 pod 数会在下一个 syncLoop 继续进行处理。
该方法主要逻辑如下所示:
- 1、计算已存在 pod 数与期望数的差异;
- 2、如果 diff < 0 说明 rs 实际的 pod 数未达到期望值需要继续创建 pod,首先会将需要创建的 pod 数在 expectations 中进行记录,然后调用 slowStartBatch 创建所需要的 pod,slowStartBatch 以指数级增长的方式批量创建 pod,创建 pod 过程中若出现 timeout err 则忽略,若为其他 err 则终止创建操作并更新 expectations;
- 3、如果 diff > 0 说明可能是一次缩容操作需要删除多余的 pod,如果需要删除全部的 pod 则直接进行删除,否则会通过 getPodsToDelete 方法筛选出需要删除的 pod,具体的筛选策略在下文会讲到,然后并发删除这些 pod,对于删除失败操作也会记录在 expectations 中;
在 slowStartBatch
中会调用 rsc.podControl.CreatePodsWithControllerRef
方法创建 pod,若创建 pod 失败会判断是否为创建超时错误,或者可能是超时后失败,但此时认为超时并不影响后续的批量创建动作,大家知道,创建 pod 操作提交到 apiserver 后会经过认证、鉴权、以及动态访问控制三个步骤,此过程有可能会超时,即使真的创建失败了,等到 expectations 过期后在下一个 syncLoop 时会重新创建。
k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:459
1 | func (rsc *ReplicaSetController) manageReplicas(......) error { |
slowStartBatch
会批量创建出已计算出的 diff pod 数,创建的 pod 数依次为 1、2、4、8……,呈指数级增长,其方法如下所示:
k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:658
1 | func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) { |
若 diff > 0 时再删除 pod 阶段会调用getPodsToDelete
对 pod 进行筛选操作,此阶段会选出最劣质的 pod,下面是用到的 6 种筛选方法:
- 1、判断是否绑定了 node:Unassigned < assigned;
- 2、判断 pod phase:PodPending < PodUnknown < PodRunning;
- 3、判断 pod 状态:Not ready < ready;
- 4、若 pod 都为 ready,则按运行时间排序,运行时间最短会被删除:empty time < less time < more time;
- 5、根据 pod 重启次数排序:higher restart counts < lower restart counts;
- 6、按 pod 创建时间进行排序:Empty creation time pods < newer pods < older pods;
上面的几个排序规则遵循互斥原则,从上到下进行匹配,符合条件则排序完成,代码如下所示:
k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:684
1 | func getPodsToDelete(filteredPods []*v1.Pod, diff int) []*v1.Pod { |
k8s.io/kubernetes/pkg/controller/controller_utils.go:735
1 | type ActivePods []*v1.Pod |
calculateStatus
calculateStatus 会通过当前 pod 的状态计算出 rs 中 status 字段值,status 字段如下所示:
1 | status: |
k8s.io/kubernetes/pkg/controller/replicaset/replica_set_utils.go:85
1 | func calculateStatus(......) apps.ReplicaSetStatus { |
expectations 机制
通过上面的分析可知,在 rs 每次入队后进行 sync 操作时,首先需要判断该 rs 是否满足 expectations 机制,那么这个 expectations 的目的是什么?其实,rs 除了有 informer 的缓存外,还有一个本地缓存就是 expectations,expectations 会记录 rs 所有对象需要 add/del 的 pod 数量,若两者都为 0 则说明该 rs 所期望创建的 pod 或者删除的 pod 数已经被满足,若不满足则说明某次在 syncLoop 中创建或者删除 pod 时有失败的操作,则需要等待 expectations 过期后再次同步该 rs。
通过上面对 eventHandler 的分析,再来总结一下触发 replicaSet 对象发生同步事件的条件:
- 1、与 rs 相关的:AddRS、UpdateRS、DeleteRS;
- 2、与 pod 相关的:AddPod、UpdatePod、DeletePod;
- 3、informer 二级缓存的同步;
但是所有的更新事件是否都需要执行 sync 操作?对于除 rs.Spec.Replicas 之外的更新操作其实都没必要执行 sync 操作,因为 spec 其他字段和 status 的更新都不需要创建或者删除 pod。
在 sync 操作真正开始之前,依据 expectations 机制进行判断,确定是否要真正地启动一次 sync,因为在 eventHandler 阶段也会更新 expectations 值,从上面的 eventHandler 中可以看到在 addPod 中会调用 rsc.expectations.CreationObserved 更新 rsKey 的 expectations,将其 add 值 -1,在 deletePod 中调用 rsc.expectations.DeletionObserved 将其 del 值 -1。所以等到 sync 时,若 controllerKey(name 或者 ns/name)满足 expectations 机制则进行 sync 操作,而 updatePod 并不会修改 expectations,所以,expectations 的设计就是当需要创建或删除 pod 才会触发对应的 sync 操作,expectations 机制的目的就是减少不必要的 sync 操作。
什么条件下 expectations 机制会满足?
- 1、当 expectations 中不存在 rsKey 时,也就说首次创建 rs 时;
- 2、当 expectations 中 del 以及 add 值都为 0 时,即 rs 所需要创建或者删除的 pod 数都已满足;
- 3、当 expectations 过期时,即超过 5 分钟未进行 sync 操作;
最后再看一下 expectations 中用到的几个方法:
1 | // 创建了一个 pod 说明 expectations 中对应的 key add 期望值需要减少一个 pod, add -1 |
当在 syncLoop 中发现满足条件时,会执行 manageReplicas
方法,在 manageReplicas
中无论是为 rs 创建还是删除 pod 都会调用 ExpectCreations
或 ExpectDeletions
为 rsKey 创建 expectations 对象。
总结
本文主要从源码层面分析了 replicaSetController 的设计与实现,但是不得不说其在设计方面考虑了很多因素,文中只提到了笔者理解了或者思考后稍有了解的一些机制,至于其他设计思想还得自行阅读代码体会。
下面以一个流程图总结下创建 rs 的主要流程。
1 | SatisfiedExpectations |
参考: