前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊PowerJob的MapProcessor

聊聊PowerJob的MapProcessor

原创
作者头像
code4it
发布于 2024-01-24 01:41:13
发布于 2024-01-24 01:41:13
22400
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文主要研究一下PowerJob的MapProcessor

MapProcessor

tech/powerjob/worker/core/processor/sdk/MapProcessor.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface MapProcessor extends BasicProcessor {

    Logger log = LoggerFactory.getLogger(MapProcessor.class);

    int RECOMMEND_BATCH_SIZE = 200;

    /**
     * 分发子任务
     * @param taskList 子任务,再次执行时可通过 TaskContext#getSubTask 获取
     * @param taskName 子任务名称,即子任务处理器中 TaskContext#getTaskName 获取到的值
     * @throws PowerJobCheckedException map 失败将抛出异常
     */
    default void map(List<?> taskList, String taskName) throws PowerJobCheckedException {

        if (CollectionUtils.isEmpty(taskList)) {
            return;
        }

        TaskDO task = ThreadLocalStore.getTask();
        WorkerRuntime workerRuntime = ThreadLocalStore.getRuntimeMeta();

        if (taskList.size() > RECOMMEND_BATCH_SIZE) {
            log.warn("[Map-{}] map task size is too large, network maybe overload... please try to split the tasks.", task.getInstanceId());
        }

        // 修复 map 任务命名和根任务名或者最终任务名称一致导致的问题(无限生成子任务或者直接失败)
        if (TaskConstant.ROOT_TASK_NAME.equals(taskName) || TaskConstant.LAST_TASK_NAME.equals(taskName)) {
            log.warn("[Map-{}] illegal map task name : {}! please do not use 'OMS_ROOT_TASK' or 'OMS_LAST_TASK' as map task name. as a precaution, it will be renamed 'X-{}' automatically." ,task.getInstanceId() ,taskName , taskName);
            taskName ="X-"+taskName;
        }

        // 1. 构造请求
        ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName);

        // 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常)
        boolean requestSucceed = TransportUtils.reliableMapTask(req, task.getAddress(), workerRuntime);

        if (requestSucceed) {
            log.info("[Map-{}] map task[name={},num={}] successfully!", task.getInstanceId(), taskName, taskList.size());
        }else {
            throw new PowerJobCheckedException("map failed for task: " + taskName);
        }
    }

    /**
     * 是否为根任务
     * @return true -> 根任务 / false -> 非根任务
     */
    default boolean isRootTask() {
        TaskDO task = ThreadLocalStore.getTask();
        return TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName());
    }
}

MapProcessor接口继承了BasicProcessor,它提供了默认的map方法用于分发子任务,它主要是构造了ProcessorMapTaskRequest,通过TransportUtils.reliableMapTask发送请求;它还提供了isRootTask方法用于判断当前任务是不是根任务

ProcessorMapTaskRequest

tech/powerjob/worker/pojo/request/ProcessorMapTaskRequest.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Getter
@NoArgsConstructor
public class ProcessorMapTaskRequest implements PowerSerializable {

    private Long instanceId;
    private Long subInstanceId;

    private String taskName;
    private List<SubTask> subTasks;

    @Getter
    @NoArgsConstructor
    @AllArgsConstructor
    public static class SubTask {
        private String taskId;
        private byte[] taskContent;
    }

    public ProcessorMapTaskRequest(TaskDO parentTask, List<?> subTaskList, String taskName) {

        this.instanceId = parentTask.getInstanceId();
        this.subInstanceId = parentTask.getSubInstanceId();
        this.taskName = taskName;
        this.subTasks = Lists.newLinkedList();

        subTaskList.forEach(subTask -> {
            // 同一个 Task 内部可能多次 Map,因此还是要确保线程级别的唯一
            String subTaskId = parentTask.getTaskId() + "." + ThreadLocalStore.getTaskIDAddr().getAndIncrement();
            // 写入类名,方便反序列化
            subTasks.add(new SubTask(subTaskId, SerializerUtils.serialize(subTask)));
        });
    }
}

