首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Spark SQL读写 ES7.x 及问题总结

Spark SQL读写 ES7.x 及问题总结

作者头像
大数据真好玩
发布于 2021-01-26 08:09:10
发布于 2021-01-26 08:09:10
3.7K00
代码可运行
举报
文章被收录于专栏:暴走大数据暴走大数据
运行总次数:0
代码可运行

本文主要介绍 spark SQL 读写 ES,参数的配置以及问题总结。

ES官方提供了对spark的支持,可以直接通过spark读写es,具体可以参考ES Spark Support文档(https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark)

以下是pom依赖,具体版本可以根据自己的es和spark版本进行选择:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
      <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>7.3.1</version>
      </dependency>
Spark SQL to ES

主要提供了两种读写方式:

  • 一种是通过DataFrameReader/Writer传入ES Source实现
  • 另一种是直接读写DataFrame实现

在实现前,还要列一些相关的配置:

列了一些常用的配置,更多配置查看ES Spark Configuration文档(https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html)

DataFrameReader 读 ES
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.elasticsearch.spark.sql._
val options = Map(
  "es.nodes.wan.only" -> "true",
  "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
  "es.port" -> "9200",
  "es.read.field.as.array.include" -> "arr1, arr2"
)
val df = spark
    .read
    .format("es")
    .options(options)
    .load("index1/info")
df.show()
DataFrameWriter 写 ES
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.elasticsearch.spark.sql._
val options = Map(
  "es.index.auto.create" -> "true",
  "es.nodes.wan.only" -> "true",
  "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
  "es.port" -> "9200",
  "es.mapping.id" -> "id"
)

val sourceDF = spark.table("hive_table")
sourceDF
  .write
  .format("org.elasticsearch.spark.sql")
  .options(options)
  .mode(SaveMode.Append)
  .save("hive_table/docs")
读DataFrame

jar包中提供了 esDF() 方法可以直接读es数据为DataFrame,以下是源码截图。

参数说明:

  • resource:资源路径,例如index和tpye: hive_table/docs
  • cfg:一些es的配置,和上面代码中的options差不多
  • query:指定DSL查询语句来过滤要读的数据,例如"?q=user_group_id:3"表示读user_group_id为3的数据
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val options = Map(
  "pushdown" -> "true",
  "es.nodes.wan.only" -> "true",
  "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
  "es.port" -> "9200"
)

val df = spark.esDF("hive_table/docs", "?q=user_group_id:3", options)
df.show()
写 DataFrame

jar包中提供了 saveToEs() 方法可以将DataFrame写入ES,以下是源码截图。

resource:资源路径,例如index和tpye: hive_table/docs cfg:一些es的配置,和上面代码中的options差不多

示例:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val brandDF = sparkSession.sql(""" SELECT
              |   categoryname AS id
              | , concat_ws(',', collect_set(targetword)) AS targetWords
              | , get_utc_time() as `@timestamp`
              | FROM  t1
              | GROUP BY
              | categoryname
              """.stripMargin)

 // 手动指定ES _id值
 val map = Map("es.mapping.id" -> "id")
 EsSparkSQL.saveToEs(brandDF, "mkt_noresult_brand/mkt_noresult_brand", map)
Spark RDD to ES

SparkRDD方式写 ES,以下是源码截图。

示例:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")
    val rdd = sparkSession.sparkContext.makeRDD(Seq(numbers, airports))
    EsSpark.saveToEs(rdd, "mkt_noresult_brand/mkt_noresult_brand", map)
问题总结

手动指定ES _ id值

EsSparkSQL.saveToEs 报错org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [DataFrameFieldExtractor for field [[...]]] cannot extract value from entity

原因:"es.mapping.id"参数指定文档的id,这个参数必须配置成DataFrame中已有的字段,不能随意指定。配置成 val map = Map("es.mapping.id" -> "id"), 数据导入成功。

版权声明:

本文为《大数据真好玩》整理,原作者独家授权。未经原作者允许转载追究侵权责任。

编辑|冷眼丶

