有这么一个需求: 有个数据抽取工具 要从mysql里面抽日志, 但是只需要某N种类型的数据.
这种通常都是看下抽数工具有没得相关功能/选项来实现这个功能, 或者进行小小的二次开发来实现.但抽数工具众多, 修改压力就巨大(各种5话把门的语言).
那么从源头解决呢, 即写日志的时候就只写要解析的那部分数据, 不太可行, 毕竟日志还要做恢复使用, 而且通常还有从库的啊.
既然无法从源端解决, 也不方便从目标端解决, 那加个中间件如何呢?, 让中间件来做binlog的过滤, 这样既不影响主从库, 而且也不需要管抽数工具怎么实现.
还好我们之前写过这种中间件的代码, 当时是叫的 流量镜像, 我们只需要在此基础上加个filter功能, 来匹配需求即可.
怎么实现binlog的filter呢? 还好我们之前解析过binlog, 而且还写过从Binlog里面抽取指定表的脚本, 我们在此基础上修改下即可. 比如加上去掉delete操作这种操作.
这些过滤规则会破坏事务的完整性, 需谨慎使用!!! 当然也可以改为基于事务来过滤. 比如我们之前写的pymysqlbinlog.
需要注意的关键点大概如下:
也就是只需要修改如下红色部分信息即可:
其它的信息, 比如连接阶段的,或者一些查询信息, 我们应该原样返回给抽数工具(不能让它发现异常).
实现过程就省略了, 完整代码放在文末的. 我这里就以去掉delete_event为例.
使用的时候需要注意修改相关端口,证书之类的信息.
先准备下环境
create table db1.t20250307(id int);
insert into db1.t20250307 values(1);
insert into db1.t20250307 values(2);
insert into db1.t20250307 values(3);
insert into db1.t20250307 values(4);
commit;
flush logs; -- 我们从一个新的binlog开始, 主要是方便截图.
show master status;
# 启动中间件,监控3306端口
python3 t20250307.py # 懒得想名字了, 就日期来命名吧...
# 使用mysqlbinlog模拟抽数工具
mysqlbinlog -vvv -h127.0.0.1 -P3306 -p123456 --stop-never --read-from-remote-server m3314.000089
看起来我们的中间件正常工作了, 然后再模拟下删除数据
begin;
insert into db1.t20250307 values(5);
delete from db1.t20250307 where id=4;
insert into db1.t20250307 values(6);
commit;
我们瞧瞧mysqlbinlog的效果呢
只有两条insert, 并没有delete, 说明我们的中间件过滤功能是有用的.
测试了下, 发现flush logs
会导致抽数工具断开连接. 这个问题就留给感兴趣的读者自行完善了.
提示: 可以用那个流量镜像脚本观察下,数据包的变化. (记得加--ssl-mode=disabled,不然加密的包不好分析.)
参考:
https://cloud.tencent.com/developer/article/2245416
https://www.modb.pro/db/1763358489816174592
没想好名字, 随便叫啥吧.
import struct
from threading import Thread
from multiprocessing import Process
import socket
import time
import sys
import ssl
def btoint(bdata,t='little'):
return int.from_bytes(bdata,t)
def read_pack(rf):
pack_header = rf.read(4)
if len(pack_header) < 4:
print(pack_header,' bye!')
sys.exit(2)
btrl, btrh, packet_seq = struct.unpack("<HBB", pack_header)
pack_size = btrl + (btrh << 16)
bdata = rf.read(pack_size)
return pack_header+bdata
def send_repack(bdata,conn,seq,log_pos):
seq = (seq+1)%255
log_pos += len(bdata)
bdata = bdata[:3] + struct.pack('<B',seq) + bdata[4:18] + struct.pack('<L',log_pos) + bdata[22:]
conn.sendall(bdata)
return seq,log_pos
class mmonitor(object):
def __init__(self):
self.host = '0.0.0.0'
self.port = 3306
self.server = ('192.168.101.21',3314,)
self.cert = '/data/mysql_3314/mysqldata/server-cert.pem'
self.key = '/data/mysql_3314/mysqldata/server-key.pem'
def handler_msg_fileter_delete(self,rf,sock,f):
SEQ = 0
LOG_POS = 0
server_uuid = 866003314
server_uuid = struct.pack('<L',server_uuid)
table_map_packt = b''
while True: # 一些客户端需要查询的信息
bdata = read_pack(rf)
sock.sendall(bdata)
if bdata[4] == 0 and len(bdata) >= 25 and server_uuid == bdata[10:14]:
bdata = read_pack(rf)
SEQ = bdata[3] # struct.unpack('<B',bdata[3:4])
LOG_POS = struct.unpack('<L',bdata[14:18])[0]
sock.sendall(bdata) # GTID
break
while True:
bdata = read_pack(rf)
seq,_,timestamp,event_type,server_id,event_size,log_pos,flags = struct.unpack('<BBLBLLLh',bdata[3:24])
#print(seq,_,timestamp,event_type,server_id,event_size,log_pos,flags)
if event_type == 4: # 轮转了
SEQ = seq
LOG_POS = log_pos
sock.sendall(bdata)
elif event_type == 19: # table_map_event
table_map_packt = bdata
elif event_type != 32:
if table_map_packt != b'':
SEQ,LOG_POS = send_repack(table_map_packt,sock,SEQ,LOG_POS)
table_map_packt = b''
SEQ,LOG_POS = send_repack(bdata,sock,SEQ,LOG_POS)
def handler_msg(self,rf,sock,f):
#print(f'{f} start')
while True:
bdata = read_pack(rf)
sock.sendall(bdata)
print(f'{f}',btoint(bdata[3:4]),bdata)
def handler(self,conn,addr):
sock = socket.create_connection((self.server[0], self.server[1]))
server_rf = sock.makefile('rb')
bdata = read_pack(server_rf)
conn.sendall(bdata)
print('S->C: ',btoint(bdata[3:4]),bdata)
client_rf = conn.makefile('rb')
bdata = read_pack(client_rf)
print('C->S: ',btoint(bdata[3:4]),bdata)
sock.sendall(bdata)
if len(bdata) < 38: #封装为SSL (32+4)
#print('SSL')
#封装客户端的SSL (因为相对于client, 这是server角色)
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(certfile=self.cert, keyfile=self.key)
conn = context.wrap_socket(conn, server_side=True)
client_rf = conn.makefile('rb')
#封装到server的SSL
sock = ssl.wrap_socket(sock)
server_rf = sock.makefile('rb')
t1 = Process(target=self.handler_msg,args=(client_rf,sock,'C->S: ')) #监控客户端数据, 然后发往server端
#t2 = Process(target=self.handler_msg,args=(server_rf,conn,'S->C: '))
t2 = Process(target=self.handler_msg_fileter_delete,args=(server_rf,conn,'S->C: '))
t1.start()
t2.start()
t1.join()
t2.join()
def init(self):
socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket_server.bind((self.host, self.port))
socket_server.listen(12345) #设置连接数
self.socket_server = socket_server
accept_client_thread = Thread(target=self.accept_client,)
accept_client_thread.start()
accept_client_thread.join()
def accept_client(self,):
while True:
conn, addr = self.socket_server.accept()
p = Process(target=self.handler,args=(conn,addr),)
p.start()
aa = mmonitor()
aa.init()
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。