ProcessorMapTaskRequest的构造器将subTaskList转换为一系列的SubTask,它使用SerializerUtils.serialize序列化(Kryo)了用户定义的subTask

reliableMapTask

tech/powerjob/worker/common/utils/TransportUtils.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    public static boolean reliableMapTask(ProcessorMapTaskRequest req, String address, WorkerRuntime workerRuntime) throws PowerJobCheckedException {
        try {
            return reliableAsk(ServerType.WORKER, WTT_PATH, WTT_HANDLER_MAP_TASK, address, req, workerRuntime.getTransporter()).isSuccess();
        } catch (Throwable throwable) {
            throw new PowerJobCheckedException(throwable);
        }
    }

    private static AskResponse reliableAsk(ServerType t, String rootPath, String handlerPath, String address, PowerSerializable req, Transporter transporter) throws Exception {
        final URL url = easyBuildUrl(t, rootPath, handlerPath, address);
        final CompletionStage<AskResponse> completionStage = transporter.ask(url, req, AskResponse.class);
        return completionStage
                .toCompletableFuture()
                .get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }    

reliableMapTask方法通过reliableAsk往taskTracker/mapTask接口发送请求,默认是5s超时

onReceiveProcessorMapTaskRequest

tech/powerjob/worker/actors/TaskTrackerActor.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    /**
     * 子任务 map 处理器
     */
    @Handler(path = WTT_HANDLER_MAP_TASK)
    public AskResponse onReceiveProcessorMapTaskRequest(ProcessorMapTaskRequest req) {

        HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId());
        if (taskTracker == null) {
            log.warn("[TaskTrackerActor] receive ProcessorMapTaskRequest({}) but system can't find TaskTracker.", req);
            return null;
        }

        boolean success = false;
        List<TaskDO> subTaskList = Lists.newLinkedList();

        try {

            req.getSubTasks().forEach(originSubTask -> {
                TaskDO subTask = new TaskDO();

                subTask.setTaskName(req.getTaskName());
                subTask.setSubInstanceId(req.getSubInstanceId());

                subTask.setTaskId(originSubTask.getTaskId());
                subTask.setTaskContent(originSubTask.getTaskContent());

                subTaskList.add(subTask);
            });

            success = taskTracker.submitTask(subTaskList);
        }catch (Exception e) {
            log.warn("[TaskTrackerActor] process map task(instanceId={}) failed.", req.getInstanceId(), e);
        }

        AskResponse response = new AskResponse();
        response.setSuccess(success);
        return response;
    }

TaskTrackerActor提供了onReceiveProcessorMapTaskRequest方法处理ProcessorMapTaskRequest,它将入参的subTasks转换为一系列的TaskDO,然后通过taskTracker.submitTask提交

submitTask

tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    /**
     * 提交Task任务(MapReduce的Map,Broadcast的广播),上层保证 batchSize,同时插入过多数据可能导致失败
     *
     * @param newTaskList 新增的子任务列表
     */
    public boolean submitTask(List<TaskDO> newTaskList) {
        if (finished.get()) {
            return true;
        }
        if (CollectionUtils.isEmpty(newTaskList)) {
            return true;
        }
        // 基础处理(多循环一次虽然有些浪费,但分布式执行中,这点耗时绝不是主要占比,忽略不计!)
        newTaskList.forEach(task -> {
            task.setInstanceId(instanceId);
            task.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
            task.setFailedCnt(0);
            task.setLastModifiedTime(System.currentTimeMillis());
            task.setCreatedTime(System.currentTimeMillis());
            task.setLastReportTime(-1L);
        });

        log.debug("[TaskTracker-{}] receive new tasks: {}", instanceId, newTaskList);
        return taskPersistenceService.batchSave(newTaskList);
    }

submitTask方法先填充一些字段信息,比如设置status为TaskStatus.WAITING_DISPATCH,然后调用taskPersistenceService.batchSave保存

