本篇文章没有接上篇继续更新 kube-controller-manager,kube-controller-manager 的源码阅读笔记也会继续更新,笔者会同时阅读多个组件的源码,阅读笔记也会按组件进行交叉更新,交叉更新的目的一是为了加深印象避免阅读完后又很快忘记,二是某些代码的功能难以理解,避免死磕,但整体目标是将每个组件的核心代码阅读完。
在前面的文章中已经介绍过 kubelet 的架构以及启动流程,本章会继续介绍 kubelet 中的核心功能,kubelet 中包含数十个 manager 以及对 CNI、CRI、CSI 的调用。每个 manager 的功能各不相同,manager 之间也会有依赖关系,本文会介绍比较简单的 statusManager。
statusManager 源码分析
kubernetes 版本:v1.16
statusManager 的主要功能是将 pod 状态信息同步到 apiserver,statusManage 并不会主动监控 pod 的状态,而是提供接口供其他 manager 进行调用。
statusManager 的初始化
kubelet 在启动流程时会在 NewMainKubelet 方法中初始化其核心组件,包括各种 manager。
k8s.io/kubernetes/pkg/kubelet/kubelet.go:335
1  | func NewMainKubelet() (*Kubelet, error) {  | 
NewManager 是用来初始化 statusManager 对象的,其中参数的功能如下所示:
- kubeClient:用于和 apiserver 交互;
 - podManager:负责内存中 pod 的维护;
 - podStatuses:statusManager 的 cache,保存 pod 与状态的对应关系;
 - podStatusesChannel:当其他组件调用 statusManager 更新 pod 状态时,会将 pod 的状态信息发送到podStatusesChannel 中;
 - apiStatusVersions:维护最新的 pod status 版本号,每更新一次会加1;
 - podDeletionSafety:删除 pod 的接口;
 
k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:118
1  | func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podDeletionSafety PodDeletionSafetyProvider) Manager {  | 
在初始化完成后,kubelet 会在 Run 方法中会以 goroutine 的方式启动 statusManager。
k8s.io/kubernetes/pkg/kubelet/kubelet.go:1398
1  | func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {  | 
statusManager 的代码主要在 k8s.io/kubernetes/pkg/kubelet/status/ 目录中,其对外暴露的接口有以下几个:
1  | type Manager interface {  | 
pod 对应的 status 字段如下所示:
1  | status:  | 
然后继续看 statusManager 的启动方法 start, 其主要逻辑为:
- 1、设置定时器,
syncPeriod默认为 10s; - 2、启动 
wait.Forevergoroutine 同步 pod 的状态,有两种同步方式,第一种是当监听到某个 pod 状态改变时会调用m.syncPod进行同步,第二种是当触发定时器时调用m.syncBatch进行批量同步; 
k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:147
1  | func (m *manager) Start() {  | 
syncPod
syncPod 是用来同步 pod 最新状态至 apiserver 的方法,主要逻辑为:
- 1、调用 
m.needsUpdate判断是否需要同步状态,若apiStatusVersions中的 status 版本号小于当前接收到的 status 版本号或者apistatusVersions中不存在该 status 版本号则需要同步,若不需要同步则继续检查 pod 是否处于删除状态,若处于删除状态调用m.podDeletionSafety.PodResourcesAreReclaimed将 pod 完全删除; - 2、从 apiserver 获取 pod 的 oldStatus;
 - 3、检查 pod 
oldStatus与currentStatus的 uid 是否相等,若不相等则说明 pod 被重建过; - 4、调用 
statusutil.PatchPodStatus同步 pod 最新的 status 至 apiserver,并将返回的 pod 作为 newPod; - 5、检查 newPod 是否处于 terminated 状态,若处于 terminated 状态则调用 apiserver 接口进行删除并从 cache 中清除,删除后 pod 会进行重建;
 
k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:514
1  | func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {  | 
needsUpdate
needsUpdate 方法主要是检查 pod 的状态是否需要更新,以及检查当 pod 处于 terminated 状态时保证 pod 被完全删除。
k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:570
1  | func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool {  | 
PodResourcesAreReclaimed
PodResourcesAreReclaimed 检查 pod 在 node 上占用的所有资源是否已经被回收,其主要逻辑为:
- 1、检查 pod 中的所有 container 是否都处于非 running 状态;
 - 2、从 podCache 中获取 podStatus,通过 podStatus 检查 pod 中的 container 是否已被完全删除;
 - 3、检查 pod 的 volume 是否被清理;
 - 4、检查 pod 的 cgroup 是否被清理;
 - 5、若以上几个检查项都通过说明在 kubelet 端 pod 已被完全删除;
 
k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:900
1  | func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {  | 
syncBatch
syncBatch 是定期将 statusManager 缓存 podStatuses 中的数据同步到 apiserver 的方法,主要逻辑为:
- 1、调用 
m.podManager.GetUIDTranslations从 podManager 中获取 mirrorPod uid 与 staticPod uid 的对应关系; - 2、从 apiStatusVersions 中清理已经不存在的 pod,遍历 apiStatusVersions,检查 podStatuses 以及 mirrorToPod 中是否存在该对应的 pod,若不存在则从 apiStatusVersions 中删除;
 - 3、遍历 podStatuses,首先调用 
needsUpdate检查 pod 的状态是否与 apiStatusVersions 中的一致,然后调用needsReconcile检查 pod 的状态是否与 podManager 中的一致,若不一致则将需要同步的 pod 加入到 updatedStatuses 列表中; - 4、遍历 updatedStatuses 列表,调用 
m.syncPod方法同步状态; 
syncBatch 主要是将 statusManage cache 中的数据与 apiStatusVersions 和 podManager 中的数据进行对比是否一致,若不一致则以 statusManage cache 中的数据为准同步至 apiserver。
k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:469
1  | func (m *manager) syncBatch() {  | 
syncBatch 中主要调用了两个方法 needsUpdate 和 needsReconcile ,needsUpdate 在上文中已经介绍过了,下面介绍 needsReconcile 方法的主要逻辑。
needsReconcile
needsReconcile 对比当前 pod 的状态与 podManager 中的状态是否一致,podManager 中保存了 node 上 pod 的 object,podManager 中的数据与 apiserver 是一致的,needsReconcile  主要逻辑为:
- 1、通过 uid 从 podManager 中获取 pod 对象;
 - 2、检查 pod 是否为 static pod,若为 static pod 则获取其对应的 mirrorPod;
 - 3、格式化 pod status subResource;
 - 4、检查 podManager 中的 status 与 statusManager cache 中的 status 是否一致,若不一致则以 statusManager 为准进行同步;
 
k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:598
1  | func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool {  | 
以上就是 statusManager 同步 pod status 的主要逻辑,下面再了解一下 statusManager 对其他组件暴露的方法。
SetPodStatus
SetPodStatus 是为 pod 设置 status subResource 并会触发同步操作,主要逻辑为:
- 1、检查 
pod.Status.Conditions中的类型是否为 kubelet 创建的,kubelet 会创建ContainersReady、Initialized、Ready、PodScheduled和Unschedulable五种类型的 conditions; - 2、调用 
m.updateStatusInternal触发更新操作; 
k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:179
1  | func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {  | 
updateStatusInternal
statusManager 对外暴露的方法中触发状态同步的操作都是由 updateStatusInternal 完成的,updateStatusInternal 会更新 statusManager 的 cache 并会将 newStatus 发送到 m.podStatusChannel 中,然后 statusManager 会调用 syncPod 方法同步到 apiserver。
- 1、从 cache 中获取 oldStatus;
 - 2、检查 
ContainerStatuses和InitContainerStatuses是否合法; - 3、为 status 设置 
ContainersReady、PodReady、PodInitialized、PodScheduledconditions; - 4、设置 status 的 
StartTime; - 5、格式化 status;
 - 6、将 newStatus 添加到 statusManager 的 cache podStatuses 中;
 - 7、将 newStatus 发送到 
m.podStatusChannel中; 
k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:362
1  | func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate bool) bool {  | 
SetPodStatus 方法主要会用在 kubelet 的主 syncLoop 中,并在 syncPod 方法中创建 pod 时使用。
SetContainerReadiness
SetContainerReadiness 方法会设置 pod status subResource 中 container 是否为 ready 状态,其主要逻辑为:
- 1、获取 pod 对象;
 - 2、从 
m.podStatuses获取 oldStatus; - 3、通过 containerID 从 pod 中获取 containerStatus;
 - 4、若 container status 为 Ready 直接返回,此时该 container 状态无须更新;
 - 5、添加 PodReadyCondition 和 ContainersReadyCondition;
 - 6、调用 
m.updateStatusInternal触发同步操作; 
k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:198
1  | func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {  | 
SetContainerReadiness 方法主要被用在 proberManager 中,关于 proberManager 的功能会在后文说明。
SetContainerStartup
SetContainerStartup 方法会设置 pod status subResource 中 container 是否为 started 状态,主要逻辑为:
- 1、通过 podUID 从 podManager 中获取 pod 对象;
 - 2、从 statusManager 中获取 pod 的 oldStatus;
 - 3、检查要更新的 container 是否存在;
 - 4、检查目标 container 的 started 状态是否已为期望值;
 - 5、设置目标 container 的 started 状态;
 - 6、调用 
m.updateStatusInternal触发同步操作; 
k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:255
1  | func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool) {  | 
SetContainerStartup 方法也是主要被用在 proberManager 中。
TerminatePod
TerminatePod 方法的主要逻辑是把 pod .status.containerStatuses 和 .status.initContainerStatuses 中 container 的 state 置为 Terminated 状态并触发状态同步操作。
k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:312
1  | func (m *manager) TerminatePod(pod *v1.Pod) {  | 
TerminatePod 方法主要会用在 kubelet 的主 syncLoop 中。
RemoveOrphanedStatuses
RemoveOrphanedStatuses 的主要逻辑是从 statusManager 缓存 podStatuses 中删除对应的 pod。
k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:457
1  | func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {  | 
总结
本文主要介绍了 statusManager 的功能以及使用,其功能其实非常简单,当 pod 状态改变时 statusManager 会将状态同步到 apiserver,statusManager 也提供了多个接口供其他组件调用,当其他组件需要改变 pod 的状态时会将 pod 的 status 信息发送到 statusManager 进行同步。