aws / aws-cdk

The AWS Cloud Development Kit is a framework for defining cloud infrastructure in code
https://aws.amazon.com/cdk
Apache License 2.0
11.4k stars 3.79k forks source link

cdk/kinesis: Data stream sharing #28814

Open Rinbo opened 6 months ago

Rinbo commented 6 months ago

Describe the feature

Implement the ability to add a resource policy to a Kinesis Data Stream. Currently not even the L1 construct can accommodate this, so we have to add the policy manually in the console, which is not good.

Use Case

Eg. cross account sharing

Proposed Solution

Add a method to the Stream construct (L2?) that grants read/write access to an AccountPrincipal (similar to how it is done for SQS). This should create a resource policy on the stream. From what I can tell, this can currently only be done through the console.

Other Information

No response

Acknowledgements

CDK version used

2.122.0

Environment details (OS name and version, etc.)

Ubuntu 20.04.5 LTS

tim-finnigan commented 6 months ago

Thanks for the feature request. As you mentioned, there is no L1 support for this, so it would first need to be added to CloudFormation: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/AWS_Kinesis.html. Labeling this as blocked pending CloudFormation support.

thorben-akent commented 2 months ago

We also have a client that needs cross-account Kinesis -> Lambda triggers within one of their CDK projects. As already mentioned in this issue, CloudFormation does not yet support resource-based policies on AWS::Kinesis::Stream resources. However, Lambda-backed custom resources are always an option, and CDK already has the AwsApi class that we can use to invoke kinesis:PutResourcePolicy.

I'm currently investigating using this as a workaround for our client.

thorben-akent commented 2 months ago

We ended up implementing a lambda-backed custom resource for this. Note: you may also have to use a Lambda layer to provide a more recent version of the boto3 module depending on the AWS partition you are deploying to.

Lambda code:

import boto3
import json
import logging
import os
import uuid

# Module-level logger used by every handler in this file. basicConfig() only
# installs a root handler if the root logger has none configured yet.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("lambda")
logger.setLevel(logging.INFO)

logger.info("Lambda cold start")

# DEBUG set to TRUE / T / 1 (any letter case) raises verbosity to DEBUG;
# any other value, including an unset variable, keeps the logger at INFO.
_debug_flag = os.environ.get("DEBUG", "FALSE")
debug_enabled = _debug_flag.upper() in {"TRUE", "T", "1"}

if debug_enabled:
    logger.setLevel(logging.DEBUG)
    logger.debug("Debug enabled")

logger.debug(f"Boto3 version: {boto3.__version__}")

# Kinesis API actions granted to the principal for each supported AccessType.
# Every access type gets the stream-discovery actions; READ adds record
# retrieval, WRITE adds record ingestion, and READWRITE adds both.
_STREAM_DISCOVERY_ACTIONS = [
    "kinesis:DescribeStream",
    "kinesis:DescribeStreamSummary",
    "kinesis:ListShards",
]
_STREAM_READ_ACTIONS = [
    "kinesis:GetRecords",
    "kinesis:GetShardIterator",
]
_STREAM_WRITE_ACTIONS = [
    "kinesis:PutRecord",
    "kinesis:PutRecords",
]

# List concatenation yields a fresh, independent list for each access type.
access_type_action_map = {
    "READ": _STREAM_DISCOVERY_ACTIONS + _STREAM_READ_ACTIONS,
    "WRITE": _STREAM_DISCOVERY_ACTIONS + _STREAM_WRITE_ACTIONS,
    "READWRITE": _STREAM_DISCOVERY_ACTIONS + _STREAM_READ_ACTIONS + _STREAM_WRITE_ACTIONS,
}

def get_policy(arn):
    """Fetch and parse the resource policy attached to a Kinesis stream.

    The returned document always has the top-level keys the handlers rely on,
    so it can be edited and written straight back with ``put_policy``:

    * ``Version`` defaults to ``"2012-10-17"``;
    * ``Id`` defaults to a freshly generated UUID string;
    * ``Statement`` defaults to an empty list, and a single statement object
      is wrapped in a list.

    :param arn: ARN of the Kinesis data stream.
    :returns: The policy document as a ``dict``.
    :raises Exception: Any error from ``kinesis.get_resource_policy()`` is
        logged and re-raised unchanged.
    """
    client = boto3.client("kinesis")

    params = {
        "ResourceARN": arn,
    }
    try:
        response = client.get_resource_policy(**params)
    except Exception:
        logger.exception("Encountered an exception during kinesis.get_resource_policy()", extra={"params": params})
        raise

    # Defensive: treat an absent or empty policy string as an empty document
    # instead of letting json.loads("") raise.
    policy = json.loads(response.get("Policy") or "{}")

    policy.setdefault("Version", "2012-10-17")
    # Store the Id as a plain string so the document is JSON-serialisable
    # without relying on a ``default=`` fallback.
    policy.setdefault("Id", str(uuid.uuid4()))
    policy.setdefault("Statement", [])
    # IAM policy grammar allows "Statement" to be a single object; the
    # handlers iterate it, so normalise it to a list.
    if isinstance(policy["Statement"], dict):
        policy["Statement"] = [policy["Statement"]]

    return policy

