MatthiasValvekens / pyHanko

pyHanko: sign and stamp PDF files
MIT License
494 stars 71 forks source link

LTV not working when signing in interrupted mode #438

Closed edispring closed 4 months ago

edispring commented 4 months ago

We have a signing service with basically two methods:

async def _prepare_signature(document_digest, hash_algorithm, cert):
    """Produce the CMS signed attributes for a document digest.

    The signer certificate is read from the base64-encoded ``cert`` value
    (first certificate found), wrapped in an ``ExternalSigner`` without a
    certificate registry, and used to build the signed attributes with
    ``use_pades=True``.

    :param document_digest: digest of the document, forwarded unchanged to
        ``signed_attrs``.
    :param hash_algorithm: digest algorithm name, forwarded unchanged to
        ``signed_attrs``.
    :param cert: base64-encoded certificate data.
    :return: whatever ``ExternalSigner.signed_attrs`` resolves to.
    """
    certificate_bytes = base64.b64decode(cert)
    certificate = next(load_certs_from_pemder_data(certificate_bytes))

    # No real signature exists yet at this stage, so an integer is passed as
    # signature_value (presumably a placeholder size in bytes -- confirm
    # against the pyHanko ExternalSigner documentation).
    placeholder_signer = signers.ExternalSigner(
        signing_cert=certificate,
        cert_registry=None,
        signature_value=384,
    )

    # Debug trace of the inputs: label on one line, value on the next.
    for label, value in (
        ('document_digest', document_digest),
        ('hash_algorithm', hash_algorithm),
    ):
        print(label)
        print(value)

    return await placeholder_signer.signed_attrs(
        document_digest, hash_algorithm, use_pades=True
    )

The signed_attrs are then hashed and signed by an HSM. After that, the signed_attrs and the signature are combined into a CMS object, as shown here:

