databricks / iceberg-kafka-connect


Use Google Cloud Storage as a target #168

florianMalbranque closed this issue 11 months ago

florianMalbranque commented 11 months ago

Hi Iceberg and Kafka Connect folks 👋, I am assessing how I can leverage this connector in our Google Cloud infrastructure. As of now the connector works with the S3 protocol; fortunately, GCS is compatible with the S3 protocol. I have been testing credentials and variables using gsutil in interoperability mode and the AWS CLI, and I am able to list and get my objects with both, which means my credentials, variables, and permissions are correct. However, I am getting this error: Caused by: software.amazon.awssdk.services.s3.model.S3Exception: Access denied. (Service: S3, Status Code: 403, Request ID: null) (full stacktrace here). Does anyone have a clue how I can make this work?

Here is the configuration I use (we use this Terraform provider for our connectors):

resource "kafka-connect_connector" "kafka-to-iceberg-test" {

  name = "kafka-to-iceberg-test"
  config = {
    "name"                                 = "kafka-to-iceberg-test"
    "connector.class"                      = "io.tabular.iceberg.connect.IcebergSinkConnector"
    "tasks.max"                            = "2"
    "topics"                               = "###"
    "iceberg.tables"                       = "###"
    "iceberg.tables"                       = "default.events"
    "auto.create.topics.enable" = true
    "value.converter"                = "org.apache.kafka.connect.json.JsonConverter"
    "schemas.enable" =false
    "value.converter.schemas.enable"= "false"
    "iceberg.tables.upsert-mode-enabled"   = false
    "iceberg.tables.auto-create-enabled"   = true
    "iceberg.tables.evolve-schema-enabled" = true
    "iceberg.control.commit.interval-ms"   = 60000
    "iceberg.control.commit.timeout-ms"    = 60000
    "iceberg.catalog.catalog-impl"         = "org.apache.iceberg.nessie.NessieCatalog",
    "iceberg.catalog.uri"                  = "####",
    "iceberg.catalog.ref"                  = "main",
    "iceberg.catalog.io-impl"              = "org.apache.iceberg.aws.s3.S3FileIO"
    "iceberg.catalog.warehouse"            = "gs://random-bucket/"
    "iceberg.catalog.s3.endpoint"          = "https://storage.googleapis.com"
    "iceberg.catalog.s3.access-key-id"     = "###"
    "iceberg.catalog.s3.secret-access-key" = "###"
  }
}
bryanck commented 11 months ago

Have you tried using GCSFileIO instead of S3FileIO? This is what we use in our systems to write to GCS.
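
A minimal sketch of what that swap would look like in the Terraform config above, keeping everything else intact (the bucket name is the same placeholder as before):

    # Swap S3FileIO for GCSFileIO; the s3.* endpoint and credential keys
    # would no longer apply (sketch, not a tested config)
    "iceberg.catalog.io-impl"   = "org.apache.iceberg.gcp.gcs.GCSFileIO"
    "iceberg.catalog.warehouse" = "gs://random-bucket/"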

florianMalbranque commented 11 months ago

Hey @bryanck, org.apache.iceberg.gcp.gcs.GCSFileIO worked well, thanks a lot! Interestingly, the write to GCS then fails with a permission error: it is using my GKE service account rather than the service account I configured 🤔

{\n    \"code\": 403,\n    \"message\": \"###-gke-sa@###.iam.gserviceaccount.com does not have storage.objects.create access to the Google Cloud Storage object. Permission 'storage.objects.create' denied on resource (or it may not exist).\",
bryanck commented 11 months ago

You may need to set the environment variable GOOGLE_APPLICATION_CREDENTIALS to point to the location of your credentials file.
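
For example, if the Connect worker is started from a shell, a minimal sketch (the key file path is a hypothetical placeholder):

    # Point the GCP client libraries at a service account key file
    # (path is a made-up placeholder)
    export GOOGLE_APPLICATION_CREDENTIALS=/etc/secrets/gcs-writer-key.json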

florianMalbranque commented 11 months ago

Exactly. So when using "iceberg.catalog.io-impl" = "org.apache.iceberg.gcp.gcs.GCSFileIO", you don't need:

    "iceberg.catalog.s3.endpoint"          = "https://storage.googleapis.com"
    "iceberg.catalog.s3.access-key-id"     = "###"
    "iceberg.catalog.s3.secret-access-key" = "###"

I'll reflect this in the documentation. Thanks again!
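
For reference, a consolidated sketch of the resulting catalog block under the assumptions above (redacted values left as placeholders; the worker still needs GOOGLE_APPLICATION_CREDENTIALS set, as noted earlier):

    "iceberg.catalog.catalog-impl" = "org.apache.iceberg.nessie.NessieCatalog"
    "iceberg.catalog.uri"          = "####"
    "iceberg.catalog.ref"          = "main"
    # GCSFileIO picks up credentials from GOOGLE_APPLICATION_CREDENTIALS on the worker,
    # so no s3.* endpoint or access keys are needed
    "iceberg.catalog.io-impl"      = "org.apache.iceberg.gcp.gcs.GCSFileIO"
    "iceberg.catalog.warehouse"    = "gs://random-bucket/"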

florianMalbranque commented 11 months ago

https://github.com/tabular-io/iceberg-kafka-connect/pull/169