首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

external-resizer源码分析-pvc扩容分析

更多 ceph-csi 其他源码分析,请查看下面这篇博文:kubernetes ceph-csi 分析 - 目录导航https://xie.infoq.cn/article/4b1d3e32f124307a49cd9c1e3

基于 tag v0.5.0

https://github.com/kubernetes-csi/external-resizer/releases/tag/v0.5.0

存储扩容过程

存储扩容分为 controller 端和 node 端两步,先进行 controller 端(external-resizer 触发)的扩容,然后再进行 node 端(kubelet 触发)扩容(当 volumemode 是 block,则不用进行 node 端扩容操作),存储的扩容操作才算完成。

controller 端存储扩容作用

将底层存储扩容,如 ceph rbd 扩容,则会让 ceph 集群中的 rbd image 扩容。

node 端存储扩容作用

在 pod 所在的 node 上做相应的操作,让 node 感知该存储已经扩容,如 ceph rbd filesystem 扩容,则会调用 node 上的文件系统扩容命令让文件系统扩容。

某些存储无需进行 node 端扩容操作如 cephfs。

存储扩容大致过程

(1)更改 pvc.Spec.Resources.Requests.storgage,触发扩容

(2)controller 端存储扩容:external-resizer watch pvc 对象,当发现 pvc.Spec.Resources.Requests.storgage 比 pvc.Status.Capacity.storgage 大,于是调 csi plugin 的 ControllerExpandVolume 方法进行 controller 端扩容,进行底层存储扩容,并更新 pv.Spec.Capacity.storgage。

(3)node 端存储扩容:kubelet 发现 pv.Spec.Capacity.storage 大于 pvc.Status.Capacity.storage,于是调 csi node 端扩容,对 dnode 上文件系统扩容,成功后 kubelet 更新 pvc.Status.Capacity.storage。

存储扩容详细过程

下面以 ceph rbd 存储扩容为例,对详细的存储扩容过程进行分析。

(1)修改 pvc 对象,修改申请存储大小(pvc.spec.resources.requests.storage);

(2)修改成功后,external-resizer 监听到该 pvc 的 update 事件,发现 pvc.Spec.Resources.Requests.storgage 比 pvc.Status.Capacity.storgage 大,于是调 ceph-csi 组件进行 controller 端扩容;

(3)ceph-csi 组件调用 ceph 存储,进行底层存储扩容;

(4)底层存储扩容完成后,ceph-csi 组件更新 pv 对象的.Spec.Capacity.storgage 的值为扩容后的存储大小;

(5)kubelet 的 volume manager 在 reconcile()调谐过程中发现 pv.Spec.Capacity.storage 大于 pvc.Status.Capacity.storage,于是调 ceph-csi 组件进行 node 端扩容;

(6)ceph-csi 组件对 node 上存储对应的文件系统扩容;

(7)扩容完成后,kubelet 更新 pvc.Status.Capacity.storage 的值为扩容后的存储大小。

本节将对 controller 端存储扩容进行分析,node 端存储扩容已经在之前有分析过了,可以看 kubelet 存储扩容源码分析https://xie.infoq.cn/article/49a4c9b205e5373aa1c943638

controller 端存储扩容

当 pvc.Spec.Resources.Requests 大小比 pvc.Status.Capacity.storgage 大时,会触发到 controller 端(external-resizer)的扩容逻辑。

controller 端(external-resizer)的扩容操作包括:

(1)调用 csi plugin 的 ControllerExpandVolume 方法进行存储扩容;

(2)更新 pv 对象的.spec.capacity.storage 为扩容后的存储大小;

(3)更新 pvc 对象的.Status.Conditions,追加键值对"FileSystemResizePending":"true",表明该 pvc 的 controller 端扩容已经完成,接下来将由 kubelet 完成 node 端的存储扩容操作。

external-resizer 源码分析

Run

主要逻辑:根据 workers 的数量,起数量相等的 goroutine 不断的跑 ctrl.syncPVCs 来处理 pvc 变更事件,筛选出需要扩容的 pvc,触发扩容操作。

代码语言:javascript
复制
// pkg/controller/controller.go

