前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >[MYSQL] 定制mysql中间件实现binlog过滤

[MYSQL] 定制mysql中间件实现binlog过滤

原创
作者头像
大大刺猬
发布2025-03-07 17:16:04
发布2025-03-07 17:16:04
12000
代码可运行
举报
文章被收录于专栏:大大刺猬大大刺猬
运行总次数:0
代码可运行

导读

有这么一个需求: 有个数据抽取工具 要从mysql里面抽日志, 但是只需要某N种类型的数据.

这种通常都是看下抽数工具有没得相关功能/选项来实现这个功能, 或者进行小小的二次开发来实现.但抽数工具众多, 修改压力就巨大(各种5话把门的语言).

那么从源头解决呢, 即写日志的时候就只写要解析的那部分数据, 不太可行, 毕竟日志还要做恢复使用, 而且通常还有从库的啊.

既然无法从源端解决, 也不方便从目标端解决, 那加个中间件如何呢?, 让中间件来做binlog的过滤, 这样既不影响主从库, 而且也不需要管抽数工具怎么实现.

原理细节分析

还好我们之前写过这种中间件的代码, 当时是叫的 流量镜像, 我们只需要在此基础上加个filter功能, 来匹配需求即可.

怎么实现binlog的filter呢? 还好我们之前解析过binlog, 而且还写过从Binlog里面抽取指定表的脚本, 我们在此基础上修改下即可. 比如加上去掉delete操作这种操作.

这些过滤规则会破坏事务的完整性, 需谨慎使用!!! 当然也可以改为基于事务来过滤. 比如我们之前写的pymysqlbinlog.

需要注意的关键点大概如下:

  1. 由于我们是模拟的中间件, binlog是不需要落地的, 所以解析数据包的时候, 得算上mysql协议,比如seq之类的信息.
  2. 而且存在过滤掉某部分event, 所以event里面的log_pos也得同步更新.
  3. binlog中元数据信息(table_map_event)和对应的数据操作行(row_event)是1对多关系, 所以如果某个row_event被过滤掉了, 则对应的table_map_event也不应该返回给抽数工具.(实际上返回给mysqlbinlog之类的工具没啥影响).
  4. 可能某个事务里面的信息全部都被过滤了, 所以还存在着空事务的情况. 这个我们就保留下来了.(不是偷懒,而是留一点我们的特色!)

也就是只需要修改如下红色部分信息即可:

其它的信息, 比如连接阶段的,或者一些查询信息, 我们应该原样返回给抽数工具(不能让它发现异常).

演示

实现过程就省略了, 完整代码放在文末的. 我这里就以去掉delete_event为例.

使用的时候需要注意修改相关端口,证书之类的信息.

先准备下环境

代码语言:sql
复制
create table db1.t20250307(id int);
insert into db1.t20250307 values(1);
insert into db1.t20250307 values(2);
insert into db1.t20250307 values(3);
insert into db1.t20250307 values(4);
commit;
flush logs; -- 我们从一个新的binlog开始, 主要是方便截图.
show master status;
代码语言:shell
复制
# 启动中间件,监控3306端口
python3 t20250307.py  # 懒得想名字了, 就日期来命名吧...

# 使用mysqlbinlog模拟抽数工具
mysqlbinlog -vvv -h127.0.0.1 -P3306  -p123456 --stop-never --read-from-remote-server m3314.000089

看起来我们的中间件正常工作了, 然后再模拟下删除数据

代码语言:sql
复制
begin;
insert into db1.t20250307 values(5);
delete from db1.t20250307 where id=4;
insert into db1.t20250307 values(6);
commit;

我们瞧瞧mysqlbinlog的效果呢

只有两条insert, 并没有delete, 说明我们的中间件过滤功能是有用的.

待完善

测试了下, 发现flush logs会导致抽数工具断开连接. 这个问题就留给感兴趣的读者自行完善了.

提示: 可以用那个流量镜像脚本观察下,数据包的变化. (记得加--ssl-mode=disabled,不然加密的包不好分析.)

参考:

https://cloud.tencent.com/developer/article/2245416

https://www.modb.pro/db/1763358489816174592

附源码

没想好名字, 随便叫啥吧.

代码语言:python
代码运行次数:0
复制
import struct
from threading import Thread
from multiprocessing import Process
import socket
import time
import sys
import ssl

def btoint(bdata,t='little'):
        return int.from_bytes(bdata,t)

