Related post: Kubernetes (k8s) CSI analysis, the Container Storage Interface: https://xie.infoq.cn/article/5cd26c1b24c5665820411bb5a
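Kubernetes was designed around a pluggable architecture so that its functionality is easy to extend. Following that idea, Kubernetes defines three purpose-specific interfaces: the Container Network Interface (CNI), the Container Runtime Interface (CRI), and the Container Storage Interface (CSI). Kubernetes carries out the corresponding functions by calling these interfaces.
This post introduces and analyzes the Container Runtime Interface, CRI.
It covers what CRI is, why CRI exists, and the CRI system architecture, and then traces how k8s drives CRI operations, including pod creation and deletion.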
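CRI is short for Container Runtime Interface.
CRI decouples the kubelet from the container runtime, so the kubelet can support multiple container runtimes without being recompiled.
The kubelet talks to third-party container runtimes through the CRI interface to operate on containers and images.
A container runtime that implements the CRI interface is usually called a CRI shim. It is a gRPC server listening on a local unix socket, while the kubelet acts as the gRPC client and calls the CRI interface to manage the lifecycle of pods, containers, and images. The container runtime is also responsible for managing container networking itself; using CNI is recommended.
Figure 1: CRI shim communication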
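To make the client/server split concrete, here is a minimal sketch of the same pattern using only the Go standard library. It is not the CRI API: net/rpc stands in for gRPC, and ShimService, VersionArgs, VersionReply, and the socket path are hypothetical names chosen for the example.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
	"os"
	"path/filepath"
)

// VersionArgs and VersionReply are hypothetical stand-ins for CRI messages.
type VersionArgs struct{ APIVersion string }
type VersionReply struct{ RuntimeName, RuntimeVersion string }

// ShimService plays the role of the CRI shim: the server side of the interface.
type ShimService struct{}

// Version answers a version query, the way a shim answers the kubelet.
func (s *ShimService) Version(args VersionArgs, reply *VersionReply) error {
	reply.RuntimeName = "demo-runtime"
	reply.RuntimeVersion = "0.0.1"
	return nil
}

// startShim listens on a unix socket and serves RPC requests in the background.
func startShim(socketPath string) (net.Listener, error) {
	srv := rpc.NewServer()
	if err := srv.RegisterName("ShimService", &ShimService{}); err != nil {
		return nil, err
	}
	ln, err := net.Listen("unix", socketPath)
	if err != nil {
		return nil, err
	}
	go srv.Accept(ln)
	return ln, nil
}

// queryVersion is the client side: dial the socket and call the shim.
func queryVersion(socketPath string) (VersionReply, error) {
	var reply VersionReply
	client, err := rpc.Dial("unix", socketPath)
	if err != nil {
		return reply, err
	}
	defer client.Close()
	err = client.Call("ShimService.Version", VersionArgs{APIVersion: "v1alpha2"}, &reply)
	return reply, err
}

// demo wires both sides together over a socket in a temporary directory.
func demo() (VersionReply, error) {
	dir, err := os.MkdirTemp("", "shim")
	if err != nil {
		return VersionReply{}, err
	}
	defer os.RemoveAll(dir)
	sock := filepath.Join(dir, "shim.sock")
	ln, err := startShim(sock)
	if err != nil {
		return VersionReply{}, err
	}
	defer ln.Close()
	return queryVersion(sock)
}

func main() {
	reply, err := demo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%s %s\n", reply.RuntimeName, reply.RuntimeVersion)
}
```

Running it starts the stand-in shim on a socket in a temporary directory and prints the runtime name and version the client received. The real kubelet does the same thing with generated gRPC stubs instead of net/rpc.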
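Introducing the CRI standard meant that newer versions had to talk to docker in a new way. For compatibility with earlier versions, k8s ships a CRI implementation for docker: the dockershim package under the kubelet package. dockershim is a gRPC service that listens on an endpoint for the kubelet to connect to. When it receives a request from the kubelet, it translates the request into a REST API request and sends it to the docker daemon.
Figure 2: dockershim communication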
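Before version 1.5, k8s depended directly on docker. To support other container runtimes such as rkt and containerd, the kubelet adopted the CRI standard starting in 1.5. CRI decouples the kubelet from the container runtime, splits the former pod-level internal interface into gRPC interfaces oriented toward Sandbox and Container, and separates image management and container management into different services, which makes it easier for other container runtimes to integrate with k8s.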
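By function, the CRI architecture can be divided into four parts:
(1) Container runtime management inside the kubelet, kubeGenericRuntimeManager, which manages the client that talks to the CRI shim and carries out container and image management (code location: pkg/kubelet/kuberuntime/kuberuntime_manager.go);
(2) The container runtime interface CRI itself, comprising the client-side interface and the server-side interface;
(3) The CRI shim client, held by the kubelet and used to communicate with the CRI shim server;
(4) The CRI shim server, i.e. the concrete container runtime implementation, including the kubelet's built-in dockershim (code location: pkg/kubelet/dockershim) and external container runtimes such as cri-containerd (supporting the containerd engine) and rktlet (supporting the rkt engine).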
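Under CRI there are two kinds of container runtime implementations:
(1) The kubelet's built-in dockershim, which supports the Docker engine and CNI network plugins (including kubenet). The dockershim code is built into the kubelet and invoked by it; dockershim runs its own server to act as the CRI shim and exposes a gRPC server to the kubelet;
(2) External container runtimes, which support engines such as rkt and containerd.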
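The CRI source analysis of the kubelet consists of the following parts:
(1) kubelet CRI-related startup flags;
(2) kubelet CRI-related interfaces and structs;
(3) kubelet CRI initialization;
(4) how the kubelet calls CRI to create a pod;
(5) how the kubelet calls CRI to delete a pod.
For reasons of length, this post covers the first three parts; the next post analyzes pod creation and pod deletion through CRI.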
Source version analyzed: https://github.com/kubernetes/kubernetes/releases/tag/v1.17.4
The CRI-related startup flags of the kubelet are defined in the following code:
// pkg/kubelet/config/flags.go
// AddFlags adds flags to the container runtime, according to ContainerRuntimeOptions.
func (s *ContainerRuntimeOptions) AddFlags(fs *pflag.FlagSet) {
	dockerOnlyWarning := "This docker-specific flag only works when container-runtime is set to docker."

	// General settings.
	fs.StringVar(&s.ContainerRuntime, "container-runtime", s.ContainerRuntime, "The container runtime to use. Possible values: 'docker', 'remote', 'rkt (deprecated)'.")
	fs.StringVar(&s.RuntimeCgroups, "runtime-cgroups", s.RuntimeCgroups, "Optional absolute name of cgroups to create and run the runtime in.")
	fs.BoolVar(&s.RedirectContainerStreaming, "redirect-container-streaming", s.RedirectContainerStreaming, "Enables container streaming redirect. If false, kubelet will proxy container streaming data between apiserver and container runtime; if true, kubelet will return an http redirect to apiserver, and apiserver will access container runtime directly. The proxy approach is more secure, but introduces some overhead. The redirect approach is more performant, but less secure because the connection between apiserver and container runtime may not be authenticated.")

	// Docker-specific settings.
	fs.BoolVar(&s.ExperimentalDockershim, "experimental-dockershim", s.ExperimentalDockershim, "Enable dockershim only mode. In this mode, kubelet will only start dockershim without any other functionalities. This flag only serves test purpose, please do not use it unless you are conscious of what you are doing. [default=false]")
	fs.MarkHidden("experimental-dockershim")
	fs.StringVar(&s.DockershimRootDirectory, "experimental-dockershim-root-directory", s.DockershimRootDirectory, "Path to the dockershim root directory.")
	fs.MarkHidden("experimental-dockershim-root-directory")
	fs.StringVar(&s.PodSandboxImage, "pod-infra-container-image", s.PodSandboxImage, fmt.Sprintf("The image whose network/ipc namespaces containers in each pod will use. %s", dockerOnlyWarning))
	fs.StringVar(&s.DockerEndpoint, "docker-endpoint", s.DockerEndpoint, fmt.Sprintf("Use this for the docker endpoint to communicate with. %s", dockerOnlyWarning))
	fs.DurationVar(&s.ImagePullProgressDeadline.Duration, "image-pull-progress-deadline", s.ImagePullProgressDeadline.Duration, fmt.Sprintf("If no pulling progress is made before this deadline, the image pulling will be cancelled. %s", dockerOnlyWarning))
	...
}
// cmd/kubelet/app/options/options.go
// AddFlags adds flags for a specific KubeletFlags to the specified FlagSet
func (f *KubeletFlags) AddFlags(mainfs *pflag.FlagSet) {
	...
	fs.StringVar(&f.RemoteRuntimeEndpoint, "container-runtime-endpoint", f.RemoteRuntimeEndpoint, "[Experimental] The endpoint of remote runtime service. Currently unix socket endpoint is supported on Linux, while npipe and tcp endpoints are supported on windows. Examples:'unix:///var/run/dockershim.sock', 'npipe:////./pipe/dockershim'")
	fs.StringVar(&f.RemoteImageEndpoint, "image-service-endpoint", f.RemoteImageEndpoint, "[Experimental] The endpoint of remote image service. If not specified, it will be the same with container-runtime-endpoint by default. Currently unix socket endpoint is supported on Linux, while npipe and tcp endpoints are supported on windows. Examples:'unix:///var/run/dockershim.sock', 'npipe:////./pipe/dockershim'")
	...
}
Default values for the kubelet startup flags are set in the NewKubeletFlags function.
// cmd/kubelet/app/options/options.go
// NewKubeletFlags will create a new KubeletFlags with default values
func NewKubeletFlags() *KubeletFlags {
	remoteRuntimeEndpoint := ""
	if runtime.GOOS == "linux" {
		remoteRuntimeEndpoint = "unix:///var/run/dockershim.sock"
	} else if runtime.GOOS == "windows" {
		remoteRuntimeEndpoint = "npipe:////./pipe/dockershim"
	}

	return &KubeletFlags{
		EnableServer: true,
		ContainerRuntimeOptions: *NewContainerRuntimeOptions(),
		CertDirectory: "/var/lib/kubelet/pki",
		RootDirectory: defaultRootDir,
		MasterServiceNamespace: metav1.NamespaceDefault,
		MaxContainerCount: -1,
		MaxPerPodContainerCount: 1,
		MinimumGCAge: metav1.Duration{Duration: 0},
		NonMasqueradeCIDR: "10.0.0.0/8",
		RegisterSchedulable: true,
		ExperimentalKernelMemcgNotification: false,
		RemoteRuntimeEndpoint: remoteRuntimeEndpoint,
		NodeLabels: make(map[string]string),
		VolumePluginDir: "/usr/libexec/kubernetes/kubelet-plugins/volume/exec/",
		RegisterNode: true,
		SeccompProfileRoot: filepath.Join(defaultRootDir, "seccomp"),
		// prior to the introduction of this flag, there was a hardcoded cap of 50 images
		NodeStatusMaxImages: 50,
		EnableCAdvisorJSONEndpoints: true,
	}
}
Default values for the CRI-related startup flags are set in the NewContainerRuntimeOptions and NewMainKubelet functions.
// cmd/kubelet/app/options/container_runtime.go
// NewContainerRuntimeOptions will create a new ContainerRuntimeOptions with
// default values.
func NewContainerRuntimeOptions() *config.ContainerRuntimeOptions {
	dockerEndpoint := ""
	if runtime.GOOS != "windows" {
		dockerEndpoint = "unix:///var/run/docker.sock"
	}

	return &config.ContainerRuntimeOptions{
		ContainerRuntime: kubetypes.DockerContainerRuntime,
		RedirectContainerStreaming: false,
		DockerEndpoint: dockerEndpoint,
		DockershimRootDirectory: "/var/lib/dockershim",
		PodSandboxImage: defaultPodSandboxImage,
		ImagePullProgressDeadline: metav1.Duration{Duration: 1 * time.Minute},
		ExperimentalDockershim: false,

		//Alpha feature
		CNIBinDir: "/opt/cni/bin",
		CNIConfDir: "/etc/cni/net.d",
		CNICacheDir: "/var/lib/cni/cache",
	}
}
// pkg/kubelet/kubelet.go
func NewMainKubelet(...) {
	...
	if remoteRuntimeEndpoint != "" {
		// remoteImageEndpoint is same as remoteRuntimeEndpoint if not explicitly specified
		if remoteImageEndpoint == "" {
			remoteImageEndpoint = remoteRuntimeEndpoint
		}
	}
	...
}
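A brief look at the most important CRI-related startup flags:
(1) --container-runtime: the container runtime the kubelet uses. Possible values are docker, remote, and rkt (deprecated). The default is docker, which means the kubelet's built-in container runtime dockershim. To use an external container runtime, set this flag to remote and set --container-runtime-endpoint to the unix socket that runtime listens on.
(2) --runtime-cgroups: the cgroups the container runtime runs in; optional.
(3) --docker-endpoint: the socket address on which docker serves its API. The default is unix:///var/run/docker.sock. This flag takes effect only when --container-runtime is docker.
(4) --pod-infra-container-image: the image used for the pod sandbox. The default is k8s.gcr.io/pause:3.1. This flag takes effect only when --container-runtime is docker.
(5) --image-pull-progress-deadline: how long an image pull may go without making progress before it is cancelled. The default is 1 minute. This flag takes effect only when --container-runtime is docker.
(6) --experimental-dockershim: when set to true, enables dockershim-only mode, in which the kubelet starts only dockershim. The default is false. This flag takes effect only when --container-runtime is docker.
(7) --experimental-dockershim-root-directory: the dockershim root directory. The default is /var/lib/dockershim. This flag takes effect only when --container-runtime is docker.
(8) --container-runtime-endpoint: the endpoint of the container runtime. On Linux the default is unix:///var/run/dockershim.sock. Do not confuse it with --docker-endpoint above.
(9) --image-service-endpoint: the endpoint of the image service. If it is not set, it falls back to the value of --container-runtime-endpoint, so on Linux it is effectively unix:///var/run/dockershim.sock by default.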
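The defaulting rules for the two endpoint flags can be summarized in a few lines. The sketch below restates the logic from NewKubeletFlags and NewMainKubelet shown earlier; the function names defaultRuntimeEndpoint and resolveEndpoints are made up for this example and do not exist in the kubelet.

```go
package main

import (
	"fmt"
	"runtime"
)

// defaultRuntimeEndpoint mirrors the OS-specific default chosen in NewKubeletFlags.
func defaultRuntimeEndpoint(goos string) string {
	switch goos {
	case "linux":
		return "unix:///var/run/dockershim.sock"
	case "windows":
		return "npipe:////./pipe/dockershim"
	default:
		return ""
	}
}

// resolveEndpoints mirrors the fallback in NewMainKubelet: an unset image
// endpoint reuses the runtime endpoint.
func resolveEndpoints(runtimeEndpoint, imageEndpoint string) (string, string) {
	if runtimeEndpoint != "" && imageEndpoint == "" {
		imageEndpoint = runtimeEndpoint
	}
	return runtimeEndpoint, imageEndpoint
}

func main() {
	rt, img := resolveEndpoints(defaultRuntimeEndpoint(runtime.GOOS), "")
	fmt.Printf("runtime=%q image=%q\n", rt, img)
}
```

With neither flag set on Linux, both clients therefore end up pointing at the dockershim socket.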
(1) RuntimeService interface: the container runtime interface on the CRI shim client side;
Code location: staging/src/k8s.io/cri-api/pkg/apis/services.go
(2) ImageManagerService interface: the container image interface on the CRI shim client side;
Code location: staging/src/k8s.io/cri-api/pkg/apis/services.go
(3) RuntimeServiceServer interface: the container runtime interface on the CRI shim server side;
Code location: staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2/api.pb.go
(4) ImageServiceServer interface: the container image interface on the CRI shim server side;
Code location: staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2/api.pb.go
(5) CRIService interface: embeds the RuntimeServiceServer interface, the ImageServiceServer interface, and the CRI shim server start method, so it contains every method a CRI shim server has to implement;
Code location: pkg/kubelet/dockershim/docker_service.go
(6) DockerService interface: embeds the CRIService interface.
Code location: pkg/kubelet/dockershim/docker_service.go
Note: RuntimeService and RuntimeServiceServer, and likewise ImageManagerService and ImageServiceServer, cover the same set of operations. The only difference is that one is used on the CRI shim client side and the other on the CRI shim server side. The container runtime interface manages the lifecycle of pods and containers; the container image interface manages the lifecycle of container images.
(1) RemoteRuntimeService struct: implements the client-side container runtime interface RuntimeService and holds the client used to talk to the CRI shim's runtime server;
Code location: pkg/kubelet/remote/remote_runtime.go
(2) RemoteImageService struct: implements the client-side container image interface ImageManagerService and holds the client used to talk to the CRI shim's image server;
Code location: pkg/kubelet/remote/remote_image.go
(3) dockerService struct: implements the server-side container runtime interface RuntimeServiceServer;
Code location: pkg/kubelet/dockershim/docker_service.go, pkg/kubelet/dockershim/docker_container.go
(4) dockerService struct: also implements the server-side container image interface ImageServiceServer;
Code location: pkg/kubelet/dockershim/docker_service.go, pkg/kubelet/dockershim/docker_image.go
(5) DockerServer struct: represents the server of dockershim (the kubelet's built-in CRI shim) and implements the CRIService interface.
Code location: pkg/kubelet/dockershim/remote/docker_server.go
RuntimeServiceServer
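The interfaces provided by RuntimeServiceServer fall into four groups by function:
(1) PodSandbox management: a PodSandbox is an abstraction of a Kubernetes Pod. It gives containers an isolated environment and provides shared namespaces such as the network namespace;
(2) Container management: create, start, stop, and remove containers in a given PodSandbox;
(3) Streaming API: Exec, Attach, and PortForward, which exchange data with a container. These three calls return the URL of the runtime's streaming server instead of interacting with the container directly;
(4) Runtime status: query the runtime's name, version, API version, and status.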
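Of the four groups, the Streaming API is the least obvious. Here is a minimal sketch of the idea, assuming a plain HTTP server in place of the runtime's real streaming server: Exec only hands back a URL, and the caller fetches the data from that URL afterwards. The names fakeRuntime, ExecResponse, and the /exec/ path are hypothetical.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// ExecResponse is a simplified stand-in for the CRI response: it carries only a URL.
type ExecResponse struct{ URL string }

// fakeRuntime is a hypothetical runtime that owns its own streaming server.
type fakeRuntime struct{ streaming *httptest.Server }

// newFakeRuntime starts a local HTTP server that plays the streaming server.
func newFakeRuntime() *fakeRuntime {
	mux := http.NewServeMux()
	mux.HandleFunc("/exec/", func(w http.ResponseWriter, r *http.Request) {
		token := strings.TrimPrefix(r.URL.Path, "/exec/")
		fmt.Fprintf(w, "output of session %s", token)
	})
	return &fakeRuntime{streaming: httptest.NewServer(mux)}
}

// Exec produces no output itself; it only tells the caller where to stream from.
func (f *fakeRuntime) Exec(token string) ExecResponse {
	return ExecResponse{URL: f.streaming.URL + "/exec/" + token}
}

// Close shuts the streaming server down.
func (f *fakeRuntime) Close() { f.streaming.Close() }

// fetch plays the caller that follows the returned URL.
func fetch(url string) (string, error) {
	resp, err := http.Get(url)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

// demoExec asks for a streaming URL and then reads from it.
func demoExec(token string) (string, error) {
	rt := newFakeRuntime()
	defer rt.Close()
	return fetch(rt.Exec(token).URL)
}

func main() {
	out, err := demoExec("abc")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out)
}
```

Separating the control call from the data path keeps long-lived streams out of the request/response interface.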
ImageServiceServer
ImageServiceServer provides five methods for managing container images.
The interfaces and structs listed above are analyzed in turn below.
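RuntimeService manages the lifecycle of pods and containers. It is the container runtime interface that the CRI shim client has to implement.
The RuntimeService interface embeds the RuntimeVersioner, ContainerManager, PodSandboxManager, and ContainerStatsManager interfaces, each of which is introduced below.
As the comment in the code puts it, the interface is meant to be implemented by a container runtime.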
// staging/src/k8s.io/cri-api/pkg/apis/services.go
// RuntimeService interface should be implemented by a container runtime.
// The methods should be thread-safe.
type RuntimeService interface {
	RuntimeVersioner
	ContainerManager
	PodSandboxManager
	ContainerStatsManager

	// UpdateRuntimeConfig updates runtime configuration if specified
	UpdateRuntimeConfig(runtimeConfig *runtimeapi.RuntimeConfig) error
	// Status returns the status of the runtime.
	Status() (*runtimeapi.RuntimeStatus, error)
}
RuntimeVersioner interface
The RuntimeVersioner interface returns the container runtime's name, version, and API version. It has a single method, Version.
// staging/src/k8s.io/cri-api/pkg/apis/services.go
// RuntimeVersioner contains methods for runtime name, version and API version.
type RuntimeVersioner interface {
	// Version returns the runtime name, runtime version and runtime API version
	Version(apiVersion string) (*runtimeapi.VersionResponse, error)
}
ContainerManager interface
The ContainerManager interface contains the methods that operate on a container (an application container), such as CreateContainer, StartContainer, StopContainer, and RemoveContainer.
// staging/src/k8s.io/cri-api/pkg/apis/services.go
// ContainerManager contains methods to manipulate containers managed by a
// container runtime. The methods are thread-safe.
type ContainerManager interface {
	// CreateContainer creates a new container in specified PodSandbox.
	CreateContainer(podSandboxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error)
	// StartContainer starts the container.
	StartContainer(containerID string) error
	// StopContainer stops a running container with a grace period (i.e., timeout).
	StopContainer(containerID string, timeout int64) error
	// RemoveContainer removes the container.
	RemoveContainer(containerID string) error
	// ListContainers lists all containers by filters.
	ListContainers(filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error)
	// ContainerStatus returns the status of the container.
	ContainerStatus(containerID string) (*runtimeapi.ContainerStatus, error)
	// UpdateContainerResources updates the cgroup resources for the container.
	UpdateContainerResources(containerID string, resources *runtimeapi.LinuxContainerResources) error
	// ExecSync executes a command in the container, and returns the stdout output.
	// If command exits with a non-zero exit code, an error is returned.
	ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error)
	// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
	Exec(*runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error)
	// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
	Attach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error)
	// ReopenContainerLog asks runtime to reopen the stdout/stderr log file
	// for the container. If it returns error, new container log file MUST NOT
	// be created.
	ReopenContainerLog(ContainerID string) error
}
PodSandboxManager interface
The PodSandboxManager interface contains the methods that operate on a pod sandbox (the pause container), such as RunPodSandbox (create and start the pause container), StopPodSandbox (stop the pause container), and RemovePodSandbox (remove the pause container).
// staging/src/k8s.io/cri-api/pkg/apis/services.go
// PodSandboxManager contains methods for operating on PodSandboxes. The methods
// are thread-safe.
type PodSandboxManager interface {
	// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
	// the sandbox is in ready state.
	RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error)
	// StopPodSandbox stops the sandbox. If there are any running containers in the
	// sandbox, they should be force terminated.
	StopPodSandbox(podSandboxID string) error
	// RemovePodSandbox removes the sandbox. If there are running containers in the
	// sandbox, they should be forcibly removed.
	RemovePodSandbox(podSandboxID string) error
	// PodSandboxStatus returns the Status of the PodSandbox.
	PodSandboxStatus(podSandboxID string) (*runtimeapi.PodSandboxStatus, error)
	// ListPodSandbox returns a list of Sandbox.
	ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error)
	// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
	PortForward(*runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error)
}
ContainerStatsManager interface
The ContainerStatsManager interface contains the methods for querying container statistics: ContainerStats and ListContainerStats.
// staging/src/k8s.io/cri-api/pkg/apis/services.go
// ContainerStatsManager contains methods for retrieving the container
// statistics.
type ContainerStatsManager interface {
	// ContainerStats returns stats of the container. If the container does not
	// exist, the call returns an error.
	ContainerStats(containerID string) (*runtimeapi.ContainerStats, error)
	// ListContainerStats returns stats of all running containers.
	ListContainerStats(filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error)
}
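The way RuntimeService is assembled from smaller interfaces is ordinary Go interface embedding. The sketch below shows the mechanism with heavily simplified, hypothetical interfaces (string IDs instead of the real runtimeapi messages) and an in-memory type, memRuntime, that satisfies the combined interface.

```go
package main

import "fmt"

// Simplified, hypothetical counterparts of the embedded CRI interfaces.
type Versioner interface {
	Version() string
}

type SandboxManager interface {
	RunPodSandbox(name string) (string, error)
}

type ContainerManager interface {
	CreateContainer(sandboxID, name string) (string, error)
}

// RuntimeService is composed purely by embedding the smaller interfaces.
type RuntimeService interface {
	Versioner
	SandboxManager
	ContainerManager
}

// memRuntime keeps sandboxes and their containers in memory.
type memRuntime struct {
	sandboxes map[string][]string
}

// Compile-time check that memRuntime satisfies the combined interface.
var _ RuntimeService = (*memRuntime)(nil)

func newMemRuntime() *memRuntime {
	return &memRuntime{sandboxes: make(map[string][]string)}
}

func (m *memRuntime) Version() string { return "mem-runtime/0.0.1" }

// RunPodSandbox registers a sandbox and returns its ID.
func (m *memRuntime) RunPodSandbox(name string) (string, error) {
	id := "sandbox-" + name
	m.sandboxes[id] = nil
	return id, nil
}

// CreateContainer adds a container to an existing sandbox.
func (m *memRuntime) CreateContainer(sandboxID, name string) (string, error) {
	if _, ok := m.sandboxes[sandboxID]; !ok {
		return "", fmt.Errorf("sandbox %q not found", sandboxID)
	}
	id := sandboxID + "/" + name
	m.sandboxes[sandboxID] = append(m.sandboxes[sandboxID], id)
	return id, nil
}

func main() {
	var rt RuntimeService = newMemRuntime()
	sb, _ := rt.RunPodSandbox("web")
	c, err := rt.CreateContainer(sb, "nginx")
	fmt.Println(rt.Version(), sb, c, err)
}
```

A caller that only needs sandbox operations can accept a SandboxManager, while the kubelet-style manager takes the full RuntimeService.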
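ImageManagerService manages the lifecycle of images. It is the image interface that the CRI shim client has to implement.
The ImageManagerService interface contains the container image operations, such as PullImage (pull an image) and ListImages (list the existing images).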
// staging/src/k8s.io/cri-api/pkg/apis/services.go
// ImageManagerService interface should be implemented by a container image
// manager.
// The methods should be thread-safe.
type ImageManagerService interface {
	// ListImages lists the existing images.
	ListImages(filter *runtimeapi.ImageFilter) ([]*runtimeapi.Image, error)
	// ImageStatus returns the status of the image.
	ImageStatus(image *runtimeapi.ImageSpec) (*runtimeapi.Image, error)
	// PullImage pulls an image with the authentication config.
	PullImage(image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error)
	// RemoveImage removes the image.
	RemoveImage(image *runtimeapi.ImageSpec) error
	// ImageFsInfo returns information of the filesystem that is used to store images.
	ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error)
}
The CRIService interface defines the methods a CRI shim server must implement: the RuntimeServiceServer interface (container runtime operations), the ImageServiceServer interface (image operations), and the method that starts the CRI shim server.
// pkg/kubelet/dockershim/docker_service.go
// CRIService includes all methods necessary for a CRI server.
type CRIService interface {
	runtimeapi.RuntimeServiceServer
	runtimeapi.ImageServiceServer
	Start() error
}

// DockerService is an interface that embeds the new RuntimeService and
// ImageService interfaces.
type DockerService interface {
	CRIService

	// For serving streaming calls.
	http.Handler

	// For supporting legacy features.
	DockerLegacyService
}
RemoteRuntimeService implements the client-side container runtime interface RuntimeService and holds runtimeClient, the client used to talk to the CRI shim's runtime server.
// pkg/kubelet/remote/remote_runtime.go
// RemoteRuntimeService is a gRPC implementation of internalapi.RuntimeService.
type RemoteRuntimeService struct {
	timeout time.Duration
	runtimeClient runtimeapi.RuntimeServiceClient
	// Cache last per-container error message to reduce log spam
	logReduction *logreduction.LogReduction
}
RemoteImageService implements the client-side container image interface ImageManagerService and holds imageClient, the client used to talk to the CRI shim's image server.
// pkg/kubelet/remote/remote_image.go
// RemoteImageService is a gRPC implementation of internalapi.ImageManagerService.
type RemoteImageService struct {
	timeout time.Duration
	imageClient runtimeapi.ImageServiceClient
}
The DockerServer struct represents the server of dockershim (the kubelet's built-in CRI shim) and implements the CRIService interface.
// pkg/kubelet/dockershim/remote/docker_server.go
// DockerServer is the grpc server of dockershim.
type DockerServer struct {
	// endpoint is the endpoint to serve on.
	endpoint string
	// service is the docker service which implements runtime and image services.
	service dockershim.CRIService
	// server is the grpc server.
	server *grpc.Server
}
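The CRI-related initialization logic in the kubelet is as follows:
(1) If the kubelet uses dockershim as its container runtime, initialize and start the container runtime server dockershim (initializing dockershim also initializes the CNI network plugin);
(2) Initialize the CRI shim clients, which are used to call the CRI shim server (either the built-in dockershim or a remote container runtime);
(3) Initialize kubeGenericRuntimeManager, which manages the container runtime. Once initialization is complete, every kubelet operation on containers and images goes through the CRI shim clients held by this struct, which communicate with the CRI shim server.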
main (cmd/kubelet/kubelet.go)
-> NewKubeletCommand (cmd/kubelet/app/server.go)
-> Run (cmd/kubelet/app/server.go)
-> run (cmd/kubelet/app/server.go)
-> RunKubelet (cmd/kubelet/app/server.go)
-> CreateAndInitKubelet (cmd/kubelet/app/server.go)
-> kubelet.NewMainKubelet (pkg/kubelet/kubelet.go)
-> getRuntimeAndImageServices (pkg/kubelet/kubelet.go) && kuberuntime.NewKubeGenericRuntimeManager (pkg/kubelet/kuberuntime/kuberuntime_manager.go)
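CRI-related logic in the NewMainKubelet function:
(1) Initialize and start the built-in container runtime server dockershim. This depends on the value of containerRuntime (the kubelet flag --container-runtime): if it is docker, initialize and start the docker CRI shim, i.e. the kubelet's built-in container runtime dockershim, and expose its gRPC socket; if it is remote, nothing is initialized or started at this step.
(2) Call getRuntimeAndImageServices to initialize the CRI shim clients: the container runtime client runtimeClient and the container image client imageClient.
(3) Call kuberuntime.NewKubeGenericRuntimeManager and assign the result to fields of klet. This initializes the kubeGenericRuntimeManager struct, which manages the container runtime. Once initialization is complete, every kubelet operation on containers and images goes through the CRI shim clients held by this struct, which communicate with the CRI shim server.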
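Stripped of detail, the three steps reduce to the following control flow. This is a simplified sketch, not kubelet code: initSteps is a hypothetical helper that only returns a description of what would happen for a given --container-runtime value.

```go
package main

import "fmt"

// initSteps lists the CRI initialization steps, in order, for a given
// --container-runtime value. It is a hypothetical helper for illustration.
func initSteps(containerRuntime string) ([]string, error) {
	var steps []string
	switch containerRuntime {
	case "docker":
		// Built-in shim: the kubelet has to start the server side itself.
		steps = append(steps, "start dockershim gRPC server")
	case "remote":
		// External shim: the server is assumed to be running already.
	default:
		return nil, fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
	}
	// Both supported values continue with the same client-side setup.
	steps = append(steps,
		"connect runtime and image clients",
		"build kubeGenericRuntimeManager",
	)
	return steps, nil
}

func main() {
	for _, rt := range []string{"docker", "remote", "rkt"} {
		steps, err := initSteps(rt)
		fmt.Println(rt, steps, err)
	}
}
```

The point to take away is that only the first step differs between docker and remote; from the client setup onward the kubelet treats both the same way.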
// pkg/kubelet/kubelet.go
func NewMainKubelet(...) {
	...
	switch containerRuntime {
	// (1) Initialize and start the built-in container runtime server, dockershim
	case kubetypes.DockerContainerRuntime:
		// Create and start the CRI shim running as a grpc server.
		streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)
		ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
			&pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, !crOptions.RedirectContainerStreaming)
		if err != nil {
			return nil, err
		}
		if crOptions.RedirectContainerStreaming {
			klet.criHandler = ds
		}

		// The unix socket for kubelet <-> dockershim communication.
		klog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
			remoteRuntimeEndpoint,
			remoteImageEndpoint)
		klog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
		server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
		if err := server.Start(); err != nil {
			return nil, err
		}

		// Create dockerLegacyService when the logging driver is not supported.
		supported, err := ds.IsCRISupportedLogDriver()
		if err != nil {
			return nil, err
		}
		if !supported {
			klet.dockerLegacyService = ds
			legacyLogProvider = ds
		}
	case kubetypes.RemoteContainerRuntime:
		// No-op.
		break
	default:
		return nil, fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
	}
	// (2) Initialize the CRI shim clients
	runtimeService, imageService, err := getRuntimeAndImageServices(remoteRuntimeEndpoint, remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout)
	if err != nil {
		return nil, err
	}
	klet.runtimeService = runtimeService

	if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && kubeDeps.KubeClient != nil {
		klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
	}
	// (3) Initialize kubeGenericRuntimeManager, which manages the container runtime
	runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
		kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
		klet.livenessManager,
		klet.startupManager,
		seccompProfileRoot,
		containerRefManager,
		machineInfo,
		klet,
		kubeDeps.OSInterface,
		klet,
		httpClient,
		imageBackOff,
		kubeCfg.SerializeImagePulls,
		float32(kubeCfg.RegistryPullQPS),
		int(kubeCfg.RegistryBurst),
		kubeCfg.CPUCFSQuota,
		kubeCfg.CPUCFSQuotaPeriod,
		runtimeService,
		imageService,
		kubeDeps.ContainerManager.InternalContainerLifecycle(),
		legacyLogProvider,
		klet.runtimeClassManager,
	)
	if err != nil {
		return nil, err
	}
	klet.containerRuntime = runtime
	klet.streamingRuntime = runtime
	klet.runner = runtime
	...
}
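This part analyzes the case where the variable containerRuntime equals docker, i.e. the kubelet flag --container-runtime is docker. In this case the kubelet uses its built-in CRI shim, dockershim, as the container runtime, and dockershim calls docker to perform the container and image operations.
Initializing and starting dockershim consists of two main steps:
(1) Call dockershim.NewDockerService: create and initialize the dockershim server, which includes initializing the docker client and the CNI network configuration;
(2) Call dockerremote.NewDockerServer and server.Start: start dockershim and expose its service socket.
Creating and initializing the dockershim server (NewDockerService) works as follows:
(1) Call NewDockerClientFromConfig: create the docker client object, which covers all the familiar docker operations such as docker run and docker images;
(2) Build the dockerService struct;
(3) Initialize the CNI network configuration (covered in detail in a separate post dedicated to CNI).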
// pkg/kubelet/dockershim/docker_service.go
// NewDockerService creates a new `DockerService` struct.
// NOTE: Anything passed to DockerService should be eventually handled in another way when we switch to running the shim as a different process.
func NewDockerService(config *ClientConfig, podSandboxImage string, streamingConfig *streaming.Config, pluginSettings *NetworkPluginSettings,
	cgroupsName string, kubeCgroupDriver string, dockershimRootDir string, startLocalStreamingServer bool) (DockerService, error) {
	// (1) Create the docker client
	client := NewDockerClientFromConfig(config)

	c := libdocker.NewInstrumentedInterface(client)

	checkpointManager, err := checkpointmanager.NewCheckpointManager(filepath.Join(dockershimRootDir, sandboxCheckpointDir))
	if err != nil {
		return nil, err
	}
	// (2) Build the dockerService struct
	ds := &dockerService{
		client: c,
		os: kubecontainer.RealOS{},
		podSandboxImage: podSandboxImage,
		streamingRuntime: &streamingRuntime{
			client: client,
			execHandler: &NativeExecHandler{},
		},
		containerManager: cm.NewContainerManager(cgroupsName, client),
		checkpointManager: checkpointManager,
		startLocalStreamingServer: startLocalStreamingServer,
		networkReady: make(map[string]bool),
		containerCleanupInfos: make(map[string]*containerCleanupInfo),
	}

	// check docker version compatibility.
	if err = ds.checkVersionCompatibility(); err != nil {
		return nil, err
	}

	// create streaming server if configured.
	if streamingConfig != nil {
		var err error
		ds.streamingServer, err = streaming.NewServer(*streamingConfig, ds.streamingRuntime)
		if err != nil {
			return nil, err
		}
	}

	// Determine the hairpin mode.
	if err := effectiveHairpinMode(pluginSettings); err != nil {
		// This is a non-recoverable error. Returning it up the callstack will just
		// lead to retries of the same failure, so just fail hard.
		return nil, err
	}
	klog.Infof("Hairpin mode set to %q", pluginSettings.HairpinMode)
	// (3) Initialize the CNI network configuration
	// dockershim currently only supports CNI plugins.
	pluginSettings.PluginBinDirs = cni.SplitDirs(pluginSettings.PluginBinDirString)
	cniPlugins := cni.ProbeNetworkPlugins(pluginSettings.PluginConfDir, pluginSettings.PluginCacheDir, pluginSettings.PluginBinDirs)
	cniPlugins = append(cniPlugins, kubenet.NewPlugin(pluginSettings.PluginBinDirs, pluginSettings.PluginCacheDir))
	netHost := &dockerNetworkHost{
		&namespaceGetter{ds},
		&portMappingGetter{ds},
	}
	plug, err := network.InitNetworkPlugin(cniPlugins, pluginSettings.PluginName, netHost, pluginSettings.HairpinMode, pluginSettings.NonMasqueradeCIDR, pluginSettings.MTU)
	if err != nil {
		return nil, fmt.Errorf("didn't find compatible CNI plugin with given settings %+v: %v", pluginSettings, err)
	}
	ds.network = network.NewPluginManager(plug)
	klog.Infof("Docker cri networking managed by %v", plug.Name())

	// NOTE: cgroup driver is only detectable in docker 1.11+
	cgroupDriver := defaultCgroupDriver
	dockerInfo, err := ds.client.Info()
	klog.Infof("Docker Info: %+v", dockerInfo)
	if err != nil {
		klog.Errorf("Failed to execute Info() call to the Docker client: %v", err)
		klog.Warningf("Falling back to use the default driver: %q", cgroupDriver)
	} else if len(dockerInfo.CgroupDriver) == 0 {
		klog.Warningf("No cgroup driver is set in Docker")
		klog.Warningf("Falling back to use the default driver: %q", cgroupDriver)
	} else {
		cgroupDriver = dockerInfo.CgroupDriver
	}
	if len(kubeCgroupDriver) != 0 && kubeCgroupDriver != cgroupDriver {
		return nil, fmt.Errorf("misconfiguration: kubelet cgroup driver: %q is different from docker cgroup driver: %q", kubeCgroupDriver, cgroupDriver)
	}
	klog.Infof("Setting cgroupDriver to %s", cgroupDriver)
	ds.cgroupDriver = cgroupDriver
	ds.versionCache = cache.NewObjectCache(
		func() (interface{}, error) {
			return ds.getDockerVersion()
		},
		versionCacheTTL,
	)

	// Register prometheus metrics.
	metrics.Register()

	return ds, nil
}
NewDockerClientFromConfig
The NewDockerClientFromConfig function builds the client used to talk to docker. In the config struct, the value of DockerEndpoint comes from the kubelet flag --docker-endpoint, which defaults to unix:///var/run/docker.sock.
// pkg/kubelet/dockershim/docker_service.go
// NewDockerClientFromConfig create a docker client from given configure
// return nil if nil configure is given.
func NewDockerClientFromConfig(config *ClientConfig) libdocker.Interface {
	if config != nil {
		// Create docker client.
		client := libdocker.ConnectToDockerOrDie(
			config.DockerEndpoint,
			config.RuntimeRequestTimeout,
			config.ImagePullProgressDeadline,
			config.WithTraceDisabled,
			config.EnableSleep,
		)
		return client
	}

	return nil
}
// pkg/kubelet/dockershim/libdocker/client.go
// ConnectToDockerOrDie creates docker client connecting to docker daemon.
// If the endpoint passed in is "fake://", a fake docker client
// will be returned. The program exits if error occurs. The requestTimeout
// is the timeout for docker requests. If timeout is exceeded, the request
// will be cancelled and throw out an error. If requestTimeout is 0, a default
// value will be applied.
func ConnectToDockerOrDie(dockerEndpoint string, requestTimeout, imagePullProgressDeadline time.Duration,
	withTraceDisabled bool, enableSleep bool) Interface {
	if dockerEndpoint == FakeDockerEndpoint {
		fakeClient := NewFakeDockerClient()
		if withTraceDisabled {
			fakeClient = fakeClient.WithTraceDisabled()
		}

		if enableSleep {
			fakeClient.EnableSleep = true
		}
		return fakeClient
	}
	client, err := getDockerClient(dockerEndpoint)
	if err != nil {
		klog.Fatalf("Couldn't connect to docker: %v", err)
	}
	klog.Infof("Start docker client with request timeout=%v", requestTimeout)
	return newKubeDockerClient(client, requestTimeout, imagePullProgressDeadline)
}
dockerremote.NewDockerServer()
// pkg/kubelet/dockershim/remote/docker_server.go
// NewDockerServer creates the dockershim grpc server.
func NewDockerServer(endpoint string, s dockershim.CRIService) *DockerServer {
	return &DockerServer{
		endpoint: endpoint,
		service: s,
	}
}
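Main logic of the getRuntimeAndImageServices function:
(1) Call remote.NewRemoteRuntimeService: instantiate the CRI shim client for container operations, the container runtime client runtimeClient. The returned object implements the RuntimeService interface (the client-side CRI shim interface) described in the interface/struct analysis above;
(2) Call remote.NewRemoteImageService: instantiate the CRI shim client for image operations, the container image client imageClient. The returned object implements the ImageManagerService interface (the client-side CRI shim interface) described in the interface/struct analysis above.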
// pkg/kubelet/kubelet.go
func getRuntimeAndImageServices(remoteRuntimeEndpoint string, remoteImageEndpoint string, runtimeRequestTimeout metav1.Duration) (internalapi.RuntimeService, internalapi.ImageManagerService, error) {
	rs, err := remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, runtimeRequestTimeout.Duration)
	if err != nil {
		return nil, nil, err
	}
	is, err := remote.NewRemoteImageService(remoteImageEndpoint, runtimeRequestTimeout.Duration)
	if err != nil {
		return nil, nil, err
	}
	return rs, is, err
}
3.2.1 remote.NewRemoteRuntimeService
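Purpose of remote.NewRemoteRuntimeService: instantiate the CRI shim client for container operations, the container runtime client runtimeClient. The returned object implements the RuntimeService interface (the client-side CRI shim interface) described in the interface/struct analysis above.
Main logic: take the endpoint from the kubelet flag --container-runtime-endpoint, or its default unix:///var/run/dockershim.sock, try to connect to that socket, and build the client.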
// pkg/kubelet/remote/remote_runtime.go
// NewRemoteRuntimeService creates a new internalapi.RuntimeService.
func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (internalapi.RuntimeService, error) {
	klog.V(3).Infof("Connecting to runtime service %s", endpoint)
	addr, dailer, err := util.GetAddressAndDialer(endpoint)
	if err != nil {
		return nil, err
	}
	ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
	defer cancel()

	conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithDialer(dailer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
	if err != nil {
		klog.Errorf("Connect remote runtime %s failed: %v", addr, err)
		return nil, err
	}

	return &RemoteRuntimeService{
		timeout: connectionTimeout,
		runtimeClient: runtimeapi.NewRuntimeServiceClient(conn),
		logReduction: logreduction.NewLogReduction(identicalErrorDelay),
	}, nil
}
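For readers who want to experiment without pulling in gRPC, the endpoint handling can be approximated with the standard library. The sketch assumes the endpoint is a URL-style string; parseEndpoint and dial are hypothetical helpers, not the kubelet's util.GetAddressAndDialer, and accepting tcp here is purely for illustration.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"net/url"
	"time"
)

// parseEndpoint splits an endpoint such as unix:///var/run/dockershim.sock
// into the network type and the address to dial.
func parseEndpoint(endpoint string) (network, addr string, err error) {
	u, err := url.Parse(endpoint)
	if err != nil {
		return "", "", err
	}
	switch u.Scheme {
	case "unix":
		return "unix", u.Path, nil
	case "tcp":
		return "tcp", u.Host, nil
	default:
		return "", "", fmt.Errorf("unsupported endpoint scheme %q", u.Scheme)
	}
}

// dial connects to the endpoint and gives up once the timeout expires,
// similar in spirit to the context.WithTimeout call in the kubelet code.
func dial(endpoint string, timeout time.Duration) (net.Conn, error) {
	network, addr, err := parseEndpoint(endpoint)
	if err != nil {
		return nil, err
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	var d net.Dialer
	return d.DialContext(ctx, network, addr)
}

func main() {
	network, addr, err := parseEndpoint("unix:///var/run/dockershim.sock")
	fmt.Println(network, addr, err)

	// Dialing fails unless something is actually listening on the socket.
	if _, err := dial("unix:///var/run/dockershim.sock", time.Second); err != nil {
		fmt.Println("dial failed:", err)
	}
}
```

On a machine without a shim listening on that socket, the dial step reports an error, which is the same situation in which the kubelet logs a failed connection to the remote runtime.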
3.2.2 remote.NewRemoteImageService
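Purpose of remote.NewRemoteImageService: instantiate the CRI shim client for image operations, the container image client imageClient. The returned object implements the ImageManagerService interface (the client-side CRI shim interface) described in the interface/struct analysis above.
Main logic: take the endpoint from the kubelet flag --image-service-endpoint, or its default unix:///var/run/dockershim.sock, try to connect to that socket, and build the client.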
// pkg/kubelet/remote/remote_image.go
// NewRemoteImageService creates a new internalapi.ImageManagerService.
func NewRemoteImageService(endpoint string, connectionTimeout time.Duration) (internalapi.ImageManagerService, error) {
	klog.V(3).Infof("Connecting to image service %s", endpoint)
	addr, dailer, err := util.GetAddressAndDialer(endpoint)
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
	defer cancel()

	conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithDialer(dailer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
	if err != nil {
		klog.Errorf("Connect remote image service %s failed: %v", addr, err)
		return nil, err
	}

	return &RemoteImageService{
		timeout: connectionTimeout,
		imageClient: runtimeapi.NewImageServiceClient(conn),
	}, nil
}
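How the two endpoints end up pointing at the same socket can be sketched as follows. This is only an illustration under one assumption that the code shown here does not confirm: that an empty image endpoint reuses the runtime endpoint. resolveEndpoints and defaultRuntimeEndpoint are hypothetical names, not kubelet identifiers.

```go
package main

// defaultRuntimeEndpoint mirrors the default value quoted in the text above.
const defaultRuntimeEndpoint = "unix:///var/run/dockershim.sock"

// resolveEndpoints is a hypothetical helper. It fills in the default runtime
// endpoint when none is given and, by assumption, lets an empty image
// endpoint reuse the runtime endpoint.
func resolveEndpoints(runtimeEndpoint, imageEndpoint string) (string, string) {
	if runtimeEndpoint == "" {
		runtimeEndpoint = defaultRuntimeEndpoint
	}
	if imageEndpoint == "" {
		imageEndpoint = runtimeEndpoint
	}
	return runtimeEndpoint, imageEndpoint
}
```

With both flags unset, the runtime client and the image client would dial the same dockershim socket; setting only the runtime flag to a containerd socket would move both clients to it.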
The function kuberuntime.NewKubeGenericRuntimeManager initializes the kubeGenericRuntimeManager struct, which implements the KubeGenericRuntime interface. kubeGenericRuntimeManager is the kubelet's container runtime manager: it holds the CRI shim clients and talks to the CRI shim server to manage containers and images.
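Once initialization completes, every later kubelet operation on containers and images goes through the CRI shim clients held by this struct, which communicate with the CRI shim server.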
// pkg/kubelet/kuberuntime/kuberuntime_manager.go
// NewKubeGenericRuntimeManager creates a new kubeGenericRuntimeManager
func NewKubeGenericRuntimeManager(
	recorder record.EventRecorder,
	livenessManager proberesults.Manager,
	startupManager proberesults.Manager,
	seccompProfileRoot string,
	containerRefManager *kubecontainer.RefManager,
	machineInfo *cadvisorapi.MachineInfo,
	podStateProvider podStateProvider,
	osInterface kubecontainer.OSInterface,
	runtimeHelper kubecontainer.RuntimeHelper,
	httpClient types.HttpGetter,
	imageBackOff *flowcontrol.Backoff,
	serializeImagePulls bool,
	imagePullQPS float32,
	imagePullBurst int,
	cpuCFSQuota bool,
	cpuCFSQuotaPeriod metav1.Duration,
	runtimeService internalapi.RuntimeService,
	imageService internalapi.ImageManagerService,
	internalLifecycle cm.InternalContainerLifecycle,
	legacyLogProvider LegacyLogProvider,
	runtimeClassManager *runtimeclass.Manager,
) (KubeGenericRuntime, error) {
	kubeRuntimeManager := &kubeGenericRuntimeManager{
		recorder:            recorder,
		cpuCFSQuota:         cpuCFSQuota,
		cpuCFSQuotaPeriod:   cpuCFSQuotaPeriod,
		seccompProfileRoot:  seccompProfileRoot,
		livenessManager:     livenessManager,
		startupManager:      startupManager,
		containerRefManager: containerRefManager,
		machineInfo:         machineInfo,
		osInterface:         osInterface,
		runtimeHelper:       runtimeHelper,
		runtimeService:      newInstrumentedRuntimeService(runtimeService),
		imageService:        newInstrumentedImageManagerService(imageService),
		keyring:             credentialprovider.NewDockerKeyring(),
		internalLifecycle:   internalLifecycle,
		legacyLogProvider:   legacyLogProvider,
		runtimeClassManager: runtimeClassManager,
		logReduction:        logreduction.NewLogReduction(identicalErrorDelay),
	}

	typedVersion, err := kubeRuntimeManager.runtimeService.Version(kubeRuntimeAPIVersion)
	if err != nil {
		klog.Errorf("Get runtime version failed: %v", err)
		return nil, err
	}

	// Only matching kubeRuntimeAPIVersion is supported now
	// TODO: Runtime API machinery is under discussion at https://github.com/kubernetes/kubernetes/issues/28642
	if typedVersion.Version != kubeRuntimeAPIVersion {
		klog.Errorf("Runtime api version %s is not supported, only %s is supported now",
			typedVersion.Version,
			kubeRuntimeAPIVersion)
		return nil, ErrVersionNotSupported
	}

	kubeRuntimeManager.runtimeName = typedVersion.RuntimeName
	klog.Infof("Container runtime %s initialized, version: %s, apiVersion: %s",
		typedVersion.RuntimeName,
		typedVersion.RuntimeVersion,
		typedVersion.RuntimeApiVersion)

	// If the container logs directory does not exist, create it.
	// TODO: create podLogsRootDirectory at kubelet.go when kubelet is refactored to
	// new runtime interface
	if _, err := osInterface.Stat(podLogsRootDirectory); os.IsNotExist(err) {
		if err := osInterface.MkdirAll(podLogsRootDirectory, 0755); err != nil {
			klog.Errorf("Failed to create directory %q: %v", podLogsRootDirectory, err)
		}
	}

	kubeRuntimeManager.imagePuller = images.NewImageManager(
		kubecontainer.FilterEventRecorder(recorder),
		kubeRuntimeManager,
		imageBackOff,
		serializeImagePulls,
		imagePullQPS,
		imagePullBurst)
	kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(httpClient, kubeRuntimeManager, kubeRuntimeManager)
	kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager)

	kubeRuntimeManager.versionCache = cache.NewObjectCache(
		func() (interface{}, error) {
			return kubeRuntimeManager.getTypedVersion()
		},
		versionCacheTTL,
	)

	return kubeRuntimeManager, nil
}
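The version handshake in the middle of NewKubeGenericRuntimeManager is the first real call made through the CRI shim client, so it doubles as a connectivity check. A minimal sketch of that step, using simplified stand-in types rather than the kubelet ones: versionInfo, versioner, fakeRuntime and checkRuntime are hypothetical, and the value "0.1.0" for the supported API version is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// supportedAPIVersion stands in for kubeRuntimeAPIVersion.
// The concrete value is an assumption made for this sketch.
const supportedAPIVersion = "0.1.0"

// errVersionNotSupported plays the role of ErrVersionNotSupported.
var errVersionNotSupported = errors.New("runtime api version is not supported")

// versionInfo is a simplified stand-in for the CRI version response.
type versionInfo struct {
	Version        string
	RuntimeName    string
	RuntimeVersion string
}

// versioner is the one method of the runtime client this step needs.
type versioner interface {
	Version(apiVersion string) (*versionInfo, error)
}

// fakeRuntime is a test double that answers with a fixed versionInfo.
type fakeRuntime struct{ info versionInfo }

func (f fakeRuntime) Version(string) (*versionInfo, error) {
	info := f.info
	return &info, nil
}

// checkRuntime asks the runtime for its version, rejects any API version
// other than the supported one, and returns the runtime name on success.
func checkRuntime(rt versioner) (string, error) {
	v, err := rt.Version(supportedAPIVersion)
	if err != nil {
		return "", fmt.Errorf("get runtime version failed: %w", err)
	}
	if v.Version != supportedAPIVersion {
		return "", errVersionNotSupported
	}
	return v.RuntimeName, nil
}
```

Because the check is an exact string comparison, a runtime reporting any other API version fails initialization outright, which matches the "only matching kubeRuntimeAPIVersion is supported" comment in the source.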
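This post first introduced CRI and then analyzed the CRI-related kubelet source code in three parts: the CRI-related startup flags, the CRI-related interfaces and structs, and CRI initialization. The remaining parts will be covered in the next CRI post.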
CRI stands for Container Runtime Interface.
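Before version 1.5, k8s depended on Docker directly. To support other container runtimes such as rkt and containerd, kubelet added the CRI standard in 1.5. CRI decouples kubelet from the container runtime, splits the former Pod-level internal interface into gRPC interfaces oriented to Sandbox and Container, and separates image management and container management into different services.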
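A container runtime that implements the CRI interface is usually called a CRI shim. It is a gRPC server listening on a local unix socket, and kubelet acts as the gRPC client that calls the CRI interface to manage the lifecycle of Pods, containers, and images. The container runtime is also responsible for managing container networking itself, and CNI is the recommended way to do so.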
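With the CRI standard in place, newer versions have to talk to Docker in a new way. For compatibility with earlier versions, k8s ships a CRI implementation for Docker: the dockershim package under the kubelet package. dockershim is a gRPC service that listens on an endpoint for kubelet to connect to; when it receives a request from kubelet, it converts the request into a REST API request and sends it to the docker daemon.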
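By function, the CRI architecture can be divided into four parts:
(1) Container runtime management inside kubelet, kubeGenericRuntimeManager, which manages the clients that talk to the CRI shim and carries out container and image management (code location: pkg/kubelet/kuberuntime/kuberuntime_manager.go);
(2) The container runtime interface CRI, comprising the container runtime client interface and the container runtime server interface;
(3) The CRI shim client, held by kubelet and used to communicate with the CRI shim server;
(4) The CRI shim server, i.e. the concrete container runtime implementation, including the dockershim built into kubelet (code location: pkg/kubelet/dockershim) and external container runtimes such as cri-containerd (supporting the containerd container engine) and rktlet (supporting the rkt container engine).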
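Under CRI there are two kinds of container runtime implementation:
(1) The dockershim built into kubelet, which supports the Docker container engine and CNI network plugins (including kubenet). The dockershim code lives inside kubelet and is invoked by kubelet, which has dockershim start a separate server that acts as the CRI shim and exposes a gRPC server to kubelet;
(2) External container runtimes, which support container engines such as rkt and containerd.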
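The CRI-related initialization logic in kubelet is as follows:
(1) If kubelet uses dockershim as its container runtime, initialize and start the container runtime server dockershim (initializing dockershim also initializes the CNI network plugin);
(2) Initialize the container runtime CRI shim clients (used to call the CRI shim server: the built-in dockershim or a remote container runtime);
(3) Initialize kubeGenericRuntimeManager, which manages the container runtime. Once initialization completes, every later kubelet operation on containers and images goes through the CRI shim clients held by this struct, which communicate with the CRI shim server.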
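The ordering of the three initialization steps above can be sketched as a small function. This is an outline only, not the kubelet code: criDeps and initCRI are hypothetical, the three callbacks stand in for the real initialization routines, and the runtime name "docker" used to select the built-in shim is an assumption of this sketch.

```go
package main

// criDeps bundles hypothetical callbacks, one per initialization step.
type criDeps struct {
	startDockershim func() error                // step 1: built-in shim server
	newClients      func(endpoint string) error // step 2: CRI shim clients
	newManager      func() error                // step 3: runtime manager
}

// initCRI runs the steps in the order described in the text. The shim
// server is started only for the built-in runtime; a remote runtime is
// expected to be serving on the endpoint already.
func initCRI(containerRuntime, endpoint string, d criDeps) error {
	if containerRuntime == "docker" {
		if err := d.startDockershim(); err != nil {
			return err
		}
	}
	// The clients must exist before the manager, because the manager
	// holds them and calls the runtime during its own initialization.
	if err := d.newClients(endpoint); err != nil {
		return err
	}
	return d.newManager()
}
```

The ordering matters in both directions: the server has to be listening before the clients dial it, and the clients have to be connected before the manager issues its first version call.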
Related post: kubernetes/k8s CSI analysis (Container Storage Interface) https://xie.infoq.cn/article/5cd26c1b24c5665820411bb5a