kubelet 启动流程分析

本来这篇文章会继续讲述 kubelet 中的主要模块,但由于网友反馈能不能先从 kubelet 的启动流程开始,kubelet 的启动流程在很久之前基于 v1.12 写过一篇文章,对比了 v1.16 中的启动流程变化不大,但之前的文章写的比较简洁,本文会重新分析 kubelet 的启动流程。

Kubelet 启动流程

kubernetes 版本:v1.16

kubelet 的启动比较复杂,首先还是把 kubelet 的启动流程图放在此处,便于在后文中清楚各种调用的流程:

NewKubeletCommand

首先从 kubelet 的 main 函数开始,其中调用的 NewKubeletCommand 方法主要负责获取配置文件中的参数,校验参数以及为参数设置默认值。主要逻辑为:

  • 1、解析命令行参数;
  • 2、为 kubelet 初始化 feature gates 参数;
  • 3、加载 kubelet 配置文件;
  • 4、校验配置文件中的参数;
  • 5、检查 kubelet 是否启用动态配置功能;
  • 6、初始化 kubeletDeps,kubeletDeps 包含 kubelet 运行所必须的配置,是为了实现 dependency injection,其目的是为了把 kubelet 依赖的组件对象作为参数传进来,这样可以控制 kubelet 的行为;
  • 7、调用 Run 方法;

k8s.io/kubernetes/cmd/kubelet/app/server.go:111

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
func NewKubeletCommand() *cobra.Command {
cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)

// 1、kubelet配置分两部分:
// KubeletFlag: 指那些不允许在 kubelet 运行时进行修改的配置集,或者不能在集群中各个 Nodes 之间共享的配置集。
// KubeletConfiguration: 指可以在集群中各个Nodes之间共享的配置集,可以进行动态配置。
kubeletFlags := options.NewKubeletFlags()
kubeletConfig, err := options.NewKubeletConfiguration()
if err != nil {
klog.Fatal(err)
}

cmd := &cobra.Command{
Use: componentKubelet,
DisableFlagParsing: true,
......
Run: func(cmd *cobra.Command, args []string) {
// 2、解析命令行参数
if err := cleanFlagSet.Parse(args); err != nil {
cmd.Usage()
klog.Fatal(err)
}
......

verflag.PrintAndExitIfRequested()
utilflag.PrintFlags(cleanFlagSet)

// 3、初始化 feature gates 配置
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
klog.Fatal(err)
}

if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
klog.Fatal(err)
}

if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {
klog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
}

// 4、加载 kubelet 配置文件
if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
kubeletConfig, err = loadConfigFile(configFile)
......
}
// 5、校验配置文件中的参数
if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
klog.Fatal(err)
}

// 6、检查 kubelet 是否启用动态配置功能
var kubeletConfigController *dynamickubeletconfig.Controller
if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
func(kc *kubeletconfiginternal.KubeletConfiguration) error {
return kubeletConfigFlagPrecedence(kc, args)
})
if err != nil {
klog.Fatal(err)
}
if dynamicKubeletConfig != nil {
kubeletConfig = dynamicKubeletConfig
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
klog.Fatal(err)
}
}
}
kubeletServer := &options.KubeletServer{
KubeletFlags: *kubeletFlags,
KubeletConfiguration: *kubeletConfig,
}
// 7、初始化 kubeletDeps
kubeletDeps, err := UnsecuredDependencies(kubeletServer)
if err != nil {
klog.Fatal(err)
}

kubeletDeps.KubeletConfigController = kubeletConfigController
stopCh := genericapiserver.SetupSignalHandler()
if kubeletServer.KubeletFlags.ExperimentalDockershim {
if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
klog.Fatal(err)
}
return
}

// 8、调用 Run 方法
if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
klog.Fatal(err)
}
},
}
kubeletFlags.AddFlags(cleanFlagSet)
options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
options.AddGlobalFlags(cleanFlagSet)
......

return cmd
}

Run

该方法中仅仅调用 run 方法执行后面的启动逻辑。

k8s.io/kubernetes/cmd/kubelet/app/server.go:408

1
2
3
4
5
6
7
8
9
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) error {
if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
return fmt.Errorf("failed OS init: %v", err)
}
if err := run(s, kubeDeps, stopCh); err != nil {
return fmt.Errorf("failed to run Kubelet: %v", err)
}
return nil
}

run