def read_pack(rf):
	pack_header = rf.read(4)
	if len(pack_header) < 4:
		print(pack_header,' bye!')
		sys.exit(2)
	btrl, btrh, packet_seq = struct.unpack("<HBB", pack_header)
	pack_size = btrl + (btrh << 16)
	bdata = rf.read(pack_size)
	return pack_header+bdata


def send_repack(bdata,conn,seq,log_pos):
	seq = (seq+1)%255
	log_pos += len(bdata)
	bdata = bdata[:3] + struct.pack('<B',seq) + bdata[4:18] + struct.pack('<L',log_pos) + bdata[22:]
	conn.sendall(bdata)
	return seq,log_pos

class mmonitor(object): 
	def __init__(self):
		self.host = '0.0.0.0'
		self.port = 3306
		self.server = ('192.168.101.21',3314,)
		self.cert = '/data/mysql_3314/mysqldata/server-cert.pem'
		self.key = '/data/mysql_3314/mysqldata/server-key.pem'

	def handler_msg_fileter_delete(self,rf,sock,f):
		SEQ = 0
		LOG_POS = 0
		server_uuid = 866003314
		server_uuid = struct.pack('<L',server_uuid)
		table_map_packt = b''
		while True: # 一些客户端需要查询的信息
			bdata = read_pack(rf)
			sock.sendall(bdata)
			if bdata[4] == 0 and len(bdata) >= 25 and server_uuid == bdata[10:14]:
				bdata = read_pack(rf)
				SEQ = bdata[3] # struct.unpack('<B',bdata[3:4])
				LOG_POS = struct.unpack('<L',bdata[14:18])[0]
				sock.sendall(bdata) # GTID
				break
		
		while True:
			bdata = read_pack(rf)
			seq,_,timestamp,event_type,server_id,event_size,log_pos,flags = struct.unpack('<BBLBLLLh',bdata[3:24])
			#print(seq,_,timestamp,event_type,server_id,event_size,log_pos,flags)
			if event_type == 4: # 轮转了
				SEQ = seq
				LOG_POS = log_pos
				sock.sendall(bdata)
			elif event_type == 19: # table_map_event
				table_map_packt = bdata
			elif event_type != 32:
					if table_map_packt != b'':
						SEQ,LOG_POS = send_repack(table_map_packt,sock,SEQ,LOG_POS)
						table_map_packt = b''
					SEQ,LOG_POS = send_repack(bdata,sock,SEQ,LOG_POS)
		
	def handler_msg(self,rf,sock,f):
		#print(f'{f} start')
		while True:
			bdata = read_pack(rf)
			sock.sendall(bdata)
			print(f'{f}',btoint(bdata[3:4]),bdata)

	def handler(self,conn,addr):
		sock = socket.create_connection((self.server[0], self.server[1]))
		server_rf = sock.makefile('rb')
		bdata = read_pack(server_rf)
		conn.sendall(bdata)
		print('S->C: ',btoint(bdata[3:4]),bdata)

		client_rf = conn.makefile('rb')
		bdata = read_pack(client_rf)
		print('C->S: ',btoint(bdata[3:4]),bdata)
		sock.sendall(bdata)

		if len(bdata) < 38: #封装为SSL (32+4)
			#print('SSL')
			#封装客户端的SSL (因为相对于client, 这是server角色)
			context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
			context.load_cert_chain(certfile=self.cert, keyfile=self.key)
			conn = context.wrap_socket(conn, server_side=True)
			client_rf = conn.makefile('rb')

			#封装到server的SSL
			sock = ssl.wrap_socket(sock)
			server_rf = sock.makefile('rb')
		
		t1 = Process(target=self.handler_msg,args=(client_rf,sock,'C->S: ')) #监控客户端数据, 然后发往server端
		#t2 = Process(target=self.handler_msg,args=(server_rf,conn,'S->C: '))
		t2 = Process(target=self.handler_msg_fileter_delete,args=(server_rf,conn,'S->C: '))
		t1.start()
		t2.start()
		t1.join()
		t2.join()

	def init(self):
		socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		socket_server.bind((self.host, self.port))
		socket_server.listen(12345) #设置连接数
		self.socket_server = socket_server

		accept_client_thread = Thread(target=self.accept_client,)
		accept_client_thread.start()
		accept_client_thread.join()
		
	def accept_client(self,):
		while True:
			conn, addr = self.socket_server.accept()
			p = Process(target=self.handler,args=(conn,addr),)
			p.start()
	
aa = mmonitor()
aa.init()

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 导读
  • 原理细节分析
  • 演示
  • 待完善
  • 附源码
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档