pyca / cryptography

cryptography is a package designed to expose cryptographic primitives and recipes to Python developers.
https://cryptography.io

x509.NoticeReference(organization, notice_numbers) Function Error #11157

Open dulanshuangqiao opened 6 days ago

dulanshuangqiao commented 6 days ago

I used x509.NoticeReference(organization, notice_numbers) to generate certificate content, but the function assigned the value to the wrong node, which causes problems when viewing the certificate. My workflow is: one script extracts the contents of an existing certificate, and a second script, Generate.py, assembles a certificate from the extracted content. Generate.py read 216.9.35.59.json to generate the certificate, and the result differs from the original certificate in NoticeReference. Attachment: 216.9.35.59.json

The generation code is as follows:

import os
import json
from datetime import datetime
from cryptography import x509
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes
from cryptography.x509.oid import ExtensionOID, NameOID, ObjectIdentifier
from cryptography.x509 import AccessDescription, UniformResourceIdentifier
from cryptography.x509 import DNSName, RFC822Name, DirectoryName, IPAddress, OtherName, RegisteredID
from ipaddress import ip_network
import base64

def create_general_name(name_dict):
    if "DNSName" in name_dict:
        return DNSName(name_dict["DNSName"])
    elif "RFC822Name" in name_dict:
        return RFC822Name(name_dict["RFC822Name"])
    elif "URI" in name_dict:
        return UniformResourceIdentifier(name_dict["URI"])
    elif "DirectoryName" in name_dict:
        return DirectoryName(x509.Name.from_rfc4514_string(name_dict["DirectoryName"]))
    elif "IPAddress" in name_dict:
        return IPAddress(ip_network(name_dict["IPAddress"], strict=False))
    elif "OtherName" in name_dict:
        type_id = ObjectIdentifier(name_dict["OtherName"]["type_id"])
        value = base64.b64decode(name_dict["OtherName"]["value"])
        return OtherName(type_id, value)
    elif "RegisteredID" in name_dict:
        return RegisteredID(ObjectIdentifier(name_dict["RegisteredID"]))
    else:
        raise ValueError("Unknown GeneralName type")

def create_general_names(names_list):
    return [create_general_name(name) for name in names_list]

def parse_dn(dn):
    dn_parts = []
    temp_part = ''
    in_escape = False

    # Split on commas, treating a backslash as an escape for the next character.
    for char in dn:
        if char == '\\' and not in_escape:
            in_escape = True
        elif char == ',' and not in_escape:
            dn_parts.append(temp_part)
            temp_part = ''
        else:
            temp_part += char
            in_escape = False

    if temp_part:
        dn_parts.append(temp_part)

    attributes = []
    for part in dn_parts:
        if '=' in part:
            oid, value = part.split('=', 1)
            oid = oid.strip()
            value = value.strip()
            if oid == "C":
                if len(value) != 2:
                    raise ValueError(f"Country name must be a 2 character country code: {value}")
                attributes.append(x509.NameAttribute(NameOID.COUNTRY_NAME, value))
            elif oid == "O":
                attributes.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, value))
            elif oid == "DC":  # added because of a problem with the unknown-extensions part
                attributes.append(x509.NameAttribute(NameOID.DOMAIN_COMPONENT, value))
            elif oid == 'STREET':  # added because of a problem with the unknown-extensions part
                attributes.append(x509.NameAttribute(NameOID.STREET_ADDRESS, value))
            elif oid == "OU":
                attributes.append(x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, value))
            elif oid == "CN":
                attributes.append(x509.NameAttribute(NameOID.COMMON_NAME, value))
            elif oid == "L":
                attributes.append(x509.NameAttribute(NameOID.LOCALITY_NAME, value))
            elif oid == "ST":
                attributes.append(x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, value))
            else:
                attributes.append(x509.NameAttribute(x509.ObjectIdentifier(oid), value))
    return x509.Name(attributes)

def generate_certificate_policies(certificate_policies):
    policy_information_list = []

    for policy in certificate_policies:
        policy_identifier = ObjectIdentifier(policy["policy_identifier"]["oid"])

        policy_qualifiers = []
        for qualifier in policy["policy_qualifiers"]:
            if isinstance(qualifier, str):
                policy_qualifiers.append(qualifier)
            else:
                notice_reference_info = qualifier.get("notice_reference")
                if notice_reference_info:
                    notice_reference = x509.NoticeReference(
                        organization=notice_reference_info["organization"],
                        notice_numbers=notice_reference_info["notice_numbers"]
                    )
                else:
                    notice_reference = None

                explicit_text = qualifier.get("explicit_text")

                user_notice = x509.UserNotice(
                    notice_reference=notice_reference,
                    explicit_text=explicit_text
                )
                policy_qualifiers.append(user_notice)

        policy_info = x509.PolicyInformation(
            policy_identifier=policy_identifier,
            policy_qualifiers=policy_qualifiers
        )
        policy_information_list.append(policy_info)

    return x509.CertificatePolicies(policy_information_list)

def parse_access_location(location):
    if location["type"] == "URI":
        return x509.UniformResourceIdentifier(location["value"])
    else:
        raise ValueError(f"Unsupported GeneralName type: {location['type']}")

def add_unknown_extensions(builder, unknown_extensions):
    for ext_oid, ext_info in unknown_extensions.items():
        try:
            oid = ObjectIdentifier(ext_oid)
            value = bytes.fromhex(ext_info["value"])
            critical = ext_info["critical"]
            ext = x509.UnrecognizedExtension(oid, value)
            builder = builder.add_extension(ext, critical=critical)
        except ValueError as e:
            print(f"Warning: Skipping invalid OID {ext_oid}: {str(e)}")
    return builder

def generate_certificate_from_json(json_data, output_path):
    version = json_data["Version"]["value"]
    # The extracted serial number may be 0, in which case the certificate
    # cannot be assembled ("The serial number should be positive.").
    serial_number = int(json_data["Serial_Number"]["value"])
    issuer_dn = json_data["Issuer"]["value"]
    validity = json_data["Validity"]["value"]
    subject_dn = json_data["Subject"]["value"]
    subject_public_key_info = json_data["Subject_Public_Key_Info"]["value"]
    authority_key_identifier = json_data.get("Authority_Key_Identifier")
    subject_key_identifier = json_data.get("Subject_Key_Identifier")
    key_usage = json_data.get("key_Usage")
    certificate_policies = json_data.get("Certificate_Policies")
    subject_alt_name = json_data.get("Subject_Alternative_Name")
    issuer_alt_name = json_data.get("Issuer_Alternative_Name")
    basic_constraints = json_data.get("Basic_Constraints")
    name_constraints = json_data.get("Name_Constraints")
    policy_constraints = json_data.get("Policy_Constraints")
    extended_key_usage = json_data.get("Extended_Key_Usage")
    crl_distribution_points = json_data.get("CRL_Distribution_Points")
    inhibit_any_policy = json_data.get("Inhibit_anyPolicy")
    freshest_crl = json_data.get("Freshest_CRL")
    authority_information_access = json_data.get("Authority_Information_Access")
    subject_information_access = json_data.get("Subject_Information_Access")
    unknown_extensions = json_data["Extensions"]["value"].get("Unknown", {})

    valid_from = datetime.strptime(validity["valid_from"], '%Y-%m-%d %H:%M:%S')
    valid_to = datetime.strptime(validity["valid_to"], '%Y-%m-%d %H:%M:%S')

    issuer = parse_dn(issuer_dn)
    subject = parse_dn(subject_dn)

    public_key = serialization.load_pem_public_key(subject_public_key_info.encode())

    builder = x509.CertificateBuilder(
    ).subject_name(
        subject
    ).issuer_name(
        issuer
    ).public_key(
        public_key
    ).serial_number(
        serial_number
    ).not_valid_before(
        valid_from
    ).not_valid_after(
        valid_to
    )

    if authority_key_identifier and authority_key_identifier["value"] is not None:
        authority_cert_issuer = [
            x509.DirectoryName(parse_dn(issuer)) for issuer in authority_key_identifier["value"].get("authority_cert_issuer", [])
        ] if authority_key_identifier["value"].get("authority_cert_issuer") else None
        aki_extension = x509.AuthorityKeyIdentifier(
            key_identifier=bytes.fromhex(authority_key_identifier["value"]["key_identifier"]),
            authority_cert_issuer=authority_cert_issuer,
            authority_cert_serial_number=int(authority_key_identifier["value"]["authority_cert_serial_number"]) if authority_key_identifier["value"]["authority_cert_serial_number"] is not None else None
        )
        builder = builder.add_extension(aki_extension, critical=authority_key_identifier["critical"])

    if subject_key_identifier and subject_key_identifier["value"] is not None:
        ski_extension = x509.SubjectKeyIdentifier(
            bytes.fromhex(subject_key_identifier["value"]),
        )
        builder = builder.add_extension(ski_extension, critical=subject_key_identifier["critical"])

    if key_usage and key_usage["value"] is not None:
        ku_extension = x509.KeyUsage(
            digital_signature=key_usage["value"]["digital_signature"],
            content_commitment=key_usage["value"]["content_commitment"],
            key_encipherment=key_usage["value"]["key_encipherment"],
            data_encipherment=key_usage["value"]["data_encipherment"],
            key_agreement=key_usage["value"]["key_agreement"],
            key_cert_sign=key_usage["value"]["key_cert_sign"],
            crl_sign=key_usage["value"]["crl_sign"],
            encipher_only=key_usage["value"].get("encipher_only", False),
            decipher_only=key_usage["value"].get("decipher_only", False),
        )
        builder = builder.add_extension(ku_extension, critical=key_usage["critical"])

    if certificate_policies and certificate_policies["value"] is not None:
        cp_extension = generate_certificate_policies(certificate_policies["value"])
        builder = builder.add_extension(cp_extension, critical=certificate_policies["critical"])

    if subject_alt_name and subject_alt_name["value"] is not None:
        san_list = create_general_names(subject_alt_name["value"])
        san_extension = x509.SubjectAlternativeName(san_list)
        builder = builder.add_extension(san_extension, critical=subject_alt_name["critical"])

    if issuer_alt_name and issuer_alt_name["value"] is not None:
        ian_list = create_general_names(issuer_alt_name["value"])
        ian_extension = x509.IssuerAlternativeName(ian_list)
        builder = builder.add_extension(ian_extension, critical=json_data["Issuer_Alternative_Name"]["critical"])

    if basic_constraints and basic_constraints["value"] is not None:
        bc_extension = x509.BasicConstraints(
            ca=basic_constraints["value"]["ca"],
            path_length=basic_constraints["value"]["path_length"],
        )
        builder = builder.add_extension(bc_extension, critical=basic_constraints["critical"])

    if name_constraints and name_constraints["value"] is not None:
        permitted_subtrees = create_general_names(name_constraints["value"]["permitted_subtrees"]) if name_constraints["value"].get("permitted_subtrees") else None
        excluded_subtrees = create_general_names(name_constraints["value"]["excluded_subtrees"]) if name_constraints["value"].get("excluded_subtrees") else None
        nc_extension = x509.NameConstraints(
            permitted_subtrees=permitted_subtrees,
            excluded_subtrees=excluded_subtrees
        )
        builder = builder.add_extension(nc_extension, critical=json_data["Name_Constraints"]["critical"])

    if policy_constraints and policy_constraints["value"] is not None:
        pc_extension = x509.PolicyConstraints(
            require_explicit_policy=policy_constraints["value"]["require_explicit_policy"],
            inhibit_policy_mapping=policy_constraints["value"]["inhibit_policy_mapping"],
        )
        builder = builder.add_extension(pc_extension, critical=json_data["Policy_Constraints"]["critical"])

    if extended_key_usage and extended_key_usage["value"] is not None:
        eku_extension = x509.ExtendedKeyUsage([
            ObjectIdentifier(eku["oid"]) for eku in extended_key_usage["value"]
        ])
        builder = builder.add_extension(eku_extension, critical=json_data["Extended_Key_Usage"]["critical"])

    if "CRL_Distribution_Points" in json_data and json_data["CRL_Distribution_Points"]["value"]:
        dp_objects = []
        for dp_info in json_data["CRL_Distribution_Points"]["value"]:
            full_name = [x509.UniformResourceIdentifier(uri) for uri in dp_info.get("full_name", [])]
            relative_name = x509.RelativeDistinguishedName(dp_info["relative_name"]) if dp_info.get("relative_name") else None
            crl_issuer = x509.GeneralNames(create_general_names(dp_info.get("crl_issuer", []))) if dp_info.get("crl_issuer") else None
            reasons = x509.ReasonFlags(int(dp_info["reasons"])) if dp_info.get("reasons") else None

            dp = x509.DistributionPoint(
                full_name=full_name,
                relative_name=relative_name,
                crl_issuer=crl_issuer,
                reasons=reasons
            )
            dp_objects.append(dp)

        # Create the CRL Distribution Points extension
        crl_dp_extension = x509.CRLDistributionPoints(dp_objects)
        builder = builder.add_extension(crl_dp_extension, critical=json_data["CRL_Distribution_Points"]["critical"])

    if inhibit_any_policy and inhibit_any_policy["value"] is not None:
        iap_extension = x509.InhibitAnyPolicy(
            skip_certs=inhibit_any_policy["value"],
        )
        builder = builder.add_extension(iap_extension, critical=json_data["Inhibit_anyPolicy"]["critical"])

    if freshest_crl and freshest_crl["value"] is not None:
        dp_objects = []
        for dp_info in freshest_crl["value"]:
            full_name = [
                UniformResourceIdentifier(uri) for uri in dp_info["full_name"]
            ] if "full_name" in dp_info else []
            crl_issuer = create_general_names(dp_info["crl_issuer"]) if "crl_issuer" in dp_info else None

            relative_name = dp_info.get("relative_name")
            reasons = dp_info.get("reasons")

            dp = x509.DistributionPoint(
                full_name=full_name,
                relative_name=relative_name,
                reasons=reasons,
                crl_issuer=crl_issuer
            )
            dp_objects.append(dp)

        fcrl_extension = x509.FreshestCRL(dp_objects)
        builder = builder.add_extension(fcrl_extension, critical=json_data["Freshest_CRL"]["critical"])

    if authority_information_access and authority_information_access["value"] is not None:
        aia_extension = x509.AuthorityInformationAccess([
            AccessDescription(
                ObjectIdentifier(access["access_method"]["oid"]),
                UniformResourceIdentifier(access["access_location"])
            ) for access in authority_information_access["value"]
        ])
        builder = builder.add_extension(aia_extension, critical=json_data["Authority_Information_Access"]["critical"])

    if subject_information_access and subject_information_access["value"] is not None:
        sia_extension = x509.SubjectInformationAccess([
            AccessDescription(
                ObjectIdentifier(access["access_method"]["oid"]),
                UniformResourceIdentifier(access["access_location"])
            ) for access in subject_information_access["value"]
        ])
        builder = builder.add_extension(sia_extension, critical=json_data["Subject_Information_Access"]["critical"])

    builder = add_unknown_extensions(builder, unknown_extensions)

    # The regenerated certificate is signed with a freshly generated RSA key.
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
    )

    certificate = builder.sign(
        private_key=private_key,
        algorithm=hashes.SHA256(),
    )

    with open(output_path, 'wb') as f:
        f.write(certificate.public_bytes(serialization.Encoding.PEM))

    print(f"Certificate generated successfully for {output_path}")

def process_json_files(input_folder, output_folder):
    for filename in os.listdir(input_folder):
        if filename.endswith('.json'):
            json_path = os.path.join(input_folder, filename)
            print(f"Processing file: {json_path}")
            try:
                with open(json_path, 'r') as json_file:
                    json_data = json.load(json_file)
                output_path = os.path.join(output_folder, filename.replace('.json', '.pem'))
                generate_certificate_from_json(json_data, output_path)
            except Exception as e:
                print(f"Error processing file {json_path}: {e}")

if __name__ == "__main__":
    input_folder = './jsons'
    output_folder = './Gcerts'
    os.makedirs(output_folder, exist_ok=True)
    process_json_files(input_folder, output_folder)

The original certificate is as follows:

-----BEGIN CERTIFICATE-----
MIIHLDCCBhSgAwIBAgIDASR9MA0GCSqGSIb3DQEBBQUAMIGMMQswCQYDVQQGEwJJ
TDEWMBQGA1UEChMNU3RhcnRDb20gTHRkLjErMCkGA1UECxMiU2VjdXJlIERpZ2l0
YWwgQ2VydGlmaWNhdGUgU2lnbmluZzE4MDYGA1UEAxMvU3RhcnRDb20gQ2xhc3Mg
MiBQcmltYXJ5IEludGVybWVkaWF0ZSBTZXJ2ZXIgQ0EwHhcNMTIxMjMwMTQ1NDM4
WhcNMTUwMTAxMDYxODMxWjCBzzEZMBcGA1UEDRMQam5pOFZaYWQ1TFpvb3ZURjEL
MAkGA1UEBhMCVVMxEzARBgNVBAgTCkNhbGlmb3JuaWExGDAWBgNVBAcTD1JhbmNo
byBTYW50YSBGZTEuMCwGA1UEChMlRGlnaXRhbCBQb2ludCBTb2x1dGlvbnMgKFNo
YXduIEhvZ2FuKTEbMBkGA1UEAxQSKi5kaWdpdGFscG9pbnQuY29tMSkwJwYJKoZI
hvcNAQkBFhp3ZWJtYXN0ZXJAZGlnaXRhbHBvaW50LmNvbTCCASIwDQYJKoZIhvcN
AQEBBQADggEPADCCAQoCggEBALetYG8uxKInrVi1floeGilDvJy/07nuJsdh19nP
iwCsvITtuAab9kUoNXCvGTz+9Q/FT+MDJKjuNba5vTow4zCa8R9D3ZnSWMGZS/AE
KDUmiD9MowsLaQ90D+FY5g/NymrJU0i4DVwnbdfOPEovAoIgbd0j80AzsZ++vXIx
HuhU8rL+CePl3kTwjpc9Nx0EEhDcaOoVMe1hC45/+r5gCfzl+tSs26rIMZunv5uP
A7m+kPnyAmAnMDkfH7IWKETMSxSrtBUMONzbe4M6cKeO44n2p9wcQNsZwA34EcDZ
oG45oc9Sd5Ix1nBdZUdI42PjZvtUPpa1ixvmmJ0wFt9uTGECAwEAAaOCA1AwggNM
MAkGA1UdEwQCMAAwCwYDVR0PBAQDAgOoMB0GA1UdJQQWMBQGCCsGAQUFBwMCBggr
BgEFBQcDATAdBgNVHQ4EFgQU+zqVxprr0DTmO8ROCk1vf8ppceAwHwYDVR0jBBgw
FoAUEdsjRf1UzGpxb4SKA9e+9wEvJoYwgYsGA1UdEQSBgzCBgIISKi5kaWdpdGFs
cG9pbnQuY29tghBkaWdpdGFscG9pbnQuY29tggxkcHN0YXRpYy5jb22CDiouZHBz
dGF0aWMuY29tggVkcC5seYIHKi5kcC5seYITZGlnaXRhbHBvaW50YWRzLmNvbYIV
Ki5kaWdpdGFscG9pbnRhZHMuY29tMIIBVgYDVR0gBIIBTTCCAUkwCAYGZ4EMAQIC
MIIBOwYLKwYBBAGBtTcBAgMwggEqMC4GCCsGAQUFBwIBFiJodHRwOi8vd3d3LnN0
YXJ0c3NsLmNvbS9wb2xpY3kucGRmMIH3BggrBgEFBQcCAjCB6jAnFiBTdGFydENv
bSBDZXJ0aWZpY2F0aW9uIEF1dGhvcml0eTADAgEBGoG+VGhpcyBjZXJ0aWZpY2F0
ZSB3YXMgaXNzdWVkIGFjY29yZGluZyB0byB0aGUgQ2xhc3MgMiBWYWxpZGF0aW9u
IHJlcXVpcmVtZW50cyBvZiB0aGUgU3RhcnRDb20gQ0EgcG9saWN5LCByZWxpYW5j
ZSBvbmx5IGZvciB0aGUgaW50ZW5kZWQgcHVycG9zZSBpbiBjb21wbGlhbmNlIG9m
IHRoZSByZWx5aW5nIHBhcnR5IG9ibGlnYXRpb25zLjA1BgNVHR8ELjAsMCqgKKAm
hiRodHRwOi8vY3JsLnN0YXJ0c3NsLmNvbS9jcnQyLWNybC5jcmwwgY4GCCsGAQUF
BwEBBIGBMH8wOQYIKwYBBQUHMAGGLWh0dHA6Ly9vY3NwLnN0YXJ0c3NsLmNvbS9z
dWIvY2xhc3MyL3NlcnZlci9jYTBCBggrBgEFBQcwAoY2aHR0cDovL2FpYS5zdGFy
dHNzbC5jb20vY2VydHMvc3ViLmNsYXNzMi5zZXJ2ZXIuY2EuY3J0MCMGA1UdEgQc
MBqGGGh0dHA6Ly93d3cuc3RhcnRzc2wuY29tLzANBgkqhkiG9w0BAQUFAAOCAQEA
W9pxDRXuxuOqs1a1/qWCzHmsDx/N7IlDu2i8f30vdAs9lwqU4s+LXqmEnHHbofbl
r2qdfjeYA/jZ1N/wUwTjusAYOOsL++/eYm8oca392py0Mh0vvGijLOGF0SG6Gm9h
822My01/TAURy5ZA81E+GMbu6bEN3u1OwtN1vWFXjoL5r9IxhwO6VDT4VwlBMg07
LNHhPbQuVe9pgZsyngWZaS8wVp4dtzxJNp4Qkb6xYTmqbu0q5qjEh7sm2+r0cX62
mW653JZdtzypojdTmR1KqGW+JBzjcVYXaiah181bp3dNCN3kw4eUz5yxzw2L3F0p
mWyTVOAzC05Nc2oGstQHJw==
-----END CERTIFICATE-----

Comparison of the viewing results (generated on the left, original on the right): d3d52a28166613a6cce35cb61692fd0

Parsed and compared, with the tags of the mismatched NoticeReference marked (generated on the left, original on the right): 221eee4bb8702bcfc429d5660023d96
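The tag comparison described above can be approximated with the standard library alone. The following is a minimal sketch, not the tool used for the comparison: `read_tlv`, `walk`, `encode_notice_reference`, and `tag_mismatches` are hypothetical helper names. The IA5String tag (0x16) for the original is what the base64 of the certificate above appears to decode to; the UTF8String tag (0x0C) for the generated side is only an assumption chosen to show what a mismatch looks like.

```python
# Universal DER tags that can appear inside a NoticeReference.
TAG_NAMES = {
    0x02: "INTEGER",
    0x0C: "UTF8String",
    0x16: "IA5String",
    0x1A: "VisibleString",
    0x1E: "BMPString",
    0x30: "SEQUENCE",
}


def read_tlv(data, offset=0):
    """Return (tag, value_bytes, next_offset) for the DER element at offset."""
    tag = data[offset]
    length = data[offset + 1]
    offset += 2
    if length & 0x80:  # long form: low 7 bits give the number of length bytes
        count = length & 0x7F
        length = int.from_bytes(data[offset:offset + count], "big")
        offset += count
    return tag, data[offset:offset + length], offset + length


def walk(data, depth=0):
    """Yield (depth, tag_name, value) for each element, descending into SEQUENCEs."""
    offset = 0
    while offset < len(data):
        tag, value, offset = read_tlv(data, offset)
        name = TAG_NAMES.get(tag, hex(tag))
        if tag == 0x30:
            yield depth, name, None
            yield from walk(value, depth + 1)
        else:
            yield depth, name, value


def encode_notice_reference(org_tag, organization, numbers):
    """Hand-encode a NoticeReference; assumes short lengths and numbers below 128."""
    org = organization.encode("ascii")
    nums = b"".join(bytes([0x02, 0x01, n]) for n in numbers)
    body = bytes([org_tag, len(org)]) + org + bytes([0x30, len(nums)]) + nums
    return bytes([0x30, len(body)]) + body


def tag_mismatches(left, right):
    """List (left_tag, right_tag) pairs at positions where the tags differ."""
    return [(a[1], b[1]) for a, b in zip(walk(left), walk(right)) if a[1] != b[1]]


# Generated on the left (assumed UTF8String), original on the right (IA5String).
generated = encode_notice_reference(0x0C, "StartCom Certification Authority", [1])
original = encode_notice_reference(0x16, "StartCom Certification Authority", [1])
print(tag_mismatches(generated, original))
```

Both encodings carry the same organization text and notice number, so a viewer that renders values identically can still report a difference when only the string tag changes.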

alex commented 6 days ago

This is a very large reproducer with many moving pieces. Can you please provide a minimal reproducer?
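For illustration, a minimal reproducer could be as small as the sketch below. It is not taken from the report: `round_trip_user_notice` is a hypothetical helper, the policy OID `1.2.3.4`, the subject name, the EC key, and the validity dates are placeholders, and it assumes a recent cryptography release in which the `backend` argument is optional. It builds a throwaway self-signed certificate with a single UserNotice, re-parses it, and returns the notice so the parsed values can be compared with the inputs.

```python
import datetime

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.x509.oid import NameOID


def round_trip_user_notice(organization, notice_numbers, explicit_text):
    """Build a certificate with one UserNotice, re-parse it, and return the notice."""
    key = ec.generate_private_key(ec.SECP256R1())
    name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "notice-reference-test")])
    notice = x509.UserNotice(
        notice_reference=x509.NoticeReference(organization, notice_numbers),
        explicit_text=explicit_text,
    )
    policies = x509.CertificatePolicies(
        # "1.2.3.4" is a placeholder policy OID, not the one from the report.
        [x509.PolicyInformation(x509.ObjectIdentifier("1.2.3.4"), [notice])]
    )
    not_before = datetime.datetime(2024, 1, 1)
    cert = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .public_key(key.public_key())
        .serial_number(1)
        .not_valid_before(not_before)
        .not_valid_after(not_before + datetime.timedelta(days=1))
        .add_extension(policies, critical=False)
        .sign(key, hashes.SHA256())
    )
    # Serialize and parse again so the values come from the encoded bytes.
    der = cert.public_bytes(serialization.Encoding.DER)
    parsed = x509.load_der_x509_certificate(der)
    ext = parsed.extensions.get_extension_for_class(x509.CertificatePolicies)
    return list(ext.value)[0].policy_qualifiers[0]


notice = round_trip_user_notice("StartCom Certification Authority", [1], "example text")
print(notice.notice_reference.organization)
print(notice.notice_reference.notice_numbers)
print(notice.explicit_text)
```

This only checks the values that cryptography parses back; it does not compare the encoded bytes with those of another certificate, so a byte-level difference would need a separate DER dump.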

On Tue, Jun 25, 2024, 10:34 AM dulanshuangqiao @.***> wrote:

I used x509.NoticeReference(organization, notice_numbers) to generate certificate content, but the function assigned the value to the wrong node, causing problems viewing the certificate. I used a script to extract the certificate, and then used the extracted content to assemble the certificate, but there was a problem with the processing of NoticeReference. I used Generate.py to read the content of 216.9.35.59.json to generate it. The obtained certificate was different from the original certificate in NoticeReference. 216.9.35.59.json https://github.com/user-attachments/files/15972920/216.9.35.59.json

The generated code is as follows: import os import json from datetime import datetime from cryptography import x509 from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives import hashes from cryptography.x509.oid import ExtensionOID, NameOID, ObjectIdentifier from cryptography.x509 import AccessDescription, UniformResourceIdentifier from cryptography.x509 import DNSName, RFC822Name, DirectoryName, IPAddress, OtherName, RegisteredID from ipaddress import ip_network import base64

def create_general_name(name_dict): if "DNSName" in name_dict: return DNSName(name_dict["DNSName"]) elif "RFC822Name" in name_dict: return RFC822Name(name_dict["RFC822Name"]) elif "URI" in name_dict: return UniformResourceIdentifier(name_dict["URI"]) elif "DirectoryName" in name_dict: return DirectoryName(x509.Name.from_rfc4514_string(name_dict["DirectoryName"])) elif "IPAddress" in name_dict: return IPAddress(ip_network(name_dict["IPAddress"], strict=False)) elif "OtherName" in name_dict: type_id = ObjectIdentifier(name_dict["OtherName"]["type_id"]) value = base64.b64decode(name_dict["OtherName"]["value"]) return OtherName(type_id, value) elif "RegisteredID" in name_dict: return RegisteredID(ObjectIdentifier(name_dict["RegisteredID"])) else: raise ValueError("Unknown GeneralName type")

def create_general_names(names_list): return [create_general_name(name) for name in names_list]

def parse_dn(dn): dn_parts = [] temp_part = '' in_escape = False

for char in dn: if char == '\' and not in_escape: in_escape = True elif char == ',' and not in_escape: dn_parts.append(temp_part) temp_part = '' else: temp_part += char in_escape = False

if temp_part: dn_parts.append(temp_part)

attributes = [] for part in dn_parts: if '=' in part: oid, value = part.split('=', 1) oid = oid.strip() value = value.strip() if oid == "C": if len(value) != 2: raise ValueError(f"Country name must be a 2 character country code: {value}") attributes.append(x509.NameAttribute(NameOID.COUNTRY_NAME, value)) elif oid == "O": attributes.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, value)) elif oid == "DC": # 未知扩展部分问题导致添加 attributes.append(x509.NameAttribute(NameOID.DOMAIN_COMPONENT, value)) elif oid == 'STREET': # 未知扩展部分问题导致添加 attributes.append(x509.NameAttribute(NameOID.STREET_ADDRESS, value)) elif oid == "OU": attributes.append(x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, value)) elif oid == "CN": attributes.append(x509.NameAttribute(NameOID.COMMON_NAME, value)) elif oid == "L": attributes.append(x509.NameAttribute(NameOID.LOCALITY_NAME, value)) elif oid == "ST": attributes.append(x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, value)) else: attributes.append(x509.NameAttribute(x509.ObjectIdentifier(oid), value)) return x509.Name(attributes)

def generate_certificate_policies(certificate_policies): policy_information_list = []

for policy in certificate_policies: policy_identifier = ObjectIdentifier(policy["policy_identifier"]["oid"])

policy_qualifiers = []
for qualifier in policy["policy_qualifiers"]:
    if isinstance(qualifier, str):
        policy_qualifiers.append(qualifier)
    else:
        notice_reference_info = qualifier.get("notice_reference")
        if notice_reference_info:
            notice_reference = x509.NoticeReference(
                organization=notice_reference_info["organization"],
                notice_numbers=notice_reference_info["notice_numbers"]
            )
        else:
            notice_reference = None

        explicit_text = qualifier.get("explicit_text")

        user_notice = x509.UserNotice(
            notice_reference=notice_reference,
            explicit_text=explicit_text
        )
        policy_qualifiers.append(user_notice)

policy_info = x509.PolicyInformation(
    policy_identifier=policy_identifier,
    policy_qualifiers=policy_qualifiers
)
policy_information_list.append(policy_info)

return x509.CertificatePolicies(policy_information_list)

def parse_access_location(location): if location["type"] == "URI": return x509.UniformResourceIdentifier(location["value"]) else: raise ValueError(f"Unsupported GeneralName type: {location['type']}") def add_unknown_extensions(builder, unknown_extensions): for ext_oid, ext_info in unknown_extensions.items(): try: oid = ObjectIdentifier(ext_oid) value = bytes.fromhex(ext_info["value"]) critical = ext_info["critical"] ext = x509.UnrecognizedExtension(oid, value) builder = builder.add_extension(ext, critical=critical) except ValueError as e: print(f"Warning: Skipping invalid OID {ext_oid}: {str(e)}") return builder

def generate_certificate_from_json(json_data, output_path): version = json_data["Version"]["value"] serial_number = int(json_data["Serial_Number"]["value"]) # 获取字段是可能获取到0,此时无法组装证书(The serial number should be positive.) issuer_dn = json_data["Issuer"]["value"] validity = json_data["Validity"]["value"] subject_dn = json_data["Subject"]["value"] subject_public_key_info = json_data["Subject_Public_Key_Info"]["value"] authority_key_identifier = json_data.get("Authority_Key_Identifier") subject_key_identifier = json_data.get("Subject_Key_Identifier") key_usage = json_data.get("key_Usage") certificate_policies = json_data.get("Certificate_Policies") subject_alt_name = json_data.get("Subject_Alternative_Name") issuer_alt_name = json_data.get("Issuer_Alternative_Name") basic_constraints = json_data.get("Basic_Constraints") name_constraints = json_data.get("Name_Constraints") policy_constraints = json_data.get("Policy_Constraints") extended_key_usage = json_data.get("Extended_Key_Usage") crl_distribution_points = json_data.get("CRL_Distribution_Points") inhibit_any_policy = json_data.get("Inhibit_anyPolicy") freshest_crl = json_data.get("Freshest_CRL") authority_information_access = json_data.get("Authority_Information_Access") subject_information_access = json_data.get("Subject_Information_Access") unknown_extensions = json_data["Extensions"]["value"].get("Unknown", {})

valid_from = datetime.strptime(validity["valid_from"], '%Y-%m-%d %H:%M:%S') valid_to = datetime.strptime(validity["valid_to"], '%Y-%m-%d %H:%M:%S')

issuer = parse_dn(issuer_dn) subject = parse_dn(subject_dn)

public_key = serialization.load_pem_public_key(subject_public_key_info.encode())

builder = x509.CertificateBuilder( ).subject_name( subject ).issuer_name( issuer ).public_key( public_key ).serial_number( serial_number ).not_valid_before( valid_from ).not_valid_after( valid_to )

if authority_key_identifier and authority_key_identifier["value"] is not None: authority_cert_issuer = [ x509.DirectoryName(parse_dn(issuer)) for issuer in authority_key_identifier["value"].get("authority_cert_issuer", []) ] if authority_key_identifier["value"].get("authority_cert_issuer") else None aki_extension = x509.AuthorityKeyIdentifier( key_identifier=bytes.fromhex(authority_key_identifier["value"]["key_identifier"]), authority_cert_issuer=authority_cert_issuer, authority_cert_serial_number=int(authority_key_identifier["value"]["authority_cert_serial_number"]) if authority_key_identifier["value"]["authority_cert_serial_number"] is not None else None ) builder = builder.add_extension(aki_extension, critical=authority_key_identifier["critical"])

if subject_key_identifier and subject_key_identifier["value"] is not None: ski_extension = x509.SubjectKeyIdentifier( bytes.fromhex(subject_key_identifier["value"]), ) builder = builder.add_extension(ski_extension, critical=subject_key_identifier["critical"])

if key_usage and key_usage["value"] is not None: ku_extension = x509.KeyUsage( digital_signature=key_usage["value"]["digital_signature"], content_commitment=key_usage["value"]["content_commitment"], key_encipherment=key_usage["value"]["key_encipherment"], data_encipherment=key_usage["value"]["data_encipherment"], key_agreement=key_usage["value"]["key_agreement"], key_cert_sign=key_usage["value"]["key_cert_sign"], crl_sign=key_usage["value"]["crl_sign"], encipher_only=key_usage["value"].get("encipher_only", False), decipher_only=key_usage["value"].get("decipher_only", False), ) builder = builder.add_extension(ku_extension, critical=key_usage["critical"])

if certificate_policies and certificate_policies["value"] is not None: cp_extension = generate_certificate_policies(certificate_policies["value"]) builder = builder.add_extension(cp_extension, critical=certificate_policies["critical"])

if subject_alt_name and subject_alt_name["value"] is not None: san_list = create_general_names(subject_alt_name["value"]) san_extension = x509.SubjectAlternativeName(san_list) builder = builder.add_extension(san_extension, critical=subject_alt_name["critical"])

if issuer_alt_name and issuer_alt_name["value"] is not None: ian_list = create_general_names(issuer_alt_name["value"]) ian_extension = x509.IssuerAlternativeName(ian_list) builder = builder.add_extension(ian_extension, critical=json_data["Issuer_Alternative_Name"]["critical"])

if basic_constraints and basic_constraints["value"] is not None: bc_extension = x509.BasicConstraints( ca=basic_constraints["value"]["ca"], path_length=basic_constraints["value"]["path_length"], ) builder = builder.add_extension(bc_extension, critical=basic_constraints["critical"])

if name_constraints and name_constraints["value"] is not None: permitted_subtrees = create_general_names(name_constraints["value"]["permitted_subtrees"]) if name_constraints["value"].get("permitted_subtrees") else None excluded_subtrees = create_general_names(name_constraints["value"]["excluded_subtrees"]) if name_constraints["value"].get("excluded_subtrees") else None nc_extension = x509.NameConstraints( permitted_subtrees=permitted_subtrees, excluded_subtrees=excluded_subtrees ) builder = builder.add_extension(nc_extension, critical=json_data["Name_Constraints"]["critical"])

    if policy_constraints and policy_constraints["value"] is not None:
        pc_extension = x509.PolicyConstraints(
            require_explicit_policy=policy_constraints["value"]["require_explicit_policy"],
            inhibit_policy_mapping=policy_constraints["value"]["inhibit_policy_mapping"],
        )
        builder = builder.add_extension(pc_extension, critical=json_data["Policy_Constraints"]["critical"])

    if extended_key_usage and extended_key_usage["value"] is not None:
        eku_extension = x509.ExtendedKeyUsage([
            ObjectIdentifier(eku["oid"]) for eku in extended_key_usage["value"]
        ])
        builder = builder.add_extension(eku_extension, critical=json_data["Extended_Key_Usage"]["critical"])

    if "CRL_Distribution_Points" in json_data and json_data["CRL_Distribution_Points"]["value"]:
        dp_objects = []
        for dp_info in json_data["CRL_Distribution_Points"]["value"]:
            full_name = [x509.UniformResourceIdentifier(uri) for uri in dp_info.get("full_name", [])]
            relative_name = x509.RelativeDistinguishedName(dp_info["relative_name"]) if dp_info.get("relative_name") else None
            crl_issuer = x509.GeneralNames(create_general_names(dp_info.get("crl_issuer", []))) if dp_info.get("crl_issuer") else None
            reasons = x509.ReasonFlags(int(dp_info["reasons"])) if dp_info.get("reasons") else None

            dp = x509.DistributionPoint(
                full_name=full_name,
                relative_name=relative_name,
                crl_issuer=crl_issuer,
                reasons=reasons
            )
            dp_objects.append(dp)

        # Create the CRL Distribution Points extension
        crl_dp_extension = x509.CRLDistributionPoints(dp_objects)
        builder = builder.add_extension(crl_dp_extension, critical=json_data["CRL_Distribution_Points"]["critical"])

    if inhibit_any_policy and inhibit_any_policy["value"] is not None:
        iap_extension = x509.InhibitAnyPolicy(
            skip_certs=inhibit_any_policy["value"],
        )
        builder = builder.add_extension(iap_extension, critical=json_data["Inhibit_anyPolicy"]["critical"])

    if freshest_crl and freshest_crl["value"] is not None:
        dp_objects = []
        for dp_info in freshest_crl["value"]:
            full_name = [
                UniformResourceIdentifier(uri) for uri in dp_info["full_name"]
            ] if "full_name" in dp_info else []
            crl_issuer = create_general_names(dp_info["crl_issuer"]) if "crl_issuer" in dp_info else None

            relative_name = dp_info.get("relative_name")
            reasons = dp_info.get("reasons")

            dp = x509.DistributionPoint(
                full_name=full_name,
                relative_name=relative_name,
                reasons=reasons,
                crl_issuer=crl_issuer
            )
            dp_objects.append(dp)

        fcrl_extension = x509.FreshestCRL(dp_objects)
        builder = builder.add_extension(fcrl_extension, critical=json_data["Freshest_CRL"]["critical"])

    if authority_information_access and authority_information_access["value"] is not None:
        aia_extension = x509.AuthorityInformationAccess([
            AccessDescription(
                ObjectIdentifier(access["access_method"]["oid"]),
                UniformResourceIdentifier(access["access_location"])
            ) for access in authority_information_access["value"]
        ])
        builder = builder.add_extension(aia_extension, critical=json_data["Authority_Information_Access"]["critical"])

    if subject_information_access and subject_information_access["value"] is not None:
        sia_extension = x509.SubjectInformationAccess([
            AccessDescription(
                ObjectIdentifier(access["access_method"]["oid"]),
                UniformResourceIdentifier(access["access_location"])
            ) for access in subject_information_access["value"]
        ])
        builder = builder.add_extension(sia_extension, critical=json_data["Subject_Information_Access"]["critical"])

    builder = add_unknown_extensions(builder, unknown_extensions)

    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
    )

    certificate = builder.sign(
        private_key=private_key,
        algorithm=hashes.SHA256(),
    )

    with open(output_path, 'wb') as f:
        f.write(certificate.public_bytes(serialization.Encoding.PEM))

    print(f"Certificate generated successfully for {output_path}")

def process_json_files(input_folder, output_folder):
    for filename in os.listdir(input_folder):
        if filename.endswith('.json'):
            json_path = os.path.join(input_folder, filename)
            print(f"Processing file: {json_path}")
            try:
                with open(json_path, 'r') as json_file:
                    json_data = json.load(json_file)
                output_path = os.path.join(output_folder, filename.replace('.json', '.pem'))
                generate_certificate_from_json(json_data, output_path)
            except Exception as e:
                print(f"Error processing file {json_path}: {e}")

if __name__ == "__main__":
    input_folder = './jsons'
    output_folder = './Gcerts'
    os.makedirs(output_folder, exist_ok=True)
    process_json_files(input_folder, output_folder)

The original certificate is as follows: -----BEGIN CERTIFICATE----- MIIHLDCCBhSgAwIBAgIDASR9MA0GCSqGSIb3DQEBBQUAMIGMMQswCQYDVQQGEwJJ TDEWMBQGA1UEChMNU3RhcnRDb20gTHRkLjErMCkGA1UECxMiU2VjdXJlIERpZ2l0 YWwgQ2VydGlmaWNhdGUgU2lnbmluZzE4MDYGA1UEAxMvU3RhcnRDb20gQ2xhc3Mg MiBQcmltYXJ5IEludGVybWVkaWF0ZSBTZXJ2ZXIgQ0EwHhcNMTIxMjMwMTQ1NDM4 WhcNMTUwMTAxMDYxODMxWjCBzzEZMBcGA1UEDRMQam5pOFZaYWQ1TFpvb3ZURjEL MAkGA1UEBhMCVVMxEzARBgNVBAgTCkNhbGlmb3JuaWExGDAWBgNVBAcTD1JhbmNo byBTYW50YSBGZTEuMCwGA1UEChMlRGlnaXRhbCBQb2ludCBTb2x1dGlvbnMgKFNo YXduIEhvZ2FuKTEbMBkGA1UEAxQSKi5kaWdpdGFscG9pbnQuY29tMSkwJwYJKoZI hvcNAQkBFhp3ZWJtYXN0ZXJAZGlnaXRhbHBvaW50LmNvbTCCASIwDQYJKoZIhvcN AQEBBQADggEPADCCAQoCggEBALetYG8uxKInrVi1floeGilDvJy/07nuJsdh19nP iwCsvITtuAab9kUoNXCvGTz+9Q/FT+MDJKjuNba5vTow4zCa8R9D3ZnSWMGZS/AE KDUmiD9MowsLaQ90D+FY5g/NymrJU0i4DVwnbdfOPEovAoIgbd0j80AzsZ++vXIx HuhU8rL+CePl3kTwjpc9Nx0EEhDcaOoVMe1hC45/+r5gCfzl+tSs26rIMZunv5uP A7m+kPnyAmAnMDkfH7IWKETMSxSrtBUMONzbe4M6cKeO44n2p9wcQNsZwA34EcDZ oG45oc9Sd5Ix1nBdZUdI42PjZvtUPpa1ixvmmJ0wFt9uTGECAwEAAaOCA1AwggNM MAkGA1UdEwQCMAAwCwYDVR0PBAQDAgOoMB0GA1UdJQQWMBQGCCsGAQUFBwMCBggr BgEFBQcDATAdBgNVHQ4EFgQU+zqVxprr0DTmO8ROCk1vf8ppceAwHwYDVR0jBBgw FoAUEdsjRf1UzGpxb4SKA9e+9wEvJoYwgYsGA1UdEQSBgzCBgIISKi5kaWdpdGFs cG9pbnQuY29tghBkaWdpdGFscG9pbnQuY29tggxkcHN0YXRpYy5jb22CDiouZHBz dGF0aWMuY29tggVkcC5seYIHKi5kcC5seYITZGlnaXRhbHBvaW50YWRzLmNvbYIV Ki5kaWdpdGFscG9pbnRhZHMuY29tMIIBVgYDVR0gBIIBTTCCAUkwCAYGZ4EMAQIC MIIBOwYLKwYBBAGBtTcBAgMwggEqMC4GCCsGAQUFBwIBFiJodHRwOi8vd3d3LnN0 YXJ0c3NsLmNvbS9wb2xpY3kucGRmMIH3BggrBgEFBQcCAjCB6jAnFiBTdGFydENv bSBDZXJ0aWZpY2F0aW9uIEF1dGhvcml0eTADAgEBGoG+VGhpcyBjZXJ0aWZpY2F0 ZSB3YXMgaXNzdWVkIGFjY29yZGluZyB0byB0aGUgQ2xhc3MgMiBWYWxpZGF0aW9u IHJlcXVpcmVtZW50cyBvZiB0aGUgU3RhcnRDb20gQ0EgcG9saWN5LCByZWxpYW5j ZSBvbmx5IGZvciB0aGUgaW50ZW5kZWQgcHVycG9zZSBpbiBjb21wbGlhbmNlIG9m IHRoZSByZWx5aW5nIHBhcnR5IG9ibGlnYXRpb25zLjA1BgNVHR8ELjAsMCqgKKAm hiRodHRwOi8vY3JsLnN0YXJ0c3NsLmNvbS9jcnQyLWNybC5jcmwwgY4GCCsGAQUF 
BwEBBIGBMH8wOQYIKwYBBQUHMAGGLWh0dHA6Ly9vY3NwLnN0YXJ0c3NsLmNvbS9z dWIvY2xhc3MyL3NlcnZlci9jYTBCBggrBgEFBQcwAoY2aHR0cDovL2FpYS5zdGFy dHNzbC5jb20vY2VydHMvc3ViLmNsYXNzMi5zZXJ2ZXIuY2EuY3J0MCMGA1UdEgQc MBqGGGh0dHA6Ly93d3cuc3RhcnRzc2wuY29tLzANBgkqhkiG9w0BAQUFAAOCAQEA W9pxDRXuxuOqs1a1/qWCzHmsDx/N7IlDu2i8f30vdAs9lwqU4s+LXqmEnHHbofbl r2qdfjeYA/jZ1N/wUwTjusAYOOsL++/eYm8oca392py0Mh0vvGijLOGF0SG6Gm9h 822My01/TAURy5ZA81E+GMbu6bEN3u1OwtN1vWFXjoL5r9IxhwO6VDT4VwlBMg07 LNHhPbQuVe9pgZsyngWZaS8wVp4dtzxJNp4Qkb6xYTmqbu0q5qjEh7sm2+r0cX62 mW653JZdtzypojdTmR1KqGW+JBzjcVYXaiah181bp3dNCN3kw4eUz5yxzw2L3F0p mWyTVOAzC05Nc2oGstQHJw== -----END CERTIFICATE----- View the results comparison(Left generated right original): d3d52a28166613a6cce35cb61692fd0.png (view on web) https://github.com/pyca/cryptography/assets/67566220/3a6189e6-f3d3-429d-a393-bd41de8803bb Parse and compare, marking the tags of unmatched NoticeReference (generated on the left, original on the right): 221eee4bb8702bcfc429d5660023d96.png (view on web) https://github.com/pyca/cryptography/assets/67566220/de11dd1c-e01d-4e2a-91e2-73ed489524e6


reaperhulk commented 5 days ago

Based solely on the screenshot it looks like you're expecting the ASN.1 string type to be IA5String, while we encode a UTF8String for that type. Are you having an interoperability problem because of this? UTF8String is a permissible (and generally preferable) string type so it's unclear why this is a problem from your comment.
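The tag difference described here can be checked without a certificate viewer. The following is a minimal sketch, not part of the original thread: it walks a DER structure with the standard library only and prints the ASN.1 tag of each element. The helper names `read_tlv` and `walk` are hypothetical, and the sample bytes are the NoticeReference transcribed from the original certificate posted in this issue (assumed to be copied correctly).

```python
# Universal ASN.1 tag numbers for the string types allowed in DisplayText
# (RFC 5280), plus the two other tags that appear in a NoticeReference.
TAG_NAMES = {
    0x02: "INTEGER",
    0x0C: "UTF8String",
    0x16: "IA5String",
    0x1A: "VisibleString",
    0x1E: "BMPString",
    0x30: "SEQUENCE",
}


def read_tlv(data, offset):
    """Return (tag, value_start, value_end) for the DER element at offset."""
    tag = data[offset]
    length = data[offset + 1]
    offset += 2
    if length & 0x80:  # long form: low 7 bits give the number of length bytes
        num_bytes = length & 0x7F
        length = int.from_bytes(data[offset:offset + num_bytes], "big")
        offset += num_bytes
    return tag, offset, offset + length


def walk(data, start=0, end=None, depth=0):
    """Yield (depth, tag_name, value_bytes) for every element, depth first."""
    end = len(data) if end is None else end
    while start < end:
        tag, v_start, v_end = read_tlv(data, start)
        yield depth, TAG_NAMES.get(tag, hex(tag)), data[v_start:v_end]
        if tag & 0x20:  # constructed encoding: descend into the contents
            yield from walk(data, v_start, v_end, depth + 1)
        start = v_end


# NoticeReference bytes transcribed from the original certificate in this issue:
# SEQUENCE { organization, SEQUENCE { INTEGER 1 } }
organization = b"StartCom Certification Authority"
original = bytes.fromhex("30271620") + organization + bytes.fromhex("3003020101")

for depth, name, value in walk(original):
    print("  " * depth + name, value if name.endswith("String") else "")
```

Running the same walk over the NoticeReference bytes of the generated certificate should show whether the organization field carries a different string tag while the text itself is unchanged.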

dulanshuangqiao commented 5 days ago

This is a very large reproducer with many moving pieces. Can you please provide a minimal reproducer?

I will use two pictures to restate my question. I want to understand whether the problem is caused by my code or by the cryptography function itself. My code and the pictures are attached.

def generate_certificate_policies(certificate_policies):
    policy_information_list = []

    for policy in certificate_policies:
        policy_identifier = ObjectIdentifier(policy["policy_identifier"]["oid"])
        print(policy_identifier)
        policy_qualifiers = []
        for qualifier in policy["policy_qualifiers"]:
            if isinstance(qualifier, str):
                policy_qualifiers.append(qualifier)
            else:
                notice_reference_info = qualifier.get("notice_reference")
                print(notice_reference_info)
                if notice_reference_info:
                    notice_reference = x509.NoticeReference(
                        organization=notice_reference_info["organization"],
                        notice_numbers=notice_reference_info["notice_numbers"]
                    )
                    print(notice_reference)
                else:
                    notice_reference = None

                explicit_text = qualifier.get("explicit_text")

                user_notice = x509.UserNotice(
                    notice_reference=notice_reference,
                    explicit_text=explicit_text
                )
                print(user_notice)
                policy_qualifiers.append(user_notice)

        policy_info = x509.PolicyInformation(
            policy_identifier=policy_identifier,
            policy_qualifiers=policy_qualifiers
        )
        print(policy_info)
        policy_information_list.append(policy_info)
    print(policy_information_list)
    return x509.CertificatePolicies(policy_information_list)

[Screenshots 1 and 2]

dulanshuangqiao commented 5 days ago

Based solely on the screenshot it looks like you're expecting the ASN.1 string type to be IA5String, while we encode a UTF8String for that type. Are you having an interoperability problem because of this? UTF8String is a permissible (and generally preferable) string type so it's unclear why this is a problem from your comment.

I will use two pictures to restate my question. I want to understand whether the problem is caused by my code or by the cryptography function itself. [Screenshots 1 and 2]

reaperhulk commented 5 days ago

Cryptography does encode this value as UTF8String and there’s no current way to change that, but you have not explained why this is an issue. As I already noted, this is generally a permissible and preferable encoding, so if you are having a problem we would be interested in what software is seeing the issue.

dulanshuangqiao commented 5 days ago

Cryptography does encode this value as UTF8String and there’s no current way to change that, but you have not explained why this is an issue. As I already noted, this is generally a permissible and preferable encoding, so if you are having a problem we would be interested in what software is seeing the issue.

Why is the certificate policy extension in the second picture not displayed in the same way as in the first picture after adding the notice_reference field? I parsed the TLV structure of the original certificate and found that the certificate that displays incorrectly has a different tag in the notice_reference field. This is my guess, so I would like to ask whether it is a problem with cryptography.x509.NoticeReference(organization, notice_numbers). (The original certificate is displayed correctly, so I have ruled out a problem with the Windows viewer.) [Screenshots 11 and 22]

alex commented 5 days ago

That's a question for the certificate viewer application. Using UTF8String values here is supported, and in fact considered the best practice.
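To illustrate why both encodings are acceptable: RFC 5280 defines DisplayText as a CHOICE that includes IA5String, VisibleString, BMPString and UTF8String, and for ASCII-only text the IA5String and UTF8String encodings carry identical content bytes. The sketch below uses only the standard library; `encode_der_string` is a hypothetical helper written for this illustration, not a function from the cryptography package.

```python
IA5_STRING = 0x16   # universal tag 22
UTF8_STRING = 0x0C  # universal tag 12


def encode_der_string(tag, text):
    """DER-encode ASCII text as a primitive string with the given tag.

    Only short-form lengths (up to 127 content bytes) are handled here.
    """
    content = text.encode("ascii")
    if len(content) > 127:
        raise ValueError("this sketch only handles short-form lengths")
    return bytes([tag, len(content)]) + content


organization = "StartCom Certification Authority"
as_ia5 = encode_der_string(IA5_STRING, organization)
as_utf8 = encode_der_string(UTF8_STRING, organization)

# Positions at which the two encodings differ: only the tag byte.
differing = [i for i, (a, b) in enumerate(zip(as_ia5, as_utf8)) if a != b]
print(differing)                    # [0]
print(as_ia5[2:] == as_utf8[2:])    # True: the text bytes are the same
```

A parser that follows the DisplayText definition should therefore read the same organization text from either form; a viewer that renders them differently is keying on the tag byte alone.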

github-actions[bot] commented 1 day ago

This issue has been waiting for a reporter response for 3 days. It will be auto-closed if no activity occurs in the next 5 days.