上篇文章 kubernetes service 原理解析 已经分析了 service 原理以 kube-proxy 中三种模式的原理,本篇文章会从源码角度分析 kube-proxy 的设计与实现。
kubernetes 版本: v1.16
kube-proxy 启动流程
前面的文章已经说过 kubernetes 中所有组件都是通过其 run()
方法启动主逻辑的,run()
方法调用之前会进行解析命令行参数、添加默认值等。下面就直接看 kube-proxy 的 run()
方法:
- 若启动时指定了
--write-config-to
参数,kube-proxy 只将启动的默认参数写到指定的配置文件中,然后退出 - 初始化 ProxyServer 对象
- 如果启动参数
--cleanup
设置为 true,则清理 iptables 和 ipvs 规则并退出
k8s.io/kubernetes/cmd/kube-proxy/app/server.go:290
1 | func (o *Options) Run() error { |
Run()
方法中主要调用了 NewProxyServer()
方法来初始化 ProxyServer,然后会调用 runLoop()
启动主循环,继续看初始化 ProxyServer 的具体实现:
- 初始化 iptables、ipvs 相关的 interface
- 若启用了 ipvs 则检查内核版本、ipvs 依赖的内核模块、ipset 版本,内核模块主要包括:
ip_vs
,ip_vs_rr
,ip_vs_wrr
,ip_vs_sh
,nf_conntrack_ipv4
,nf_conntrack
,若没有相关模块,kube-proxy 会尝试使用modprobe
自动加载 - 根据 proxyMode 初始化 proxier,kube-proxy 启动后只运行一种 proxier
k8s.io/kubernetes/cmd/kube-proxy/app/server_others.go:57
1 | func NewProxyServer(o *Options) (*ProxyServer, error) { |
runLoop()
方法主要是启动 proxyServer。
k8s.io/kubernetes/cmd/kube-proxy/app/server.go:311
1 | func (o *Options) runLoop() error { |
o.proxyServer.Run()
中会启动已经初始化好的所有服务:
- 设定进程 OOMScore,可通过命令行配置,默认值为
--oom-score-adj="-999"
- 启动 metric server 和 healthz server,两者分别监听 10256 和 10249 端口
- 设置内核参数
nf_conntrack_tcp_timeout_established
和nf_conntrack_tcp_timeout_close_wait
- 将 proxier 注册到 serviceEventHandler、endpointsEventHandler 中
- 启动 informer 监听 service 和 endpoints 变化
- 执行
s.Proxier.SyncLoop()
,启动 proxier 主循环
k8s.io/kubernetes/cmd/kube-proxy/app/server.go:527
1 | func (s *ProxyServer) Run() error { |
回顾一下整个启动逻辑:
1 | o.Run() --> o.runLoop() --> o.proxyServer.Run() --> s.Proxier.SyncLoop() |
o.Run()
中调用了 NewProxyServer()
来初始化 proxyServer 对象,其中包括初始化每种模式对应的 proxier,该方法最终会调用 s.Proxier.SyncLoop()
执行 proxier 的主循环。
proxier 的初始化
看完了启动流程的逻辑代码,接着再看一下各代理模式的初始化,上文已经提到每种模式都是一个 proxier,即要实现 proxy.Provider
对应的 interface,如下所示:
1 | type Provider interface { |
首先要实现 service、endpoints 和 endpointSlice 对应的 handler,也就是对 OnAdd
、OnUpdate
、OnDelete
、OnSynced
四种方法的处理,详细的代码在下文进行讲解。EndpointSlice 是在 v1.16 中新加入的一个 API。Sync()
和 SyncLoop()
是主要用来处理iptables 规则的方法。
iptables proxier 初始化
首先看 iptables 模式的 NewProxier()
方法,其函数的具体执行逻辑为:
- 设置相关的内核参数
route_localnet
、bridge-nf-call-iptables
- 生成 masquerade 标记
- 设置默认调度算法 rr
- 初始化 proxier 对象
- 使用
BoundedFrequencyRunner
初始化 proxier.syncRunner,将 proxier.syncProxyRules 方法注入,BoundedFrequencyRunner
是一个管理器用于执行用户注入的函数,可以指定运行的时间策略。
k8s.io/kubernetes/pkg/proxy/iptables/proxier.go:249
1 | func NewProxier(ipt utiliptables.Interface, |
ipvs proxier 初始化
ipvs NewProxier()
方法主要逻辑为:
- 设定内核参数,
route_localnet
、br_netfilter
、bridge-nf-call-iptables
、conntrack
、conn_reuse_mode
、ip_forward
、arp_ignore
、arp_announce
等 - 和 iptables 一样,对于 SNAT iptables 规则生成 masquerade 标记
- 设置默认调度算法 rr
- 初始化 proxier 对象
- 初始化 ipset 规则
- 初始化 syncRunner 将 proxier.syncProxyRules 方法注入
- 启动
gracefuldeleteManager
定时清理 RS (realServer) 记录
k8s.io/kubernetes/pkg/proxy/ipvs/proxier.go:316
1 | func NewProxier(ipt utiliptables.Interface, |
userspace proxier 初始化
userspace NewProxier()
方法主要逻辑为:
- 初始化 iptables 规则
- 初始化 proxier
- 初始化 syncRunner 将 proxier.syncProxyRules 方法注入
k8s.io/kubernetes/pkg/proxy/userspace/proxier.go:187
1 | func NewProxier(......) (*Proxier, error) { |
proxier 接口实现
handler 的实现
上文已经提到过每种 proxier 都需要实现 interface 中的几个方法,首先看一下 ServiceHandler
、EndpointsHandler
和 EndpointSliceHandler
相关的,对于 service、endpoints 和 endpointSlices 三种对象都实现了 OnAdd
、OnUpdate
、OnDelete
和 OnSynced
方法。
1 | // 1.service 相关的方法 |
在启动逻辑的 Run()
方法中 proxier 已经被注册到了 serviceConfig、endpointsConfig、endpointSliceConfig 中,当启动 informer,cache 同步完成后会调用 OnSynced()
方法,之后当 watch 到变化后会调用 proxier 中对应的 OnUpdate()
方法进行处理,OnSynced()
会直接调用 proxier.syncProxyRules()
来刷新iptables 规则,而 OnUpdate()
会调用 proxier.syncRunner.Run()
方法,其最终也是调用 proxier.syncProxyRules()
方法刷新规则的,这种转换是在 BoundedFrequencyRunner
中体现出来的,下面看一下具体实现。
Sync() 以及 SyncLoop() 的实现
每种 proxier 的 Sync()
以及 SyncLoop()
方法如下所示,都是调用 syncRunner 中的相关方法,而 syncRunner 在前面的 NewProxier()
中已经说过了,syncRunner 是调用 async.NewBoundedFrequencyRunner()
方法初始化,至此,基本上可以确定了所有的核心都是在 BoundedFrequencyRunner
中实现的。
1 | func NewProxier() (*Proxier, error) { |
NewBoundedFrequencyRunner()
是其初始化的函数,其中的参数 minInterval
和 maxInterval
分别对应 proxier 中的 minSyncPeriod
和 syncPeriod
,两者的默认值分别为 0s 和 30s,其值可以使用 --iptables-min-sync-period
和 --iptables-sync-period
启动参数来指定。
k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner.go:134
1 | func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner { |
在启动流程 Run()
方法最后调用的 s.Proxier.SyncLoop()
最终调用的是 BoundedFrequencyRunner
的 Loop()
方法,如下所示:
k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner.go:169
1 | func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) { |
proxier 的 OnUpdate()
中调用的 syncRunner.Run()
其实只是在 bfr.run 这个带 buffer 的 channel 中发送了一条数据,在 BoundedFrequencyRunner
的 Loop()
方法中接收到该数据后会调用 bfr.tryRun()
进行处理:
k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner.go:191
1 | func (bfr *BoundedFrequencyRunner) Run() { |
而 tryRun()
方法才是最终调用 syncProxyRules()
刷新iptables 规则的。
k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner.go:211
1 | func (bfr *BoundedFrequencyRunner) tryRun() { |
通过以上分析可知,syncProxyRules()
是每个 proxier 的核心方法,启动 informer cache 同步完成后会直接调用 proxier.syncProxyRules()
刷新iptables 规则,之后如果 informer watch 到相关对象的变化后会调用 BoundedFrequencyRunner
的 tryRun()
来刷新iptables 规则,定时器每 30s 会执行一次iptables 规则的刷新。
总结
本文主要介绍了 kube-proxy 的启动逻辑以及三种模式 proxier 的初始化,还有最终调用刷新iptables 规则的 BoundedFrequencyRunner,可以看到其中的代码写的很巧妙。而每种模式下的iptables 规则是如何创建、刷新以及转发的是如何实现的会在后面的文章中进行分析。