首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

这可能是北半球最暖的Kafka Connect数据管道教程:2

在本系列的上篇文章中,我展示了通过使用Kafka Connect API,将数据从数据库流到Apache Kafka®的简单过程。范例中用到了MySQL,但同样适用于其他支持JDBC的任意数据库——是的,几乎所有的数据库都支持!这次,我们实现数据流从数据库到Kafka再到Elasticsearch。

同样,我们以一种可伸缩、容错的方式来运用Kafka Connect完成它,仅需简单配置文件!

假设你已经通过前面的文章完成了安装和设置步骤。启动Elasticsearch吧:

虽然Kafka Connect是Apache Kafka的一部分,但要将数据从Kafka流到Elasticsearch,就得靠Confluent Platform(至少得用Elasticsearch connector)了。

下面的配置文件很简单。如前所述,详细信息参看注释:

注意,如果在整个管道中使用相同的转换器(本例是Avro),那么应将其放入Connect worker配置,而不是每个连接器重复配。

Confluent CLI加载connector:

./bin/confluent load es-sink-mysql-foobar-01 -d /tmp/kafka-connect-elasticsearch-sink.json

如前文File sink,一旦connector被创建和运行(需要几秒直到Task是RUNNING状态),它将加载topic现存内容到指定的Elasticsearch索引。Elasticsearch控制台将输出:

使用Elasticsearch REST API查询,从Kafka流入的数据一览无余:

敏锐的你可能注意到,Kafka Connect已经将document的_id设置为topic/partition/offset,这使得我们可以用Elasticsearch的幂等写来优雅地进行一次交付。如果只有一个事件流,这是非常有用的,但是在某些情况下希望声明我们自己的键——这个话题后面再说。

还能看到mapping也被创建,使用了源MySQL表的定义。

通过Kibana,可以很容易地观察来自Kafka的数据,由于数据类型得到了保留,因此可以根据时间戳选择和聚合需要的数据:

小结

通过一些简单的REST调用,我们既构建了可伸缩的数据管道,使得流式数据从关系型数据库一直到Elasticsearch和文本文件。有了Kafka Connect生态系统,管道还能扩展和改造,以适应HDFS、BigQuery、S3、Couchbase和MongoDB的需求。

关于译者: 十年网络安全行业,八年Java EE研发、架构、管理,三年开源对象数据库社区运营,一年大数据、机器学习产线管理,知乎、微信@rosenjiang。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20190130G12BIE00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券