首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Spark SQL读写 ES7.x 及问题总结

Spark SQL读写 ES7.x 及问题总结

作者头像
大数据真好玩
发布于 2021-01-26 08:09:10
发布于 2021-01-26 08:09:10
3.7K00
代码可运行
举报
文章被收录于专栏:暴走大数据暴走大数据
运行总次数:0
代码可运行

本文主要介绍 spark SQL 读写 ES,参数的配置以及问题总结。

ES官方提供了对spark的支持,可以直接通过spark读写es,具体可以参考ES Spark Support文档(https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark)

以下是pom依赖,具体版本可以根据自己的es和spark版本进行选择:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
      <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>7.3.1</version>
      </dependency>
Spark SQL to ES

主要提供了两种读写方式:

  • 一种是通过DataFrameReader/Writer传入ES Source实现
  • 另一种是直接读写DataFrame实现

在实现前,还要列一些相关的配置:

列了一些常用的配置,更多配置查看ES Spark Configuration文档(https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html)

DataFrameReader 读 ES
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.elasticsearch.spark.sql._
val options = Map(
  "es.nodes.wan.only" -> "true",
  "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
  "es.port" -> "9200",
  "es.read.field.as.array.include" -> "arr1, arr2"
)
val df = spark
    .read
    .format("es")
    .options(options)
    .load("index1/info")
df.show()
DataFrameWriter 写 ES
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.elasticsearch.spark.sql._
val options = Map(
  "es.index.auto.create" -> "true",
  "es.nodes.wan.only" -> "true",
  "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
  "es.port" -> "9200",
  "es.mapping.id" -> "id"
)

val sourceDF = spark.table("hive_table")
sourceDF
  .write
  .format("org.elasticsearch.spark.sql")
  .options(options)
  .mode(SaveMode.Append)
  .save("hive_table/docs")
读DataFrame

jar包中提供了 esDF() 方法可以直接读es数据为DataFrame,以下是源码截图。

参数说明:

  • resource:资源路径,例如index和tpye: hive_table/docs
  • cfg:一些es的配置,和上面代码中的options差不多
  • query:指定DSL查询语句来过滤要读的数据,例如"?q=user_group_id:3"表示读user_group_id为3的数据
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val options = Map(
  "pushdown" -> "true",
  "es.nodes.wan.only" -> "true",
  "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
  "es.port" -> "9200"
)

val df = spark.esDF("hive_table/docs", "?q=user_group_id:3", options)
df.show()
写 DataFrame

jar包中提供了 saveToEs() 方法可以将DataFrame写入ES,以下是源码截图。

resource:资源路径,例如index和tpye: hive_table/docs cfg:一些es的配置,和上面代码中的options差不多

示例:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val brandDF = sparkSession.sql(""" SELECT
              |   categoryname AS id
              | , concat_ws(',', collect_set(targetword)) AS targetWords
              | , get_utc_time() as `@timestamp`
              | FROM  t1
              | GROUP BY
              | categoryname
              """.stripMargin)

 // 手动指定ES _id值
 val map = Map("es.mapping.id" -> "id")
 EsSparkSQL.saveToEs(brandDF, "mkt_noresult_brand/mkt_noresult_brand", map)
Spark RDD to ES

SparkRDD方式写 ES,以下是源码截图。

示例:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")
    val rdd = sparkSession.sparkContext.makeRDD(Seq(numbers, airports))
    EsSpark.saveToEs(rdd, "mkt_noresult_brand/mkt_noresult_brand", map)
问题总结

手动指定ES _ id值

EsSparkSQL.saveToEs 报错org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [DataFrameFieldExtractor for field [[...]]] cannot extract value from entity

原因:"es.mapping.id"参数指定文档的id,这个参数必须配置成DataFrame中已有的字段,不能随意指定。配置成 val map = Map("es.mapping.id" -> "id"), 数据导入成功。

版权声明:

本文为《大数据真好玩》整理,原作者独家授权。未经原作者允许转载追究侵权责任。

编辑|冷眼丶