微信公众号|大数据真好玩

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-11-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
在 k8s 中配置域名解析
在应用开发中,我们不应把远程服务的 ip 硬编码到应用中。有些同学习惯使用域名来标定远程服务,通过修改解析,来区分开发测试和生产环境,这是一个挺好的习惯。
谢正伟
2020/06/05
13.6K0
Kubernetes之自定义hosts
在使用k8s 中,难免有一些实例需要添加hosts绑定,最土的办法就是把这个hosts写在dockfile里。但是这样稍显麻烦,其实k8s本身就可以实现这样的功能。
SY小站
2020/06/15
3.8K0
k8s服务发现之配置Pod的hosts
某些情况下,DNS 或者其他的域名解析方法可能不太适用,您需要配置 /etc/hosts 文件,在Linux下是比较容易做到的,在 Kubernetes 中,可以通过 Pod 定义中的 hostAliases 字段向 Pod 的 /etc/hosts 添加条目。
linus_lin
2024/09/06
2850
k8s服务发现之配置Pod的hosts
k8s中服务添加hosts及一键转换脚本
项目管理k8s集群用的是rancher,可是rancher没有提供给deployment批量添加hosts的图形化界面,所以还是只能按照k8s官方的方法修改yaml文件。
Ewdager
2020/07/30
1.3K0
5 种解析容器内特定域名的小技巧
本篇文章中,我们将探讨如何在容器内指定特定域名解析结果的几种方式。为了方便演示,首先我们创建一个演示用的 Deployment 配置文件。
iMike
2024/05/11
2870
5 种解析容器内特定域名的小技巧
​DNS在Kubernetes中的高阶玩法(一)
自从 Kubernetes1.11 之后,CoreDNS 作为集群内默认的域名解析服务,你是否对它还仅仅还停留在对 Kubernetes 的 Service 解析呢?事实上光 DNS 在 K8S 内就有很多有意思的操作,今天我们不妨来看看 CoreDNS 的各种高阶玩法。
我是阳明
2020/11/20
2.5K0
​DNS在Kubernetes中的高阶玩法(一)
Kubernetes 服务发现之 coreDNS
服务发现是 K8s 的一项很重要的功能。K8s 的服务发现有两种方式,一种是将 svc 的 ClusterIP 以环境变量的方式注入到 pod 中;一种就是 DNS,从 1.13 版本开始,coreDNS 就取代了 kube dns 成为了内置的 DNS 服务器。这篇文章就来简单分析一下 coreDNS。
CS实验室
2021/03/22
4.3K0
Kubernetes 服务发现之 coreDNS
第9课 Kubernetes之服务发现和域名解析过程分析
作为服务发现机制的基本功能,在集群内需要能够通过服务名对服务进行访问,这就需要一个集群范围内的DNS服务来完成从服务名到ClusterIP的解析。
辉哥
2021/11/24
2K0
第9课 Kubernetes之服务发现和域名解析过程分析
k8s 服务注册与发现(三)CoreDNS
作为一个加入 CNCF(Cloud Native Computing Foundation) 的服务 CoreDNS 的实现可以说的非常的简单。
看、未来
2022/09/27
2.3K0
k8s 服务注册与发现(三)CoreDNS
TKE之DNS解析
TKE集群中使用的DNS解析是采用coreDNS,Kubernetes 1.11 和更高版本中,CoreDNS 位于 GA 并且默认情况下与 kubeadm 一起安装
聂伟星
2020/06/27
16.2K0
CoreDNS介绍
开始之前先吐槽一下busybox中的nslookup命令。这个命令应该是实现的不是很完全,导致我在测试DNS的成功,得到了错误的信息。先来看一下
大江小浪
2018/09/19
4.4K0
CoreDNS 自定义域名失效问题
前几天我们在解决 CoreDNS 的5秒超时问题的时候,使用了 NodeLocal DNSCache 来解决这个问题,集群 DNS 的解析性能也明显大幅提升了。但是今天确遇到一个很大的坑,我们在做 DevOps 实验的时候,相关的工具都使用的是自定义的域名,这个时候要互相访问的话就需要添加自定义的域名解析,我们可以通过给 Pod 添加 hostAlias 来解决,但是在使用 Jenkins 的 Kubernetes 插件的时候却不支持这个参数,需要使用 YAML 来自定义,比较麻烦,所以想着通过 CoreDNS 来添加 A 记录解决这个问题。
我是阳明
2020/06/15
3.4K0
CoreDNS 原理浅析
域名系统(英语:Domain Name System,缩写:DNS)是互联网的一项服务。它作为将域名和IP地址相互映射的一个分布式数据库,能够使人更方便地访问互联网。DNS使用TCP和UDP端口53,对于每一级域名长度的限制是63个字符,域名总长度则不能超过253个字符。(维基百科)
我是阳明
2023/08/21
7960
CoreDNS 原理浅析
jenkins X实践系列(4) —— jenkins X 构建提速
jx是云原生CICD,devops的一个最佳实践之一,目前在快速的发展成熟中。最近调研了JX,这里为第4篇,介绍如何加入jx构建和部署。
JadePeng
2019/05/25
1.3K0
你所不了解的 coreDNS
本文的将不深入探讨 coreDNS,而是解释 DNS 如何在 Kubernetes 中工作,coreDNS 包含什么以及 Corefile 如何使用插件。
Luga Lee
2021/12/10
1.6K0
你所不了解的 coreDNS
K8S 生态周报| 2019-07-08~2019-07-14
「K8S 生态周报」内容主要包含我所接触到的 K8S 生态相关的每周值得推荐的一些信息。欢迎订阅知乎专栏「k8s生态」。本周为什么发布时间比往常迟呢?因为上周我在忙结婚呀。
Jintao Zhang
2019/07/17
4830
云原生 | 使用 CoreDNS 构建高性能、插件化的DNS服务器
在企业高可用DNS架构部署方案中我们使用的是传统老牌DNS软件Bind, 但是现在不少企业内部流行容器化部署,所以也可以将 Bind 替换为 CoreDNS ,由于 CoreDNS 是 Kubernetes 的一个重要组件,稳定性不必担心,于此同时还可将K8S集群SVC解析加入到企业内部的私有的CoreDNS中。
全栈工程师修炼指南
2023/10/31
4K0
云原生 | 使用 CoreDNS 构建高性能、插件化的DNS服务器
附011.Kubernetes-DNS及搭建
作为服务发现机制的基本功能,在集群内需要能够通过服务名对服务进行访问,因此需要一个集群范围内的DNS服务来完成从服务名到ClusterIP的解析。
木二
2019/12/10
9150
附011.Kubernetes-DNS及搭建
二进制部署k8s教程14 - 部署coredns
表示 coredns 能够访问外网,并且能够进行服务发现,能够访问其他服务。即正常工作。
janrs.com
2023/03/08
7372
Kubernetes配置镜像中Hosts文件的域名解析
最近,给公司搭建的持续集成过程中,由于每次执行任务时都是新创建一个 Kubernetes Pod 执行的,在执行过程中经常出现 DNS 解析错误问题,如下:
院长技术
2021/01/07
3.7K0
相关推荐
在 k8s 中配置域名解析
更多 >
交个朋友
加入HAI高性能应用服务器交流群
探索HAI应用新境界 共享实践心得
加入[游戏服务器] 腾讯云官方交流站
游戏服运维小技巧 常见问题齐排查
加入云原生工作实战群
云原生落地实践 技术难题攻坚探讨
换一批
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档