async def _create_cms(x_signing_cert, signed_attrs, sig_value, x_cert_type, hash_algorithm, x_insert_timestamp, environment):
    """Combine externally produced signature bytes and signed attributes into a CMS object.

    Steps, in order:

    1. Decode the signer certificate and validate ``x_cert_type``.
    2. Verify ``sig_value`` against the digest of ``signed_attrs`` using
       RSASSA-PSS and the signer's public key.
    3. Load the CA chain for ``environment``/``x_cert_type`` plus the TSA
       certificates into a certificate store.
    4. Build an ``ExternalSigner`` around ``sig_value`` and let it produce
       the CMS object, optionally with a timestamp token.

    :param x_signing_cert: base64-encoded signer certificate.
    :param signed_attrs: encoded CMS signed attributes (bytes); hashed for
        verification and parsed with ``cms.CMSAttributes.load``.
    :param sig_value: raw signature bytes.
    :param x_cert_type: ``'zertes'`` or ``'eidas'``; selects the CA chain
        directory.
    :param hash_algorithm: digest algorithm name, e.g. ``'sha256'``.
    :param x_insert_timestamp: a timestamp is requested unless this is
        exactly ``False``.
    :param environment: name of the environment sub-directory under ``cert/``.
    :return: the result of ``ExternalSigner.async_sign_prescribed_attributes``.
    :raises InvalidUsage: if ``x_cert_type`` is not supported or if the
        signature verification step fails for any reason.
    """
    print('******************************** _create_cms start... **********************************')
    print('>> x_signing_cert base64 encoded')
    print(x_signing_cert)

    print('>> x_cert_type')
    print(x_cert_type)
    print('>> sig_value hex')
    print(sig_value.hex())
    print(f'>> sig_value signature length {len(sig_value)}')

    print('>> envornment')
    print(environment)
    print('>> x_insert_timestamp')
    print(x_insert_timestamp)
    print('>> hash_algorithm')
    print(hash_algorithm)

    cert = base64.b64decode(x_signing_cert)
    signer_cert = next(load_certs_from_pemder_data(cert))

    if x_cert_type not in ('zertes', 'eidas'):
        raise InvalidUsage(f"Invalid certificate type: {x_cert_type}")

    # Verify the signature before building anything around it.
    try:
        # NOTE(review): hash_data is defined elsewhere; presumably it hashes
        # with the same algorithm as `hash_algorithm` -- confirm.
        signed_attrs_digest = hash_data(signed_attrs)
        print('signed_attrs_digest hex')
        print(signed_attrs_digest.hex())

        x509_cert = load_pem_x509_certificate(cert, default_backend())
        public_key = x509_cert.public_key()
        # NOTE(review): every value other than 'sha256' falls back to SHA-512.
        alg = hashes.SHA256() if hash_algorithm == 'sha256' else hashes.SHA512()
        public_key.verify(
            sig_value,
            signed_attrs_digest,
            padding.PSS(
                mgf=padding.MGF1(alg),
                salt_length=padding.PSS.AUTO,
            ),
            Prehashed(alg),
        )
        print("Signature is valid.")
    except Exception as e:
        # Any failure in this step is reported to the caller as InvalidUsage;
        # chaining keeps the original traceback available for debugging.
        print(f"Signature is not valid: {e}")
        raise InvalidUsage(f"Signature is not valid: {e}") from e

    # NOTE(review): the directory is built from caller-supplied values;
    # make sure `environment` is validated upstream.
    cert_directory = f'cert/{environment}/{x_cert_type}/CAChain'
    print('>> cert_directory')
    print(cert_directory)

    root_cert_files = list_files_in_directory(cert_directory, ('.pem', '.crt'))
    print(">> files in cert_directory:", root_cert_files)

    cert_registry = SimpleCertificateStore()
    ca_chain_certs = load_certs_from_pemder(root_cert_files)
    cert_registry.register_multiple(ca_chain_certs)

    # TSA root certificates
    print('>> load TSA root certificates')
    tsa_certs = load_certs_from_pemder([
        'cert/swisscom root ca 4.pem',
        'cert/swisscom tss ca 4.1.pem',
        'cert/swisscom tss ca 4.1 AT.pem',
    ])
    cert_registry.register_multiple(tsa_certs)

    mgf_algorithm = MaskGenAlgorithm({
        'algorithm': 'mgf1',
        'parameters': DigestAlgorithm({'algorithm': hash_algorithm}),
    })

    # Create RSASSA-PSS SignedDigestAlgorithm
    # NOTE(review): the salt length is fixed at 32 regardless of
    # `hash_algorithm`; it must match what the external signer actually
    # used -- confirm for non-SHA-256 digests.
    signed_digest_algorithm = SignedDigestAlgorithm({
        'algorithm': 'rsassa_pss',
        'parameters': RSASSAPSSParams({
            'hash_algorithm': DigestAlgorithm({'algorithm': hash_algorithm}),
            'mask_gen_algorithm': mgf_algorithm,
            'salt_length': Integer(32),
            'trailer_field': 1,
        }),
    })

    ext_signer = signers.ExternalSigner(
        signing_cert=signer_cert,
        cert_registry=cert_registry,
        signature_value=sig_value,
        prefer_pss=True,
        signature_mechanism=signed_digest_algorithm,
    )

    cms_signed_attrs = cms.CMSAttributes.load(signed_attrs)

    # Only an explicit False disables timestamping; None and other values
    # still request a timestamp, as before.
    timestamper = None
    if x_insert_timestamp is not False:
        # Set up a timestamping client to fetch timestamp tokens
        print('>> insert timestamp.')
        timestamper = SwisscomTimestamper(
            url='https://ais.swisscom.com/AIS-Server/rs/v1.0/sign', https=True,
        )
    else:
        print('>> do not insert timestamp.')

    # timestamper=None is the default of this call, so the no-timestamp path
    # behaves exactly like omitting the argument.
    sig_cms = await ext_signer.async_sign_prescribed_attributes(
        hash_algorithm, signed_attrs=cms_signed_attrs,
        timestamper=timestamper,
    )

    print('******************************** _create_cms done. **********************************')

    return sig_cms

The revocation information is probably missing from the CMS object, but I can't find a way to embed it. Thanks a lot for any hint!