前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Elastic-Job系列二之调度全流程

Elastic-Job系列二之调度全流程

原创
作者头像
用户9511949
修改于 2024-07-09 07:16:48
修改于 2024-07-09 07:16:48
18600
代码可运行
举报
运行总次数:0
代码可运行

1 ElasticJobExecutor

elastic-job真正任务的执行时通过ElasticJobExecutor来执行,在新建JobScheduler实例时新建该实例,其内部构造函数如下

其中elasticJob属性为用户执行业务逻辑的实例,其他属性的作用在后续的分析中会一一提到

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private ElasticJobExecutor(final ElasticJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final JobItemExecutor jobItemExecutor) {
    this.elasticJob = elasticJob;
    this.jobFacade = jobFacade;
    this.jobItemExecutor = jobItemExecutor;
    executorContext = new ExecutorContext(jobFacade.loadJobConfiguration(true));
    itemErrorMessages = new ConcurrentHashMap<>(jobConfig.getShardingTotalCount(), 1);
}

接下来看看调度执行的入口,ElasticJobExecutor的无参execute方法,先整体看看执行的步骤

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void execute() {
    JobConfiguration jobConfig = jobFacade.loadJobConfiguration(true);
    // 1 查看是否需要重新加载任务处理线程池和任务出错处理方式
    executorContext.reloadIfNecessary(jobConfig);
    JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class);
    try {
        // 2 校验Job执行的环境条件
        jobFacade.checkJobExecutionEnvironment();
    } catch (final JobExecutionEnvironmentException cause) {
        jobErrorHandler.handleException(jobConfig.getJobName(), cause);
    }
    // 3 获取分片信息
    ShardingContexts shardingContexts = jobFacade.getShardingContexts();
    // 4 发布任务staging信息
    jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobConfig.getJobName()));
    // 5 如果当前需要执行的分片正在running,那么设置所有的分片misfire,然后直接返回
    if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobConfig.getJobName(),
                shardingContexts.getShardingItemParameters().keySet()));
        return;
    }
    try {
        // 6 执行job之前的自定义Listener的逻辑
        jobFacade.beforeJobExecuted(shardingContexts);
        // CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        // CHECKSTYLE:ON
        jobErrorHandler.handleException(jobConfig.getJobName(), cause);
    }
    // 7 执行job
    execute(jobConfig, shardingContexts, ExecutionSource.NORMAL_TRIGGER);
    // 8 如果需要执行misfire的任务,则在此处触发执行
    while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(jobConfig, shardingContexts, ExecutionSource.MISFIRE);
    }
    // 9 任务执行完成之后看是否需要failover(查看/namespace/jobName/leader/failover/items下的子节点
    // 是否为空,如果不为空,当前节点参与failover的leader选举,成功则将执行失败的分片需要执行的任务
    // 在当前节点重新执行)
    jobFacade.failoverIfNecessary();
    try {
        // 10 job执行之后的自定义Listener的逻辑
        jobFacade.afterJobExecuted(shardingContexts);
        // CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        // CHECKSTYLE:ON
        jobErrorHandler.handleException(jobConfig.getJobName(), cause);
    }
}

执行步骤还是比较清晰的,下面依次看看各个步骤具体干了什么

第一步:executorContext.reloadIfNecessary(jobConfig),查看是否需要重新加载context中可以需要重新加载的项

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void reloadIfNecessary(final JobConfiguration jobConfiguration) {
    reloadableItems.values().forEach(each -> each.reloadIfNecessary(jobConfiguration));
}

reloadableItems中的类需要实现Reloadable接口,elastic-job利用SPI机制加载实现了Reloadable接口的类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public ExecutorContext(final JobConfiguration jobConfig) {
    ServiceLoader.load(Reloadable.class).forEach(each -> {
        ElasticJobServiceLoader.newTypedServiceInstance(Reloadable.class, each.getType(), new Properties())
                .ifPresent(reloadable -> reloadableItems.put(reloadable.getType(), reloadable));
    });
    initReloadable(jobConfig);
}

有两个类实现了Reloadable接口

