XML-Security / signxml

Python XML Signature and XAdES library
https://xml-security.github.io/signxml/
Apache License 2.0
137 stars 105 forks source link

PKCS#11 support #237

Open kislyuk opened 7 months ago

kislyuk commented 7 months ago

References:

msetina commented 6 months ago

Is there any progress on this? Special signer? Key object classes, that support PKCS11 Keys and certs?

kislyuk commented 6 months ago

There is no progress yet, and I expect this to be a complex change. PRs and support donations are welcome.

Brandhor commented 5 months ago

maybe not the most elegant solution but this worked for me using the PyKCS11 library

import PyKCS11
from lxml import etree
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from signxml.xades import XAdESSigner, XAdESVerifier, XAdESVerifyResult, XAdESSignaturePolicy, XAdESDataObjectFormat
import binascii

class PublicNumbers:
    """Plain holder for the two integers of an RSA public key.

    Exposes the modulus as ``n`` and the public exponent as ``e``.
    """

    def __init__(self, n, e):
        # Store both values under the attribute names callers read back.
        self.n, self.e = n, e

class PublicKey:
    """Public-key stand-in that only knows how to report its numbers."""

    def __init__(self, n, e):
        # Wrap modulus and exponent once; public_numbers() hands this object out.
        numbers = PublicNumbers(n, e)
        self.pubnumbers = numbers

    def public_numbers(self):
        """Return the stored ``PublicNumbers`` object."""
        return self.pubnumbers

class Key:
    """Stand-in for an RSA private key whose secret stays on a PKCS#11 token.

    Only ``sign()`` and ``public_key()`` are implemented. The public key is
    read from the token object that shares ``keyid`` (CKA_ID) with the private
    key, and signing is delegated to the token through ``session``.
    """

    # Hash name (the ``name`` attribute of the hash object handed to sign())
    # -> name of the PyKCS11 "hash + RSA PKCS#1 v1.5" mechanism constant.
    # Kept as strings so PyKCS11 is only consulted when actually signing.
    _RSA_PKCS_MECHANISMS = {
        "sha1": "CKM_SHA1_RSA_PKCS",
        "sha256": "CKM_SHA256_RSA_PKCS",
        "sha384": "CKM_SHA384_RSA_PKCS",
        "sha512": "CKM_SHA512_RSA_PKCS",
    }

    def __init__(self, session, keyid):
        """Look up the public key with CKA_ID ``keyid`` and cache its numbers.

        Raises:
            IndexError: if the token holds no public key with that id.
        """
        self.session = session
        self.keyid = keyid
        pubkey = self._find_one(PyKCS11.CKO_PUBLIC_KEY, "public")
        modulus, exp = session.getAttributeValue(
            pubkey, [PyKCS11.CKA_MODULUS, PyKCS11.CKA_PUBLIC_EXPONENT]
        )
        # Attribute values are sequences of byte values, most significant first.
        m = int.from_bytes(bytes(bytearray(modulus)), "big")
        e = int.from_bytes(bytes(bytearray(exp)), "big")
        self.pubkey = PublicKey(m, e)

    def _find_one(self, object_class, label):
        """Return the first token object of ``object_class`` matching ``keyid``."""
        found = self.session.findObjects(
            [(PyKCS11.CKA_CLASS, object_class), (PyKCS11.CKA_ID, self.keyid)]
        )
        if not found:
            # IndexError is what the bare ``[0]`` lookup used to raise.
            raise IndexError(
                "no {} key with CKA_ID {!r} on the token".format(label, self.keyid)
            )
        return found[0]

    @classmethod
    def _mechanism_name(cls, algorithm):
        """Map the requested hash to a PyKCS11 mechanism constant name.

        An ``algorithm`` without a ``name`` attribute (e.g. ``None``) keeps the
        historical SHA-256 behaviour; an unsupported named hash raises
        ``ValueError`` instead of silently signing with the wrong digest.
        """
        hash_name = getattr(algorithm, "name", None)
        if hash_name is None:
            return "CKM_SHA256_RSA_PKCS"
        try:
            return cls._RSA_PKCS_MECHANISMS[hash_name]
        except KeyError:
            raise ValueError(
                "unsupported hash algorithm for PKCS#11 RSA signing: {!r}".format(
                    hash_name
                )
            ) from None

    def sign(self, data, padding, algorithm):
        """Sign ``data`` on the token and return the signature as ``bytes``.

        ``padding`` is not inspected: a PKCS#1 v1.5 mechanism is always used.
        The digest is chosen from ``algorithm`` (see ``_mechanism_name``).
        """
        # Resolve the mechanism first so a bad request fails before any token I/O.
        mechanism_name = self._mechanism_name(algorithm)
        privkey = self._find_one(PyKCS11.CKO_PRIVATE_KEY, "private")
        mechanism = PyKCS11.Mechanism(getattr(PyKCS11, mechanism_name))
        sig = self.session.sign(privkey, data, mechanism)
        return bytes(sig)

    def public_key(self):
        """Return the cached ``PublicKey`` wrapper."""
        return self.pubkey