run 方法中主要是为 kubelet 的启动做一些基本的配置及检查工作,主要逻辑为:

  • 1、为 kubelet 设置默认的 FeatureGates,kubelet 所有的 FeatureGates 可以通过命令参数查看,k8s 中处于 Alpha 状态的 FeatureGates 在组件启动时默认关闭,处于 Beta 和 GA 状态的默认开启;
  • 2、校验 kubelet 的参数;
  • 3、尝试获取 kubelet 的 lock file,需要在 kubelet 启动时指定 --exit-on-lock-contention--lock-file,该功能处于 Alpha 版本默认为关闭状态;
  • 4、将当前的配置文件注册到 http server /configz URL 中;
  • 5、检查 kubelet 启动模式是否为 standalone 模式,此模式下不会和 apiserver 交互,主要用于 kubelet 的调试;
  • 6、初始化 kubeDeps,kubeDeps 中包含 kubelet 的一些依赖,主要有 KubeClientEventClientHeartbeatClientAuthcadvisorContainerManager
  • 7、检查是否以 root 用户启动;
  • 8、为进程设置 oom 分数,默认为 -999,分数范围为 [-1000, 1000],越小越不容易被 kill 掉;
  • 9、调用 RunKubelet 方法;
  • 10、检查 kubelet 是否启动了动态配置功能;
  • 11、启动 Healthz http server;
  • 12、如果使用 systemd 启动,通知 systemd kubelet 已经启动;

k8s.io/kubernetes/cmd/kubelet/app/server.go:472

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
// 1、为 kubelet 设置默认的 FeatureGates
err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
if err != nil {
return err
}
// 2、校验 kubelet 的参数
if err := options.ValidateKubeletServer(s); err != nil {
return err
}

// 3、尝试获取 kubelet 的 lock file
if s.ExitOnLockContention && s.LockFilePath == "" {
return errors.New("cannot exit on lock file contention: no lock file specified")
}
done := make(chan struct{})
if s.LockFilePath != "" {
klog.Infof("acquiring file lock on %q", s.LockFilePath)
if err := flock.Acquire(s.LockFilePath); err != nil {
return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
}
if s.ExitOnLockContention {
klog.Infof("watching for inotify events for: %v", s.LockFilePath)
if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
return err
}
}
}
// 4、将当前的配置文件注册到 http server /configz URL 中;
err = initConfigz(&s.KubeletConfiguration)
if err != nil {
klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
}

// 5、判断是否为 standalone 模式
standaloneMode := true
if len(s.KubeConfig) > 0 {
standaloneMode = false
}

// 6、初始化 kubeDeps
if kubeDeps == nil {
kubeDeps, err = UnsecuredDependencies(s)
if err != nil {
return err
}
}
if kubeDeps.Cloud == nil {
if !cloudprovider.IsExternal(s.CloudProvider) {
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
if err != nil {
return err
}
......
kubeDeps.Cloud = cloud
}
}

hostName, err := nodeutil.GetHostname(s.HostnameOverride)
if err != nil {
return err
}
nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
if err != nil {
return err
}
// 7、如果是 standalone 模式将所有 client 设置为 nil
switch {
case standaloneMode:
kubeDeps.KubeClient = nil
kubeDeps.EventClient = nil
kubeDeps.HeartbeatClient = nil

// 8、为 kubeDeps 初始化 KubeClient、EventClient、HeartbeatClient 模块
case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
if err != nil {
return err
}
if closeAllConns == nil {
return errors.New("closeAllConns must be a valid function other than nil")
}
kubeDeps.OnHeartbeatFailure = closeAllConns

kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet client: %v", err)
}

eventClientConfig := *clientConfig
eventClientConfig.QPS = float32(s.EventRecordQPS)
eventClientConfig.Burst = int(s.EventBurst)
kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet event client: %v", err)
}

heartbeatClientConfig := *clientConfig
heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration

if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
if heartbeatClientConfig.Timeout > leaseTimeout {
heartbeatClientConfig.Timeout = leaseTimeout
}
}
heartbeatClientConfig.QPS = float32(-1)
kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)
}
}
// 9、初始化 auth 模块
if kubeDeps.Auth == nil {
auth, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
if err != nil {
return err
}
kubeDeps.Auth = auth
}

var cgroupRoots []string

// 10、设置 cgroupRoot
cgroupRoots = append(cgroupRoots, cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupDriver))
kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
if err != nil {
} else if kubeletCgroup != "" {
cgroupRoots = append(cgroupRoots, kubeletCgroup)
}

runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
if err != nil {
} else if runtimeCgroup != "" {
cgroupRoots = append(cgroupRoots, runtimeCgroup)
}
if s.SystemCgroups != "" {
cgroupRoots = append(cgroupRoots, s.SystemCgroups)
}

// 11、初始化 cadvisor
if kubeDeps.CAdvisorInterface == nil {
imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s. ContainerRuntime, s.RemoteRuntimeEndpoint))
if err != nil {
return err
}
}

makeEventRecorder(kubeDeps, nodeName)

// 12、初始化 ContainerManager
if kubeDeps.ContainerManager == nil {
if s.CgroupsPerQOS && s.CgroupRoot == "" {
s.CgroupRoot = "/"
}
kubeReserved, err := parseResourceList(s.KubeReserved)
if err != nil {
return err
}
systemReserved, err := parseResourceList(s.SystemReserved)
if err != nil {
return err
}
var hardEvictionThresholds []evictionapi.Threshold
if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
if err != nil {
return err
}
}
experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
if err != nil {
return err
}

devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)
kubeDeps.ContainerManager, err = cm.NewContainerManager(
kubeDeps.Mounter,
kubeDeps.CAdvisorInterface,
cm.NodeConfig{
......
},
s.FailSwapOn,
devicePluginEnabled,
kubeDeps.Recorder)
if err != nil {
return err
}
}

