I am trying to integrate Flink with Kafka and read data from a Kafka producer. I tried to run the following code, which follows the flink-docs-release-1.11 documentation:
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class Flink_Kafka_Integration {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka connection settings
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        // Consume the topic as plain strings
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(myConsumer);
    }
}
I get the following errors:
The method addSource(SourceFunction<OUT>) in the type StreamExecutionEnvironment is not applicable for the arguments (FlinkKafkaConsumer<String>)
The type org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase cannot be resolved. It is indirectly referenced from required .class files
I have included a jar file named flink-streaming-java_2.12-1.11.3.jar in the project build path.
Any suggestions would be helpful.
These are the versions of the software I am using:
Flink - 1.11.3
Scala - 2.12
flinkKafkaConsumer-2.12
Posted on 2021-02-11 23:02:16
You need to include flink-connector-kafka_2.12-1.11.3.jar in your project build.
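After changing the build path, a quick way to confirm that the classes named in the error messages are now visible is a small check like the one below. This is only a sketch: ClasspathCheck and isOnClasspath are hypothetical names, not part of Flink, and the check looks at the runtime classpath, which is assumed to match the project build path when the program is launched from the IDE.

```java
// Hypothetical helper (not part of Flink): reports whether the classes named
// in the error messages can be loaded from the current classpath.
final class ClasspathCheck {

    // Returns true if the named class can be loaded, without initializing it.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError, e.g. a missing superclass.
            return false;
        }
    }

    public static void main(String[] args) {
        String[] required = {
            "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment",
            "org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer",
            "org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase"
        };
        for (String name : required) {
            System.out.println((isOnClasspath(name) ? "found    " : "MISSING  ") + name);
        }
    }
}
```

If any line is reported as MISSING, the jar that provides that class is still absent from the build path.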
https://stackoverflow.com/questions/66152726