首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >自签证书的生成与应用举例

自签证书的生成与应用举例

原创
作者头像
密码学人CipherHUB
发布2025-05-13 16:15:29
发布2025-05-13 16:15:29
23210
代码可运行
举报
文章被收录于专栏:数安视界数安视界
运行总次数:0
代码可运行

自签(RSA)证书的生成与使用

一、概述

本方案通过自签证书实现细粒度访问控制,结合非对称加密技术保障数据安全传输。核心功能包括:

  1. 生成带访问策略的自签证书
  2. 证书有效性验证与策略解析
  3. 基于证书公钥的敏感数据加密
  4. 使用私钥进行数据解密

二、证书生成流程

1. 密钥对生成

使用RSA算法生成2048位密钥对:

代码语言:python
代码运行次数:0
运行
复制
private_key = rsa.generate_private_key(
    public_exponent=65537,
    key_size=2048
)

密钥长度符合行业安全标准,可有效防御暴力破解

2. 策略嵌入

通过X.509证书扩展字段注入安全策略:

策略类型

OID标识符

数据类型

示例值

资源访问白名单

1.3.6.1.4.1.12345.1.1

字符串列表

"/api/v1", "/data/export"

账户授权列表

1.3.6.1.4.1.12345.1.2

字符串列表

"admin", "audit_user"

IP访问规则

1.3.6.1.4.1.12345.1.3

JSON对象

{"ip_list":"192.168.1.1", "cidr":"10.0.0.0/8"}

3. 证书签名

采用SHA-512哈希算法进行自签名,有效期建议不超过90天:

代码语言:python
代码运行次数:0
运行
复制
cert = builder.sign(private_key, hashes.SHA512())

短期有效策略可降低证书泄露风险

三、证书验证机制

验证流程包含三重安全检查:

  1. 基础验证
    • 证书有效期检查(UTC时间戳比对)
    • 证书链完整性验证
  2. 签名验证 public_key.verify(cert.signature, cert.tbs_certificate_bytes, ...)使用PKCS#1 v1.5填充方案验证签名真实性
  3. 策略解析undefined自动提取扩展字段并转换为可读策略,异常策略会记录至validation_errors

四、数据安全分发

  • 支持最大加密长度:密钥长度/8 - 2*SHA256长度 - 2(例:2048位密钥支持190字节明文)
  • 采用OAEP填充方案,比传统PKCS#1更安全

代码示例

代码语言:python
代码运行次数:0
运行
复制
# 加密
cipher_data = cert.public_key().encrypt(data, padding.OAEP(...))

# 解密
decrypted = private_key.decrypt(cipher_data, padding.OAEP(...))

五、完整代码

代码语言:python
代码运行次数:0
运行
复制
from __future__ import annotations

import base64
import json
from datetime import datetime, timedelta, UTC
from typing import Tuple

from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.x509 import Certificate, Extension, ObjectIdentifier
from cryptography.x509.oid import NameOID

# 自定义OID定义(示例使用1.3.6.1.4.1.12345命名空间)
ALLOWED_RESOURCES_OID = ObjectIdentifier("1.3.6.1.4.1.12345.1.1")
ALLOWED_ACCOUNTS_OID = ObjectIdentifier("1.3.6.1.4.1.12345.1.2")
SOURCE_IP_RULES_OID = ObjectIdentifier("1.3.6.1.4.1.12345.1.3")