// 13、检查是否以 root 权限启动
if err := checkPermissions(); err != nil {
klog.Error(err)
}

utilruntime.ReallyCrash = s.ReallyCrashForTesting

// 14、为 kubelet 进程设置 oom 分数
oomAdjuster := kubeDeps.OOMAdjuster
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
klog.Warning(err)
}

// 15、调用 RunKubelet 方法执行后续的启动操作
if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
return err
}

if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
return err
}
}

// 16、启动 Healthz http server
if s.HealthzPort > 0 {
mux := http.NewServeMux()
healthz.InstallHandler(mux)
go wait.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
if err != nil {
klog.Errorf("Starting healthz server failed: %v", err)
}
}, 5*time.Second, wait.NeverStop)
}

if s.RunOnce {
return nil
}

// 17、向 systemd 发送启动信号
go daemon.SdNotify(false, "READY=1")

select {
case <-done:
break
case <-stopCh:
break
}
return nil
}

RunKubelet

RunKubelet 中主要调用了 createAndInitKubelet 方法执行 kubelet 组件的初始化,然后调用 startKubelet 启动 kubelet 中的组件。

k8s.io/kubernetes/cmd/kubelet/app/server.go:989

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
if err != nil {
return err
}
nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
if err != nil {
return err
}
makeEventRecorder(kubeDeps, nodeName)

// 1、默认启动特权模式
capabilities.Initialize(capabilities.Capabilities{
AllowPrivileged: true,
})

credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)

if kubeDeps.OSInterface == nil {
kubeDeps.OSInterface = kubecontainer.RealOS{}
}

// 2、调用 createAndInitKubelet
k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
......
kubeServer.NodeStatusMaxImages)
if err != nil {
return fmt.Errorf("failed to create kubelet: %v", err)
}

if kubeDeps.PodConfig == nil {
return fmt.Errorf("failed to create kubelet, pod source config was nil")
}
podCfg := kubeDeps.PodConfig

rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))

if runOnce {
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
return fmt.Errorf("runonce failed: %v", err)
}
klog.Info("Started kubelet as runonce")
} else {
// 3、调用 startKubelet
startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
klog.Info("Started kubelet")
}
return nil
}

createAndInitKubelet

createAndInitKubelet 中主要调用了三个方法来完成 kubelet 的初始化:

  • kubelet.NewMainKubelet:实例化 kubelet 对象,并对 kubelet 依赖的所有模块进行初始化;
  • k.BirthCry:向 apiserver 发送一条 kubelet 启动了的 event;
  • k.StartGarbageCollection:启动垃圾回收服务,回收 container 和 images;

k8s.io/kubernetes/cmd/kubelet/app/server.go:1089

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func createAndInitKubelet(......) {
k, err = kubelet.NewMainKubelet(
......
)
if err != nil {
return nil, err
}

k.BirthCry()

k.StartGarbageCollection()

return k, nil
}
kubelet.NewMainKubelet

NewMainKubelet 是初始化 kubelet 的一个方法,主要逻辑为:

  • 1、初始化 PodConfig 即监听 pod 元数据的来源(file,http,apiserver),将不同 source 的 pod configuration 合并到一个结构中;
  • 2、初始化 containerGCPolicy、imageGCPolicy、evictionConfig 配置;
  • 3、启动 serviceInformer 和 nodeInformer;
  • 4、初始化 containerRefManageroomWatcher
  • 5、初始化 kubelet 对象;
  • 6、初始化 secretManagerconfigMapManager
  • 7、初始化 livenessManagerpodManagerstatusManagerresourceAnalyzer
  • 8、调用 kuberuntime.NewKubeGenericRuntimeManager 初始化 containerRuntime
  • 9、初始化 pleg
  • 10、初始化 containerGCcontainerDeletorimageManagercontainerLogManager
  • 11、初始化 serverCertificateManagerprobeManagertokenManagervolumePluginMgrpluginManagervolumeManager
  • 12、初始化 workQueuepodWorkersevictionManager
  • 13、最后注册相关模块的 handler;

NewMainKubelet 中对 kubelet 依赖的所有模块进行了初始化,每个模块对应的功能在上篇文章“kubelet 架构浅析”有介绍,至于每个模块初始化的流程以及功能会在后面的文章中进行详细分析。

k8s.io/kubernetes/pkg/kubelet/kubelet.go:335

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
if kubeCfg.SyncFrequency.Duration <= 0 {
return nil, fmt.Errorf("invalid sync frequency %d", kubeCfg.SyncFrequency.Duration)
}

if kubeCfg.MakeIPTablesUtilChains {
......
}

hostname, err := nodeutil.GetHostname(hostnameOverride)
if err != nil {
return nil, err
}

nodeName := types.NodeName(hostname)
if kubeDeps.Cloud != nil {
......
}

// 1、初始化 PodConfig
if kubeDeps.PodConfig == nil {
var err error
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
if err != nil {
return nil, err
}
}