def put_policy(arn, policy):
    """Write ``policy`` as the resource policy of the Kinesis stream ``arn``.

    Callers pass the complete, already-merged policy document.

    :param arn: ARN of the Kinesis data stream.
    :param policy: Policy document; serialised with ``json.dumps`` using
        ``default=str`` so non-JSON values (e.g. a ``uuid.UUID`` Id) are
        stringified.
    :raises Exception: Any error from ``kinesis.put_resource_policy()`` is
        logged and re-raised unchanged.
    """
    client = boto3.client("kinesis")

    params = {
        "ResourceARN": arn,
        "Policy": json.dumps(policy, default=str),
    }
    try:
        client.put_resource_policy(**params)
    except Exception:
        logger.exception("Encountered an exception during kinesis.put_resource_policy()", extra={"params": params})
        raise

def delete_policy(arn):
    """Remove the resource policy from the Kinesis stream ``arn``.

    :param arn: ARN of the Kinesis data stream.
    :raises Exception: Any error from ``kinesis.delete_resource_policy()`` is
        logged and re-raised unchanged.
    """
    client = boto3.client("kinesis")

    params = {
        "ResourceARN": arn,
    }
    try:
        client.delete_resource_policy(**params)
    except Exception:
        logger.exception("Encountered an exception during kinesis.delete_resource_policy()", extra={"params": params})
        raise

def create_handler(event):
    """Add (or replace) this resource's statement in the stream's resource policy.

    The statement's Sid is ``event["RequestId"]`` with dashes removed. Any
    existing statement with the same Sid is replaced; all other statements
    are preserved.

    :param event: Custom resource event whose ``ResourceProperties`` contain
        ``StreamArn``, ``AccountId`` and ``AccessType`` (a key of
        ``access_type_action_map``).
    :returns: ``{"PhysicalResourceId": event["RequestId"]}``. Delete derives
        the Sid from the physical id, so the two must stay in sync.
    :raises RuntimeError: If a required property is missing or
        ``AccessType`` is not supported.
    """
    properties = event["ResourceProperties"]

    required_properties = [
        "StreamArn",
        "AccountId",
        "AccessType",
    ]

    for required in required_properties:
        if required not in properties:
            message = f'Missing required key {required} in ResourceProperties'
            logger.error(message)
            raise RuntimeError(message)

    access_type = properties["AccessType"]
    if access_type not in access_type_action_map:
        message = f'Unsupported AccessType {access_type}; expected one of {", ".join(access_type_action_map)}'
        logger.error(message)
        raise RuntimeError(message)

    stream_arn = properties["StreamArn"]
    # Dashes are stripped from the request UUID, presumably because IAM Sids
    # may only contain alphanumeric characters.
    sid = event["RequestId"].replace("-", "")
    new_statement = {
        "Sid": sid,
        "Effect": "Allow",
        "Principal": {
            "AWS": properties["AccountId"],
        },
        "Action": access_type_action_map[access_type],
        "Resource": stream_arn,
    }

    # NOTE(review): this is a read-modify-write of the whole policy. Two custom
    # resources targeting the same stream and processed concurrently could
    # overwrite each other's statements; consider serialising them with
    # explicit CDK dependencies.
    policy = get_policy(stream_arn)
    logger.info(f'Current resource policy on {stream_arn}: {json.dumps(policy, default=str)}')

    logger.info(f'Adding/updating statement to policy: {json.dumps(new_statement, default=str)}')
    statements = [
        statement for statement in policy["Statement"]
        if statement.get("Sid") != sid
    ]
    statements.append(new_statement)

    policy["Statement"] = statements
    logger.info(f'Applying new resource policy on {stream_arn}: {json.dumps(policy, default=str)}')
    put_policy(stream_arn, policy)

    return {"PhysicalResourceId": event["RequestId"]}

def update_handler(event):
    """Re-apply this resource's statement using the Sid minted at create time.

    ``create_handler`` derives the Sid from ``RequestId``. It is therefore
    handed a copy of the event whose ``RequestId`` is the existing
    ``PhysicalResourceId``, so the existing statement is replaced in place
    instead of a new one being added on every update. If ``StreamArn`` changed,
    the statement is also removed from the previous stream.

    :param event: Custom resource Update event with ``PhysicalResourceId``,
        ``ResourceProperties`` and, optionally, ``OldResourceProperties``.
    :returns: ``{"PhysicalResourceId": event["PhysicalResourceId"]}``, keeping
        the physical id (and hence the Sid) stable.
    :raises RuntimeError: Propagated from ``create_handler`` /
        ``delete_handler`` on invalid properties.
    """
    physical_id = event["PhysicalResourceId"]

    # Build a new dict rather than mutating the caller's event.
    create_handler({**event, "RequestId": physical_id})

    old_properties = event.get("OldResourceProperties") or {}
    old_arn = old_properties.get("StreamArn")
    new_arn = event["ResourceProperties"].get("StreamArn")
    if old_arn and old_arn != new_arn:
        # The statement now lives on the new stream; clean it off the old one.
        delete_handler({**event, "ResourceProperties": old_properties})

    return {"PhysicalResourceId": physical_id}

