前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Apache Kafka-生产消费基础篇

Apache Kafka-生产消费基础篇

作者头像
小小工匠
发布2021-08-17 16:37:40
2830
发布2021-08-17 16:37:40
举报
文章被收录于专栏:小工匠聊架构

在这里插入图片描述
在这里插入图片描述

POM 依赖

版本请同使用的kafka服务端的版本保持一致

代码语言:javascript
复制
		<dependency>
			<groupId>org.apache.kafkagroupId>
			<artifactId>kafka-clientsartifactId>
			<version>2.4.1version>
		dependency>

生产者

请小伙伴注意一下注释,这里就不做多余的解释啦

代码语言:javascript
复制
package com.artisan.kafka.first;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/2/17 19:45
 * @mark: show me the code , change the world
 */
public class ProduceClient {

    private static final String TOPIC = "artisanTopic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 属性设置 
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.126.140:9092");
        properties.put(ProducerConfig.ACKS_CONFIG,"1");
        properties.put(ProducerConfig.RETRIES_CONFIG,3);

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 根据属性实例化KafkaProducer
        Producer<String,String>  producer = new KafkaProducer<String, String>(properties);

        // 创建消息  三个参数,分别是 Topic ,消息的 key ,消息的 message 。
        String message = "mockValue";
        ProducerRecord<String ,String> producerRecord = new ProducerRecord<>(TOPIC, "mockKey", message);

        // 发送消息  (同步)
        Future<RecordMetadata> result = producer.send(producerRecord);

        // 获取同步发送的结果
        RecordMetadata recordMetadata = result.get();

        System.out.println(String.format("Message[ %s ] sent to Topic: %s  || Partition: %s || Offset: %s",message, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()));


    }

}

消费者

请小伙伴注意一下注释,这里就不做多余的解释啦

代码语言:javascript
复制
package com.artisan.kafka.first;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/2/17 20:09
 * @mark: show me the code , change the world
 */
public class ConsumerClient {

    private static final String TOPIC = "artisanTopic";


    public static void main(String[] args) {

        // 属性设置
        Properties properties = new Properties();

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG , "192.168.126.140:9092");  // Broker 的地址
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"artisan-consumer-group");// 消费者分组
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");  // 设置消费者分组最初的消费进度为 earliest
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true"); // 是否自动提交消费进度
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); // 自动提交消费进度频率
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息的 key 的反序列化方式
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 消息的 value 的反序列化方式

        // 根据设置实例化KafkaConsumer
        Consumer<String,String>  consumer = new KafkaConsumer<>(properties);

        // 订阅消息
        consumer.subscribe(Collections.singleton(TOPIC));


        // 循环拉取消息
        while (true){
            // 拉取消息
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            // 遍历处理消息
            records.forEach(record -> System.out.println(String.format("接收到消息:Key %s  || 内容 %s" , record.key(),record.value())));
        }
    }
}

属性的话,需要结合kafka的特性来讲解,后面的单独介绍


测试

运行Produce

在这里插入图片描述
在这里插入图片描述

运行消费端

在这里插入图片描述
在这里插入图片描述
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/02/17 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • POM 依赖
  • 生产者
  • 消费者
  • 测试
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档