// 2、初始化 containerGCPolicy、imageGCPolicy、evictionConfig
containerGCPolicy := kubecontainer.ContainerGCPolicy{
MinAge: minimumGCAge.Duration,
MaxPerPodContainer: int(maxPerPodContainerCount),
MaxContainers: int(maxContainerCount),
}
daemonEndpoints := &v1.NodeDaemonEndpoints{
KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port},
}

imageGCPolicy := images.ImageGCPolicy{
MinAge: kubeCfg.ImageMinimumGCAge.Duration,
HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
LowThresholdPercent: int(kubeCfg.ImageGCLowThresholdPercent),
}

enforceNodeAllocatable := kubeCfg.EnforceNodeAllocatable
if experimentalNodeAllocatableIgnoreEvictionThreshold {
enforceNodeAllocatable = []string{}
}
thresholds, err := eviction.ParseThresholdConfig(enforceNodeAllocatable, kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg. EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
if err != nil {
return nil, err
}
evictionConfig := eviction.Config{
PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
Thresholds: thresholds,
KernelMemcgNotification: experimentalKernelMemcgNotification,
PodCgroupRoot: kubeDeps.ContainerManager.GetPodCgroupRoot(),
}
// 3、启动 serviceInformer 和 nodeInformer
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
if kubeDeps.KubeClient != nil {
serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0)
go r.Run(wait.NeverStop)
}
serviceLister := corelisters.NewServiceLister(serviceIndexer)

nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
if kubeDeps.KubeClient != nil {
fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
go r.Run(wait.NeverStop)
}
nodeInfo := &CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}

......

// 4、初始化 containerRefManager、oomWatcher
containerRefManager := kubecontainer.NewRefManager()

oomWatcher := oomwatcher.NewWatcher(kubeDeps.Recorder)
clusterDNS := make([]net.IP, 0, len(kubeCfg.ClusterDNS))
for _, ipEntry := range kubeCfg.ClusterDNS {
ip := net.ParseIP(ipEntry)
if ip == nil {
klog.Warningf("Invalid clusterDNS ip '%q'", ipEntry)
} else {
clusterDNS = append(clusterDNS, ip)
}
}
httpClient := &http.Client{}
parsedNodeIP := net.ParseIP(nodeIP)
protocol := utilipt.ProtocolIpv4
if parsedNodeIP != nil && parsedNodeIP.To4() == nil {
protocol = utilipt.ProtocolIpv6
}

// 5、初始化 kubelet 对象
klet := &Kubelet{......}

if klet.cloud != nil {
klet.cloudResourceSyncManager = cloudresource.NewSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency)
}

// 6、初始化 secretManager、configMapManager
var secretManager secret.Manager
var configMapManager configmap.Manager
switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
case kubeletconfiginternal.WatchChangeDetectionStrategy:
secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient)
configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient)
case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
secretManager = secret.NewCachingSecretManager(
kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
configMapManager = configmap.NewCachingConfigMapManager(
kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
case kubeletconfiginternal.GetChangeDetectionStrategy:
secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient)
configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient)
default:
return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
}

klet.secretManager = secretManager
klet.configMapManager = configMapManager
if klet.experimentalHostUserNamespaceDefaulting {
klog.Infof("Experimental host user namespace defaulting is enabled.")
}

machineInfo, err := klet.cadvisor.MachineInfo()
if err != nil {
return nil, err
}
klet.machineInfo = machineInfo

imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)

// 7、初始化 livenessManager、podManager、statusManager、resourceAnalyzer
klet.livenessManager = proberesults.NewManager()

klet.podCache = kubecontainer.NewCache()
var checkpointManager checkpointmanager.CheckpointManager
if bootstrapCheckpointPath != "" {
checkpointManager, err = checkpointmanager.NewCheckpointManager(bootstrapCheckpointPath)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %+v", err)
}
}

klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager)

klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)

if remoteRuntimeEndpoint != "" {
if remoteImageEndpoint == "" {
remoteImageEndpoint = remoteRuntimeEndpoint
}
}

pluginSettings := dockershim.NetworkPluginSettings{......}

klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration)

var legacyLogProvider kuberuntime.LegacyLogProvider

// 8、调用 kuberuntime.NewKubeGenericRuntimeManager 初始化 containerRuntime
switch containerRuntime {
case kubetypes.DockerContainerRuntime:
streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)
ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
&pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, !crOptions.RedirectContainerStreaming)
if err != nil {
return nil, err
}
if crOptions.RedirectContainerStreaming {
klet.criHandler = ds
}

server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
if err := server.Start(); err != nil {
return nil, err
}

supported, err := ds.IsCRISupportedLogDriver()
if err != nil {
return nil, err
}
if !supported {
klet.dockerLegacyService = ds
legacyLogProvider = ds
}
case kubetypes.RemoteContainerRuntime:
break
default:
return nil, fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
}
runtimeService, imageService, err := getRuntimeAndImageServices(remoteRuntimeEndpoint, remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout)
if err != nil {
return nil, err
}
klet.runtimeService = runtimeService
if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && kubeDeps.KubeClient != nil {
klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
}

