authlib / joserfc

Implementations of JOSE RFCs in Python
https://jose.authlib.org
BSD 3-Clause "New" or "Revised" License

JWE: Message with multiple recipients requires ALL keys (instead of just one) #16

Closed aio-witc closed 5 months ago

aio-witc commented 11 months ago

Setup: The message contains two recipients. I try to decrypt it with ONE correct key.

Error:
  File "venv/lib64/python3.12/site-packages/joserfc/jwe.py", line 254, in decrypt_json
    _attach_recipient_keys(general_obj.recipients, private_key, sender_key)
  File "venv/lib64/python3.12/site-packages/joserfc/jwe.py", line 269, in _attach_recipient_keys
    key = guess_key(private_key, recipient)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "venv/lib64/python3.12/site-packages/joserfc/jwk.py", line 71, in guess_key
    rv_key = _norm_key.get_by_kid(kid)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "venv/lib64/python3.12/site-packages/joserfc/_keys.py", line 140, in get_by_kid
    raise ValueError(f'No key for kid: "{kid}"')
ValueError: No key for kid: "FCmerznXQ1ZWdLUWhESQ0NP7r_4_RNGJSOtmNEzw-NQ"

RFC: https://datatracker.ietf.org/doc/html/rfc7516#page-17

When there are multiple recipients, it is an application decision which of the recipients' encrypted content must successfully validate for the JWE to be accepted. In some cases, encrypted content for all recipients must successfully validate or the JWE will be considered invalid. In other cases, only the encrypted content for a single recipient needs to be successfully validated. However, in all cases, the encrypted content for at least one recipient MUST successfully validate or the JWE MUST be considered invalid.
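
The "at least one recipient" rule quoted above can be sketched independently of any library. This is a minimal illustration, not joserfc's implementation: `decrypt_for_any_recipient` and the `try_decrypt` callback are hypothetical names, and the callback is assumed to raise an exception when a recipient cannot be decrypted with the available keys.

```python
from typing import Callable, Iterable, List, TypeVar

R = TypeVar("R")


def decrypt_for_any_recipient(
    recipients: Iterable[R],
    try_decrypt: Callable[[R], bytes],
) -> bytes:
    """Return the plaintext from the first recipient that decrypts.

    `try_decrypt` is a hypothetical callback that raises on failure.
    """
    errors: List[Exception] = []
    for recipient in recipients:
        try:
            return try_decrypt(recipient)
        except Exception as exc:  # a real implementation would catch narrower errors
            errors.append(exc)
    # RFC 7516: at least one recipient MUST validate, otherwise reject the JWE.
    raise ValueError(f"no recipient could be decrypted ({len(errors)} tried)")
```

Under this policy a missing key for one recipient is not fatal on its own; the message is rejected only when every recipient fails.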

Sample Message:

{
  "protected": "eyJlbmMiOiJBMTI4R0NNIn0",
  "iv": "nMJiDqzDzwvGU-MP",
  "ciphertext": "ryF25Rj33Uo9FTFi",
  "tag": "7h58L5Umg6ulbJkpC5AK4A",
  "recipients": [
    {
      "header": {
        "alg": "ECDH-ES+A128KW",
        "kid": "xsM1h0SZiMZTQHYRdGINj-RWnqQ1tzehgmqN8PqF7NQ",
        "epk": {
          "crv": "P-256",
          "x": "WrJMwjpg1zXDLxkVm1KPsxHaEJPRaGwkGRCc8dyG2Kk",
          "y": "PEB1MonzgxB2nXJNyCJ3Gh3vJ68tjUNeVmE9v-yOupc",
          "kty": "EC"
        }
      },
      "encrypted_key": "fV4kYX4axHBqlKyiER4akpzLw9DB4gTs"
    },
    {
      "header": {
        "alg": "ECDH-ES+A128KW",
        "kid": "FCmerznXQ1ZWdLUWhESQ0NP7r_4_RNGJSOtmNEzw-NQ",
        "epk": {
          "crv": "P-256",
          "x": "uthnacJYi9cuMa40ecBNZeFTSaRtKhd-aGZDO0k4kGY",
          "y": "6lklVzYoBIDRLwmMXJPuZ6R3MhzX-aMlvDnAS0lByxg",
          "kty": "EC"
        }
      },
      "encrypted_key": "qwrB-IYpJ6QKUDEd9x7yJ8PkyUp4Z130"
    }
  ]
}

Code:

private.pem and private.pem.bak each contain one of the two recipient keys.

import json

import joserfc.jwe
import joserfc.jwk

# Load the first recipient's private key and make sure it has a "kid".
with open("private.pem", "r") as f:
    private_pem = f.read()

print(private_pem)

key1 = joserfc.jwk.ECKey.import_key(private_pem)
key1.ensure_kid()

# Load the second recipient's private key.
with open("private.pem.bak", "r") as f:
    private_pem = f.read()

key2 = joserfc.jwk.ECKey.import_key(private_pem)
key2.ensure_kid()

# Parse the general JSON serialization shown above.
with open("message.enc", "r") as f:
    msg = json.load(f)

print(joserfc.jwk.KeySet([key1]).as_dict())

# Each call passes a key set holding only ONE of the two recipient keys.
print("key1:")
plaintext = joserfc.jwe.decrypt_json(msg, joserfc.jwk.KeySet([key1]))
print(plaintext.plaintext)

print("key2:")
plaintext = joserfc.jwe.decrypt_json(msg, joserfc.jwk.KeySet([key2]))
print(plaintext.plaintext)

aio-witc commented 11 months ago

Idea: add an option to:

    joserfc.jwe.decrypt_json(...)

Example:

    plaintext = joserfc.jwe.decrypt_json(msg, joserfc.jwk.KeySet([key1]), single_recipient=True)

Quickfix:

def _perform_decrypt(obj: EncryptionData, registry: JWERegistry) -> None:
    enc = registry.get_enc(obj.protected["enc"])

    iv = obj.bytes_segments["iv"]
    enc.check_iv(iv)

    tag = obj.bytes_segments["tag"]
    ciphertext = obj.bytes_segments["ciphertext"]

    cek_set = set()
    for recipient in obj.recipients:
        headers = recipient.headers()
        registry.check_header(headers, True)
        # Step 6, Determine the Key Management Mode employed by the algorithm
        # specified by the "alg" (algorithm) Header Parameter.
        alg = registry.get_alg(headers["alg"])
        try:  # added: skip recipients that cannot be decrypted
            cek = decrypt_recipient(alg, enc, recipient, tag)
            cek_set.add(cek)
        except Exception:  # added
            continue

def _attach_recipient_keys(
        recipients: t.List[Recipient[Key]],
        private_key: KeyFlexible,
        sender_key: t.Optional[t.Union[ECKey, OKPKey, KeySet]] = None) -> None:
    for recipient in recipients:
        try:  # added: skip recipients for which no key is available
            key = guess_key(private_key, recipient)
            key.check_use("enc")
        except Exception:  # added
            continue
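
Until the library handles this, one possible workaround that does not patch joserfc is to drop the recipients for which no key is available before decrypting. The sketch below is an assumption-laden illustration: `keep_known_recipients` is a hypothetical helper, it only inspects the per-recipient "header" (not "kid" values placed in the protected or shared unprotected header), and it assumes that `decrypt_json` accepts the filtered dict.

```python
import copy
from typing import Any, Dict, Iterable


def keep_known_recipients(msg: Dict[str, Any], known_kids: Iterable[str]) -> Dict[str, Any]:
    """Return a copy of a general JWE JSON dict with only matching recipients.

    Hypothetical helper: a recipient is kept when its per-recipient
    header carries a "kid" that is listed in `known_kids`.
    """
    kids = set(known_kids)
    filtered = copy.deepcopy(msg)  # leave the caller's message untouched
    filtered["recipients"] = [
        r for r in filtered.get("recipients", [])
        if r.get("header", {}).get("kid") in kids
    ]
    if not filtered["recipients"]:
        raise ValueError("no recipient matches any of the available keys")
    return filtered
```

With the variables from the reproduction script this might be called as `joserfc.jwe.decrypt_json(keep_known_recipients(msg, [key1.kid]), joserfc.jwk.KeySet([key1]))`, assuming `key1.kid` holds the value set by `ensure_kid()`. Removing entries from "recipients" should not affect the authentication tag, since in RFC 7516 the additional authenticated data is derived from the protected header and the optional "aad" member, not from the recipients array.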