ExecutorServiceReloadable:获取任务执行的线程池,自带两种线程数设置方式,单线程和根据CPU的核数设置,默认是根据CPU核数设置,所以其reloadIfNecessary方法主要就是根据config中配置的线程池获取方式加载对应的线程池

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public synchronized void reloadIfNecessary(final JobConfiguration jobConfig) {
    String newJobExecutorServiceHandlerType = Strings.isNullOrEmpty(jobConfig.getJobExecutorServiceHandlerType())
            ? JobExecutorServiceHandlerFactory.DEFAULT_HANDLER
            : jobConfig.getJobExecutorServiceHandlerType();
    if (newJobExecutorServiceHandlerType.equals(jobExecutorServiceHandlerType)) {
        return;
    }
    log.debug("JobExecutorServiceHandler reload occurred in the job '{}'. Change from '{}' to '{}'.", jobConfig.getJobName(), jobExecutorServiceHandlerType, newJobExecutorServiceHandlerType);
    reload(newJobExecutorServiceHandlerType, jobConfig.getJobName());
}

JobErrorHandlerReloadable:获取任务出错处理类,elastic-job支持的错误处理方式有6中,钉钉通知、邮箱通知、忽略、记录日志,微信通知、直接排除异常,reloadIfNecessary方法如下

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public synchronized void reloadIfNecessary(final JobConfiguration jobConfig) {
    String newJobErrorHandlerType = Strings.isNullOrEmpty(jobConfig.getJobErrorHandlerType()) ? JobErrorHandlerFactory.DEFAULT_HANDLER : jobConfig.getJobErrorHandlerType();
    if (newJobErrorHandlerType.equals(jobErrorHandlerType) && props.equals(jobConfig.getProps())) {
        return;
    }
    log.debug("JobErrorHandler reload occurred in the job '{}'. Change from '{}' to '{}'.", jobConfig.getJobName(), jobErrorHandlerType, newJobErrorHandlerType);
    reload(newJobErrorHandlerType, jobConfig.getProps());
}

第二步:jobFacade.checkJobExecutionEnvironment(),主要就是判断下当前server和注册中心服务器的时间差有没有超过限制

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException {
    configService.checkMaxTimeDiffSecondsTolerable();
}
public void checkMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
        int maxTimeDiffSeconds = load(true).getMaxTimeDiffSeconds();
        if (0 > maxTimeDiffSeconds) {
            return;
        }
        long timeDiff = Math.abs(timeService.getCurrentMillis() - jobNodeStorage.getRegistryCenterTime());
        if (timeDiff > maxTimeDiffSeconds * 1000L) {
            throw new JobExecutionEnvironmentException(
                    "Time different between job server and register center exceed '%s' seconds, max time different is '%s' seconds.", timeDiff / 1000, maxTimeDiffSeconds);
        }
    }

第三步:jobFacade.getShardingContexts(),获取分片信息

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public ShardingContexts getShardingContexts() {
    boolean isFailover = configService.load(true).isFailover();
    if (isFailover) {
        List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
        // 如果开启了失效转移且转移到当前节点的item不为空,那么直接调用
        // executionContextService.getJobShardingContext方法获取当前节点需要处理的item
        if (!failoverShardingItems.isEmpty()) {
            return executionContextService.getJobShardingContext(failoverShardingItems);
        }
    }
    // 如果没有开启失效转移或者转移到当前节点的item为空,在执行之前先reshard(如果需要)
    shardingService.shardingIfNecessary();
    // 从Zookeeper中获取当前节点需要执行的item信息
    List<Integer> shardingItems = shardingService.getLocalShardingItems();
    if (isFailover) {
        // 如果开启了失效转移,那么需要移除已经转移到其他节点的item
        // 比如当前实例本来需要执行0和1,但是0已经转移到了其他的节点,则需要把0删除掉
        shardingItems.removeAll(failoverService.getLocalTakeOffItems());
    }
    shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
    return executionContextService.getJobShardingContext(shardingItems);
}