batchSave

tech/powerjob/worker/persistence/TaskPersistenceService.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    public boolean batchSave(List<TaskDO> tasks) {
        if (CollectionUtils.isEmpty(tasks)) {
            return true;
        }
        try {
            return execute(() -> taskDAO.batchSave(tasks));
        }catch (Exception e) {
            log.error("[TaskPersistenceService] batchSave tasks({}) failed.", tasks, e);
        }
        return false;
    }

    private static  <T> T execute(SupplierPlus<T> executor) throws Exception {
        return CommonUtils.executeWithRetry(executor, RETRY_TIMES, RETRY_INTERVAL_MS);
    }    

batchSave通过taskDAO.batchSave报错,它针对异常会重试3次,每次间隔100ms

TaskDAOImpl.batchSave

tech/powerjob/worker/persistence/TaskDAOImpl.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    public boolean batchSave(Collection<TaskDO> tasks) throws SQLException {
        String insertSql = "insert into task_info(task_id, instance_id, sub_instance_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time, last_report_time) values (?,?,?,?,?,?,?,?,?,?,?,?)";
        boolean originAutoCommitFlag ;
        try (Connection conn = connectionFactory.getConnection()) {
            originAutoCommitFlag = conn.getAutoCommit();
            conn.setAutoCommit(false);
            try ( PreparedStatement ps = conn.prepareStatement(insertSql)) {
                for (TaskDO task : tasks) {
                    fillInsertPreparedStatement(task, ps);
                    ps.addBatch();
                }
                ps.executeBatch();
                return true;
            } catch (Throwable e) {
                conn.rollback();
                throw e;
            } finally {
                conn.setAutoCommit(originAutoCommitFlag);
            }
        }
    }

TaskDAOImpl的batchSave直接通过jdbc的executeBatch进行批量保存

Dispatcher

tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    /**
     * 定时扫描数据库中的task(出于内存占用量考虑,每次最多获取100个),并将需要执行的任务派发出去
     */
    protected class Dispatcher implements Runnable {

        // 数据库查询限制,每次最多查询几个任务
        private static final int DB_QUERY_LIMIT = 100;

        @Override
        public void run() {

            if (finished.get()) {
                return;
            }

            Stopwatch stopwatch = Stopwatch.createStarted();

            // 1. 获取可以派发任务的 ProcessorTracker
            List<String> availablePtIps = ptStatusHolder.getAvailableProcessorTrackers();

            // 2. 没有可用 ProcessorTracker,本次不派发
            if (availablePtIps.isEmpty()) {
                log.debug("[TaskTracker-{}] no available ProcessorTracker now.", instanceId);
                return;
            }

            // 3. 避免大查询,分批派发任务
            long currentDispatchNum = 0;
            long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L;
            AtomicInteger index = new AtomicInteger(0);

            // 4. 循环查询数据库,获取需要派发的任务
            while (maxDispatchNum > currentDispatchNum) {

                int dbQueryLimit = Math.min(DB_QUERY_LIMIT, (int) maxDispatchNum);
                List<TaskDO> needDispatchTasks = taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WAITING_DISPATCH, dbQueryLimit);
                currentDispatchNum += needDispatchTasks.size();

                needDispatchTasks.forEach(task -> {
                    // 获取 ProcessorTracker 地址,如果 Task 中自带了 Address,则使用该 Address
                    String ptAddress = task.getAddress();
                    if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) {
                        ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size());
                    }
                    dispatchTask(task, ptAddress);
                });

                // 数量不足 或 查询失败,则终止循环
                if (needDispatchTasks.size() < dbQueryLimit) {
                    break;
                }
            }

            log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop());
        }
    }

HeavyTaskTracker每5s调度一次Dispatcher,其run方法先通过ptStatusHolder.getAvailableProcessorTrackers()获取可以派发任务的ProcessorTracker,之后循环从数据库拉取一批状态为TaskStatus.WAITING_DISPATCH的任务,通过轮询的方式进行dispatchTask

