Kafka 提供了数据高可靠的特性,
但是如果使用不当,
你可能无法享受到这一特性,
今天我们就来看看如何正确的使用Kafka 保证数据的不会丢失吧!
Kafka为生产者生产消息提供了一个 send(msg)
方法,
另有一个重载的方法send(msg, callback)
,
Future<RecordMetadata> future = producer.send(record)
上面的示例代码也可以看到,send
返回的是一个 Future
,
也就是说其实你是可以 Future.get()
获取返回值的,
但这种同步的方式,基本上可以说是不会用到。callback
回调中得到该条消息的发送结果,
并且callback
是异步回调,
所以在兼具性能的情况下,
也对消息具有比较好的掌控。 ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null) {
e.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: "
+ metadata.offset());
}
}
});
send(msg, callback)
来发送消息,
绝大多数情况下,我也建议你这么做。当我们通过 send(msg, callback)
是不是就意味着消息一定不丢失了呢?
答案明显是:不是的
我们接着上面,
send(msg, callback)
里面 callback
返回的成功,
到底是不是真的确保消息万无一失了?
其实这个返回的成功也是可以在生产者配置的:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
//*******重点*****************
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
这段代码是生产者发送消息的一个例子,
其中没使用callback
主要是这里callback
不是重点,
我们的重点是props.put("acks", "all");
这个acks
配置属性就是我们callback
成功的具体含义:
send(msg)
使用。min.insync.replicas
决定
除非一些不可抗力因素,
这种方式基本可以确保数据的完全不丢失。其实到这里,生产者端基本已经做好了数据不丢失的大部分准备,
但是有些东西是要配合 Broker 端一起,
才能达到预期的不丢失数据的,
比如我们上面说到的
min.insync.replicas
配置
我们上面知道了,
当 生产者 acks = -1
的时候,
写入的副本数就必须 >= min.insync.replicas
数,
当达不到这个要求的时候,
生产者端会收到一个either NotEnoughReplicas or NotEnoughReplicasAfterAppend
的异常。
所以我们这个参数必须不能大于 replication.factor
副本数。
否则生产者将无法写入任何数据,
一般建议 replication.factor
数要大于 min.insync.replicas
,
比如3个机器的集群,设置 replication.factor
= 3,
那么设置 min.insync.replicas
= 2 就可以了,
这样既保证了数据写入的时候有一个副本的冗余,
也能保证在一些情况下,
某台Broker宕机导致数据无法达到3个副本时,
依然可以正常写入数据。unclean.leader.election.enable
这里 Broker 端还有一个重要的配置就是 unclean.leader.election.enable = false
这个配置代表着一些数据落后比较多的 follower,
是否能在leader宕机后被选举成新的 leader
如果你设置成 true,
很明显,如果这样的follower成为新leader,
就会造成最新的一部分数据丢失掉,上面已经基本完成了不丢数据的方方面面了,
但是有些东西不是我们能控制的,
比如 网络抖动 等不可抗拒的因素,
这时候重试次数就很关键了,
配置合适的retries
重试次数,
和 合适的retry.backoff.ms
重试间隔时间,
将为我们的数据发送提供更高的稳定性,
当然如果实在发送不成功,怎么办呢?
一般我们也可以把发送不成功的数据保存在一个日志文件,
如果数据很重要,那就发送警告信息,
人工干预一下。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。