当我尝试使用卡夫卡集成进行结构化流时,我遇到了一个NoSuchMethodError。当有第一个可用记录时,堆栈跟踪如下所示:由:java.lang.NoSuchMethodError引起:
org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;)V
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.createConsumer(CachedKafkaConsumer.scala:56)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.<init>(CachedKafkaConsumer.scala:45)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer$.getOrCreate(CachedKafkaConsumer.scala:349)
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.<init>(KafkaSourceRDD.scala:137)
at org.apache.spark.sql.kafka010.KafkaSourceRDD.compute(KafkaSourceRDD.scala:136)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
...
我的sbt具有以下依赖关系:
libraryDependencies ++= Seq(
scalaTest % Test,
"org.apache.spark" %% "spark-core" % "2.1.0" % "provided",
"org.apache.spark" %% "spark-sql" % "2.1.0" % "provided",
"org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.0" % "provided",
"org.apache.hadoop" % "hadoop-client" % "2.6.0" % "provided"
)
Scala版本为2.11.8。
我能够运行火花应用程序的数据。只是卡夫卡的整合给了我这个错误。结构化流代码只是文档这里中的示例
val ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "data01:9092,data02:9092,data03:9092")
.option("subscribe", "cluster-topic-01")
.load()
val ds2 = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
val query = ds2.writeStream
.format("console")
.start()
query.awaitTermination()
有什么建议让我看看这个问题吗?提亚
发布于 2017-03-26 15:54:46
我的猜测是,在运行时类路径中的某个位置有一个游离的Kafka客户机正在加载,而不是一个spark-sql-kafka-0-10
依赖的。
发布于 2018-06-27 05:31:18
在现代CDH分布(例如Cloudera 5.12)中,有一种更简单的方法。
spark_kafka_version
配置设置为'0.9‘或'None’。如果上面的方法不起作用(Cloudera中没有'0.10‘值或没有这样的配置),您可以手动编辑配置:
/etc/spark2/conf.cloudera.spark2_on_yarn/spark-env.sh
。SPARK_KAFKA_VERSION
和SPARK_DIST_CLASSPATH
):SPARK_KAFKA_VERSION=${SPARK_KAFKA_VERSION:-'0.10'}
SPARK_DIST_CLASSPATH="$SPARK_HOME/kafka-0.10/*"
https://stackoverflow.com/questions/43035542
复制