// Run starts the controller.
func (ctrl *resizeController) Run(
  workers int, ctx context.Context) {
  defer ctrl.claimQueue.ShutDown()

  klog.Infof("Starting external resizer %s", ctrl.name)
  defer klog.Infof("Shutting down external resizer %s", ctrl.name)

  stopCh := ctx.Done()

  if !cache.WaitForCacheSync(stopCh, ctrl.pvSynced, ctrl.pvcSynced) {
    klog.Errorf("Cannot sync pv/pvc caches")
    return
  }

  for i := 0; i < workers; i++ {
    go wait.Until(ctrl.syncPVCs, 0, stopCh)
  }

  <-stopCh
}

1.syncPVCs

主要逻辑:调用 ctrl.syncPVC

代码语言:javascript
复制
// syncPVCs is the main worker.
func (ctrl *resizeController) syncPVCs() {
  key, quit := ctrl.claimQueue.Get()
  if quit {
    return
  }
  defer ctrl.claimQueue.Done(key)

  if err := ctrl.syncPVC(key.(string)); err != nil {
    // Put PVC back to the queue so that we can retry later.
    ctrl.claimQueue.AddRateLimited(key)
  } else {
    ctrl.claimQueue.Forget(key)
  }
}

1.1 syncPVC

处理扩容判断逻辑与执行扩容操作。

主要逻辑:

(1)获取 pvc 对象;

(2)调用 ctrl.pvcNeedResize 从 pvc 对象层面判断是否需要扩容;

(3)获取 pv 对象;

(4)调用 ctrl.pvNeedResize 对比 pvc 与 pv 对象判断是否需要扩容;

(5)如需扩容,则调用 ctrl.resizePVC 做扩容操作。

代码语言:javascript
复制
// syncPVC checks if a pvc requests resizing, and execute the resize operation if requested.
func (ctrl *resizeController) syncPVC(key string) error {
  klog.V(4).Infof("Started PVC processing %q", key)

  namespace, name, err := cache.SplitMetaNamespaceKey(key)
  if err != nil {
    klog.Errorf("Split meta namespace key of pvc %s failed: %v", key, err)
    return err
  }

  pvc, err := ctrl.pvcLister.PersistentVolumeClaims(namespace).Get(name)
  if err != nil {
    if k8serrors.IsNotFound(err) {
      klog.V(3).Infof("PVC %s/%s is deleted, no need to process it", namespace, name)
      return nil
    }
    klog.Errorf("Get PVC %s/%s failed: %v", namespace, name, err)
    return err
  }

  if !ctrl.pvcNeedResize(pvc) {
    klog.V(4).Infof("No need to resize PVC %q", util.PVCKey(pvc))
    return nil
  }

  pv, err := ctrl.pvLister.Get(pvc.Spec.VolumeName)
  if err != nil {
    if k8serrors.IsNotFound(err) {
      klog.V(3).Infof("PV %s is deleted, no need to process it", pvc.Spec.VolumeName)
      return nil
    }
    klog.Errorf("Get PV %q of pvc %q failed: %v", pvc.Spec.VolumeName, util.PVCKey(pvc), err)
    return err
  }

  if !ctrl.pvNeedResize(pvc, pv) {
    klog.V(4).Infof("No need to resize PV %q", pv.Name)
    return nil
  }

  return ctrl.resizePVC(pvc, pv)
}

下面先分析下 pvcNeedResize 与 pvNeedResize 方法。

pvcNeedResize

当 pvc.Status.Phase==Bound 而且 pvc.Spec.Resources.Requests.storgage 大小比 pvc.Status.Capacity.storgage 大时返回 true,说明符合扩容条件。

代码语言:js
复制
// pvcNeedResize returns true is a pvc requests a resize operation.
func (ctrl *resizeController) pvcNeedResize(pvc *v1.PersistentVolumeClaim) bool {
  // Only Bound pvc can be expanded.
  if pvc.Status.Phase != v1.ClaimBound {
    return false
  }
  if pvc.Spec.VolumeName == "" {
    return false
  }
  actualSize := pvc.Status.Capacity[v1.ResourceStorage]
  requestSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
  return requestSize.Cmp(actualSize) > 0
}

pvNeedResize

