最近加入云原生社区组织的k8s源码研习社,开始学习k8s底层源码,并整理成笔记。欢迎感兴趣的同学一起加入,共同学习进步。群里和社区里有各种大佬,随时可以帮你答疑解惑。github.com/cloudnative…
上一篇整理了client-go框架的Informer机制,informer源码分析, 同时api-server用到了go-restful这个web框架,go-restful的原理和源码参考go-restful 源码分析
后续的源码分析链路很长,很容易陷进去出不来,建议随时根据这张图查看目前分析到哪一步了。
kube-apiserver作为k8s最核心的组件,是各个组件之间沟通的桥梁,各个组件不会直接通信,而是都要经过api-server做中转。详见之前的另一篇博客,本文从源码角度分析api-server
kube-apiserver提供了3种http server服务,用于将庞大的kube-apiserver组件功能进行解耦,这三种Http Server分别是:
服务名 | 概述 | 对象管理 | 资源注册表 |
---|---|---|---|
KubeAPIServer | 核心服务,提供k8s内置核心资源服务,不允许开发者随意修改,如:Pod,Service等 | Master | Legacyscheme.Scheme |
APIExtensionsServer | API扩展服务,该服务提供了CRD自定义资源服务 | CustomResourceDefinitions | extensionsapiserver.Scheme |
AggregatorServer | API聚合服务,提供了聚合服务 | APIAggregator | aggregatorscheme.Scheme |
三种服务底层都依赖GenericAPIServer,通过GenericAPIServer可以将k8s资源与rest api进行映射
kube-apiserver是所有资源控制的入口,启动流程也略复杂,启动的代码逻辑可以分为9个步骤:
其中,三个sever是通过委托模式连接在一起的,初始化的过程都是类似的,包括:
kube-apiserver组件是一个单独的进程,启动的入口函数如下:
源码位置:cmd/kube-apiserver/apiserver.go
import (
...
// 引入legacyscheme,内部的init方法实现资源注册表的注册
"k8s.io/kubernetes/pkg/api/legacyscheme"
// 引入master,内部的init方法实现k8s所有资源的注册
"k8s.io/kubernetes/pkg/master"
...
)
func main() {
rand.Seed(time.Now().UnixNano())
// 创建一个带有默认参数的Cobra Command对象
command := app.NewAPIServerCommand()
logs.InitLogs()
defer logs.FlushLogs()
if err := command.Execute(); err != nil {
os.Exit(1)
}
}
复制代码
kube-apiserver组件启动后的第一件事情是将k8s所支持的资源注册到Scheme资源注册表中,这样后面的启动逻辑才能拿到资源信息,并启动和运行前面介绍的三个服务
资源的注册过程不是函数调用触发的,而是通过import和init机制触发的。前面第0步提到过的import "k8s.io/kubernetes/pkg/api/legacyscheme"
资源注册包括两步:
代码路径:pkg/api/legacyscheme/scheme.go
定义了三个全局变量服务于kube-apiserver,组件在任何地方都可以调用
package legacyscheme
import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
)
// Scheme资源注册表
var Scheme = runtime.NewScheme()
// Codec编解码器
var Codecs = serializer.NewCodecFactory(Scheme)
// 参数编解码器
var ParameterCodec = runtime.NewParameterCodec(Scheme)
apiserver启动时导入了master包,前面第0步介绍的import "k8s.io/kubernetes/pkg/master"
master包中的import_known_versions.go调用了k8s资源下的install包,通过导入包触发初始化函数。 每种资源下都定义install包,被引用时触发init函数完成资源注册过程
源码位置:pkg/master/import_known_versions.go
import (
// These imports are the API groups the API server will support.
_ "k8s.io/kubernetes/pkg/apis/admission/install"
_ "k8s.io/kubernetes/pkg/apis/admissionregistration/install"
_ "k8s.io/kubernetes/pkg/apis/apps/install"
_ "k8s.io/kubernetes/pkg/apis/authentication/install"
_ "k8s.io/kubernetes/pkg/apis/authorization/install"
_ "k8s.io/kubernetes/pkg/apis/autoscaling/install"
_ "k8s.io/kubernetes/pkg/apis/batch/install"
_ "k8s.io/kubernetes/pkg/apis/certificates/install"
_ "k8s.io/kubernetes/pkg/apis/coordination/install"
_ "k8s.io/kubernetes/pkg/apis/core/install"
_ "k8s.io/kubernetes/pkg/apis/discovery/install"
_ "k8s.io/kubernetes/pkg/apis/events/install"
_ "k8s.io/kubernetes/pkg/apis/extensions/install"
_ "k8s.io/kubernetes/pkg/apis/flowcontrol/install"
_ "k8s.io/kubernetes/pkg/apis/imagepolicy/install"
_ "k8s.io/kubernetes/pkg/apis/networking/install"
_ "k8s.io/kubernetes/pkg/apis/node/install"
_ "k8s.io/kubernetes/pkg/apis/policy/install"
_ "k8s.io/kubernetes/pkg/apis/rbac/install"
_ "k8s.io/kubernetes/pkg/apis/scheduling/install"
_ "k8s.io/kubernetes/pkg/apis/settings/install"
_ "k8s.io/kubernetes/pkg/apis/storage/install"
)
以Core资源为例,查看install。 源码位置:pkg/apis/core/install/install.go
func init() {
// legacyscheme.Scheme是前面介绍的全局资源注册中心
Install(legacyscheme.Scheme)
}
// Install registers the API group and adds types to a scheme
func Install(scheme *runtime.Scheme) {
// 注册资源组
utilruntime.Must(core.AddToScheme(scheme))
utilruntime.Must(v1.AddToScheme(scheme))
// 注册版本顺序
utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion))
}
复制代码
k8s中所有组件统一使用cobra来解析命令行参数。kube-apiserver组件通过Cobra填充配置参数默认值并验证参数,前面第0步介绍的app.NewAPIServerCommand()
源码位置:cmd/kube-apiserver/app/server.go
func NewAPIServerCommand() *cobra.Command {
// 初始化各个模块的默认配置,内部调用了各个模块各自的默认配置
s := options.NewServerRunOptions()
cmd := &cobra.Command{
...
RunE: func(cmd *cobra.Command, args []string) error {
...
// 设置默认参数配置
completedOptions, err := Complete(s)
// 验证参数合法性
if errs := completedOptions.Validate(); len(errs) != 0 {
return utilerrors.NewAggregate(errs)
}
// 启动运行,常驻进程
// Run函数后面专门介绍
return Run(completedOptions, genericapiserver.SetupSignalHandler())
},
...
}
...
}
// genericapiserver.SetupSignalHandler()
func SetupSignalHandler() <-chan struct{} {
return SetupSignalContext().Done()
}
func SetupSignalContext() context.Context {
...
// 监听操作系统信号os.Interrupt和syscall.SIGTERM
// 并将监听的信号与stopChan绑定,确保进程终止时,groutine优雅退出
signal.Notify(shutdownHandler, shutdownSignals...)
...
}
复制代码
核心流程是前面介绍的kube-apiserver组件中三大服务的配置和创建,具体包括:
apiserver通用配置是kube-apiserver不同模块实例化所需的配置,具体包括:
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
// 创建服务链
server, err := CreateServerChain(completeOptions, stopCh)
if err != nil {
return err
}
// 预运行
prepared, err := server.PrepareRun()
if err != nil {
return err
}
// 正式运行
return prepared.Run(stopCh)
}
// 创建服务链
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
...
// 创建 kubeapi-server 配置
kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
if err != nil {
return nil, err
}
// 创建 kubeapi-extension-server 配置
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
// 创建 kubeapi-extension-server 服务
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
// 创建 kubeapi-server 服务
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
// 创建 aggregator-server 配置
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
// 创建 aggregator-server 服务
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
if insecureServingInfo != nil {
insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
// 启动服务
if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
return nil, err
}
}
return aggregatorServer, nil
}
复制代码
func CreateKubeAPIServerConfig(...) (...) {
// 构建通用配置
genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
...
// 设置端口范围
serviceIPRange, apiServerServiceIP, err := master.ServiceIPRange(s.PrimaryServiceClusterIPRange)
...
// 构造master.Config
config := &master.Config{
GenericConfig: genericConfig,
ExtraConfig: master.ExtraConfig{
APIResourceConfigSource: storageFactory.APIResourceConfigSource,
...
},
}
...
}
复制代码
func buildGenericConfig(...) (...) {
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
// 配置启动、禁用GV
genericConfig.MergedResourceConfig = master.DefaultAPIResourceConfigSource()
...
// openapi/swagger配置
// OpenAPIConfig用于生成OpenAPI规范
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
kubeVersion := version.Get()
genericConfig.Version = &kubeVersion
// etcd配置
// storageFactoryConfig对象定义了kube-apiserver与etcd的交互方式,如:etcd认证、地址、存储前缀等
// 该对象也定义了资源存储方式,如:资源信息、资源编码信息、资源状态等
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
if err != nil {
lastErr = err
return
}
storageFactory, lastErr = completedStorageFactoryConfig.New()
if lastErr != nil {
return
}
if genericConfig.EgressSelector != nil {
storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
}
if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
return
}
// NewSharedInformerFactory初始化
versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
// 认证配置
// 内部调用 authenticatorConfig.New()
// k8s提供9种认证机制,每种认证机制被实例化后都成为认证器
if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr != nil {
return
}
// 授权配置
// k8e提供6种授权机制,每种授权机制被实例化后都成为授权器
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
// 准入器admission配置
// k8s资源在认证和授权通过,被持久化到etcd之前进入准入控制逻辑
// 准入控制包括:对请求的资源进行自定义操作(校验、修改、拒绝)
// k8s支持31种准入控制
// 准入控制器通过Plugins数据结构统一注册、存放、管理
admissionConfig := &kubeapiserveradmission.Config{
ExternalInformers: versionedInformers,
LoopbackClientConfig: genericConfig.LoopbackClientConfig,
CloudConfigFile: s.CloudProvider.CloudConfigFile,
}
serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver)
if err != nil {
lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
return
}
err = s.Admission.ApplyTo(
genericConfig,
versionedInformers,
kubeClientConfig,
feature.DefaultFeatureGate,
pluginInitializers...)
if err != nil {
lastErr = fmt.Errorf("failed to initialize admission: %v", err)
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness {
genericConfig.FlowControl = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)
}
return
}
// 认证初始化
func (o *BuiltInAuthenticationOptions) ApplyTo(...) error {
...
authInfo.Authenticator, openAPIConfig.SecurityDefinitions, err = authenticatorConfig.New()
...
}
// 根据配置决定9种认证器的初始化
func (config Config) New() (authenticator.Request, *spec.SecurityDefinitions, error) {
// 定义认证器列表
var authenticators []authenticator.Request
// 下面根据不同的开关,决定是否配置某种认证器
// RequestHeader认证器
if config.RequestHeaderConfig != nil {
requestHeaderAuthenticator := headerrequest.NewDynamicVerifyOptionsSecure(
config.RequestHeaderConfig.CAContentProvider.VerifyOptions,
config.RequestHeaderConfig.AllowedClientNames,
config.RequestHeaderConfig.UsernameHeaders,
config.RequestHeaderConfig.GroupHeaders,
config.RequestHeaderConfig.ExtraHeaderPrefixes,
)
authenticators = append(authenticators, authenticator.WrapAudienceAgnosticRequest(config.APIAudiences, requestHeaderAuthenticator))
}
// X509 methods
if config.ClientCAContentProvider != nil {
certAuth := x509.NewDynamic(config.ClientCAContentProvider.VerifyOptions, x509.CommonNameUserConversion)
authenticators = append(authenticators, certAuth)
}
// TokenAuth认证器
if len(config.TokenAuthFile) > 0 {
tokenAuth, err := newAuthenticatorFromTokenFile(config.TokenAuthFile)
if err != nil {
return nil, nil, err
}
tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, tokenAuth))
}
// ServiceAccountAuth认证器
if len(config.ServiceAccountKeyFiles) > 0 {
serviceAccountAuth, err := newLegacyServiceAccountAuthenticator(config.ServiceAccountKeyFiles, config.ServiceAccountLookup, config.APIAudiences, config.ServiceAccountTokenGetter)
if err != nil {
return nil, nil, err
}
tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth)
}
if utilfeature.DefaultFeatureGate.Enabled(features.TokenRequest) && config.ServiceAccountIssuer != "" {
serviceAccountAuth, err := newServiceAccountAuthenticator(config.ServiceAccountIssuer, config.ServiceAccountKeyFiles, config.APIAudiences, config.ServiceAccountTokenGetter)
if err != nil {
return nil, nil, err
}
tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth)
}
// BootstrapToken认证器
if config.BootstrapToken {
if config.BootstrapTokenAuthenticator != nil {
// TODO: This can sometimes be nil because of
tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, config.BootstrapTokenAuthenticator))
}
}
// NOTE(ericchiang): Keep the OpenID Connect after Service Accounts.
//
// Because both plugins verify JWTs whichever comes first in the union experiences
// cache misses for all requests using the other. While the service account plugin
// simply returns an error, the OpenID Connect plugin may query the provider to
// update the keys, causing performance hits.
if len(config.OIDCIssuerURL) > 0 && len(config.OIDCClientID) > 0 {
oidcAuth, err := newAuthenticatorFromOIDCIssuerURL(oidc.Options{
IssuerURL: config.OIDCIssuerURL,
ClientID: config.OIDCClientID,
CAFile: config.OIDCCAFile,
UsernameClaim: config.OIDCUsernameClaim,
UsernamePrefix: config.OIDCUsernamePrefix,
GroupsClaim: config.OIDCGroupsClaim,
GroupsPrefix: config.OIDCGroupsPrefix,
SupportedSigningAlgs: config.OIDCSigningAlgs,
RequiredClaims: config.OIDCRequiredClaims,
})
if err != nil {
return nil, nil, err
}
tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, oidcAuth))
}
// WebhookTokenAuth认证器
if len(config.WebhookTokenAuthnConfigFile) > 0 {
webhookTokenAuth, err := newWebhookTokenAuthenticator(config)
if err != nil {
return nil, nil, err
}
tokenAuthenticators = append(tokenAuthenticators, webhookTokenAuth)
}
if len(tokenAuthenticators) > 0 {
// Union the token authenticators
tokenAuth := tokenunion.New(tokenAuthenticators...)
// Optionally cache authentication results
if config.TokenSuccessCacheTTL > 0 || config.TokenFailureCacheTTL > 0 {
tokenAuth = tokencache.New(tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL)
}
authenticators = append(authenticators, bearertoken.New(tokenAuth), websocket.NewProtocolAuthenticator(tokenAuth))
securityDefinitions["BearerToken"] = &spec.SecurityScheme{
SecuritySchemeProps: spec.SecuritySchemeProps{
Type: "apiKey",
Name: "authorization",
In: "header",
Description: "Bearer Token authentication",
},
}
}
// 匿名认证器
if len(authenticators) == 0 {
if config.Anonymous {
return anonymous.NewAuthenticator(), &securityDefinitions, nil
}
return nil, &securityDefinitions, nil
}
...
// 将多个认证器合并
authenticator := union.New(authenticators...)
...
}
// 授权初始化
func BuildAuthorizer(s *options.ServerRunOptions, EgressSelector *egressselector.EgressSelector, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, error) {
...
return authorizationConfig.New()
}
// 6种授权器配置
func (config Config) New() (authorizer.Authorizer, authorizer.RuleResolver, error) {
...
// 声明认证器Authorizer列表
var (
authorizers []authorizer.Authorizer
ruleResolvers []authorizer.RuleResolver
)
for _, authorizationMode := range config.AuthorizationModes {
switch authorizationMode {
// Node授权器
case modes.ModeNode:
graph := node.NewGraph()
node.AddGraphEventHandlers(
graph,
config.VersionedInformerFactory.Core().V1().Nodes(),
config.VersionedInformerFactory.Core().V1().Pods(),
config.VersionedInformerFactory.Core().V1().PersistentVolumes(),
config.VersionedInformerFactory.Storage().V1().VolumeAttachments(),
)
nodeAuthorizer := node.NewAuthorizer(graph, nodeidentifier.NewDefaultNodeIdentifier(), bootstrappolicy.NodeRules())
authorizers = append(authorizers, nodeAuthorizer)
ruleResolvers = append(ruleResolvers, nodeAuthorizer)
// AlwaysAllow授权器
case modes.ModeAlwaysAllow:
alwaysAllowAuthorizer := authorizerfactory.NewAlwaysAllowAuthorizer()
authorizers = append(authorizers, alwaysAllowAuthorizer)
ruleResolvers = append(ruleResolvers, alwaysAllowAuthorizer)
// AlwaysDeny授权器
case modes.ModeAlwaysDeny:
alwaysDenyAuthorizer := authorizerfactory.NewAlwaysDenyAuthorizer()
authorizers = append(authorizers, alwaysDenyAuthorizer)
ruleResolvers = append(ruleResolvers, alwaysDenyAuthorizer)
// ABAC授权器
case modes.ModeABAC:
abacAuthorizer, err := abac.NewFromFile(config.PolicyFile)
if err != nil {
return nil, nil, err
}
authorizers = append(authorizers, abacAuthorizer)
ruleResolvers = append(ruleResolvers, abacAuthorizer)
// Webhook授权器
case modes.ModeWebhook:
webhookAuthorizer, err := webhook.New(config.WebhookConfigFile,
config.WebhookVersion,
config.WebhookCacheAuthorizedTTL,
config.WebhookCacheUnauthorizedTTL,
config.CustomDial)
if err != nil {
return nil, nil, err
}
authorizers = append(authorizers, webhookAuthorizer)
ruleResolvers = append(ruleResolvers, webhookAuthorizer)
// RBAC授权器
case modes.ModeRBAC:
rbacAuthorizer := rbac.New(
&rbac.RoleGetter{Lister: config.VersionedInformerFactory.Rbac().V1().Roles().Lister()},
&rbac.RoleBindingLister{Lister: config.VersionedInformerFactory.Rbac().V1().RoleBindings().Lister()},
&rbac.ClusterRoleGetter{Lister: config.VersionedInformerFactory.Rbac().V1().ClusterRoles().Lister()},
&rbac.ClusterRoleBindingLister{Lister: config.VersionedInformerFactory.Rbac().V1().ClusterRoleBindings().Lister()},
)
authorizers = append(authorizers, rbacAuthorizer)
ruleResolvers = append(ruleResolvers, rbacAuthorizer)
default:
return nil, nil, fmt.Errorf("unknown authorization mode %s specified", authorizationMode)
}
}
// 将已启用的认证器合并到列表中
// 请求到来时,kube-apiserver会遍历认证器列表,当有一个返回True时,表明认证成功
return union.New(authorizers...), union.NewRuleResolvers(ruleResolvers...), nil
}
复制代码
func createAPIExtensionsConfig(...)(...) {
...
apiextensionsConfig := &apiextensionsapiserver.Config{
GenericConfig: &genericapiserver.RecommendedConfig{
Config: genericConfig,
SharedInformerFactory: externalInformers,
},
ExtraConfig: apiextensionsapiserver.ExtraConfig{
CRDRESTOptionsGetter: apiextensionsoptions.NewCRDRESTOptionsGetter(etcdOptions),
MasterCount: masterCount,
AuthResolverWrapper: authResolverWrapper,
ServiceResolver: serviceResolver,
},
}
...
}
复制代码
func createAPIExtensionsServer(...) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
return apiextensionsConfig.Complete().New(delegateAPIServer)
}
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
// APIExtensionsServer依赖GenericAPIServer
// 通过GenericConfig创建一个名为apiextensions-apiserver的服务
genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
// APIExtensionsServer通过CustomResourceDefinitions对象进行管理
// 实例化该对象后才能注册APIExtensionsServer下的资源
s := &CustomResourceDefinitions{
GenericAPIServer: genericServer,
}
apiResourceConfig := c.GenericConfig.MergedResourceConfig
// 实例化APIGroupInfo,该对象用于描述资源组信息
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
// 完成资源与资源存储对象的映射
// 如果开启了v1beta1资源版本,将资源版本、资源、资源存储存放到APIGroupInfo的map中
if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) {
storage := map[string]rest.Storage{}
// 通过NewRest创建资源存储对象
customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
if err != nil {
return nil, err
}
storage["customresourcedefinitions"] = customResourceDefinitionStorage
storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = storage
}
// 如果开启了v1资源版本,将资源版本、资源、资源存储存放到APIGroupInfo的map中
if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {
storage := map[string]rest.Storage{}
// customresourcedefinitions
customResourceDefintionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
if err != nil {
return nil, err
}
storage["customresourcedefinitions"] = customResourceDefintionStorage
storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefintionStorage)
apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
}
// 注册api,这个函数后面单独介绍
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}
crdClient, err := internalclientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
s.Informers = internalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)
// 初始化主controller
establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
// 申明handler
crdHandler, err := NewCustomResourceDefinitionHandler(
versionDiscoveryHandler,
groupDiscoveryHandler,
s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
delegateHandler,
c.ExtraConfig.CRDRESTOptionsGetter,
c.GenericConfig.AdmissionControl,
establishingController,
c.ExtraConfig.ServiceResolver,
c.ExtraConfig.AuthResolverWrapper,
c.ExtraConfig.MasterCount,
s.GenericAPIServer.Authorizer,
c.GenericConfig.MaxRequestBodyBytes,
)
if err != nil {
return nil, err
}
// 添加handler函数
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
// 初始化crdController
crdController := NewDiscoveryController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
// 初始化namingController
namingController := status.NewNamingConditionController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
// 初始化finalizingController
finalizingController := finalizer.NewCRDFinalizer(
s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
crdClient.Apiextensions(),
crdHandler,
)
// 初始化openapiController
var openapiController *openapicontroller.Controller
if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) {
openapiController = openapicontroller.NewController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions())
}
// 注册hook函数:监听informer
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
s.Informers.Start(context.StopCh)
return nil
})
// 注册hook函数:启动controller
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) {
go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
}
// 启动前面初始化的各种controller
go crdController.Run(context.StopCh)
go namingController.Run(context.StopCh)
go establishingController.Run(context.StopCh)
go finalizingController.Run(5, context.StopCh)
return nil
})
// 注册hook函数:同步crd资源
s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
return s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions().Informer().HasSynced(), nil
}, context.StopCh)
})
return s, nil
}
复制代码
APIGroupInfo用于描述资源组信息,一个资源对应一个APIGroupInfo对象,每个资源对应一个资源存储对象
func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec runtime.ParameterCodec, codecs serializer.CodecFactory) APIGroupInfo {
return APIGroupInfo{
PrioritizedVersions: scheme.PrioritizedVersionsForGroup(group),
// 这个map用于存储资源、资源存储对象的映射关系
// 格式:资源版本/资源/资源存储对象
// 资源存储对象RESTStorage,负责资源的增删改查
// 后续会将RESTStorage转换为http的handler函数
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
// TODO unhardcode this. It was hardcoded before, but we need to re-evaluate
OptionsExternalVersion: &schema.GroupVersion{Version: "v1"},
Scheme: scheme,
ParameterCodec: parameterCodec,
NegotiatedSerializer: codecs,
}
}
复制代码
注册APIGroupInfo的函数非常重要,将APIGroupInfo中的资源对象注册到APIExtensionServerHandler函数。其过程是:
func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
return s.InstallAPIGroups(apiGroupInfo)
}
// InstallAPIGroups
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
...
// 遍历所有的资源信息,一次安装资源版本处理器
for _, apiGroupInfo := range apiGroupInfos {
if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
return fmt.Errorf("unable to install api resources: %v", err)
}
...
apiGroup := metav1.APIGroup{
Name: apiGroupInfo.PrioritizedVersions[0].Group,
Versions: apiVersionsForDiscovery,
PreferredVersion: preferredVersionForDiscovery,
}
s.DiscoveryGroupManager.AddGroup(apiGroup)
s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
}
return nil
}
// installAPIResources
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
...
// 调用InstallREST
// 参数为go-restful的container对象
if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
}
}
return nil
}
// InstallREST
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
// 定义http path请求路径
// 格式://
// apiPrefix是api或者apis
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
// 实例化APIInstaller实例化器
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
}
// 注册api,返回go-restful的WebService对象
apiResources, ws, registrationErrors := installer.Install()
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
versionDiscoveryHandler.AddToWebService(ws)
// 这里用到go-restful框架的知识:将WebService添加到Container中
container.Add(ws)
return utilerrors.NewAggregate(registrationErrors)
}
// installer.Install
func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
// 构造WebService对象
ws := a.newWebService()
...
// 遍历所有的路径
for _, path := range paths {
// 实现Storage到Router的转换,将路由注册到webservice
apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
...
if apiResource != nil {
// 添加到列表中
apiResources = append(apiResources, *apiResource)
}
}
return apiResources, ws, errors
}
// 这个方法很长,核心功能是根据storage构造handler,再将handler和path构造成go-restful框架的Route对象,最后Route添加到webservice
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (...) {
...
// 判断storage实现了哪些Rest接口
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
...
// 构造action列表
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
...
for _, action := range actions {
...
// 构造go-restful的RouteBuilder对象
routes := []*restful.RouteBuilder{}
// 根据action的不同Verb,注册不同的handler
switch action.Verb {
case "GET":
...
// 初始化handler
handler = restfulGetResource(getter, exporter, reqScope)
...
// 构造route
route := ws.GET(action.Path).To(handler).xxx
...
// route追加到routes
routes = append(routes, route)
...
case "POST":
...
// handler初始化,后面专门介绍
handler = restfulCreateResource(creater, reqScope, admit)
route := ws.POST(action.Path).To(handler).xxx
routes = append(routes, route)
...
// 遍历所有的route
for _, route := range routes {
route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
Group: reqScope.Kind.Group,
Version: reqScope.Kind.Version,
Kind: reqScope.Kind.Kind,
})
// 添加自定义扩展属性(k8s所有的扩展属性以x-打头)
route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
// 将route加入到WebService中
ws.Route(route)
}
}
}
// handler初始化
func restfulCreateResource(r rest.Creater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
handlers.CreateResource(r, &scope, admit)(res.ResponseWriter, req.Request)
}
}
// 返回一个处理资源的handler
func CreateResource(r rest.Creater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
return createHandler(&namedCreaterAdapter{r}, scope, admission, false)
}
// 返回一个http标准库handler函数,处理对应的路由请求
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
// http库标准的handler写法
return func(w http.ResponseWriter, req *http.Request) {
...
// 找到合适的SerializeInfo
s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
...
// 寻找合适的编解码器
decoder := scope.Serializer.DecoderToVersion(s.Serializer, scope.HubGroupVersion)
// 解码
obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
// 处理请求
result, err := finishRequest(timeout, func() (runtime.Object, error) {
...
})
...
}
}
复制代码
创建KubeAPIServer的流程与创建KubeAPIExtensionServer的流程类似,原理一样。包括:
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget) (*master.Master, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
...
}
func (c *Config) Complete() CompletedConfig {
...
// 内部调用createEndpointReconciler
if cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler == nil {
cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler = c.createEndpointReconciler()
}
return CompletedConfig{&cfg}
}
// createEndpointReconciler
func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler {
switch c.ExtraConfig.EndpointReconcilerType {
// there are numerous test dependencies that depend on a default controller
case "", reconcilers.MasterCountReconcilerType:
return c.createMasterCountReconciler()
case reconcilers.LeaseEndpointReconcilerType:
return c.createLeaseReconciler()
case reconcilers.NoneEndpointReconcilerType:
return c.createNoneReconciler()
default:
klog.Fatalf("Reconciler not implemented: %v", c.ExtraConfig.EndpointReconcilerType)
}
return nil
}
func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler {
...
// 初始化Storage
leaseStorage, _, err := storagefactory.Create(*config)
...
return reconcilers.NewLeaseEndpointReconciler(endpointsAdapter, masterLeases)
}
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
switch c.Type {
...
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
// 初始化Storage
return newETCD3Storage(c)
...
}
}
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
// 初始化kube-apiserver
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
// 初始化Master,k8s的核心服务通过Master对象进行管理
// 实例化后的对象才能注册KubeAPIServer下的资源
m := &Master{
GenericAPIServer: s,
ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
}
if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
// 注册没有组名的资源组,路径前缀为"/api"
if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
return nil, err
}
}
// 注册有组名的资源组,路径前缀为"/apis"
if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
return nil, err
}
m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
// 创建认证controller
controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, kubeClient)
...
// 启动controller
go controller.Run(1, hookContext.StopCh)
return nil
})
return m, nil
}
复制代码
func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
// 实例化APIGroupInfo
// 生成各种资源对应的storage
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
...
// 创建bootstrapController
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
// 注册api
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf("error in registering group versions: %v", err)
}
return nil
}
// 通过NewStorage、NewRest等创建各种资源的存储,存放到map中
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
...
restStorage := LegacyRESTStorage{}
podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
podStorage, err := podstore.NewStorage(...)
...
// NewStorage内部通过etcd客户端去操作etcd
controllerStorage, err := controllerstore.NewStorage(restOptionsGetter)
...
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
...
"replicationControllers": controllerStorage.Controller,
...
}
...
return restStorage, apiGroupInfo, nil
}
复制代码
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
...
// 内部也是调用installAPIResources,这个函数前面介绍过
if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
return err
}
s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix).WebService())
return nil
}
复制代码
func (m *Master) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
apiGroupsInfo := []*genericapiserver.APIGroupInfo{}
// 遍历所有的provider
for _, restStorageBuilder := range restStorageProviders {
...
// 获取资源对应的Storage
apiGroupInfo, enabled, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
...
}
// InstallAPIGroups这个函数在前面已经分析过
if err := m.GenericAPIServer.InstallAPIGroups(apiGroupsInfo...); err != nil {
return fmt.Errorf("error in registering group versions: %v", err)
}
return nil
}
// RESTStorageProvider接口,每种资源都实现了该接口,并实现自己的业务逻辑
// 实现逻辑都大同小异,跟前面介绍的一样。都调用NewStorage、NewREST操作etcd
type RESTStorageProvider interface {
GroupName() string
NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error)
}
复制代码
func createAggregatorConfig(...) (*aggregatorapiserver.Config, error) {
...
aggregatorConfig := &aggregatorapiserver.Config{
GenericConfig: &genericapiserver.RecommendedConfig{
Config: genericConfig,
SharedInformerFactory: externalInformers,
},
ExtraConfig: aggregatorapiserver.ExtraConfig{
ProxyClientCertFile: commandOptions.ProxyClientCertFile,
ProxyClientKeyFile: commandOptions.ProxyClientKeyFile,
ServiceResolver: serviceResolver,
ProxyTransport: proxyTransport,
},
}
return aggregatorConfig, nil
}
复制代码
创建AggregatorServer的流程与创建KubeAPIExtensionServer的流程类似。
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
// 初始化delegate
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
...
// 创建autoRegistrationController
autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)
apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
// 创建crdRegistrationController
crdRegistrationController := crdregistration.NewCRDRegistrationController(
apiExtensionInformers.Apiextensions().V1().CustomResourceDefinitions(),
autoRegistrationController)
err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
// 启动crdRegistrationController
go crdRegistrationController.Run(5, context.StopCh)
go func() {
if aggregatorConfig.GenericConfig.MergedResourceConfig.AnyVersionForGroupEnabled("apiextensions.k8s.io") {
crdRegistrationController.WaitForInitialSync()
}
// 启动autoRegistrationController
autoRegistrationController.Run(5, context.StopCh)
}()
return nil
})
...
}
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
// 创建kube-aggregator
genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
// 初始化APIAggregator
s := &APIAggregator{
GenericAPIServer: genericServer,
delegateHandler: delegationTarget.UnprotectedHandler(),
proxyTransport: c.ExtraConfig.ProxyTransport,
proxyHandlers: map[string]*proxyHandler{},
handledGroups: sets.String{},
lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
APIRegistrationInformers: informerFactory,
serviceResolver: c.ExtraConfig.ServiceResolver,
openAPIConfig: openAPIConfig,
egressSelector: c.GenericConfig.EgressSelector,
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
}
// 初始化Storage,逻辑和前面是一样的
apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)
// 安装api,和前面也一样
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}
...
}
复制代码
前面三个服务的创建过程,都依赖GenericAPIServer。通过genericapiserver将k8s资源与RestAPI进行映射
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
...
// 构造handler链
handlerChainBuilder := func(handler http.Handler) http.Handler {
return c.BuildHandlerChainFunc(handler, c.Config)
}
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
s := &GenericAPIServer{
...
}
...
installAPI(s, c.Config)
...
}
// NewAPIServerHandler
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
...
// 创建go-restful的container对象
gorestfulContainer := restful.NewContainer()
gorestfulContainer.ServeMux = http.NewServeMux()
gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(s, panicReason, httpWriter)
})
gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
serviceErrorHandler(s, serviceErr, request, response)
})
director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}
// 创建handler
return &APIServerHandler{
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
}
复制代码
CreateServerChain的最后一步便是启动服务insecureServingInfo.Serve
函数
func (s *DeprecatedInsecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) error {
// 初始化http服务
insecureServer := &http.Server{
Addr: s.Listener.Addr().String(),
Handler: handler,
MaxHeaderBytes: 1 << 20,
}
...
// 启动服务,内部调用server.Serve(listener)
_, err := RunServer(insecureServer, s.Listener, shutdownTimeout, stopCh)
return err
}
func RunServer(
server *http.Server,
ln net.Listener,
shutDownTimeout time.Duration,
stopCh <-chan struct{},
) (<-chan struct{}, error) {
...
go func() {
...
// 通过go语言标准库server.Serve监听listener
// 并在运行过程中为每个连接建立groutine,groutine读取请求,调用handler函数来处理并响应请求
err := server.Serve(listener)
...
}()
return stoppedCh, nil
}
复制代码