大家好,又见面了,我是你们的朋友全栈君。
#kafka 生产者配置
#kafka 集群
kafka.bootstrap.servers=ip:端口
#发送端确认模式
kafka.acks=all
#发送失败重试次数
kafka.retries=10
#批处理大小(单位:字节,不是消息条数)
kafka.batch.size=16384
#延迟统一收集,产生聚合,然后批量发送
kafka.linger.ms=100
#批处理缓冲区
kafka.buffer.memory=33554432
#key 序列化
kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
#value序列化
kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer

#消费端 集群
kafka.bootstrap.servers=IP:端口
#一个用于跟踪调查的ID ,最好同group.id相同
kafka.client.id=MesSystem
#Consumer归属的组ID
kafka.group.id=debtorInfo
#限制每回返回的最大数据条数
kafka.max.poll.records=1000
#是否自动提交
kafka.enable.auto.commit=false
#自动提交的频率
kafka.auto.commit.interval.ms=1000
#会话的超时限制
kafka.session.timeout.ms=15000
kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
// Producer usage: publish the string form of `json` to the topic named "topics".
KafkaProducerUtils.send("topics", json.toString());// store the message in Kafka
//KafkaProducerUtils 工具类
package com.tera.util;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;
import java.util.List;
import java.util.Properties;
/**
 * Static helper for publishing String messages to Kafka.
 *
 * <p>A single {@link KafkaProducer} is created when the class is loaded, configured from the
 * {@code kafka.*} entries returned by {@code PropertyUtil.getProperty}, and shared by every
 * call to {@link #send(String, String)} and {@link #sendBatch(String, List)}.
 */
public class KafkaProducerUtils {

    /** Producer configuration assembled once in the static initializer. */
    private static final Properties props;

    /** The one producer instance shared by all send operations of this class. */
    private static final KafkaProducer<String, String> producer;

    static {
        props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertyUtil.getProperty("kafka.bootstrap.servers"));
        props.put(ProducerConfig.ACKS_CONFIG, PropertyUtil.getProperty("kafka.acks"));
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.parseInt(PropertyUtil.getProperty("kafka.retries")));
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.parseInt(PropertyUtil.getProperty("kafka.batch.size")));
        props.put(ProducerConfig.LINGER_MS_CONFIG, Integer.parseInt(PropertyUtil.getProperty("kafka.linger.ms")));
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.parseInt(PropertyUtil.getProperty("kafka.buffer.memory")));
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, PropertyUtil.getProperty("kafka.key.serializer"));
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, PropertyUtil.getProperty("kafka.value.serializer"));
        producer = new KafkaProducer<String, String>(props);
    }

    /**
     * Completion callback that prints the outcome of one asynchronous send to standard output.
     */
    static class SendCallback implements Callback {
        /** The record whose delivery this callback reports on. */
        ProducerRecord<String, String> record;
        /** Caller-supplied sequence number, printed when the send fails. */
        int sendSeq = 0;

        /**
         * @param record  the record being sent
         * @param sendSeq sequence number identifying this send in failure output
         */
        public SendCallback(ProducerRecord<String, String> record, int sendSeq) {
            this.record = record;
            this.sendSeq = sendSeq;
        }

        /**
         * Prints topic, partition and offset on success, or the sequence number and error
         * message on failure.
         *
         * @param recordMetadata metadata of the stored record; read only when {@code e} is null
         * @param e              the send failure, or {@code null} when the send succeeded
         */
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            // send success
            if (null == e) {
                // Fix: the partition field previously printed recordMetadata.topic() again.
                String meta = "send----topic:" + recordMetadata.topic() + ", partition:"
                        + recordMetadata.partition() + ", offset:" + recordMetadata.offset();
                System.out.println("send message success, record:" + record.toString() + ", meta:" + meta);
                System.out.println("value===========" + record.value());
                return;
            }
            // send failed
            System.out.println("send message failed, seq:" + sendSeq + ", record:" + record.toString() + ", errmsg:" + e.getMessage());
        }
    }

    /**
     * Sends one message (with a null key) to Kafka and flushes the shared producer before
     * returning. The delivery result is reported by {@link SendCallback}.
     *
     * @param topicName destination topic; must not be null or empty
     * @param value     message payload
     * @throws IllegalArgumentException if {@code topicName} is null or empty
     * @throws Exception declared for backward compatibility with existing callers
     */
    public static void send(String topicName, String value) throws Exception {
        if (StringUtils.isNullOrEmpty(topicName)) {
            throw new IllegalArgumentException("参数错误,topicName不能为空");
        }
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName, null, value);
        producer.send(record, new SendCallback(record, 0));
        producer.flush();
    }

    /**
     * Sends every element of {@code list} (each with a null key) to Kafka and flushes the
     * shared producer before returning.
     *
     * <p>The shared producer is reused instead of creating and closing a new one per call;
     * this also removes the leak that occurred when a send threw before {@code close()}.
     * Each record gets a {@link SendCallback} carrying its index in the list, so individual
     * failures are reported rather than silently dropped.
     *
     * @param topicName destination topic; must not be null or empty
     * @param list      message payloads; must not be null or empty
     * @throws IllegalArgumentException if {@code topicName} or {@code list} is null or empty
     * @throws Exception declared for backward compatibility with existing callers
     */
    public static void sendBatch(String topicName, List<String> list) throws Exception {
        if (StringUtils.isNullOrEmpty(topicName)) {
            throw new IllegalArgumentException("参数错误,topicName不能为空");
        }
        if (list == null || list.isEmpty()) {
            throw new IllegalArgumentException("参数错误,list不能为空");
        }
        int seq = 0;
        for (String value : list) {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName, null, value);
            producer.send(record, new SendCallback(record, seq));
            seq++;
        }
        // Block until every buffered record of this batch has completed.
        producer.flush();
    }

    /**
     * Manual smoke test: sends the value "123" to the topic "withdrawaldev".
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            // send is static, so no instance is needed.
            KafkaProducerUtils.send("withdrawaldev", "123");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
// Consumer
@Autowired
private DefaultKafkaConsumerFactory consumerFactory;
// Keys and values are typed String to match the StringDeserializer configured for both
// kafka.key.deserializer and kafka.value.deserializer (the key type was previously Integer).
Consumer<String, String> consumer = consumerFactory.createConsumer();
consumer.subscribe(Arrays.asList("t_message_log"));
// Single poll with a timeout argument of 100.
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
value = record.value();// message payload
}
// NOTE(review): kafka.enable.auto.commit is false in the config and this snippet never
// commits offsets or closes the consumer — confirm both are handled by the surrounding code.
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/136688.html 原文链接:https://javaforall.cn