当 pv.Spec.Resources.Requests.storgage 大于或者等于 pvc.Status.Capacity.storgage,且 pvc 的.Status.Conditions 中有 key 为"FileSystemResizePending",值为“true”的键值对,则说明 controller 端扩容已完成,该方法返回 false;相反的,如果 pv.Spec.Resources.Requests.storgage 小于 pvc.Status.Capacity.storgage,则说明 controller 端未做扩容操作,需要进行扩容操作,返回 true。

扩容分为 controller 端和 node 端,先进行 controller 端(external-resizer 触发)的扩容,然后再进行 node 端(kubelet 触发)扩容,扩容操作才算完成。

代码语言:javascript
复制
// pvNeedResize returns true if a pv supports and also requests resize.
func (ctrl *resizeController) pvNeedResize(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool {
  if !ctrl.resizer.CanSupport(pv, pvc) {
    klog.V(4).Infof("Resizer %q doesn't support PV %q", ctrl.name, pv.Name)
    return false
  }

  if (pv.Spec.ClaimRef == nil) || (pvc.Namespace != pv.Spec.ClaimRef.Namespace) || (pvc.UID != pv.Spec.ClaimRef.UID) {
    klog.V(4).Infof("persistent volume is not bound to PVC being updated: %s", util.PVCKey(pvc))
    return false
  }

  pvSize := pv.Spec.Capacity[v1.ResourceStorage]
  requestSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
  if pvSize.Cmp(requestSize) >= 0 {
    // If PV size is equal or bigger than request size, that means we have already resized PV.
    // In this case we need to check PVC's condition.
    // 1. If PVC in PersistentVolumeClaimResizing condition, we should continue to perform the
    //    resizing operation as we need to know if file system resize if required. (What's more,
    //    we hope the driver can find that the actual size already matched the request size and do nothing).
    // 2. If PVC in PersistentVolumeClaimFileSystemResizePending condition, we need to
    //    do nothing as kubelet will finish file system resizing and mark resize as finished.
    if util.HasFileSystemResizePendingCondition(pvc) {
      // This is case 2.
      return false
    }
    // This is case 1.
    return true
  }

  // PV size is smaller than request size, we need to resize the volume.
  return true
}

当 controller 端扩容已经完成时,util.HasFileSystemResizePendingCondition 返回 true。主要根据 pvc.Status.Conditions 中 key 为"FileSystemResizePending",值为“true”来判断。

代码语言:javascript
复制
const (
  // PersistentVolumeClaimFileSystemResizePending - controller resize is finished and a file system resize is pending on node
  PersistentVolumeClaimFileSystemResizePending PersistentVolumeClaimConditionType = "FileSystemResizePending"
)


// HasFileSystemResizePendingCondition returns true if a pvc has a FileSystemResizePending condition.
// This means the controller side resize operation is finished, and kublete side operation is in progress.
func HasFileSystemResizePendingCondition(pvc *v1.PersistentVolumeClaim) bool {
  for _, condition := range pvc.Status.Conditions {
    if condition.Type == v1.PersistentVolumeClaimFileSystemResizePending && condition.Status == v1.ConditionTrue {
      return true
    }
  }
  return false
}

1.1.1 resizePVC

该方法负责扩容操作的逻辑。

主要逻辑:

(1)调用 ctrl.markPVCResizeInProgress,更新 pvc 对象的.Status.Conditions,追加键值对"Resizing":"true",表明该 pvc 正在进行 resize;

(2)调用 ctrl.resizeVolume 做扩容操作;

(3)扩容完成,调用 ctrl.markPVCResizeFinished,更新 pvc 对象的.Status.Conditions,追加键值对"FileSystemResizePending":"true",表明该 pvc 的 controller 端扩容已经完成。

代码语言:javascript
复制
// resizePVC will:
// 1. Mark pvc as resizing.
// 2. Resize the volume and the pv object.
// 3. Mark pvc as resizing finished(no error, no need to resize fs), need resizing fs or resize failed.
func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error {
  if updatedPVC, err := ctrl.markPVCResizeInProgress(pvc); err != nil {
    klog.Errorf("Mark pvc %q as resizing failed: %v", util.PVCKey(pvc), err)
    return err
  } else if updatedPVC != nil {
    pvc = updatedPVC
  }

  // Record an event to indicate that external resizer is resizing this volume.
  ctrl.eventRecorder.Event(pvc, v1.EventTypeNormal, util.VolumeResizing,
    fmt.Sprintf("External resizer is resizing volume %s", pv.Name))

  err := func() error {
    newSize, fsResizeRequired, err := ctrl.resizeVolume(pvc, pv)
    if err != nil {
      return err
    }

    if fsResizeRequired {
      // Resize volume succeeded and need to resize file system by kubelet, mark it as file system resizing required.
      return ctrl.markPVCAsFSResizeRequired(pvc)
    }
    // Resize volume succeeded and no need to resize file system by kubelet, mark it as resizing finished.
    return ctrl.markPVCResizeFinished(pvc, newSize)
  }()

  if err != nil {
    // Record an event to indicate that resize operation is failed.
    ctrl.eventRecorder.Eventf(pvc, v1.EventTypeWarning, util.VolumeResizeFailed, err.Error())
  }

  return err
}

resizeVolume

主要逻辑:

(1)调用 ctrl.resizer.Resize 进行存储扩容;

(2)调用 util.UpdatePVCapacity 更新 pv 的.spec.capacity.storage。

代码语言:javascript
复制
// resizeVolume resize the volume to request size, and update PV's capacity if succeeded.
func (ctrl *resizeController) resizeVolume(
  pvc *v1.PersistentVolumeClaim,
  pv *v1.PersistentVolume) (resource.Quantity, bool, error) {
  requestSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]

  newSize, fsResizeRequired, err := ctrl.resizer.Resize(pv, requestSize)

  if err != nil {
    klog.Errorf("Resize volume %q by resizer %q failed: %v", pv.Name, ctrl.name, err)
    return newSize, fsResizeRequired, fmt.Errorf("resize volume %s failed: %v", pv.Name, err)
  }
  klog.V(4).Infof("Resize volume succeeded for volume %q, start to update PV's capacity", pv.Name)

  if err := util.UpdatePVCapacity(pv, newSize, ctrl.kubeClient); err != nil {
    klog.Errorf("Update capacity of PV %q to %s failed: %v", pv.Name, newSize.String(), err)
    return newSize, fsResizeRequired, err
  }
  klog.V(4).Infof("Update capacity of PV %q to %s succeeded", pv.Name, newSize.String())

  return newSize, fsResizeRequired, nil
}