微信公众号|大数据真好玩

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-11-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Hadoop/Spark读写ES之性能调优
腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇
ethanzhang
2020/04/09
5.7K0
Hadoop/Spark读写ES之性能调优
腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇
腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇
ethanzhang
2018/12/10
5.5K1
腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇
spark对elasticsearch增删查改
新建一个 dataframe ,插入到索引 _index/_type ,直接调用 saveToEs ,让 _id 为自己设定的 id:
机器学习和大数据挖掘
2019/07/02
2.6K0
spark Could not write all entries
使用 spark 将 dataFrame 储存到 elasticsearch 出现如下报错:
机器学习和大数据挖掘
2019/07/02
1.2K0
用户画像 | 标签数据存储之Elasticsearch真实应用
上一篇文章已经为大家介绍了 HBase 在用户画像的标签数据存储中的具体应用场景,本篇我们来谈谈 Elasticsearch 的使用!
大数据梦想家
2021/10/22
4.3K0
Spark读写ES最佳实践
更换代码中公网ip为内网ip,选择maven assembly plugin进行打包,上传带依赖的jar包到EMR上,运行"ReadES"
沈小翊
2023/11/14
9050
ES-Hadoop 实践
在大数据背景下,适用于不同场景下的框架、系统层出不穷,在批量数据计算上hadoop鲜有敌手,而在实时搜索领域es则是独孤求败,那如何能让数据同时结合两者优势呢?本文介绍的es-hadoop将做到这点。关于es-hadoop的使用在ethanbzhang之前的两篇文章《腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇》和《腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇》中已经进行了一些介绍,本文一方面是对其内容的一些补充,另一方面也是对个人实践过程的一个总结。
franyang
2019/12/03
3.5K0
ES-Hadoop 实践
Spark写入ES报错403|Forbidden问题处理
本文描述问题及解决方法同样适用于 腾讯云 Elasticsearch Service(ES)。
岳涛
2025/05/30
1230
Spark写入ES报错403|Forbidden问题处理
【ES三周年】ElasticSearch 简要技术总结与Spark结合使用实践
ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。它是一个实时的分布式搜索和分析引擎。它可以帮助你用几秒钟内搜索百万级别的数据。
大鹅
2023/02/14
2.1K0
腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇
腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇
ethanzhang
2018/12/29
8.8K0
腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇
Spark代码调优(一)
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
用户3003813
2018/09/06
1.9K0
Spark代码调优(一)
hive读写ES集群及Role权限控制
1.下载elasticsearch-hadoop-hive-xxx.jar包,版本要与ES集群对应
沈小翊
2023/11/15
3840
elasticsearch-spark的用法
Hadoop允许Elasticsearch在Spark中以两种方式使用:通过自2.1以来的原生RDD支持,或者通过自2.0以来的Map/Reduce桥接器。从5.0版本开始,elasticsearch-hadoop就支持Spark 2.0。目前spark支持的数据源有:
Zephery
2022/05/23
9020
elasticsearch-spark的用法
Spark系列 - (3) Spark SQL
Hive:Hadoop刚开始出来的时候,使用的是hadoop自带的分布式计算系统 MapReduce,但是MapReduce的使用难度较大,所以就开发了Hive。Hive的出现解决了MapReduce的使用难度较大的问题,Hive的运行原理是将HQL语句经过语法解析、逻辑计划、物理计划转化成MapReduce程序执行。
码老思
2023/10/19
9720
Spark系列 - (3) Spark SQL
ES-Spark连接ES后,ES Client节点流量打满分析
问题描述 前段时间用es-spark读取es数遇到了client节点流量打满的现象。es-spark配置的es.nodes是es的域名。由于其中一个client是master节点,然后普通查询变得特别慢,运行20多分钟后,主节点崩溃。 解决方法 临时解决方案:降低es-spark的并发,并重启主节点。 最终解决方案:设置es.nodes.wan.only为false,即不用域名访问。将es.nodes配置为client节点的IP。 原因分析 域名访问时必须配置参数es.nodes.wan.only为true
YG
2018/05/23
3.3K3
【数据科学】数据科学中的 Spark 入门
本文由 伯乐在线 - zhique 翻译,xxmen 校稿。未经许可,禁止转载! 英文出处:Ram Sriharsha。欢迎加入翻译组。 Apache Spark 为数据科学提供了许多有价值的工具。随着 Apache Spark 1.3.1 技术预览版的发布,强大的 Data Frame API 也可以在 HDP 上使用数据科学家使用数据挖掘和可视化来帮助构造问题架构并对学习进行微调。Apache Zeppelin 正好能够帮他们做到这些。 Zeppelin 是一个基于 Web 的 notebook 服务器
陆勤_数据人网
2018/02/26
1.6K0
【数据科学】数据科学中的 Spark 入门
Spark SQL | 目前Spark社区最活跃的组件之一
Spark SQL是一个用来处理结构化数据的Spark组件,前身是shark,但是shark过多的依赖于hive如采用hive的语法解析器、查询优化器等,制约了Spark各个组件之间的相互集成,因此Spark SQL应运而生。
大数据学习与分享
2020/08/10
2.6K0
Spark SQL | 目前Spark社区最活跃的组件之一
spark加载数据到ES
在日常开发中一定会遇到,spark将计算好的数据load到es中,供后端同学查询使用。下面介绍一下spark写es的方式。 使用scala进行演示,对应的java自己google了。
若与
2021/03/02
1.1K0
spark加载数据到ES
Elasticsearch与Hive的数据互导
首先先下载一个叫"elasticsearch-hadoop-hive"的JAR包,放到相应路径下:https://jar-download.com/artifacts/org.elasticsearch/elasticsearch-hadoop-hive
dandelion1990
2019/06/27
6.6K2
Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN
本文介绍了基于Spark的SQL编程的常用概念和技术。首先介绍了Spark的基本概念和架构,然后详细讲解了Spark的数据类型和SQL函数,最后列举了一些Spark在实际应用中的例子。
片刻
2018/01/05
27.1K0
相关推荐
Hadoop/Spark读写ES之性能调优
更多 >
交个朋友
加入[数据] 腾讯云技术交流站
获取数据实战干货 共享技术经验心得
加入数据技术工作实战群
获取实战干货 交流技术经验
加入[数据库] 腾讯云官方技术交流站
数据库问题秒解答 分享实践经验
换一批
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档