dispatchTask

tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    protected void dispatchTask(TaskDO task, String processorTrackerAddress) {

        // 1. 持久化,更新数据库(如果更新数据库失败,可能导致重复执行,先不处理)
        TaskDO updateEntity = new TaskDO();
        updateEntity.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue());
        // 写入处理该任务的 ProcessorTracker
        updateEntity.setAddress(processorTrackerAddress);
        boolean success = taskPersistenceService.updateTask(instanceId, task.getTaskId(), updateEntity);
        if (!success) {
            log.warn("[TaskTracker-{}] dispatch task(taskId={},taskName={}) failed due to update task status failed.", instanceId, task.getTaskId(), task.getTaskName());
            return;
        }

        // 2. 更新 ProcessorTrackerStatus 状态
        ptStatusHolder.getProcessorTrackerStatus(processorTrackerAddress).setDispatched(true);
        // 3. 初始化缓存
        taskId2BriefInfo.put(task.getTaskId(), new TaskBriefInfo(task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, -1L));

        // 4. 任务派发
        TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task, workerRuntime.getWorkerAddress());
        TransportUtils.ttStartPtTask(startTaskReq, processorTrackerAddress, workerRuntime.getTransporter());

        log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", instanceId, task.getTaskId(), task.getTaskName());
    }

dispatchTask先更新status为DISPATCH_SUCCESS_WORKER_UNCHECK,更新处理该任务的ProcessorTracker,之后构建TaskTrackerStartTaskReq,通过TransportUtils.ttStartPtTask派送执行task的请求

onReceiveTaskTrackerStartTaskReq

tech/powerjob/worker/actors/ProcessorTrackerActor.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    @Handler(path = RemoteConstant.WPT_HANDLER_START_TASK, processType = ProcessType.NO_BLOCKING)
    public void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) {

        Long instanceId = req.getInstanceInfo().getInstanceId();

        // 创建 ProcessorTracker 一定能成功
        ProcessorTracker processorTracker = ProcessorTrackerManager.getProcessorTracker(
                instanceId,
                req.getTaskTrackerAddress(),
                () -> new ProcessorTracker(req, workerRuntime));

        TaskDO task = new TaskDO();

        task.setTaskId(req.getTaskId());
        task.setTaskName(req.getTaskName());
        task.setTaskContent(req.getTaskContent());
        task.setFailedCnt(req.getTaskCurrentRetryNums());
        task.setSubInstanceId(req.getSubInstanceId());

        processorTracker.submitTask(task);
    }

ProcessorTrackerActor的onReceiveTaskTrackerStartTaskReq主要是获取或者创建processorTracker,然后执行其submitTask提交任务

ProcessorTracker.submitTask

tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    public void submitTask(TaskDO newTask) {

        // 一旦 ProcessorTracker 出现异常,所有提交到此处的任务直接返回失败,防止形成死锁
        // 死锁分析:TT创建PT,PT创建失败,无法定期汇报心跳,TT长时间未收到PT心跳,认为PT宕机(确实宕机了),无法选择可用的PT再次派发任务,死锁形成,GG斯密达 T_T
        if (lethal) {
            ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq()
                    .setInstanceId(instanceId)
                    .setSubInstanceId(newTask.getSubInstanceId())
                    .setTaskId(newTask.getTaskId())
                    .setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue())
                    .setResult(lethalReason)
                    .setReportTime(System.currentTimeMillis());

            TransportUtils.ptReportTask(report, taskTrackerAddress, workerRuntime);
            return;
        }

        boolean success = false;
        // 1. 设置值并提交执行
        newTask.setInstanceId(instanceInfo.getInstanceId());
        newTask.setAddress(taskTrackerAddress);

        HeavyProcessorRunnable heavyProcessorRunnable = new HeavyProcessorRunnable(instanceInfo, taskTrackerAddress, newTask, processorBean, omsLogger, statusReportRetryQueue, workerRuntime);
        try {
            threadPool.submit(heavyProcessorRunnable);
            success = true;
        } catch (RejectedExecutionException ignore) {
            log.warn("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.",
                    instanceId, newTask.getTaskId(), newTask.getTaskName());
        } catch (Exception e) {
            log.error("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed.", instanceId, newTask.getTaskId(), newTask.getTaskName(), e);
        }

        // 2. 回复接收成功
        if (success) {
            ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
            reportReq.setInstanceId(instanceId);
            reportReq.setSubInstanceId(newTask.getSubInstanceId());
            reportReq.setTaskId(newTask.getTaskId());
            reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
            reportReq.setReportTime(System.currentTimeMillis());

            TransportUtils.ptReportTask(reportReq, taskTrackerAddress, workerRuntime);

            log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.",
                    instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size());
        }
    }