runtime, err := kuberuntime.NewKubeGenericRuntimeManager(......)
if err != nil {
return nil, err
}
klet.containerRuntime = runtime
klet.streamingRuntime = runtime
klet.runner = runtime

runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
if err != nil {
return nil, err
}
klet.runtimeCache = runtimeCache

if cadvisor.UsingLegacyCadvisorStats(containerRuntime, remoteRuntimeEndpoint) {
klet.StatsProvider = stats.NewCadvisorStatsProvider(......)
} else {
klet.StatsProvider = stats.NewCRIStatsProvider(......)
}
// 9、初始化 pleg
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
if _, err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
klog.Errorf("Pod CIDR update failed %v", err)
}

// 10、初始化 containerGC、containerDeletor、imageManager、containerLogManager
containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
if err != nil {
return nil, err
}
klet.containerGC = containerGC
klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))

imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions. PodSandboxImage)
if err != nil {
return nil, fmt.Errorf("failed to initialize image manager: %v", err)
}
klet.imageManager = imageManager

if containerRuntime == kubetypes.RemoteContainerRuntime && utilfeature.DefaultFeatureGate.Enabled(features.CRIContainerLogRotation) {
containerLogManager, err := logs.NewContainerLogManager(
klet.runtimeService,
kubeCfg.ContainerLogMaxSize,
int(kubeCfg.ContainerLogMaxFiles),
)
if err != nil {
return nil, fmt.Errorf("failed to initialize container log manager: %v", err)
}
klet.containerLogManager = containerLogManager
} else {
klet.containerLogManager = logs.NewStubContainerLogManager()
}
// 11、初始化 serverCertificateManager、probeManager、tokenManager、volumePluginMgr、pluginManager、volumeManager
if kubeCfg.ServerTLSBootstrap && kubeDeps.TLSOptions != nil && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, klet. getLastObservedNodeAddresses, certDirectory)
if err != nil {
return nil, fmt.Errorf("failed to initialize certificate manager: %v", err)
}
kubeDeps.TLSOptions.Config.GetCertificate = func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
cert := klet.serverCertificateManager.Current()
if cert == nil {
return nil, fmt.Errorf("no serving certificate available for the kubelet")
}
return cert, nil
}
}

klet.probeManager = prober.NewManager(......)
tokenManager := token.NewManager(kubeDeps.KubeClient)

klet.volumePluginMgr, err =
NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
if err != nil {
return nil, err
}
klet.pluginManager = pluginmanager.NewPluginManager(
klet.getPluginsRegistrationDir(), /* sockDir */
klet.getPluginsDir(), /* deprecatedSockDir */
kubeDeps.Recorder,
)

if len(experimentalMounterPath) != 0 {
experimentalCheckNodeCapabilitiesBeforeMount = false
klet.dnsConfigurer.SetupDNSinContainerizedMounter(experimentalMounterPath)
}
klet.volumeManager = volumemanager.NewVolumeManager(......)

// 12、初始化 workQueue、podWorkers、evictionManager
klet.reasonCache = NewReasonCache()
klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)

klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)

evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)

klet.evictionManager = evictionManager
klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
if utilfeature.DefaultFeatureGate.Enabled(features.Sysctls) {
runtimeSupport, err := sysctl.NewRuntimeAdmitHandler(klet.containerRuntime)
if err != nil {
return nil, err
}

safeAndUnsafeSysctls := append(sysctlwhitelist.SafeSysctlWhitelist(), allowedUnsafeSysctls...)
sysctlsWhitelist, err := sysctl.NewWhitelist(safeAndUnsafeSysctls)
if err != nil {
return nil, err
}
klet.admitHandlers.AddPodAdmitHandler(runtimeSupport)
klet.admitHandlers.AddPodAdmitHandler(sysctlsWhitelist)
}

// 13、为 pod 注册相关模块的 handler
activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
if err != nil {
return nil, err
}
klet.AddPodSyncLoopHandler(activeDeadlineHandler)
klet.AddPodSyncHandler(activeDeadlineHandler)
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyManager) {
klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetTopologyPodAdmitHandler())
}
criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder),kubeDeps.Recorder)
klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
for _, opt := range kubeDeps.Options {
opt(klet)
}

klet.appArmorValidator = apparmor.NewValidator(containerRuntime)
klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewNoNewPrivsAdmitHandler(klet.containerRuntime))

if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
klet.nodeLeaseController = nodelease.NewController(klet.clock, klet.heartbeatClient, string(klet.nodeName), kubeCfg.NodeLeaseDurationSeconds, klet.onRepeatedHeartbeatFailure)
}

klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewProcMountAdmitHandler(klet.containerRuntime))

klet.kubeletConfiguration = *kubeCfg

klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()

return klet, nil
}

startKubelet

startKubelet 中通过调用 k.Run 来启动 kubelet 中的所有模块以及主流程,然后启动 kubelet 所需要的 http server,在 v1.16 中,kubelet 默认仅启动健康检查端口 10248 和 kubelet server 的端口 10250。