def generate_cert_with_policy(
    allowed_resources: list[str],
    allowed_accounts: list[str],
    ip_rules: dict,
    validity_days: int
) -> tuple[bytes, bytes]:
    """
    生成包含访问策略的自签名证书

    :param allowed_resources: 允许访问的资源路径列表
    :param allowed_accounts: 允许使用的账号列表
    :param ip_rules: IP规则字典,包含以下键:
        - "ip_list": 具体IP列表
        - "cidr": CIDR列表
        - "internal": 是否允许所有内网IP
        - "public": 是否允许所有公网IP
    :param validity_days: 证书有效天数
    :return: (证书PEM字节, 私钥PEM字节)
    """
    # 生成密钥对
    private_key = rsa.generate_private_key(
        public_exponent = 65537,
        key_size = 2048,
    )

    # 构建主题信息
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, "CN"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, "MyOrg"),
        x509.NameAttribute(NameOID.COMMON_NAME, "Access Control Certificate"),
    ])

    # 构建证书
    builder = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(private_key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(datetime.now(UTC))
        .not_valid_after(datetime.now(UTC) + timedelta(days = validity_days))
    )

    # 添加自定义扩展
    extensions = [
        # 允许访问的资源
        Extension(
            ALLOWED_RESOURCES_OID,
            critical = True,  # 表明扩展包含关键安全策略,否则是辅助拓展信息
            value = x509.UnrecognizedExtension(
                ALLOWED_RESOURCES_OID,
                ",".join(allowed_resources).encode()
            )
        ),
        # 允许使用的账号
        Extension(
            ALLOWED_ACCOUNTS_OID,
            critical = True,  # 表明扩展包含关键安全策略,否则是辅助拓展信息
            value = x509.UnrecognizedExtension(
                ALLOWED_ACCOUNTS_OID,
                ",".join(allowed_accounts).encode()
            )
        ),
        # IP访问规则
        Extension(
            SOURCE_IP_RULES_OID,
            critical = True,  # 表明扩展包含关键安全策略,否则是辅助拓展信息
            value = x509.UnrecognizedExtension(
                SOURCE_IP_RULES_OID,
                json.dumps(ip_rules).encode()
            )
        )
    ]

    for ext in extensions:
        builder = builder.add_extension(ext.value, critical = ext.critical)

    # 签名证书
    cert = builder.sign(private_key, hashes.SHA512())

    # 序列化输出
    return (
        cert.public_bytes(serialization.Encoding.PEM),
        private_key.private_bytes(
            encoding = serialization.Encoding.PEM,
            format = serialization.PrivateFormat.PKCS8,
            encryption_algorithm = serialization.NoEncryption()
        )
    )


def full_verify_certificate(cert_pem: bytes) -> Tuple[dict, Certificate]:
    """
    完整证书验证(包含内容解析和签名验证)

    返回结构:
    {
        "basic_valid": bool,          # 基础验证是否通过
        "signature_valid": bool,      # 签名验证结果
        "policy_data": dict,          # 策略数据
        "validation_errors": list,    # 验证错误信息
        "cert_info": dict             # 证书基础信息
    }
    """
    cert_parse_result = {
        "basic_valid": True,
        "signature_valid": False,
        "policy_data": {},
        "validation_errors": [],
        "cert_info": {}
    }

    try:
        # 加载证书
        cert = x509.load_pem_x509_certificate(cert_pem)

        # 记录基础信息
        cert_parse_result["cert_info"].update({
            "subject": cert.subject.rfc4514_string(),
            "issuer": cert.issuer.rfc4514_string(),
            "serial_number": cert.serial_number,
            "valid_from": cert.not_valid_before_utc.astimezone().isoformat(),
            "valid_to": cert.not_valid_after_utc.astimezone().isoformat(),
            "is_self_signed": cert.issuer == cert.subject
        })

        # 基础有效性验证
        current_time = datetime.now(UTC)
        if current_time < cert.not_valid_before_utc:
            cert_parse_result["validation_errors"].append("证书尚未生效")
            cert_parse_result["basic_valid"] = False
        if current_time > cert.not_valid_after_utc:
            cert_parse_result["validation_errors"].append("证书已过期")
            cert_parse_result["basic_valid"] = False

        # 签名验证(自签名验证)
        public_key = cert.public_key()
        try:
            # 验证签名算法与证书一致
            public_key.verify(
                cert.signature,
                cert.tbs_certificate_bytes,
                padding.PKCS1v15(),
                cert.signature_hash_algorithm
            )
            cert_parse_result["signature_valid"] = True
        except InvalidSignature:
            cert_parse_result["validation_errors"].append("证书签名无效")
            cert_parse_result["signature_valid"] = False
        except TypeError as e:
            cert_parse_result["validation_errors"].append(f"签名算法不匹配: {str(e)}")
            cert_parse_result["signature_valid"] = False

        # 策略扩展解析
        policy_data = {}
        try:
            # 资源白名单解析
            res_ext = cert.extensions.get_extension_for_oid(
                x509.ObjectIdentifier(ALLOWED_RESOURCES_OID.dotted_string)
            )
            policy_data["allowed_resources"] = res_ext.value.public_bytes().decode().split(",")
        except x509.ExtensionNotFound:
            cert_parse_result["validation_errors"].append("缺少资源白名单扩展")

        try:
            # 账户白名单解析
            acc_ext = cert.extensions.get_extension_for_oid(
                x509.ObjectIdentifier(ALLOWED_ACCOUNTS_OID.dotted_string)
            )
            policy_data["allowed_accounts"] = acc_ext.value.public_bytes().decode().split(",")
        except x509.ExtensionNotFound:
            cert_parse_result["validation_errors"].append("缺少账户白名单扩展")

        try:
            # IP规则解析
            ip_ext = cert.extensions.get_extension_for_oid(
                x509.ObjectIdentifier(SOURCE_IP_RULES_OID.dotted_string)
            )
            policy_data.update(json.loads(ip_ext.value.public_bytes().decode()))
        except x509.ExtensionNotFound:
            cert_parse_result["validation_errors"].append("缺少IP规则扩展")
        except json.JSONDecodeError:
            cert_parse_result["validation_errors"].append("IP规则格式错误")

        cert_parse_result["policy_data"] = policy_data

    except ValueError as e:
        cert_parse_result["validation_errors"].append(f"证书格式错误: {str(e)}")
        cert_parse_result["basic_valid"] = False
        return cert_parse_result, Certificate()
    else:
        return cert_parse_result, cert