重点看看reshard的流程

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void shardingIfNecessary() {
    // 获取可用的instance
    List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
    // 如果不需要sharding(判断/namespace/jobName/leader/sharding/necessary节点是否存在)
    // 或者无可用实例,直接返回
    if (!isNeedSharding() || availableJobInstances.isEmpty()) {
        return;
    }
    // 判断当前节点是否是leader,只有leader才能reshard
    // 这里如果有leader,直接判断即可,如果没有,则当前节点需要参与选举,选举完成之后再判断
    if (!leaderService.isLeaderUntilBlock()) {
        // 如果不是leader节点则需要等到sharding完成之后直接返回
        blockUntilShardingCompleted();
        return;
    }
    // sharding之前等待当前正在运行的item结束
    waitingOtherShardingItemCompleted();
    JobConfiguration jobConfig = configService.load(false);
    int shardingTotalCount = jobConfig.getShardingTotalCount();
    log.debug("Job '{}' sharding begin.", jobName);
    // 标记正在sahrding
    jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
    // 重置shardingInfo,设置好最新的分片信息,但是不设置分片的处理实例
    resetShardingInfo(shardingTotalCount);
    // 获取job的sharding策略,elastic默认支持三种策略,平均分片、奇偶分片、轮询分片,默认平均分片
    JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType());
    // 设置分片实例
    jobNodeStorage.executeInTransaction(getShardingResultTransactionOperations(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
    log.debug("Job '{}' sharding complete.", jobName);
}

第四步:jobFacade.postJobStatusTraceEvent,通过jobTracingEventBus发布了一条信息

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void postJobStatusTraceEvent(final String taskId, final State state, final String message) {
    TaskContext taskContext = TaskContext.from(taskId);
    jobTracingEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(),
            taskContext.getSlaveId(), Source.LITE_EXECUTOR, taskContext.getType().name(), taskContext.getMetaInfo().getShardingItems().toString(), state, message));
    if (!Strings.isNullOrEmpty(message)) {
        log.trace(message);
    }
}

第五步:jobFacade.misfireIfRunning,这个方法比较简单,上文代码注释中仪解释其作用,此处省略

第六步:jobFacade.beforeJobExecuted,这个也没有什么说的,就是获取自定义Listener列表,依次执行即可

第七步:execute执行job,包括第八步misifre的执行,每次执行完成之后,如果之前存在未执行的任务且当前不需要重新shard且开启了misfire的执行,则需要重新执行

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private void execute(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
    // item为空,直接返回
    if (shardingContexts.getShardingItemParameters().isEmpty()) {
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobConfig.getJobName()));
        return;
    }
    // 如果开启了monitorExecution,为item设置running节点(/namespace/jobName/sharding/{item}/running)
    // 节点的类型根据是否开启failover有关
    // 如果开启了failover,则创建的是持久化节点(当实例宕机时,running节点不能丢,因为有Listener要监听
    // 实例宕机事件,从而设置failOver的item)
    // 如果failover没开,则创建的是临时节点,实例宕机,running信息自动删除
    jobFacade.registerJobBegin(shardingContexts);
    String taskId = shardingContexts.getTaskId();
    // 发布task running信息
    jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
    try {
        // 执行任务,具体逻辑看下面的代码
        process(jobConfig, shardingContexts, executionSource);
    } finally {
        // 执行完成之后删除running信息和failover信息(如果有)
        jobFacade.registerJobCompleted(shardingContexts);
        // 根据是否有错误信息,发布不同的信息
        if (itemErrorMessages.isEmpty()) {
            jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
        } else {
            jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
            itemErrorMessages.clear();
        }
    }
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 如果是一个item,直接执行即可,如果是多个item,则需要将所有任务执行完成之后再返回
private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
    Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
    if (1 == items.size()) {
        int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
        JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, item);
        // 这里的process方法就是调用JobItemExecutor的process方法处理
        // 比如如果是SimpleJob,则调用用户定义的Job实例的Executor方法
        // 如果是其他类型的Job,都有对应的JobItemExecutor处理
        process(jobConfig, shardingContexts, item, jobExecutionEvent);
        return;
    }
    CountDownLatch latch = new CountDownLatch(items.size());
    for (int each : items) {
        JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each);
        ExecutorService executorService = executorContext.get(ExecutorService.class);
        if (executorService.isShutdown()) {
            return;
        }
        executorService.submit(() -> {
            try {
                process(jobConfig, shardingContexts, each, jobExecutionEvent);
            } finally {
                latch.countDown();
            }
        });
    }
    try {
        latch.await();
    } catch (final InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}