k8s.io/kubernetes/cmd/kubelet/app/server.go:1070

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies,    enableCAdvisorJSONEndpoints, enableServer bool) {
// start the kubelet
go wait.Until(func() {
k.Run(podCfg.Updates())
}, 0, wait.NeverStop)

// start the kubelet server
if enableServer {
go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg. EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)

}
if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
go k.ListenAndServePodResources()
}
}

至此,kubelet 对象以及其依赖模块在上面的几个方法中已经初始化完成了,除了单独启动了 gc 模块外其余的模块以及主逻辑最后都会在 Run 方法启动,Run 方法的主要逻辑在下文中会进行解释,此处总结一下 kubelet 启动逻辑中的调用关系如下所示:

1
2
3
4
5
6
7
8
9
10
11
                                                                                  |--> NewMainKubelet
|
|--> createAndInitKubelet --|--> BirthCry
| |
|--> RunKubelet --| |--> StartGarbageCollection
| |
| |--> startKubelet --> k.Run
|
NewKubeletCommand --> Run --> run --|--> http.ListenAndServe
|
|--> daemon.SdNotify

Run

Run 方法是启动 kubelet 的核心方法,其中会启动 kubelet 的依赖模块以及主循环逻辑,该方法的主要逻辑为:

  • 1、注册 logServer;
  • 2、判断是否需要启动 cloud provider sync manager;
  • 3、调用 kl.initializeModules 首先启动不依赖 container runtime 的一些模块;
  • 4、启动 volume manager
  • 5、执行 kl.syncNodeStatus 定时同步 Node 状态;
  • 6、调用 kl.fastStatusUpdateOnce 更新容器运行时启动时间以及执行首次状态同步;
  • 7、判断是否启用 NodeLease 机制;
  • 8、执行 kl.updateRuntimeUp 定时更新 Runtime 状态;
  • 9、执行 kl.syncNetworkUtil 定时同步 iptables 规则;
  • 10、执行 kl.podKiller 定时清理异常 pod,当 pod 没有被 podworker 正确处理的时候,启动一个goroutine 负责 kill 掉 pod;
  • 11、启动 statusManager
  • 12、启动 probeManager
  • 13、启动 runtimeClassManager
  • 14、启动 pleg
  • 15、调用 kl.syncLoop 监听 pod 变化;

Run 方法中主要调用了两个方法 kl.initializeModuleskl.fastStatusUpdateOnce 来完成启动前的一些初始化,在初始化完所有的模块后会启动主循环。

k8s.io/kubernetes/pkg/kubelet/kubelet.go:1398

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// 1、注册 logServer
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.kubeClient == nil {
klog.Warning("No api server defined - no node status update will be sent.")
}

// 2、判断是否需要启动 cloud provider sync manager
if kl.cloudResourceSyncManager != nil {
go kl.cloudResourceSyncManager.Run(wait.NeverStop)
}

// 3、调用 kl.initializeModules 首先启动不依赖 container runtime 的一些模块
if err := kl.initializeModules(); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
klog.Fatal(err)
}

// 4、启动 volume manager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

if kl.kubeClient != nil {
// 5、执行 kl.syncNodeStatus 定时同步 Node 状态
go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)

// 6、调用 kl.fastStatusUpdateOnce 更新容器运行时启动时间以及执行首次状态同步
go kl.fastStatusUpdateOnce()

// 7、判断是否启用 NodeLease 机制
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
go kl.nodeLeaseController.Run(wait.NeverStop)
}
}

// 8、执行 kl.updateRuntimeUp 定时更新 Runtime 状态
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

// 9、执行 kl.syncNetworkUtil 定时同步 iptables 规则
if kl.makeIPTablesUtilChains {
go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
}

// 10、执行 kl.podKiller 定时清理异常 pod
go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

// 11、启动 statusManager、probeManager、runtimeClassManager
kl.statusManager.Start()
kl.probeManager.Start()

if kl.runtimeClassManager != nil {
kl.runtimeClassManager.Start(wait.NeverStop)
}

// 12、启动 pleg
kl.pleg.Start()

// 13、调用 kl.syncLoop 监听 pod 变化
kl.syncLoop(updates, kl)
}

initializeModules

initializeModules 中启动的模块是不依赖于 container runtime 的,并且不依赖于尚未初始化的模块,其主要逻辑为:

  • 1、调用 kl.setupDataDirs 创建 kubelet 所需要的文件目录;
  • 2、创建 ContainerLogsDir /var/log/containers
  • 3、启动 imageManager,image gc 的功能已经在 RunKubelet 中启动了,此处主要是监控 image 的变化;
  • 4、启动 certificateManager,负责证书更新;
  • 5、启动 oomWatcher,监听 oom 并记录事件;
  • 6、启动 resourceAnalyzer

k8s.io/kubernetes/pkg/kubelet/kubelet.go:1319

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (kl *Kubelet) initializeModules() error {
metrics.Register(
kl.runtimeCache,
collectors.NewVolumeStatsCollector(kl),
collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),
)
metrics.SetNodeName(kl.nodeName)
servermetrics.Register()

