citusdata / django-multitenant

Python/Django support for distributed multi-tenant databases like Postgres+Citus
MIT License
710 stars 116 forks source link

Where can I find some DRF examples? #126

Open aoulaa opened 2 years ago

svb0866 commented 1 month ago

from typing import Optional, Tuple

import boto3
from django.db import transaction
from django_multitenant.utils import set_current_tenant, unset_current_tenant
from rest_framework.request import Request
from rest_framework_simplejwt.authentication import JWTAuthentication
from rest_framework_simplejwt.exceptions import InvalidToken, TokenError, AuthenticationFailed
from rest_framework_simplejwt.tokens import Token

from accounts.models import CustomUser
from practice.models import Practice

# Shared boto3 client for the Cognito Identity Provider ("cognito-idp") API.
# Built once at import time; no region or credentials are passed here, so
# boto3's default configuration resolution applies.
cognito_client = boto3.client('cognito-idp')

class CognitoJWTAuthenticationBackend(JWTAuthentication):
    """JWT authentication backend that resolves users through AWS Cognito.

    After the token is validated, the matching ``CustomUser`` is looked up by
    the token's ``sub`` claim.  Unknown users are provisioned on the fly
    together with a new ``Practice``, and the user's practice is installed as
    the current django-multitenant tenant.
    """

    # Raw token bytes from the most recent ``authenticate`` call; ``get_user``
    # decodes them and forwards them to Cognito as the access token.
    raw_token = None

    def authenticate(self, request: Request) -> Optional[Tuple[CustomUser, Token]]:
        """Authenticate *request* from its JWT header.

        :param request: The incoming DRF request.
        :return: ``None`` when the request carries no auth header or no raw
            token; otherwise a ``(user, validated_token)`` tuple.
        """
        header = self.get_header(request)
        if header is None:
            return None
        self.raw_token = self.get_raw_token(header)
        if self.raw_token is None:
            return None
        validated_token = self.get_validated_token(self.raw_token)
        return self.get_user(validated_token), validated_token

    @transaction.atomic
    def get_user(self, validated_token: Token) -> CustomUser:
        """
        Retrieves an existing user or creates a new one from a validated JWT token.

        The current tenant is cleared first, then set to the practice of the
        user that is returned.  On any failure the tenant is cleared again.

        :param validated_token: The validated JWT token.
        :return: An instance of the User model.
        :raises AuthenticationFailed: If the token has no ``sub`` claim.
        :raises TokenError: If anything else goes wrong while resolving the
            user (Cognito call, missing attributes, database errors).
        """
        try:
            # Drop any tenant left on this thread so the lookup below is not
            # scoped to it.
            unset_current_tenant()
            # 'sub' is used as the unique identifier (username) for users.
            user_id = validated_token.get('sub')
            if not user_id:
                raise AuthenticationFailed('Missing user ID in token')

            # Fetch the user's profile attributes from Cognito.
            response = cognito_client.get_user(AccessToken=self.raw_token.decode('utf-8'))
            user_attributes = {attr['Name']: attr['Value'] for attr in response['UserAttributes']}

            # One query instead of exists() followed by get().
            user = CustomUser.objects.filter(username=user_id).first()
            if user is None:
                user = self._create_user_with_practice(user_id, user_attributes)
            else:
                set_current_tenant(user.practice)
            return user
        except AuthenticationFailed:
            # Let authentication failures through unchanged instead of letting
            # the generic handler below rewrap them as TokenError.
            unset_current_tenant()
            raise
        except Exception as e:
            # The surrounding transaction rolls back on error, so do not leave
            # the thread's tenant pointing at a practice that was never
            # committed.
            unset_current_tenant()
            # NOTE(review): TokenError does not appear to be a DRF APIException,
            # so this likely surfaces as a 500 rather than a 401 — confirm
            # whether AuthenticationFailed is the intended type here.
            raise TokenError(f'Unexpected error retrieving user: {e}') from e

    def _create_user_with_practice(self, user_id, user_attributes):
        """Create a new Practice and a CustomUser from Cognito attributes.

        The new practice is made the current tenant before the user is saved.
        """
        given_name = user_attributes.get('given_name', '')
        practice = Practice.objects.create(name="{}'s Practice".format(given_name))
        # The user is not given a practice explicitly; presumably the model
        # picks up the current tenant on save — verify against CustomUser.
        set_current_tenant(practice)
        user = CustomUser(
            username=user_id,
            email=user_attributes['email'],
            email_verified=user_attributes.get('email_verified', 'false').lower() == 'true',
            first_name=given_name,
            last_name=user_attributes.get('family_name', ''),
            phone_number=user_attributes.get('phone_number', ''),
            phone_number_verified=user_attributes.get('phone_number_verified', 'false').lower() == 'true',
        )
        user.save()
        return user