在本系列的上篇文章中,我展示了通过使用Kafka Connect API,将数据从数据库流到Apache Kafka®的简单过程。范例中用到了MySQL,但同样适用于其他支持JDBC的任意数据库——是的,几乎所有的数据库都支持!这次,我们实现数据流从数据库到Kafka再到Elasticsearch。
同样,我们以一种可伸缩、容错的方式来运用Kafka Connect完成它,仅需简单配置文件!
假设你已经通过前面的文章完成了安装和设置步骤。启动Elasticsearch吧:
虽然Kafka Connect是Apache Kafka的一部分,但要将数据从Kafka流到Elasticsearch,就得靠Confluent Platform(至少得用Elasticsearch connector)了。
下面的配置文件很简单。如前所述,详细信息参看注释:
注意,如果在整个管道中使用相同的转换器(本例是Avro),那么应将其放入Connect worker配置,而不是每个连接器重复配。
Confluent CLI加载connector:
./bin/confluent load es-sink-mysql-foobar-01 -d /tmp/kafka-connect-elasticsearch-sink.json
如前文File sink,一旦connector被创建和运行(需要几秒直到Task是RUNNING状态),它将加载topic现存内容到指定的Elasticsearch索引。Elasticsearch控制台将输出:
使用Elasticsearch REST API查询,从Kafka流入的数据一览无余:
敏锐的你可能注意到,Kafka Connect已经将document的_id设置为topic/partition/offset,这使得我们可以用Elasticsearch的幂等写来优雅地进行一次交付。如果只有一个事件流,这是非常有用的,但是在某些情况下希望声明我们自己的键——这个话题后面再说。
还能看到mapping也被创建,使用了源MySQL表的定义。
通过Kibana,可以很容易地观察来自Kafka的数据,由于数据类型得到了保留,因此可以根据时间戳选择和聚合需要的数据:
小结
通过一些简单的REST调用,我们既构建了可伸缩的数据管道,使得流式数据从关系型数据库一直到Elasticsearch和文本文件。有了Kafka Connect生态系统,管道还能扩展和改造,以适应HDFS、BigQuery、S3、Couchbase和MongoDB的需求。
关于译者: 十年网络安全行业,八年Java EE研发、架构、管理,三年开源对象数据库社区运营,一年大数据、机器学习产线管理,知乎、微信@rosenjiang。
领取专属 10元无门槛券
私享最新 技术干货