本方案通过自签证书实现细粒度访问控制,结合非对称加密技术保障数据安全传输。核心功能包括:
使用RSA算法生成2048位密钥对:
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048
)
密钥长度符合行业安全标准,可有效防御暴力破解
通过X.509证书扩展字段注入安全策略:
策略类型 | OID标识符 | 数据类型 | 示例值 |
---|---|---|---|
资源访问白名单 | 1.3.6.1.4.1.12345.1.1 | 字符串列表 | "/api/v1", "/data/export" |
账户授权列表 | 1.3.6.1.4.1.12345.1.2 | 字符串列表 | "admin", "audit_user" |
IP访问规则 | 1.3.6.1.4.1.12345.1.3 | JSON对象 | {"ip_list":"192.168.1.1", "cidr":"10.0.0.0/8"} |
采用SHA-512哈希算法进行自签名,有效期建议不超过90天:
cert = builder.sign(private_key, hashes.SHA512())
短期有效策略可降低证书泄露风险
验证流程包含三重安全检查:
validation_errors
密钥长度/8 - 2*SHA256长度 - 2
(例:2048位密钥支持190字节明文) # 加密
cipher_data = cert.public_key().encrypt(data, padding.OAEP(...))
# 解密
decrypted = private_key.decrypt(cipher_data, padding.OAEP(...))
from __future__ import annotations
import base64
import json
from datetime import datetime, timedelta, UTC
from typing import Tuple
from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.x509 import Certificate, Extension, ObjectIdentifier
from cryptography.x509.oid import NameOID
# 自定义OID定义(示例使用1.3.6.1.4.1.12345命名空间)
ALLOWED_RESOURCES_OID = ObjectIdentifier("1.3.6.1.4.1.12345.1.1")
ALLOWED_ACCOUNTS_OID = ObjectIdentifier("1.3.6.1.4.1.12345.1.2")
SOURCE_IP_RULES_OID = ObjectIdentifier("1.3.6.1.4.1.12345.1.3")
def generate_cert_with_policy(
allowed_resources: list[str],
allowed_accounts: list[str],
ip_rules: dict,
validity_days: int
) -> tuple[bytes, bytes]:
"""
生成包含访问策略的自签名证书
:param allowed_resources: 允许访问的资源路径列表
:param allowed_accounts: 允许使用的账号列表
:param ip_rules: IP规则字典,包含以下键:
- "ip_list": 具体IP列表
- "cidr": CIDR列表
- "internal": 是否允许所有内网IP
- "public": 是否允许所有公网IP
:param validity_days: 证书有效天数
:return: (证书PEM字节, 私钥PEM字节)
"""
# 生成密钥对
private_key = rsa.generate_private_key(
public_exponent = 65537,
key_size = 2048,
)
# 构建主题信息
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, "CN"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "MyOrg"),
x509.NameAttribute(NameOID.COMMON_NAME, "Access Control Certificate"),
])
# 构建证书
builder = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(private_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.now(UTC))
.not_valid_after(datetime.now(UTC) + timedelta(days = validity_days))
)
# 添加自定义扩展
extensions = [
# 允许访问的资源
Extension(
ALLOWED_RESOURCES_OID,
critical = True, # 表明扩展包含关键安全策略,否则是辅助拓展信息
value = x509.UnrecognizedExtension(
ALLOWED_RESOURCES_OID,
",".join(allowed_resources).encode()
)
),
# 允许使用的账号
Extension(
ALLOWED_ACCOUNTS_OID,
critical = True, # 表明扩展包含关键安全策略,否则是辅助拓展信息
value = x509.UnrecognizedExtension(
ALLOWED_ACCOUNTS_OID,
",".join(allowed_accounts).encode()
)
),
# IP访问规则
Extension(
SOURCE_IP_RULES_OID,
critical = True, # 表明扩展包含关键安全策略,否则是辅助拓展信息
value = x509.UnrecognizedExtension(
SOURCE_IP_RULES_OID,
json.dumps(ip_rules).encode()
)
)
]
for ext in extensions:
builder = builder.add_extension(ext.value, critical = ext.critical)
# 签名证书
cert = builder.sign(private_key, hashes.SHA512())
# 序列化输出
return (
cert.public_bytes(serialization.Encoding.PEM),
private_key.private_bytes(
encoding = serialization.Encoding.PEM,
format = serialization.PrivateFormat.PKCS8,
encryption_algorithm = serialization.NoEncryption()
)
)
def full_verify_certificate(cert_pem: bytes) -> Tuple[dict, Certificate]:
"""
完整证书验证(包含内容解析和签名验证)
返回结构:
{
"basic_valid": bool, # 基础验证是否通过
"signature_valid": bool, # 签名验证结果
"policy_data": dict, # 策略数据
"validation_errors": list, # 验证错误信息
"cert_info": dict # 证书基础信息
}
"""
cert_parse_result = {
"basic_valid": True,
"signature_valid": False,
"policy_data": {},
"validation_errors": [],
"cert_info": {}
}
try:
# 加载证书
cert = x509.load_pem_x509_certificate(cert_pem)
# 记录基础信息
cert_parse_result["cert_info"].update({
"subject": cert.subject.rfc4514_string(),
"issuer": cert.issuer.rfc4514_string(),
"serial_number": cert.serial_number,
"valid_from": cert.not_valid_before_utc.astimezone().isoformat(),
"valid_to": cert.not_valid_after_utc.astimezone().isoformat(),
"is_self_signed": cert.issuer == cert.subject
})
# 基础有效性验证
current_time = datetime.now(UTC)
if current_time < cert.not_valid_before_utc:
cert_parse_result["validation_errors"].append("证书尚未生效")
cert_parse_result["basic_valid"] = False
if current_time > cert.not_valid_after_utc:
cert_parse_result["validation_errors"].append("证书已过期")
cert_parse_result["basic_valid"] = False
# 签名验证(自签名验证)
public_key = cert.public_key()
try:
# 验证签名算法与证书一致
public_key.verify(
cert.signature,
cert.tbs_certificate_bytes,
padding.PKCS1v15(),
cert.signature_hash_algorithm
)
cert_parse_result["signature_valid"] = True
except InvalidSignature:
cert_parse_result["validation_errors"].append("证书签名无效")
cert_parse_result["signature_valid"] = False
except TypeError as e:
cert_parse_result["validation_errors"].append(f"签名算法不匹配: {str(e)}")
cert_parse_result["signature_valid"] = False
# 策略扩展解析
policy_data = {}
try:
# 资源白名单解析
res_ext = cert.extensions.get_extension_for_oid(
x509.ObjectIdentifier(ALLOWED_RESOURCES_OID.dotted_string)
)
policy_data["allowed_resources"] = res_ext.value.public_bytes().decode().split(",")
except x509.ExtensionNotFound:
cert_parse_result["validation_errors"].append("缺少资源白名单扩展")
try:
# 账户白名单解析
acc_ext = cert.extensions.get_extension_for_oid(
x509.ObjectIdentifier(ALLOWED_ACCOUNTS_OID.dotted_string)
)
policy_data["allowed_accounts"] = acc_ext.value.public_bytes().decode().split(",")
except x509.ExtensionNotFound:
cert_parse_result["validation_errors"].append("缺少账户白名单扩展")
try:
# IP规则解析
ip_ext = cert.extensions.get_extension_for_oid(
x509.ObjectIdentifier(SOURCE_IP_RULES_OID.dotted_string)
)
policy_data.update(json.loads(ip_ext.value.public_bytes().decode()))
except x509.ExtensionNotFound:
cert_parse_result["validation_errors"].append("缺少IP规则扩展")
except json.JSONDecodeError:
cert_parse_result["validation_errors"].append("IP规则格式错误")
cert_parse_result["policy_data"] = policy_data
except ValueError as e:
cert_parse_result["validation_errors"].append(f"证书格式错误: {str(e)}")
cert_parse_result["basic_valid"] = False
return cert_parse_result, Certificate()
else:
return cert_parse_result, cert
def encrypt_by_public_key(encrypt_cert: Certificate, data: bytes) -> bytes:
"""
使用公钥加密数据
最大明文长度 = 密钥模长(字节) - 2 × 哈希函数输出长度(字节) - 2
返回结构:
bytes
"""
max_plaintext_length = encrypt_cert.public_key().key_size // 8 - 2 * hashes.SHA256().digest_size - 2
if len(data) > max_plaintext_length:
raise ValueError(f"明文长度超出限制:当前长度 {len(data)},最大允许 {max_plaintext_length} 字节")
return encrypt_cert.public_key().encrypt(
data,
padding.OAEP(
mgf = padding.MGF1(algorithm = hashes.SHA256()),
algorithm = hashes.SHA256(),
label = None
))
def decrypt_by_private_key(decrypt_private: bytes, data: bytes) -> bytes:
"""
使用私钥解密数据
返回结构:
bytes
"""
rsa_private_key = serialization.load_pem_private_key(
decrypt_private,
password = None # 无加密时设为 None
)
return rsa_private_key.decrypt(
data,
padding.OAEP(
mgf = padding.MGF1(algorithm = hashes.SHA256()),
algorithm = hashes.SHA256(),
label = None
)
)
if __name__ == "__main__":
# 证书生成
# 生成证书的拓展策略信息
# 策略信息包括:此证书允许访问的资源、允许使用此证书的账户、允许访问的IP地址信息
cert_bytes, private_key_bytes = generate_cert_with_policy(
allowed_resources = ["12345", "67890"],
allowed_accounts = ["12345", "67890"],
ip_rules = {
"ip_list": ["127.0.0.1"],
"cidr": ["192.168.0.0/16"],
"internal": True,
"public": True
},
validity_days = 30
)
# 证书验证: 验证证书的有效性、完整性、签名、证书拓展的策略信息
result, cert = full_verify_certificate(cert_bytes)
# print(f"验证结果:{json.dumps(result, indent = 4, ensure_ascii = False)}")
print(f"证书验证成功:{result['basic_valid']}")
print(f"签名验证成功:{result['signature_valid']}")
print(f"密钥长度:{cert.public_key().key_size // 8}")
# 使用证书加密敏感数据
plain_data = {"key": "test_1234567890", "value": "test_1234567890"}
sensitive_data = json.dumps(plain_data).encode()
cipher_data = encrypt_by_public_key(cert, sensitive_data)
print(f"加密前的数据:{base64.b64encode(sensitive_data).decode()}, 长度:{len(sensitive_data)} "
f"加密后的数据:{base64.b64encode(cipher_data).decode()}, 长度:{len(cipher_data)}")
# 使用私钥解密敏感数据
decrypted_data = decrypt_by_private_key(private_key_bytes, cipher_data)
print(f"解密后的数据:{base64.b64encode(decrypted_data).decode()}, 长度:{len(decrypted_data)}")
assert decrypted_data == sensitive_data, "使用私钥解密失败"
print(f"解密成功:{decrypted_data == sensitive_data}")
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。