def delete_handler(event):
    """Remove this resource's statement from the stream's resource policy.

    The Sid is ``event["PhysicalResourceId"]`` with dashes removed. If removing
    it leaves no statements, the whole resource policy is deleted; otherwise
    the remaining policy is written back. If no statement with that Sid is
    present (e.g. a retried Delete), the policy is left untouched.

    :param event: Custom resource event with ``PhysicalResourceId`` and
        ``ResourceProperties.StreamArn``.
    :raises RuntimeError: If ``StreamArn`` is missing.
    """
    properties = event["ResourceProperties"]

    required_properties = [
        "StreamArn",
    ]

    for required in required_properties:
        if required not in properties:
            message = f'Missing required key {required} in ResourceProperties'
            logger.error(message)
            raise RuntimeError(message)

    stream_arn = properties["StreamArn"]
    sid = event["PhysicalResourceId"].replace("-", "")

    policy = get_policy(stream_arn)
    logger.info(f'Current resource policy on {stream_arn}: {json.dumps(policy, default=str)}')

    statements = [
        statement for statement in policy["Statement"]
        if statement.get("Sid") != sid
    ]
    if len(statements) == len(policy["Statement"]):
        # Nothing of ours to remove. Avoid rewriting the policy, and avoid
        # calling DeleteResourcePolicy on a stream that may have no policy.
        logger.info(f'No statement with Sid {sid} on {stream_arn}; leaving resource policy unchanged')
        return

    policy["Statement"] = statements
    logger.info(f'Applying resource policy on {stream_arn}: {json.dumps(policy, default=str)}')
    if not statements:
        delete_policy(stream_arn)
    else:
        put_policy(stream_arn, policy)

def on_event(event, context):
    """Lambda entry point for the ``custom_resources.Provider`` framework.

    Routes the event to the handler matching its ``RequestType`` and returns
    whatever that handler returns.

    :param event: Custom resource lifecycle event.
    :param context: Lambda context object (unused).
    :raises RuntimeError: If ``RequestType`` is not Create, Update or Delete.
    """
    logger.debug(f"Received event: {json.dumps(event, default=str)}")

    request_type = event["RequestType"]
    if request_type == "Create":
        return create_handler(event)
    if request_type == "Update":
        return update_handler(event)
    if request_type == "Delete":
        return delete_handler(event)

    # Anything else is a request type this provider does not implement.
    message = f'Value of {event["RequestType"]} is unsupported for RequestType'
    logger.error(message)
    raise RuntimeError(message)

CDK code:

stream = kinesis.Stream(self, "Stream",
    retention_period = cdk.Duration.hours(24),
    stream_mode = kinesis.StreamMode.ON_DEMAND,
)

# Lambda implementing the Custom::KinesisResourcePolicy provider.
# NOTE(review): the handler module uses `match`, so the runtime must be
# Python 3.10 or newer.
kinesis_resource_policy_provider_function = lambda_.Function(self, "KinesisResourcePolicyProviderFunction",
    code = lambda_.Code.from_asset(
        path = str((definitions.CUSTOM_RESOURCE_PROVIDERS_DIR / "kinesis_resource_policy_provider").resolve()),
        deploy_time = True,
    ),
    environment = {
        # Turns on debug logging in the handler; remove for quieter logs.
        "DEBUG": "TRUE",
    },
    handler = "kinesis_resource_policy_provider_handler.on_event",
    runtime = lambda_.Runtime.PYTHON_3_11,
    timeout = cdk.Duration.minutes(5),
)
# Least privilege: only allow managing the policy of the stream this provider
# serves, instead of "*". Add further stream ARNs here if the provider is reused.
kinesis_resource_policy_provider_function.add_to_role_policy(iam.PolicyStatement(
    sid = "ModifyKinesisResourcePolicy",
    actions = [
        "kinesis:DeleteResourcePolicy",
        "kinesis:GetResourcePolicy",
        "kinesis:PutResourcePolicy",
    ],
    resources = [ stream.stream_arn ],
))
kinesis_resource_policy_provider = custom_resources.Provider(self, "KinesisResourcePolicyProvider",
    on_event_handler = kinesis_resource_policy_provider_function,
)

kinesis_resource_policy = cdk.CustomResource(self, "KinesisResourcePolicy",
    properties = {
        "StreamArn": stream.stream_arn,
        # Placeholder: replace with the 12-digit ID of the account being granted access.
        "AccountId": "111111111111",
        # One of "READ", "WRITE" or "READWRITE".
        "AccessType": "READ",
    },
    resource_type = "Custom::KinesisResourcePolicy",
    service_token = kinesis_resource_policy_provider.service_token,
)