Apache Kafka 是一个分布式流处理平台:distributed streaming platform
。
kafka主要应用于两大类应用:
Topic:kafka将消息分类,每一类的消息都有一个主题topic。
Producer:生产者,发布消息的对象。
Consumer:消费者,订阅消息的对象。
Broker:代理,已发布的消息保存在一组服务器中,称之为kafka集群,集群中每个服务器都是一个代理(broker)。消费者可以订阅一个或多个主题,并从broker上拉取数据,从而消费这些已发布的消息。
Partition:Topic物理上的分组,一个Topic可以分为多个partition,每个partition都是一个顺序的、不可变的消息队列,且可以持续添加。Partition中的每条消息都会被分配一个有序的序列号,称为偏移量(offset),因此每个分区中偏移量都是唯一的。
Consumer Group:每个Consumer属于一个特定的Consumer Group,这是kafka用来实现一个Topic消息的广播【发送给所有的consumer的发布订阅式消息模型】和单播【发送给任意一个consumer队列消息模型】的手段。一个topic可以有多个consumer group。
关于Consumer group的补充:一般来说,我们可以创建一些consumer group作为逻辑上的订阅者,每个组中包含数目不等的consumer,一个组内的多个消费者可以用来扩展性能和容错。 关于partition分区的补充: 1、【负载均衡】处理更多的消息,不受单台服务器的限制。 2、【顺序保证】kafka不能保证并行的时候消息的有序性,但是可以保证一个partition分区之中,消息只能由消费者组中的唯一一个消费者处理,以保证一个分区的消息先后顺序。 如下图:2个kafka集群托管4个分区(p0-p3),2个消费者组,组A有2个消费者实例,组B有4个消费者实例。
关于偏移量的补充:kafka集群将会保持所有的消息,直到他们过期,无论他们是否被消费。当消费者消费消息时,偏移量offset将会线性增加,但是消费者其实可以控制实际的偏移量,可以重置偏移量为更早的位置,意为着重新读取消息,且不会影响其他消费者对此log的处理。
Kafka的安装配置启动需要依赖于Zookeeper,Zookeeper的安装配置可以参考我的前一篇文章。
当然,其实你下载kafka之后,就自动已经集成了Zookeeper,你可以通过修改配置,启动内置的zookeeper。
关于使用内置的Zookeeper还是自己安装的Zookeeper的区别,可以看看这篇文章:https://segmentfault.com/q/1010000021110446
下载地址:http://kafka.apache.org/downloads
下载二进制版本【Binary downloads】,下载完成之后,解压到合适的目录下。
笔者目录为:D:\dev\kafka_2.11-2.3.1
。
进入config
目录下,找到server.properties
文件并修改如下:
log.dirs=D:\\dev\\kafka_2.11-2.3.1\\config\\kafka-logs
zookeeper.connect=localhost:2182 # 默认端口是2181,这里修改为2182
找到zookeeper.properties
文件,修改如下:
dataDir=D:\\softs\\zookeeper-3.4.13\\datas
dataLogDir=D:\\softs\\zookeeper-3.4.13\\logs
clientPort=2182
在bin目录下存放着所有可以使用的命令行指令,Linux和Windows的存放目录需要注意:
D:\dev\kafka_2.11-2.3.1> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
D:\dev\kafka_2.11-2.3.1> .\bin\windows\kafka-server-start.bat .\config\server.properties
创建1个分区1个副本,topic为test-topic
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 --topic test-topic
Created topic test-topic.
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2182
test-topic
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test-topic
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic --from-beginning
如果kafka启动时加载的配置文件中 server.properties 中没有配置delete.topic.enable=true,则此删除非真正删除,而是仅仅将topic标记为marked for deletion
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-topics.bat --delete --zookeeper localhost:2182 --topic test-topic
Topic test-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
D:\dev\kafka_2.11-2.3.1\bin\windows>zookeeper-shell.bat localhost:2182
Connecting to localhost:2182
Welcome to ZooKeeper!
JLine support is disabled
ls /brokers
[ids, topics, seqid]
ls /brokers/topics
[test, test-topic, __consumer_offsets]
rmr /brokers/topics/test-topic # 物理删除 test-topic
ls /brokers/topics
[test, __consumer_offsets]
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
public class ProducerExample {
public static void main(String[] args) {
Map<String, Object> props = new HashMap<>();
props.put("zk.connect", "localhost:2182");
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
String topic = "test";
for (int i = 1; i <= 100; i++) {
// send方法是异步的 , 返回Future对象,如果调用get(),将阻塞,直到相关请求完成并返回消息的metadata或抛出异常
producer.send(new ProducerRecord<>(topic, "key" + i, "msg" + i * 100));
}
// 生产者的传冲空间池保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转换程请求发送到集群
// 如果使用后不关闭生产者,将会丢失这些消息。
producer.close();
}
}
all
将会阻塞消息,这种设置性能最低,但是是最可靠的。batch.size
配置指定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。linger.ms=0
。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。max.block.ms
设定,之后它将抛出一个TimeoutException。public class ConsumerSample {
public static void main(String[] args) {
String topic = "test";// topic name
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "testGroup1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer(props);
// 订阅多个主题
consumer.subscribe(Arrays.asList(topic));
while (true) {
// 订阅一组topic之后,调用poll时,消费者将自动加入到组中。
// 只要持续调用poll,消费者将一直保持可用,并继续从分配的分区中接收消息。
// 消费者向服务器定时发送心跳,如果在session.timeout.ms配置的时间内无法发送心跳,被视为死亡,分区将重新分配
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("*****************partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
}
}
}
Kafka通过进程池瓜分消息并处理消息,这些进程可以在同一台机器运行,也可以分布到多台机器上,以增加可扩展型和容错性,相同的group.id
的消费者将视为同一个消费者组。
组中的每个消费者都通过subscribe API
动态的订阅一个topic列表。kafka将已订阅topic的消息发送到每个消费者组中。并通过平衡分区在消费者分组中所有成员之间来达到平均。因此每个分区恰好地分配1个消费者(一个消费者组中)。所有如果一个topic有4个分区,并且一个消费者分组有只有2个消费者。那么每个消费者将消费2个分区。
消费者组的成员是动态维护的:如果一个消费者故障。分配给它的分区将重新分配给同一个分组中其他的消费者。同样的,如果一个新的消费者加入到分组,将从现有消费者中移一个给它。这被称为重新平衡分组
。
创建topic
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 --topic test
启动zookeeper
D:\dev\kafka_2.11-2.3.1>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
启动kafka
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-server-start.bat .\config\server.properties
先启动消费者ConsumerExample,再启动生产者ProducerExample,观察控制台。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有