前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >Go 语言微服务框架 Kratos 集成第三方库 kafka-go 操作消息队列 Kafka

Go 语言微服务框架 Kratos 集成第三方库 kafka-go 操作消息队列 Kafka

作者头像
frank.
发布2025-01-06 12:29:38
发布2025-01-06 12:29:38
6500
代码可运行
举报
运行总次数:0
代码可运行

大家好,我是 frank。「Golang语言开发栈」公众号作者。

01 、介绍

Go 语言微服务框架 Kratos 不限制使用任何第三方库,Go 语言操作消息队列 Kafka 有很多优秀的第三方库,比如 sarama 和 kafka-go,我们在之前的文章中介绍过 Go 语言怎么使用 sarama 操作消息队列 Kafka。

本文我们介绍 Go 微服务框架 Kratos 怎么集成第三方库 kafka-go[1] 操作消息队列 Kafka。

02 、Kratos 集成第三方库 kafka-go

我们在本地搭建 Go 运行环境,并安装 kratos 工具,使用 kratos 工具创建项目 blog。

在 blog 项目中,集成第三方库 kafka-go。

创建项目

示例代码:

代码语言:javascript
代码运行次数:0
复制
kratos new blog

安装 kafka-go

代码语言:javascript
代码运行次数:0
复制
go get github.com/segmentio/kafka-go

集成 Kafka Producer(生产者)和 Kafka Consumer(消费者)

编写文件 blog/internal/data/data.go

导入第三方库:

代码语言:javascript
代码运行次数:0
复制
import (
 "github.com/segmentio/kafka-go"
)

添加 Kafka Producer(生产者)和 Kafka Consumer(消费者):

代码语言:javascript
代码运行次数:0
复制
// Data .
type Data struct {
 // TODO wrapped database client
 dbEngine *xorm.Engine
 kp       *kafkaProducer
 kc       *KafkaConsumer
}

// NewData .
func NewData(c *conf.Data, logger log.Logger, dbEngin *xorm.Engine, kp *kafkaProducer, kc *KafkaConsumer) (*Data, func(), error) {
 cleanup := func() {
  log.NewHelper(logger).Info("closing the data resources")
 }
 return &Data{
  dbEngine: dbEngin,
  kp:       kp,
  kc:       kc,
 }, cleanup, nil
}

Kafka Producer(生产者):

代码语言:javascript
代码运行次数:0
复制
type kafkaProducer struct {
 writer *kafka.Writer
}

func NewKafkaProducer(c *conf.Data) *kafkaProducer {
 brokers := c.Kafka.Brokers
 topic := c.Kafka.Topic
 writer := &kafka.Writer{
  Addr:     kafka.TCP(brokers...),
  Topic:    topic,
  Balancer: &kafka.LeastBytes{},
 }
 return &kafkaProducer{writer: writer}
}

func (p *kafkaProducer) SendMessage(ctx context.Context, key, value []byte) error {
 err := p.writer.WriteMessages(ctx, kafka.Message{
  Key:   key,
  Value: value,
 })
 if err != nil {
  return err
 }
 return nil
}

func (p *kafkaProducer) Close() error {
 return p.writer.Close()
}

Kafka Consumer(消费者):

代码语言:javascript
代码运行次数:0
复制
type KafkaConsumer struct {
 reader *kafka.Reader
}

func NewKafkaConsumer(c *conf.Data) *KafkaConsumer {
 brokers := c.Kafka.Brokers
 topic := c.Kafka.Topic
 groupId := c.Kafka.GroupId
 reader := kafka.NewReader(kafka.ReaderConfig{
  Brokers: brokers,
  Topic:   topic,
  GroupID: groupId,
 })
 return &KafkaConsumer{
  reader: reader,
 }
}

func (c *KafkaConsumer) Start(ctx context.Context) {
 for {
  msg, err := c.reader.ReadMessage(ctx)
  if err != nil {
   return
  }
  log.Debugf("key=%s || value=%s", string(msg.Key), string(msg.Value))
 }
}

func (c *KafkaConsumer) Close() error {
 return c.reader.Close()
}

生产 kafka 消息的方法:

创建文件 blog/internal/data/kafka.go

示例代码:

代码语言:javascript
代码运行次数:0
复制
func (u *userRepository) KafkaSendMessage(ctx context.Context, key []byte, value []byte) (err error) {
 defer u.data.kp.Close()
 // 设置超时时间
 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 defer cancel()
 err = u.data.kp.SendMessage(ctx, key, value)
 if err != nil {
  log.Errorf("KafkaSendMessage() || err=%v", err)
  return
 }
 return
}

阅读上面这段代码,我们可以发现 KafkaSendMessage 方法封装了生产 kafka 消息的方法 u.data.kp.SendMessage

需要注意的是,我们需要设置超时时间,否则,会返回错误消息 context deadline exceeded

添加 wire 提供者:

代码语言:javascript
代码运行次数:0
复制
// ProviderSet is data providers.
var ProviderSet = wire.NewSet(NewData, NewGreeterRepo, NewDbEngine, NewUserRepository, NewKafkaProducer, NewKafkaConsumer)

生成 wire 代码:

代码语言:javascript
代码运行次数:0
复制
cd blog/cmd/blog
wire

03 、操作 Kafka

在 Kratos 项目中,一般在项目的 bizservice 层使用 Kafka 的生产逻辑;在 service 层使用 Kafka 的消费逻辑。

限于篇幅,我们以 Kafka 的生产逻辑为例,介绍怎么在 biz 层生产 Kafka 消息。

编写文件 blog/internal/biz/user.go,在 CreateUser 方法中添加生产 Kafka 消息的代码。

代码语言:javascript
代码运行次数:0
复制
type UserRepository interface {
 Create(ctx context.Context, user *User) (int64, error)
 KafkaSendMessage(ctx context.Context, key []byte, value []byte) (err error)
}

func (u *UserUsecase) CreateUser(ctx context.Context, user *User) (id int64, err error) {
 id, err = u.userRepo.Create(ctx, user)
 if err != nil {
  return
 }
 if id > 0 {
  var b []byte
  b, err = json.Marshal(user)
  if err != nil {
   return
  }
  err = u.userRepo.KafkaSendMessage(ctx, []byte(user.Name), b)
  if err != nil {
   return
  }
 }
 return
}

阅读上面这段代码,我们可以发现 UserRepository 接口中的方法 KafkaSendMessage,就是我们在 blog/internal/data/kafka.go 文件中实现的方法。

项目运行和测试:

Kratos 运行:

代码语言:javascript
代码运行次数:0
复制
kratos run

curl 请求示例:

代码语言:javascript
代码运行次数:0
复制
curl -H "Content-Type: application/json" -X POST -d '{"name":"mac", "email":"mac@gmail.com", "password":"123456"}' http://192.168.110.209:8000/user/create

kafka 消费者:

代码语言:javascript
代码运行次数:0
复制
kafka_2.13-3.9.0/bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
{"Id":10,"Name":"mac","Email":"mac@gmail.com","Password":"123456","Created":1735972949,"Updated":1735972949}

04 、总结

本文我们通过示例代码,介绍 Kratos 微服务框架怎么集成第三方库 kafka-go,操作 Kafka。

参考资料

[1]

kafka-go: https://github.com/segmentio/kafka-go

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-01-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Golang语言开发栈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档