首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >与Kafka集成的Flink

与Kafka集成的Flink
EN

Stack Overflow用户
提问于 2021-02-11 18:17:29
回答 1查看 29关注 0票数 0

我正在尝试将Flink与Kafka集成,并读取Kafka producer的数据。我尝试按照flink-docs-release-1.11文档中的代码运行以下代码

代码语言:javascript
运行
复制
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class Flink_Kafka_Integration {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
    
    
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(myConsumer);
    
    }
}

我得到以下错误,

代码语言:javascript
运行
复制
The method addSource(SourceFunction<OUT>) in the type StreamExecutionEnvironment is not applicable for the arguments (FlinkKafkaConsumer<String>)
The type org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase cannot be resolved. It is indirectly referenced from required .class files

我在项目构建路径中包含了一个名为flink-streaming-java_2.12-1.11.3.jar的jar文件。

任何建议都会很有帮助。

以下是我正在使用的软件的版本:

Flink - 1.11.3

Scala - 2.12

flinkKafkaConsumer-2.12

EN

回答 1

Stack Overflow用户

发布于 2021-02-11 23:02:16

您需要在项目构建中包含flink-connector-kafka_2.12-1.11.3.jar

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66152726

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档