MCP(Model Context Protocol)细粒度权限控制结合动态 RBAC(基于角色的访问控制)的实现,恰似一股清流,为权限管理开辟了全新境界。
随着企业规模的扩大与业务线的不断拓展,系统所承载的功能模块与用户群体愈发庞杂。早期那种简单粗暴的权限划分方式,已无法精准地匹配不同岗位、不同业务场景下用户对资源的差异化访问需求。例如,在金融机构中,交易员、风控人员、审计人员对于交易数据的访问权限就应截然不同:交易员需实时读写交易数据,但不能接触风控模型参数;风控人员可获取交易实时数据用于风险评估,却无权修改交易指令;审计人员则只在特定审计周期内,按既定流程查阅过往交易记录及相关操作日志。
传统 RBAC 模型虽能依据角色划分权限,可在面对业务流程的动态变化、临时任务组建的跨部门项目团队等复杂场景时,显得力不从心。此时,MCP 细粒度权限控制应运而生,它引入了上下文(Context)这一关键维度,让权限判定不再局限于用户身份与角色,还能结合诸如时间、地点、操作对象属性等丰富因素,真正实现权限的精准把控,为动态 RBAC 实现铺平道路。
在大型医院信息系统中,汇聚了来自不同科室、岗位的海量用户,涉及患者病历、医疗影像、药品库存、财务结算等众多敏感且关键的业务数据。传统权限管理面临诸多难题:
实施基于 MCP 与动态 RBAC 的权限管理方案后,医院信息系统预计实现以下效果:
sudo apt-get update && sudo apt-get upgrade
确保系统组件更新至最新状态,避免兼容性问题。CREATE TABLE users (
user\_id INT PRIMARY KEY AUTO\_INCREMENT,
username VARCHAR(50) UNIQUE NOT NULL,
password\_hash VARCHAR(100) NOT NULL,
email VARCHAR(100),
created\_at TIMESTAMP DEFAULT CURRENT\_TIMESTAMP
);
sudo apt-get install mysql-server
安装,安装完成后,执行 sudo mysql_secure_installation
进行安全配置,包括设置 root 用户强密码、禁止远程 root 登录等操作。创建用于存储权限相关数据(用户、角色、权限策略、上下文信息等)的数据库 mcp_auth_db
,并建立初始数据表结构,例如用户表:角色表:
CREATE TABLE roles (
role_id INT PRIMARY KEY AUTO_INCREMENT,
role_name VARCHAR(50) UNIQUE NOT NULL,
role_description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
权限策略表(简化示例):
CREATE TABLE auth_policies (
policy_id INT PRIMARY KEY AUTO_INCREMENT,
role_id INT NOT NULL,
object_type VARCHAR(50) NOT NULL,
action_type VARCHAR(50) NOT NULL,
context_rules JSON NOT NULL, -- 存储上下文规则,如 {"time_range":"09:00-18:00","locations":["dept1","dept2"]}
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (role_id) REFERENCES roles(role_id)
);
sudo apt-get install python3.8 python3-pip
安装 Python 及包管理工具 pip。安装 Flask 框架用于构建 Web 服务端,执行 pip install flask
,后续将基于此框架搭建权限管理服务接口。auth_service.py
):from flask import Flask, request, jsonify
import mysql.connector
app = Flask(\\_\\_name\\_\\_)
# 数据库连接配置
db\\_config = {
'user': 'root',
'password': 'your\\_password',
'host': 'localhost',
'database': 'mcp\\_auth\\_db'
}
@app.route('/login', methods=['POST'])
def login():
username = request.json.get('username')
password = request.json.get('password')
if not username or not password:
return jsonify({'error': 'Missing username or password'}), 400
# 连接数据库验证用户
try:
conn = mysql.connector.connect(\\*\\*db\\_config)
cursor = conn.cursor(dictionary=True)
cursor.execute(
"SELECT u.user\\_id, u.username, r.role\\_id, r.role\\_name FROM users u "
"JOIN user\\_roles ur ON u.user\\_id = ur.user\\_id "
"JOIN roles r ON ur.role\\_id = r.role\\_id "
"WHERE u.username = %s AND u.password\\_hash = MD5(%s)",
(username, password)
)
user\\_roles = cursor.fetchall()
if not user\\_roles:
return jsonify({'error': 'Invalid username or password'}), 401
# 返回用户及其角色信息
user\\_info = {
'user\\_id': user\\_roles[0]['user\\_id'],
'username': user\\_roles[0]['username'],
'roles': [{'role\\_id': ur['role\\_id'], 'role\\_name': ur['role\\_name']} for ur in user\\_roles]
}
return jsonify(user\\_info), 200
except mysql.connector.Error as err:
print(f"Database error: {err}")
return jsonify({'error': 'Internal server error'}), 500
finally:
if 'conn' in locals():
conn.close()
此模块实现用户基于用户名与密码登录,通过数据库关联查询获取用户的角色列表,为后续权限判定提供基础。
context_collector.py
):from flask import request, g
import datetime
import geoip2.database # 需提前安装 geoip2 库用于 IP 地理定位
# 加载 GeoIP 数据库,用于获取用户地理位置信息
geoip\\_db\\_path = 'GeoLite2-City.mmdb' # 需提前下载并配置路径
geoip\\_reader = geoip2.database.Reader(geoip\\_db\\_path)
def collect\\_context():
# 收集时间上下文
current\\_time = datetime.datetime.now()
g.context = {
'time': {
'current\\_time': current\\_time.isoformat(),
'time\\_of\\_day': current\\_time.strftime('%H:%M'),
'day\\_of\\_week': current\\_time.strftime('%A')
},
'client\\_ip': request.remote\\_addr
}
# 尝试获取地理位置上下文
try:
response = geoip\\_reader.city(g.context['client\\_ip'])
g.context['location'] = {
'country': response.country.name,
'city': response.city.name,
'latitude': response.location.latitude,
'longitude': response.location.longitude
}
except:
g.context['location'] = {
'country': 'Unknown',
'city': 'Unknown',
'latitude': 0.0,
'longitude': 0.0
}
# 可继续扩展收集设备类型、用户-agent 等更多上下文信息
g.context['user\\_agent'] = request.headers.get('User-Agent')
在 Flask 应用中注册该中间件,使其在每次请求处理前收集上下文信息,存储于全局变量 g
中供后续权限判定使用。
permission_checker.py
):import json
import mysql.connector
from flask import g
db\\_config = {
'user': 'root',
'password': 'your\\_password',
'host': 'localhost',
'database': 'mcp\\_auth\\_db'
}
def check\\_permission(user\\_roles, object\\_type, action\\_type):
try:
conn = mysql.connector.connect(\\*\\*db\\_config)
cursor = conn.cursor(dictionary=True)
# 查询匹配的角色权限策略
query = """
SELECT ap.context\\_rules
FROM auth\\_policies ap
WHERE ap.role\\_id IN (%s)
AND ap.object\\_type = %s
AND ap.action\\_type = %s
"""
# 构建 IN 子句的参数占位符
role\\_ids\\_str = ','.join(['%s'] \\* len(user\\_roles))
query = query % (role\\_ids\\_str, % s, % s)
# 提取角色 ID 列表
role\\_ids = [role['role\\_id'] for role in user\\_roles]
# 执行查询
cursor.execute(query, (\\*role\\_ids, object\\_type, action\\_type))
policies = cursor.fetchall()
if not policies:
return False, "No matching policy found"
# 遍历匹配策略,验证上下文规则
for policy in policies:
context\\_rules = json.loads(policy['context\\_rules'])
# 验证时间规则(简化示例,仅验证时间范围)
if 'time\\_range' in context\\_rules:
current\\_time\\_str = g.context['time']['time\\_of\\_day']
current\\_time = datetime.datetime.strptime(current\\_time\\_str, '%H:%M')
time\\_range = context\\_rules['time\\_range'].split('-')
start\\_time = datetime.datetime.strptime(time\\_range[0], '%H:%M')
end\\_time = datetime.datetime.strptime(time\\_range[1], '%H:%M')
if not (start\\_time <= current\\_time <= end\\_time):
continue # 时间不匹配,跳过此策略
# 验证位置规则(简化示例)
if 'allowed\\_locations' in context\\_rules:
user\\_city = g.context['location']['city']
if user\\_city not in context\\_rules['allowed\\_locations']:
continue # 位置不匹配,跳过此策略
# 若通过所有验证规则,返回允许
return True, "Permission granted by policy"
# 所有策略验证未通过
return False, "Permission denied by context rules"
except mysql.connector.Error as err:
print(f"Database error: {err}")
return False, "Internal server error during permission check"
finally:
if 'conn' in locals():
conn.close()
该模块首先查询用户角色对应的操作与客体权限策略,然后逐一验证策略中的上下文规则(示例中包含时间范围与允许位置规则验证),只有当所有规则验证通过,才判定用户有权限执行请求操作。
resource_api.py
):from flask import Flask, request, jsonify
from permission\\_checker import check\\_permission
from auth\\_service import login # 引入登录接口
from context\\_collector import collect\\_context # 引入上下文收集中间件
app = Flask(\\_\\_name\\_\\_)
# 注册中间件与登录接口
app.before\\_request(collect\\_context)
app.add\\_url\\_rule('/login', 'login', login, methods=['POST'])
@app.route('/resources/<resource\\_type>', methods=['GET', 'POST', 'DELETE'])
def access\\_resource(resource\\_type):
# 获取用户认证信息(此处简化为从请求头获取 token,实际应结合更安全的认证机制)
auth\\_token = request.headers.get('Authorization')
if not auth\\_token:
return jsonify({'error': 'Authentication required'}), 401
# 模拟从 token 解析用户角色(实际应通过安全的 token 验证与解析机制)
# 此处仅为示例,假设 token 包含 user\\_id 与 roles 信息
try:
token\\_payload = json.loads(auth\\_token)
user\\_roles = token\\_payload.get('roles', [])
except:
return jsonify({'error': 'Invalid token'}), 401
# 获取请求方法对应的操作类型
action\\_map = {
'GET': 'read',
'POST': 'create',
'DELETE': 'delete'
}
action\\_type = action\\_map.get(request.method, 'unknown')
# 进行权限判定
permission\\_granted, message = check\\_permission(user\\_roles, resource\\_type, action\\_type)
if not permission\\_granted:
return jsonify({'error': message}), 403
# 权限通过,执行实际资源操作逻辑(此处仅为示例返回成功消息)
return jsonify({'message': f'Successfully accessed {resource\\_type} resource', 'method': request.method}), 200
if \\_\\_name\\_\\_ == '\\_\\_main\\_\\_':
app.run(host='0.0.0.0', port=5000, debug=True)
此接口根据请求的资源类型与操作方法,结合从认证 token 中获取的用户角色,调用权限判定模块进行验证,只有通过判定请求才能访问资源。
unittest
框架,测试代码示例(test_permission_checker.py
):import unittest
from permission\\_checker import check\\_permission
from context\\_collector import collect\\_context
from flask import g
class TestPermissionChecker(unittest.TestCase):
def setUp(self):
# 模拟上下文信息
g.context = {
'time': {'time\\_of\\_day': '10:00'},
'location': {'city': 'New York'}
}
def test\\_permission\\_granted(self):
# 模拟用户角色
user\\_roles = [{'role\\_id': 1, 'role\\_name': 'admin'}]
object\\_type = 'documents'
action\\_type = 'read'
# 调用权限判定函数
granted, message = check\\_permission(user\\_roles, object\\_type, action\\_type)
self.assertTrue(granted)
self.assertEqual(message, 'Permission granted by policy')
def test\\_permission\\_denied\\_by\\_time(self):
# 修改上下文时间,模拟时间不匹配场景
g.context['time']['time\\_of\\_day'] = '20:00'
user\\_roles = [{'role\\_id': 2, 'role\\_name': 'regular\\_user'}]
object\\_type = 'sensitive\\_data'
action\\_type = 'read'
granted, message = check\\_permission(user\\_roles, object\\_type, action\\_type)
self.assertFalse(granted)
self.assertEqual(message, 'Permission denied by context rules')
if \\_\\_name\\_\\_ == '\\_\\_main\\_\\_':
unittest.main()CREATE INDEX idx\\_auth\\_policies ON auth\\_policies (role\\_id, object\\_type, action\\_type);
在代码中,对上下文信息收集部分进行异步优化,避免阻塞主线程处理请求。
1 Sandhu R S, Coyne E J, Feinstein H L, et al. Role - Based Access Control ModelsJ. Computer, 1996, 29(2):38 - 47.
2 Li N, Crampton J, Guo S. A framework for defining and analyzing hierarchical role-based access control modelsJ. IEEE Transactions on Dependable and Secure Computing, 2015, 12(5):525 - 538.
3 Kagal L, Finin T, Joshi A. Context - aware security for semantic web servicesJ. International Semantic Web Conference, 2003:279 - 294.
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。