本文尝试从Informer中的Lister、Watcher、Indexer、Store及Controller 5个组件展开对其进行详细阐述。希望对您有所帮助!
Kubernetes Informer 是 Kubernetes 客户端库中的一个核心组件,用于监控 Kubernetes API 资源的变化并在资源发生变化时通知用户。Informer 机制大大简化了 Kubernetes 中的资源管理和状态同步。以下是对 Kubernetes Informer 机制的详细解释。
Informer 通过与 Kubernetes API 服务器交互,使用 List 和 Watch 操作来跟踪资源的状态变化。Informer 主要由以下几个部分组成:
Lister 提供只读的资源列表操作,通常用于从本地缓存中快速获取资源。Lister 的主要接口方法包括:
List(selector labels.Selector) ([]*v1.Pod, error)
:返回符合选择器条件的资源列表。Get(name string) (*v1.Pod, error)
:根据资源名称返回资源对象。Watcher 使用 Kubernetes API 的 Watch 操作来监控资源的变化。当资源发生变化时,Watcher 会接收事件并传递给 Informer。
Indexer 是一个内存存储,用于存储资源对象并提供索引机制,以便快速检索资源。Indexer 提供以下功能:
Store 是一个简单的内存缓存,用于存储资源对象。它通常与 Indexer 结合使用,提供资源对象的快速存取。
Controller 负责调度和执行业务逻辑。它通过注册事件处理函数来响应资源的变化,并执行相应的操作。Controller 通常与 Informer 一起使用,以实现资源状态的自动化管理。
以下是一个简单的 Go 语言示例,展示如何使用 Informer 监控 Pod 资源的变化:
package main
import (
"fmt"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"path/filepath"
"time"
)
func main() {
kubeconfig := filepath.Join(homedir.HomeDir(), ".kube", "config")
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
panic(err.Error())
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
factory := informers.NewSharedInformerFactory(clientset, time.Minute)
podInformer := factory.Core().V1().Pods().Informer()
stopCh := make(chan struct{})
defer close(stopCh)
podInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
fmt.Printf("Pod added: %s/%s\n", pod.Namespace, pod.Name)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldPod := oldObj.(*v1.Pod)
newPod := newObj.(*v1.Pod)
fmt.Printf("Pod updated: %s/%s\n", oldPod.Namespace, oldPod.Name)
fmt.Printf("New Pod details: %s/%s\n", newPod.Namespace, newPod.Name)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
fmt.Printf("Pod deleted: %s/%s\n", pod.Namespace, pod.Name)
},
},
)
podInformer.Run(stopCh)
}
Kubernetes Informer 机制通过 List 和 Watch 操作,实现了对 Kubernetes 资源的高效监控和处理。它通过本地缓存和事件处理机制,确保资源状态的实时同步,极大地简化了 Kubernetes 集群中资源管理的复杂性。通过合理使用 Informer,可以实现对 Kubernetes 资源的精细化管理和自动化运维。
使用 Mermaid 图表工具可以直观地表示 Kubernetes Informer 的工作流程。以下是基于前述解释的 Kubernetes Informer 工作流程的 Mermaid 图表示例。
这张图描述了 Kubernetes Informer 的完整工作流程,包括初始化、列表操作、监视、事件接收与处理、缓存更新以及周期性重新同步等步骤。通过这些步骤,Informer 能够有效地保持本地缓存与 Kubernetes API 服务器的资源状态同步,并在资源发生变化时及时执行相应的操作。
在 Kubernetes 的 Informer 架构中,Lister 是一个重要的组件,用于在本地缓存中存储和索引 Kubernetes API Server 中的资源对象。它提供了一种高效访问和检索资源对象的方式,避免了频繁地与 API Server 进行交互,从而提升了性能和效率。
在使用 client-go 库时,Lister 是 Informer 的一个基础组件,通常通过 SharedIndexInformer 或者其他具体的 Informer 实现来使用。以下是一个简单的示例,展示了如何使用 Listers 来获取和处理 Pod 资源对象:
package main
import (
"flag"
"fmt"
"log"
"time"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"path/filepath"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
)
func main() {
var kubeconfig string
if home := homedir.HomeDir(); home != "" {
flag.StringVar(&kubeconfig, "kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
log.Fatalf("Error building kubeconfig: %v", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Error building kubernetes clientset: %v", err)
}
// 创建一个 SharedIndexInformer 来监视 Pod 资源
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, time.Second*30)
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
// 启动 Informer,开始监视 Pod 资源
go podInformer.Run(make(chan struct{}))
// 等待 Informer 启动和同步完成
if !cache.WaitForCacheSync(make(chan struct{}), podInformer.HasSynced) {
log.Fatalf("Error syncing cache")
}
log.Printf("Pod Informer synced and ready")
// 获取 Lister
podLister := sharedInformerFactory.Core().V1().Pods().Lister()
// 示例:通过 Lister 获取所有命名空间下的 Pod 列表
allPods, err := podLister.List(v1.ListOptions{})
if err != nil {
log.Fatalf("Error listing pods: %v", err)
}
// 打印所有 Pod 的名称和命名空间
for _, pod := range allPods {
fmt.Printf("Pod: %s, Namespace: %s\n", pod.Name, pod.Namespace)
}
// 保持程序运行,等待事件处理
runtime.NewControllerStopChannel().Run()
}
这段代码演示了如何使用 Listers 在 Kubernetes 中获取和处理资源对象(如 Pod),并利用其快速访问和索引的优势。Lister 在 Informer 架构中扮演了重要角色,帮助提升程序的性能和效率,特别是在需要频繁访问和管理资源对象的场景下。
在 Kubernetes 的 Informer 架构中,Watcher 是一种用于监视和接收资源对象变化事件的重要组件。Watcher 负责与 Kubernetes API Server 建立连接,并订阅特定资源对象的变化通知。当 API Server 上的资源对象发生变化(如创建、更新、删除等操作)时,Watcher 将接收到相应的事件通知,并将这些事件传递给相应的 Informer 进行处理。
在 client-go 中,Watcher 通常由 SharedIndexInformer 或者 Factory 创建和管理。以下是一个简单示例,展示如何使用 Watcher 监视 Pod 资源对象的变化:
package main
import (
"flag"
"fmt"
"log"
"time"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"path/filepath"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/informers"
corev1 "k8s.io/api/core/v1"
)
func main() {
var kubeconfig string
if home := homedir.HomeDir(); home != "" {
flag.StringVar(&kubeconfig, "kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
log.Fatalf("Error building kubeconfig: %v", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Error building kubernetes clientset: %v", err)
}
// 创建一个 SharedIndexInformer 来监视 Pod 资源
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, time.Second*30)
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
// 定义 Watcher 事件处理函数
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
fmt.Printf("Pod added: %s/%s\n", pod.Namespace, pod.Name)
},
UpdateFunc: func(oldObj, newObj interface{}) {
newPod := newObj.(*corev1.Pod)
oldPod := oldObj.(*corev1.Pod)
fmt.Printf("Pod updated: %s/%s\n", newPod.Namespace, newPod.Name)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
fmt.Printf("Pod deleted: %s/%s\n", pod.Namespace, pod.Name)
},
})
// 启动 Informer,开始监视 Pod 资源
go podInformer.Run(make(chan struct{}))
// 等待 Informer 启动和同步完成
if !cache.WaitForCacheSync(make(chan struct{}), podInformer.HasSynced) {
log.Fatalf("Error syncing cache")
}
log.Printf("Pod Informer synced and ready")
// 保持程序运行,等待事件处理
runtime.NewControllerStopChannel().Run()
}
podInformer.Run
启动了 Pod 的 Informer,并通过 cache.WaitForCacheSync
等待 Informer 同步完成。这段示例代码展示了 Watcher 在 Informer 架构中的典型应用,用于实时监视和处理 Kubernetes 资源对象的变化事件,保证应用程序能够实时响应并处理重要的资源操作。
在 Kubernetes 的 Informer 架构中,Indexer 是一个关键的组件,用于在本地缓存中维护和管理资源对象的索引。Indexer 的主要作用是提供高效的资源对象检索和访问能力,通过多种索引方式(如名称、命名空间等)组织和存储资源对象,以支持快速的数据查询和操作。
以下是一个简化的示例,展示如何使用 client-go 库中的 Indexer 对 Pod 资源对象进行索引和查询:
package main
import (
"flag"
"fmt"
"log"
"time"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"path/filepath"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/informers"
corev1 "k8s.io/api/core/v1"
)
func main() {
var kubeconfig string
if home := homedir.HomeDir(); home != "" {
flag.StringVar(&kubeconfig, "kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
log.Fatalf("Error building kubeconfig: %v", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Error building kubernetes clientset: %v", err)
}
// 创建一个 SharedIndexInformer 来监视 Pod 资源
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, time.Second*30)
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
// 创建一个 Indexer
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
// 定义 Watcher 事件处理函数,并将事件添加到 Indexer 中
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
fmt.Printf("Pod added: %s/%s\n", pod.Namespace, pod.Name)
indexer.Add(pod)
},
UpdateFunc: func(oldObj, newObj interface{}) {
newPod := newObj.(*corev1.Pod)
oldPod := oldObj.(*corev1.Pod)
fmt.Printf("Pod updated: %s/%s\n", newPod.Namespace, newPod.Name)
indexer.Update(newPod)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
fmt.Printf("Pod deleted: %s/%s\n", pod.Namespace, pod.Name)
indexer.Delete(pod)
},
})
// 启动 Informer,开始监视 Pod 资源
go podInformer.Run(make(chan struct{}))
// 等待 Informer 启动和同步完成
if !cache.WaitForCacheSync(make(chan struct{}), podInformer.HasSynced) {
log.Fatalf("Error syncing cache")
}
log.Printf("Pod Informer synced and ready")
// 示例:通过 Indexer 查询所有命名空间下的 Pod 列表
allPods, err := indexer.ByIndex(cache.NamespaceIndex, "")
if err != nil {
log.Fatalf("Error listing pods: %v", err)
}
// 打印所有 Pod 的名称和命名空间
for _, obj := range allPods {
pod := obj.(*corev1.Pod)
fmt.Printf("Pod: %s, Namespace: %s\n", pod.Name, pod.Namespace)
}
// 保持程序运行,等待事件处理
runtime.NewControllerStopChannel().Run()
}
cache.NewIndexer
创建了一个简单的 Indexer,并将其与 Pod 的 Informer 绑定。AddFunc
、UpdateFunc
和 DeleteFunc
方法处理 Pod 资源对象的增删改事件,并更新 Indexer 中的索引数据。indexer.ByIndex
方法查询所有命名空间下的 Pod 列表,并打印其名称和命名空间信息。这段示例代码展示了 Indexer 在 Informer 架构中的典型应用,通过维护资源对象的索引结构,实现了对 Kubernetes 资源对象的高效查询和访问,为应用程序提供了快速的数据检索和操作能力。
在 Kubernetes 的 Informer 架构中,Store 是一个重要的组件,用于在本地缓存中存储和管理 Kubernetes 资源对象的数据。Store 主要负责管理缓存中的资源对象集合,并提供对这些对象的快速访问、查询和操作能力,以支持 Informer 和其他控制器对资源对象的监视、管理和事件处理。
以下是一个简化的示例,展示如何使用 client-go 库中的 Store 对 Pod 资源对象进行存储和查询:
package main
import (
"flag"
"fmt"
"log"
"time"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"path/filepath"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/informers"
corev1 "k8s.io/api/core/v1"
)
func main() {
var kubeconfig string
if home := homedir.HomeDir(); home != "" {
flag.StringVar(&kubeconfig, "kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
log.Fatalf("Error building kubeconfig: %v", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Error building kubernetes clientset: %v", err)
}
// 创建一个 SharedIndexInformer 来监视 Pod 资源
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, time.Second*30)
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
// 创建一个 Store
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
// 定义 Watcher 事件处理函数,并将事件添加到 Store 中
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
fmt.Printf("Pod added: %s/%s\n", pod.Namespace, pod.Name)
store.Add(pod)
},
UpdateFunc: func(oldObj, newObj interface{}) {
newPod := newObj.(*corev1.Pod)
oldPod := oldObj.(*corev1.Pod)
fmt.Printf("Pod updated: %s/%s\n", newPod.Namespace, newPod.Name)
store.Update(newPod)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
fmt.Printf("Pod deleted: %s/%s\n", pod.Namespace, pod.Name)
store.Delete(pod)
},
})
// 启动 Informer,开始监视 Pod 资源
go podInformer.Run(make(chan struct{}))
// 等待 Informer 启动和同步完成
if !cache.WaitForCacheSync(make(chan struct{}), podInformer.HasSynced) {
log.Fatalf("Error syncing cache")
}
log.Printf("Pod Informer synced and ready")
// 示例:通过 Store 查询所有命名空间下的 Pod 列表
allPods := store.List()
// 打印所有 Pod 的名称和命名空间
for _, obj := range allPods {
pod := obj.(*corev1.Pod)
fmt.Printf("Pod: %s, Namespace: %s\n", pod.Name, pod.Namespace)
}
// 保持程序运行,等待事件处理
runtime.NewControllerStopChannel().Run()
}
cache.NewStore
创建了一个简单的 Store,并将其与 Pod 的 Informer 绑定。AddFunc
、UpdateFunc
和 DeleteFunc
方法处理 Pod 资源对象的增删改事件,并更新 Store 中的数据。store.List
方法查询所有存储在 Store 中的 Pod 资源对象,并打印其名称和命名空间信息。这段示例代码展示了 Store 在 Informer 架构中的典型应用,通过管理和维护资源对象的本地缓存,实现了对 Kubernetes 资源对象的高效存储、查询和访问,为应用程序提供了快速和可靠的数据管理能力。
在 Kubernetes 中,Informer 架构中的 Controller 是负责监视和控制资源对象的逻辑组件,它通过监听 Informer 中的事件,并根据事件触发相应的业务逻辑或操作,来实现对资源对象的管理和控制。Controller 在 Kubernetes 中扮演着非常重要的角色,可以根据具体的需求实现各种自定义的控制逻辑,例如自动伸缩、负载均衡、故障恢复等。
以下是一个简化的示例场景,展示了如何使用 Informer 和 Controller 实现对 Deployment 资源对象的自动伸缩控制:
package main
import (
"flag"
"fmt"
"log"
"time"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"path/filepath"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/informers"
appsv1 "k8s.io/api/apps/v1"
)
func main() {
var kubeconfig string
if home := homedir.HomeDir(); home != "" {
flag.StringVar(&kubeconfig, "kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
log.Fatalf("Error building kubeconfig: %v", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Error building kubernetes clientset: %v", err)
}
// 创建一个 SharedIndexInformer 来监视 Deployment 资源
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, time.Second*30)
deploymentInformer := sharedInformerFactory.Apps().V1().Deployments().Informer()
// 定义 Controller 控制逻辑
controller := NewDeploymentController(clientset, deploymentInformer)
// 启动 Informer,开始监视 Deployment 资源
go deploymentInformer.Run(make(chan struct{}))
// 等待 Informer 启动和同步完成
if !cache.WaitForCacheSync(make(chan struct{}), deploymentInformer.HasSynced) {
log.Fatalf("Error syncing cache")
}
log.Printf("Deployment Informer synced and ready")
// 保持程序运行,等待事件处理
runtime.NewControllerStopChannel().Run()
}
// DeploymentController 定义一个简单的 Deployment 控制器
type DeploymentController struct {
clientset *kubernetes.Clientset
informer cache.SharedIndexInformer
}
// NewDeploymentController 创建一个新的 DeploymentController
func NewDeploymentController(clientset *kubernetes.Clientset, informer cache.SharedIndexInformer) *DeploymentController {
controller := &DeploymentController{
clientset: clientset,
informer: informer,
}
// 注册事件处理函数
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleDeploymentAdd,
UpdateFunc: controller.handleDeploymentUpdate,
DeleteFunc: controller.handleDeploymentDelete,
})
return controller
}
// 处理 Deployment 资源的添加事件
func (c *DeploymentController) handleDeploymentAdd(obj interface{}) {
deployment := obj.(*appsv1.Deployment)
fmt.Printf("Deployment added: %s/%s\n", deployment.Namespace, deployment.Name)
// 在这里编写自定义的添加处理逻辑,例如自动扩展 Pod 副本数
}
// 处理 Deployment 资源的更新事件
func (c *DeploymentController) handleDeploymentUpdate(oldObj, newObj interface{}) {
newDeployment := newObj.(*appsv1.Deployment)
oldDeployment := oldObj.(*appsv1.Deployment)
fmt.Printf("Deployment updated: %s/%s\n", newDeployment.Namespace, newDeployment.Name)
// 在这里编写自定义的更新处理逻辑
}
// 处理 Deployment 资源的删除事件
func (c *DeploymentController) handleDeploymentDelete(obj interface{}) {
deployment := obj.(*appsv1.Deployment)
fmt.Printf("Deployment deleted: %s/%s\n", deployment.Namespace, deployment.Name)
// 在这里编写自定义的删除处理逻辑
}
NewDeploymentController
函数中初始化并注册了 Informer 的事件处理函数。handleDeploymentAdd
、handleDeploymentUpdate
和 handleDeploymentDelete
方法分别处理 Deployment 资源的增加、更新和删除事件,每个方法中都可以编写自定义的业务逻辑。这段示例代码展示了 Controller 在 Kubernetes 中的典型应用,通过与 Informer 结合使用,实现了对 Deployment 资源对象的监视、管理和控制,为应用程序提供了灵活和强大的自动化操作能力。
在 Kubernetes 的 Informer 机制中,理论上 Informer 通过与 API Server 的长连接(watch)和定期的全量同步(resync)来确保事件不会丢失。但在某些情况下,Informer 仍然可能会丢失事件或出现延迟。这些情况主要包括以下几种:
SharedInformerFactory
可以在多个控制器之间共享同一个 Informer,提高资源利用效率,减少对 API Server 的负载。以下是一个示例,展示了如何处理 Informer 的事件丢失和重新同步:
package main
import (
"flag"
"fmt"
"log"
"time"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"path/filepath"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/informers"
corev1 "k8s.io/api/core/v1"
)
func main() {
var kubeconfig string
if home := homedir.HomeDir(); home != "" {
flag.StringVar(&kubeconfig, "kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
log.Fatalf("Error building kubeconfig: %v", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Error building kubernetes clientset: %v", err)
}
// 创建一个 SharedIndexInformer 来监视 Pod 资源
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, time.Second*30)
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
// 定义事件处理函数
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
fmt.Printf("Pod added: %s/%s\n", pod.Namespace, pod.Name)
},
UpdateFunc: func(oldObj, newObj interface{}) {
newPod := newObj.(*corev1.Pod)
oldPod := oldObj.(*corev1.Pod)
fmt.Printf("Pod updated: %s/%s\n", newPod.Namespace, newPod.Name)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
fmt.Printf("Pod deleted: %s/%s\n", pod.Namespace, pod.Name)
},
})
// 启动 Informer,开始监视 Pod 资源
stopCh := make(chan struct{})
defer close(stopCh)
go podInformer.Run(stopCh)
// 等待 Informer 启动和同步完成
if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) {
log.Fatalf("Error syncing cache")
}
log.Printf("Pod Informer synced and ready")
// 保持程序运行,等待事件处理
runtime.NewControllerStopChannel().Run()
}
cache.WaitForCacheSync
方法等待 Informer 的初始同步完成,确保缓存中有最新的资源状态。通过合理配置和使用 Kubernetes Informer,可以在大多数情况下避免事件丢失,并确保系统的稳定性和一致性。
在 Kubernetes 中,Informer
和 SharedInformer
是用于监视和缓存 Kubernetes 资源对象状态变化的工具,它们在实现上有一些区别和适用场景的差异。
Informer 是 Kubernetes 客户端库(例如 client-go)提供的一种机制,用于从 Kubernetes API Server 监听特定资源对象的变化事件(如 Pod、Deployment 等)。每个 Informer 实例都独立维护着对应资源对象的本地缓存,并通过长连接(watch)从 API Server 接收事件通知。以下是 Informer 的主要特点和应用场景:
SharedInformer 是对 Informer 的一种封装和扩展,它在 Kubernetes 客户端库(client-go)中提供,主要用于在多个控制器之间共享资源对象的监视和缓存。SharedInformer 可以有效地减少对 API Server 的请求次数,提高资源利用率,并简化多个控制器之间资源状态同步的复杂性。以下是 SharedInformer 的主要特点和应用场景:
示例场景:假设有一个监控系统需要同时监视 Pod 的状态并执行相应的操作:
通过选择合适的 Informer 或 SharedInformer 可以根据具体的业务需求和系统架构来优化资源对象的监视和控制,提高系统的效率和可扩展性。
Informer 在 Kubernetes 中设计的精髓主要体现在以下几个方面:
总的来说,Informer 设计的精髓在于其基于事件驱动的实时监视能力、本地缓存与同步策略的高效性、灵活的事件处理机制以及对大规模集群性能和可扩展性的考量。这些特点使得 Informer 成为 Kubernetes 中实现控制器逻辑和应用程序与集群交互的核心组件之一。