第九步:jobFacade.failoverIfNecessary(),任务执行完成之后看是否需要failover,

查看/namespace/jobName/leader/failover/items下的子节点是否为空,如果不为空,当前节点参与failover的leader选举,成功则将执行失败的分片需要执行的任务在当前节点重新执行,调用failoverService.failoverIfNecessary方法

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void failoverIfNecessary() {
    if (needFailover()) {
        jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
    }
}

看下选举成功之后的回调函数FailoverLeaderExecutionCallback

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
    
    @Override
    public void execute() {
        if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
            return;
        }
        // 获取需要failover的子节点,/namespace/jobName/leader/failover/items下的子节点
        // 这里为什么只取一个任务?因为肯定会有多个分片失效的场景,只取一个任务防止当前实例压力过大?
        int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
        log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
        // 设置crashedItem 的failover信息(即crashedItem分片的任务转移到当前实例上来)
        jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
        jobNodeStorage.fillJobNode(FailoverNode.getExecutingFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
        jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
        // TODO Instead of using triggerJob, use executor for unified scheduling
        JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
        if (null != jobScheduleController) {
            // 触发任务
            jobScheduleController.triggerJob();
        }
    }
}

第十步:jobFacade.afterJobExecuted(shardingContexts),执行任务执行完成之后的自定义Listener逻辑

至此elastic-job的整个执行流程结束

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
通俗理解LDA主题模型
0 前言 印象中,最开始听说“LDA”这个名词,是缘于rickjin在2013年3月写的一个LDA科普系列,叫LDA数学八卦,我当时一直想看来着,记得还打印过一次,但不知是因为这篇文档的前序铺垫太长(现在才意识到这些“铺垫”都是深刻理解LDA 的基础,但如果没有人帮助初学者提纲挈领、把握主次、理清思路,则很容易陷入LDA的细枝末节之中),还是因为其中的数学推导细节太多,导致一直没有完整看完过。 理解LDA,可以分为下述5个步骤: 一个函数:gamma函数 四个分布:二项分布、多项分布、beta分布、Dir
机器学习AI算法工程
2018/03/13
20.7K0
通俗理解LDA主题模型
基于LDA的文本主题聚类Python实现
LDA(Latent Dirichlet Allocation)是一种文档主题生成模型,也称为一个三层贝叶斯概率模型,包含词、主题和文档三层结构。所谓生成模型,就是说,我们认为一篇文章的每个词都是通过“以一定概率选择了某个主题,并从这个主题中以一定概率选择某个词语”这样一个过程得到。文档到主题服从多项式分布,主题到词服从多项式分布。
里克贝斯
2021/05/21
3.9K0
基于LDA的文本主题聚类Python实现
我是这样一步步理解--主题模型(Topic Model)、LDA(案例代码)
关于LDA有两种含义,一种是线性判别分析(Linear Discriminant Analysis),一种是概率主题模型:隐含狄利克雷分布(Latent Dirichlet Allocation,简称LDA),本文讲后者。
mantch
2019/07/30
3.5K0
我是这样一步步理解--主题模型(Topic Model)、LDA(案例代码)
技术干货 | 一文详解LDA主题模型
作者简介 夏琦,达观数据NLP组实习生,就读于东南大学和 Monash University,自然语言处理方向二年级研究生,师从知识图谱专家漆桂林教授。曾获第五届“蓝桥杯”江苏省一等奖、国家二等奖。 本篇博文将详细讲解LDA主题模型,从最底层数学推导的角度来详细讲解,只想了解LDA的读者,可以只看第一小节简介即可。PLSA和LDA非常相似,PLSA也是主题模型方面非常重要的一个模型,本篇也会有的放矢的讲解此模型。如果读者阅读起来比较吃力,可以定义一个菲波那切数列,第 f(n) = f(n-1) + f
达观数据
2018/03/30
3.4K0
技术干货 | 一文详解LDA主题模型
文本相似度算法小结
首先是最简单粗暴的算法。为了对比两个东西的相似度,我们很容易就想到可以看他们之间有多少相似的内容,又有多少不同的内容,再进一步可以想到集合的交并集概念。
Marky Lumin
2018/02/06
5.4K0
文本相似度算法小结
LDA主题模型 | 原理详解与代码实战
很久之前的LDA笔记整理,包括算法原理介绍以及简单demo实践,主要参考自July老师的<通俗理解LDA主题模型>。
NewBeeNLP
2020/08/26
9.1K0
LDA主题模型 | 原理详解与代码实战
技术干货:一文详解LDA主题模型
本文介绍了自然语言处理中的文本分类任务,以及常用的文本分类算法。包括朴素贝叶斯分类器、支持向量机、逻辑回归和神经网络等。还介绍了这些算法的具体实现步骤和优缺点,以及适用场景。
企鹅号小编
2017/12/27
1.5K0
技术干货:一文详解LDA主题模型
LDA详解:自然语言处理
      LDA,其实有两种含义,一种是统计学中的分析方法:线性判别分析(Linear Discriminant Analysis),一种概率主题模型:隐含狄利克雷分布(Latent Dirichlet Allocation,简称LDA),本文阐述后者。       LDA(Latent Dirichlet Allocation)是一种文档主题生成模型,也称为一个三层贝叶斯概率模型,包含词、主题和文档三层结构。一个模型:LDA(文档-主题,主题-词语)       所谓生成模型,就是说,我们认为一篇