ctrl.resizer.Resize:组装请求,调用 r.client.Expand 进行存储扩容(实际是调用 csi plugin 的 ControllerExpandVolume 方法)

代码语言:javascript
复制
// Resize resizes the persistence volume given request size
// It supports both CSI volume and migrated in-tree volume
func (r *csiResizer) Resize(pv *v1.PersistentVolume, requestSize resource.Quantity) (resource.Quantity, bool, error) {
  oldSize := pv.Spec.Capacity[v1.ResourceStorage]

  var volumeID string
  var source *v1.CSIPersistentVolumeSource
  var pvSpec v1.PersistentVolumeSpec
  if pv.Spec.CSI != nil {
    // handle CSI volume
    source = pv.Spec.CSI
    volumeID = source.VolumeHandle
    pvSpec = pv.Spec
  } else {
    if csitranslationlib.IsMigratedCSIDriverByName(r.name) {
      // handle migrated in-tree volume
      csiPV, err := csitranslationlib.TranslateInTreePVToCSI(pv)
      if err != nil {
        return oldSize, false, fmt.Errorf("failed to translate persistent volume: %v", err)
      }
      source = csiPV.Spec.CSI
      pvSpec = csiPV.Spec
      volumeID = source.VolumeHandle
    } else {
      // non-migrated in-tree volume
      return oldSize, false, fmt.Errorf("volume %v is not migrated to CSI", pv.Name)
    }
  }

  if len(volumeID) == 0 {
    return oldSize, false, errors.New("empty volume handle")
  }

  var secrets map[string]string
  secreRef := source.ControllerExpandSecretRef
  if secreRef != nil {
    var err error
    secrets, err = getCredentials(r.k8sClient, secreRef)
    if err != nil {
      return oldSize, false, err
    }
  }

  secrets[pvCephMountPathKey] = pv.Annotations[pvCephMountPathKey]

  capability, err := GetVolumeCapabilities(pvSpec)
  if err != nil {
    return oldSize, false, fmt.Errorf("failed to get capabilities of volume %s with %v", pv.Name, err)
  }

  ctx, cancel := timeoutCtx(r.timeout)
  defer cancel()
  newSizeBytes, nodeResizeRequired, err := r.client.Expand(ctx, volumeID, requestSize.Value(), secrets, capability)
  if err != nil {
    return oldSize, nodeResizeRequired, err
  }

  return *resource.NewQuantity(newSizeBytes, resource.BinarySI), nodeResizeRequired, err
}

