tabular-io / iceberg-kafka-connect

Apache License 2.0

Use Google Cloud Storage as a target #168

Closed: florianMalbranque closed this 7 months ago

florianMalbranque commented 7 months ago

Hi Iceberg and Kafka Connect folks 👋, I am assessing how I can leverage this connector in our Google Cloud infrastructure. As of now the connector works with the S3 protocol; fortunately, GCS is compatible with the S3 protocol. I have been testing my credentials and variables with gsutil in interoperability mode and with the AWS CLI, and I can list and get my objects with both, which means my credentials, variables and permissions are correct. However, I am getting this error: Caused by: software.amazon.awssdk.services.s3.model.S3Exception: Access denied. (Service: S3, Status Code: 403, Request ID: null) (full stack trace here). Does anyone have a clue how I can make it work?

Here is the configuration I use (we use this Terraform provider for our connectors):

resource "kafka-connect_connector" "kafka-to-iceberg-test" {

  name = "kafka-to-iceberg-test"
  config = {
    "name"                                 = "kafka-to-iceberg-test"
    "connector.class"                      = "io.tabular.iceberg.connect.IcebergSinkConnector"
    "tasks.max"                            = "2"
    "topics"                               = "###"
    "iceberg.tables"                       = "default.events"
    "auto.create.topics.enable"            = true
    "value.converter"                      = "org.apache.kafka.connect.json.JsonConverter"
    "schemas.enable"                       = false
    "value.converter.schemas.enable"       = "false"
    "iceberg.tables.upsert-mode-enabled"   = false
    "iceberg.tables.auto-create-enabled"   = true
    "iceberg.tables.evolve-schema-enabled" = true
    "iceberg.control.commit.interval-ms"   = 60000
    "iceberg.control.commit.timeout-ms"    = 60000
    "iceberg.catalog.catalog-impl"         = "org.apache.iceberg.nessie.NessieCatalog"
    "iceberg.catalog.uri"                  = "####"
    "iceberg.catalog.ref"                  = "main"
    "iceberg.catalog.io-impl"              = "org.apache.iceberg.aws.s3.S3FileIO"
    "iceberg.catalog.warehouse"            = "gs://random-bucket/"
    "iceberg.catalog.s3.endpoint"          = "https://storage.googleapis.com"
    "iceberg.catalog.s3.access-key-id"     = "###"
    "iceberg.catalog.s3.secret-access-key" = "###"
  }
}
bryanck commented 7 months ago

Have you tried using GCSFileIO instead of S3FileIO? This is what we use in our systems to write to GCS.

florianMalbranque commented 7 months ago

Hey @bryanck, org.apache.iceberg.gcp.gcs.GCSFileIO worked well, thanks a lot! Interestingly, writes to GCS fail with permission errors: the connector is using my GKE service account rather than the service account I configured 🤔

{
    "code": 403,
    "message": "###-gke-sa@###.iam.gserviceaccount.com does not have storage.objects.create access to the Google Cloud Storage object. Permission 'storage.objects.create' denied on resource (or it may not exist).",
bryanck commented 7 months ago

You may need to set the environment variable GOOGLE_APPLICATION_CREDENTIALS to point to the location of your credentials file.

florianMalbranque commented 7 months ago

Exactly. When using "iceberg.catalog.io-impl" = "org.apache.iceberg.gcp.gcs.GCSFileIO", you don't need these settings:

    "iceberg.catalog.s3.endpoint"          = "https://storage.googleapis.com"
    "iceberg.catalog.s3.access-key-id"     = "###"
    "iceberg.catalog.s3.secret-access-key" = "###"

I'll add this to the documentation. Thanks again!
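Putting the thread together, here is a minimal sketch of the connector resource from the original post with S3FileIO replaced by GCSFileIO and the iceberg.catalog.s3.* keys dropped. It is trimmed to the Iceberg-related keys, and the "###" placeholders are kept from the original post. It assumes GCSFileIO picks up credentials through Google Application Default Credentials, i.e. from GOOGLE_APPLICATION_CREDENTIALS set on the Kafka Connect worker. On GKE, when that variable is unset, the default credentials presumably fall back to the GKE service account, which would be consistent with the 403 above.

```hcl
# Sketch only: the original connector config with S3FileIO swapped for GCSFileIO.
# "###" placeholders are kept from the original post.
resource "kafka-connect_connector" "kafka-to-iceberg-test" {
  name = "kafka-to-iceberg-test"
  config = {
    "name"                                 = "kafka-to-iceberg-test"
    "connector.class"                      = "io.tabular.iceberg.connect.IcebergSinkConnector"
    "tasks.max"                            = "2"
    "topics"                               = "###"
    "iceberg.tables"                       = "default.events"
    "value.converter"                      = "org.apache.kafka.connect.json.JsonConverter"
    "value.converter.schemas.enable"       = "false"
    "iceberg.tables.upsert-mode-enabled"   = "false"
    "iceberg.tables.auto-create-enabled"   = "true"
    "iceberg.tables.evolve-schema-enabled" = "true"
    "iceberg.control.commit.interval-ms"   = "60000"
    "iceberg.control.commit.timeout-ms"    = "60000"
    "iceberg.catalog.catalog-impl"         = "org.apache.iceberg.nessie.NessieCatalog"
    "iceberg.catalog.uri"                  = "###"
    "iceberg.catalog.ref"                  = "main"
    # GCSFileIO talks to GCS natively, so no s3.endpoint or S3 HMAC keys are set.
    "iceberg.catalog.io-impl"              = "org.apache.iceberg.gcp.gcs.GCSFileIO"
    "iceberg.catalog.warehouse"            = "gs://random-bucket/"
    # Assumption: credentials come from the worker environment
    # (GOOGLE_APPLICATION_CREDENTIALS), not from this connector config.
  }
}
```

Because the credentials file is read from the worker's environment, the service account it names (not the GKE one) needs storage.objects.create on the bucket.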

florianMalbranque commented 7 months ago

https://github.com/tabular-io/iceberg-kafka-connect/pull/169