awsdocs / aws-doc-sdk-examples

Welcome to the AWS Code Examples Repository. This repo contains code examples used in the AWS documentation, AWS SDK Developer Guides, and more. For more information, see the Readme.md file below.
Apache License 2.0

[Enhancement]: Add type hints to Python Cognito example #6629

Open bionicles opened 2 weeks ago

bionicles commented 2 weeks ago

Background story

I want to make a Cognito app with Python.

However, a lot of the examples have no type hints, so it's hard to tell what each function expects and returns.

I started adding them (code attached), but I need your help to clarify the types of the following things.

I'm assuming most of these are just str?

    AccessToken = Any
    MFAChallenge = Any
    session - what's this? A dict?
    mfa_code - a string?
    MFAToken = Any - a.k.a. the MFA secret
    device_key
    device_group_key
    device_password
    access_token
    aws_srp
    confirmation_code
    list of users - what's a "user" in this context? A string, a dict?
    DeliveryInformation = Any - in resend_confirmation, what are we getting back?
    MFAStatus = Any - what's this?

I'm hopeful that someone might see this and already know what these types are. Feel free to use the code. All I did was turn the class into a dataclass, make the class methods free functions, and add type hints where the types seemed clear.

What does this example accomplish?

This will make it clearer to everyone how to use Cognito from Python.

Which AWS service(s)?

Cognito

Which AWS SDKs or tools?

Are there existing code examples to leverage?

https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/cognito#code-examples

Do you have any reference code?

from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional
import logging
import hashlib
import base64
import hmac

from botocore.exceptions import ClientError
from pycognito import aws_srp
import boto3

# create a logger
logger = logging.getLogger(__name__)

# Create a Cognito Identity Provider client
cognitoidp = boto3.client("cognito-idp")

# Initialize a paginator for the list_user_pools operation
paginator = cognitoidp.get_paginator("list_user_pools")

# Create a PageIterator from the paginator
page_iterator = paginator.paginate(MaxResults=10)

# Initialize variables for pagination
user_pools = []

# Handle pagination
for page in page_iterator:
    user_pools.extend(page.get("UserPools", []))

# Print the list of user pools
print("User Pools for the account:")
if user_pools:
    for pool in user_pools:
        print(f"Name: {pool['Name']}, ID: {pool['Id']}")
else:
    print("No user pools found.")

@dataclass(frozen=True, slots=True, kw_only=True)
class CognitoClient:
    """Encapsulates Amazon Cognito actions"""

    cognito_idp_client: Any  # TODO: clarify this type
    user_pool_id: str
    client_id: str
    client_secret: Optional[str] = None

    def hash_from_user_name(
        self,
        user_name: str,
    ) -> str:
        """
        Calculates a secret hash from a user name and a client secret.

        :param user_name: The user name to use when calculating the hash.
        :return: The secret hash.
        """
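        if self.client_secret is None:
            raise ValueError("A client secret is required to calculate a secret hash.")
        key = self.client_secret.encode()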
        msg = bytes(user_name + self.client_id, "utf-8")
        secret_hash = base64.b64encode(
            hmac.new(key, msg, digestmod=hashlib.sha256).digest()
        ).decode()
        logger.info("Made secret hash for %s: %s.", user_name, secret_hash)
        return secret_hash

@dataclass(frozen=True, slots=True, kw_only=True)
class Attribute:
    """
    https://docs.aws.amazon.com/cognito-user-identity-pools/latest/APIReference/API_AttributeType.html
    """

    Name: str
    Value: Optional[str] = None

@dataclass(frozen=True, slots=True, kw_only=True)
class MFAOption:
    """
    https://docs.aws.amazon.com/cognito-user-identity-pools/latest/APIReference/API_MFAOptionType.html
    """

    AttributeName: Optional[str] = None
    DeliveryMedium: Optional[str] = None

@dataclass(frozen=True, slots=True, kw_only=True)
class User:
    """
    https://docs.aws.amazon.com/cognito-user-identity-pools/latest/APIReference/API_UserType.html
    """

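    Attributes: Optional[list[Attribute]] = None
    Enabled: Optional[bool] = None
    MFAOptions: Optional[list[MFAOption]] = None
    UserCreateDate: Optional[datetime] = None
    UserLastModifiedDate: Optional[datetime] = None
    Username: Optional[str] = None
    UserStatus: Optional[str] = None

    def __post_init__(self) -> None:
        # The dataclass is frozen, so plain attribute assignment raises
        # FrozenInstanceError. object.__setattr__ bypasses that check.
        if self.Attributes is not None:
            object.__setattr__(
                self,
                "Attributes",
                [
                    attr if isinstance(attr, Attribute) else Attribute(**attr)
                    for attr in self.Attributes
                ],
            )
        if self.MFAOptions is not None:
            object.__setattr__(
                self,
                "MFAOptions",
                [
                    opt if isinstance(opt, MFAOption) else MFAOption(**opt)
                    for opt in self.MFAOptions
                ],
            )
        # boto3 already parses timestamps into datetime objects, so only
        # convert values that are still raw epoch numbers.
        for name in ("UserCreateDate", "UserLastModifiedDate"):
            value = getattr(self, name)
            if value is not None and not isinstance(value, datetime):
                object.__setattr__(self, name, datetime.fromtimestamp(value))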

def list_users(
    cognito_client: CognitoClient,
) -> list[User]:
    """
    Returns a list of the users in the current user pool.

    https://docs.aws.amazon.com/cognito-user-identity-pools/latest/APIReference/API_ListUsers.html

    :return: The list of users.
    """
    try:
        response = cognito_client.cognito_idp_client.list_users(
            UserPoolId=cognito_client.user_pool_id
        )
        users = response["Users"]
    except ClientError as err:
        logger.error(
            "Couldn't list users for %s. Here's why: %s: %s",
            cognito_client.user_pool_id,
            err.response["Error"]["Code"],
            err.response["Error"]["Message"],
        )
        raise
    else:
        # convert the dictionaries to User objects
        return [User(**user) for user in users]

def sign_up_user(
    *,
    cognito_client: CognitoClient,
    user_name: str,
    password: str,
    user_email: str,
) -> bool:
    """
    Signs up a new user with Amazon Cognito. This action prompts Amazon Cognito
    to send an email to the specified email address. The email contains a code that
    can be used to confirm the user.

    https://docs.aws.amazon.com/cognito-user-identity-pools/latest/APIReference/API_SignUp.html

    When the user already exists, the user status is checked to determine whether
    the user has been confirmed.

    :param user_name: The user name that identifies the new user.
    :param password: The password for the new user.
    :param user_email: The email address for the new user.
    :return: True when the user is already confirmed with Amazon Cognito.
                Otherwise, false.
    """
    try:
        kwargs = {
            "ClientId": cognito_client.client_id,
            "Username": user_name,
            "Password": password,
            "UserAttributes": [{"Name": "email", "Value": user_email}],
        }
        if cognito_client.client_secret is not None:
            kwargs["SecretHash"] = cognito_client.hash_from_user_name(user_name)
        response = cognito_client.cognito_idp_client.sign_up(**kwargs)
        confirmed = response["UserConfirmed"]
    except ClientError as err:
        if err.response["Error"]["Code"] == "UsernameExistsException":
            response = cognito_client.cognito_idp_client.admin_get_user(
                UserPoolId=cognito_client.user_pool_id, Username=user_name
            )
            logger.warning(
                "User %s exists and is %s.", user_name, response["UserStatus"]
            )
            confirmed = response["UserStatus"] == "CONFIRMED"
        else:
            logger.error(
                "Couldn't sign up %s. Here's why: %s: %s",
                user_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
    return confirmed

def start_sign_in(
    *,
    cognito_client: CognitoClient,
    user_name: str,
    password: str,
) -> dict[str, Any]:
    """
    Starts the sign-in process for a user by using administrator credentials.
    This method of signing in is appropriate for code running on a secure server.

    https://docs.aws.amazon.com/cognito-user-identity-pools/latest/APIReference/API_AdminInitiateAuth.html

    If the user pool is configured to require MFA and this is the first sign-in
    for the user, Amazon Cognito returns a challenge response to set up an
    MFA application. When this occurs, this function gets an MFA secret from
    Amazon Cognito and returns it to the caller.

    :param user_name: The name of the user to sign in.
    :param password: The user's password.
    :return: The result of the sign-in attempt. When sign-in is successful, this
                returns an access token that can be used to get AWS credentials. Otherwise,
                Amazon Cognito returns a challenge to set up an MFA application,
                or a challenge to enter an MFA code from a registered MFA application.
    """
    try:
        kwargs = {
            "UserPoolId": cognito_client.user_pool_id,
            "ClientId": cognito_client.client_id,
            "AuthFlow": "ADMIN_USER_PASSWORD_AUTH",
            "AuthParameters": {"USERNAME": user_name, "PASSWORD": password},
        }
        if cognito_client.client_secret is not None:
            kwargs["AuthParameters"]["SECRET_HASH"] = (
                cognito_client.hash_from_user_name(user_name)
            )
        response = cognito_client.cognito_idp_client.admin_initiate_auth(**kwargs)
        challenge_name = response.get("ChallengeName", None)
        if challenge_name == "MFA_SETUP":
            if (
                "SOFTWARE_TOKEN_MFA"
                in response["ChallengeParameters"]["MFAS_CAN_SETUP"]
            ):
                response.update(
                    associate_software_token(
                        cognito_client=cognito_client,
                        session=response["Session"],
                    )
                )
            else:
                raise RuntimeError(
                    "The user pool requires MFA setup, but the user pool is not "
                    "configured for TOTP MFA. This example requires TOTP MFA."
                )
    except ClientError as err:
        logger.error(
            "Couldn't start sign in for %s. Here's why: %s: %s",
            user_name,
            err.response["Error"]["Code"],
            err.response["Error"]["Message"],
        )
        raise
    else:
        response.pop("ResponseMetadata", None)
        return response

def respond_to_mfa_challenge(
    *,
    cognito_client: CognitoClient,
    user_name: str,
    session: str,
    mfa_code: str,
) -> Optional[dict[str, Any]]:
    """
    Responds to a challenge for an MFA code. This completes the second step of
    a two-factor sign-in. When sign-in is successful, it returns an access token
    that can be used to get AWS credentials from Amazon Cognito.

    https://docs.aws.amazon.com/cognito-user-identity-pools/latest/APIReference/API_AdminRespondToAuthChallenge.html

    :param user_name: The name of the user who is signing in.
    :param session: Session information returned from a previous call to initiate
                    authentication.
    :param mfa_code: A code generated by the associated MFA application.
    :return: The result of the authentication. When successful, this contains an
                access token for the user. Returns None when the MFA code has
                expired or has already been used.
    """
    try:
        kwargs = {
            "UserPoolId": cognito_client.user_pool_id,
            "ClientId": cognito_client.client_id,
            "ChallengeName": "SOFTWARE_TOKEN_MFA",
            "Session": session,
            "ChallengeResponses": {
                "USERNAME": user_name,
                "SOFTWARE_TOKEN_MFA_CODE": mfa_code,
            },
        }
        if cognito_client.client_secret is not None:
            kwargs["ChallengeResponses"]["SECRET_HASH"] = (
                cognito_client.hash_from_user_name(user_name)
            )
        response = cognito_client.cognito_idp_client.admin_respond_to_auth_challenge(
            **kwargs
        )
        auth_result = response["AuthenticationResult"]
    except ClientError as err:
        if err.response["Error"]["Code"] == "ExpiredCodeException":
            logger.warning(
                "Your MFA code has expired or has been used already. You might have "
                "to wait a few seconds until your app shows you a new code."
            )
        else:
            logger.error(
                "Couldn't respond to mfa challenge for %s. Here's why: %s: %s",
                user_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
    else:
        return auth_result

def associate_software_token(
    *,
    cognito_client: CognitoClient,
    session: str,
) -> dict[str, Any]:
    """
    Gets a token that can be used to associate an MFA application with the user.

    https://docs.aws.amazon.com/cognito-user-identity-pools/latest/APIReference/API_AssociateSoftwareToken.html

    :param session: Session information returned from a previous call to initiate
                    authentication.
    :return: An MFA token that can be used to set up an MFA application.
    """
    try:
        response = cognito_client.cognito_idp_client.associate_software_token(
            Session=session
        )
    except ClientError as err:
        logger.error(
            "Couldn't get MFA secret. Here's why: %s: %s",
            err.response["Error"]["Code"],
            err.response["Error"]["Message"],
        )
        raise
    else:
        response.pop("ResponseMetadata", None)
        return response

def confirm_mfa_device(
    *,
    cognito_client: CognitoClient,
    user_name: str,
    device_key: str,
    device_group_key: str,
    device_password: str,
    access_token: str,
) -> bool:
    """
    Confirms an MFA device to be tracked by Amazon Cognito. When a device is
    tracked, its key and password can be used to sign in without requiring a new
    MFA code from the MFA application.

    https://docs.aws.amazon.com/cognito-user-identity-pools/latest/APIReference/API_ConfirmDevice.html

    :param user_name: The user that is associated with the device.
    :param device_key: The key of the device, returned by Amazon Cognito.
    :param device_group_key: The group key of the device, returned by Amazon Cognito.
    :param device_password: The password that is associated with the device.
    :param access_token: The user's access token.
    :return: True when the user must confirm the device. Otherwise, False. When
                False, the device is automatically confirmed and tracked.
    """
    srp_helper = aws_srp.AWSSRP(
        username=user_name,
        password=device_password,
        pool_id="_",
        client_id=cognito_client.client_id,
        client_secret=None,
        client=cognito_client.cognito_idp_client,
    )
    device_and_pw = f"{device_group_key}{device_key}:{device_password}"
    device_and_pw_hash = aws_srp.hash_sha256(device_and_pw.encode("utf-8"))
    salt = aws_srp.pad_hex(aws_srp.get_random(16))
    x_value = aws_srp.hex_to_long(aws_srp.hex_hash(salt + device_and_pw_hash))
    verifier = aws_srp.pad_hex(pow(srp_helper.val_g, x_value, srp_helper.big_n))
    device_secret_verifier_config = {
        "PasswordVerifier": base64.standard_b64encode(
            bytearray.fromhex(verifier)
        ).decode("utf-8"),
        "Salt": base64.standard_b64encode(bytearray.fromhex(salt)).decode("utf-8"),
    }
    try:
        response = cognito_client.cognito_idp_client.confirm_device(
            AccessToken=access_token,
            DeviceKey=device_key,
            DeviceSecretVerifierConfig=device_secret_verifier_config,
        )
        user_confirm = response["UserConfirmationNecessary"]
    except ClientError as err:
        logger.error(
            "Couldn't confirm mfa device %s. Here's why: %s: %s",
            device_key,
            err.response["Error"]["Code"],
            err.response["Error"]["Message"],
        )
        raise
    else:
        return user_confirm

def confirm_user_sign_up(
    *,
    cognito_client: CognitoClient,
    user_name: str,
    confirmation_code: str,
) -> bool:
    """
    Confirms a previously created user. A user must be confirmed before they
    can sign in to Amazon Cognito.

    https://docs.aws.amazon.com/cognito-user-identity-pools/latest/APIReference/API_ConfirmSignUp.html

    :param user_name: The name of the user to confirm.
    :param confirmation_code: The confirmation code sent to the user's registered
                                email address.
    :return: True when the confirmation succeeds.
    """
    try:
        kwargs = {
            "ClientId": cognito_client.client_id,
            "Username": user_name,
            "ConfirmationCode": confirmation_code,
        }
        if cognito_client.client_secret is not None:
            kwargs["SecretHash"] = cognito_client.hash_from_user_name(user_name)
        cognito_client.cognito_idp_client.confirm_sign_up(**kwargs)
    except ClientError as err:
        logger.error(
            "Couldn't confirm sign up for %s. Here's why: %s: %s",
            user_name,
            err.response["Error"]["Code"],
            err.response["Error"]["Message"],
        )
        raise
    else:
        return True

def sign_in_with_tracked_device(
    *,
    cognito_client: CognitoClient,
    user_name: str,
    password: str,
    device_key: str,
    device_group_key: str,
    device_password: str,
) -> dict[str, Any]:
    """
    Signs in to Amazon Cognito as a user who has a tracked device. Signing in
    with a tracked device lets a user sign in without entering a new MFA code.

    https://docs.aws.amazon.com/cognito-user-identity-pools/latest/APIReference/API_InitiateAuth.html
    https://docs.aws.amazon.com/cognito-user-identity-pools/latest/APIReference/API_RespondToAuthChallenge.html

    Signing in with a tracked device requires that the client respond to the SRP
    protocol. The scenario associated with this example uses the aws_srp module
    from the pycognito package to help with SRP calculations.

    For more information on SRP, see https://en.wikipedia.org/wiki/Secure_Remote_Password_protocol.

    :param user_name: The user that is associated with the device.
    :param password: The user's password.
    :param device_key: The key of a tracked device.
    :param device_group_key: The group key of a tracked device.
    :param device_password: The password that is associated with the device.
    :return: The result of the authentication. When successful, this contains an
                access token for the user.
    """
    try:
        srp_helper = aws_srp.AWSSRP(
            username=user_name,
            password=device_password,
            pool_id="_",
            client_id=cognito_client.client_id,
            client_secret=None,
            client=cognito_client.cognito_idp_client,
        )

        response_init = cognito_client.cognito_idp_client.initiate_auth(
            ClientId=cognito_client.client_id,
            AuthFlow="USER_PASSWORD_AUTH",
            AuthParameters={
                "USERNAME": user_name,
                "PASSWORD": password,
                "DEVICE_KEY": device_key,
            },
        )
        if response_init["ChallengeName"] != "DEVICE_SRP_AUTH":
            raise RuntimeError(
                f"Expected DEVICE_SRP_AUTH challenge but got {response_init['ChallengeName']}."
            )

        auth_params = srp_helper.get_auth_params()
        auth_params["DEVICE_KEY"] = device_key
        response_auth = cognito_client.cognito_idp_client.respond_to_auth_challenge(
            ClientId=cognito_client.client_id,
            ChallengeName="DEVICE_SRP_AUTH",
            ChallengeResponses=auth_params,
        )
        if response_auth["ChallengeName"] != "DEVICE_PASSWORD_VERIFIER":
            raise RuntimeError(
                "Expected DEVICE_PASSWORD_VERIFIER challenge but got "
                f"{response_auth['ChallengeName']}."
            )

        challenge_params = response_auth["ChallengeParameters"]
        challenge_params["USER_ID_FOR_SRP"] = device_group_key + device_key
        cr = srp_helper.process_challenge(challenge_params, {"USERNAME": user_name})
        cr["USERNAME"] = user_name
        cr["DEVICE_KEY"] = device_key
        response_verifier = cognito_client.cognito_idp_client.respond_to_auth_challenge(
            ClientId=cognito_client.client_id,
            ChallengeName="DEVICE_PASSWORD_VERIFIER",
            ChallengeResponses=cr,
        )
        auth_tokens = response_verifier["AuthenticationResult"]
    except ClientError as err:
        logger.error(
            "Couldn't start client sign in for %s. Here's why: %s: %s",
            user_name,
            err.response["Error"]["Code"],
            err.response["Error"]["Message"],
        )
        raise
    else:
        return auth_tokens

def resend_confirmation(
    *,
    cognito_client: CognitoClient,
    user_name: str,
) -> dict[str, Any]:
    """
    Prompts Amazon Cognito to resend an email with a new confirmation code.

    https://docs.aws.amazon.com/cognito-user-identity-pools/latest/APIReference/API_ResendConfirmationCode.html

    :param user_name: The name of the user who will receive the email.
    :return: Delivery information about where the email is sent.
    """
    try:
        kwargs = {"ClientId": cognito_client.client_id, "Username": user_name}
        if cognito_client.client_secret is not None:
            kwargs["SecretHash"] = cognito_client.hash_from_user_name(user_name)
        response = cognito_client.cognito_idp_client.resend_confirmation_code(**kwargs)
        delivery = response["CodeDeliveryDetails"]
    except ClientError as err:
        logger.error(
            "Couldn't resend confirmation to %s. Here's why: %s: %s",
            user_name,
            err.response["Error"]["Code"],
            err.response["Error"]["Message"],
        )
        raise
    else:
        return delivery

def verify_mfa(
    cognito_client: CognitoClient,
    session: str,
    user_code: str,
) -> dict[str, Any]:
    """
    Verify a new MFA application that is associated with a user.

    https://docs.aws.amazon.com/cognito-user-identity-pools/latest/APIReference/API_VerifySoftwareToken.html

    :param session: Session information returned from a previous call to initiate
                    authentication.
    :param user_code: A code generated by the associated MFA application.
    :return: Status that indicates whether the MFA application is verified.
    """
    try:
        response = cognito_client.cognito_idp_client.verify_software_token(
            Session=session, UserCode=user_code
        )
    except ClientError as err:
        logger.error(
            "Couldn't verify MFA. Here's why: %s: %s",
            err.response["Error"]["Code"],
            err.response["Error"]["Message"],
        )
        raise
    else:
        response.pop("ResponseMetadata", None)
        return response

bionicles commented 2 weeks ago

ah, this might have the necessary info, I will take a look at this and post back here if it works out

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cognito-idp.html
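
For anyone landing here with the same questions, this is a rough sketch of the response shapes that reference describes, written as TypedDicts. The class names and the access_token_from helper are hypothetical (they are not part of boto3). Only the keys mirror the documented responses, and every key is treated as optional. The remaining items in the question (session, mfa_code, device_key, device_group_key, device_password, access_token, confirmation_code) are plain strings.

```python
from typing import Literal, TypedDict


class DeviceMetadata(TypedDict, total=False):
    """Shape of AuthenticationResult["NewDeviceMetadata"]."""

    DeviceKey: str
    DeviceGroupKey: str


class AuthenticationResult(TypedDict, total=False):
    """Shape of response["AuthenticationResult"] from the auth calls."""

    AccessToken: str
    ExpiresIn: int
    TokenType: str
    RefreshToken: str
    IdToken: str
    NewDeviceMetadata: DeviceMetadata


class CodeDeliveryDetails(TypedDict, total=False):
    """Shape of response["CodeDeliveryDetails"] (the delivery information)."""

    Destination: str
    DeliveryMedium: str
    AttributeName: str


class AssociateSoftwareTokenResponse(TypedDict, total=False):
    """SecretCode is the MFA secret to enter into the authenticator app."""

    SecretCode: str
    Session: str


class VerifySoftwareTokenResponse(TypedDict, total=False):
    """The MFA status returned by verify_software_token."""

    Status: Literal["SUCCESS", "ERROR"]
    Session: str


def access_token_from(result: AuthenticationResult) -> str:
    """Hypothetical helper: pull the access token out of an auth result."""
    return result["AccessToken"]


example: AuthenticationResult = {
    "AccessToken": "placeholder-token",
    "ExpiresIn": 3600,
    "TokenType": "Bearer",
}
token = access_token_from(example)
```

TypedDicts are plain dicts at runtime, so the values boto3 returns can be annotated this way without any conversion step.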

bionicles commented 2 weeks ago

OK, that did work out! I also had inadvertently copied some of the pasta twice. I fixed that and added some dataclasses to clarify the types a bit better. Hopefully this helps someone. Also added links so when we hover over stuff we can jump right into the API reference.
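
A small sketch of one way to map a ListUsers-style dictionary onto frozen dataclasses. The names Attr, UserRecord, and from_response are hypothetical and only illustrate the pattern, and the sample dictionary is made up rather than real Cognito output. Two points it tries to show: frozen dataclasses need object.__setattr__ inside __post_init__, and filtering on the declared fields keeps the constructor from failing when the response carries keys the dataclass doesn't declare.

```python
from dataclasses import dataclass, fields
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass(frozen=True)
class Attr:
    Name: str
    Value: Optional[str] = None


@dataclass(frozen=True)
class UserRecord:
    Username: Optional[str] = None
    UserStatus: Optional[str] = None
    UserCreateDate: Optional[datetime] = None
    Attributes: Optional[list[Attr]] = None

    def __post_init__(self) -> None:
        # Frozen dataclasses reject normal assignment, so use object.__setattr__.
        if self.Attributes is not None:
            converted = [
                a if isinstance(a, Attr) else Attr(**a) for a in self.Attributes
            ]
            object.__setattr__(self, "Attributes", converted)

    @classmethod
    def from_response(cls, raw: dict[str, Any]) -> "UserRecord":
        # Drop keys the dataclass does not declare, so extra response
        # fields don't raise TypeError in the constructor.
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in raw.items() if k in known})


# Made-up sample data shaped like one entry of response["Users"].
sample = {
    "Username": "alice",
    "UserStatus": "CONFIRMED",
    "UserCreateDate": datetime(2024, 1, 1, tzinfo=timezone.utc),
    "Attributes": [{"Name": "email", "Value": "alice@example.com"}],
    "Enabled": True,  # not declared on UserRecord, so it is ignored
}
user = UserRecord.from_response(sample)
```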

If we clarify the type of cognitoidp = boto3.client("cognito-idp"), it could help make sure all the helper function signatures show up in the editor. Sorry if I broke the "Chesterton's fence" rule here somehow.
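
One dependency-free way to replace the Any on cognito_idp_client might be a typing.Protocol that lists only the client methods the helpers actually call. This is a sketch under assumptions: CognitoIdpLike, TypedCognitoClient, FakeClient, and user_names are hypothetical names, and the method signatures are simplified to keyword arguments. The boto3-stubs project also publishes annotations for the real client, which this sketch does not depend on.

```python
from dataclasses import dataclass
from typing import Any, Protocol


class CognitoIdpLike(Protocol):
    """Hypothetical structural type: only the methods the helpers use."""

    def list_users(self, **kwargs: Any) -> dict[str, Any]: ...

    def sign_up(self, **kwargs: Any) -> dict[str, Any]: ...


@dataclass(frozen=True)
class TypedCognitoClient:
    cognito_idp_client: CognitoIdpLike
    user_pool_id: str
    client_id: str


class FakeClient:
    """Stand-in for boto3.client("cognito-idp") so the sketch runs offline."""

    def list_users(self, **kwargs: Any) -> dict[str, Any]:
        return {"Users": [{"Username": "alice"}]}

    def sign_up(self, **kwargs: Any) -> dict[str, Any]:
        return {"UserConfirmed": False}


def user_names(client: TypedCognitoClient) -> list[str]:
    """Return the user names from a list_users response."""
    response = client.cognito_idp_client.list_users(UserPoolId=client.user_pool_id)
    return [user["Username"] for user in response["Users"]]


names = user_names(
    TypedCognitoClient(
        cognito_idp_client=FakeClient(),
        user_pool_id="pool-id-placeholder",
        client_id="client-id-placeholder",
    )
)
```

A side benefit of the Protocol approach is that a fake like this can stand in for the real client in unit tests, since any object with matching methods satisfies the type.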