代码语言:javascript
复制
// pkg/csi/client.go
func (c *client) Expand(
  ctx context.Context,
  volumeID string,
  requestBytes int64,
  secrets map[string]string,
  capability *csi.VolumeCapability) (int64, bool, error) {
  req := &csi.ControllerExpandVolumeRequest{
    Secrets:          secrets,
    VolumeId:         volumeID,
    CapacityRange:    &csi.CapacityRange{RequiredBytes: requestBytes},
    VolumeCapability: capability,
  }
  resp, err := c.ctrlClient.ControllerExpandVolume(ctx, req)
  if err != nil {
    return 0, false, err
  }
  return resp.CapacityBytes, resp.NodeExpansionRequired, nil
}

util.UpdatePVCapacity:更新 pv 对象的.spec.capacity.storage 为扩容后的大小。

代码语言:javascript
复制
// UpdatePVCapacity updates PVC capacity with requested size.
func UpdatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity, kubeClient kubernetes.Interface) error {
  newPV := pv.DeepCopy()
  newPV.Spec.Capacity[v1.ResourceStorage] = newCapacity
  patchBytes, err := getPatchData(pv, newPV)
  if err != nil {
    return fmt.Errorf("can't update capacity of PV %s as generate path data failed: %v", pv.Name, err)
  }
  _, updateErr := kubeClient.CoreV1().PersistentVolumes().Patch(pv.Name, types.StrategicMergePatchType, patchBytes)
  if updateErr != nil {
    return fmt.Errorf("update capacity of PV %s failed: %v", pv.Name, updateErr)
  }
  return nil
}

至此,external-resizer 的扩容分析结束。

总结

存储扩容分为 controller 端和 node 端两步,先进行 controller 端(external-resizer 触发)的扩容,然后再进行 node 端(kubelet 触发)扩容(当 volumemode 是 block,则不用进行 node 端扩容操作),存储的扩容操作才算完成。

controller 端存储扩容作用

将底层存储扩容,如 ceph rbd 扩容,则会让 ceph 集群中的 rbd image 扩容。

node 端存储扩容作用

在 pod 所在的 node 上做相应的操作,让 node 感知该存储已经扩容,如 ceph rbd filesystem 扩容,则会调用 node 上的文件系统扩容命令让文件系统扩容。

某些存储无需进行 node 端扩容操作如 cephfs。

存储扩容大致过程

(1)更改 pvc.Spec.Resources.Requests.storgage,触发扩容

(2)controller 端存储扩容:external-resizer watch pvc 对象,当发现 pvc.Spec.Resources.Requests.storgage 比 pvc.Status.Capacity.storgage 大,于是调 csi plugin 的 ControllerExpandVolume 方法进行 controller 端扩容,进行底层存储扩容,并更新 pv.Spec.Capacity.storgage。

(3)node 端存储扩容:kubelet 发现 pv.Spec.Capacity.storage 大于 pvc.Status.Capacity.storage,于是调 csi node 端扩容,对 dnode 上文件系统扩容,成功后 kubelet 更新 pvc.Status.Capacity.storage

controller 端存储扩容过程

controller 端(external-resizer)的主要扩容操作包括:

(1)调用 csi plugin 的 ControllerExpandVolume 方法进行存储扩容;

(2)更新 pv 对象的.spec.capacity.storage 为扩容后的存储大小;

(3)更新 pvc 对象的.Status.Conditions,追加键值对"FileSystemResizePending":"true",表明该 pvc 的 controller 端扩容已经完成,接下来将由 kubelet 完成 node 端的存储扩容操作。

node 端存储扩容

kubelet 存储扩容源码分析

https://xie.infoq.cn/article/49a4c9b205e5373aa1c943638

  • 发表于:
  • 本文为 InfoQ 中文站特供稿件
  • 首发地址https://www.infoq.cn/article/caa96707c858372930c972aaa
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券