// 1、创建文件目录
if err := kl.setupDataDirs(); err != nil {
return err
}

// 2、创建 ContainerLogsDir
if _, err := os.Stat(ContainerLogsDir); err != nil {
if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
klog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, err)
}
}

// 3、启动 imageManager
kl.imageManager.Start()

// 4、启动 certificate manager
if kl.serverCertificateManager != nil {
kl.serverCertificateManager.Start()
}
// 5、启动 oomWatcher.
if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
return fmt.Errorf("failed to start OOM watcher %v", err)
}

// 6、启动 resource analyzer
kl.resourceAnalyzer.Start()

return nil
}

fastStatusUpdateOnce

fastStatusUpdateOnce 会不断尝试更新 pod CIDR,一旦更新成功会立即执行updateRuntimeUpsyncNodeStatus来进行运行时的更新和节点状态更新。此方法只在 kubelet 启动时执行一次,目的是为了通过更新 pod CIDR,减少节点达到 ready 状态的时延,尽可能快的进行 runtime update 和 node status update。

k8s.io/kubernetes/pkg/kubelet/kubelet.go:2262

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (kl *Kubelet) fastStatusUpdateOnce() {
for {
time.Sleep(100 * time.Millisecond)
node, err := kl.GetNode()
if err != nil {
klog.Errorf(err.Error())
continue
}
if len(node.Spec.PodCIDRs) != 0 {
podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
if _, err := kl.updatePodCIDR(podCIDRs); err != nil {
klog.Errorf("Pod CIDR update to %v failed %v", podCIDRs, err)
continue
}
kl.updateRuntimeUp()
kl.syncNodeStatus()
return
}
}
}
updateRuntimeUp

updateRuntimeUp 方法在容器运行时首次启动过程中初始化运行时依赖的模块,并在 kubelet 的runtimeState中更新容器运行时的启动时间。updateRuntimeUp 方法首先检查 network 以及 runtime 是否处于 ready 状态,如果 network 以及 runtime 都处于 ready 状态,然后调用 initializeRuntimeDependentModules 初始化 runtime 的依赖模块,包括 cadvisorcontainerManagerevictionManagercontainerLogManagerpluginManage等。

k8s.io/kubernetes/pkg/kubelet/kubelet.go:2168

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (kl *Kubelet) updateRuntimeUp() {
kl.updateRuntimeMux.Lock()
defer kl.updateRuntimeMux.Unlock()

// 1、获取 containerRuntime Status
s, err := kl.containerRuntime.Status()
if err != nil {
klog.Errorf("Container runtime sanity check failed: %v", err)
return
}
if s == nil {
klog.Errorf("Container runtime status is nil")
return
}

// 2、检查 network 和 runtime 是否处于 ready 状态
networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
if networkReady == nil || !networkReady.Status {
kl.runtimeState.setNetworkState(fmt.Errorf("runtime network not ready: %v", networkReady))
} else {
kl.runtimeState.setNetworkState(nil)
}

runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
if runtimeReady == nil || !runtimeReady.Status {
kl.runtimeState.setRuntimeState(err)
return
}
kl.runtimeState.setRuntimeState(nil)
// 3、调用 kl.initializeRuntimeDependentModules 启动依赖模块
kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
kl.runtimeState.setRuntimeSync(kl.clock.Now())
}
initializeRuntimeDependentModules

该方法的主要逻辑为:

  • 1、启动 cadvisor
  • 2、获取 CgroupStats;
  • 3、启动 containerManagerevictionManagercontainerLogManager
  • 4、将 CSI Driver 和 Device Manager 注册到 pluginManager,然后启动 pluginManager

k8s.io/kubernetes/pkg/kubelet/kubelet.go:1361

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (kl *Kubelet) initializeRuntimeDependentModules() {
// 1、启动 cadvisor
if err := kl.cadvisor.Start(); err != nil {
......
}

// 2、获取 CgroupStats
kl.StatsProvider.GetCgroupStats("/", true)

node, err := kl.getNodeAnyWay()
if err != nil {
klog.Fatalf("Kubelet failed to get node info: %v", err)
}

// 3、启动 containerManager、evictionManager、containerLogManager
if err := kl.containerManager.Start(node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService); err != nil {
klog.Fatalf("Failed to start ContainerManager %v", err)
}

kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.podResourcesAreReclaimed, evictionMonitoringPeriod)

kl.containerLogManager.Start()

kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))

kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
// 4、启动 pluginManager
go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
}

小结

Run 方法中可以看到,会直接调用 kl.syncNodeStatuskl.updateRuntimeUp,但在 kl.fastStatusUpdateOnce 中也调用了这两个方法,而在 kl.fastStatusUpdateOnce 中仅执行一次,在 Run 方法中会定期执行。在kl.fastStatusUpdateOnce 中调用的目的就是当 kubelet 首次启动时尽可能快的进行 runtime update 和 node status update,减少节点达到 ready 状态的时延。而在 kl.updateRuntimeUp 中调用的初始化 runtime 依赖模块的方法 kl.initializeRuntimeDependentModules 通过 sync.Once 调用仅仅会被执行一次。

