在进入组件源码分析前我们先来看下在k8s中创建一个Pod资源对象的流程是怎样的:
kube-apiserver
kube-apiserver 组件,它负责将 k8s 中的“资源组/资源版本/资源”以 RESTful 风格的形式对外暴露并提供服务。k8s 集群中的所有组件都通过kube-apiserver组件操作资源对象。kube-apiserver 属于核心组件,对于整个集群至关重要,它具有以下重要特性:
kube-apiserver 提供了3种 HTTP Server 服务用于将 kube-apiserver组件功能进行解耦,分别是 APIExtensionsSerer、KubeAPIServer、AggregatorServe。它们可以通过 kubectl 工具或接口进行资源管理,架构如下图:
Aggregator 和 APIExtensionsServer 对应两种主要扩展 APIServer 资源的方式,即分别是 AA 和 CRD。
kube-apiserver启动流程
在 kube-apiserver 组件启动过程中首先是将k8s所支持的资源注册到Scheme资源注册表中这样后面启动的逻辑才能够从Scheme资源注册表中拿到资源信息并启动和运行 APIExtensionsServer、KubeAPIServer、AggregatorServer 这3种服务。
然后是通过 Cobra 命令行进行参数解析, k8s 中所有组件都使用 Cobra 库来解析命令行参数。kube-apiserver 组件通过 Cobra 填充配置参数默认值并验证参数。
在 cmd/kube-apiserver/app/server.go 下的 Run 方法启动主逻辑。Run 方法中首先调用了 CreateServerChain 构建服务调用链,然后执行 server.PrepareRun()。
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
server, err := CreateServerChain(completeOptions, stopCh)
if err != nil {
return err
}
prepared, err := server.PrepareRun()
if err != nil {
return err
}
return prepared.Run(stopCh)
}
调用链中的内容代码如下:
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
// 1、为 kubeAPIServer 创建配置
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
if err != nil {
return nil, err
}
// 2、判断是否配置了 APIExtensionsServer,创建 apiExtensionsConfig
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))
if err != nil {
return nil, err
}
// 3、初始化 APIExtensionsServer
notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
if err != nil {
return nil, err
}
// 4、初始化 KubeAPIServer
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
if err != nil {
return nil, err
}
// 5、创建 AggregatorConfig
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
// 6、初始化 AggregatorServer
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
if err != nil {
return nil, err
}
return aggregatorServer, nil
该方法实际上完成了server初始化,里面包含APIExtensionsServer、KubeAPIServer、AggregatorServer三个server,它们通过委托模式连接在一起的,初始化过程都是类似的,首先为每个server创建对应的config,然后初始化 http server,最终返回 aggregatorapiserver.APIAggregator 实例。初始化流程主要有:http filter chain 的配置、API Group 的注册、http path与handler 的关联以及 handler 后端存储 etcd 的配置。
我将上述 CreateServerChain 构建服务调用链方法做成了流程图,见下图:
server.PrepareRun()主要完成了健康检查、存活检查和OpenAPI路由的注册工作,最后调用 prepared.Run 方法来启动安全的 http server。
kube-apiserver 权限控制及认证
kube-apiserver 作为k8s集群请求入口,接收集群中组件和客户端访问请求, kube-apiserver 对接口访问请求提供了3种安全权限控制,每个请求都需要经过认证、授权及准入控制器才有权限操作资源对象。
认证
k8s支持同时开启多个认证功能,目前 kube-apiserver 提供了9种认证机制,每一种机制被实例化后成为认证器(Authenticator),认证器被封装在 http.Handler 请求处理函数中。当客户端请求通过任一认证器并返回true时,则表示认证通过。
源码位置:vendor/k8s.io/apiserver/pkg/endpoints/filters/authentication.go 45
func withAuthentication(handler http.Handler, auth authenticator.Request, failed http.Handler, apiAuds authenticator.Audiences, metrics recordMetrics) http.Handler {
if auth == nil {
klog.Warning("Authentication is disabled")
return handler
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
...
resp, ok, err := auth.AuthenticateRequest(req)
...
if err != nil || !ok {
...
failed.ServeHTTP(w, req)
return
}
...
// authorization header is not required anymore in case of a successful authentication.
req.Header.Del("Authorization")
req = req.WithContext(genericapirequest.WithUser(req.Context(), resp.User))
handler.ServeHTTP(w, req)
})
}
这里稍微聊下Kubernetes的两类用户:
参考:
《Kubernetes源码剖析》
https://kubernetes.io/zh-cn/docs/concepts/overview/kubernetes-api/
https://blog.tianfeiyu.com/source-code-reading-notes/kubernetes/kube_apiserver.html#createkubeapiserverconfig
END