Consumption Demo - Pre-Filtered Consumption

Last updated: 2025-12-29 18:59:22

This document walks you through using the Python SDK to pre-filter and consume log data.

Environment Setup

Complete SDK example: continuous_sample_consumer
Python version: Python 3.5 or later is recommended (note that the sample code uses f-strings, which require Python 3.6+).
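
The sample reads its connection settings from environment variables (see App.__init__ in the startup code below). For a quick local test you could set them before constructing the app, for example in Python as sketched here; all values are placeholders, and the endpoint shown is only an example of the format used for a region.

import os

# Placeholder values for local testing only; replace them with your own
# endpoint, region, credentials, logset ID, and topic ID(s).
os.environ['TENCENTCLOUD_LOG_SAMPLE_ENDPOINT'] = 'ap-guangzhou.cls.tencentcs.com'  # example format
os.environ['TENCENTCLOUD_LOG_SAMPLE_REGION'] = 'ap-guangzhou'
os.environ['TENCENTCLOUD_LOG_SAMPLE_ACCESSID'] = 'your-secret-id'
os.environ['TENCENTCLOUD_LOG_SAMPLE_ACCESSKEY'] = 'your-secret-key'
os.environ['TENCENTCLOUD_LOG_SAMPLE_LOGSET_ID'] = 'your-logset-id'
os.environ['TENCENTCLOUD_LOG_SAMPLE_TOPICS'] = 'your-topic-id-1,your-topic-id-2'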

Core Logic

Data Structure (LogGroup)

The consumer processes the consumed data; your log entries are packed into the log_group structure shown below.
log_group {
    source    // log source, usually the IP address of the machine
    filename  // log file name
    logs {
        time                  // log time, Unix timestamp in microseconds
        user_defined_log_kvs  // user-defined log fields
    }
}

Consumer Implementation (SampleConsumer)

Define the concrete log processing logic by inheriting from the ConsumerProcessorBase class.
import json
import time

# ConsumerProcessorBase comes from the CLS consumer SDK; see the complete
# example continuous_sample_consumer for the exact import.


class SampleConsumer(ConsumerProcessorBase):
    last_check_time = 0

    def initialize(self, topic_id):
        self.topic_id = topic_id

    def process(self, log_groups, offset_tracker):
        for log_group in log_groups:
            for log in log_group.logs:
                # flatten a single log entry into a dict
                item = dict()
                item['filename'] = log_group.filename
                item['source'] = log_group.source
                item['time'] = log.time
                for content in log.contents:
                    item[content.key] = content.value

                # subsequent data processing:
                # put your business logic here
                print(json.dumps(item))

        # offset commit: persist the offset at most once every 3 seconds,
        # otherwise only cache it locally
        current_time = time.time()
        if current_time - self.last_check_time > 3:
            try:
                self.last_check_time = current_time
                offset_tracker.save_offset(True)
            except Exception:
                import traceback
                traceback.print_exc()
        else:
            try:
                offset_tracker.save_offset(False)
            except Exception:
                import traceback
                traceback.print_exc()

        return None
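
To make the mapping between the log_group structure and process() concrete, here is a standalone sketch that flattens a single stand-in record the same way process() does. The SimpleNamespace objects are illustrative stand-ins, not SDK types, and the field values are made up for the example.

import json
from types import SimpleNamespace

# Stand-in objects mirroring the log_group structure above (illustrative only;
# at runtime the SDK passes real objects into process()).
log = SimpleNamespace(
    time=1700000000000000,  # Unix timestamp in microseconds
    contents=[SimpleNamespace(key='status', value='500'),
              SimpleNamespace(key='cdb_message', value='pwd:3qJ0VaPn')])
log_group = SimpleNamespace(source='127.0.0.1', filename='app.log', logs=[log])

# the same flattening that process() performs for each log entry
for entry in log_group.logs:
    item = {'filename': log_group.filename, 'source': log_group.source, 'time': entry.time}
    for content in entry.contents:
        item[content.key] = content.value
    print(json.dumps(item))
# -> {"filename": "app.log", "source": "127.0.0.1", "time": 1700000000000000, "status": "500", "cdb_message": "pwd:3qJ0VaPn"}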

Starting the Consumption Task

Key configuration points:
Pre-filtering: enabled by setting the query parameter. If it is not set, all logs are consumed by default.
Concurrency: to keep consumption performant, the number of consumers should match the number of partitions of the log topic. This demo runs a single consumer; in practice, keep the consumer count equal to the partition count of your topic.
import os
import signal
import sys
import time

# LogHubConfig and ConsumerWorker come from the CLS consumer SDK; see the
# complete example continuous_sample_consumer for the exact imports.


class App:
    def __init__(self):
        self.shutdown_flag = False
        # access endpoint
        self.endpoint = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ENDPOINT', '')
        # region
        self.region = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_REGION', '')
        # secret id
        self.access_key_id = os.environ.get(
            'TENCENTCLOUD_LOG_SAMPLE_ACCESSID', '')
        # secret key
        self.access_key = os.environ.get(
            'TENCENTCLOUD_LOG_SAMPLE_ACCESSKEY', '')
        # logset id
        self.logset_id = os.environ.get(
            'TENCENTCLOUD_LOG_SAMPLE_LOGSET_ID', '')
        # topic ids
        self.topic_ids = os.environ.get(
            'TENCENTCLOUD_LOG_SAMPLE_TOPICS', '').split(',')
        # Pre-filter condition: set the query parameter to filter before consuming;
        # leaving it unset means all logs are consumed.
        # Example query:
        #   log_keep(op_and(op_gt(v("status"), 400), str_exist(v("cdb_message"), "pwd")))
        # Effect: only logs whose status is greater than 400 and whose
        # cdb_message field contains "pwd" are consumed.
        self.query = 'your filter condition'
        # consumer group name
        self.consumer_group = 'consumer-group-1'
        # consumer id; keep the number of consumers equal to the number of topic partitions
        self.consumer_name1 = "consumer-group-1-A"
        assert self.endpoint and self.access_key_id and self.access_key and self.logset_id, ValueError(
            "endpoint/access_id/access_key and logset_id cannot be empty")
        signal.signal(signal.SIGTERM, self.signal_handler)
        signal.signal(signal.SIGINT, self.signal_handler)

    def signal_handler(self, signum, frame):
        print(f"caught signal {signum}, cleaning up...")
        self.shutdown_flag = True

    def run(self):
        print("*** start to run consumer...")
        self.consume()
        # wait for an exit signal
        while not self.shutdown_flag:
            time.sleep(1)
        # shut down the consumer
        print("*** stopping workers")
        self.consumer.shutdown()
        sys.exit(0)

    def consume(self):
        try:
            # consumer config
            option1 = LogHubConfig(self.endpoint, self.access_key_id, self.access_key,
                                   self.region, self.logset_id, self.topic_ids,
                                   self.consumer_group, self.consumer_name1,
                                   heartbeat_interval=3, data_fetch_interval=1,
                                   offset_start_time='begin',
                                   max_fetch_log_group_size=1048576, query=self.query)
            # initialize the consumer
            self.consumer = ConsumerWorker(
                SampleConsumer, consumer_option=option1)

            # start the consumer
            print("*** start to consume data...")
            self.consumer.start()
        except Exception as e:
            import traceback
            traceback.print_exc()
            raise e
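
A minimal entry point to wire everything together might look like the following; the complete continuous_sample_consumer example can be used as the reference.

if __name__ == '__main__':
    app = App()
    app.run()  # blocks until SIGTERM/SIGINT is received, then shuts the consumer down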


Pre-Filtering Test Example

By configuring the query parameter, logs are filtered on the server side before they are transferred to the local consumer, which significantly reduces bandwidth usage.
Raw logs
{"cdb_message":"password:99743036","log_level":"ERROR","status":"400","__SOURCE__":"127.0.0.1"}
{"cdb_message":"pwd:3qJ0VaPn","log_level":"INFO","status":"500","__SOURCE__":"127.0.0.1"}

Pre-filter condition (Query)
log_keep(op_and(op_gt(v("status"), 400), str_exist(v("cdb_message"), "pwd")))
Result actually consumed
Only the second log, which meets the condition, is downloaded locally and handed to process.

{"cdb_message":"pwd:3qJ0VaPn","log_level":"INFO","status":"500","__SOURCE__":"127.0.0.1"}