syncLoop

syncLoop 是 kubelet 的主循环方法,它从不同的管道(file,http,apiserver)监听 pod 的变化,并把它们汇聚起来。当有新的变化发生时,它会调用对应的函数,保证 pod 处于期望的状态。

syncLoop 中首先定义了一个 syncTickerhousekeepingTicker,即使没有需要更新的 pod 配置,kubelet 也会定时去做同步和清理 pod 的工作。然后在 for 循环中一直调用 syncLoopIteration,如果在每次循环过程中出现错误时,kubelet 会记录到 runtimeState 中,遇到错误就等待 5 秒中继续循环。

k8s.io/kubernetes/pkg/kubelet/kubelet.go:1821

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
plegCh := kl.pleg.Watch()
const (
base = 100 * time.Millisecond
max = 5 * time.Second
factor = 2
)
duration := base
for {
if err := kl.runtimeState.runtimeErrors(); err != nil {
time.Sleep(duration)
duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
continue
}
duration = base
kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}
syncLoopIteration

syncLoopIteration 方法会监听多个 channel,当发现任何一个 channel 有数据就交给 handler 去处理,在 handler 中通过调用 dispatchWork 分发任务。它会从以下几个 channel 中获取消息:

  • 1、configCh:该信息源由 kubeDeps 对象中的 PodConfig 子模块提供,该模块将同时 watch 3 个不同来源的 pod 信息的变化(file,http,apiserver),一旦某个来源的 pod 信息发生了更新(创建/更新/删除),这个 channel 中就会出现被更新的 pod 信息和更新的具体操作;
  • 2、syncCh:定时器,每隔一秒去同步最新保存的 pod 状态;
  • 3、houseKeepingCh:housekeeping 事件的通道,做 pod 清理工作;
  • 4、plegCh:该信息源由 kubelet 对象中的 pleg 子模块提供,该模块主要用于周期性地向 container runtime 查询当前所有容器的状态,如果状态发生变化,则这个 channel 产生事件;
  • 5、liveness Manager:健康检查模块发现某个 pod 异常时,kubelet 将根据 pod 的 restartPolicy 自动执行正确的操作;

k8s.io/kubernetes/pkg/kubelet/kubelet.go:1888

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
func (kl *Kubelet) syncLoopIteration(......) bool {
select {
case u, open := <-configCh:
if !open {
return false
}

switch u.Op {
case kubetypes.ADD:
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
handler.HandlePodUpdates(u.Pods)
case kubetypes.RESTORE:
handler.HandlePodAdditions(u.Pods)
case kubetypes.SET:
}

if u.Op != kubetypes.RESTORE {
kl.sourcesReady.AddSource(u.Source)
}
case e := <-plegCh:
if isSyncPodWorthy(e) {
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
handler.HandlePodSyncs([]*v1.Pod{pod})
} else {
klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
}
}

if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}
case <-syncCh:
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
break
}
handler.HandlePodSyncs(podsToSync)
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
if !ok {
break
}
handler.HandlePodSyncs([]*v1.Pod{pod})
}
case <-housekeepingCh:
if !kl.sourcesReady.AllReady() {
klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
} else {
if err := handler.HandlePodCleanups(); err != nil {
klog.Errorf("Failed cleaning pods: %v", err)
}
}
}
return true
}

最后再总结一下启动 kubelet 以及其依赖模块 Run 方法中的调用流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
      |--> kl.cloudResourceSyncManager.Run
|
| |--> kl.setupDataDirs
| |--> kl.imageManager.Start
Run --|--> kl.initializeModules ---|--> kl.serverCertificateManager.Start
| |--> kl.oomWatcher.Start
| |--> kl.resourceAnalyzer.Start
|
|--> kl.volumeManager.Run
| |--> kl.containerRuntime.Status
|--> kl.syncNodeStatus |
| |--> kl.updateRuntimeUp --| |--> kl.cadvisor.Start
| | | |
|--> kl.fastStatusUpdateOnce --| |--> kl.initializeRuntimeDependentModules --|--> kl.containerManager.Start
| | |
| |--> kl.syncNodeStatus |--> kl.evictionManager.Start
| |
|--> kl.updateRuntimeUp |--> kl.containerLogManager.Start
| |
|--> kl.syncNetworkUtil |--> kl.pluginManager.Run
|
|--> kl.podKiller
|
|--> kl.statusManager.Start
|
|--> kl.probeManager.Start
|
|--> kl.runtimeClassManager.Start
|
|--> kl.pleg.Start
|
|--> kl.syncLoop --> kl.syncLoopIteration

总结

本文主要介绍了 kubelet 的启动流程,可以看到 kubelet 启动流程中的环节非常多,kubelet 中也包含了非常多的模块,后续在分享 kubelet 源码的文章中会先以 Run 方法中启动的所有模块为主,各个击破。