tabular-io / iceberg-kafka-connect

Apache License 2.0
171 stars 31 forks source link

Slow querying of Iceberg Athena tables created by the connector #198

Open fienlag opened 4 months ago

fienlag commented 4 months ago

Hi everyone!

I've created Iceberg Kafka connector, that consumes avro data and loads it to AWS S3 with auto creation of Athena table. The problem is that querying Athena takes an abnormal amount of time.

In my case:

  1. table has ~40 million records
  2. in S3 there is ~700 files in data/ prefix
  3. total size of files in data/ prefix 2.4GB (actually there are less data files with my Kafka data, because some files contains only sets of identifiers and some contains paths)
  4. metadata/ prefix contain ~170 files (~100 files of metadata.json and other are .avro files)

For example, I tried to run query select count(distinct id) from db.iceberg_table and it takes 18-20 minutes to execute. As I see, my connector perform loading data hourly, so every time it creates snapshot. Therefore, I manually set following property for my table 'vacuum_max_snapshot_age_seconds' = '7200' (2 hours) and perform VACUUM db.iceberg_table and OPTIMIZE db.iceberg_table REWRITE DATA USING BIN_PACK. This queries decreased snapshot count to 2 and data files count/size to the numbers I previously mentioned. After this optimization query execution time did not decrease.

Here is connector config:

{
  "name": "sink-s3-iceberg-connector",
  "config": {
    "avro.codec": "deflate",
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "flush.size": "100000",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.catalog.warehouse": "s3://bucket/prefix",
    "iceberg.control.commit.interval-ms": "3600000",
    "iceberg.control.topic": "control-iceberg",
    "iceberg.tables": "db.iceberg_table",
    "iceberg.tables.auto-create-enabled": "true",
    "iceberg.tables.default-id-columns": "id1,id2,id3",
    "iceberg.tables.evolve-schema-enabled": "true",
    "iceberg.tables.upsert-mode-enabled": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": "1",
    "topics": "my-kafka-topic",
    "transforms": "injectTS,injectOffset,injectPartition",
    "transforms.injectOffset.offset.field": "offset",
    "transforms.injectOffset.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.injectPartition.partition.field": "partition",
    "transforms.injectPartition.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.injectTS.timestamp.field": "ts",
    "transforms.injectTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.enhanced.avro.schema.support": "true",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

Here is table DDL:

CREATE TABLE db.iceberg_table (
  id1 string,
  id2 string,
  id3 string,
  field_a string,
  field_b string,
  field_c string,
  ts timestamp,
  offset bigint,
  partition int,
  field_d timestamp)
LOCATION 's3://bucket/prefix'
TBLPROPERTIES (
  'table_type'='iceberg',
  'vacuum_max_snapshot_age_seconds'='7200',
  'write_compression'='zstd'
);

I hope you can help me understand why this is happening and how I can fix this issue with long query execution. Thank you :)

svdimchenko commented 4 months ago

@bryanck seems the issue is caused because no rewrite_manifests operation is run after commit https://iceberg.apache.org/docs/latest/spark-procedures/#rewrite_manifests. Could you suggest if adding such step can resolve the issue ?

Once other table is create from this table like

create new_table as select * from iceberg_table

the new table can be queried with normal performance.

soumitra-bm-ai commented 1 month ago

@fienlag ,checkout iceberg table maintenance , rewrite data files and rewrite manifests ["small files problem"] , can expire unnecessary snapshots aswell