aws / aws-cdk

The AWS Cloud Development Kit is a framework for defining cloud infrastructure in code
https://aws.amazon.com/cdk
Apache License 2.0

[aws-kinesis] Read permissions to stream doesn't include kinesis:DescribeStream #10783

Closed Alexander-D-Jensen closed 3 years ago

Alexander-D-Jensen commented 3 years ago

When granting an IAM role read permissions on a Kinesis stream, the resulting policy does not include the "kinesis:DescribeStream" permission, which at least Kinesis Firehose needs in order to read from the stream, and perhaps other consumers as well.

Reproduction Steps

from aws_cdk import (
    core,
    aws_kinesis as kds,
    aws_iam as iam,
)
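class DFPStack(core.NestedStack):
    def __init__(
        self, scope: core.Construct, id: str, *, stream: kds.Stream, **kwargs
    ) -> None:
        super().__init__(scope, id, **kwargs)

        # Role that Kinesis Firehose assumes to read from the stream.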
        self.kfh_reader = iam.Role(
            self,
            "KFHReader",
            assumed_by=iam.ServicePrincipal(service="firehose.amazonaws.com"),
        )
        self.stream = stream
        self.stream.grant_read(self.kfh_reader)

Results in the following output:

"KFHReaderDefaultPolicy98F05724": {
      "Type": "AWS::IAM::Policy",
      "Properties": {
        "PolicyDocument": {
          "Statement": [
            {
              "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:ListShards",
                "kinesis:SubscribeToShard"
              ],
              "Effect": "Allow",
              "Resource": {
                "Ref": "referencetoLongboatkinesisNestedStackkinesisNestedStackResourceFA86427BOutputsLongboatkinesisstreamDE86A4D8Arn"
              }
            }
          ],
          "Version": "2012-10-17"
        },
        "PolicyName": "KFHReaderDefaultPolicy98F05724",
        "Roles": [
          {
            "Ref": "KFHReader8A09BD1B"
          }
        ]
      },
      "Metadata": {
        "aws:cdk:path": "Longboat/DFPStack/KFHReader/DefaultPolicy/Resource"
      }
    },

What did you expect to happen?

I expected the resulting IAM role and policy to actually be able to read from the supplied Kinesis stream when used by Kinesis Firehose. This would require the "kinesis:DescribeStream" permission to be given.

What actually happened?

Stack creation fails, because the "kinesis:DescribeStream" permission is lacking.

CloudFormation gives the following error: Role arn:aws:iam::<account>:role/<role_name> is not authorized to perform: kinesis:DescribeStream on resource arn:aws:kinesis:eu-west-1:<account>:stream/<stream_name>. (Service: Firehose, Status Code: 400, Request ID: <request_id>, Extended Request ID: <extended_request_id>)

This is :bug: Bug Report

iliapolo commented 3 years ago

@shivlaks Do you know why the DescribeStream action was explicitly removed in this commit?

@Alexander-D-Jensen - As a workaround, you can use Custom Permissions to add this action.

Alexander-D-Jensen commented 3 years ago

@iliapolo

I have done so afterwards, via the following: self.stream.grant(self.kfh_reader, "kinesis:DescribeStream")

However, I then run into an issue with CloudFormation: it tries to create the Firehose before the IAM policy is ready, and the deployment fails.

In other words, despite adding the permissions explicitly, I am no closer to a successful deployment of the stack at that point.

Finally, I've arrived at the following workaround:

grant_describe = self.stream.grant(self.kfh_reader, "kinesis:DescribeStream")
# <create firehose here>
grant_describe.apply_before(self.firehose)

I am not sure whether this is within the scope of CDK to handle, though I suspect the better solution would be for CloudFormation to handle it (correctly).

iliapolo commented 3 years ago

@Alexander-D-Jensen Thanks, we'll take a look.

Do you mind sharing the entire code you use? Including where you define the firehose and the stream itself?

Alexander-D-Jensen commented 3 years ago

Sure @iliapolo! I've redacted some info (account numbers, company name etc.) and a few tags, as well as services, modules etc. which aren't of any consequence. Below should be as close to a complete setup needed to replicate the error as I can easily get.

This is stack A, the stack which had the issue:

from aws_cdk import (
    core,
    aws_kinesis as kds,
    aws_sqs as sqs,
    aws_iam as iam,
    aws_s3 as s3,
    aws_kinesisfirehose as kfh,
)

import os
import pathlib

from .settings import ACCOUNT_ALIAS

commit_stream = os.popen(
    "git log -n 1 --pretty=format:%H -- {0}".format(pathlib.Path(__file__))
)
commit_hash = commit_stream.read()

tags = {
    "Last_Change": commit_hash,
    "Feature": "DFP Handover",
}

dfp_buckets = {
    "Dev": "arn:aws:s3:::<bucket_name>",
    "Test": "arn:aws:s3:::<bucket_name>",
    "Prod": "arn:aws:s3:::<bucket_name>",
}

class DFPStack(core.NestedStack):
    def __init__(
        self, scope: core.Construct, id: str, *, stream: kds.Stream, **kwargs
    ) -> None:
        super().__init__(scope, id, **kwargs)
        for k, v in tags.items():
            core.Tags.of(self).add(k, v)

        self.dfp_bucket = s3.Bucket.from_bucket_arn(
            self, "DFPBucket", bucket_arn=dfp_buckets[ACCOUNT_ALIAS]
        )

        self.kfh_reader = iam.Role(
            self,
            "KFHReader",
            assumed_by=iam.ServicePrincipal(service="firehose.amazonaws.com"),
        )
        self.kfh_writer = iam.Role(
            self,
            "KFHWriter",
            assumed_by=iam.ServicePrincipal(service="firehose.amazonaws.com"),
        )

        self.stream = stream

        self.stream.grant_read(self.kfh_reader)
        grant_describe = self.stream.grant(self.kfh_reader, "kinesis:DescribeStream")

        self.kfh_writer.add_to_policy(
            iam.PolicyStatement(
                actions=["s3:PutObjectAcl"], resources=[self.dfp_bucket.bucket_arn]
            )
        )
        self.dfp_bucket.grant_read_write(self.kfh_writer)

        self.firehose = kfh.CfnDeliveryStream(
            self,
            "DFPDelivery",
            delivery_stream_type="KinesisStreamAsSource",
            extended_s3_destination_configuration=kfh.CfnDeliveryStream.ExtendedS3DestinationConfigurationProperty(
                bucket_arn=self.dfp_bucket.bucket_arn,
                role_arn=self.kfh_writer.role_arn,
                buffering_hints=kfh.CfnDeliveryStream.BufferingHintsProperty(
                    interval_in_seconds=300, size_in_m_bs=128
                ),
                compression_format="GZIP",
                prefix="<company_name>/v1/",
            ),
            kinesis_stream_source_configuration=kfh.CfnDeliveryStream.KinesisStreamSourceConfigurationProperty(
                kinesis_stream_arn=self.stream.stream_arn,
                role_arn=self.kfh_reader.role_arn,
            ),
        )
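        # Note: this only builds a property object and discards it; it does not
        # attach logging options to the delivery stream.
        self.firehose.CloudWatchLoggingOptionsProperty(enabled=True)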

        grant_describe.apply_before(self.firehose)

        core.CfnOutput(scope=self, id="WriterRoleArn", value=self.kfh_writer.role_arn)

This is stack B, which creates the Kinesis stream

from aws_cdk import core, aws_kinesis as kds, aws_sqs as sqs, aws_iam as iam

import os
import pathlib

from .settings import ACCOUNT_ALIAS

commit_stream = os.popen(
    "git log -n 1 --pretty=format:%H -- {0}".format(pathlib.Path(__file__))
)
commit_hash = commit_stream.read()

tags = {
    "Last_Change": commit_hash,
    "Feature": "Kinesis Stream",
}

class KinesisStack(core.NestedStack):
    def __init__(self, scope: core.Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)
        for k, v in tags.items():
            core.Tags.of(self).add(k, v)

        self.stream = kds.Stream(
            self,
            "stream",
            retention_period=core.Duration.hours(72),
            shard_count=6 if ACCOUNT_ALIAS == "Prod" else 1,
        )

        self.dead_letter_queue = sqs.Queue(
            self,
            "DeadLetterQueue",
        )

And finally, we have the stack which coordinates the whole thing:

from aws_cdk import core, aws_iam

from .kinesis_stack import KinesisStack
from .dfp_stack import DFPStack

tags = {"Service": "Something"}

class LongboatStack(core.Stack):
    def __init__(self, scope: core.Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)
        for k, v in tags.items():
            core.Tags.of(self).add(k, v)

        stream = KinesisStack(self, "kinesis")
        self.dfp_stack = DFPStack(self, "DFPStack", stream=stream.stream)

I hope this helps :)

iliapolo commented 3 years ago

Hi @Alexander-D-Jensen - thanks for sharing the code. It was very helpful.

So there are actually two separate issues here:

First, stream.grantRead does not grant the kinesis:DescribeStream action. This was intentional, because the DescribeStream API is deprecated in favor of DescribeStreamSummary. However, it seems CfnDeliveryStream still uses the deprecated API.

We will consider adding this action back to the policy.
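The gap can be illustrated with a minimal sketch that compares the actions from the synthesized policy in the report against what the consumer asked for. The names `missing_actions`, `GRANTED_BY_GRANT_READ` and `REQUIRED_BY_FIREHOSE` are hypothetical, and the required set holds only the single action named in the CloudFormation error message; it is not an authoritative list of what Firehose needs.

```python
# Actions copied from the synthesized KFHReaderDefaultPolicy in the report.
GRANTED_BY_GRANT_READ = {
    "kinesis:DescribeStreamSummary",
    "kinesis:GetRecords",
    "kinesis:GetShardIterator",
    "kinesis:ListShards",
    "kinesis:SubscribeToShard",
}

# Assumption: only the action named in the CloudFormation error message.
REQUIRED_BY_FIREHOSE = {"kinesis:DescribeStream"}


def missing_actions(granted, required):
    """Return the required actions that are absent from the granted set, sorted."""
    return sorted(set(required) - set(granted))


# DescribeStreamSummary is a different action name, so it does not cover
# DescribeStream; the latter shows up as missing.
print(missing_actions(GRANTED_BY_GRANT_READ, REQUIRED_BY_FIREHOSE))
```

Adding "kinesis:DescribeStream" to the granted set, which is what the custom `grant` call in the workaround does, makes the returned list empty.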

Second, the CfnDeliveryStream is created before the necessary policies are applied to the Kinesis reader role. The reader policy is a separate resource (AWS::IAM::Policy) that is created after the role, and since the CfnDeliveryStream depends only on the role, not the policy, a race condition occurs.

This type of thing is exactly what an L2 for a CfnDeliveryStream can solve, but until we have that, the way to tackle this problem is exactly what you did.
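The ordering problem described above can be modeled with a small dependency graph. This is only a toy sketch using Python's standard `graphlib` module, not a representation of how CloudFormation actually schedules work; the resource names and the helper `creation_waves` are hypothetical. It assumes that `apply_before` adds a dependency from the delivery stream to the policy resource.

```python
from graphlib import TopologicalSorter  # Python 3.9+


def creation_waves(depends_on):
    """Group resources into waves; resources in the same wave have no ordering between them."""
    sorter = TopologicalSorter(depends_on)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


# Without an explicit dependency: the delivery stream references only the role.
implicit = {
    "ReaderRole": set(),
    "ReaderPolicy": {"ReaderRole"},
    "DeliveryStream": {"ReaderRole"},
}

# With the extra dependency (assumed effect of grant.apply_before(firehose)).
explicit = {
    "ReaderRole": set(),
    "ReaderPolicy": {"ReaderRole"},
    "DeliveryStream": {"ReaderRole", "ReaderPolicy"},
}

print(creation_waves(implicit))  # policy and delivery stream share a wave
print(creation_waves(explicit))  # policy is created strictly before the stream
```

In the first graph the policy and the delivery stream become ready at the same time, so nothing prevents the stream from being created first. In the second graph the stream can only start after the policy is done.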

I've created an issue for the L2 construct to make sure we consider this scenario when we start working on it.