class Signer:
    """Demo driver: XAdES-sign ``file.xml`` with a token key, then verify it.

    All work happens in the constructor. The module-level names ``pkcs11lib``
    (path of the PKCS#11 library) and ``pin`` are not defined in this snippet
    and must be supplied by whoever runs it.
    """

    def __init__(self):
        # Set before anything can fail so the ``finally`` clean-up never hits
        # a missing attribute and masks the real error.
        self.session = None
        self.pkcs11 = PyKCS11.PyKCS11Lib()
        self.pkcs11.load(pkcs11lib)
        try:
            self.slot = self.get_slot()

            with open("file.xml", "rb") as fp:
                root = etree.fromstring(fp.read())

            keyid, cert = self.get_cert()

            key = Key(self.session, keyid)
            data_object_format = XAdESDataObjectFormat(
                Description="My XAdES signature",
                MimeType="text/xml",
            )
            signed_root = XAdESSigner(signature_policy=None,
                                   claimed_roles=["signer"],
                                   data_object_format=data_object_format,
                                   c14n_algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315").sign(root, key=key, cert=cert)

            with open("signed.xml", "wb") as fp:
                fp.write(etree.tostring(signed_root))

            # The verification result is not used further in this demo.
            verifier = XAdESVerifier()
            verifier.verify(
                signed_root, x509_cert=cert, expect_references=3, expect_signature_policy=None
            )
        finally:
            self.logout()

    def get_slot(self):
        """Return the first slot that has a token present.

        Raises:
            IndexError: if no slot has a token present.
        """
        slots = self.pkcs11.getSlotList(tokenPresent=True)
        if not slots:
            raise IndexError("no PKCS#11 slot with a token present")
        return slots[0]

    def get_cert(self):
        """Open a session, log in and return ``(keyid, cert)`` of the first readable certificate.

        ``keyid`` is the certificate's CKA_ID as ``bytes``; ``cert`` is the
        certificate re-encoded as PEM ``bytes``.

        Raises:
            LookupError: if no certificate object could be read from the token.
        """
        self.session = self.pkcs11.openSession(
            self.slot, PyKCS11.CKF_SERIAL_SESSION | PyKCS11.CKF_RW_SESSION
        )
        self.session.login(pin)

        pk11objects = self.session.findObjects(
            [(PyKCS11.CKA_CLASS, PyKCS11.CKO_CERTIFICATE)]
        )
        all_attributes = [
            PyKCS11.CKA_VALUE,
            PyKCS11.CKA_ID,
        ]

        for pk11object in pk11objects:
            try:
                attributes = self.session.getAttributeValue(pk11object, all_attributes)
            except PyKCS11.PyKCS11Error:
                # Best effort: skip objects whose attributes cannot be read.
                continue

            attr_dict = dict(zip(all_attributes, attributes))
            der = bytes(attr_dict[PyKCS11.CKA_VALUE])
            keyid = bytes(attr_dict[PyKCS11.CKA_ID])
            cert = x509.load_der_x509_certificate(der, backend=default_backend())
            return keyid, cert.public_bytes(encoding=serialization.Encoding.PEM)

        # Falling off the loop used to return None and fail later at unpacking.
        raise LookupError("no readable certificate found on the token")

    def logout(self):
        """Log out and close the session, if one is open."""
        if self.session is None:
            return
        session, self.session = self.session, None
        try:
            session.logout()
        finally:
            # Close the session even when logout fails (e.g. login never succeeded).
            session.closeSession()



# Script entry point: constructing Signer runs the whole sign-and-verify demo.
if __name__ == "__main__":
    Signer()
msetina commented 5 months ago

Why does the Key class need a public_key() method? And why does it have to be distilled to e and n? Is there something in the signing process that needs this part of the interface, with the data in this form? Is this RSA-specific?

Brandhor commented 5 months ago

signxml expects key to be bytes or a RSAPrivateKey instance, in case of bytes it will transform it to a RSAPrivateKey.

then in the _serialize_key_value function it calls the public_key() and public_numbers() methods to get the modulus and exponent, so I created a Key class that kinda emulates RSAPrivateKey and implements the sign() and public_key() methods which are needed by signxml.

and yes my code is RSA specific because if you look again at the _serialize_key_value method there are other methods and values that I have not implemented like parameter_numbers() or x and y

msetina commented 5 months ago

Thank you for the pointer. I see that _serialize_key_value only has methods to add info for RSA and ECDSA keys. For PKCS11, IMHO, there would need to be serialisation for X509Data (https://www.w3.org/TR/xmldsig-core/#sec-X509Data) instead of KeyValue. You provided the X509 cert, so it would be nice to serialize it as X509Data, just for starters. You made a great example. I have an EC-based card and will try to get it working on that. PyKCS11 is missing some ECDSA mechanisms.

msetina commented 5 months ago

I made it sign, but signxml thinks I signed with rsa-sha256; at least, that is what I got in the XML. Verification fails, but I know why. The sign method that you overrode is one part. I think we would also need a digest method to calculate the digest on the card. For both, we would need to return the mechanism type to signxml.

msetina commented 5 months ago

The digest is a somewhat bigger problem, as it lives in XMLSignatureProcessor and just uses a Hash. So it is a little hidden; otherwise, all signers would need to be overridden.

msetina commented 5 months ago

Maybe a signer could take a Hasher parameter that exposes a digest method. By default it would be implemented with Hash, but if overridden it would calculate the digest (in our case, on the card).

msetina commented 5 months ago

This is how X509Data could be made (it is written with python-pkcs11 and OpenSSL, but...):

# Excerpt (indentation as posted): builds ds:X509Data elements from the certificates on the token.
# Relies on names defined outside this excerpt: session, priv, key_info_element,
# Attribute, ObjectClass, base64 and load_certificate / FILETYPE_ASN1.
# Detached X509Data element that collects the certificates not matching the private key.
X509_crt_element = etree.Element(etree.QName("http://www.w3.org/2000/09/xmldsig#",'X509Data')    )
        has_crt = False
        # Get all certificates
        for co in session.get_objects({Attribute.CLASS: ObjectClass.CERTIFICATE}):                            
            # Base64 text of the certificate value, as stored in X509Certificate elements
            cb4 = base64.b64encode(bytes(co[Attribute.VALUE])).decode('utf-8')                              
            # If the certificate has the same label as the private key, fill in the data from the certificate
            if co[Attribute.LABEL] == priv.label:                
                X509_data_element = etree.SubElement(key_info_element, etree.QName("http://www.w3.org/2000/09/xmldsig#",'X509Data')    )
                X509_certificate_element = etree.SubElement(X509_data_element, etree.QName("http://www.w3.org/2000/09/xmldsig#",'X509Certificate')  )
                X509_certificate_element.text = cb4
                cert = load_certificate(FILETYPE_ASN1,co[Attribute.VALUE])
                X509_issuer_serial_element = etree.SubElement(X509_data_element, etree.QName("http://www.w3.org/2000/09/xmldsig#",'X509IssuerSerial') )
                X509_issuer_name_element = etree.SubElement(X509_issuer_serial_element, etree.QName("http://www.w3.org/2000/09/xmldsig#",'X509IssuerName')                  )
                X509_issuer_name_element.text =  ", ".join("{:s}={:s}".format(name.decode(), value.decode()) for name, value in cert.get_issuer().get_components())        
                X509_serial_number_element = etree.SubElement(X509_issuer_serial_element, etree.QName("http://www.w3.org/2000/09/xmldsig#",'X509SerialNumber')         )
                X509_serial_number_element.text = str(cert.get_serial_number())
                X509_subject_name_element = etree.SubElement(X509_data_element, etree.QName("http://www.w3.org/2000/09/xmldsig#",'X509SubjectName')    )
                X509_subject_name_element.text = ", ".join("{:s}={:s}".format(name.decode(), value.decode()) for name, value in cert.get_subject().get_components())        
            else:
                # Add the others to the certificate chain
                X509_certificate_element = etree.SubElement(X509_crt_element, etree.QName("http://www.w3.org/2000/09/xmldsig#",'X509Certificate')  )
                X509_certificate_element.text = cb4
                has_crt = True
        if has_crt:
            # Add the certificate chain (only when at least one other certificate was found)
            key_info_element.append(X509_crt_element)

It has a special feature to not just add a cert to verify, but also a chain if it is present on the card. Also there is an assumption to have a cert with the same label as private key.

msetina commented 5 months ago

This signs and verifies an EC key (a lot of patchwork, sorry):

import PyKCS11
from lxml import etree
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import utils
from signxml.xades import (
    XAdESSigner,
    XAdESVerifier,
    XAdESVerifyResult,
    XAdESSignaturePolicy,
    XAdESDataObjectFormat,
)
from signxml.algorithms import SignatureMethod
import binascii

def decode_ec_point(keylen, data):
    """Split an uncompressed elliptic curve point into integer coordinates.

    ``data`` must be ``b"\\x04"`` followed by two big-endian fields of
    ``keylen`` bytes each. The single byte ``b"\\x00"`` yields ``(None, None)``.

    Raises:
        ValueError: for any other leading byte, or a wrong total length.
    """
    if data == b"\x00":
        return None, None
    if not data.startswith(b"\x04"):
        raise ValueError("Unsupported elliptic curve point type")
    if len(data) != 2 * keylen + 1:
        raise ValueError("Invalid elliptic curve point data length")

    # First field follows the type byte; the second runs to the end.
    first = data[1 : keylen + 1]
    second = data[keylen + 1 :]
    return int.from_bytes(first, "big"), int.from_bytes(second, "big")

class RSAPublicNumbers:
    """Holder for an RSA public key's modulus ``n`` and exponent ``e``."""

    def __init__(self, n, e):
        self.n, self.e = n, e

class ECDSAPublicNumbers:
    """Holder for the ``x`` and ``y`` coordinates of an EC public key."""

    def __init__(self, x, y):
        self.x, self.y = x, y

class RSAPublicKey:
    """RSA public-key stand-in that only reports its public numbers."""

    def __init__(self, n, e):
        numbers = RSAPublicNumbers(n, e)
        self.pubnumbers = numbers

    def public_numbers(self):
        """Return the stored ``RSAPublicNumbers`` object."""
        return self.pubnumbers

class ECDSAPublicKey:
    """EC public-key stand-in that only reports its public numbers."""

    def __init__(self, x, y):
        numbers = ECDSAPublicNumbers(x, y)
        self.pubnumbers = numbers

    def public_numbers(self):
        """Return the stored ``ECDSAPublicNumbers`` object."""
        return self.pubnumbers

class ECDSACurve:
    """Carries nothing but the name of an elliptic curve."""

    def __init__(self, name):
        # Exposed as ``.name`` for whoever inspects the curve.
        self.name = name

class Key:
    """Stand-in for a private key (EC or RSA) that stays on a PKCS#11 token.

    Implements the ``sign()`` / ``public_key()`` pair plus the ``curve`` and
    ``key_size`` attributes, and delegates signing to the token via ``session``.
    For EC keys the curve is still hard-coded to secp384r1.
    """

    def __init__(self, session, keyid):
        """Read the public key with CKA_ID ``keyid`` and cache its numbers."""
        self.session = session
        self.keyid = keyid
        self.curve = None
        self.key_size = None
        pubkey = self.session.findObjects(
            [
                (PyKCS11.CKA_CLASS, PyKCS11.CKO_PUBLIC_KEY),
                (PyKCS11.CKA_ID, self.keyid),
            ]
        )[0]
        # Remembered so sign() can pick a mechanism that matches the key.
        self.key_type = session.getAttributeValue(
            pubkey, [PyKCS11.CKA_KEY_TYPE]
        )[0]
        if self.key_type == PyKCS11.CKK_ECDSA:
            # temporary: the curve is not read from the token yet
            self.curve = ECDSACurve("secp384r1")
            self.key_size = 384
            ec_point = session.getAttributeValue(
                pubkey, [PyKCS11.CKA_EC_POINT]
            )[0]

            # Bytes per coordinate. The token may hand the point back wrapped
            # in a DER OCTET STRING; the wrapper must be removed, otherwise
            # its header bytes end up inside the X coordinate.
            coord_len = (self.key_size + 7) // 8
            point = self._unwrap_ec_point(bytes(ec_point), coord_len)
            x, y = decode_ec_point(coord_len, point)
            self.pubkey = ECDSAPublicKey(x, y)
        else:
            modulus, exp = session.getAttributeValue(
                pubkey, [PyKCS11.CKA_MODULUS, PyKCS11.CKA_PUBLIC_EXPONENT]
            )
            m = int.from_bytes(bytes(bytearray(modulus)), "big")
            e = int.from_bytes(bytes(bytearray(exp)), "big")
            self.pubkey = RSAPublicKey(m, e)

    @staticmethod
    def _unwrap_ec_point(raw, coord_len):
        """Strip a DER OCTET STRING header from ``raw`` if one is present.

        A bare uncompressed point is ``2 * coord_len + 1`` bytes long. When
        ``raw`` is longer, starts with the OCTET STRING tag (0x04) and its
        payload has exactly that length, the payload is returned. Anything
        else is returned unchanged so the caller's validation reports it.
        """
        expected = 2 * coord_len + 1
        if len(raw) == expected or len(raw) < 2 or raw[0] != 0x04:
            return raw
        # Short-form length uses one byte; long form (0x8N) adds N length bytes.
        header = 2 if raw[1] < 0x80 else 2 + (raw[1] & 0x7F)
        payload = raw[header:]
        return payload if len(payload) == expected else raw

    def sign(self, data, **argv):
        """Sign ``data`` on the token with a SHA-256 based mechanism.

        Keyword arguments are accepted for interface compatibility but are
        not inspected. EC signatures are converted from the token's
        fixed-width ``r || s`` form to a DER-encoded DSS signature; RSA
        signatures (PKCS#1 v1.5) are returned as raw ``bytes``.

        Raises:
            ValueError: if an EC signature is empty or has odd length.
        """
        privkey = self.session.findObjects(
            [
                (PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY),
                (PyKCS11.CKA_ID, self.keyid),
            ]
        )[0]
        if self.key_type != PyKCS11.CKK_ECDSA:
            # RSA key: the ECDSA mechanism and r/s re-encoding do not apply.
            sig = self.session.sign(
                privkey, data, PyKCS11.Mechanism(PyKCS11.CKM_SHA256_RSA_PKCS)
            )
            return bytes(sig)

        sig = bytes(
            self.session.sign(
                privkey, data, PyKCS11.Mechanism(PyKCS11.CKM_ECDSA_SHA256)
            )
        )
        if not sig or len(sig) % 2:
            raise ValueError("Invalid ECDSA signature length")
        half = len(sig) // 2
        return utils.encode_dss_signature(
            int.from_bytes(sig[:half], "big"), int.from_bytes(sig[half:], "big")
        )

    def public_key(self):
        """Return the cached public-key wrapper."""
        return self.pubkey

class Signer:
    """Demo driver: XAdES-sign ``file.xml`` with an EC token key, then verify it.

    All work happens in the constructor. The module-level names ``pkcs11lib``
    (path of the PKCS#11 library) and ``pin`` are not defined in this snippet
    and must be supplied by whoever runs it.
    """

    def __init__(self):
        self.pkcs11 = PyKCS11.PyKCS11Lib()
        self.pkcs11.load(pkcs11lib)
        self.session = None
        try:
            self.slot = self.get_slot()

            with open("file.xml", "rb") as fp:
                root = etree.fromstring(fp.read())
            # root = etree.Element("root")
            # etree.SubElement(root, "child").text = "Child 1"
            # etree.SubElement(root, "child").text = "Child 2"
            # etree.SubElement(root, "another").text = "Child 3"

            keyid, cert = self.get_cert()

            key = Key(self.session, keyid)
            data_object_format = XAdESDataObjectFormat(
                Description="My XAdES signature",
                MimeType="text/xml",
            )
            signed_root = XAdESSigner(
                signature_policy=None,
                claimed_roles=["signer"],
                data_object_format=data_object_format,
                c14n_algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315",
                signature_algorithm=SignatureMethod.ECDSA_SHA256,
            ).sign(root, key=key, cert=cert)

            with open("signed_signxml.xml", "wb") as fp:
                fp.write(etree.tostring(signed_root))

            verifier = XAdESVerifier()
            verify_results = verifier.verify(
                signed_root,
                x509_cert=cert,
                expect_references=3,
                expect_signature_policy=None,
                ignore_ambiguous_key_info=True,
            )
            for verif in verify_results:
                print(verif.signed_xml)
        finally:
            self.logout()

    def get_slot(self):
        """Return the first slot that has a token present.

        Raises:
            IndexError: if no slot has a token present.
        """
        slots = self.pkcs11.getSlotList(tokenPresent=True)
        if not slots:
            raise IndexError("no PKCS#11 slot with a token present")
        return slots[0]

    def get_cert(self):
        """Open a session, log in and return ``(keyid, cert)`` of the first readable certificate.

        ``keyid`` is the certificate's CKA_ID as ``bytes``; ``cert`` is the
        certificate re-encoded as PEM ``bytes``.

        Raises:
            LookupError: if no certificate object could be read from the token.
        """
        self.session = self.pkcs11.openSession(
            self.slot, PyKCS11.CKF_SERIAL_SESSION | PyKCS11.CKF_RW_SESSION
        )
        self.session.login(pin)

        pk11objects = self.session.findObjects(
            [(PyKCS11.CKA_CLASS, PyKCS11.CKO_CERTIFICATE)]
        )
        all_attributes = [
            PyKCS11.CKA_VALUE,
            PyKCS11.CKA_ID,
        ]

        for pk11object in pk11objects:
            try:
                attributes = self.session.getAttributeValue(
                    pk11object, all_attributes
                )
            except PyKCS11.PyKCS11Error:
                # Best effort: skip objects whose attributes cannot be read.
                continue

            attr_dict = dict(zip(all_attributes, attributes))
            der = bytes(attr_dict[PyKCS11.CKA_VALUE])
            keyid = bytes(attr_dict[PyKCS11.CKA_ID])
            cert = x509.load_der_x509_certificate(
                der, backend=default_backend()
            )
            return keyid, cert.public_bytes(encoding=serialization.Encoding.PEM)

        # Falling off the loop used to return None and fail later at unpacking.
        raise LookupError("no readable certificate found on the token")

    def logout(self):
        """Log out and close the session, if one is open."""
        if self.session is None:
            return
        session, self.session = self.session, None
        try:
            session.logout()
        finally:
            # Close the session even when logout fails (e.g. login never succeeded).
            session.closeSession()

# Script entry point: constructing Signer runs the whole sign-and-verify demo.
if __name__ == "__main__":
    Signer()
msetina commented 5 months ago

As mentioned before, it would be nice to have just X509Data for the certificate information. In the end, my case needed ignore_ambiguous_key_info=True to verify, because there were two different pieces of information for the public key (the public key itself and the X509 certificate). It would also be good to use all the certificate info on cards that have a certificate chain written on them.

msetina commented 5 months ago

Here is a prototype:

import PyKCS11
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import utils
from signxml.algorithms import SignatureMethod, DigestAlgorithm
import binascii

# taken from pksc11-tool.c
_ec_curve_info = [
    ("secp192r1", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("prime192v1", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("prime192v2", "1.2.840.10045.3.1.2", b"06082a8648ce3d030102", 192),
    ("prime192v3", "1.2.840.10045.3.1.3", b"06082a8648ce3d030103", 192),
    ("nistp192", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("ansiX9p192r1", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("secp224r1", "1.3.132.0.33", b"06052b81040021", 224),
    ("nistp224", "1.3.132.0.33", b"06052b81040021", 224),
    ("prime256v1", "1.2.840.10045.3.1.7", b"06082a8648ce3d030107", 256),
    ("secp256r1", "1.2.840.10045.3.1.7", b"06082a8648ce3d030107", 256),
    ("ansiX9p256r1", "1.2.840.10045.3.1.7", b"06082b8648ce3d030107", 256),
    ("frp256v1", "1.2.250.1.223.101.256.1", b"060a2a817a01815f65820001", 256),
    ("secp384r1", "1.3.132.0.34", b"06052b81040022", 384),
    # ("prime384v1", "1.3.132.0.34", b"06052b81040022", 384),
    # ("ansiX9p384r1", "1.3.132.0.34", b"06052b81040022", 384),
    ("prime521v1", "1.3.132.0.35", b"06052b81040023", 521),
    ("secp521r1", "1.3.132.0.35", b"06052b81040023", 521),
    ("nistp521", "1.3.132.0.35", b"06052b81040023", 521),
    ("brainpoolP192r1", "1.3.36.3.3.2.8.1.1.3", b"06092b2403030208010103", 192),
    ("brainpoolP224r1", "1.3.36.3.3.2.8.1.1.5", b"06092b2403030208010105", 224),
    ("brainpoolP256r1", "1.3.36.3.3.2.8.1.1.7", b"06092b2403030208010107", 256),
    ("brainpoolP320r1", "1.3.36.3.3.2.8.1.1.9", b"06092b2403030208010109", 320),
    (
        "brainpoolP384r1",
        "1.3.36.3.3.2.8.1.1.11",
        b"06092b240303020801010b",
        384,
    ),
    (
        "brainpoolP512r1",
        "1.3.36.3.3.2.8.1.1.13",
        b"06092b240303020801010d",
        512,
    ),
    ("secp192k1", "1.3.132.0.31", b"06052b8104001f", 192),
    ("secp256k1", "1.3.132.0.10", b"06052b8104000a", 256),
    ("secp521k1", "1.3.132.0.35", b"06052b81040023", 521),
    (
        "edwards25519",
        "1.3.6.1.4.1159.15.1",
        "130c656477617264733235353139",
        255,
    ),
    ("curve25519", "1.3.6.1.4.3029.1.5.1", b"130b63757276653235353139", 255),
]

def _translate_to_ec_key_data(ec_params):
    """Return the first ``_ec_curve_info`` entry whose third field equals ``ec_params``.

    Returns ``None`` when no entry matches.
    """
    matches = (entry for entry in _ec_curve_info if ec_params == entry[2])
    return next(matches, None)

# Decode EC point x and y for public key description
def decode_ec_point(keylen, data):
    """Turn an uncompressed EC point octet string into two integers.

    The accepted layout is the byte ``0x04`` followed by two big-endian
    fields of ``keylen`` bytes each; the lone byte ``b"\\x00"`` maps to
    ``(None, None)``.

    Raises:
        ValueError: if the leading byte is not ``0x04`` or the length is wrong.
    """
    if data == b"\x00":
        return None, None

    if not data.startswith(b"\x04"):
        raise ValueError("Unsupported elliptic curve point type")

    split = keylen + 1
    if len(data) != 2 * keylen + 1:
        raise ValueError("Invalid elliptic curve point data length")

    coordinates = (data[1:split], data[split:])
    first, second = (int.from_bytes(part, "big") for part in coordinates)
    return first, second

# ECDSA signatures come from the card as r followed by s; callers need them separately.
def decode_RS_signature(data):
    """Split ``data`` into its first and second half as two bytearrays.

    With an odd number of bytes the extra byte goes to the first half.
    """
    raw = bytearray(data)
    # Ceiling division: matches "index < len / 2" for every integer index.
    split = (len(raw) + 1) // 2
    return raw[:split], raw[split:]

# public key information presentation for signxml

class RSAPublicNumbers:
    """Holder for an RSA public key's modulus ``n`` and exponent ``e``."""

    def __init__(self, n, e):
        self.n, self.e = n, e

class ECDSAPublicNumbers:
    """Holder for the ``x`` and ``y`` coordinates of an EC public key."""

    def __init__(self, x, y):
        self.x, self.y = x, y

class RSAPublicKey:
    """RSA public-key stand-in that only reports its public numbers."""

    def __init__(self, n, e):
        numbers = RSAPublicNumbers(n, e)
        self.pubnumbers = numbers

    def public_numbers(self):
        """Return the stored ``RSAPublicNumbers`` object."""
        return self.pubnumbers

class ECDSAPublicKey:
    """EC public-key stand-in that only reports its public numbers."""

    def __init__(self, x, y):
        numbers = ECDSAPublicNumbers(x, y)
        self.pubnumbers = numbers

    def public_numbers(self):
        """Return the stored ``ECDSAPublicNumbers`` object."""
        return self.pubnumbers

class ECDSACurve:
    """Carries nothing but the name of an elliptic curve."""

    def __init__(self, name):
        # Exposed as ``.name`` for whoever inspects the curve.
        self.name = name

# Token representation for use in signing
# signxml can take it as a key as it implements sign and public key presentation
# in addition it loads certificate data in form of seigning certificate or signing certificate plus the CA chain
class PKCS11Token:
    """Key-like wrapper around a PKCS#11 token.

    After ``open_session()`` the object exposes ``sign()`` and
    ``public_key()`` (plus ``curve`` / ``key_size`` for EC keys), and the
    certificates found on the token via ``certificate()`` and
    ``certificate_with_ca_chain()``. Call ``close()`` when done.

    The token is assumed to hold a single private key.
    """

    def __init__(self):
        # session for interaction with the card
        self._session = None
        # id of key read from private key
        self._keyid = None
        # type of key
        self._key_type = None
        # private key reference
        self._private_key = None
        # public key reference
        self._pubkey = None
        # signing certificate
        self._certificate = None
        # certificate chain containing signing certificate and its ca chain
        self._ca_chain = None
        # operations supported by the card
        # they are separated in operation groups (DIGEST,VERIFY,SIGN,ENCRYPT,DECRYPT)
        self._operations = None
        # does user need to be logged in to use session
        self._login_required = False

        # public key data for EC key required by signxml public key serialisation
        self.curve = None
        self.key_size = None

    @staticmethod
    def _unwrap_ec_point(raw, coord_len):
        """Strip a DER OCTET STRING header from ``raw`` if one is present.

        A bare uncompressed point is ``2 * coord_len + 1`` bytes long. When
        ``raw`` is longer, starts with the OCTET STRING tag (0x04) and its
        payload has exactly that length, the payload is returned. Anything
        else is returned unchanged so the caller's validation reports it.
        """
        expected = 2 * coord_len + 1
        if len(raw) == expected or len(raw) < 2 or raw[0] != 0x04:
            return raw
        # Short-form length uses one byte; long form (0x8N) adds N length bytes.
        header = 2 if raw[1] < 0x80 else 2 + (raw[1] & 0x7F)
        payload = raw[header:]
        return payload if len(payload) == expected else raw

    # get private key reference and get key type and keyid for it
    def _get_private_key(self, session):
        """Store the first private key object plus its key type and CKA_ID."""
        if session is None:
            return
        self._private_key = session.findObjects(
            [
                (PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY),
            ]
        )[0]
        attrs = session.getAttributeValue(
            self._private_key, [PyKCS11.CKA_KEY_TYPE, PyKCS11.CKA_ID]
        )
        self._key_type = attrs[0]
        self._keyid = bytes(attrs[1])

    def _get_public_key(self, session):
        """Build the public-key wrapper for the key found by ``_get_private_key``.

        For EC keys on a curve missing from the curve table nothing is
        stored and ``public_key()`` keeps returning ``None``.
        """
        if session is None or self._key_type is None:
            return
        # public key. This is only needed as long as Signer demands data of public key
        # If Signer would just take cert and serialize it as X509Data, then this is not needed
        pubkey = session.findObjects(
            [
                (PyKCS11.CKA_CLASS, PyKCS11.CKO_PUBLIC_KEY),
                (PyKCS11.CKA_ID, self._keyid),
            ]
        )[0]
        if self._key_type == PyKCS11.CKK_ECDSA:  # EC key
            ec_point, ec_params = session.getAttributeValue(
                pubkey,
                [
                    PyKCS11.CKA_EC_POINT,
                    PyKCS11.CKA_EC_PARAMS,
                ],
            )
            curve_object = _translate_to_ec_key_data(
                binascii.hexlify(bytearray(ec_params))
            )
            if curve_object is None:
                return
            self.curve = ECDSACurve(curve_object[0])
            self.key_size = curve_object[3]

            # Bytes per coordinate, rounded up (66 for a 521-bit curve). The
            # token may hand the point back wrapped in a DER OCTET STRING;
            # the wrapper must be removed, otherwise its header bytes end up
            # inside the X coordinate.
            coord_len = (self.key_size + 7) // 8
            point = self._unwrap_ec_point(bytes(ec_point), coord_len)
            x, y = decode_ec_point(coord_len, point)
            self._pubkey = ECDSAPublicKey(x, y)
        else:
            # PyKCS11.CKK_RSA
            modulus, exp = session.getAttributeValue(
                pubkey, [PyKCS11.CKA_MODULUS, PyKCS11.CKA_PUBLIC_EXPONENT]
            )
            m = int.from_bytes(bytes(bytearray(modulus)), "big")
            e = int.from_bytes(bytes(bytearray(exp)), "big")
            self._pubkey = RSAPublicKey(m, e)

    def _get_cert(self, session):
        """Store, as PEM, the certificate sharing its CKA_ID with the private key.

        If several certificates match, the last readable one wins.
        """
        if session is None:
            return
        pk11objects = session.findObjects(
            [
                (PyKCS11.CKA_CLASS, PyKCS11.CKO_CERTIFICATE),
                (PyKCS11.CKA_ID, self._keyid),
            ]
        )
        for pk11object in pk11objects:
            try:
                attributes = session.getAttributeValue(
                    pk11object, [PyKCS11.CKA_VALUE]
                )
            except PyKCS11.PyKCS11Error:
                # Best effort: skip objects whose value cannot be read.
                continue

            cert = x509.load_der_x509_certificate(
                bytes(attributes[0]), backend=default_backend()
            )
            self._certificate = cert.public_bytes(
                encoding=serialization.Encoding.PEM
            )

    def _get_cert_with_ca_chain(self, session):
        """Store every readable certificate on the token as one PEM bundle."""
        if session is None:
            return
        pk11objects = session.findObjects(
            [
                (PyKCS11.CKA_CLASS, PyKCS11.CKO_CERTIFICATE),
            ]
        )
        ca_chain = []
        for pk11object in pk11objects:
            try:
                attributes = session.getAttributeValue(
                    pk11object, [PyKCS11.CKA_VALUE]
                )
            except PyKCS11.PyKCS11Error:
                # Best effort: skip objects whose value cannot be read.
                continue

            cert = x509.load_der_x509_certificate(
                bytes(attributes[0]), backend=default_backend()
            )
            ca_chain.append(
                cert.public_bytes(encoding=serialization.Encoding.PEM)
            )
        self._ca_chain = b"".join(ca_chain)

    def _load_operations(self, library, slot):
        """Record which signxml algorithms the token in ``slot`` supports.

        Fills ``self._operations`` as ``{operation: {algorithm: mechanism}}``
        where ``operation`` is the mechanism flag name without ``CKF_`` and
        ``algorithm`` is the ``DigestAlgorithm`` / ``SignatureMethod`` member
        whose name equals the mechanism name without ``CKM_``.
        """
        self._operations = {}
        for m in library.getMechanismList(slot):
            # Query the slot actually in use, not a fixed slot 0.
            mi = library.getMechanismInfo(slot, m)
            meh = str(m).replace("CKM_", "")
            for mf in mi.flags_dict:
                if mi.flags & mf == 0:
                    continue
                op = mi.flags_dict[mf].replace("CKF_", "")
                try:
                    if op == "DIGEST":
                        mm = DigestAlgorithm[meh]
                    elif op in ("SIGN", "VERIFY"):
                        mm = SignatureMethod[meh]
                    else:
                        continue
                except KeyError:
                    # Mechanism has no signxml counterpart of that name.
                    continue
                self._operations.setdefault(op, {})[mm] = PyKCS11.CKM[m]

    # Open session with the card
    # Uses pin if needed, reads permitted operations (mechanisms)
    # Loads private key reference, prepares public key info and
    # Loads signing certificate and its ca chain if present
    def open_session(self, pksc11_lib, token_label, pin):
        """Open a session on the token labelled ``token_label`` and load its data.

        With ``token_label`` set to ``None`` the first slot with a token
        present is used. ``pin`` is only used when the selected token
        requires login.

        Returns:
            ``True`` if a session is open afterwards, ``False`` otherwise.
        """
        library = PyKCS11.PyKCS11Lib()
        library.load(pksc11_lib)
        slot = None
        self._login_required = False
        for candidate in library.getSlotList(tokenPresent=True):
            # Token info is looked up by slot id, not by position in the list.
            ti = library.getTokenInfo(candidate)
            if token_label is None or ti.label.strip() == token_label:
                slot = candidate
                # Only the selected token decides whether login is needed.
                self._login_required = (
                    ti.flags & PyKCS11.CKF_LOGIN_REQUIRED != 0
                )
                break
        if slot is not None:
            self._session = library.openSession(
                slot, PyKCS11.CKF_SERIAL_SESSION | PyKCS11.CKF_RW_SESSION
            )
            if self._session is not None:
                if self._login_required:
                    self._session.login(pin)
                self._load_operations(library, slot)
                # it is an assumption that each token has one private key and multiple public keys and multiple certificates
                self._get_private_key(self._session)
                self._get_public_key(self._session)
                self._get_cert(self._session)
                self._get_cert_with_ca_chain(self._session)
        return self._session is not None

    # signxml interface

    # sign data on the card using provided signature_method:SignatureMethod
    def sign(self, data, **argv):
        """Sign ``data`` on the token.

        Returns ``None`` when no session or private key is available.
        EC/DSA signatures are re-encoded from the token's ``r || s`` form to
        a DER-encoded DSS signature; other signatures are returned as
        ``bytes``.
        """
        if (
            self._session is None
            or self._private_key is None
            or self._key_type is None
        ):
            return None

        PK_me = None
        if "signature_algorithm" in argv:
            # Call to sign should propagate SignatureMethod.
            # Until that is integrated the passed value is not usable here,
            # so ECDSA with SHA-256 is assumed.
            o = SignatureMethod.ECDSA_SHA256
            # A token without any SIGN mechanism falls back to the default
            # mechanism instead of failing with KeyError.
            mech = (self._operations or {}).get("SIGN", {}).get(o)
            if mech is not None:
                PK_me = PyKCS11.Mechanism(mech)
        if PK_me is None:
            sig = self._session.sign(self._private_key, data)
        else:
            sig = self._session.sign(self._private_key, data, PK_me)

        if self._key_type in [
            PyKCS11.CKK_ECDSA,
            PyKCS11.CKK_DSA,
        ]:
            # sign on the other side does the following
            # # Note: The output of the DSA and ECDSA signers is a DER-encoded ASN.1 sequence of two DER integers.
            # (r, s) = utils.decode_dss_signature(signature)
            # int_len = bits_to_bytes_unit(signing_settings.key.key_size)
            # signature = long_to_bytes(r, blocksize=int_len) + long_to_bytes(s, blocksize=int_len)
            # Next rows need to be removed after integration
            r, s = decode_RS_signature(sig)
            return utils.encode_dss_signature(
                int(binascii.hexlify(r), 16), int(binascii.hexlify(s), 16)
            )
        # PyKCS11.CKK_RSA
        return bytes(sig)

    # Calculate digest with provided algorithm
    # return None if algorithm is not supported
    # with modified signxml signing procedure the same token can calculate a digest if algorithm is supported
    def get_digest(self, data, algorithm: "DigestAlgorithm"):
        """Return the digest of ``data`` computed on the token.

        Returns ``None`` when no session is open or the token does not
        support ``algorithm`` (including tokens with no digest mechanism).
        """
        if self._session is None:
            return None
        mech = (self._operations or {}).get("DIGEST", {}).get(algorithm)
        if mech is None:
            return None
        return self._session.digest(data, PyKCS11.Mechanism(mech))

    def public_key(self):
        """Return the public-key wrapper, or ``None`` if none was loaded."""
        return self._pubkey

    def certificate(self):
        """Return the signing certificate as PEM bytes, or ``None``."""
        return self._certificate

    def certificate_with_ca_chain(self):
        """Return all token certificates as one PEM bundle, or ``None``."""
        return self._ca_chain

    # Closing work on an open session
    def close(self):
        """Log out (if login was required) and close the session, if open."""
        if self._session is None:
            return
        session, self._session = self._session, None
        try:
            if self._login_required:
                session.logout()
        finally:
            # Close the session even when logout fails.
            session.closeSession()

I started from @Brandhor's example, and I hope I did not break anything. Save it to a file named PKCS11_token.py so that the following works:

from PKCS11_token import PKCS11Token
from lxml import etree
from signxml.algorithms import SignatureMethod
from signxml.xades import (
    XAdESSigner,
    XAdESVerifier,
    XAdESDataObjectFormat,
)

class Signer:
    """Example: sign ``file.xml`` with a PKCS#11 token and verify the result.

    All work happens in the constructor: the document is parsed, signed with
    XAdES using the token as key, written to ``signed_signxml.xml`` and then
    verified against the token's signing certificate.

    NOTE: ``pkcs11lib``, ``token_label`` and ``pin`` are not defined in this
    snippet; define them at module level before running it.
    """

    def __init__(self):
        # The context manager closes the file even if reading fails.
        with open("file.xml", "rb") as fp:
            root = etree.fromstring(fp.read())
        # Alternative: build a small document in memory instead.
        # root = etree.Element("root")
        # etree.SubElement(root, "child").text = "Child 1"
        # etree.SubElement(root, "child").text = "Child 2"
        # etree.SubElement(root, "another").text = "Child 3"
        token = None
        try:
            token = PKCS11Token()
            if token.open_session(pkcs11lib, token_label, pin):

                data_object_format = XAdESDataObjectFormat(
                    Description="My XAdES signature",
                    MimeType="text/xml",
                )
                signed_root = XAdESSigner(
                    signature_policy=None,
                    claimed_roles=["signer"],
                    data_object_format=data_object_format,
                    c14n_algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315",
                    signature_algorithm=SignatureMethod.ECDSA_SHA256,
                ).sign(root, key=token, cert=token.certificate_with_ca_chain())

                with open("signed_signxml.xml", "wb") as fp:
                    fp.write(etree.tostring(signed_root))

                verifier = XAdESVerifier()
                verify_results = verifier.verify(
                    signed_root,
                    x509_cert=token.certificate(),
                    expect_references=3,
                    expect_signature_policy=None,
                    ignore_ambiguous_key_info=True,
                )
                for verif in verify_results:
                    print(verif.signed_xml)
        finally:
            # Always release the token session, also when signing fails.
            if token is not None:
                token.close()

# Run the example when this file is executed as a script; constructing
# Signer performs the whole sign-and-verify round trip.
if __name__ == "__main__":
    Signer()

In the EC key case, verification needs ignore_ambiguous_key_info=True because the signer writes both KeyValue and X509Certificate. The token prototype also includes a login check and an operation (mechanism) check. For a proper integration there needs to be a separate path for sign: it would be nice to get signature_algorithm from the caller (right now it gets a class from cryptography), and when the signature is returned I think the token should not need to encode it only for the signer to decode it again.

msetina commented 5 months ago

If signxml writes just the signing certificate without the public key (KeyValue), the interface gets cleaner and there is no need to fetch the public key, since it is a subset of the certificate. I found out that this happens when you add always_add_key_value=False to the call to sign. We still need to get key_size from the private key for recoding the signature (and there is an ointment for that ;-) ). The prototype can then look like this:

import PyKCS11
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import utils
from signxml.algorithms import SignatureMethod, DigestAlgorithm
import binascii

# Taken from pkcs11-tool.c.
# Each entry is (curve name, OID, hex-encoded DER EC parameters as bytes,
# key size in bits).  Lookups return the first entry whose parameters match,
# so the order of aliases matters.
_ec_curve_info = [
    ("secp192r1", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("prime192v1", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("prime192v2", "1.2.840.10045.3.1.2", b"06082a8648ce3d030102", 192),
    ("prime192v3", "1.2.840.10045.3.1.3", b"06082a8648ce3d030103", 192),
    ("nistp192", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("ansiX9p192r1", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("secp224r1", "1.3.132.0.33", b"06052b81040021", 224),
    ("nistp224", "1.3.132.0.33", b"06052b81040021", 224),
    ("prime256v1", "1.2.840.10045.3.1.7", b"06082a8648ce3d030107", 256),
    ("secp256r1", "1.2.840.10045.3.1.7", b"06082a8648ce3d030107", 256),
    # was b"06082b..." (typo): same curve as prime256v1, same encoding
    ("ansiX9p256r1", "1.2.840.10045.3.1.7", b"06082a8648ce3d030107", 256),
    ("frp256v1", "1.2.250.1.223.101.256.1", b"060a2a817a01815f65820001", 256),
    ("secp384r1", "1.3.132.0.34", b"06052b81040022", 384),
    # ("prime384v1", "1.3.132.0.34", b"06052b81040022", 384),
    # ("ansiX9p384r1", "1.3.132.0.34", b"06052b81040022", 384),
    ("prime521v1", "1.3.132.0.35", b"06052b81040023", 521),
    ("secp521r1", "1.3.132.0.35", b"06052b81040023", 521),
    ("nistp521", "1.3.132.0.35", b"06052b81040023", 521),
    ("brainpoolP192r1", "1.3.36.3.3.2.8.1.1.3", b"06092b2403030208010103", 192),
    ("brainpoolP224r1", "1.3.36.3.3.2.8.1.1.5", b"06092b2403030208010105", 224),
    ("brainpoolP256r1", "1.3.36.3.3.2.8.1.1.7", b"06092b2403030208010107", 256),
    ("brainpoolP320r1", "1.3.36.3.3.2.8.1.1.9", b"06092b2403030208010109", 320),
    ("brainpoolP384r1", "1.3.36.3.3.2.8.1.1.11", b"06092b240303020801010b", 384),
    ("brainpoolP512r1", "1.3.36.3.3.2.8.1.1.13", b"06092b240303020801010d", 512),
    ("secp192k1", "1.3.132.0.31", b"06052b8104001f", 192),
    ("secp256k1", "1.3.132.0.10", b"06052b8104000a", 256),
    ("secp521k1", "1.3.132.0.35", b"06052b81040023", 521),
    # The parameters must be bytes like every other entry: a str can never
    # equal the hexlified bytes the lookup compares against.
    (
        "edwards25519",
        "1.3.6.1.4.1.11591.15.1",
        b"130c656477617264733235353139",
        255,
    ),
    ("curve25519", "1.3.6.1.4.1.3029.1.5.1", b"130b63757276653235353139", 255),
]

def _translate_to_ec_key_data(ec_params):
    """Look up the ``_ec_curve_info`` entry whose parameters equal ``ec_params``.

    The comparison is made against the third field of each entry.  The first
    matching entry is returned; ``None`` when nothing matches.
    """
    matches = (entry for entry in _ec_curve_info if ec_params == entry[2])
    return next(matches, None)

# ECDSA signatures come from the card as raw r||s; for re-encoding we need r and s separately
def decode_RS_signature(data):
    """Split a raw ``r || s`` signature into its two equal halves.

    Args:
        data: the signature as bytes or any iterable of byte values.

    Returns:
        A ``(r, s)`` tuple of ``bytearray`` objects: the first and second
        half of ``data``.

    Raises:
        ValueError: if ``data`` has an odd length and therefore cannot
            consist of two values of equal width.
    """
    raw = bytearray(data)
    if len(raw) % 2:
        raise ValueError("Raw r||s signature must have an even length")
    half = len(raw) // 2
    return raw[:half], raw[half:]

# Token representation for use in signing
# signxml can take it as a key as it implements sign and public key presentation
# in addition it loads certificate data, either the signing certificate alone or the signing certificate plus the CA chain
class PKCS11Token:
    """Key-like wrapper around a PKCS#11 token for use with signxml.

    signxml can take an instance as ``key`` because it implements ``sign``
    and exposes ``key_size``.  In addition it loads the signing certificate
    and a PEM bundle of every certificate found on the token.  Call
    ``open_session`` before signing and ``close`` afterwards.
    """

    def __init__(self):
        # session for interaction with the card
        self._session = None
        # id of key read from private key
        self._keyid = None
        # type of key
        self._key_type = None
        # private key reference
        self._private_key = None
        # signing certificate
        self._certificate = None
        # certificate chain containing signing certificate and its ca chain
        self._ca_chain = None
        # operations supported by the card, grouped by operation name
        # (DIGEST, SIGN, VERIFY) and keyed by the signxml algorithm
        self._operations = None
        # does user need to be logged in to use session
        self._login_required = False

        # EC key size in bits, needed for signature recoding
        # (I think it can be sidestepped by proper signature encoding)
        self.key_size = None

    def _get_private_key(self, session):
        """Store the first private key on the token with its type and id.

        For keys that report EC parameters, ``key_size`` is filled in from
        the curve table as well.
        """
        if session is None:
            return
        self._private_key = session.findObjects(
            [(PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY)]
        )[0]
        attrs = session.getAttributeValue(
            self._private_key,
            [PyKCS11.CKA_KEY_TYPE, PyKCS11.CKA_ID, PyKCS11.CKA_EC_PARAMS],
        )
        self._key_type = attrs[0]
        self._keyid = bytes(attrs[1])
        ec_params = attrs[2]
        # Keys without EC parameters (e.g. RSA) come back with no value for
        # CKA_EC_PARAMS; skip the curve lookup instead of failing on it.
        if ec_params is None:
            return
        curve_object = _translate_to_ec_key_data(
            binascii.hexlify(bytearray(ec_params))
        )
        if curve_object is not None:
            self.key_size = curve_object[3]

    def _get_cert(self, session):
        """Load the certificate that shares the private key's id, as PEM.

        Certificates whose value cannot be read are skipped; when several
        match, the last readable one is kept.
        """
        if session is None:
            return
        cert_objects = session.findObjects(
            [
                (PyKCS11.CKA_CLASS, PyKCS11.CKO_CERTIFICATE),
                (PyKCS11.CKA_ID, self._keyid),
            ]
        )
        for cert_object in cert_objects:
            try:
                der = session.getAttributeValue(
                    cert_object, [PyKCS11.CKA_VALUE]
                )[0]
            except PyKCS11.PyKCS11Error:
                continue
            cert = x509.load_der_x509_certificate(
                bytes(der), backend=default_backend()
            )
            self._certificate = cert.public_bytes(
                encoding=serialization.Encoding.PEM
            )

    def _get_cert_with_ca_chain(self, session):
        """Load every readable certificate on the token as one PEM bundle."""
        if session is None:
            return
        cert_objects = session.findObjects(
            [(PyKCS11.CKA_CLASS, PyKCS11.CKO_CERTIFICATE)]
        )
        pem_parts = []
        for cert_object in cert_objects:
            try:
                der = session.getAttributeValue(
                    cert_object, [PyKCS11.CKA_VALUE]
                )[0]
            except PyKCS11.PyKCS11Error:
                continue
            cert = x509.load_der_x509_certificate(
                bytes(der), backend=default_backend()
            )
            pem_parts.append(
                cert.public_bytes(encoding=serialization.Encoding.PEM)
            )
        self._ca_chain = b"".join(pem_parts)

    def _read_operations(self, library, slot):
        """Map the mechanisms of ``slot`` to signxml algorithms per operation."""
        operations = {}
        for m in library.getMechanismList(slot):
            # query the selected slot, not a hard-coded slot 0
            mi = library.getMechanismInfo(slot, m)
            meh = str(m).replace("CKM_", "")
            for mf in mi.flags_dict:
                if mi.flags & mf == 0:
                    continue
                op = mi.flags_dict[mf].replace("CKF_", "")
                if op == "DIGEST":
                    algorithms = DigestAlgorithm
                elif op in ("SIGN", "VERIFY"):
                    algorithms = SignatureMethod
                else:
                    continue
                try:
                    mm = algorithms[meh]
                except KeyError:
                    # the card mechanism has no signxml counterpart
                    continue
                operations.setdefault(op, {})[mm] = PyKCS11.CKM[m]
        return operations

    def open_session(self, pksc11_lib, token_label, pin):
        """Open a session with the card and load key and certificate data.

        Args:
            pksc11_lib: PKCS#11 library to load.
            token_label: label of the token to use; with ``None`` the last
                slot that has a token present is used.
            pin: PIN used to log in when the selected token requires it.

        Returns:
            ``True`` when a session is open afterwards, ``False`` otherwise.
        """
        library = PyKCS11.PyKCS11Lib()
        library.load(pksc11_lib)
        slot = None
        self._login_required = False
        for sl in library.getSlotList(tokenPresent=True):
            # getTokenInfo takes the slot id, not its position in the list
            ti = library.getTokenInfo(sl)
            if token_label is None or ti.label.strip() == token_label:
                slot = sl
                # only the selected token decides whether login is needed
                self._login_required = (
                    ti.flags & PyKCS11.CKF_LOGIN_REQUIRED != 0
                )
                if token_label is not None:
                    break
        if slot is not None:
            self._session = library.openSession(
                slot, PyKCS11.CKF_SERIAL_SESSION | PyKCS11.CKF_RW_SESSION
            )
            if self._session is not None:
                if self._login_required:
                    self._session.login(pin)
                self._operations = self._read_operations(library, slot)
                # it is an assumption that each token has one private key
                # and multiple public keys and multiple certificates
                self._get_private_key(self._session)
                self._get_cert(self._session)
                self._get_cert_with_ca_chain(self._session)
        return self._session is not None

    # signxml interface

    def sign(self, data, **argv):
        """Sign ``data`` on the card and return the signature.

        When a ``signature_algorithm`` keyword is passed, the card's
        ECDSA_SHA256 mechanism is used if it is available; in every other
        case the session's default mechanism is used.  For EC and DSA keys
        the raw ``r || s`` value from the card is re-encoded as a DER
        signature, for other keys the raw bytes are returned.

        Returns ``None`` when no session or private key is available.
        """
        if (
            self._session is None
            or self._private_key is None
            or self._key_type is None
        ):
            return None
        mech = None
        if "signature_algorithm" in argv:
            # Call to sign should propagate SignatureMethod; until that is
            # integrated the method is fixed to ECDSA_SHA256.
            sign_mechs = (self._operations or {}).get("SIGN", {})
            mech = sign_mechs.get(SignatureMethod.ECDSA_SHA256)
        if mech is None:
            sig = self._session.sign(self._private_key, data)
        else:
            sig = self._session.sign(
                self._private_key, data, PyKCS11.Mechanism(mech)
            )

        if self._key_type in (PyKCS11.CKK_ECDSA, PyKCS11.CKK_DSA):
            # The signer on the other side decodes a DER signature back to
            # r and s; this re-encoding can go once the token is integrated.
            r, s = decode_RS_signature(sig)
            return utils.encode_dss_signature(
                int.from_bytes(r, "big"), int.from_bytes(s, "big")
            )
        # PyKCS11.CKK_RSA
        return bytes(sig)

    def get_digest(self, data, algorithm: "DigestAlgorithm"):
        """Compute a digest of ``data`` on the card.

        Returns ``None`` when no session is open or the card has no
        mechanism registered for ``algorithm``.
        """
        if self._session is None:
            return None
        # "DIGEST" is only present when a digest mechanism was mapped
        mech = (self._operations or {}).get("DIGEST", {}).get(algorithm)
        if mech is None:
            return None
        return self._session.digest(data, PyKCS11.Mechanism(mech))

    def certificate(self):
        """Return the signing certificate as PEM bytes (``None`` if not loaded)."""
        return self._certificate

    def certificate_with_ca_chain(self):
        """Return the PEM bundle of all certificates read from the token."""
        return self._ca_chain

    def close(self):
        """Log out (when a login was required) and close the open session.

        The session is closed and forgotten even if the logout call raises;
        that exception still propagates to the caller.
        """
        session = self._session
        if session is None:
            return
        try:
            if self._login_required:
                session.logout()
        finally:
            self._session = None
            session.closeSession()

There is still room for improvement, but it is a start. Thank you @Brandhor for the kick in the right direction. Using an EC key woke up some bugs that are now getting fixed, so maybe there will be a solution in the future.

msetina commented 5 months ago

RSA mechanism names reported by the card are special, so I added a translation table to produce the correct mechanism. The result equals what is used in @Brandhor's example.

import PyKCS11
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import utils
from signxml.algorithms import SignatureMethod, DigestAlgorithm
import binascii

# Translate mechanism names reported by the card (with the "CKM_" prefix
# removed) to SignatureMethod member names.  Whether the two really are the
# same has not been confirmed yet; only time will tell.
_pkcs_mech_names = {
    "RSA_PKCS": "RSA",
    "SHA1_RSA_PKCS": "RSA_SHA1",
    "SHA224_RSA_PKCS": "RSA_SHA224",
    "SHA256_RSA_PKCS": "RSA_SHA256",
    "SHA384_RSA_PKCS": "RSA_SHA384",
    "SHA512_RSA_PKCS": "RSA_SHA512",
    "MD5_RSA_PKCS": "RSA_MD5",
    "RIPEMD160_RSA_PKCS": "RSA_RIPEMD160",
}

# Taken from pkcs11-tool.c.
# Each entry is (curve name, OID, hex-encoded DER EC parameters as bytes,
# key size in bits).  Lookups return the first entry whose parameters match,
# so the order of aliases matters.
_ec_curve_info = [
    ("secp192r1", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("prime192v1", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("prime192v2", "1.2.840.10045.3.1.2", b"06082a8648ce3d030102", 192),
    ("prime192v3", "1.2.840.10045.3.1.3", b"06082a8648ce3d030103", 192),
    ("nistp192", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("ansiX9p192r1", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("secp224r1", "1.3.132.0.33", b"06052b81040021", 224),
    ("nistp224", "1.3.132.0.33", b"06052b81040021", 224),
    ("prime256v1", "1.2.840.10045.3.1.7", b"06082a8648ce3d030107", 256),
    ("secp256r1", "1.2.840.10045.3.1.7", b"06082a8648ce3d030107", 256),
    # was b"06082b..." (typo): same curve as prime256v1, same encoding
    ("ansiX9p256r1", "1.2.840.10045.3.1.7", b"06082a8648ce3d030107", 256),
    ("frp256v1", "1.2.250.1.223.101.256.1", b"060a2a817a01815f65820001", 256),
    ("secp384r1", "1.3.132.0.34", b"06052b81040022", 384),
    # ("prime384v1", "1.3.132.0.34", b"06052b81040022", 384),
    # ("ansiX9p384r1", "1.3.132.0.34", b"06052b81040022", 384),
    ("prime521v1", "1.3.132.0.35", b"06052b81040023", 521),
    ("secp521r1", "1.3.132.0.35", b"06052b81040023", 521),
    ("nistp521", "1.3.132.0.35", b"06052b81040023", 521),
    ("brainpoolP192r1", "1.3.36.3.3.2.8.1.1.3", b"06092b2403030208010103", 192),
    ("brainpoolP224r1", "1.3.36.3.3.2.8.1.1.5", b"06092b2403030208010105", 224),
    ("brainpoolP256r1", "1.3.36.3.3.2.8.1.1.7", b"06092b2403030208010107", 256),
    ("brainpoolP320r1", "1.3.36.3.3.2.8.1.1.9", b"06092b2403030208010109", 320),
    ("brainpoolP384r1", "1.3.36.3.3.2.8.1.1.11", b"06092b240303020801010b", 384),
    ("brainpoolP512r1", "1.3.36.3.3.2.8.1.1.13", b"06092b240303020801010d", 512),
    ("secp192k1", "1.3.132.0.31", b"06052b8104001f", 192),
    ("secp256k1", "1.3.132.0.10", b"06052b8104000a", 256),
    ("secp521k1", "1.3.132.0.35", b"06052b81040023", 521),
    # The parameters must be bytes like every other entry: a str can never
    # equal the hexlified bytes the lookup compares against.
    (
        "edwards25519",
        "1.3.6.1.4.1.11591.15.1",
        b"130c656477617264733235353139",
        255,
    ),
    ("curve25519", "1.3.6.1.4.1.3029.1.5.1", b"130b63757276653235353139", 255),
]

def _translate_to_ec_key_data(ec_params):
    """Look up the ``_ec_curve_info`` entry whose parameters equal ``ec_params``.

    The comparison is made against the third field of each entry.  The first
    matching entry is returned; ``None`` when nothing matches.
    """
    matches = (entry for entry in _ec_curve_info if ec_params == entry[2])
    return next(matches, None)

# Decode EC point x and y for public key description
def decode_ec_point(keylen, data):
    """Turn an encoded elliptic curve point into its ``(x, y)`` integers.

    ``data`` is either the single byte ``00``, which yields ``(None, None)``,
    or a ``04`` byte followed by two big-endian coordinates of ``keylen``
    bytes each.

    Raises:
        ValueError: if ``data`` does not start with ``04`` or its length is
            not ``2 * keylen + 1``.
    """
    if data == b"\x00":
        return None, None
    if not data.startswith(b"\x04"):
        raise ValueError("Unsupported elliptic curve point type")
    if len(data) != 2 * keylen + 1:
        raise ValueError("Invalid elliptic curve point data length")
    x_bytes = data[1 : keylen + 1]
    y_bytes = data[keylen + 1 :]
    return int.from_bytes(x_bytes, "big"), int.from_bytes(y_bytes, "big")

# ECDSA signatures come from the card as raw r||s; for re-encoding we need r and s separately
def decode_RS_signature(data):
    """Split a raw ``r || s`` signature into its two equal halves.

    Args:
        data: the signature as bytes or any iterable of byte values.

    Returns:
        A ``(r, s)`` tuple of ``bytearray`` objects: the first and second
        half of ``data``.

    Raises:
        ValueError: if ``data`` has an odd length and therefore cannot
            consist of two values of equal width.
    """
    raw = bytearray(data)
    if len(raw) % 2:
        raise ValueError("Raw r||s signature must have an even length")
    half = len(raw) // 2
    return raw[:half], raw[half:]

# public key information presentation for signxml

class RSAPublicNumbers:
    """Plain holder for the two RSA public values ``n`` and ``e``."""

    def __init__(self, n, e):
        self.n, self.e = n, e

class ECDSAPublicNumbers:
    """Plain holder for the EC public point coordinates ``x`` and ``y``."""

    def __init__(self, x, y):
        self.x, self.y = x, y

class RSAPublicKey:
    """RSA public key presentation offering ``public_numbers()``."""

    def __init__(self, n, e):
        numbers = RSAPublicNumbers(n, e)
        self.pubnumbers = numbers

    def public_numbers(self):
        """Return the ``RSAPublicNumbers`` built from ``n`` and ``e``."""
        numbers = self.pubnumbers
        return numbers

class ECDSAPublicKey:
    """EC public key presentation offering ``public_numbers()``."""

    def __init__(self, x, y):
        numbers = ECDSAPublicNumbers(x, y)
        self.pubnumbers = numbers

    def public_numbers(self):
        """Return the ``ECDSAPublicNumbers`` built from ``x`` and ``y``."""
        numbers = self.pubnumbers
        return numbers

class ECDSACurve:
    """Curve description that carries only the curve ``name``."""

    def __init__(self, name):
        curve_name = name
        self.name = curve_name

# Token representation for use in signing
# signxml can take it as a key as it implements sign and public key presentation
# in addition it loads certificate data, either the signing certificate alone or the signing certificate plus the CA chain
class PKCS11Token:
    """Key-like wrapper around a PKCS#11 token for use with signxml.

    signxml can take an instance as ``key`` because it implements ``sign``
    and the public key presentation (``public_key``, ``curve``,
    ``key_size``).  In addition it loads the signing certificate and a PEM
    bundle of every certificate found on the token.  Call ``open_session``
    before signing and ``close`` afterwards.
    """

    def __init__(self):
        # session for interaction with the card
        self._session = None
        # id of key read from private key
        self._keyid = None
        # type of key
        self._key_type = None
        # private key reference
        self._private_key = None
        # public key presentation object
        self._pubkey = None
        # signing certificate
        self._certificate = None
        # certificate chain containing signing certificate and its ca chain
        self._ca_chain = None
        # operations supported by the card, grouped by operation name
        # (DIGEST, SIGN, VERIFY) and keyed by the signxml algorithm
        self._operations = None
        # does user need to be logged in to use session
        self._login_required = False

        # public key data for EC key required by signxml public key serialisation
        self.curve = None
        self.key_size = None

    def _get_private_key(self, session):
        """Store the first private key on the token with its type and id."""
        if session is None:
            return
        self._private_key = session.findObjects(
            [(PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY)]
        )[0]
        attrs = session.getAttributeValue(
            self._private_key, [PyKCS11.CKA_KEY_TYPE, PyKCS11.CKA_ID]
        )
        self._key_type = attrs[0]
        self._keyid = bytes(attrs[1])

    @staticmethod
    def _unwrap_ec_point(raw, coord_len):
        """Strip a DER OCTET STRING header from an EC point value, if present.

        ``raw`` is returned unchanged when it already has the length of a
        bare point (``2 * coord_len + 1``) or when it is not a well-formed
        OCTET STRING.
        """
        if len(raw) == 2 * coord_len + 1:
            return raw
        if len(raw) > 2 and raw[0] == 0x04:
            if raw[1] < 0x80:
                # short form: the length is in the byte itself
                header_len = 2
                body_len = raw[1]
            else:
                # long form: low 7 bits give the number of length bytes
                header_len = 2 + (raw[1] & 0x7F)
                body_len = int.from_bytes(raw[2:header_len], "big")
            if len(raw) == header_len + body_len:
                return raw[header_len:]
        return raw

    def _get_public_key(self, session):
        """Build the public key presentation matching the private key.

        This is only needed as long as the signer demands public key data;
        if it serialised just the certificate as X509Data it could go.
        """
        if session is None or self._key_type is None:
            return
        pubkey = session.findObjects(
            [
                (PyKCS11.CKA_CLASS, PyKCS11.CKO_PUBLIC_KEY),
                (PyKCS11.CKA_ID, self._keyid),
            ]
        )[0]
        if self._key_type == PyKCS11.CKK_ECDSA:  # EC key
            ec_attrs = session.getAttributeValue(
                pubkey, [PyKCS11.CKA_EC_POINT, PyKCS11.CKA_EC_PARAMS]
            )
            curve_object = _translate_to_ec_key_data(
                binascii.hexlify(bytearray(ec_attrs[1]))
            )
            if curve_object is not None:
                self.curve = ECDSACurve(curve_object[0])
                self.key_size = curve_object[3]
                # bytes per coordinate, e.g. 32 for 256 bits, 66 for 521
                coord_len = (self.key_size + 7) // 8
                # CKA_EC_POINT is specified as the DER encoding of the
                # point, so the OCTET STRING header has to be removed
                # before the coordinates are split.
                point = self._unwrap_ec_point(bytes(ec_attrs[0]), coord_len)
                x, y = decode_ec_point(coord_len, point)
                self._pubkey = ECDSAPublicKey(x, y)
        else:
            # PyKCS11.CKK_RSA
            modulus = session.getAttributeValue(
                pubkey, [PyKCS11.CKA_MODULUS]
            )[0]
            exponent = session.getAttributeValue(
                pubkey, [PyKCS11.CKA_PUBLIC_EXPONENT]
            )[0]
            self._pubkey = RSAPublicKey(
                int(binascii.hexlify(bytearray(modulus)), 16),
                int(binascii.hexlify(bytearray(exponent)), 16),
            )

    def _get_cert(self, session):
        """Load the certificate that shares the private key's id, as PEM.

        Certificates whose value cannot be read are skipped; when several
        match, the last readable one is kept.
        """
        if session is None:
            return
        cert_objects = session.findObjects(
            [
                (PyKCS11.CKA_CLASS, PyKCS11.CKO_CERTIFICATE),
                (PyKCS11.CKA_ID, self._keyid),
            ]
        )
        for cert_object in cert_objects:
            try:
                der = session.getAttributeValue(
                    cert_object, [PyKCS11.CKA_VALUE]
                )[0]
            except PyKCS11.PyKCS11Error:
                continue
            cert = x509.load_der_x509_certificate(
                bytes(der), backend=default_backend()
            )
            self._certificate = cert.public_bytes(
                encoding=serialization.Encoding.PEM
            )

    def _get_cert_with_ca_chain(self, session):
        """Load every readable certificate on the token as one PEM bundle."""
        if session is None:
            return
        cert_objects = session.findObjects(
            [(PyKCS11.CKA_CLASS, PyKCS11.CKO_CERTIFICATE)]
        )
        pem_parts = []
        for cert_object in cert_objects:
            try:
                der = session.getAttributeValue(
                    cert_object, [PyKCS11.CKA_VALUE]
                )[0]
            except PyKCS11.PyKCS11Error:
                continue
            cert = x509.load_der_x509_certificate(
                bytes(der), backend=default_backend()
            )
            pem_parts.append(
                cert.public_bytes(encoding=serialization.Encoding.PEM)
            )
        self._ca_chain = b"".join(pem_parts)

    def _read_operations(self, library, slot):
        """Map the mechanisms of ``slot`` to signxml algorithms per operation."""
        operations = {}
        for m in library.getMechanismList(slot):
            # query the selected slot, not a hard-coded slot 0
            mi = library.getMechanismInfo(slot, m)
            meh = str(m).replace("CKM_", "")
            meh = _pkcs_mech_names.get(meh, meh)
            for mf in mi.flags_dict:
                if mi.flags & mf == 0:
                    continue
                op = mi.flags_dict[mf].replace("CKF_", "")
                if op == "DIGEST":
                    algorithms = DigestAlgorithm
                elif op in ("SIGN", "VERIFY"):
                    algorithms = SignatureMethod
                else:
                    continue
                try:
                    mm = algorithms[meh]
                except KeyError:
                    # the card mechanism has no signxml counterpart
                    continue
                operations.setdefault(op, {})[mm] = PyKCS11.CKM[m]
        return operations

    def open_session(self, pksc11_lib, token_label, pin):
        """Open a session with the card and load key and certificate data.

        Args:
            pksc11_lib: PKCS#11 library to load.
            token_label: label of the token to use; with ``None`` the last
                slot that has a token present is used.
            pin: PIN used to log in when the selected token requires it.

        Returns:
            ``True`` when a session is open afterwards, ``False`` otherwise.
        """
        library = PyKCS11.PyKCS11Lib()
        library.load(pksc11_lib)
        slot = None
        self._login_required = False
        for sl in library.getSlotList(tokenPresent=True):
            # getTokenInfo takes the slot id, not its position in the list
            ti = library.getTokenInfo(sl)
            if token_label is None or ti.label.strip() == token_label:
                slot = sl
                # only the selected token decides whether login is needed
                self._login_required = (
                    ti.flags & PyKCS11.CKF_LOGIN_REQUIRED != 0
                )
                if token_label is not None:
                    break
        if slot is not None:
            self._session = library.openSession(
                slot, PyKCS11.CKF_SERIAL_SESSION | PyKCS11.CKF_RW_SESSION
            )
            if self._session is not None:
                if self._login_required:
                    self._session.login(pin)
                self._operations = self._read_operations(library, slot)
                # it is an assumption that each token has one private key
                # and multiple public keys and multiple certificates
                self._get_private_key(self._session)
                self._get_public_key(self._session)
                self._get_cert(self._session)
                self._get_cert_with_ca_chain(self._session)
        return self._session is not None

    # signxml interface

    def sign(self, data, **argv):
        """Sign ``data`` on the card and return the signature.

        When a ``signature_algorithm`` keyword is passed ECDSA_SHA256 is
        requested, otherwise RSA_SHA256; if the card did not report the
        requested mechanism the session's default mechanism is used.  For
        EC and DSA keys the raw ``r || s`` value from the card is re-encoded
        as a DER signature, for other keys the raw bytes are returned.

        Returns ``None`` when no session or private key is available.
        """
        if (
            self._session is None
            or self._private_key is None
            or self._key_type is None
        ):
            return None
        # Call to sign should propagate SignatureMethod; until that is
        # integrated the method is derived from the keywords passed in.
        if "signature_algorithm" in argv:
            method = SignatureMethod.ECDSA_SHA256
        else:
            method = SignatureMethod.RSA_SHA256
        # "SIGN" is only present when a signing mechanism was mapped
        mech = (self._operations or {}).get("SIGN", {}).get(method)
        if mech is None:
            sig = self._session.sign(self._private_key, data)
        else:
            sig = self._session.sign(
                self._private_key, data, PyKCS11.Mechanism(mech)
            )

        if self._key_type in (PyKCS11.CKK_ECDSA, PyKCS11.CKK_DSA):
            # The signer on the other side decodes a DER signature back to
            # r and s; this re-encoding can go once the token is integrated.
            r, s = decode_RS_signature(sig)
            return utils.encode_dss_signature(
                int.from_bytes(r, "big"), int.from_bytes(s, "big")
            )
        # PyKCS11.CKK_RSA
        return bytes(sig)

    def get_digest(self, data, algorithm: "DigestAlgorithm"):
        """Compute a digest of ``data`` on the card.

        Returns ``None`` when no session is open or the card has no
        mechanism registered for ``algorithm``.
        """
        if self._session is None:
            return None
        # "DIGEST" is only present when a digest mechanism was mapped
        mech = (self._operations or {}).get("DIGEST", {}).get(algorithm)
        if mech is None:
            return None
        return self._session.digest(data, PyKCS11.Mechanism(mech))

    def public_key(self):
        """Return the public key presentation (``None`` if not loaded)."""
        return self._pubkey

    def certificate(self):
        """Return the signing certificate as PEM bytes (``None`` if not loaded)."""
        return self._certificate

    def certificate_with_ca_chain(self):
        """Return the PEM bundle of all certificates read from the token."""
        return self._ca_chain

    def close(self):
        """Log out (when a login was required) and close the open session.

        The session is closed and forgotten even if the logout call raises;
        that exception still propagates to the caller.
        """
        session = self._session
        if session is None:
            return
        try:
            if self._login_required:
                session.logout()
        finally:
            self._session = None
            session.closeSession()