学到老
2018/03/16
1.6K0
NLP 点滴 :文本相似度 (下)
本文介绍了自然语言处理中的文本相似度计算方法和模型,包括余弦相似度、Jaccard相似度、编辑距离、基于词向量的方法、概率语言模型等。这些方法在文本分类、聚类、机器翻译等任务中都有广泛应用。
肖力涛
2017/08/24
3.4K1
NLP 点滴 :文本相似度 (下)
Kaggle知识点:文本相似度计算方法
文本相似度是指衡量两个文本的相似程度,相似程度的评价有很多角度:单纯的字面相似度(例如:我和他 v.s. 我和她),语义的相似度(例如:爸爸 v.s. 父亲)和风格的相似度(例如:我喜欢你 v.s. 我好喜欢你耶)等等。
Coggle数据科学
2021/02/23
2.9K0
Kaggle知识点:文本相似度计算方法
15分钟入门NLP神器—Gensim
作为自然语言处理爱好者,大家都应该听说过或使用过大名鼎鼎的Gensim吧,这是一款具备多种功能的神器。 Gensim是一款开源的第三方Python工具包,用于从原始的非结构化的文本中,无监督地学习到文本隐层的主题向量表达。 它支持包括TF-IDF,LSA,LDA,和word2vec在内的多种主题模型算法, 支持流式训练,并提供了诸如相似度计算,信息检索等一些常用任务的API接口
机器学习算法工程师
2018/07/27
1.9K0
python文本相似度计算
两篇中文文本,如何计算相似度?相似度是数学上的概念,自然语言肯定无法完成,所有要把文本转化为向量。两个向量计算相似度就很简单了,欧式距离、余弦相似度等等各种方法,只需要中学水平的数学知识。
周小董
2019/03/25
5.1K0
python文本相似度计算
技术干货 | 如何做好文本关键词提取?从三种算法说起
在自然语言处理领域,处理海量的文本文件最关键的是要把用户最关心的问题提取出来。而无论是对于长文本还是短文本,往往可以通过几个关键词窥探整个文本的主题思想。与此同时,不管是基于文本的推荐还是基于文本的搜索,对于文本关键词的依赖也很大,关键词提取的准确程度直接关系到推荐系统或者搜索系统的最终效果。因此,关键词提取在文本挖掘领域是一个很重要的部分。 关于文本的关键词提取方法分为有监督、半监督和无监督三种: 1 有监督的关键词抽取算法 它是建关键词抽取算法看作是二分类问题,判断文档中的词或者短语是或者不是关键词
达观数据
2018/04/02
5.6K0
技术干货 | 如何做好文本关键词提取?从三种算法说起
Word2Vec,LDA 知识普及
Word2vec,Word2vec,是为一群用来产生词向量的相关模型。这些模型为浅而双层的神经网络,用来训练以重新建构语言学之词文本。网络以词表现,并且需猜测相邻位置的输入词,在word2vec中词袋模型假设下,词的顺序是不重要的。训练完成之后,word2vec模型可用来映射每个词到一个向量,可用来表示词对词之间的关系,该向量为神经网络之隐藏层。
热心的社会主义接班人
2018/10/22
6750
自然语言处理技术(NLP)在推荐系统中的应用
个性化推荐是大数据时代不可或缺的技术,在电商、信息分发、计算广告、互联网金融等领域都起着重要的作用。具体来讲,个性化推荐在流量高效利用、信息高效分发、提升用户体验、长尾物品挖掘等方面均起着核心作用。在推荐系统中经常需要处理各种文本类数据,例如商品描述、新闻资讯、用户留言等等。具体来讲,我们需要使用文本数据完成以下任务: 候选商品召回。候选商品召回是推荐流程的第一步,用来生成待推荐的物品集合。这部分的核心操作是根据各种不同的推荐算法来获取到对应的物品集合。而文本类数据就是很重要的一类召回算法,具有不依赖用户
CSDN技术头条
2018/02/13
3.7K0
自然语言处理技术(NLP)在推荐系统中的应用
Notes | 文本大数据信息提取方法
本文为刊载于《经济学(季刊)》2019 年第 4 期上《文本大数据分析在经济学和金融学中的应用:一个文献综述》[1]的阅读笔记。原论文详细综述了文本大数据信息提取方法、文本分析方法在经济学和金融学中的应用,是了解文本分析方法在经济学研究中应用的好材料。本篇笔记聚焦论文的第二部分,即文本大数据信息提取方法,旨在为文本分析方法的学习和日后研究运用提供基本认识。
PyStaData
2020/07/21
2.8K0
Notes | 文本大数据信息提取方法
【AI in 美团】深度学习在文本领域的应用
AI(人工智能)技术已经广泛应用于美团的众多业务,从美团App到大众点评App,从外卖到打车出行,从旅游到婚庆亲子,美团数百名最优秀的算法工程师正致力于将AI技术应用于搜索、推荐、广告、风控、智能调度、语音识别、机器人、无人配送等多个领域,帮助美团3.2亿消费者和400多万商户改善服务和体验,帮大家吃得更好,生活更好。
美团技术团队
2018/08/01
7360
【AI in 美团】深度学习在文本领域的应用
机器学习概念总结笔记(四)
作者:许敏 系列推荐 机器学习概念总结笔记(一) 机器学习概念总结笔记(二) 机器学习概念总结笔记(三) 21)KMeans 聚类分析是一种静态数据分析方法,常被用于机器学习,模式识别,数据挖掘等领域
serena
2017/10/09
2.2K0
机器学习概念总结笔记(四)
教程 | 一文读懂如何用LSA、PSLA、LDA和lda2vec进行主题建模
在自然语言理解任务中,我们可以通过一系列的层次来提取含义——从单词、句子、段落,再到文档。在文档层面,理解文本最有效的方式之一就是分析其主题。在文档集合中学习、识别和提取这些主题的过程被称为主题建模。
机器之心
2018/07/30
1.7K0
教程 | 一文读懂如何用LSA、PSLA、LDA和lda2vec进行主题建模
神策杯 2018高校算法大师赛(个人、top2、top6)方案总结
神策数据推荐系统是基于神策分析平台的智能推荐系统。它针对客户需求和业务特点,并基于神策分析采集的用户行为数据使用机器学习算法来进行咨询、视频、商品等进行个性化推荐,为客户提供不同场景下的智能应用,如优化产品体验,提升点击率等核心的业务指标。 神策推荐系统是一个完整的学习闭环。采集的基础数据,通过机器学习的算法模型形成应用。效果实时验证,从而指导添加数据源,算法优化反馈形成一个全流程、实时、自动、可快速迭代的推荐闭环。
致Great
2021/01/13
1.4K0
相关推荐
通俗理解LDA主题模型
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档