def encrypt_by_public_key(encrypt_cert: Certificate, data: bytes) -> bytes:
    """
    使用公钥加密数据
    最大明文长度 = 密钥模长(字节) - 2 × 哈希函数输出长度(字节) - 2

    返回结构:
    bytes
    """
    max_plaintext_length = encrypt_cert.public_key().key_size // 8 - 2 * hashes.SHA256().digest_size - 2
    if len(data) > max_plaintext_length:
        raise ValueError(f"明文长度超出限制:当前长度 {len(data)},最大允许 {max_plaintext_length} 字节")
    return encrypt_cert.public_key().encrypt(
        data,
        padding.OAEP(
            mgf = padding.MGF1(algorithm = hashes.SHA256()),
            algorithm = hashes.SHA256(),
            label = None
        ))


def decrypt_by_private_key(decrypt_private: bytes, data: bytes) -> bytes:
    """
    使用私钥解密数据

    返回结构:
    bytes
    """
    rsa_private_key = serialization.load_pem_private_key(
        decrypt_private,
        password = None  # 无加密时设为 None
    )
    return rsa_private_key.decrypt(
        data,
        padding.OAEP(
            mgf = padding.MGF1(algorithm = hashes.SHA256()),
            algorithm = hashes.SHA256(),
            label = None
        )
    )


if __name__ == "__main__":
    # 证书生成
    # 生成证书的拓展策略信息
    # 策略信息包括:此证书允许访问的资源、允许使用此证书的账户、允许访问的IP地址信息
    cert_bytes, private_key_bytes = generate_cert_with_policy(
        allowed_resources = ["12345", "67890"],
        allowed_accounts = ["12345", "67890"],
        ip_rules = {
            "ip_list": ["127.0.0.1"],
            "cidr": ["192.168.0.0/16"],
            "internal": True,
            "public": True
        },
        validity_days = 30
    )

    # 证书验证: 验证证书的有效性、完整性、签名、证书拓展的策略信息
    result, cert = full_verify_certificate(cert_bytes)
    # print(f"验证结果:{json.dumps(result, indent = 4, ensure_ascii = False)}")
    print(f"证书验证成功:{result['basic_valid']}")
    print(f"签名验证成功:{result['signature_valid']}")
    print(f"密钥长度:{cert.public_key().key_size // 8}")

    # 使用证书加密敏感数据
    plain_data = {"key": "test_1234567890", "value": "test_1234567890"}
    sensitive_data = json.dumps(plain_data).encode()
    cipher_data = encrypt_by_public_key(cert, sensitive_data)
    print(f"加密前的数据:{base64.b64encode(sensitive_data).decode()}, 长度:{len(sensitive_data)} "
          f"加密后的数据:{base64.b64encode(cipher_data).decode()}, 长度:{len(cipher_data)}")

    # 使用私钥解密敏感数据
    decrypted_data = decrypt_by_private_key(private_key_bytes, cipher_data)
    print(f"解密后的数据:{base64.b64encode(decrypted_data).decode()}, 长度:{len(decrypted_data)}")

    assert decrypted_data == sensitive_data, "使用私钥解密失败"
    print(f"解密成功:{decrypted_data == sensitive_data}")
运行效果
运行效果

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 自签(RSA)证书的生成与使用
    • 一、概述
    • 二、证书生成流程
      • 1. 密钥对生成
      • 2. 策略嵌入
      • 3. 证书签名
    • 三、证书验证机制
    • 四、数据安全分发
      • 代码示例
    • 五、完整代码
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档