Cosmian / kms

A feature-rich, scalable, Key Management System
https://docs.cosmian.com/cosmian_key_management_system/

Cosmian KMS RSA keys cannot be used to generate a JWT token #286

Closed PraneethJay closed 1 week ago

PraneethJay commented 1 month ago

I cannot convert the retrieved keys to PEM format, and they do not work for encryption either. The issue seems to be in the retrieved keys. Is any wrapping applied to them? Do you have any suggestions?

import asyncio
import datetime
import jwt
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cloudproof_py.cover_crypt import Policy, PolicyAxis
from cloudproof_py.kms import KmsClient, MasterSecretKey

def convert_to_pem_der(key_bytes):
    """Convert DER-encoded key bytes to PEM format"""
    try:
        # Load key from DER format
        private_key = serialization.load_der_private_key(
            key_bytes,
            password=None,
        )

        # Convert to PEM format
        pem_key = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()
        )

        return pem_key.decode('utf-8')
    except Exception as e:
        print("Failed to convert DER key to PEM format:", str(e))
        return None

class KmsManager:
    def __init__(self, server_url, api_key):
        self.kms_client = KmsClient(server_url=server_url, api_key=api_key)

    async def generate_and_store_keys(self, policy):
        """Generate RSA key pair for RS256 with a specific policy and store in Cosmian KMS"""
        response = await self.kms_client.create_cover_crypt_master_key_pair(policy)
        if isinstance(response, tuple) and len(response) == 2:
            public_key_uid, private_key_uid = response
            print("Public Key UID:", public_key_uid)
            print("Private Key UID:", private_key_uid)
            print("Keys stored with policy")
            return private_key_uid
        else:
            print("Unexpected response format:", response)
            return None

    async def retrieve_private_key(self, private_key_uid):
        """Retrieve private key from Cosmian KMS using the private key UID"""
        try:
            private_key_obj = await self.kms_client.retrieve_cover_crypt_private_master_key(private_key_uid)
            return private_key_obj
        except Exception as e:
            print("Failed to retrieve private key:", str(e))
            return None

    def convert_to_pem(self, key_obj):
        """Convert MasterSecretKey object to PEM format"""
        try:
            if isinstance(key_obj, MasterSecretKey):
                # Convert MasterSecretKey to bytes
                key_bytes = key_obj.to_bytes()

                # Try to convert DER-encoded bytes to PEM
                pem_key = convert_to_pem_der(key_bytes)
                return pem_key
            else:
                raise ValueError("Unexpected key object type")
        except Exception as e:
            print("Failed to convert key to PEM format:", str(e))
            return None

async def main():
    kms_manager = KmsManager(server_url="", api_key="your_api_key_here")

    policy = Policy()
    policy.add_axis(
        PolicyAxis(
            "Security Level",
            [
                ("Protected", False),
                ("Confidential", False),
                ("Top Secret", True),
            ],
            hierarchical=True,
        )
    )
    policy.add_axis(
        PolicyAxis(
            "Department",
            [("FIN", False), ("MKG", False), ("HR", False)],
            hierarchical=False,
        )
    )

    private_key_uid = await kms_manager.generate_and_store_keys(policy=policy)
    if not private_key_uid:
        print("Failed to generate and store keys.")
        return

    private_key_obj = await kms_manager.retrieve_private_key(private_key_uid)
    if not private_key_obj:
        print("Failed to retrieve private key.")
        return

    pem_private_key = kms_manager.convert_to_pem(private_key_obj)
    if not pem_private_key:
        print("Failed to convert private key to PEM format.")
        return

    payload = {
        "sub": "user_id",
        "name": "John Doe",
        "iat": datetime.datetime.now(datetime.timezone.utc)
    }
    try:
        token = jwt.encode(payload, pem_private_key, algorithm="RS256")
        print("Generated JWT Token:", token)
    except Exception as e:
        print("Failed to generate JWT token:", str(e))

if __name__ == "__main__":
    asyncio.run(main())

Execute Result:

Public Key UID: 7621c826-988a-495a-8bc5-2fcea67ed8ca
Private Key UID: a3e0be46-c481-42c1-83bb-205d3409caa0
Keys stored with policy
Failed to convert DER key to PEM format: ('Could not deserialize key data. The data may be in an incorrect format, it may be encrypted with an unsupported algorithm, or it may be an unsupported key type (e.g. EC curves with explicit parameters).', [<OpenSSLError(code=109052027, lib=13, reason=123, reason_text=header too long)>])
Failed to convert private key to PEM format.

Manuthor commented 3 weeks ago

Hello Praneeth,

We have not yet implemented the pyo3 bindings to generate RSA keys from cloudproof_py. Currently, only Covercrypt keys are supported; these are specific keys that allow fine-grained partitioning during encryption.

Regarding your Python code: your function generate_and_store_keys generates Covercrypt keys (not RSA keys). These are exported as raw bytes (not DER) through the to_bytes() method and cannot be converted to PEM.
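
To illustrate why raw bytes are rejected by a DER loader, here is a minimal stdlib-only sketch. The helper name looks_like_der_sequence and the sample byte strings are hypothetical and are not part of cloudproof_py or cryptography; the sample "raw" bytes are an arbitrary stand-in, not real Covercrypt key material. A DER-encoded private key is a single ASN.1 SEQUENCE: a 0x30 tag followed by a length field that must match the rest of the buffer. Arbitrary raw bytes almost never satisfy that framing, which is consistent with the "header too long" error above.

```python
def looks_like_der_sequence(data: bytes) -> bool:
    """Return True if data is framed as one DER SEQUENCE spanning the whole buffer.

    This only checks the outer tag and length; it does not validate the contents.
    """
    if len(data) < 2 or data[0] != 0x30:  # 0x30 is the ASN.1 SEQUENCE tag
        return False
    first = data[1]
    if first < 0x80:
        # Short form: the length is stored directly in this byte.
        header_len, body_len = 2, first
    else:
        # Long form: the low 7 bits give the number of length bytes that follow.
        n = first & 0x7F
        if n == 0 or n > 4 or len(data) < 2 + n:
            return False
        header_len = 2 + n
        body_len = int.from_bytes(data[2:2 + n], "big")
    return header_len + body_len == len(data)


# Hypothetical inputs for illustration only.
der_like = bytes([0x30, 0x03, 0x02, 0x01, 0x00])  # SEQUENCE { INTEGER 0 }
raw_like = bytes(range(1, 65))                     # stand-in for non-DER raw key bytes

print(looks_like_der_sequence(der_like))  # True
print(looks_like_der_sequence(raw_like))  # False
```

A check like this could be run on key_obj.to_bytes() before calling load_der_private_key, to fail early with a clearer message.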

If you're interested in RSA or EC key generation in Python, we could eventually plan to implement it.
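
In the meantime, one possible workaround is to generate the RSA key pair locally with the cryptography package and sign the token with PyJWT, both of which the script above already imports. This is only a sketch under that assumption: the key never passes through Cosmian KMS, the helper name generate_rsa_pem_pair is hypothetical, and storing the resulting key securely is left to the caller.

```python
import datetime

import jwt
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa


def generate_rsa_pem_pair(key_size: int = 2048) -> tuple[str, str]:
    """Generate an RSA key pair locally and return (private_pem, public_pem)."""
    private_key = rsa.generate_private_key(public_exponent=65537, key_size=key_size)
    private_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    ).decode("utf-8")
    public_pem = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    ).decode("utf-8")
    return private_pem, public_pem


private_pem, public_pem = generate_rsa_pem_pair()

payload = {
    "sub": "user_id",
    "name": "John Doe",
    "iat": datetime.datetime.now(datetime.timezone.utc),
}

# Sign with the private key, then verify with the public key.
token = jwt.encode(payload, private_pem, algorithm="RS256")
claims = jwt.decode(token, public_pem, algorithms=["RS256"])
print("Verified subject:", claims["sub"])
```

The private key is written unencrypted here only to keep the sketch short; in practice it would need to be protected at rest.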

Manuthor commented 1 week ago

I hope this assistance has been helpful to you. We're closing the issue as there are no plans to support RSA or EC keys in cloudproof_python. Feel free to contribute if necessary.