操作场景
本文介绍通过 TDMQ Pulsar 版控制台创建集群和 Topic 等资源后,下载 Demo 并进行简单的测试,帮助您快速上手使用 TDMQ Pulsar 版服务。
前提条件
操作步骤
步骤1:新建集群并配置网络
1. 登录 TDMQ Pulsar 版控制台,进入集群管理页面,选择目标地域。
2. 单击新建集群,创建一个集群。
3. 在创建好的集群中,单击操作列的接入地址。


步骤2:创建命名空间

步骤3:创建角色并授权
1. 在 TDMQ Pulsar 版控制台的 角色管理 页面,选择地域和刚刚创建好的集群,单击新建进入新建角色页面。
2. 填写角色名称和说明,单击提交完成角色创建。
3. 进入 命名空间 页面,在刚刚创建的命名空间中,单击操作列的配置权限进入命名空间的权限列表。
4. 在配置权限页面,单击添加角色,将刚刚创建的角色添加进来,分配生产和消费的权限。


5. 看到下图即代表配置成功。

步骤4:创建 Topic 和订阅关系
1. 在 Topic 管理 页面,在顶部选择目标地域、当前集群和命名空间,单击新建,创建一个 Topic。
2. 单击操作列的新增订阅,为刚刚新建好的 Topic 创建一个订阅关系。
3. 单击操作列的更多 > 查看订阅/消费者,可看到刚刚创建好的订阅。


步骤5:下载并运行 Demo
1. 下载 Demo(Demo 下载地址),并配置相关参数。
关于 Maven 依赖
<!-- in your <properties> block --><pulsar.version>2.7.2</pulsar.version><!-- in your <dependencies> block --><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>${pulsar.version}</version></dependency>
创建 Client
// 一个Pulsar client对应一个客户端链接// 原则上一个进程一个client,尽量避免重复创建,消耗资源// 关于客户端和生产消费者的实践教程,可以参考官方文档 https://cloud.tencent.com/document/product/1179/58090PulsarClient client = PulsarClient.builder()//替换成集群接入地址,位于【集群管理】页面接入地址.serviceUrl("http://pulsar-..tencenttdmq.com:8080")//替换成角色密钥,位于【角色管理】页面.authentication(AuthenticationFactory.token("eyJr")).build();System.out.println(">> pulsar client created.");
创建消费者进程
Consumer<byte[]> consumer = client.newConsumer()//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制.topic("persistent://pulsar-****/namespace/topicName")//需要在控制台Topic详情页创建好一个订阅,此处填写订阅名.subscriptionName("subscriptionName")//声明消费模式为exclusive(独占)模式.subscriptionType(SubscriptionType.Exclusive)//配置从最早开始消费,否则可能会消费不到历史消息.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();System.out.println(">> pulsar consumer created.");
说明:
Topic 名称需要填入完整路径,即“persistent://clusterid/namespace/Topic”,clusterid/namespace/topic 的部分可以从控制台上 Topic 管理 页面直接复制。


subscriptionName 需要写入订阅名,可在消费者界面查看。
创建生产者进程
Producer<byte[]> producer = client.newProducer()//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称.topic("persistent://pulsar-****/namespace/topicName").create();System.out.println(">> pulsar producer created.");
说明:
Topic 名称需要填入完整路径,即“persistent://clusterid/namespace/Topic”,clusterid/namespace/topic 的部分可以从控制台上 Topic 管理 页面直接复制。
生产消息
for (int i = 0; i < 5; i++) {String value = "my-sync-message-" + i;//发送消息MessageId msgId = producer.newMessage().value(value.getBytes()).send();System.out.println("deliver msg " + msgId + ",value:" + value);}//关闭生产者producer.close();
消费消息
for (int i = 0; i < 5; i++) {//接收当前offset对应的一条消息Message<byte[]> msg = consumer.receive();MessageId msgId = msg.getMessageId();String value = new String(msg.getValue());System.out.println("receive msg " + msgId + ",value:" + value);//接收到之后必须要ack,否则offset会一直停留在当前消息,无法继续消费consumer.acknowledge(msg);}
2. 在
pom.xml 所在目录执行命令 mvn clean package,或者通过 IDE 自带的功能打包整个工程,在 target 目录下生成一个可运行的 jar 文件。

3. 运行成功后将 jar 文件上传到云服务器,具体操作参见 如何将本地文件拷贝到云服务器。
4. 登录云服务器,进入到刚刚上传 jar 文件所在的目录,可看到文件已上传到云服务器。

5. 执行命令
tdmq-demo-cloud-1.0.0.jar,运行 Demo,可查看运行日志。
6. 登录 TDMQ Pulsar 版控制台,依次单击Topic 管理 > Topic 名称进入消费管理页面,点开订阅名下方右三角号,可查看生产消费记录。


步骤6:查询消息
1. 进入 消息查询 页面,可查看 Demo 运行后的消息轨迹。
说明:
消息轨迹的查询只支持单条消息,如果用户在 Producer 侧开启了 Batch 功能,则在消息查询中,同一个 Batch 的消息只可以查询到 Batch 中的第一条消息。


消息轨迹如下:


