goauthentik / authentik

The authentication glue you need.
https://goauthentik.io

Migrate users and passwords from Keycloak #4471

Open Lumrenion opened 1 year ago

Lumrenion commented 1 year ago

Describe your question

I am currently using Keycloak as an IDP for my organization, without an LDAP directory behind it. I manage roles and users directly in the Keycloak administration backend. Because Authentik can also act as an LDAP server, I am considering switching: much software lacks features when using SAML or OIDC, such as automatically revoking roles from users. Revocation often happens only when the user renews their session and cannot be initiated from the IDP when role memberships change. Creating SAML and OIDC clients in Authentik manually is not much of a problem; an automatic migration is not necessary in my case, as I have only about five SPs.

My biggest concern is migrating users with their passwords (which of course are hashed in the database). It would be a bad user experience if I had to send them an email like "please log in and create a new password" or make them create new passwords on first login. You always lose users along the way.

Is there a way to migrate users, including their passwords, and roles, from Keycloak to Authentik? Or is such a feature considered in the near future?

Additional context

I could imagine this being possible by giving Authentik access to Keycloak's database. Users without a password in Authentik would be authenticated against the Keycloak database, and on successful login their password would then be set in Authentik's database.
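The fallback flow described here can be sketched in plain Python. This is only an illustration under stated assumptions, not authentik's API: `check_legacy`, `login`, `local_check` and `set_local` are hypothetical names, the credential dict mirrors Keycloak's `salt` / `value` / `hashIterations` fields for pbkdf2-sha256, and the password is assumed to be encoded as UTF-8.

```python
import base64
import hashlib
import hmac


def check_legacy(password, cred):
    """Verify a password against a Keycloak-style pbkdf2-sha256 credential."""
    expected = base64.b64decode(cred["hash"])
    derived = hashlib.pbkdf2_hmac(
        "sha256",
        password.encode("utf-8"),        # assumption: UTF-8 password bytes
        base64.b64decode(cred["salt"]),  # the salt is stored base64-encoded
        cred["iterations"],
        dklen=len(expected),             # derive as many bytes as were stored
    )
    return hmac.compare_digest(derived, expected)


def login(username, password, legacy_creds, local_check, set_local):
    """Try the local password first, then fall back to the legacy credential.

    local_check(username, password) returns True/False, or None when the
    user has no local password yet. set_local(username, password) stores the
    password using the new system's own hashing.
    """
    result = local_check(username, password)
    if result is not None:
        return result
    cred = legacy_creds.get(username)
    if cred is not None and check_legacy(password, cred):
        set_local(username, password)  # migrate on first successful login
        return True
    return False
```

Once a user has logged in this way, the legacy credential is no longer consulted for them, so the old database is only needed until the remaining users have signed in once.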

lyz-code commented 1 year ago

I'm also facing this situation. Does anyone have a suggestion on how to proceed?

gwelch-contegix commented 1 year ago

From what I can see, a custom Django hasher would need to be created. Current Keycloak, at least, uses pbkdf2-sha256 by default for its passwords, which looks like this in the database:

$ select secret_data,credential_data from credential where user_id='e01f0de2-d22b-4746-9ef4-50f0bce4759b';
                                                                           secret_data                                                                            |                                credential_data                                 
------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------
 {"value":"UZ5mRNwxRf/WVgiGouBGv1GGVeNnftovNyD71l2tVxZrM+Sg+vXV+jpwfLhoz1SYofvDGXCkudwsDAS9sYPspg==","salt":"vVXYFjY59oQNnzBHhIdsmQ==","additionalParameters":{}} | {"hashIterations":27500,"algorithm":"pbkdf2-sha256","additionalParameters":{}}

We can mostly reformat this into what Django stores in the database with this code:

import base64

secret_data = {"value":"UZ5mRNwxRf/WVgiGouBGv1GGVeNnftovNyD71l2tVxZrM+Sg+vXV+jpwfLhoz1SYofvDGXCkudwsDAS9sYPspg==","salt":"vVXYFjY59oQNnzBHhIdsmQ==","additionalParameters":{}}
credential_data = {"hashIterations":27500,"algorithm":"pbkdf2-sha256","additionalParameters":{}}
algorithm = credential_data['algorithm']
iterations = credential_data['hashIterations']
salt = secret_data['salt']
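# Keep only the first 32 bytes of Keycloak's 64-byte derived key
# (the length Django's pbkdf2 helper produces for SHA256).
hash = base64.b64encode(base64.b64decode(secret_data['value'])[:32]).decode('utf-8')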

db_hash = f'{algorithm}${iterations}${salt}${hash}'

The remaining problem is that the salt stored in Keycloak is base64-encoded bytes, whereas Django stores the salt as a plain string and uses that string as-is, without decoding it.
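The difference can be seen with the standard library alone. The sketch below uses the salt and iteration count from the row above with a placeholder password (not the real one). It compares a derivation that decodes the salt to raw bytes with one that uses the base64 text as the salt, and it also checks that a 32-byte key is simply the first half of the 64-byte key, which is why truncating the stored value works.

```python
import base64
import hashlib

# Salt and iteration count copied from the Keycloak row shown above.
salt_b64 = "vVXYFjY59oQNnzBHhIdsmQ=="
iterations = 27500
password = b"example-password"  # placeholder, not the real password

# Keycloak-style: the salt is the raw bytes behind the base64 text,
# and the stored value is 64 bytes long.
raw_salt = base64.b64decode(salt_b64)
keycloak_style = hashlib.pbkdf2_hmac("sha256", password, raw_salt, iterations, dklen=64)

# Django-style: the stored salt text itself is used as the salt.
django_style = hashlib.pbkdf2_hmac(
    "sha256", password, salt_b64.encode("utf-8"), iterations, dklen=32
)

# Same raw salt, shorter output: a prefix of the 64-byte key.
short_key = hashlib.pbkdf2_hmac("sha256", password, raw_salt, iterations, dklen=32)

print(len(raw_salt))                        # 16
print(keycloak_style[:32] == short_key)     # True
print(keycloak_style[:32] == django_style)  # False
```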

Django supports adding custom hashers; see https://docs.djangoproject.com/en/4.1/topics/auth/passwords/

Here is a hasher that shows that my password (in the above example) is 'hello':

import base64
import hashlib
import math

from django.utils.crypto import (
    RANDOM_STRING_CHARS,
    constant_time_compare,
    get_random_string,
    pbkdf2,
)
from django.contrib.auth.hashers import (
    must_update_salt,
    mask_hash,
    BasePasswordHasher,
)

from django.utils.translation import gettext_noop as _

class KeycloakPBKDF2PasswordHasher(BasePasswordHasher):
    """
    Password hasher for hashes imported from Keycloak (pbkdf2-sha256).
    Uses PBKDF2 + HMAC + SHA256 like Django's built-in hasher, but treats the
    stored salt as base64 and decodes it to raw bytes before hashing.
    The derived key is 32 bytes, i.e. the first half of Keycloak's 64-byte value.
    """

    algorithm = "pbkdf2-sha256"
    iterations = 27500
    digest = hashlib.sha256

    def encode(self, password, salt, iterations=None):
        self._check_encode_args(password, salt)
        iterations = iterations or self.iterations
        hash = pbkdf2(password, base64.b64decode(salt), iterations, digest=self.digest)
        hash = base64.b64encode(hash).decode("ascii").strip()
        return "%s$%d$%s$%s" % (self.algorithm, iterations, salt, hash)

    def decode(self, encoded):
        algorithm, iterations, salt, hash = encoded.split("$", 3)
        assert algorithm == self.algorithm
        return {
            "algorithm": algorithm,
            "hash": hash,
            "iterations": int(iterations),
            "salt": salt,
        }

    def verify(self, password, encoded):
        decoded = self.decode(encoded)
        encoded_2 = self.encode(password, decoded["salt"], decoded["iterations"])
        return constant_time_compare(encoded, encoded_2)

    def salt(self):
        """
        Generate a cryptographically secure nonce salt in ASCII with an entropy
        of at least `salt_entropy` bits.
        """
        # Each character in the salt provides
        # log_2(len(alphabet)) bits of entropy.
        char_count = math.ceil(self.salt_entropy / math.log2(len(RANDOM_STRING_CHARS)))
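        random_chars = get_random_string(char_count, allowed_chars=RANDOM_STRING_CHARS)
        # b64encode works on bytes, so encode first and decode the result back to str.
        return base64.b64encode(random_chars.encode("ascii")).decode("ascii")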

    def safe_summary(self, encoded):
        decoded = self.decode(encoded)
        return {
            _("algorithm"): decoded["algorithm"],
            _("iterations"): decoded["iterations"],
            _("salt"): mask_hash(decoded["salt"]),
            _("hash"): mask_hash(decoded["hash"]),
        }

    def must_update(self, encoded):
        decoded = self.decode(encoded)
        # Call the helper; the original line built an always-truthy tuple instead.
        update_salt = must_update_salt(decoded["salt"], self.salt_entropy)
        return (decoded["iterations"] != self.iterations) or update_salt

    def harden_runtime(self, password, encoded):
        decoded = self.decode(encoded)
        extra_iterations = self.iterations - decoded["iterations"]
        if extra_iterations > 0:
            self.encode(password, decoded["salt"], extra_iterations)

secret_data = {"value": "UZ5mRNwxRf/WVgiGouBGv1GGVeNnftovNyD71l2tVxZrM+Sg+vXV+jpwfLhoz1SYofvDGXCkudwsDAS9sYPspg==", "salt": "vVXYFjY59oQNnzBHhIdsmQ==", "additionalParameters": {}}
credential_data = {"hashIterations": 27500, "algorithm": "pbkdf2-sha256", "additionalParameters": {}}
algorithm = credential_data['algorithm']
iterations = credential_data['hashIterations']
salt = secret_data['salt']
hash = base64.b64encode(base64.b64decode(secret_data['value'])[:32]).decode('utf-8')

converted_db_hash = f'{algorithm}${iterations}${salt}${hash}'

hasher = KeycloakPBKDF2PasswordHasher()
db_hash = hasher.encode('hello', secret_data['salt'], credential_data['hashIterations'])
verified_password = hasher.verify('hello', converted_db_hash)

print("Converted hash taken from keycloak:              ", converted_db_hash)
print("Generated hash with KeycloakPBKDF2PasswordHasher:", db_hash)
print("Did it work?", verified_password)
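For Django to use a hasher like the one above, it would need to be listed in the `PASSWORD_HASHERS` setting. Django selects the hasher by the algorithm prefix of the stored string, so `pbkdf2-sha256` (hyphen) does not collide with Django's built-in `pbkdf2_sha256` (underscore). A minimal sketch, assuming the class lives in a hypothetical module `myapp.hashers`; how authentik itself exposes this setting is not covered in this thread.

```python
# settings.py (sketch); "myapp.hashers" is a hypothetical module path.
PASSWORD_HASHERS = [
    # The first entry is used to hash new and changed passwords.
    "django.contrib.auth.hashers.PBKDF2PasswordHasher",
    # Later entries are only used to verify existing hashes.
    "myapp.hashers.KeycloakPBKDF2PasswordHasher",
]
```

With this ordering, a migrated hash should be verified by the Keycloak hasher and then, as Django normally does for hashes from a non-preferred hasher, re-hashed with the first entry on the user's next successful login.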

ddelange commented 4 months ago

Here's a helper to fetch the password hashes from Keycloak's Postgres database:

# pip install 'psycopg[binary]'
import json
from functools import cached_property

import psycopg
from psycopg.rows import dict_row

class KeycloakPasswordFetcher:
    @cached_property
    def pg_connection(self):
        """Direct postgres connection to the keycloak db."""
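        # KEYCLOAK_DATABASE_* are assumed to be defined elsewhere (e.g. from env vars).
        # Keyword arguments avoid quoting problems in a hand-built conninfo string.
        return psycopg.connect(
            dbname=KEYCLOAK_DATABASE_NAME,
            user=KEYCLOAK_DATABASE_USER,
            password=KEYCLOAK_DATABASE_PASSWORD,
            host=KEYCLOAK_DATABASE_HOST,
            port=KEYCLOAK_DATABASE_PORT,
            row_factory=dict_row,
        )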

    def get_keycloak_password(self, user_id):
        """Fetch secret data from keycloak postgres (not available via REST API)."""
        with self.pg_connection.cursor() as cur:
            for row in cur.execute(
                "SELECT secret_data,credential_data,created_date FROM credential WHERE user_id=%(user_id)s ORDER BY created_date DESC;",
                {"user_id": user_id},
            ):
                # blindly use the most recent row
                secret_data = json.loads(row["secret_data"])
                credential_data = json.loads(row["credential_data"])
                return {
                    "algorithm": credential_data["algorithm"],
                    "iterations": credential_data["hashIterations"],
                    "salt": secret_data["salt"],
                    "hash": secret_data["value"],
                }
        msg = f"No credential found for user_id {user_id}"
        raise ValueError(msg)

fetcher = KeycloakPasswordFetcher()  # re-use db connection
user_id = "<UUID>"
password = "{algorithm}${iterations}${salt}${hash}".format(
    **fetcher.get_keycloak_password(user_id=user_id),
)
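
Note that `secret_data["value"]` is the full 64-byte derived key, while the `KeycloakPBKDF2PasswordHasher` posted earlier in this thread produces 32 bytes. The string built above would therefore presumably not verify against that hasher unless the hash is truncated first. A minimal sketch of that step, with `to_django_encoded` as a hypothetical helper name and the sample values taken from the row posted earlier:

```python
import base64


def to_django_encoded(cred, key_bytes=32):
    """Build 'algorithm$iterations$salt$hash' with the hash cut to key_bytes."""
    full_key = base64.b64decode(cred["hash"])
    short_hash = base64.b64encode(full_key[:key_bytes]).decode("ascii")
    return "{}${}${}${}".format(
        cred["algorithm"], cred["iterations"], cred["salt"], short_hash
    )


# Same shape as the dict returned by get_keycloak_password().
cred = {
    "algorithm": "pbkdf2-sha256",
    "iterations": 27500,
    "salt": "vVXYFjY59oQNnzBHhIdsmQ==",
    "hash": "UZ5mRNwxRf/WVgiGouBGv1GGVeNnftovNyD71l2tVxZrM+Sg+vXV+jpwfLhoz1SYofvDGXCkudwsDAS9sYPspg==",
}
encoded = to_django_encoded(cred)
print(encoded)
```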