trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Apache License 2.0

Unable to connect Trino Kafka and read avro topic #21526

Closed CJsPod closed 3 months ago

CJsPod commented 5 months ago

I'm trying to read Avro-format data stored in Kafka using the Kafka connector. We are running Strimzi Kafka and Trino on Kubernetes.

Here is kafka.properties:

    connector.name=kafka
    kafka.default-schema=default
    kafka.table-names=order-responses
    kafka.nodes=XXXXXXXX:443
    kafka.config.resources=/etc/trino/catalog/kafka.properties
    kafka.security-protocol=SSL
    kafka.ssl.keystore.location=/secrets/user.p12
    kafka.ssl.keystore.password=/secrets/user.password
    kafka.ssl.keystore.type=pkcs12
    kafka.ssl.truststore.location=/secrets/cluster-ca.p12
    kafka.ssl.truststore.password=/secrets/cluster-ca.password
    kafka.ssl.truststore.type=pkcs12
    kafka.ssl.key.password=/secrets/cluster-ca.password
    kafka.table-description-supplier=CONFLUENT
    kafka.confluent-schema-registry-url=XXXXXXXX:443

Table description

    {
      "tableName": "order-responses",
      "schemaName": "default",
      "topicName": "paperplane.out.order-responses",
      "key": {
        "dataFormat": "json",
        "fields": [
          {
            "name": "_key",
            "dataFormat": "VARCHAR",
            "type": "VARCHAR",
            "hidden": "false"
          }
        ]
      },
      "message": {
        "dataFormat": "avro",
        "dataSchema": "/etc/trino/schemas/paperplane.avsc",
        "fields": [
          {
            "name": "id",
            "mapping": "id",
            "type": "VARCHAR"
          },
          {
            "name": "model",
            "mapping": "model",
            "type": "VARCHAR"
          },
          {
            "name": "status",
            "mapping": "status",
            "type": "VARCHAR"
          },
          {
            "name": "paperPlanes",
            "mapping": "paperPlanes",
            "type": "ARRAY"
          }
        ]
      }
    }

Avro Schema

    {
      "type": "record",
      "name": "Order",
      "namespace": "xxx.avro",
      "fields": [
        {
          "name": "id",
          "type": {
            "type": "string",
            "avro.java.string": "String"
          }
        },
        {
          "name": "model",
          "type": {
            "type": "string",
            "avro.java.string": "String"
          }
        },
        {
          "name": "status",
          "type": {
            "type": "string",
            "avro.java.string": "String"
          }
        },
        {
          "name": "paperPlanes",
          "type": {
            "type": "array",
            "items": {
              "type": "record",
              "name": "PaperPlane",
              "fields": [
                {
                  "name": "serialNumber",
                  "type": {
                    "type": "string",
                    "avro.java.string": "String"
                  }
                },
                {
                  "name": "model",
                  "type": {
                    "type": "string",
                    "avro.java.string": "String"
                  }
                }
              ]
            }
          }
        }
      ]
    }

Getting the error below:

    ERROR main io.trino.server.Server Configuration errors:
    1) Error: Configuration property 'kafka.table-names' was not used
    1 error
    io.airlift.bootstrap.ApplicationConfigurationException: Configuration errors:
    1) Error: Configuration property 'kafka.table-names' was not used
    1 error
        at io.airlift.bootstrap.Bootstrap.initialize(Bootstrap.java:232)
        at io.trino.plugin.kafka.KafkaConnectorFactory.create(KafkaConnectorFactory.java:71)
        at io.trino.connector.DefaultCatalogFactory.createConnector(DefaultCatalogFactory.java:224)
        at io.trino.connector.DefaultCatalogFactory.createCatalog(DefaultCatalogFactory.java:133)
        at io.trino.connector.LazyCatalogFactory.createCatalog(LazyCatalogFactory.java:45)
        at io.trino.connector.StaticCatalogManager.lambda$loadInitialCatalogs$1(StaticCatalogManager.java:157)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
        at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
        at java.base/java.util.concurrent.ExecutorCompletionService.submit(ExecutorCompletionService.java:186)
        at io.trino.util.Executors.executeUntilFailure(Executors.java:41)
        at io.trino.connector.StaticCatalogManager.loadInitialCatalogs(StaticCatalogManager.java:151)
        at io.trino.server.Server.doStart(Server.java:144)
        at io.trino.server.Server.lambda$start$0(Server.java:91)
        at io.trino.$gen.Trino_432____20240412_045334_1.run(Unknown Source)
        at io.trino.server.Server.start(Server.java:91)
        at io.trino.server.TrinoServer.main(TrinoServer.java:38)

However, if I remove the two keys below, then the table names are read:

    kafka.table-description-supplier=CONFLUENT
    kafka.confluent-schema-registry-url=XXXXXXXX:443
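In other words, the variant that starts without errors is the same catalog file minus those two lines; a minimal sketch, with all values carried over from the properties above:

    connector.name=kafka
    kafka.default-schema=default
    kafka.table-names=order-responses
    kafka.nodes=XXXXXXXX:443
    kafka.config.resources=/etc/trino/catalog/kafka.properties
    # SSL keystore/truststore settings unchanged from the file above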

Is there anything wrong in the configuration, or is any additional configuration required?

ebyhr commented 5 months ago

I don't think you can use kafka.table-names when kafka.table-description-supplier isn't name.
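If you want to keep the Confluent table description supplier, a minimal sketch of the catalog file would drop kafka.table-names and let the schema registry supply the table definitions. Values are carried over from your configuration above and not verified against your cluster:

    connector.name=kafka
    kafka.default-schema=default
    kafka.nodes=XXXXXXXX:443
    kafka.table-description-supplier=CONFLUENT
    kafka.confluent-schema-registry-url=XXXXXXXX:443
    # SSL keystore/truststore settings as in your original catalog file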