ProcessorTracker的submitTask方法创建HeavyProcessorRunnable,提交到threadPool执行,之后回复ProcessorReportTaskStatusReq,告知status为TaskStatus.WORKER_RECEIVED

小结

  • MapProcessor接口继承了BasicProcessor,它提供了默认的map方法用于分发子任务,它主要是构造了ProcessorMapTaskRequest,通过TransportUtils.reliableMapTask发送请求,它先把task保存下来,初始的status为TaskStatus.WAITING_DISPATCH;
  • HeavyTaskTracker每5s调度一次Dispatcher,其run方法先通过ptStatusHolder.getAvailableProcessorTrackers()获取可以派发任务的ProcessorTracker,之后循环从数据库拉取一批状态为TaskStatus.WAITING_DISPATCH的任务,通过轮询的方式进行dispatchTask;
  • dispatchTask先更新status为DISPATCH_SUCCESS_WORKER_UNCHECK,更新处理该任务的ProcessorTracker,之后构建TaskTrackerStartTaskReq,通过TransportUtils.ttStartPtTask派送执行task的请求;
  • ProcessorTracker的submitTask方法创建HeavyProcessorRunnable,提交到threadPool执行,之后回复ProcessorReportTaskStatusReq,告知status为TaskStatus.WORKER_RECEIVED;
  • 最后通过HeavyProcessorRunnable调用对应的processor.process方法执行具体的任务

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
kubernetes(k8s)命名空间一直Terminating
回到刚才窗口 将 terminating 状态的命名空间信息导出到 json 文件:
小陈运维
2022/04/01
1.3K0
快速搭建kubernetes与kubeSphere环境(亲测有效)
最近在学习k8s,但是安装k8s实在太麻烦了,也耗费了不少时间,下面来说说安装步骤。以下是我的机器配置,其实2核4G也可以,但是因为我想学习下kubeSphere就要求配置高了
蒋老湿
2019/11/14
6.3K0
k8s集群中namespace状态一直显示Terminating如何解决
因为k8s主节点使用了认证,如果直接使用命令会拒绝连接,需要使用kube-proxy进行代理8081端口
linus_lin
2024/09/06
1290
k8s集群中namespace状态一直显示Terminating如何解决
k8s命名空间和运行环境
k8s默认会自动生成3个命名空间 default:所有未指定Namespace的对象都会被分配在default命名空间。 kube-system:所有由Kubernetes系统创建的资源都处于这个命名空间。 kube-public:此命名空间下的资源可以被所有人访问(包括未认证用户)。
丁D
2022/08/12
1.6K0
Kubernetes Dashboard 终结者:KubeSphere
2018 年 7 月份,青云在 Cloud Insight 云计算峰会上推出了一款全新的容器平台——KubeSphere,旨在帮助企业快速低成本管理容器。并且 KubeSphere 本身是开源的,它是基于 Kubernetes 构建的分布式、多租户、企业级开源容器平台,具有强大且完善的网络与存储能力,并通过极简的人机交互提供完善的多集群管理、CI / CD 、微服务治理、应用管理等功能,帮助企业在云、虚拟化及物理机等异构基础设施上快速构建、部署及运维容器架构,实现应用的敏捷开发与全生命周期管理。
米开朗基杨
2019/09/24
1.9K0
Kubernetes Dashboard 终结者:KubeSphere
【k8s】鲲鹏(arm64)+银河麒麟V10离线部署K8S+KubeSphere+Harbor
本文只演示离线部署过程,离线制品和其他安装包可查看之前文章自己制作,也可添加作者微信:sd_zdhr获取。
编码如写诗
2025/03/14
4194
【k8s】鲲鹏(arm64)+银河麒麟V10离线部署K8S+KubeSphere+Harbor
ARM 版 OpenEuler 22.03 部署 KubeSphere v3.4.0 不完全指南
本文介绍了如何在 openEuler 22.03 LTS SP2 aarch64 架构服务器上部署 KubeSphere 和 Kubernetes 集群。我们将使用 KubeSphere 开发的 KubeKey 工具实现自动化部署,在三台服务器上实现高可用模式最小化部署 Kubernetes 集群和 KubeSphere。
运维有术
2023/10/18
8400
ARM 版 OpenEuler 22.03 部署 KubeSphere v3.4.0 不完全指南
33 张高清大图,带你玩转 KubeSphere v4.1.2 部署与扩展组件安装
备受瞩目的 KubeSphere v4.1.2 已经正式官宣发布,该版本带来了一个重大优化:增加默认的扩展组件仓库。
运维有术
2024/11/12
7950
33 张高清大图,带你玩转 KubeSphere v4.1.2 部署与扩展组件安装
KubeSphere3.0启用k8s多集群
多集群功能涉及到多个集群之间的网络连通,了解集群之前的网络拓扑有助于减少接下来的工作量。
jwangkun
2021/12/23
1.3K0
KubeSphere3.0启用k8s多集群
KubeSphere 最佳实战:59 张高清大图,带你实战入门 KubeSphere DevOps
KubeSphere 基于 Jenkins[1] 的 DevOps 系统是专为 Kubernetes 中的 CI/CD 工作流设计的,它提供了一站式的解决方案,帮助开发和运维团队用非常简单的方式构建、测试和发布应用到 Kubernetes。它还具有插件管理、Binary-to-Image (B2I)[2]、Source-to-Image (S2I)[3]、代码依赖缓存、代码质量分析、流水线日志等功能。
运维有术
2024/08/19
4320
KubeSphere 最佳实战:59 张高清大图,带你实战入门 KubeSphere DevOps
KubeSphere DevOps 初体验,内置 Jenkins 引擎
KubeSphere 是在 Kubernetes 之上构建的以应用为中心的多租户容器平台,提供全栈的 IT 自动化运维的能力,简化企业的 DevOps 工作流。KubeSphere 提供了运维友好的向导式操作界面,帮助企业快速构建一个强大和功能丰富的容器云平台。KubeSphere 支持部署在任何基础设施环境,提供在线与离线安装,支持一键升级与扩容集群,并且各功能组件支持模块化和可插拔的安装。
LinuxSuRen
2020/07/03
2.3K0
K8S:分享一次“乌龙问题”(人为导致的无法正常删除命名空间)
问题出在这里:DiscoveryFailed:Discovery failed for some groups, 1 failing: unable to retrieve the complete list of server APIs: metrics.k8s.io/v1beta1: the server is currently unable to handle the request
不背锅运维
2023/05/07
1.3K0
K8S:分享一次“乌龙问题”(人为导致的无法正常删除命名空间)
k8s基础-命名空间
k8s命名空间为对象名称提供了一个作用域,我们可以把资源放到不同的命名空间中,这样我们可以使用同名的资源名称,只要保证同一命名空间中的资源名称唯一即可
dogfei
2020/07/31
7210
k8s删除Terminating状态的命名空间
背景: 我们都知道在k8s中namespace有两种常见的状态,即Active和Terminating状态,其中后者一般会比较少见,只有当对应的命名空间下还存在运行的资源,但是该命名空间被删除时才会出现所谓的terminating状态,这种情况下只要等待k8s本身将命名空间下的资源回收后,该命名空间将会被系统自动删除。但是今天遇到命名空间下已没相关资源,但依然无法删除terminating状态的命名空间的情况,特此记录一下.
BGBiao
2019/10/23
4.2K0
ARM 版 Kylin V10 部署 KubeSphere v3.4.0 不完全指南
本文介绍了如何在 麒麟 V10 aarch64 架构服务器上部署 KubeSphere 和 Kubernetes 集群。我们将使用 KubeSphere 开发的 KubeKey 工具实现自动化部署,在三台服务器上实现高可用模式最小化部署 Kubernetes 集群和 KubeSphere。
运维有术
2023/11/09
1.4K1
ARM 版 Kylin V10 部署 KubeSphere v3.4.0 不完全指南
删除namespace为什么会Terminating?
当我们删除集群中的某个namespace之后,有时候namespace并没有按照我们的期望正常删除,而是一直卡在Terminating状态。本文主要讨论下Terminating状态发生的可能性以及解决办法。
keke.
2021/03/16
10.1K0
KubeSphere 最佳实战:59 张高清大图,带你实战入门 KubeSphere DevOps
KubeSphere 基于 Jenkins 的 DevOps 系统是专为 Kubernetes 中的 CI/CD 工作流设计的,它提供了一站式的解决方案,帮助开发和运维团队用非常简单的方式构建、测试和发布应用到 Kubernetes。它还具有插件管理、Binary-to-Image (B2I)、Source-to-Image (S2I)、代码依赖缓存、代码质量分析、流水线日志等功能。
运维有术
2024/08/14
4340
KubeSphere 最佳实战:59 张高清大图,带你实战入门 KubeSphere DevOps
在现有 Kubernetes 集群上安装 KubeSphere
KubeSphere是在 Kubernetes 之上构建的企业级分布式多租户容器管理平台,提供简单易用的操作界面以及向导式操作方式,在降低用户使用容器调度平台学习成本的同时,极大减轻开发、测试、运维的日常工作的复杂度,旨在解决 Kubernetes 本身存在的存储、网络、安全和易用性等痛点。除此之外,平台已经整合并优化了多个适用于容器场景的功能模块,以完整的解决方案帮助企业轻松应对敏捷开发与自动化运维、微服务治理、多租户管理、工作负载和集群管理、服务与网络管理、应用编排与管理、镜像仓库管理和存储管理等业务场景。
jwangkun
2021/12/23
1K0
在现有 Kubernetes 集群上安装 KubeSphere
基于 KubeSphere 玩转 k8s 第二季|KubeKey 扩容 Kubernetes Worker 节点实战
上一期,我们实战讲解了使用 KubeSphere 开发的 KubeKey 工具自动化部署 3 Master 和 1 Worker 的 Kubernetes 集群和 KubeSphere。
运维有术
2023/07/16
5710
KubeSphere离线无网络环境部署
KubeSphere 是 GitHub 上的一个开源项目,是成千上万名社区用户的聚集地。很多用户都在使用 KubeSphere 运行工作负载。对于在 Linux 上的安装,KubeSphere 既可以部署在云端,也可以部署在本地环境中,例如 AWS EC2、Azure VM 和裸机等。
小陈运维
2022/01/13
1.3K0
推荐阅读
相关推荐
kubernetes(k8s)命名空间一直Terminating
更多 >
LV.0
这个人很懒,什么都没有留下~
目录
  • MapProcessor
  • ProcessorMapTaskRequest
  • reliableMapTask
  • onReceiveProcessorMapTaskRequest
  • submitTask
  • batchSave
  • TaskDAOImpl.batchSave
  • Dispatcher
  • dispatchTask
  • onReceiveTaskTrackerStartTaskReq
  • ProcessorTracker.submitTask
  • 小结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档