airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Source DynamoDB: connector Tries To Scan All Tables #25718

Closed murat-cetinkaya closed 6 months ago

murat-cetinkaya commented 1 year ago

Connector Name

DynamoDB Source

Connector Version

0.1.2

What step the error happened?

Configuring a new connector

Relevant information

I have an issue with the Airbyte DynamoDB Source connector. I have 10 DynamoDB tables in my account. I created an AWS user with scan and read permissions on a single DynamoDB table, using the policy below:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetItem",
                "dynamodb:BatchGetItem",
                "dynamodb:Query",
                "dynamodb:Scan",
                "dynamodb:DescribeTable"
            ],
            "Resource": ["arn:aws:dynamodb:us-west-1:aws-account-id:table/table-name"]
        },
        {
            "Effect": "Allow",
            "Action": ["dynamodb:ListTables"],
            "Resource": "*"
        }
    ]
}

Despite this policy, the connector requests the Scan permission on the other tables when I create a connection, and I get the following error:

ERROR i.a.i.b.AirbyteExceptionHandler(uncaughtException):26 Something went wrong in the connector. See the logs for more details. software.amazon.awssdk.services.dynamodb.model.DynamoDbException: User: arn:aws:iam::account-id:user/airbyte_user is not authorized to perform: dynamodb:Scan on resource: arn:aws:dynamodb:us-west-1:account-id:table/another_table because no identity-based policy allows the dynamodb:Scan action (Service: DynamoDb, Status Code: 400, Request ID: A8RC5JQ2FUQRHR5M368RPUKR5RVV4KQNSO5AEMVJF66Q9AAJG)

Relevant log output

stacktrace=software.amazon.awssdk.services.dynamodb.model.DynamoDbException: User: arn:aws:iam::account-no:user/airbyte_user is not authorized to perform: dynamodb:Scan on resource: arn:aws:dynamodb:us-west-1:account_no:table/another_table because no identity-based policy allows the dynamodb:Scan action (Service: DynamoDb, Status Code: 400, Request ID: 7TR6BUN0P8RLOQF9IFVV4KQNSO5AEMVJF66Q9AS2JG)
    at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:125)
    at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:82)
    at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:60)
    at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:41)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
    at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
    at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
    at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
    at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
    at software.amazon.awssdk.services.dynamodb.DefaultDynamoDbClient.scan(DefaultDynamoDbClient.java:4876)
    at software.amazon.awssdk.services.dynamodb.paginators.ScanIterable$ScanResponseFetcher.nextPage(ScanIterable.java:134)
    at software.amazon.awssdk.services.dynamodb.paginators.ScanIterable$ScanResponseFetcher.nextPage(ScanIterable.java:125)
    at software.amazon.awssdk.core.pagination.sync.PaginatedResponsesIterator.next(PaginatedResponsesIterator.java:58)
    at io.airbyte.integrations.source.dynamodb.DynamodbOperations.inferSchema(DynamodbOperations.java:82)
    at io.airbyte.integrations.source.dynamodb.DynamodbSource.lambda$discover$0(DynamodbSource.java:83)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:575)
    at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:616)
    at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:622)
    at java.base/java.util.stream.ReferencePipeline.toList(ReferencePipeline.java:627)
    at io.airbyte.integrations.source.dynamodb.DynamodbSource.discover(DynamodbSource.java:87)
    at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:129)
    at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:98)
    at io.airbyte.integrations.source.dynamodb.DynamodbSource.main(DynamodbSource.java:50)


murat-cetinkaya commented 1 year ago

@itaseskii I hope the description of the issue is clear. Please let me know if you have further questions.

evantahler commented 1 year ago

To clarify - should we not be doing a SCAN in our code, or is SCAN the right operation to use, in which case we need to update our docs to require this permission?

itaseskii commented 1 year ago

@evantahler Scan is the right operation to perform, since you need to read from all the partitions of a given table. What the connector needs to take into account is that it might not have read access to some of the tables returned by list-tables (https://docs.aws.amazon.com/cli/latest/reference/dynamodb/list-tables.html). The most straightforward way to handle this is to catch the exception and skip the table in discover() and read(). We could also consider adding a configuration property for this and making it opt-in.
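A rough sketch of the catch-and-skip idea, using only the Java standard library so it runs on its own. Everything named here is hypothetical and not taken from the connector: `DiscoverSketch`, `TableAccessDeniedException` (a stand-in for the access-denied `DynamoDbException` seen in the log), the `inferSchema` function parameter, and the `skipInaccessibleTables` flag (a stand-in for the proposed opt-in configuration property). The actual fix in the connector may look different.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class DiscoverSketch {

    /** Hypothetical stand-in for the access-denied DynamoDbException thrown by the AWS SDK. */
    public static class TableAccessDeniedException extends RuntimeException {
        public TableAccessDeniedException(String message) {
            super(message);
        }
    }

    /**
     * Infers a schema for each listed table.
     *
     * @param tableNames             names as returned by ListTables
     * @param inferSchema            hypothetical per-table schema inference (scans the table)
     * @param skipInaccessibleTables assumed opt-in flag; when false, the first denied table fails discovery
     * @param skipped                output list that collects the tables that were skipped
     * @return schemas keyed by table name, in listing order, for the readable tables only
     */
    public static Map<String, String> discover(
            List<String> tableNames,
            Function<String, String> inferSchema,
            boolean skipInaccessibleTables,
            List<String> skipped) {
        Map<String, String> schemas = new LinkedHashMap<>();
        for (String table : tableNames) {
            try {
                schemas.put(table, inferSchema.apply(table));
            } catch (TableAccessDeniedException e) {
                if (!skipInaccessibleTables) {
                    // Behaviour reported in this issue: one unreadable table aborts discovery.
                    throw e;
                }
                // Opt-in behaviour: remember the table and continue with the rest.
                skipped.add(table);
            }
        }
        return schemas;
    }
}
```

With the flag enabled, a user whose policy covers only `table-name` would get a catalog containing just that table, and `another_table` would end up in the skipped list instead of failing the whole discover step. The same guard would be needed in read() so that a skipped table is never scanned there either.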