Apache Flink 可以轻松地从 Kafka 读取 JSON 数据。为了实现这一点,您需要执行以下步骤:
对于 Maven 项目,请在您的 pom.xml
文件中添加以下依赖项:
<dependencies>
<!-- Flink Kafka Connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.14.0</version>
</dependency>
<!-- Flink JSON Deserialization Schema -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.14.0</version>
</dependency>
</dependencies>
对于 Gradle 项目,请在您的 build.gradle
文件中添加以下依赖项:
dependencies {
implementation 'org.apache.flink:flink-connector-kafka_2.12:1.14.0'
implementation 'org.apache.flink:flink-json:1.14.0'
}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test-group");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("your_topic", new SimpleStringSchema(), properties);
kafkaConsumer.setStartFromLatest(); // 跳过已经消费过的记录
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(kafkaConsumer).print(); // 打印接收到的数据
// 使用 JSONDeserializer 序列化和反序列化 JSON
import org.apache.flink.formats.json.JsonDeserializer;
// 定义一个 POJO 类来表示 JSON 数据
public class MyJsonData {
public String field1;
public int field2;
}
// 创建一个 POJO 类型的 DataStream
DataStream<MyJsonData> jsonDataStream = env.addSource(kafkaConsumer)
.map(value -> {
JsonDeserializer<MyJsonData> deserializer = new JsonDeserializer<>(MyJsonData.class);
return deserializer.deserialize(value);
});
// 使用 POJO 类型的 DataStream 进行后续处理
jsonDataStream.print();
这样,您就可以从 Kafka 中读取 JSON 数据并将其反序列化为 Java 对象了。
领取专属 10元无门槛券
手把手带您无忧上云